// tinqs/ci orchestrator — dispatcher Lambda // // Receives Gitea system webhooks, determines which workflows to run, // and routes each job to the right execution environment based on runs-on label: // // go, node, docker, godot → EC2 Spot instance (pre-baked AMI, self-terminates) // deploy → Lambda direct execution (ci-exec) // host → skip (handled by registered always-on runner) // // Also handles cancel events (terminate instances) and cleanup cron. package main import ( "context" "encoding/base64" "encoding/json" "fmt" "log" "net/http" "os" "strings" "time" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" "github.com/aws/aws-sdk-go-v2/service/dynamodb" "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/aws/aws-sdk-go-v2/service/ec2" ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types" awslambda "github.com/aws/aws-sdk-go-v2/service/lambda" "gopkg.in/yaml.v3" ) // --- Gitea webhook types --- type GiteaPushEvent struct { Ref string `json:"ref"` Before string `json:"before"` After string `json:"after"` Repository GiteaRepo `json:"repository"` Pusher GiteaUser `json:"pusher"` Commits []GiteaCommit `json:"commits"` } type GiteaRepo struct { ID int `json:"id"` Name string `json:"name"` FullName string `json:"full_name"` CloneURL string `json:"clone_url"` HTMLURL string `json:"html_url"` } type GiteaUser struct { Login string `json:"login"` Email string `json:"email"` } type GiteaCommit struct { ID string `json:"id"` Message string `json:"message"` Added []string `json:"added"` Removed []string `json:"removed"` Modified []string `json:"modified"` } // --- Workflow YAML (minimal parse) --- type Workflow struct { Name string `yaml:"name"` On interface{} `yaml:"on"` Jobs map[string]WorkflowJob `yaml:"jobs"` } type WorkflowJob struct { RunsOn string `yaml:"runs-on"` } // --- DynamoDB run record --- type RunRecord struct { Repo string `dynamodbav:"repo"` RunID string `dynamodbav:"run_id"` InstanceID string `dynamodbav:"instance_id"` Status string `dynamodbav:"status"` Workflow string `dynamodbav:"workflow"` Label string `dynamodbav:"label"` StartedAt int64 `dynamodbav:"started_at"` TTL int64 `dynamodbav:"ttl"` } // --- Spot instance config per label --- type spotConfig struct { InstanceType string MaxPrice string // spot bid (empty = on-demand price cap) } var labelToSpot = map[string]spotConfig{ "go": {InstanceType: "t3.small", MaxPrice: "0.02"}, "node": {InstanceType: "t3.small", MaxPrice: "0.02"}, "docker": {InstanceType: "t3.medium", MaxPrice: "0.04"}, "deploy": {InstanceType: "t3.micro", MaxPrice: "0.01"}, "godot": {InstanceType: "t3.medium", MaxPrice: "0.04"}, } // --- Config from env --- type cfg struct { GiteaURL string GiteaToken string // API token (for fetching workflows, setting commit status) RunnerToken string // Runner registration token (for act_runner register) ExecFnName string AMI string // pre-baked AMI with Go, Node, Docker, AWS CLI, act_runner Subnet string SecurityGroup string DDBTable string InstanceProfile string } func loadCfg() cfg { return cfg{ GiteaURL: os.Getenv("GITEA_URL"), GiteaToken: os.Getenv("GITEA_TOKEN"), RunnerToken: os.Getenv("RUNNER_TOKEN"), ExecFnName: os.Getenv("EXECUTOR_FUNCTION_NAME"), AMI: os.Getenv("RUNNER_AMI"), Subnet: os.Getenv("SUBNET"), SecurityGroup: os.Getenv("SECURITY_GROUP"), DDBTable: os.Getenv("DDB_TABLE"), InstanceProfile: os.Getenv("INSTANCE_PROFILE"), } } // --- User-data script template --- // The instance boots, registers as ephemeral runner, picks one job, then terminates itself. func userDataScript(c cfg, label, runnerName string) string { script := fmt.Sprintf(`#!/bin/bash set -euo pipefail exec > /var/log/tinqs-ci.log 2>&1 echo "=== Tinqs CI Runner: %s ===" # Get instance metadata (IMDSv2) TOKEN=$(curl -s -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 60") INSTANCE_ID=$(curl -s -H "X-aws-ec2-metadata-token: $TOKEN" http://169.254.169.254/latest/meta-data/instance-id) REGION=$(curl -s -H "X-aws-ec2-metadata-token: $TOKEN" http://169.254.169.254/latest/meta-data/placement/region) # Self-termination trap — kill instance on exit (success or failure) cleanup() { echo "=== Job done, terminating $INSTANCE_ID ===" aws ec2 terminate-instances --instance-ids "$INSTANCE_ID" --region "$REGION" || true } trap cleanup EXIT # Source Go/Node paths from AMI export PATH=$PATH:/usr/local/go/bin:/usr/local/bin export HOME=/root # Ensure AWS SDK can find instance profile credentials via IMDS # act_runner may change HOME — force credential chain to use IMDS export AWS_EC2_METADATA_DISABLED=false # Create proper working directory for act_runner mkdir -p /opt/runner && cd /opt/runner # Register as ephemeral runner (picks one job, then exits) act_runner register --no-interactive \ --instance %s \ --token %s \ --name %s \ --labels "%s:host" # Configure runner to use /opt/runner as workdir cat > .runner.yaml << 'RUNCFG' log: level: info runner: capacity: 1 timeout: 30m envs: HOME: /root AWS_DEFAULT_REGION: eu-west-1 PATH: /usr/local/go/bin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin host: workdir_parent: /opt/runner/work RUNCFG mkdir -p /opt/runner/work # Run — blocks until the job completes, then exits (ephemeral mode) act_runner daemon --config .runner.yaml echo "=== Runner exited, cleanup will terminate instance ===" `, runnerName, c.GiteaURL, c.RunnerToken, runnerName, label) return base64.StdEncoding.EncodeToString([]byte(script)) } func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) { c := loadCfg() // Handle cleanup cron if request.Body == "" || strings.Contains(request.Body, `"action":"cleanup"`) { return handleCleanup(ctx, c) } // Handle cancel webhook eventType := request.Headers["x-gitea-event"] if eventType == "" { eventType = request.Headers["X-Gitea-Event"] } if eventType == "workflow_job" { return handleCancel(ctx, c, request.Body) } // Handle push event var push GiteaPushEvent if err := json.Unmarshal([]byte(request.Body), &push); err != nil { log.Printf("Failed to parse webhook: %v", err) return respond(400, `{"error":"invalid payload"}`) } if !strings.HasPrefix(push.Ref, "refs/heads/") { return respond(200, `{"status":"skipped","reason":"not a branch push"}`) } branch := strings.TrimPrefix(push.Ref, "refs/heads/") log.Printf("Push to %s branch=%s by=%s commits=%d", push.Repository.FullName, branch, push.Pusher.Login, len(push.Commits)) changedFiles := collectChangedFiles(push.Commits) workflows, err := fetchWorkflows(c.GiteaURL, c.GiteaToken, push.Repository.FullName, branch) if err != nil || len(workflows) == 0 { return respond(200, `{"status":"no_workflows"}`) } dispatched := 0 for name, content := range workflows { var wf Workflow if err := yaml.Unmarshal([]byte(content), &wf); err != nil { log.Printf("Failed to parse %s: %v", name, err) continue } if !shouldTrigger(wf, branch, changedFiles) { log.Printf("Skipping %s (no matching trigger)", name) continue } label := "host" for _, job := range wf.Jobs { if job.RunsOn != "" { label = job.RunsOn break } } runID := fmt.Sprintf("%s-%s-%d", push.After[:7], name, time.Now().UnixMilli()) switch label { case "deploy": if err := invokeLambdaExec(ctx, c, push, name, content); err != nil { log.Printf("Failed to invoke executor for %s: %v", name, err) continue } case "host": log.Printf("Skipping %s (runs-on: host — registered runner)", name) continue default: instanceID, err := startSpotRunner(ctx, c, label, runID) if err != nil { log.Printf("Failed to start spot for %s: %v", name, err) continue } if err := trackRun(ctx, c, push.Repository.FullName, runID, instanceID, name, label); err != nil { log.Printf("Warning: failed to track run: %v", err) } } dispatched++ } return respond(200, fmt.Sprintf( `{"status":"dispatched","workflows":%d,"dispatched":%d,"repo":"%s","branch":"%s"}`, len(workflows), dispatched, push.Repository.FullName, branch)) } // --- EC2 Spot runner --- func startSpotRunner(ctx context.Context, c cfg, label, runID string) (string, error) { awsCfg, err := config.LoadDefaultConfig(ctx) if err != nil { return "", fmt.Errorf("aws config: %w", err) } spot, ok := labelToSpot[label] if !ok { return "", fmt.Errorf("unknown label: %s", label) } runnerName := fmt.Sprintf("spot-%s-%s", label, runID[:12]) userData := userDataScript(c, label, runnerName) client := ec2.NewFromConfig(awsCfg) out, err := client.RunInstances(ctx, &ec2.RunInstancesInput{ ImageId: aws.String(c.AMI), InstanceType: ec2types.InstanceType(spot.InstanceType), MinCount: aws.Int32(1), MaxCount: aws.Int32(1), UserData: aws.String(userData), // Spot request InstanceMarketOptions: &ec2types.InstanceMarketOptionsRequest{ MarketType: ec2types.MarketTypeSpot, SpotOptions: &ec2types.SpotMarketOptions{ MaxPrice: aws.String(spot.MaxPrice), SpotInstanceType: ec2types.SpotInstanceTypeOneTime, InstanceInterruptionBehavior: ec2types.InstanceInterruptionBehaviorTerminate, }, }, // Network NetworkInterfaces: []ec2types.InstanceNetworkInterfaceSpecification{{ DeviceIndex: aws.Int32(0), SubnetId: aws.String(c.Subnet), Groups: []string{c.SecurityGroup}, AssociatePublicIpAddress: aws.Bool(true), }}, // IAM role (for AWS CLI, ECR, S3, ECS access) IamInstanceProfile: &ec2types.IamInstanceProfileSpecification{ Arn: aws.String(c.InstanceProfile), }, // Tags for identification and cleanup TagSpecifications: []ec2types.TagSpecification{{ ResourceType: ec2types.ResourceTypeInstance, Tags: []ec2types.Tag{ {Key: aws.String("Name"), Value: aws.String(runnerName)}, {Key: aws.String("tinqs-ci"), Value: aws.String("true")}, {Key: aws.String("tinqs-ci-run"), Value: aws.String(runID)}, {Key: aws.String("tinqs-ci-label"), Value: aws.String(label)}, }, }}, // Auto-terminate safety net (instance shuts down → terminates) InstanceInitiatedShutdownBehavior: ec2types.ShutdownBehaviorTerminate, }) if err != nil { return "", fmt.Errorf("run instances: %w", err) } if len(out.Instances) == 0 { return "", fmt.Errorf("no instances launched") } instanceID := *out.Instances[0].InstanceId log.Printf("Started spot instance: %s (type=%s, label=%s, runner=%s)", instanceID, spot.InstanceType, label, runnerName) return instanceID, nil } // --- Lambda executor (for deploy-only jobs) --- func invokeLambdaExec(ctx context.Context, c cfg, push GiteaPushEvent, workflow, content string) error { if c.ExecFnName == "" { log.Printf("[DRY RUN] Would invoke executor for %s", workflow) return nil } awsCfg, err := config.LoadDefaultConfig(ctx) if err != nil { return err } payload, _ := json.Marshal(map[string]interface{}{ "repo": push.Repository, "ref": push.Ref, "commit_sha": push.After, "pusher": push.Pusher.Login, "workflow_name": workflow, "workflow_yaml": content, "gitea_url": c.GiteaURL, "gitea_token": c.GiteaToken, }) client := awslambda.NewFromConfig(awsCfg) _, err = client.Invoke(ctx, &awslambda.InvokeInput{ FunctionName: aws.String(c.ExecFnName), InvocationType: "Event", Payload: payload, }) return err } // --- Cancel handler --- func handleCancel(ctx context.Context, c cfg, body string) (events.APIGatewayProxyResponse, error) { var event struct { Action string `json:"action"` Repository GiteaRepo `json:"repository"` } if err := json.Unmarshal([]byte(body), &event); err != nil { return respond(400, `{"error":"invalid cancel event"}`) } if event.Action != "cancelled" { return respond(200, `{"status":"ignored","reason":"not a cancel event"}`) } log.Printf("Cancel request for %s", event.Repository.FullName) awsCfg, _ := config.LoadDefaultConfig(ctx) ddb := dynamodb.NewFromConfig(awsCfg) out, err := ddb.Query(ctx, &dynamodb.QueryInput{ TableName: aws.String(c.DDBTable), KeyConditionExpression: aws.String("repo = :repo"), FilterExpression: aws.String("#s = :running"), ExpressionAttributeNames: map[string]string{"#s": "status"}, ExpressionAttributeValues: map[string]types.AttributeValue{ ":repo": &types.AttributeValueMemberS{Value: event.Repository.FullName}, ":running": &types.AttributeValueMemberS{Value: "running"}, }, }) if err != nil { return respond(500, `{"error":"db query failed"}`) } ec2Client := ec2.NewFromConfig(awsCfg) stopped := 0 // Collect instance IDs to terminate var instanceIDs []string var records []RunRecord for _, item := range out.Items { var rec RunRecord if err := attributevalue.UnmarshalMap(item, &rec); err != nil || rec.InstanceID == "" { continue } instanceIDs = append(instanceIDs, rec.InstanceID) records = append(records, rec) } if len(instanceIDs) > 0 { _, err := ec2Client.TerminateInstances(ctx, &ec2.TerminateInstancesInput{ InstanceIds: instanceIDs, }) if err != nil { log.Printf("Failed to terminate instances: %v", err) } else { stopped = len(instanceIDs) for _, rec := range records { updateRunStatus(ctx, ddb, c.DDBTable, rec.Repo, rec.RunID, "cancelled") } log.Printf("Terminated %d instances for %s", stopped, event.Repository.FullName) } } return respond(200, fmt.Sprintf(`{"status":"cancelled","stopped":%d}`, stopped)) } // --- Cleanup cron --- func handleCleanup(ctx context.Context, c cfg) (events.APIGatewayProxyResponse, error) { log.Println("Running cleanup...") awsCfg, _ := config.LoadDefaultConfig(ctx) ddb := dynamodb.NewFromConfig(awsCfg) ec2Client := ec2.NewFromConfig(awsCfg) cutoff := time.Now().Add(-30 * time.Minute).Unix() out, err := ddb.Scan(ctx, &dynamodb.ScanInput{ TableName: aws.String(c.DDBTable), FilterExpression: aws.String("#s = :running AND started_at < :cutoff"), ExpressionAttributeNames: map[string]string{"#s": "status"}, ExpressionAttributeValues: map[string]types.AttributeValue{ ":running": &types.AttributeValueMemberS{Value: "running"}, ":cutoff": &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", cutoff)}, }, }) if err != nil { return respond(500, fmt.Sprintf(`{"error":"%s"}`, err)) } // Batch terminate stale instances var staleIDs []string var staleRecs []RunRecord for _, item := range out.Items { var rec RunRecord if err := attributevalue.UnmarshalMap(item, &rec); err != nil || rec.InstanceID == "" { continue } staleIDs = append(staleIDs, rec.InstanceID) staleRecs = append(staleRecs, rec) } killed := 0 if len(staleIDs) > 0 { ec2Client.TerminateInstances(ctx, &ec2.TerminateInstancesInput{ InstanceIds: staleIDs, }) for _, rec := range staleRecs { updateRunStatus(ctx, ddb, c.DDBTable, rec.Repo, rec.RunID, "timeout") } killed = len(staleIDs) log.Printf("Killed %d stale instances", killed) } // Also sweep any tinqs-ci tagged instances that somehow escaped DynamoDB descOut, _ := ec2Client.DescribeInstances(ctx, &ec2.DescribeInstancesInput{ Filters: []ec2types.Filter{ {Name: aws.String("tag:tinqs-ci"), Values: []string{"true"}}, {Name: aws.String("instance-state-name"), Values: []string{"running", "pending"}}, }, }) var orphanIDs []string for _, res := range descOut.Reservations { for _, inst := range res.Instances { if inst.LaunchTime != nil && time.Since(*inst.LaunchTime) > 30*time.Minute { orphanIDs = append(orphanIDs, *inst.InstanceId) } } } if len(orphanIDs) > 0 { ec2Client.TerminateInstances(ctx, &ec2.TerminateInstancesInput{ InstanceIds: orphanIDs, }) log.Printf("Killed %d orphan instances (tag sweep)", len(orphanIDs)) killed += len(orphanIDs) } return respond(200, fmt.Sprintf(`{"status":"cleanup","killed":%d}`, killed)) } // --- DynamoDB helpers --- func trackRun(ctx context.Context, c cfg, repo, runID, instanceID, workflow, label string) error { awsCfg, _ := config.LoadDefaultConfig(ctx) ddb := dynamodb.NewFromConfig(awsCfg) now := time.Now() rec := RunRecord{ Repo: repo, RunID: runID, InstanceID: instanceID, Status: "running", Workflow: workflow, Label: label, StartedAt: now.Unix(), TTL: now.Add(7 * 24 * time.Hour).Unix(), } item, err := attributevalue.MarshalMap(rec) if err != nil { return err } _, err = ddb.PutItem(ctx, &dynamodb.PutItemInput{ TableName: aws.String(c.DDBTable), Item: item, }) return err } func updateRunStatus(ctx context.Context, ddb *dynamodb.Client, table, repo, runID, status string) { ddb.UpdateItem(ctx, &dynamodb.UpdateItemInput{ TableName: aws.String(table), Key: map[string]types.AttributeValue{ "repo": &types.AttributeValueMemberS{Value: repo}, "run_id": &types.AttributeValueMemberS{Value: runID}, }, UpdateExpression: aws.String("SET #s = :status"), ExpressionAttributeNames: map[string]string{"#s": "status"}, ExpressionAttributeValues: map[string]types.AttributeValue{ ":status": &types.AttributeValueMemberS{Value: status}, }, }) } // --- Gitea API --- func fetchWorkflows(baseURL, token, repoFullName, branch string) (map[string]string, error) { url := fmt.Sprintf("%s/api/v1/repos/%s/contents/.gitea/workflows?ref=%s", baseURL, repoFullName, branch) req, _ := http.NewRequest("GET", url, nil) req.Header.Set("Authorization", "token "+token) resp, err := (&http.Client{Timeout: 10 * time.Second}).Do(req) if err != nil { return nil, err } defer resp.Body.Close() if resp.StatusCode == 404 { return nil, nil } var contents []struct { Name string `json:"name"` Path string `json:"path"` } if err := json.NewDecoder(resp.Body).Decode(&contents); err != nil { return nil, err } workflows := make(map[string]string) for _, c := range contents { if !strings.HasSuffix(c.Name, ".yml") && !strings.HasSuffix(c.Name, ".yaml") { continue } content, err := fetchRawFile(baseURL, token, repoFullName, c.Path, branch) if err != nil { log.Printf("Failed to fetch %s: %v", c.Path, err) continue } workflows[c.Name] = content } return workflows, nil } func fetchRawFile(baseURL, token, repoFullName, path, branch string) (string, error) { url := fmt.Sprintf("%s/api/v1/repos/%s/raw/%s?ref=%s", baseURL, repoFullName, path, branch) req, _ := http.NewRequest("GET", url, nil) req.Header.Set("Authorization", "token "+token) resp, err := (&http.Client{Timeout: 10 * time.Second}).Do(req) if err != nil { return "", err } defer resp.Body.Close() body := make([]byte, 1<<20) n, _ := resp.Body.Read(body) return string(body[:n]), nil } // --- Trigger evaluation --- func shouldTrigger(wf Workflow, branch string, changedFiles []string) bool { onMap, ok := wf.On.(map[string]interface{}) if !ok { return true } pushCfg, ok := onMap["push"] if !ok { return false } pushMap, ok := pushCfg.(map[string]interface{}) if !ok { return true } if branches, ok := pushMap["branches"].([]interface{}); ok { matched := false for _, b := range branches { if fmt.Sprint(b) == branch { matched = true break } } if !matched { return false } } if paths, ok := pushMap["paths"].([]interface{}); ok && len(changedFiles) > 0 { matched := false for _, p := range paths { pattern := fmt.Sprint(p) for _, f := range changedFiles { if matchPath(pattern, f) { matched = true break } } if matched { break } } if !matched { return false } } if ignorePaths, ok := pushMap["paths-ignore"].([]interface{}); ok && len(changedFiles) > 0 { allIgnored := true for _, f := range changedFiles { ignored := false for _, p := range ignorePaths { if matchPath(fmt.Sprint(p), f) { ignored = true break } } if !ignored { allIgnored = false break } } if allIgnored { return false } } return true } func matchPath(pattern, file string) bool { if strings.HasSuffix(pattern, "/**") { prefix := strings.TrimSuffix(pattern, "/**") return strings.HasPrefix(file, prefix+"/") || file == prefix } if strings.HasPrefix(pattern, "*.") { ext := strings.TrimPrefix(pattern, "*") return strings.HasSuffix(file, ext) } return strings.HasPrefix(file, pattern) || file == pattern } func collectChangedFiles(commits []GiteaCommit) []string { seen := make(map[string]bool) var files []string for _, c := range commits { for _, f := range c.Added { if !seen[f] { files = append(files, f) seen[f] = true } } for _, f := range c.Modified { if !seen[f] { files = append(files, f) seen[f] = true } } for _, f := range c.Removed { if !seen[f] { files = append(files, f) seen[f] = true } } } return files } func respond(status int, body string) (events.APIGatewayProxyResponse, error) { return events.APIGatewayProxyResponse{ StatusCode: status, Headers: map[string]string{"Content-Type": "application/json"}, Body: body, }, nil } func main() { lambda.Start(handler) }