feat: CI orchestrator — Lambda dispatch + Fargate routing + cancel

dispatch: receives Gitea webhook, routes by runs-on label to Fargate
  tasks (go/node/docker/godot) or Lambda executor (deploy).
  Path filter evaluation, DynamoDB run tracking, cancel via StopTask.
exec: lightweight Lambda for deploy-only jobs (S3 sync, ECS update).
SAM template: API Gateway + 2 Lambdas + DynamoDB + cleanup cron.
This commit is contained in:
2026-05-22 18:47:47 +01:00
parent 1564c61acc
commit e96c7c5bf1
8 changed files with 1254 additions and 0 deletions
+14
View File
@@ -0,0 +1,14 @@
module tinqs.com/tinqs/ci/orchestrator/dispatch
go 1.23
require (
github.com/aws/aws-lambda-go v1.47.0
github.com/aws/aws-sdk-go-v2 v1.32.5
github.com/aws/aws-sdk-go-v2/config v1.28.0
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.12
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.38.0
github.com/aws/aws-sdk-go-v2/service/ecs v1.52.0
github.com/aws/aws-sdk-go-v2/service/lambda v1.69.0
gopkg.in/yaml.v3 v3.0.1
)
+702
View File
@@ -0,0 +1,702 @@
// tinqs/ci orchestrator — dispatcher Lambda
//
// Receives Gitea system webhooks, determines which workflows to run,
// and routes each job to the right execution environment based on runs-on label:
//
// go, node, docker, godot → Fargate task with matching runner image
// deploy → Lambda direct execution (ci-exec)
// host → skip (handled by registered always-on runner)
//
// Also handles cancel events (stop Fargate tasks) and cleanup cron.
package main
import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"strings"
	"time"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb"
	"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
	"github.com/aws/aws-sdk-go-v2/service/ecs"
	ecstypes "github.com/aws/aws-sdk-go-v2/service/ecs/types"
	awslambda "github.com/aws/aws-sdk-go-v2/service/lambda"
	"gopkg.in/yaml.v3"
)
// --- Gitea webhook types ---
// Gitea webhook payload types. Only the fields this dispatcher reads are
// declared; everything else in the payload is ignored by encoding/json.
type (
	// GiteaPushEvent is the body of a Gitea push webhook.
	GiteaPushEvent struct {
		Ref        string        `json:"ref"`    // full ref name, e.g. refs/heads/main
		Before     string        `json:"before"` // commit SHA before the push
		After      string        `json:"after"`  // commit SHA after the push
		Repository GiteaRepo     `json:"repository"`
		Pusher     GiteaUser     `json:"pusher"`
		Commits    []GiteaCommit `json:"commits"`
	}

	// GiteaRepo identifies the repository an event belongs to.
	GiteaRepo struct {
		ID       int    `json:"id"`
		Name     string `json:"name"`
		FullName string `json:"full_name"` // used as the DynamoDB "repo" key and in API paths
		CloneURL string `json:"clone_url"`
		HTMLURL  string `json:"html_url"`
	}

	// GiteaUser is the account that triggered the event.
	GiteaUser struct {
		Login string `json:"login"`
		Email string `json:"email"`
	}

	// GiteaCommit is one commit of a push, with the paths it touched.
	GiteaCommit struct {
		ID       string   `json:"id"`
		Message  string   `json:"message"`
		Added    []string `json:"added"`
		Removed  []string `json:"removed"`
		Modified []string `json:"modified"`
	}
)
// --- Workflow YAML (minimal parse) ---
// Minimal view of a workflow YAML file: just enough to evaluate triggers and
// pick an execution environment.
type (
	// Workflow is the subset of a workflow file the dispatcher parses.
	// On is left untyped because the "on:" section may be a scalar, a list
	// or a mapping; shouldTrigger inspects its dynamic type.
	Workflow struct {
		Name string                 `yaml:"name"`
		On   any                    `yaml:"on"`
		Jobs map[string]WorkflowJob `yaml:"jobs"`
	}

	// WorkflowJob carries the job's runs-on label, which selects the runner.
	// NOTE(review): RunsOn is a plain string, so a workflow that writes
	// runs-on as a YAML list presumably fails to decode — confirm whether
	// list-valued runs-on needs to be supported.
	WorkflowJob struct {
		RunsOn string `yaml:"runs-on"`
	}
)
// --- DynamoDB run record ---
// RunRecord is the DynamoDB item written for every Fargate run the
// dispatcher starts. Items are keyed by (repo, run_id).
type RunRecord struct {
	Repo      string `dynamodbav:"repo"`       // repository full name
	RunID     string `dynamodbav:"run_id"`     // dispatcher-generated run identifier
	TaskArn   string `dynamodbav:"task_arn"`   // ECS task to stop on cancel/timeout
	Status    string `dynamodbav:"status"`     // "running", "cancelled" or "timeout"
	Workflow  string `dynamodbav:"workflow"`   // workflow file name
	Label     string `dynamodbav:"label"`      // runs-on label that selected the image
	StartedAt int64  `dynamodbav:"started_at"` // Unix seconds
	TTL       int64  `dynamodbav:"ttl"`        // Unix seconds, set 7 days after start by trackRun
}
// --- Image routing ---
// Routing tables keyed by runs-on label. Both maps must carry the same keys:
// startFargateRunner looks a label up in each of them.
var (
	// labelToImage maps a runs-on label to the runner image repository name
	// (appended to the ECR base URL by startFargateRunner).
	labelToImage = map[string]string{
		"go":     "tinqs-runner-go",
		"node":   "tinqs-runner-node",
		"docker": "tinqs-runner-docker",
		"deploy": "tinqs-runner-deploy",
		"godot":  "tinqs-runner-godot",
	}

	// labelToResources maps a runs-on label to its Fargate task size as
	// {cpu units, memory MiB}, both as the strings the ECS API expects.
	labelToResources = map[string][2]string{
		"go":     {"1024", "2048"}, // 1 vCPU, 2 GB
		"node":   {"1024", "2048"},
		"docker": {"2048", "4096"}, // 2 vCPU, 4 GB (Docker builds are heavy)
		"deploy": {"512", "1024"},  // 0.5 vCPU, 1 GB (lightweight)
		"godot":  {"2048", "4096"},
	}
)
// --- Config from env ---
// cfg is the dispatcher's runtime configuration. Every field is read from an
// environment variable by loadCfg.
type cfg struct {
	GiteaURL      string   // GITEA_URL, without trailing slash
	GiteaToken    string   // GITEA_TOKEN
	ExecFnName    string   // EXECUTOR_FUNCTION_NAME; empty puts invokeLambdaExec in dry-run mode
	ECSCluster    string   // ECS_CLUSTER
	Subnets       []string // SUBNETS, comma-separated
	SecurityGroup string   // SECURITY_GROUP
	ECRBase       string   // ECR_BASE, e.g. <account>.dkr.ecr.<region>.amazonaws.com
	DDBTable      string   // DDB_TABLE
	TaskRoleArn   string   // TASK_ROLE_ARN
	ExecRoleArn   string   // EXEC_ROLE_ARN
}

