diff --git a/orchestrator/deploy/template.yaml b/orchestrator/deploy/template.yaml index e00d71a..7cabbc7 100644 --- a/orchestrator/deploy/template.yaml +++ b/orchestrator/deploy/template.yaml @@ -1,6 +1,6 @@ AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 -Description: Tinqs CI Orchestrator — webhook dispatch, Fargate routing, cancel support +Description: Tinqs CI Orchestrator — Lambda dispatch + EC2 Spot runners Parameters: GiteaURL: @@ -9,24 +9,18 @@ Parameters: GiteaToken: Type: String NoEcho: true - ECSCluster: - Type: String - Default: tinqs-git - Subnets: - Type: CommaDelimitedList - Description: VPC subnet IDs for Fargate tasks + RunnerAMI: + Type: AWS::EC2::Image::Id + Description: Pre-baked AMI with Go, Node, Docker, AWS CLI, act_runner + Subnet: + Type: AWS::EC2::Subnet::Id + Description: Public subnet for spot instances SecurityGroup: + Type: AWS::EC2::SecurityGroup::Id + Description: Security group for spot instances + InstanceProfileArn: Type: String - Description: Security group for Fargate tasks - ECRBase: - Type: String - Default: 149751500842.dkr.ecr.eu-west-1.amazonaws.com - TaskRoleArn: - Type: String - Default: arn:aws:iam::149751500842:role/tinqs-git-task - ExecRoleArn: - Type: String - Default: arn:aws:iam::149751500842:role/ecsTaskExecutionRole + Description: IAM instance profile ARN for runners Globals: Function: @@ -71,7 +65,7 @@ Resources: FunctionName: tinqs-ci-dispatch Handler: bootstrap CodeUri: ../dispatch/ - Description: Receives webhook, routes to Fargate or Lambda by runs-on label + Description: Receives webhook, starts Spot instances or invokes Lambda executor Timeout: 60 MemorySize: 256 Environment: @@ -79,13 +73,11 @@ Resources: GITEA_URL: !Ref GiteaURL GITEA_TOKEN: !Ref GiteaToken EXECUTOR_FUNCTION_NAME: !Ref ExecFunction - ECS_CLUSTER: !Ref ECSCluster - SUBNETS: !Join [",", !Ref Subnets] + RUNNER_AMI: !Ref RunnerAMI + SUBNET: !Ref Subnet SECURITY_GROUP: !Ref SecurityGroup - ECR_BASE: !Ref ECRBase DDB_TABLE: !Ref RunsTable - TASK_ROLE_ARN: !Ref TaskRoleArn - EXEC_ROLE_ARN: !Ref ExecRoleArn + INSTANCE_PROFILE: !Ref InstanceProfileArn Policies: - LambdaInvokePolicy: FunctionName: !Ref ExecFunction @@ -95,16 +87,14 @@ Resources: Statement: - Effect: Allow Action: - - ecs:RunTask - - ecs:StopTask - - ecs:RegisterTaskDefinition - - ecs:DescribeTasks + - ec2:RunInstances + - ec2:TerminateInstances + - ec2:DescribeInstances + - ec2:CreateTags Resource: '*' - Effect: Allow Action: iam:PassRole - Resource: - - !Ref TaskRoleArn - - !Ref ExecRoleArn + Resource: !Ref InstanceProfileArn Events: Webhook: Type: Api @@ -172,12 +162,12 @@ Resources: CILogGroup: Type: AWS::Logs::LogGroup Properties: - LogGroupName: /ecs/tinqs-ci + LogGroupName: /tinqs/ci RetentionInDays: 14 Outputs: WebhookURL: - Description: Configure this as Gitea system webhook + Description: Configure as Gitea system webhook Value: !Sub 'https://${WebhookApi}.execute-api.${AWS::Region}.amazonaws.com/prod/webhook' DispatchArn: Value: !GetAtt DispatchFunction.Arn diff --git a/orchestrator/dispatch/go.mod b/orchestrator/dispatch/go.mod index e81358c..dfacdb4 100644 --- a/orchestrator/dispatch/go.mod +++ b/orchestrator/dispatch/go.mod @@ -8,7 +8,7 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.28.0 github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.12 github.com/aws/aws-sdk-go-v2/service/dynamodb v1.38.0 - github.com/aws/aws-sdk-go-v2/service/ecs v1.52.0 + github.com/aws/aws-sdk-go-v2/service/ec2 v1.198.0 github.com/aws/aws-sdk-go-v2/service/lambda v1.69.0 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/orchestrator/dispatch/main.go b/orchestrator/dispatch/main.go index 8dadbba..78ea7c1 100644 --- a/orchestrator/dispatch/main.go +++ b/orchestrator/dispatch/main.go @@ -3,16 +3,17 @@ // Receives Gitea system webhooks, determines which workflows to run, // and routes each job to the right execution environment based on runs-on label: // -// go, node, docker, godot → Fargate task with matching runner image +// go, node, docker, godot → EC2 Spot instance (pre-baked AMI, self-terminates) // deploy → Lambda direct execution (ci-exec) // host → skip (handled by registered always-on runner) // -// Also handles cancel events (stop Fargate tasks) and cleanup cron. +// Also handles cancel events (terminate instances) and cleanup cron. package main import ( "context" + "encoding/base64" "encoding/json" "fmt" "log" @@ -28,7 +29,8 @@ import ( "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" - "github.com/aws/aws-sdk-go-v2/service/ecs" + "github.com/aws/aws-sdk-go-v2/service/ec2" + ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types" awslambda "github.com/aws/aws-sdk-go-v2/service/lambda" "gopkg.in/yaml.v3" ) @@ -80,68 +82,101 @@ type WorkflowJob struct { // --- DynamoDB run record --- type RunRecord struct { - Repo string `dynamodbav:"repo"` - RunID string `dynamodbav:"run_id"` - TaskArn string `dynamodbav:"task_arn"` - Status string `dynamodbav:"status"` - Workflow string `dynamodbav:"workflow"` - Label string `dynamodbav:"label"` - StartedAt int64 `dynamodbav:"started_at"` - TTL int64 `dynamodbav:"ttl"` + Repo string `dynamodbav:"repo"` + RunID string `dynamodbav:"run_id"` + InstanceID string `dynamodbav:"instance_id"` + Status string `dynamodbav:"status"` + Workflow string `dynamodbav:"workflow"` + Label string `dynamodbav:"label"` + StartedAt int64 `dynamodbav:"started_at"` + TTL int64 `dynamodbav:"ttl"` } -// --- Image routing --- +// --- Spot instance config per label --- -var labelToImage = map[string]string{ - "go": "tinqs-runner-go", - "node": "tinqs-runner-node", - "docker": "tinqs-runner-docker", - "deploy": "tinqs-runner-deploy", - "godot": "tinqs-runner-godot", +type spotConfig struct { + InstanceType string + MaxPrice string // spot bid (empty = on-demand price cap) } -var labelToResources = map[string][2]string{ - "go": {"1024", "2048"}, // 1 vCPU, 2 GB - "node": {"1024", "2048"}, - "docker": {"2048", "4096"}, // 2 vCPU, 4 GB (Docker builds are heavy) - "deploy": {"512", "1024"}, // 0.5 vCPU, 1 GB (lightweight) - "godot": {"2048", "4096"}, +var labelToSpot = map[string]spotConfig{ + "go": {InstanceType: "t3.small", MaxPrice: "0.008"}, + "node": {InstanceType: "t3.small", MaxPrice: "0.008"}, + "docker": {InstanceType: "t3.medium", MaxPrice: "0.016"}, + "deploy": {InstanceType: "t3.micro", MaxPrice: "0.004"}, + "godot": {InstanceType: "t3.medium", MaxPrice: "0.016"}, } // --- Config from env --- type cfg struct { - GiteaURL string - GiteaToken string - ExecFnName string - ECSCluster string - Subnets []string + GiteaURL string + GiteaToken string + ExecFnName string + AMI string // pre-baked AMI with Go, Node, Docker, AWS CLI, act_runner + Subnet string SecurityGroup string - ECRBase string - DDBTable string - TaskRoleArn string - ExecRoleArn string + DDBTable string + InstanceProfile string + Region string } func loadCfg() cfg { return cfg{ - GiteaURL: os.Getenv("GITEA_URL"), - GiteaToken: os.Getenv("GITEA_TOKEN"), - ExecFnName: os.Getenv("EXECUTOR_FUNCTION_NAME"), - ECSCluster: os.Getenv("ECS_CLUSTER"), - Subnets: strings.Split(os.Getenv("SUBNETS"), ","), - SecurityGroup: os.Getenv("SECURITY_GROUP"), - ECRBase: os.Getenv("ECR_BASE"), // e.g. 149751500842.dkr.ecr.eu-west-1.amazonaws.com - DDBTable: os.Getenv("DDB_TABLE"), - TaskRoleArn: os.Getenv("TASK_ROLE_ARN"), - ExecRoleArn: os.Getenv("EXEC_ROLE_ARN"), + GiteaURL: os.Getenv("GITEA_URL"), + GiteaToken: os.Getenv("GITEA_TOKEN"), + ExecFnName: os.Getenv("EXECUTOR_FUNCTION_NAME"), + AMI: os.Getenv("RUNNER_AMI"), + Subnet: os.Getenv("SUBNET"), + SecurityGroup: os.Getenv("SECURITY_GROUP"), + DDBTable: os.Getenv("DDB_TABLE"), + InstanceProfile: os.Getenv("INSTANCE_PROFILE"), // IAM instance profile ARN + Region: os.Getenv("AWS_REGION"), } } +// --- User-data script template --- +// The instance boots, registers as ephemeral runner, picks one job, then terminates itself. + +func userDataScript(c cfg, label, runnerName string) string { + script := fmt.Sprintf(`#!/bin/bash +set -euo pipefail +exec > /var/log/tinqs-ci.log 2>&1 + +echo "=== Tinqs CI Runner: %s ===" +INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id) +REGION=%s + +# Self-termination trap — kill instance on exit (success or failure) +cleanup() { + echo "=== Job done, terminating $INSTANCE_ID ===" + aws ec2 terminate-instances --instance-ids "$INSTANCE_ID" --region "$REGION" || true +} +trap cleanup EXIT + +# act_runner is pre-installed in the AMI at /usr/local/bin/act_runner +cd /tmp + +# Register as ephemeral runner (picks one job, then exits) +act_runner register --no-interactive \ + --instance %s \ + --token %s \ + --name %s \ + --labels "%s:host" + +# Run — blocks until the job completes, then exits (ephemeral mode) +act_runner daemon + +echo "=== Runner exited, cleanup will terminate instance ===" +`, runnerName, c.Region, c.GiteaURL, c.GiteaToken, runnerName, label) + + return base64.StdEncoding.EncodeToString([]byte(script)) +} + func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) { c := loadCfg() - // Handle cleanup cron (EventBridge invokes with action=cleanup) + // Handle cleanup cron if request.Body == "" || strings.Contains(request.Body, `"action":"cleanup"`) { return handleCleanup(ctx, c) } @@ -162,7 +197,6 @@ func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events return respond(400, `{"error":"invalid payload"}`) } - // Only process branch pushes if !strings.HasPrefix(push.Ref, "refs/heads/") { return respond(200, `{"status":"skipped","reason":"not a branch push"}`) } @@ -171,16 +205,13 @@ func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events log.Printf("Push to %s branch=%s by=%s commits=%d", push.Repository.FullName, branch, push.Pusher.Login, len(push.Commits)) - // Collect all changed files from commits changedFiles := collectChangedFiles(push.Commits) - // Fetch and filter workflows workflows, err := fetchWorkflows(c.GiteaURL, c.GiteaToken, push.Repository.FullName, branch) if err != nil || len(workflows) == 0 { return respond(200, `{"status":"no_workflows"}`) } - // Route each workflow dispatched := 0 for name, content := range workflows { var wf Workflow @@ -189,13 +220,11 @@ func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events continue } - // Check if workflow triggers on this branch + paths if !shouldTrigger(wf, branch, changedFiles) { log.Printf("Skipping %s (no matching trigger)", name) continue } - // Get the first job's runs-on label label := "host" for _, job := range wf.Jobs { if job.RunsOn != "" { @@ -208,24 +237,20 @@ func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events switch label { case "deploy": - // Lightweight: invoke Lambda executor directly if err := invokeLambdaExec(ctx, c, push, name, content); err != nil { log.Printf("Failed to invoke executor for %s: %v", name, err) continue } case "host": - // Legacy: skip, the always-on runner handles it - log.Printf("Skipping %s (runs-on: host — handled by registered runner)", name) + log.Printf("Skipping %s (runs-on: host — registered runner)", name) continue default: - // Fargate: start ephemeral runner with matching image - taskArn, err := startFargateRunner(ctx, c, push, label, name, runID) + instanceID, err := startSpotRunner(ctx, c, label, runID) if err != nil { - log.Printf("Failed to start Fargate for %s: %v", name, err) + log.Printf("Failed to start spot for %s: %v", name, err) continue } - // Track in DynamoDB - if err := trackRun(ctx, c, push.Repository.FullName, runID, taskArn, name, label); err != nil { + if err := trackRun(ctx, c, push.Repository.FullName, runID, instanceID, name, label); err != nil { log.Printf("Warning: failed to track run: %v", err) } } @@ -238,87 +263,80 @@ func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events len(workflows), dispatched, push.Repository.FullName, branch)) } -// --- Fargate runner --- +// --- EC2 Spot runner --- -func startFargateRunner(ctx context.Context, c cfg, push GiteaPushEvent, label, workflow, runID string) (string, error) { +func startSpotRunner(ctx context.Context, c cfg, label, runID string) (string, error) { awsCfg, err := config.LoadDefaultConfig(ctx) if err != nil { return "", fmt.Errorf("aws config: %w", err) } - imageName := labelToImage[label] - if imageName == "" { + spot, ok := labelToSpot[label] + if !ok { return "", fmt.Errorf("unknown label: %s", label) } - image := fmt.Sprintf("%s/%s:latest", c.ECRBase, imageName) - resources := labelToResources[label] - cpu, memory := resources[0], resources[1] + runnerName := fmt.Sprintf("spot-%s-%s", label, runID[:12]) + userData := userDataScript(c, label, runnerName) - client := ecs.NewFromConfig(awsCfg) + client := ec2.NewFromConfig(awsCfg) - // Register a one-off task definition for this run - taskDefOut, err := client.RegisterTaskDefinition(ctx, &ecs.RegisterTaskDefinitionInput{ - Family: aws.String(fmt.Sprintf("tinqs-ci-%s", label)), - RequiresCompatibilities: []ecs.Compatibility{ecs.CompatibilityFargate}, - NetworkMode: ecs.NetworkModeAwsvpc, - Cpu: aws.String(cpu), - Memory: aws.String(memory), - TaskRoleArn: aws.String(c.TaskRoleArn), - ExecutionRoleArn: aws.String(c.ExecRoleArn), - ContainerDefinitions: []ecs.ContainerDefinition{{ - Name: aws.String("runner"), - Image: aws.String(image), - Essential: aws.Bool(true), - Environment: []ecs.KeyValuePair{ - {Name: aws.String("GITEA_INSTANCE_URL"), Value: aws.String(c.GiteaURL)}, - {Name: aws.String("GITEA_RUNNER_REGISTRATION_TOKEN"), Value: aws.String(c.GiteaToken)}, - {Name: aws.String("GITEA_RUNNER_NAME"), Value: aws.String(fmt.Sprintf("ephemeral-%s-%s", label, runID[:8]))}, - {Name: aws.String("GITEA_RUNNER_LABELS"), Value: aws.String(fmt.Sprintf("%s:host", label))}, - {Name: aws.String("GITEA_RUNNER_EPHEMERAL"), Value: aws.String("true")}, - }, - LogConfiguration: &ecs.LogConfiguration{ - LogDriver: ecs.LogDriverAwslogs, - Options: map[string]string{ - "awslogs-group": "/ecs/tinqs-ci", - "awslogs-region": "eu-west-1", - "awslogs-stream-prefix": label, - }, - }, - }}, - }) - if err != nil { - return "", fmt.Errorf("register task def: %w", err) - } + out, err := client.RunInstances(ctx, &ec2.RunInstancesInput{ + ImageId: aws.String(c.AMI), + InstanceType: ec2types.InstanceType(spot.InstanceType), + MinCount: aws.Int32(1), + MaxCount: aws.Int32(1), + UserData: aws.String(userData), - taskDef := fmt.Sprintf("%s:%d", *taskDefOut.TaskDefinition.Family, - taskDefOut.TaskDefinition.Revision) - - // Run the task - runOut, err := client.RunTask(ctx, &ecs.RunTaskInput{ - Cluster: aws.String(c.ECSCluster), - TaskDefinition: aws.String(taskDef), - LaunchType: ecs.LaunchTypeFargate, - Count: aws.Int32(1), - NetworkConfiguration: &ecs.NetworkConfiguration{ - AwsvpcConfiguration: &ecs.AwsVpcConfiguration{ - Subnets: c.Subnets, - SecurityGroups: []string{c.SecurityGroup}, - AssignPublicIp: ecs.AssignPublicIpEnabled, + // Spot request + InstanceMarketOptions: &ec2types.InstanceMarketOptionsRequest{ + MarketType: ec2types.MarketTypeSpot, + SpotOptions: &ec2types.SpotMarketOptions{ + MaxPrice: aws.String(spot.MaxPrice), + SpotInstanceType: ec2types.SpotInstanceTypeOneTime, + InstanceInterruptionBehavior: ec2types.InstanceInterruptionBehaviorTerminate, }, }, + + // Network + NetworkInterfaces: []ec2types.InstanceNetworkInterfaceSpecification{{ + DeviceIndex: aws.Int32(0), + SubnetId: aws.String(c.Subnet), + Groups: []string{c.SecurityGroup}, + AssociatePublicIpAddress: aws.Bool(true), + }}, + + // IAM role (for AWS CLI, ECR, S3, ECS access) + IamInstanceProfile: &ec2types.IamInstanceProfileSpecification{ + Arn: aws.String(c.InstanceProfile), + }, + + // Tags for identification and cleanup + TagSpecifications: []ec2types.TagSpecification{{ + ResourceType: ec2types.ResourceTypeInstance, + Tags: []ec2types.Tag{ + {Key: aws.String("Name"), Value: aws.String(runnerName)}, + {Key: aws.String("tinqs-ci"), Value: aws.String("true")}, + {Key: aws.String("tinqs-ci-run"), Value: aws.String(runID)}, + {Key: aws.String("tinqs-ci-label"), Value: aws.String(label)}, + }, + }}, + + // Auto-terminate safety net (instance shuts down → terminates) + InstanceInitiatedShutdownBehavior: ec2types.ShutdownBehaviorTerminate, }) if err != nil { - return "", fmt.Errorf("run task: %w", err) + return "", fmt.Errorf("run instances: %w", err) } - if len(runOut.Tasks) == 0 { - return "", fmt.Errorf("no tasks started") + if len(out.Instances) == 0 { + return "", fmt.Errorf("no instances launched") } - taskArn := *runOut.Tasks[0].TaskArn - log.Printf("Started Fargate task: %s (image=%s, label=%s)", taskArn, imageName, label) - return taskArn, nil + instanceID := *out.Instances[0].InstanceId + log.Printf("Started spot instance: %s (type=%s, label=%s, runner=%s)", + instanceID, spot.InstanceType, label, runnerName) + return instanceID, nil } // --- Lambda executor (for deploy-only jobs) --- @@ -357,9 +375,8 @@ func invokeLambdaExec(ctx context.Context, c cfg, push GiteaPushEvent, workflow, // --- Cancel handler --- func handleCancel(ctx context.Context, c cfg, body string) (events.APIGatewayProxyResponse, error) { - // Parse workflow_job event to find the run to cancel var event struct { - Action string `json:"action"` + Action string `json:"action"` Repository GiteaRepo `json:"repository"` } if err := json.Unmarshal([]byte(body), &event); err != nil { @@ -372,7 +389,6 @@ func handleCancel(ctx context.Context, c cfg, body string) (events.APIGatewayPro log.Printf("Cancel request for %s", event.Repository.FullName) - // Find active runs for this repo in DynamoDB awsCfg, _ := config.LoadDefaultConfig(ctx) ddb := dynamodb.NewFromConfig(awsCfg) @@ -387,36 +403,37 @@ func handleCancel(ctx context.Context, c cfg, body string) (events.APIGatewayPro }, }) if err != nil { - log.Printf("DynamoDB query failed: %v", err) return respond(500, `{"error":"db query failed"}`) } - // Stop all running Fargate tasks for this repo - ecsClient := ecs.NewFromConfig(awsCfg) + ec2Client := ec2.NewFromConfig(awsCfg) stopped := 0 + + // Collect instance IDs to terminate + var instanceIDs []string + var records []RunRecord for _, item := range out.Items { var rec RunRecord - if err := attributevalue.UnmarshalMap(item, &rec); err != nil { - continue - } - if rec.TaskArn == "" { + if err := attributevalue.UnmarshalMap(item, &rec); err != nil || rec.InstanceID == "" { continue } + instanceIDs = append(instanceIDs, rec.InstanceID) + records = append(records, rec) + } - _, err := ecsClient.StopTask(ctx, &ecs.StopTaskInput{ - Cluster: aws.String(c.ECSCluster), - Task: aws.String(rec.TaskArn), - Reason: aws.String("Cancelled via Gitea webhook"), + if len(instanceIDs) > 0 { + _, err := ec2Client.TerminateInstances(ctx, &ec2.TerminateInstancesInput{ + InstanceIds: instanceIDs, }) if err != nil { - log.Printf("Failed to stop task %s: %v", rec.TaskArn, err) - continue + log.Printf("Failed to terminate instances: %v", err) + } else { + stopped = len(instanceIDs) + for _, rec := range records { + updateRunStatus(ctx, ddb, c.DDBTable, rec.Repo, rec.RunID, "cancelled") + } + log.Printf("Terminated %d instances for %s", stopped, event.Repository.FullName) } - - // Update status in DynamoDB - updateRunStatus(ctx, ddb, c.DDBTable, rec.Repo, rec.RunID, "cancelled") - stopped++ - log.Printf("Stopped task %s for %s", rec.TaskArn, rec.Repo) } return respond(200, fmt.Sprintf(`{"status":"cancelled","stopped":%d}`, stopped)) @@ -429,9 +446,8 @@ func handleCleanup(ctx context.Context, c cfg) (events.APIGatewayProxyResponse, awsCfg, _ := config.LoadDefaultConfig(ctx) ddb := dynamodb.NewFromConfig(awsCfg) - ecsClient := ecs.NewFromConfig(awsCfg) + ec2Client := ec2.NewFromConfig(awsCfg) - // Scan for runs older than 30 minutes that are still "running" cutoff := time.Now().Add(-30 * time.Minute).Unix() out, err := ddb.Scan(ctx, &dynamodb.ScanInput{ @@ -447,24 +463,51 @@ func handleCleanup(ctx context.Context, c cfg) (events.APIGatewayProxyResponse, return respond(500, fmt.Sprintf(`{"error":"%s"}`, err)) } - killed := 0 + // Batch terminate stale instances + var staleIDs []string + var staleRecs []RunRecord for _, item := range out.Items { var rec RunRecord - if err := attributevalue.UnmarshalMap(item, &rec); err != nil { + if err := attributevalue.UnmarshalMap(item, &rec); err != nil || rec.InstanceID == "" { continue } + staleIDs = append(staleIDs, rec.InstanceID) + staleRecs = append(staleRecs, rec) + } - if rec.TaskArn != "" { - ecsClient.StopTask(ctx, &ecs.StopTaskInput{ - Cluster: aws.String(c.ECSCluster), - Task: aws.String(rec.TaskArn), - Reason: aws.String("Timed out (cleanup cron)"), - }) + killed := 0 + if len(staleIDs) > 0 { + ec2Client.TerminateInstances(ctx, &ec2.TerminateInstancesInput{ + InstanceIds: staleIDs, + }) + for _, rec := range staleRecs { + updateRunStatus(ctx, ddb, c.DDBTable, rec.Repo, rec.RunID, "timeout") } + killed = len(staleIDs) + log.Printf("Killed %d stale instances", killed) + } - updateRunStatus(ctx, ddb, c.DDBTable, rec.Repo, rec.RunID, "timeout") - killed++ - log.Printf("Killed stale run: %s/%s (started %d)", rec.Repo, rec.RunID, rec.StartedAt) + // Also sweep any tinqs-ci tagged instances that somehow escaped DynamoDB + descOut, _ := ec2Client.DescribeInstances(ctx, &ec2.DescribeInstancesInput{ + Filters: []ec2types.Filter{ + {Name: aws.String("tag:tinqs-ci"), Values: []string{"true"}}, + {Name: aws.String("instance-state-name"), Values: []string{"running", "pending"}}, + }, + }) + var orphanIDs []string + for _, res := range descOut.Reservations { + for _, inst := range res.Instances { + if inst.LaunchTime != nil && time.Since(*inst.LaunchTime) > 30*time.Minute { + orphanIDs = append(orphanIDs, *inst.InstanceId) + } + } + } + if len(orphanIDs) > 0 { + ec2Client.TerminateInstances(ctx, &ec2.TerminateInstancesInput{ + InstanceIds: orphanIDs, + }) + log.Printf("Killed %d orphan instances (tag sweep)", len(orphanIDs)) + killed += len(orphanIDs) } return respond(200, fmt.Sprintf(`{"status":"cleanup","killed":%d}`, killed)) @@ -472,20 +515,20 @@ func handleCleanup(ctx context.Context, c cfg) (events.APIGatewayProxyResponse, // --- DynamoDB helpers --- -func trackRun(ctx context.Context, c cfg, repo, runID, taskArn, workflow, label string) error { +func trackRun(ctx context.Context, c cfg, repo, runID, instanceID, workflow, label string) error { awsCfg, _ := config.LoadDefaultConfig(ctx) ddb := dynamodb.NewFromConfig(awsCfg) now := time.Now() rec := RunRecord{ - Repo: repo, - RunID: runID, - TaskArn: taskArn, - Status: "running", - Workflow: workflow, - Label: label, - StartedAt: now.Unix(), - TTL: now.Add(7 * 24 * time.Hour).Unix(), + Repo: repo, + RunID: runID, + InstanceID: instanceID, + Status: "running", + Workflow: workflow, + Label: label, + StartedAt: now.Unix(), + TTL: now.Add(7 * 24 * time.Hour).Unix(), } item, err := attributevalue.MarshalMap(rec) @@ -576,23 +619,21 @@ func fetchRawFile(baseURL, token, repoFullName, path, branch string) (string, er // --- Trigger evaluation --- func shouldTrigger(wf Workflow, branch string, changedFiles []string) bool { - // Parse the on: section onMap, ok := wf.On.(map[string]interface{}) if !ok { - return true // simple trigger like on: push — always matches + return true } pushCfg, ok := onMap["push"] if !ok { - return false // no push trigger + return false } pushMap, ok := pushCfg.(map[string]interface{}) if !ok { - return true // on: push without config — matches all + return true } - // Check branches if branches, ok := pushMap["branches"].([]interface{}); ok { matched := false for _, b := range branches { @@ -606,7 +647,6 @@ func shouldTrigger(wf Workflow, branch string, changedFiles []string) bool { } } - // Check paths (if specified, at least one changed file must match) if paths, ok := pushMap["paths"].([]interface{}); ok && len(changedFiles) > 0 { matched := false for _, p := range paths { @@ -626,7 +666,6 @@ func shouldTrigger(wf Workflow, branch string, changedFiles []string) bool { } } - // Check paths-ignore if ignorePaths, ok := pushMap["paths-ignore"].([]interface{}); ok && len(changedFiles) > 0 { allIgnored := true for _, f := range changedFiles { @@ -651,7 +690,6 @@ func shouldTrigger(wf Workflow, branch string, changedFiles []string) bool { } func matchPath(pattern, file string) bool { - // Simple glob matching: foo/** matches foo/bar/baz, *.go matches main.go if strings.HasSuffix(pattern, "/**") { prefix := strings.TrimSuffix(pattern, "/**") return strings.HasPrefix(file, prefix+"/") || file == prefix