From e96c7c5bf1dd5eeca3fc334f431b497eb22b1b0a Mon Sep 17 00:00:00 2001 From: tinqs-limited Date: Fri, 22 May 2026 18:47:47 +0100 Subject: [PATCH] =?UTF-8?q?feat:=20CI=20orchestrator=20=E2=80=94=20Lambda?= =?UTF-8?q?=20dispatch=20+=20Fargate=20routing=20+=20cancel?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit dispatch: receives Gitea webhook, routes by runs-on label to Fargate tasks (go/node/docker/godot) or Lambda executor (deploy). Path filter evaluation, DynamoDB run tracking, cancel via StopTask. exec: lightweight Lambda for deploy-only jobs (S3 sync, ECS update). SAM template: API Gateway + 2 Lambdas + DynamoDB + cleanup cron. --- orchestrator/Makefile | 45 ++ orchestrator/README.md | 56 +++ orchestrator/deploy/template.yaml | 187 ++++++++ orchestrator/dispatch/go.mod | 14 + orchestrator/dispatch/main.go | 702 ++++++++++++++++++++++++++++++ orchestrator/exec/go.mod | 8 + orchestrator/exec/main.go | 233 ++++++++++ orchestrator/test-event.json | 9 + 8 files changed, 1254 insertions(+) create mode 100644 orchestrator/Makefile create mode 100644 orchestrator/README.md create mode 100644 orchestrator/deploy/template.yaml create mode 100644 orchestrator/dispatch/go.mod create mode 100644 orchestrator/dispatch/main.go create mode 100644 orchestrator/exec/go.mod create mode 100644 orchestrator/exec/main.go create mode 100644 orchestrator/test-event.json diff --git a/orchestrator/Makefile b/orchestrator/Makefile new file mode 100644 index 0000000..16ad3aa --- /dev/null +++ b/orchestrator/Makefile @@ -0,0 +1,45 @@ +.PHONY: build build-dispatch build-exec tidy test-local deploy deploy-guided clean + +REGION ?= eu-west-1 + +build: build-dispatch build-exec + +build-dispatch: + cd dispatch && GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -tags lambda.norpc -o bootstrap . + cd dispatch && zip -j function.zip bootstrap && rm bootstrap + +build-exec: + cd exec && GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -tags lambda.norpc -o bootstrap . + cd exec && zip -j function.zip bootstrap && rm bootstrap + +tidy: + cd dispatch && go mod tidy + cd exec && go mod tidy + +test-local: + cd deploy && sam local invoke DispatchFunction \ + --template-file template.yaml \ + --event ../test-event.json \ + --region $(REGION) + +deploy: + cd deploy && sam build --template-file template.yaml && \ + sam deploy \ + --template-file .aws-sam/build/template.yaml \ + --stack-name tinqs-ci \ + --region $(REGION) \ + --capabilities CAPABILITY_IAM \ + --parameter-overrides \ + GiteaToken=$${GITEA_TOKEN} \ + Subnets=$${SUBNETS} \ + SecurityGroup=$${SECURITY_GROUP} \ + --resolve-s3 \ + --no-confirm-changeset + +deploy-guided: + cd deploy && sam build --template-file template.yaml && \ + sam deploy --guided --template-file .aws-sam/build/template.yaml --stack-name tinqs-ci + +clean: + rm -f dispatch/bootstrap dispatch/function.zip + rm -f exec/bootstrap exec/function.zip diff --git a/orchestrator/README.md b/orchestrator/README.md new file mode 100644 index 0000000..63b6357 --- /dev/null +++ b/orchestrator/README.md @@ -0,0 +1,56 @@ +# tinqs/ci orchestrator + +Lambda-based CI dispatcher for Tinqs Studio. Receives Gitea webhooks and routes jobs to the right execution environment. + +## Architecture + +``` +Gitea push webhook + │ + ▼ + API Gateway POST /webhook + │ + ▼ + ci-dispatch Lambda + │ + ├── runs-on: go/node/docker/godot + │ → Start Fargate task with matching image + │ → Track in DynamoDB for cancel + │ + ├── runs-on: deploy + │ → Invoke ci-exec Lambda directly + │ + └── runs-on: host + → Skip (handled by registered runner) +``` + +## Deploy + +Requires: AWS SAM CLI, AWS credentials, Gitea token. + +```bash +# First time (interactive) +GITEA_TOKEN=xxx make deploy-guided + +# Subsequent deploys +GITEA_TOKEN=xxx SUBNETS=subnet-abc,subnet-def SECURITY_GROUP=sg-xxx make deploy +``` + +After deploy, configure the webhook URL as a Gitea **system webhook**: +- URL: `https://.execute-api.eu-west-1.amazonaws.com/prod/webhook` +- Method: POST +- Content type: application/json +- Events: Push, Workflow Job (for cancel) + +## Cancel support + +When a user cancels a job in the Gitea UI, the `workflow_job` webhook fires with `action: cancelled`. The dispatcher looks up the Fargate task ARN in DynamoDB and calls `ecs:StopTask`. + +A cleanup cron (every 5 min) also kills Fargate tasks that have been running longer than 30 minutes. + +## Local testing + +```bash +make build +make test-local # requires SAM CLI + Docker +``` diff --git a/orchestrator/deploy/template.yaml b/orchestrator/deploy/template.yaml new file mode 100644 index 0000000..e00d71a --- /dev/null +++ b/orchestrator/deploy/template.yaml @@ -0,0 +1,187 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: Tinqs CI Orchestrator — webhook dispatch, Fargate routing, cancel support + +Parameters: + GiteaURL: + Type: String + Default: https://tinqs.com + GiteaToken: + Type: String + NoEcho: true + ECSCluster: + Type: String + Default: tinqs-git + Subnets: + Type: CommaDelimitedList + Description: VPC subnet IDs for Fargate tasks + SecurityGroup: + Type: String + Description: Security group for Fargate tasks + ECRBase: + Type: String + Default: 149751500842.dkr.ecr.eu-west-1.amazonaws.com + TaskRoleArn: + Type: String + Default: arn:aws:iam::149751500842:role/tinqs-git-task + ExecRoleArn: + Type: String + Default: arn:aws:iam::149751500842:role/ecsTaskExecutionRole + +Globals: + Function: + Runtime: provided.al2023 + Architectures: [x86_64] + Timeout: 30 + MemorySize: 128 + +Resources: + + # --- API Gateway --- + WebhookApi: + Type: AWS::Serverless::Api + Properties: + Name: tinqs-ci-webhook + StageName: prod + + # --- DynamoDB --- + RunsTable: + Type: AWS::DynamoDB::Table + Properties: + TableName: tinqs-ci-runs + BillingMode: PAY_PER_REQUEST + AttributeDefinitions: + - AttributeName: repo + AttributeType: S + - AttributeName: run_id + AttributeType: S + KeySchema: + - AttributeName: repo + KeyType: HASH + - AttributeName: run_id + KeyType: RANGE + TimeToLiveSpecification: + AttributeName: ttl + Enabled: true + + # --- Dispatcher Lambda --- + DispatchFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: tinqs-ci-dispatch + Handler: bootstrap + CodeUri: ../dispatch/ + Description: Receives webhook, routes to Fargate or Lambda by runs-on label + Timeout: 60 + MemorySize: 256 + Environment: + Variables: + GITEA_URL: !Ref GiteaURL + GITEA_TOKEN: !Ref GiteaToken + EXECUTOR_FUNCTION_NAME: !Ref ExecFunction + ECS_CLUSTER: !Ref ECSCluster + SUBNETS: !Join [",", !Ref Subnets] + SECURITY_GROUP: !Ref SecurityGroup + ECR_BASE: !Ref ECRBase + DDB_TABLE: !Ref RunsTable + TASK_ROLE_ARN: !Ref TaskRoleArn + EXEC_ROLE_ARN: !Ref ExecRoleArn + Policies: + - LambdaInvokePolicy: + FunctionName: !Ref ExecFunction + - DynamoDBCrudPolicy: + TableName: !Ref RunsTable + - Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - ecs:RunTask + - ecs:StopTask + - ecs:RegisterTaskDefinition + - ecs:DescribeTasks + Resource: '*' + - Effect: Allow + Action: iam:PassRole + Resource: + - !Ref TaskRoleArn + - !Ref ExecRoleArn + Events: + Webhook: + Type: Api + Properties: + RestApiId: !Ref WebhookApi + Path: /webhook + Method: POST + + # --- Executor Lambda (deploy-only jobs) --- + ExecFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: tinqs-ci-exec + Handler: bootstrap + CodeUri: ../exec/ + Description: Executes deploy-only workflow steps directly in Lambda + Timeout: 900 + MemorySize: 2048 + EphemeralStorage: + Size: 5120 + Environment: + Variables: + GITEA_URL: !Ref GiteaURL + GITEA_TOKEN: !Ref GiteaToken + Policies: + - S3CrudPolicy: + BucketName: tinqs-cli-releases + - S3CrudPolicy: + BucketName: arikigame.com + - S3CrudPolicy: + BucketName: docs.tinqs.com + - Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - ecs:UpdateService + - ecs:DescribeServices + Resource: !Sub 'arn:aws:ecs:${AWS::Region}:${AWS::AccountId}:service/tinqs-git/*' + - Effect: Allow + Action: + - cloudfront:CreateInvalidation + Resource: '*' + + # --- Cleanup Cron (every 5 min) --- + CleanupRule: + Type: AWS::Events::Rule + Properties: + Name: tinqs-ci-cleanup + ScheduleExpression: 'rate(5 minutes)' + State: ENABLED + Targets: + - Id: cleanup + Arn: !GetAtt DispatchFunction.Arn + Input: '{"body":"{\"action\":\"cleanup\"}","headers":{}}' + + CleanupPermission: + Type: AWS::Lambda::Permission + Properties: + FunctionName: !Ref DispatchFunction + Action: lambda:InvokeFunction + Principal: events.amazonaws.com + SourceArn: !GetAtt CleanupRule.Arn + + # --- Log Group --- + CILogGroup: + Type: AWS::Logs::LogGroup + Properties: + LogGroupName: /ecs/tinqs-ci + RetentionInDays: 14 + +Outputs: + WebhookURL: + Description: Configure this as Gitea system webhook + Value: !Sub 'https://${WebhookApi}.execute-api.${AWS::Region}.amazonaws.com/prod/webhook' + DispatchArn: + Value: !GetAtt DispatchFunction.Arn + ExecArn: + Value: !GetAtt ExecFunction.Arn + RunsTable: + Value: !Ref RunsTable diff --git a/orchestrator/dispatch/go.mod b/orchestrator/dispatch/go.mod new file mode 100644 index 0000000..e81358c --- /dev/null +++ b/orchestrator/dispatch/go.mod @@ -0,0 +1,14 @@ +module tinqs.com/tinqs/ci/orchestrator/dispatch + +go 1.23 + +require ( + github.com/aws/aws-lambda-go v1.47.0 + github.com/aws/aws-sdk-go-v2 v1.32.5 + github.com/aws/aws-sdk-go-v2/config v1.28.0 + github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.12 + github.com/aws/aws-sdk-go-v2/service/dynamodb v1.38.0 + github.com/aws/aws-sdk-go-v2/service/ecs v1.52.0 + github.com/aws/aws-sdk-go-v2/service/lambda v1.69.0 + gopkg.in/yaml.v3 v3.0.1 +) diff --git a/orchestrator/dispatch/main.go b/orchestrator/dispatch/main.go new file mode 100644 index 0000000..8dadbba --- /dev/null +++ b/orchestrator/dispatch/main.go @@ -0,0 +1,702 @@ +// tinqs/ci orchestrator — dispatcher Lambda +// +// Receives Gitea system webhooks, determines which workflows to run, +// and routes each job to the right execution environment based on runs-on label: +// +// go, node, docker, godot → Fargate task with matching runner image +// deploy → Lambda direct execution (ci-exec) +// host → skip (handled by registered always-on runner) +// +// Also handles cancel events (stop Fargate tasks) and cleanup cron. + +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "net/http" + "os" + "strings" + "time" + + "github.com/aws/aws-lambda-go/events" + "github.com/aws/aws-lambda-go/lambda" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/aws/aws-sdk-go-v2/service/ecs" + awslambda "github.com/aws/aws-sdk-go-v2/service/lambda" + "gopkg.in/yaml.v3" +) + +// --- Gitea webhook types --- + +type GiteaPushEvent struct { + Ref string `json:"ref"` + Before string `json:"before"` + After string `json:"after"` + Repository GiteaRepo `json:"repository"` + Pusher GiteaUser `json:"pusher"` + Commits []GiteaCommit `json:"commits"` +} + +type GiteaRepo struct { + ID int `json:"id"` + Name string `json:"name"` + FullName string `json:"full_name"` + CloneURL string `json:"clone_url"` + HTMLURL string `json:"html_url"` +} + +type GiteaUser struct { + Login string `json:"login"` + Email string `json:"email"` +} + +type GiteaCommit struct { + ID string `json:"id"` + Message string `json:"message"` + Added []string `json:"added"` + Removed []string `json:"removed"` + Modified []string `json:"modified"` +} + +// --- Workflow YAML (minimal parse) --- + +type Workflow struct { + Name string `yaml:"name"` + On interface{} `yaml:"on"` + Jobs map[string]WorkflowJob `yaml:"jobs"` +} + +type WorkflowJob struct { + RunsOn string `yaml:"runs-on"` +} + +// --- DynamoDB run record --- + +type RunRecord struct { + Repo string `dynamodbav:"repo"` + RunID string `dynamodbav:"run_id"` + TaskArn string `dynamodbav:"task_arn"` + Status string `dynamodbav:"status"` + Workflow string `dynamodbav:"workflow"` + Label string `dynamodbav:"label"` + StartedAt int64 `dynamodbav:"started_at"` + TTL int64 `dynamodbav:"ttl"` +} + +// --- Image routing --- + +var labelToImage = map[string]string{ + "go": "tinqs-runner-go", + "node": "tinqs-runner-node", + "docker": "tinqs-runner-docker", + "deploy": "tinqs-runner-deploy", + "godot": "tinqs-runner-godot", +} + +var labelToResources = map[string][2]string{ + "go": {"1024", "2048"}, // 1 vCPU, 2 GB + "node": {"1024", "2048"}, + "docker": {"2048", "4096"}, // 2 vCPU, 4 GB (Docker builds are heavy) + "deploy": {"512", "1024"}, // 0.5 vCPU, 1 GB (lightweight) + "godot": {"2048", "4096"}, +} + +// --- Config from env --- + +type cfg struct { + GiteaURL string + GiteaToken string + ExecFnName string + ECSCluster string + Subnets []string + SecurityGroup string + ECRBase string + DDBTable string + TaskRoleArn string + ExecRoleArn string +} + +func loadCfg() cfg { + return cfg{ + GiteaURL: os.Getenv("GITEA_URL"), + GiteaToken: os.Getenv("GITEA_TOKEN"), + ExecFnName: os.Getenv("EXECUTOR_FUNCTION_NAME"), + ECSCluster: os.Getenv("ECS_CLUSTER"), + Subnets: strings.Split(os.Getenv("SUBNETS"), ","), + SecurityGroup: os.Getenv("SECURITY_GROUP"), + ECRBase: os.Getenv("ECR_BASE"), // e.g. 149751500842.dkr.ecr.eu-west-1.amazonaws.com + DDBTable: os.Getenv("DDB_TABLE"), + TaskRoleArn: os.Getenv("TASK_ROLE_ARN"), + ExecRoleArn: os.Getenv("EXEC_ROLE_ARN"), + } +} + +func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) { + c := loadCfg() + + // Handle cleanup cron (EventBridge invokes with action=cleanup) + if request.Body == "" || strings.Contains(request.Body, `"action":"cleanup"`) { + return handleCleanup(ctx, c) + } + + // Handle cancel webhook + eventType := request.Headers["x-gitea-event"] + if eventType == "" { + eventType = request.Headers["X-Gitea-Event"] + } + if eventType == "workflow_job" { + return handleCancel(ctx, c, request.Body) + } + + // Handle push event + var push GiteaPushEvent + if err := json.Unmarshal([]byte(request.Body), &push); err != nil { + log.Printf("Failed to parse webhook: %v", err) + return respond(400, `{"error":"invalid payload"}`) + } + + // Only process branch pushes + if !strings.HasPrefix(push.Ref, "refs/heads/") { + return respond(200, `{"status":"skipped","reason":"not a branch push"}`) + } + + branch := strings.TrimPrefix(push.Ref, "refs/heads/") + log.Printf("Push to %s branch=%s by=%s commits=%d", + push.Repository.FullName, branch, push.Pusher.Login, len(push.Commits)) + + // Collect all changed files from commits + changedFiles := collectChangedFiles(push.Commits) + + // Fetch and filter workflows + workflows, err := fetchWorkflows(c.GiteaURL, c.GiteaToken, push.Repository.FullName, branch) + if err != nil || len(workflows) == 0 { + return respond(200, `{"status":"no_workflows"}`) + } + + // Route each workflow + dispatched := 0 + for name, content := range workflows { + var wf Workflow + if err := yaml.Unmarshal([]byte(content), &wf); err != nil { + log.Printf("Failed to parse %s: %v", name, err) + continue + } + + // Check if workflow triggers on this branch + paths + if !shouldTrigger(wf, branch, changedFiles) { + log.Printf("Skipping %s (no matching trigger)", name) + continue + } + + // Get the first job's runs-on label + label := "host" + for _, job := range wf.Jobs { + if job.RunsOn != "" { + label = job.RunsOn + break + } + } + + runID := fmt.Sprintf("%s-%s-%d", push.After[:7], name, time.Now().UnixMilli()) + + switch label { + case "deploy": + // Lightweight: invoke Lambda executor directly + if err := invokeLambdaExec(ctx, c, push, name, content); err != nil { + log.Printf("Failed to invoke executor for %s: %v", name, err) + continue + } + case "host": + // Legacy: skip, the always-on runner handles it + log.Printf("Skipping %s (runs-on: host — handled by registered runner)", name) + continue + default: + // Fargate: start ephemeral runner with matching image + taskArn, err := startFargateRunner(ctx, c, push, label, name, runID) + if err != nil { + log.Printf("Failed to start Fargate for %s: %v", name, err) + continue + } + // Track in DynamoDB + if err := trackRun(ctx, c, push.Repository.FullName, runID, taskArn, name, label); err != nil { + log.Printf("Warning: failed to track run: %v", err) + } + } + + dispatched++ + } + + return respond(200, fmt.Sprintf( + `{"status":"dispatched","workflows":%d,"dispatched":%d,"repo":"%s","branch":"%s"}`, + len(workflows), dispatched, push.Repository.FullName, branch)) +} + +// --- Fargate runner --- + +func startFargateRunner(ctx context.Context, c cfg, push GiteaPushEvent, label, workflow, runID string) (string, error) { + awsCfg, err := config.LoadDefaultConfig(ctx) + if err != nil { + return "", fmt.Errorf("aws config: %w", err) + } + + imageName := labelToImage[label] + if imageName == "" { + return "", fmt.Errorf("unknown label: %s", label) + } + + image := fmt.Sprintf("%s/%s:latest", c.ECRBase, imageName) + resources := labelToResources[label] + cpu, memory := resources[0], resources[1] + + client := ecs.NewFromConfig(awsCfg) + + // Register a one-off task definition for this run + taskDefOut, err := client.RegisterTaskDefinition(ctx, &ecs.RegisterTaskDefinitionInput{ + Family: aws.String(fmt.Sprintf("tinqs-ci-%s", label)), + RequiresCompatibilities: []ecs.Compatibility{ecs.CompatibilityFargate}, + NetworkMode: ecs.NetworkModeAwsvpc, + Cpu: aws.String(cpu), + Memory: aws.String(memory), + TaskRoleArn: aws.String(c.TaskRoleArn), + ExecutionRoleArn: aws.String(c.ExecRoleArn), + ContainerDefinitions: []ecs.ContainerDefinition{{ + Name: aws.String("runner"), + Image: aws.String(image), + Essential: aws.Bool(true), + Environment: []ecs.KeyValuePair{ + {Name: aws.String("GITEA_INSTANCE_URL"), Value: aws.String(c.GiteaURL)}, + {Name: aws.String("GITEA_RUNNER_REGISTRATION_TOKEN"), Value: aws.String(c.GiteaToken)}, + {Name: aws.String("GITEA_RUNNER_NAME"), Value: aws.String(fmt.Sprintf("ephemeral-%s-%s", label, runID[:8]))}, + {Name: aws.String("GITEA_RUNNER_LABELS"), Value: aws.String(fmt.Sprintf("%s:host", label))}, + {Name: aws.String("GITEA_RUNNER_EPHEMERAL"), Value: aws.String("true")}, + }, + LogConfiguration: &ecs.LogConfiguration{ + LogDriver: ecs.LogDriverAwslogs, + Options: map[string]string{ + "awslogs-group": "/ecs/tinqs-ci", + "awslogs-region": "eu-west-1", + "awslogs-stream-prefix": label, + }, + }, + }}, + }) + if err != nil { + return "", fmt.Errorf("register task def: %w", err) + } + + taskDef := fmt.Sprintf("%s:%d", *taskDefOut.TaskDefinition.Family, + taskDefOut.TaskDefinition.Revision) + + // Run the task + runOut, err := client.RunTask(ctx, &ecs.RunTaskInput{ + Cluster: aws.String(c.ECSCluster), + TaskDefinition: aws.String(taskDef), + LaunchType: ecs.LaunchTypeFargate, + Count: aws.Int32(1), + NetworkConfiguration: &ecs.NetworkConfiguration{ + AwsvpcConfiguration: &ecs.AwsVpcConfiguration{ + Subnets: c.Subnets, + SecurityGroups: []string{c.SecurityGroup}, + AssignPublicIp: ecs.AssignPublicIpEnabled, + }, + }, + }) + if err != nil { + return "", fmt.Errorf("run task: %w", err) + } + + if len(runOut.Tasks) == 0 { + return "", fmt.Errorf("no tasks started") + } + + taskArn := *runOut.Tasks[0].TaskArn + log.Printf("Started Fargate task: %s (image=%s, label=%s)", taskArn, imageName, label) + return taskArn, nil +} + +// --- Lambda executor (for deploy-only jobs) --- + +func invokeLambdaExec(ctx context.Context, c cfg, push GiteaPushEvent, workflow, content string) error { + if c.ExecFnName == "" { + log.Printf("[DRY RUN] Would invoke executor for %s", workflow) + return nil + } + + awsCfg, err := config.LoadDefaultConfig(ctx) + if err != nil { + return err + } + + payload, _ := json.Marshal(map[string]interface{}{ + "repo": push.Repository, + "ref": push.Ref, + "commit_sha": push.After, + "pusher": push.Pusher.Login, + "workflow_name": workflow, + "workflow_yaml": content, + "gitea_url": c.GiteaURL, + "gitea_token": c.GiteaToken, + }) + + client := awslambda.NewFromConfig(awsCfg) + _, err = client.Invoke(ctx, &awslambda.InvokeInput{ + FunctionName: aws.String(c.ExecFnName), + InvocationType: "Event", + Payload: payload, + }) + return err +} + +// --- Cancel handler --- + +func handleCancel(ctx context.Context, c cfg, body string) (events.APIGatewayProxyResponse, error) { + // Parse workflow_job event to find the run to cancel + var event struct { + Action string `json:"action"` + Repository GiteaRepo `json:"repository"` + } + if err := json.Unmarshal([]byte(body), &event); err != nil { + return respond(400, `{"error":"invalid cancel event"}`) + } + + if event.Action != "cancelled" { + return respond(200, `{"status":"ignored","reason":"not a cancel event"}`) + } + + log.Printf("Cancel request for %s", event.Repository.FullName) + + // Find active runs for this repo in DynamoDB + awsCfg, _ := config.LoadDefaultConfig(ctx) + ddb := dynamodb.NewFromConfig(awsCfg) + + out, err := ddb.Query(ctx, &dynamodb.QueryInput{ + TableName: aws.String(c.DDBTable), + KeyConditionExpression: aws.String("repo = :repo"), + FilterExpression: aws.String("#s = :running"), + ExpressionAttributeNames: map[string]string{"#s": "status"}, + ExpressionAttributeValues: map[string]types.AttributeValue{ + ":repo": &types.AttributeValueMemberS{Value: event.Repository.FullName}, + ":running": &types.AttributeValueMemberS{Value: "running"}, + }, + }) + if err != nil { + log.Printf("DynamoDB query failed: %v", err) + return respond(500, `{"error":"db query failed"}`) + } + + // Stop all running Fargate tasks for this repo + ecsClient := ecs.NewFromConfig(awsCfg) + stopped := 0 + for _, item := range out.Items { + var rec RunRecord + if err := attributevalue.UnmarshalMap(item, &rec); err != nil { + continue + } + if rec.TaskArn == "" { + continue + } + + _, err := ecsClient.StopTask(ctx, &ecs.StopTaskInput{ + Cluster: aws.String(c.ECSCluster), + Task: aws.String(rec.TaskArn), + Reason: aws.String("Cancelled via Gitea webhook"), + }) + if err != nil { + log.Printf("Failed to stop task %s: %v", rec.TaskArn, err) + continue + } + + // Update status in DynamoDB + updateRunStatus(ctx, ddb, c.DDBTable, rec.Repo, rec.RunID, "cancelled") + stopped++ + log.Printf("Stopped task %s for %s", rec.TaskArn, rec.Repo) + } + + return respond(200, fmt.Sprintf(`{"status":"cancelled","stopped":%d}`, stopped)) +} + +// --- Cleanup cron --- + +func handleCleanup(ctx context.Context, c cfg) (events.APIGatewayProxyResponse, error) { + log.Println("Running cleanup...") + + awsCfg, _ := config.LoadDefaultConfig(ctx) + ddb := dynamodb.NewFromConfig(awsCfg) + ecsClient := ecs.NewFromConfig(awsCfg) + + // Scan for runs older than 30 minutes that are still "running" + cutoff := time.Now().Add(-30 * time.Minute).Unix() + + out, err := ddb.Scan(ctx, &dynamodb.ScanInput{ + TableName: aws.String(c.DDBTable), + FilterExpression: aws.String("#s = :running AND started_at < :cutoff"), + ExpressionAttributeNames: map[string]string{"#s": "status"}, + ExpressionAttributeValues: map[string]types.AttributeValue{ + ":running": &types.AttributeValueMemberS{Value: "running"}, + ":cutoff": &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", cutoff)}, + }, + }) + if err != nil { + return respond(500, fmt.Sprintf(`{"error":"%s"}`, err)) + } + + killed := 0 + for _, item := range out.Items { + var rec RunRecord + if err := attributevalue.UnmarshalMap(item, &rec); err != nil { + continue + } + + if rec.TaskArn != "" { + ecsClient.StopTask(ctx, &ecs.StopTaskInput{ + Cluster: aws.String(c.ECSCluster), + Task: aws.String(rec.TaskArn), + Reason: aws.String("Timed out (cleanup cron)"), + }) + } + + updateRunStatus(ctx, ddb, c.DDBTable, rec.Repo, rec.RunID, "timeout") + killed++ + log.Printf("Killed stale run: %s/%s (started %d)", rec.Repo, rec.RunID, rec.StartedAt) + } + + return respond(200, fmt.Sprintf(`{"status":"cleanup","killed":%d}`, killed)) +} + +// --- DynamoDB helpers --- + +func trackRun(ctx context.Context, c cfg, repo, runID, taskArn, workflow, label string) error { + awsCfg, _ := config.LoadDefaultConfig(ctx) + ddb := dynamodb.NewFromConfig(awsCfg) + + now := time.Now() + rec := RunRecord{ + Repo: repo, + RunID: runID, + TaskArn: taskArn, + Status: "running", + Workflow: workflow, + Label: label, + StartedAt: now.Unix(), + TTL: now.Add(7 * 24 * time.Hour).Unix(), + } + + item, err := attributevalue.MarshalMap(rec) + if err != nil { + return err + } + + _, err = ddb.PutItem(ctx, &dynamodb.PutItemInput{ + TableName: aws.String(c.DDBTable), + Item: item, + }) + return err +} + +func updateRunStatus(ctx context.Context, ddb *dynamodb.Client, table, repo, runID, status string) { + ddb.UpdateItem(ctx, &dynamodb.UpdateItemInput{ + TableName: aws.String(table), + Key: map[string]types.AttributeValue{ + "repo": &types.AttributeValueMemberS{Value: repo}, + "run_id": &types.AttributeValueMemberS{Value: runID}, + }, + UpdateExpression: aws.String("SET #s = :status"), + ExpressionAttributeNames: map[string]string{"#s": "status"}, + ExpressionAttributeValues: map[string]types.AttributeValue{ + ":status": &types.AttributeValueMemberS{Value: status}, + }, + }) +} + +// --- Gitea API --- + +func fetchWorkflows(baseURL, token, repoFullName, branch string) (map[string]string, error) { + url := fmt.Sprintf("%s/api/v1/repos/%s/contents/.gitea/workflows?ref=%s", + baseURL, repoFullName, branch) + + req, _ := http.NewRequest("GET", url, nil) + req.Header.Set("Authorization", "token "+token) + + resp, err := (&http.Client{Timeout: 10 * time.Second}).Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode == 404 { + return nil, nil + } + + var contents []struct { + Name string `json:"name"` + Path string `json:"path"` + } + if err := json.NewDecoder(resp.Body).Decode(&contents); err != nil { + return nil, err + } + + workflows := make(map[string]string) + for _, c := range contents { + if !strings.HasSuffix(c.Name, ".yml") && !strings.HasSuffix(c.Name, ".yaml") { + continue + } + content, err := fetchRawFile(baseURL, token, repoFullName, c.Path, branch) + if err != nil { + log.Printf("Failed to fetch %s: %v", c.Path, err) + continue + } + workflows[c.Name] = content + } + return workflows, nil +} + +func fetchRawFile(baseURL, token, repoFullName, path, branch string) (string, error) { + url := fmt.Sprintf("%s/api/v1/repos/%s/raw/%s?ref=%s", baseURL, repoFullName, path, branch) + req, _ := http.NewRequest("GET", url, nil) + req.Header.Set("Authorization", "token "+token) + + resp, err := (&http.Client{Timeout: 10 * time.Second}).Do(req) + if err != nil { + return "", err + } + defer resp.Body.Close() + + body := make([]byte, 1<<20) + n, _ := resp.Body.Read(body) + return string(body[:n]), nil +} + +// --- Trigger evaluation --- + +func shouldTrigger(wf Workflow, branch string, changedFiles []string) bool { + // Parse the on: section + onMap, ok := wf.On.(map[string]interface{}) + if !ok { + return true // simple trigger like on: push — always matches + } + + pushCfg, ok := onMap["push"] + if !ok { + return false // no push trigger + } + + pushMap, ok := pushCfg.(map[string]interface{}) + if !ok { + return true // on: push without config — matches all + } + + // Check branches + if branches, ok := pushMap["branches"].([]interface{}); ok { + matched := false + for _, b := range branches { + if fmt.Sprint(b) == branch { + matched = true + break + } + } + if !matched { + return false + } + } + + // Check paths (if specified, at least one changed file must match) + if paths, ok := pushMap["paths"].([]interface{}); ok && len(changedFiles) > 0 { + matched := false + for _, p := range paths { + pattern := fmt.Sprint(p) + for _, f := range changedFiles { + if matchPath(pattern, f) { + matched = true + break + } + } + if matched { + break + } + } + if !matched { + return false + } + } + + // Check paths-ignore + if ignorePaths, ok := pushMap["paths-ignore"].([]interface{}); ok && len(changedFiles) > 0 { + allIgnored := true + for _, f := range changedFiles { + ignored := false + for _, p := range ignorePaths { + if matchPath(fmt.Sprint(p), f) { + ignored = true + break + } + } + if !ignored { + allIgnored = false + break + } + } + if allIgnored { + return false + } + } + + return true +} + +func matchPath(pattern, file string) bool { + // Simple glob matching: foo/** matches foo/bar/baz, *.go matches main.go + if strings.HasSuffix(pattern, "/**") { + prefix := strings.TrimSuffix(pattern, "/**") + return strings.HasPrefix(file, prefix+"/") || file == prefix + } + if strings.HasPrefix(pattern, "*.") { + ext := strings.TrimPrefix(pattern, "*") + return strings.HasSuffix(file, ext) + } + return strings.HasPrefix(file, pattern) || file == pattern +} + +func collectChangedFiles(commits []GiteaCommit) []string { + seen := make(map[string]bool) + var files []string + for _, c := range commits { + for _, f := range c.Added { + if !seen[f] { + files = append(files, f) + seen[f] = true + } + } + for _, f := range c.Modified { + if !seen[f] { + files = append(files, f) + seen[f] = true + } + } + for _, f := range c.Removed { + if !seen[f] { + files = append(files, f) + seen[f] = true + } + } + } + return files +} + +func respond(status int, body string) (events.APIGatewayProxyResponse, error) { + return events.APIGatewayProxyResponse{ + StatusCode: status, + Headers: map[string]string{"Content-Type": "application/json"}, + Body: body, + }, nil +} + +func main() { + lambda.Start(handler) +} diff --git a/orchestrator/exec/go.mod b/orchestrator/exec/go.mod new file mode 100644 index 0000000..49ef67a --- /dev/null +++ b/orchestrator/exec/go.mod @@ -0,0 +1,8 @@ +module tinqs.com/tinqs/ci/orchestrator/exec + +go 1.23 + +require ( + github.com/aws/aws-lambda-go v1.47.0 + gopkg.in/yaml.v3 v3.0.1 +) diff --git a/orchestrator/exec/main.go b/orchestrator/exec/main.go new file mode 100644 index 0000000..960971b --- /dev/null +++ b/orchestrator/exec/main.go @@ -0,0 +1,233 @@ +// tinqs/ci orchestrator — executor Lambda +// +// Handles deploy-only jobs directly in Lambda (no Fargate needed). +// Parses workflow YAML, runs shell steps, reports commit status to Gitea. +// 15 min timeout, 2 GB memory, 5 GB /tmp. + +package main + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "os" + "os/exec" + "time" + + "github.com/aws/aws-lambda-go/lambda" + "gopkg.in/yaml.v3" +) + +type ExecPayload struct { + Repo GiteaRepo `json:"repo"` + Ref string `json:"ref"` + CommitSHA string `json:"commit_sha"` + Pusher string `json:"pusher"` + Workflow string `json:"workflow_name"` + WorkflowYAML string `json:"workflow_yaml"` + GiteaURL string `json:"gitea_url"` + GiteaToken string `json:"gitea_token"` +} + +type GiteaRepo struct { + FullName string `json:"full_name"` + CloneURL string `json:"clone_url"` +} + +type Workflow struct { + Name string `yaml:"name"` + Jobs map[string]WorkflowJob `yaml:"jobs"` +} + +type WorkflowJob struct { + Steps []WorkflowStep `yaml:"steps"` + Env map[string]string `yaml:"env"` +} + +type WorkflowStep struct { + Name string `yaml:"name"` + Run string `yaml:"run"` + Uses string `yaml:"uses"` + Env map[string]string `yaml:"env"` +} + +type StepResult struct { + Name string `json:"name"` + Status string `json:"status"` + Duration string `json:"duration"` +} + +type JobResult struct { + Workflow string `json:"workflow"` + Status string `json:"status"` + Steps []StepResult `json:"steps"` + Duration string `json:"duration"` +} + +func handler(ctx context.Context, payload ExecPayload) (JobResult, error) { + sha := payload.CommitSHA + if len(sha) > 7 { + sha = sha[:7] + } + log.Printf("Executor: repo=%s workflow=%s sha=%s", payload.Repo.FullName, payload.Workflow, sha) + + start := time.Now() + + var wf Workflow + if err := yaml.Unmarshal([]byte(payload.WorkflowYAML), &wf); err != nil { + return JobResult{Workflow: payload.Workflow, Status: "failure"}, err + } + + setCommitStatus(payload, "pending", "CI running...") + + overallStatus := "success" + var steps []StepResult + + for _, job := range wf.Jobs { + // Build environment + env := os.Environ() + env = append(env, + "CI=true", + fmt.Sprintf("GITHUB_REPOSITORY=%s", payload.Repo.FullName), + fmt.Sprintf("GITHUB_REF=%s", payload.Ref), + fmt.Sprintf("GITHUB_SHA=%s", payload.CommitSHA), + ) + for k, v := range job.Env { + env = append(env, fmt.Sprintf("%s=%s", k, v)) + } + + workdir := "/tmp/workspace" + os.MkdirAll(workdir, 0755) + os.RemoveAll(workdir) + os.MkdirAll(workdir, 0755) + + for i, step := range job.Steps { + stepStart := time.Now() + name := step.Name + if name == "" { + name = fmt.Sprintf("Step %d", i+1) + } + + // Handle checkout action + if step.Uses != "" && contains(step.Uses, "checkout") { + cloneURL := payload.Repo.CloneURL + if payload.GiteaToken != "" { + cloneURL = fmt.Sprintf("https://token:%s@%s", + payload.GiteaToken, cloneURL[8:]) // strip https:// + } + branch := payload.Ref + if len(branch) > 11 && branch[:11] == "refs/heads/" { + branch = branch[11:] + } + cmd := exec.CommandContext(ctx, "git", "clone", "--depth=1", "--branch", branch, cloneURL, workdir) + if out, err := cmd.CombinedOutput(); err != nil { + log.Printf("Checkout failed: %s", string(out)) + steps = append(steps, StepResult{Name: name, Status: "failure", Duration: since(stepStart)}) + overallStatus = "failure" + break + } + steps = append(steps, StepResult{Name: name, Status: "success", Duration: since(stepStart)}) + continue + } + + // Skip unknown actions + if step.Uses != "" { + steps = append(steps, StepResult{Name: name, Status: "skipped", Duration: since(stepStart)}) + continue + } + + if step.Run == "" { + continue + } + + // Run shell step + stepEnv := make([]string, len(env)) + copy(stepEnv, env) + for k, v := range step.Env { + stepEnv = append(stepEnv, fmt.Sprintf("%s=%s", k, v)) + } + + cmd := exec.CommandContext(ctx, "bash", "-c", step.Run) + cmd.Dir = workdir + cmd.Env = stepEnv + + var output bytes.Buffer + cmd.Stdout = io.MultiWriter(&output, os.Stdout) + cmd.Stderr = io.MultiWriter(&output, os.Stderr) + + if err := cmd.Run(); err != nil { + log.Printf("Step %d FAILED: %v", i+1, err) + steps = append(steps, StepResult{Name: name, Status: "failure", Duration: since(stepStart)}) + overallStatus = "failure" + break + } + + steps = append(steps, StepResult{Name: name, Status: "success", Duration: since(stepStart)}) + } + + if overallStatus == "failure" { + break + } + } + + desc := fmt.Sprintf("CI %s in %s", overallStatus, time.Since(start).Round(time.Second)) + setCommitStatus(payload, overallStatus, desc) + + return JobResult{ + Workflow: payload.Workflow, + Status: overallStatus, + Steps: steps, + Duration: time.Since(start).Round(time.Second).String(), + }, nil +} + +func setCommitStatus(payload ExecPayload, state, description string) { + if payload.GiteaURL == "" || payload.GiteaToken == "" { + return + } + + url := fmt.Sprintf("%s/api/v1/repos/%s/statuses/%s", + payload.GiteaURL, payload.Repo.FullName, payload.CommitSHA) + + body, _ := json.Marshal(map[string]string{ + "state": state, + "description": description, + "context": fmt.Sprintf("ci/%s", payload.Workflow), + }) + + req, _ := http.NewRequest("POST", url, bytes.NewReader(body)) + req.Header.Set("Authorization", "token "+payload.GiteaToken) + req.Header.Set("Content-Type", "application/json") + + resp, err := (&http.Client{Timeout: 10 * time.Second}).Do(req) + if err != nil { + log.Printf("Failed to set commit status: %v", err) + return + } + resp.Body.Close() +} + +func contains(s, sub string) bool { + return len(s) >= len(sub) && (s == sub || len(s) > 0 && containsStr(s, sub)) +} + +func containsStr(s, sub string) bool { + for i := 0; i <= len(s)-len(sub); i++ { + if s[i:i+len(sub)] == sub { + return true + } + } + return false +} + +func since(t time.Time) string { + return time.Since(t).Round(time.Millisecond).String() +} + +func main() { + lambda.Start(handler) +} diff --git a/orchestrator/test-event.json b/orchestrator/test-event.json new file mode 100644 index 0000000..2eb8a8e --- /dev/null +++ b/orchestrator/test-event.json @@ -0,0 +1,9 @@ +{ + "body": "{\"ref\":\"refs/heads/main\",\"before\":\"0000000\",\"after\":\"abc1234567890\",\"repository\":{\"id\":1,\"name\":\"studio\",\"full_name\":\"tinqs/studio\",\"clone_url\":\"https://tinqs.com/tinqs/studio.git\",\"html_url\":\"https://tinqs.com/tinqs/studio\"},\"pusher\":{\"login\":\"ozan\",\"email\":\"ozan@tinqs.com\"},\"commits\":[{\"id\":\"abc1234567890\",\"message\":\"test push\",\"added\":[\"cmd/tstudio/main.go\"],\"removed\":[],\"modified\":[]}]}", + "headers": { + "x-gitea-event": "push", + "content-type": "application/json" + }, + "httpMethod": "POST", + "path": "/webhook" +}