// loadCfg builds a cfg from the process environment.
//
// GITEA_URL has trailing slashes removed so that "%s/api/..." formatting
// never produces a double slash. SUBNETS is split on commas with surrounding
// whitespace trimmed and empty entries dropped, so an unset variable yields
// an empty slice rather than a slice holding one empty subnet ID.
func loadCfg() cfg {
	return cfg{
		GiteaURL:      strings.TrimRight(os.Getenv("GITEA_URL"), "/"),
		GiteaToken:    os.Getenv("GITEA_TOKEN"),
		ExecFnName:    os.Getenv("EXECUTOR_FUNCTION_NAME"),
		ECSCluster:    os.Getenv("ECS_CLUSTER"),
		Subnets:       splitCommaList(os.Getenv("SUBNETS")),
		SecurityGroup: os.Getenv("SECURITY_GROUP"),
		ECRBase:       os.Getenv("ECR_BASE"),
		DDBTable:      os.Getenv("DDB_TABLE"),
		TaskRoleArn:   os.Getenv("TASK_ROLE_ARN"),
		ExecRoleArn:   os.Getenv("EXEC_ROLE_ARN"),
	}
}

// splitCommaList splits raw on commas, trims each entry and drops empties.
func splitCommaList(raw string) []string {
	var out []string
	for _, part := range strings.Split(raw, ",") {
		if entry := strings.TrimSpace(part); entry != "" {
			out = append(out, entry)
		}
	}
	return out
}
// handler is the Lambda entry point for every event the dispatcher receives.
//
// Requests are classified in this order:
//
//  1. cleanup: an empty body, or a JSON body whose top-level "action" is
//     "cleanup", is passed to handleCleanup.
//  2. cancel: an X-Gitea-Event header (any casing) equal to "workflow_job" is
//     passed to handleCancel.
//  3. push: anything else is decoded as a Gitea push event. For a branch
//     push, every workflow file on that branch is fetched, filtered with
//     shouldTrigger and routed by runs-on label: "deploy" goes to the Lambda
//     executor, "host" is skipped, any other label starts a Fargate runner
//     that is then recorded in DynamoDB.
//
// The returned error is always nil; failures are reported through the HTTP
// status and JSON body of the response.
func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
	c := loadCfg()

	// NOTE(review): nothing in this function authenticates the sender (no
	// webhook signature check is visible in this file) — confirm that another
	// layer restricts who can reach this endpoint, since a push event can
	// start Fargate tasks.

	if isCleanupRequest(request.Body) {
		return handleCleanup(ctx, c)
	}

	if headerValue(request.Headers, "X-Gitea-Event") == "workflow_job" {
		return handleCancel(ctx, c, request.Body)
	}

	var push GiteaPushEvent
	if err := json.Unmarshal([]byte(request.Body), &push); err != nil {
		log.Printf("Failed to parse webhook: %v", err)
		return respond(400, `{"error":"invalid payload"}`)
	}

	// Only branch pushes are processed (tags and other refs are skipped).
	if !strings.HasPrefix(push.Ref, "refs/heads/") {
		return respond(200, `{"status":"skipped","reason":"not a branch push"}`)
	}
	branch := strings.TrimPrefix(push.Ref, "refs/heads/")
	log.Printf("Push to %s branch=%s by=%s commits=%d",
		push.Repository.FullName, branch, push.Pusher.Login, len(push.Commits))

	changedFiles := collectChangedFiles(push.Commits)

	workflows, err := fetchWorkflows(c.GiteaURL, c.GiteaToken, push.Repository.FullName, branch)
	if err != nil {
		// A fetch failure is not the same as "this repo has no workflows":
		// surface it so the webhook delivery is visibly marked as failed.
		log.Printf("Failed to fetch workflows for %s@%s: %v", push.Repository.FullName, branch, err)
		return respond(502, `{"error":"failed to fetch workflows"}`)
	}
	if len(workflows) == 0 {
		return respond(200, `{"status":"no_workflows"}`)
	}

	dispatched := 0
	for name, content := range workflows {
		var wf Workflow
		if err := yaml.Unmarshal([]byte(content), &wf); err != nil {
			log.Printf("Failed to parse %s: %v", name, err)
			continue
		}

		if !shouldTrigger(wf, branch, changedFiles) {
			log.Printf("Skipping %s (no matching trigger)", name)
			continue
		}

		label := runnerLabel(wf)
		runID := fmt.Sprintf("%s-%s-%d", shortSHA(push.After), name, time.Now().UnixMilli())

		switch label {
		case "deploy":
			// Lightweight: invoke the Lambda executor directly.
			if err := invokeLambdaExec(ctx, c, push, name, content); err != nil {
				log.Printf("Failed to invoke executor for %s: %v", name, err)
				continue
			}
		case "host":
			// Legacy: the always-on registered runner picks these up.
			log.Printf("Skipping %s (runs-on: host — handled by registered runner)", name)
			continue
		default:
			// Fargate: start an ephemeral runner with the matching image.
			taskArn, err := startFargateRunner(ctx, c, push, label, name, runID)
			if err != nil {
				log.Printf("Failed to start Fargate for %s: %v", name, err)
				continue
			}
			// Tracking is best-effort: the task is already running, so a
			// DynamoDB failure is logged rather than treated as fatal.
			if err := trackRun(ctx, c, push.Repository.FullName, runID, taskArn, name, label); err != nil {
				log.Printf("Warning: failed to track run: %v", err)
			}
		}
		dispatched++
	}

	// Marshal instead of Sprintf so repo and branch names are JSON-escaped.
	body, err := json.Marshal(struct {
		Status     string `json:"status"`
		Workflows  int    `json:"workflows"`
		Dispatched int    `json:"dispatched"`
		Repo       string `json:"repo"`
		Branch     string `json:"branch"`
	}{"dispatched", len(workflows), dispatched, push.Repository.FullName, branch})
	if err != nil {
		log.Printf("Failed to encode response: %v", err)
		return respond(500, `{"error":"response encoding failed"}`)
	}
	return respond(200, string(body))
}

// isCleanupRequest reports whether body is a cleanup invocation: either an
// empty body or a JSON object whose top-level "action" field is "cleanup".
func isCleanupRequest(body string) bool {
	if body == "" {
		return true
	}
	var probe struct {
		Action string `json:"action"`
	}
	if err := json.Unmarshal([]byte(body), &probe); err != nil {
		return false
	}
	return probe.Action == "cleanup"
}

// headerValue looks up an HTTP header without regard to the key's casing.
func headerValue(headers map[string]string, name string) string {
	if v, ok := headers[name]; ok {
		return v
	}
	for k, v := range headers {
		if strings.EqualFold(k, name) {
			return v
		}
	}
	return ""
}

// shortSHA returns at most the first seven characters of sha, so a missing
// or abbreviated commit ID cannot cause a slice-bounds panic.
func shortSHA(sha string) string {
	if len(sha) > 7 {
		return sha[:7]
	}
	return sha
}

// runnerLabel picks the runs-on label that routes the whole workflow: the
// label of the alphabetically first job that declares one, or "host" when no
// job does. Job order in the YAML file is not preserved by the map, so
// sorting by name is what keeps the choice deterministic.
func runnerLabel(wf Workflow) string {
	label, chosen, found := "host", "", false
	for jobName, job := range wf.Jobs {
		if job.RunsOn == "" {
			continue
		}
		if !found || jobName < chosen {
			label, chosen, found = job.RunsOn, jobName, true
		}
	}
	return label
}
// --- Fargate runner ---
// startFargateRunner registers a task definition revision for label and runs
// one Fargate task from it, returning the ARN of the started task.
//
// label must be a key of both labelToImage and labelToResources; it selects
// the container image (<ECRBase>/<image>:latest) and the task's CPU/memory.
// runID is used only to derive the ephemeral runner's name. push and workflow
// are accepted for call-site symmetry and are not read here.
//
// An error is returned when the label is unknown, AWS configuration cannot
// be loaded, either ECS call fails, or ECS reports that no task was started.
func startFargateRunner(ctx context.Context, c cfg, push GiteaPushEvent, label, workflow, runID string) (string, error) {
	imageName := labelToImage[label]
	if imageName == "" {
		return "", fmt.Errorf("unknown label: %s", label)
	}
	resources, ok := labelToResources[label]
	if !ok {
		return "", fmt.Errorf("no resource profile for label: %s", label)
	}
	cpu, memory := resources[0], resources[1]
	image := fmt.Sprintf("%s/%s:latest", c.ECRBase, imageName)

	awsCfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		return "", fmt.Errorf("aws config: %w", err)
	}

	// Send container logs to the region the SDK resolved, falling back to the
	// previously hard-coded region when none is configured.
	logRegion := awsCfg.Region
	if logRegion == "" {
		logRegion = "eu-west-1"
	}

	// Runner names carry a short run prefix; guard the slice so a short
	// runID cannot panic.
	runSuffix := runID
	if len(runSuffix) > 8 {
		runSuffix = runSuffix[:8]
	}

	client := ecs.NewFromConfig(awsCfg)

	// Each run registers a new revision in the tinqs-ci-<label> family.
	// NOTE(review): the Gitea token is passed to the container as a
	// plain-text environment variable named as a runner registration token —
	// confirm that c.GiteaToken is the right credential and consider using
	// task-definition secrets instead.
	taskDefOut, err := client.RegisterTaskDefinition(ctx, &ecs.RegisterTaskDefinitionInput{
		Family:                  aws.String(fmt.Sprintf("tinqs-ci-%s", label)),
		RequiresCompatibilities: []ecstypes.Compatibility{ecstypes.CompatibilityFargate},
		NetworkMode:             ecstypes.NetworkModeAwsvpc,
		Cpu:                     aws.String(cpu),
		Memory:                  aws.String(memory),
		TaskRoleArn:             aws.String(c.TaskRoleArn),
		ExecutionRoleArn:        aws.String(c.ExecRoleArn),
		ContainerDefinitions: []ecstypes.ContainerDefinition{{
			Name:      aws.String("runner"),
			Image:     aws.String(image),
			Essential: aws.Bool(true),
			Environment: []ecstypes.KeyValuePair{
				{Name: aws.String("GITEA_INSTANCE_URL"), Value: aws.String(c.GiteaURL)},
				{Name: aws.String("GITEA_RUNNER_REGISTRATION_TOKEN"), Value: aws.String(c.GiteaToken)},
				{Name: aws.String("GITEA_RUNNER_NAME"), Value: aws.String(fmt.Sprintf("ephemeral-%s-%s", label, runSuffix))},
				{Name: aws.String("GITEA_RUNNER_LABELS"), Value: aws.String(fmt.Sprintf("%s:host", label))},
				{Name: aws.String("GITEA_RUNNER_EPHEMERAL"), Value: aws.String("true")},
			},
			LogConfiguration: &ecstypes.LogConfiguration{
				LogDriver: ecstypes.LogDriverAwslogs,
				Options: map[string]string{
					"awslogs-group":         "/ecs/tinqs-ci",
					"awslogs-region":        logRegion,
					"awslogs-stream-prefix": label,
				},
			},
		}},
	})
	if err != nil {
		return "", fmt.Errorf("register task def: %w", err)
	}
	if taskDefOut.TaskDefinition == nil {
		return "", fmt.Errorf("register task def: empty response")
	}

	taskDef := fmt.Sprintf("%s:%d", aws.ToString(taskDefOut.TaskDefinition.Family),
		taskDefOut.TaskDefinition.Revision)

	runOut, err := client.RunTask(ctx, &ecs.RunTaskInput{
		Cluster:        aws.String(c.ECSCluster),
		TaskDefinition: aws.String(taskDef),
		LaunchType:     ecstypes.LaunchTypeFargate,
		Count:          aws.Int32(1),
		NetworkConfiguration: &ecstypes.NetworkConfiguration{
			AwsvpcConfiguration: &ecstypes.AwsVpcConfiguration{
				Subnets:        c.Subnets,
				SecurityGroups: []string{c.SecurityGroup},
				AssignPublicIp: ecstypes.AssignPublicIpEnabled,
			},
		},
	})
	if err != nil {
		return "", fmt.Errorf("run task: %w", err)
	}
	if len(runOut.Tasks) == 0 {
		// RunTask can succeed at the API level while placing nothing; the
		// reason is then carried in Failures.
		if len(runOut.Failures) > 0 {
			return "", fmt.Errorf("no tasks started: %s", aws.ToString(runOut.Failures[0].Reason))
		}
		return "", fmt.Errorf("no tasks started")
	}

	taskArn := aws.ToString(runOut.Tasks[0].TaskArn)
	log.Printf("Started Fargate task: %s (image=%s, label=%s)", taskArn, imageName, label)
	return taskArn, nil
}
// --- Lambda executor (for deploy-only jobs) ---
// invokeLambdaExec hands a deploy-only workflow to the executor Lambda.
//
// The executor is invoked asynchronously (InvocationType "Event"), so a nil
// return means the invocation request was accepted, not that the workflow
// succeeded. The payload carries the repository, ref, commit SHA, pusher
// login, workflow name and YAML, and the Gitea URL and token.
//
// When c.ExecFnName is empty the call is a logged dry run and returns nil.
func invokeLambdaExec(ctx context.Context, c cfg, push GiteaPushEvent, workflow, content string) error {
	if c.ExecFnName == "" {
		log.Printf("[DRY RUN] Would invoke executor for %s", workflow)
		return nil
	}

	payload, err := json.Marshal(map[string]interface{}{
		"repo":          push.Repository,
		"ref":           push.Ref,
		"commit_sha":    push.After,
		"pusher":        push.Pusher.Login,
		"workflow_name": workflow,
		"workflow_yaml": content,
		"gitea_url":     c.GiteaURL,
		"gitea_token":   c.GiteaToken,
	})
	if err != nil {
		return fmt.Errorf("encode executor payload: %w", err)
	}

	awsCfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		return fmt.Errorf("aws config: %w", err)
	}

	client := awslambda.NewFromConfig(awsCfg)
	if _, err := client.Invoke(ctx, &awslambda.InvokeInput{
		FunctionName:   aws.String(c.ExecFnName),
		InvocationType: "Event",
		Payload:        payload,
	}); err != nil {
		return fmt.Errorf("invoke %s: %w", c.ExecFnName, err)
	}
	return nil
}
// --- Cancel handler ---
// handleCancel processes a Gitea workflow_job webhook.
//
// Only events whose "action" is "cancelled" have an effect. For those, every
// run recorded for the event's repository with status "running" is looked up
// in DynamoDB, its Fargate task is stopped, and the record is marked
// "cancelled". The event is not matched to one specific run: all running
// tasks of the repository are stopped.
//
// Responses: 400 for an undecodable body, 500 when AWS configuration or the
// DynamoDB query fails, otherwise 200 with the number of tasks stopped.
func handleCancel(ctx context.Context, c cfg, body string) (events.APIGatewayProxyResponse, error) {
	var event struct {
		Action     string    `json:"action"`
		Repository GiteaRepo `json:"repository"`
	}
	if err := json.Unmarshal([]byte(body), &event); err != nil {
		return respond(400, `{"error":"invalid cancel event"}`)
	}

	if event.Action != "cancelled" {
		return respond(200, `{"status":"ignored","reason":"not a cancel event"}`)
	}

	log.Printf("Cancel request for %s", event.Repository.FullName)

	awsCfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Printf("AWS config failed: %v", err)
		return respond(500, `{"error":"aws config failed"}`)
	}
	ddb := dynamodb.NewFromConfig(awsCfg)

	// The status filter is applied after each page is read, so a page can be
	// empty while more matches follow: keep going until LastEvaluatedKey is
	// exhausted.
	query := &dynamodb.QueryInput{
		TableName:                aws.String(c.DDBTable),
		KeyConditionExpression:   aws.String("repo = :repo"),
		FilterExpression:         aws.String("#s = :running"),
		ExpressionAttributeNames: map[string]string{"#s": "status"},
		ExpressionAttributeValues: map[string]types.AttributeValue{
			":repo":    &types.AttributeValueMemberS{Value: event.Repository.FullName},
			":running": &types.AttributeValueMemberS{Value: "running"},
		},
	}
	var items []map[string]types.AttributeValue
	for {
		out, err := ddb.Query(ctx, query)
		if err != nil {
			log.Printf("DynamoDB query failed: %v", err)
			return respond(500, `{"error":"db query failed"}`)
		}
		items = append(items, out.Items...)
		if len(out.LastEvaluatedKey) == 0 {
			break
		}
		query.ExclusiveStartKey = out.LastEvaluatedKey
	}

	ecsClient := ecs.NewFromConfig(awsCfg)
	stopped := 0
	for _, item := range items {
		var rec RunRecord
		if err := attributevalue.UnmarshalMap(item, &rec); err != nil {
			log.Printf("Skipping undecodable run record: %v", err)
			continue
		}
		if rec.TaskArn == "" {
			continue
		}

		if _, err := ecsClient.StopTask(ctx, &ecs.StopTaskInput{
			Cluster: aws.String(c.ECSCluster),
			Task:    aws.String(rec.TaskArn),
			Reason:  aws.String("Cancelled via Gitea webhook"),
		}); err != nil {
			// Leave the record as "running" so a later cancel or the cleanup
			// cron can retry.
			log.Printf("Failed to stop task %s: %v", rec.TaskArn, err)
			continue
		}

		updateRunStatus(ctx, ddb, c.DDBTable, rec.Repo, rec.RunID, "cancelled")
		stopped++
		log.Printf("Stopped task %s for %s", rec.TaskArn, rec.Repo)
	}

	return respond(200, fmt.Sprintf(`{"status":"cancelled","stopped":%d}`, stopped))
}
// --- Cleanup cron ---
// handleCleanup is the cron path: it finds runs that have been "running" for
// more than 30 minutes, stops their Fargate tasks and marks the records
// "timeout".
//
// A record is marked "timeout" and counted even when StopTask fails or the
// record has no task ARN; the stop failure is logged. Responses: 500 when
// AWS configuration or the table scan fails, otherwise 200 with the number
// of stale runs processed.
func handleCleanup(ctx context.Context, c cfg) (events.APIGatewayProxyResponse, error) {
	log.Println("Running cleanup...")

	awsCfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Printf("AWS config failed: %v", err)
		return respond(500, `{"error":"aws config failed"}`)
	}
	ddb := dynamodb.NewFromConfig(awsCfg)
	ecsClient := ecs.NewFromConfig(awsCfg)

	cutoff := time.Now().Add(-30 * time.Minute).Unix()

	// Scan filters are applied per page, so follow LastEvaluatedKey until the
	// whole table has been read.
	scan := &dynamodb.ScanInput{
		TableName:                aws.String(c.DDBTable),
		FilterExpression:         aws.String("#s = :running AND started_at < :cutoff"),
		ExpressionAttributeNames: map[string]string{"#s": "status"},
		ExpressionAttributeValues: map[string]types.AttributeValue{
			":running": &types.AttributeValueMemberS{Value: "running"},
			":cutoff":  &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", cutoff)},
		},
	}
	var items []map[string]types.AttributeValue
	for {
		out, err := ddb.Scan(ctx, scan)
		if err != nil {
			log.Printf("DynamoDB scan failed: %v", err)
			// Marshal the message so quotes inside it cannot break the JSON.
			body, mErr := json.Marshal(map[string]string{"error": err.Error()})
			if mErr != nil {
				body = []byte(`{"error":"db scan failed"}`)
			}
			return respond(500, string(body))
		}
		items = append(items, out.Items...)
		if len(out.LastEvaluatedKey) == 0 {
			break
		}
		scan.ExclusiveStartKey = out.LastEvaluatedKey
	}

	killed := 0
	for _, item := range items {
		var rec RunRecord
		if err := attributevalue.UnmarshalMap(item, &rec); err != nil {
			log.Printf("Skipping undecodable run record: %v", err)
			continue
		}

		if rec.TaskArn != "" {
			if _, err := ecsClient.StopTask(ctx, &ecs.StopTaskInput{
				Cluster: aws.String(c.ECSCluster),
				Task:    aws.String(rec.TaskArn),
				Reason:  aws.String("Timed out (cleanup cron)"),
			}); err != nil {
				log.Printf("Failed to stop stale task %s: %v", rec.TaskArn, err)
			}
		}

		updateRunStatus(ctx, ddb, c.DDBTable, rec.Repo, rec.RunID, "timeout")
		killed++
		log.Printf("Killed stale run: %s/%s (started %d)", rec.Repo, rec.RunID, rec.StartedAt)
	}

	return respond(200, fmt.Sprintf(`{"status":"cleanup","killed":%d}`, killed))
}
// --- DynamoDB helpers ---
// trackRun writes a RunRecord with status "running" for a newly started
// Fargate task. StartedAt is the current Unix time and TTL is seven days
// later. It returns an error when AWS configuration cannot be loaded, the
// record cannot be marshalled, or the PutItem call fails.
func trackRun(ctx context.Context, c cfg, repo, runID, taskArn, workflow, label string) error {
	awsCfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		return fmt.Errorf("aws config: %w", err)
	}
	ddb := dynamodb.NewFromConfig(awsCfg)

	now := time.Now()
	item, err := attributevalue.MarshalMap(RunRecord{
		Repo:      repo,
		RunID:     runID,
		TaskArn:   taskArn,
		Status:    "running",
		Workflow:  workflow,
		Label:     label,
		StartedAt: now.Unix(),
		TTL:       now.Add(7 * 24 * time.Hour).Unix(),
	})
	if err != nil {
		return fmt.Errorf("marshal run record: %w", err)
	}

	if _, err := ddb.PutItem(ctx, &dynamodb.PutItemInput{
		TableName: aws.String(c.DDBTable),
		Item:      item,
	}); err != nil {
		return fmt.Errorf("put run record: %w", err)
	}
	return nil
}
// updateRunStatus sets the "status" attribute of the run identified by
// (repo, runID) in table. The update is best-effort: the function has no
// return value, so a failure is logged instead of being silently dropped.
func updateRunStatus(ctx context.Context, ddb *dynamodb.Client, table, repo, runID, status string) {
	_, err := ddb.UpdateItem(ctx, &dynamodb.UpdateItemInput{
		TableName: aws.String(table),
		Key: map[string]types.AttributeValue{
			"repo":   &types.AttributeValueMemberS{Value: repo},
			"run_id": &types.AttributeValueMemberS{Value: runID},
		},
		UpdateExpression: aws.String("SET #s = :status"),
		// "status" is aliased through #s, as in the other expressions of
		// this file.
		ExpressionAttributeNames: map[string]string{"#s": "status"},
		ExpressionAttributeValues: map[string]types.AttributeValue{
			":status": &types.AttributeValueMemberS{Value: status},
		},
	})
	if err != nil {
		log.Printf("Warning: failed to set status %q for %s/%s: %v", status, repo, runID, err)
	}
}
// --- Gitea API ---
// fetchWorkflows lists .gitea/workflows on the given branch through the
// Gitea contents API and downloads every .yml/.yaml file in it.
//
// The result maps file name to file content. A 404 from the listing means
// the repository has no workflow directory and yields (nil, nil). Any other
// non-200 status, a transport error or an undecodable listing is returned as
// an error. A file that fails to download is logged and left out of the map
// rather than failing the whole call.
func fetchWorkflows(baseURL, token, repoFullName, branch string) (map[string]string, error) {
	endpoint := fmt.Sprintf("%s/api/v1/repos/%s/contents/.gitea/workflows", baseURL, repoFullName)
	req, err := http.NewRequest(http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, fmt.Errorf("build workflow listing request: %w", err)
	}
	// Set ref through the query builder so branch names containing
	// characters such as '+', '&' or '#' are escaped correctly.
	query := req.URL.Query()
	query.Set("ref", branch)
	req.URL.RawQuery = query.Encode()
	req.Header.Set("Authorization", "token "+token)

	resp, err := (&http.Client{Timeout: 10 * time.Second}).Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if resp.StatusCode == http.StatusNotFound {
		return nil, nil
	}
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("list workflows: unexpected status %s", resp.Status)
	}

	var listing []struct {
		Name string `json:"name"`
		Path string `json:"path"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&listing); err != nil {
		return nil, fmt.Errorf("decode workflow listing: %w", err)
	}

	workflows := make(map[string]string, len(listing))
	for _, entry := range listing {
		if !strings.HasSuffix(entry.Name, ".yml") && !strings.HasSuffix(entry.Name, ".yaml") {
			continue
		}
		content, err := fetchRawFile(baseURL, token, repoFullName, entry.Path, branch)
		if err != nil {
			log.Printf("Failed to fetch %s: %v", entry.Path, err)
			continue
		}
		workflows[entry.Name] = content
	}
	return workflows, nil
}
// fetchRawFile downloads one file from a repository at the given branch via
// the Gitea raw-file API and returns its content.
//
// The whole body is read (a single Read call may return only part of it),
// up to a limit of 1 MiB; a larger file is reported as an error instead of
// being silently truncated. A non-200 status, a transport error or a read
// error is returned as an error, so an error page is never mistaken for file
// content.
func fetchRawFile(baseURL, token, repoFullName, path, branch string) (string, error) {
	const maxFileBytes = 1 << 20

	endpoint := fmt.Sprintf("%s/api/v1/repos/%s/raw/%s", baseURL, repoFullName, path)
	req, err := http.NewRequest(http.MethodGet, endpoint, nil)
	if err != nil {
		return "", fmt.Errorf("build request for %s: %w", path, err)
	}
	// Set ref through the query builder so branch names containing
	// characters such as '+', '&' or '#' are escaped correctly.
	query := req.URL.Query()
	query.Set("ref", branch)
	req.URL.RawQuery = query.Encode()
	req.Header.Set("Authorization", "token "+token)

	resp, err := (&http.Client{Timeout: 10 * time.Second}).Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("fetch %s: unexpected status %s", path, resp.Status)
	}

	// Read one byte past the limit so an oversized file can be detected.
	data, err := io.ReadAll(io.LimitReader(resp.Body, maxFileBytes+1))
	if err != nil {
		return "", fmt.Errorf("read %s: %w", path, err)
	}
	if len(data) > maxFileBytes {
		return "", fmt.Errorf("fetch %s: file exceeds %d bytes", path, maxFileBytes)
	}
	return string(data), nil
}
// --- Trigger evaluation ---
// shouldTrigger reports whether a push to branch that touched changedFiles
// should run the workflow, based on the workflow's "on:" section.
//
// Accepted shapes of "on:":
//   - scalar (on: push): matches only when the event is "push";
//   - list (on: [push, pull_request]): matches only when it contains "push";
//   - mapping: requires a "push" key. A push entry without settings matches
//     every push; otherwise branches, paths and paths-ignore are evaluated.
//   - missing or any other shape: treated as a match (behaviour kept from the
//     previous implementation).
//
// Filters under push:
//   - branches: the branch must equal one of the entries exactly (glob
//     patterns are not expanded here).
//   - paths: at least one changed file must match one pattern.
//   - paths-ignore: the workflow is skipped when every changed file matches
//     an ignore pattern.
//
// Both path filters are skipped when changedFiles is empty. Each filter may
// be written as a YAML list or as a single scalar.
func shouldTrigger(wf Workflow, branch string, changedFiles []string) bool {
	var pushSection interface{}
	switch on := wf.On.(type) {
	case string:
		return on == "push"
	case []interface{}:
		for _, event := range on {
			if fmt.Sprint(event) == "push" {
				return true
			}
		}
		return false
	case map[string]interface{}:
		section, ok := on["push"]
		if !ok {
			return false // no push trigger
		}
		pushSection = section
	default:
		return true
	}

	pushMap, ok := pushSection.(map[string]interface{})
	if !ok {
		return true // "push:" without settings matches every push
	}

	if branches, ok := triggerList(pushMap["branches"]); ok {
		matched := false
		for _, candidate := range branches {
			if candidate == branch {
				matched = true
				break
			}
		}
		if !matched {
			return false
		}
	}

	if len(changedFiles) == 0 {
		return true
	}

	if patterns, ok := triggerList(pushMap["paths"]); ok && !anyFileMatches(patterns, changedFiles) {
		return false
	}

	if ignored, ok := triggerList(pushMap["paths-ignore"]); ok && allFilesMatch(ignored, changedFiles) {
		return false
	}

	return true
}

// triggerList normalises a filter value that may be a YAML list or a single
// scalar string; ok is false when the value is absent or of another type.
func triggerList(v interface{}) (list []string, ok bool) {
	switch value := v.(type) {
	case []interface{}:
		list = make([]string, 0, len(value))
		for _, item := range value {
			list = append(list, fmt.Sprint(item))
		}
		return list, true
	case string:
		return []string{value}, true
	}
	return nil, false
}

// anyFileMatches reports whether at least one file matches at least one pattern.
func anyFileMatches(patterns, files []string) bool {
	for _, pattern := range patterns {
		for _, file := range files {
			if matchPath(pattern, file) {
				return true
			}
		}
	}
	return false
}

// allFilesMatch reports whether every file matches at least one pattern.
func allFilesMatch(patterns, files []string) bool {
	for _, file := range files {
		covered := false
		for _, pattern := range patterns {
			if matchPath(pattern, file) {
				covered = true
				break
			}
		}
		if !covered {
			return false
		}
	}
	return true
}
// matchPath reports whether file (a slash-separated repository path) matches
// a workflow path pattern.
//
// Supported forms, checked in this order:
//   - "dir/**" with a literal dir: the directory itself or anything below it.
//   - "*.ext" with no other wildcard: any file, at any depth, whose name
//     ends in ".ext" (kept for compatibility with existing workflows).
//   - any other pattern containing '*' or '?': full glob match, where '*'
//     and '?' never cross a '/', "**" matches any run of characters and
//     "**/" matches zero or more whole directories.
//   - a literal: the exact path, or — treating the literal as a directory —
//     anything below it. A literal ending in '/' is a directory prefix.
//
// Literals are matched on path-segment boundaries, so "src" matches
// "src/main.go" but not "src2/main.go".
func matchPath(pattern, file string) bool {
	switch {
	case strings.HasSuffix(pattern, "/**") && !strings.ContainsAny(strings.TrimSuffix(pattern, "/**"), "*?"):
		dir := strings.TrimSuffix(pattern, "/**")
		return file == dir || strings.HasPrefix(file, dir+"/")
	case strings.HasPrefix(pattern, "*.") && !strings.ContainsAny(pattern[1:], "*?/"):
		return strings.HasSuffix(file, pattern[1:])
	case strings.ContainsAny(pattern, "*?"):
		return globMatch(pattern, file)
	case strings.HasSuffix(pattern, "/"):
		return strings.HasPrefix(file, pattern)
	}
	return file == pattern || strings.HasPrefix(file, pattern+"/")
}

// globMatch matches name against pattern, which must be consumed entirely.
// '*' and '?' do not match '/'; "**" matches anything; "**/" matches zero or
// more leading directories.
func globMatch(pattern, name string) bool {
	for len(pattern) > 0 {
		switch {
		case strings.HasPrefix(pattern, "**/"):
			rest := pattern[3:]
			if globMatch(rest, name) { // zero directories
				return true
			}
			for i := 0; i < len(name); i++ {
				if name[i] == '/' && globMatch(rest, name[i+1:]) {
					return true
				}
			}
			return false
		case strings.HasPrefix(pattern, "**"):
			rest := pattern[2:]
			for i := 0; i <= len(name); i++ {
				if globMatch(rest, name[i:]) {
					return true
				}
			}
			return false
		case pattern[0] == '*':
			rest := pattern[1:]
			for i := 0; i <= len(name); i++ {
				if globMatch(rest, name[i:]) {
					return true
				}
				// '*' may not consume a path separator.
				if i < len(name) && name[i] == '/' {
					break
				}
			}
			return false
		case pattern[0] == '?':
			if len(name) == 0 || name[0] == '/' {
				return false
			}
		default:
			if len(name) == 0 || name[0] != pattern[0] {
				return false
			}
		}
		pattern, name = pattern[1:], name[1:]
	}
	return len(name) == 0
}
// collectChangedFiles flattens the added, modified and removed paths of all
// commits into one list without duplicates. Paths keep first-seen order:
// commits in slice order and, within a commit, added, then modified, then
// removed. The result is nil when no commit lists any path.
func collectChangedFiles(commits []GiteaCommit) []string {
	var (
		ordered []string
		known   = make(map[string]bool)
	)
	for i := range commits {
		groups := [][]string{commits[i].Added, commits[i].Modified, commits[i].Removed}
		for _, group := range groups {
			for _, path := range group {
				if known[path] {
					continue
				}
				known[path] = true
				ordered = append(ordered, path)
			}
		}
	}
	return ordered
}
// respond wraps a status code and a JSON body into an API Gateway proxy
// response with a JSON Content-Type header. The error result is always nil,
// so callers can return respond(...) directly from the Lambda handler.
func respond(status int, body string) (events.APIGatewayProxyResponse, error) {
	var resp events.APIGatewayProxyResponse
	resp.StatusCode = status
	resp.Headers = map[string]string{"Content-Type": "application/json"}
	resp.Body = body
	return resp, nil
}
// main passes handler to lambda.Start, which serves Lambda invocations.
func main() {
	lambda.Start(handler)
}