Files
ozan 4076cf67b7 fix(ci): make Spot runner names unique per dispatch
runnerName used runID[:12], which for a given commit always collapsed to
e.g. `spot-deploy-7fa70fc-depl` — so a single push triggering both
deploy-arikigame and release (both runs-on: deploy), or any same-commit
rerun, registered colliding runner names and confused task routing.

Use the full runID (sha+workflow+ms), sanitised for the runner-name charset.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-07 20:08:27 +01:00

756 lines
22 KiB
Go

// tinqs/ci orchestrator — dispatcher Lambda
//
// Receives Gitea system webhooks, determines which workflows to run,
// and routes each job to the right execution environment based on runs-on label:
//
// go, node, docker, godot → EC2 Spot instance (pre-baked AMI, self-terminates)
// deploy → Lambda direct execution (ci-exec)
// host → skip (handled by registered always-on runner)
//
// Also handles cancel events (terminate instances) and cleanup cron.
package main
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"log"
"net/http"
"os"
"strings"
"time"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/aws/aws-sdk-go-v2/service/ec2"
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
"gopkg.in/yaml.v3"
)
// --- Gitea webhook types ---
type GiteaPushEvent struct {
Ref string `json:"ref"`
Before string `json:"before"`
After string `json:"after"`
Repository GiteaRepo `json:"repository"`
Pusher GiteaUser `json:"pusher"`
Commits []GiteaCommit `json:"commits"`
}
type GiteaRepo struct {
ID int `json:"id"`
Name string `json:"name"`
FullName string `json:"full_name"`
CloneURL string `json:"clone_url"`
HTMLURL string `json:"html_url"`
}
type GiteaUser struct {
Login string `json:"login"`
Email string `json:"email"`
}
type GiteaCommit struct {
ID string `json:"id"`
Message string `json:"message"`
Added []string `json:"added"`
Removed []string `json:"removed"`
Modified []string `json:"modified"`
}
// --- Workflow YAML (minimal parse) ---
type Workflow struct {
Name string `yaml:"name"`
On interface{} `yaml:"on"`
Jobs map[string]WorkflowJob `yaml:"jobs"`
}
type WorkflowJob struct {
RunsOn string `yaml:"runs-on"`
}
// --- DynamoDB run record ---
type RunRecord struct {
Repo string `dynamodbav:"repo"`
RunID string `dynamodbav:"run_id"`
InstanceID string `dynamodbav:"instance_id"`
Status string `dynamodbav:"status"`
Workflow string `dynamodbav:"workflow"`
Label string `dynamodbav:"label"`
StartedAt int64 `dynamodbav:"started_at"`
TTL int64 `dynamodbav:"ttl"`
}
// --- Spot instance config per label ---
type spotConfig struct {
InstanceType string
MaxPrice string // spot bid (empty = on-demand price cap)
}
var labelToSpot = map[string]spotConfig{
"go": {InstanceType: "t3.small", MaxPrice: "0.02"},
"node": {InstanceType: "t3.medium", MaxPrice: "0.04"},
"docker": {InstanceType: "t3.medium", MaxPrice: "0.04"},
"deploy": {InstanceType: "t3.micro", MaxPrice: "0.01"},
"godot": {InstanceType: "t3.medium", MaxPrice: "0.04"},
}
// --- Config from env ---
type cfg struct {
GiteaURL string
GiteaToken string // API token (for fetching workflows, setting commit status)
RunnerToken string // Runner registration token (for act_runner register)
AMI string // pre-baked AMI with Go, Node, Docker, AWS CLI, act_runner
Subnet string
SecurityGroup string
DDBTable string
InstanceProfile string
}
func loadCfg() cfg {
return cfg{
GiteaURL: os.Getenv("GITEA_URL"),
GiteaToken: os.Getenv("GITEA_TOKEN"),
RunnerToken: os.Getenv("RUNNER_TOKEN"),
AMI: os.Getenv("RUNNER_AMI"),
Subnet: os.Getenv("SUBNET"),
SecurityGroup: os.Getenv("SECURITY_GROUP"),
DDBTable: os.Getenv("DDB_TABLE"),
InstanceProfile: os.Getenv("INSTANCE_PROFILE"),
}
}
// --- User-data script template ---
// The instance boots, registers as ephemeral runner, picks one job, then terminates itself.
func userDataScript(c cfg, label, runnerName string) string {
script := fmt.Sprintf(`#!/bin/bash
set -euo pipefail
exec > /var/log/tinqs-ci.log 2>&1
echo "=== Tinqs CI Runner: %s ==="
# Get instance metadata (IMDSv2)
TOKEN=$(curl -s -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 60")
INSTANCE_ID=$(curl -s -H "X-aws-ec2-metadata-token: $TOKEN" http://169.254.169.254/latest/meta-data/instance-id)
REGION=$(curl -s -H "X-aws-ec2-metadata-token: $TOKEN" http://169.254.169.254/latest/meta-data/placement/region)
# Self-termination: shutdown triggers terminate (InstanceInitiatedShutdownBehavior=terminate)
# No IAM needed — kernel handles it
trap 'echo "=== Job done, shutting down ===" && shutdown -h now' EXIT
export PATH=$PATH:/usr/local/go/bin:/usr/local/bin
export HOME=/root
# Git auth for private repo clones (checkout action uses git binary which reads this)
# Note: act_runner's internal action resolution uses go-git, which does NOT read
# gitconfig — so tinqs/ci must stay public for act_runner to clone actions.
GITEA_TOKEN="%s"
git config --global url."https://token:${GITEA_TOKEN}@tinqs.com/".insteadOf "https://tinqs.com/"
# Create proper working directory for act_runner
mkdir -p /opt/runner && cd /opt/runner
# Register as ephemeral runner (picks one job, then exits)
act_runner register --no-interactive \
--instance %s \
--token %s \
--name %s \
--labels "%s:host" \
--ephemeral
# Configure runner
cat > .runner.yaml << 'RUNCFG'
log:
level: info
runner:
capacity: 1
timeout: 30m
envs:
HOME: /root
AWS_DEFAULT_REGION: eu-west-1
PATH: /usr/local/go/bin:/usr/local/bin:/usr/bin:/bin:/usr/sbin:/sbin
host:
workdir_parent: /opt/runner/work
RUNCFG
mkdir -p /opt/runner/work
# Run — blocks until the job completes, then exits (ephemeral = one job only)
act_runner daemon --config .runner.yaml
# Safety: if daemon exits without triggering trap (shouldn't happen), force shutdown
shutdown -h now
echo "=== Runner exited, cleanup will terminate instance ==="
`, runnerName, c.GiteaToken, c.GiteaURL, c.RunnerToken, runnerName, label)
return base64.StdEncoding.EncodeToString([]byte(script))
}
func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
c := loadCfg()
// Handle cleanup cron
if request.Body == "" || strings.Contains(request.Body, `"action":"cleanup"`) {
return handleCleanup(ctx, c)
}
// Handle cancel webhook
eventType := request.Headers["x-gitea-event"]
if eventType == "" {
eventType = request.Headers["X-Gitea-Event"]
}
if eventType == "workflow_job" {
return handleCancel(ctx, c, request.Body)
}
// Handle push event
var push GiteaPushEvent
if err := json.Unmarshal([]byte(request.Body), &push); err != nil {
log.Printf("Failed to parse webhook: %v", err)
return respond(400, `{"error":"invalid payload"}`)
}
if !strings.HasPrefix(push.Ref, "refs/heads/") {
return respond(200, `{"status":"skipped","reason":"not a branch push"}`)
}
branch := strings.TrimPrefix(push.Ref, "refs/heads/")
log.Printf("Push to %s branch=%s by=%s commits=%d",
push.Repository.FullName, branch, push.Pusher.Login, len(push.Commits))
changedFiles := collectChangedFiles(push.Commits)
workflows, err := fetchWorkflows(c.GiteaURL, c.GiteaToken, push.Repository.FullName, branch)
if err != nil || len(workflows) == 0 {
return respond(200, `{"status":"no_workflows"}`)
}
dispatched := 0
for name, content := range workflows {
var wf Workflow
if err := yaml.Unmarshal([]byte(content), &wf); err != nil {
log.Printf("Failed to parse %s: %v", name, err)
continue
}
if !shouldTrigger(wf, branch, changedFiles) {
log.Printf("Skipping %s (no matching trigger)", name)
continue
}
label := "host"
for _, job := range wf.Jobs {
if job.RunsOn != "" {
label = job.RunsOn
break
}
}
runID := fmt.Sprintf("%s-%s-%d", push.After[:7], name, time.Now().UnixMilli())
switch label {
case "host":
log.Printf("Skipping %s (runs-on: host — registered runner)", name)
continue
default:
// All non-host jobs (go/node/docker/deploy/godot) go through Spot.
// The old `deploy` → Lambda-executor path was retired when
// tinqs-ci-exec was deleted (26 May 2026); deploy now launches
// a t3.micro Spot runner labelled `deploy` like any other job.
instanceID, err := startSpotRunner(ctx, c, label, runID)
if err != nil {
log.Printf("Failed to start spot for %s: %v", name, err)
continue
}
if err := trackRun(ctx, c, push.Repository.FullName, runID, instanceID, name, label); err != nil {
log.Printf("Warning: failed to track run: %v", err)
}
}
dispatched++
}
return respond(200, fmt.Sprintf(
`{"status":"dispatched","workflows":%d,"dispatched":%d,"repo":"%s","branch":"%s"}`,
len(workflows), dispatched, push.Repository.FullName, branch))
}
// --- EC2 Spot runner ---
func startSpotRunner(ctx context.Context, c cfg, label, runID string) (string, error) {
awsCfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
return "", fmt.Errorf("aws config: %w", err)
}
spot, ok := labelToSpot[label]
if !ok {
return "", fmt.Errorf("unknown label: %s", label)
}
// runID = "<sha7>-<workflow>-<ms>" already encodes everything unique.
// The old runID[:12] truncation collapsed every deploy of a commit to the
// same name (e.g. deploy-arikigame + release on one push), confusing task
// routing and same-commit reruns. Use the full runID, sanitised for the
// runner-name charset (no dots/slashes from workflow filenames).
safeID := strings.NewReplacer(".", "-", "/", "-").Replace(runID)
runnerName := fmt.Sprintf("spot-%s-%s", label, safeID)
userData := userDataScript(c, label, runnerName)
client := ec2.NewFromConfig(awsCfg)
// Docker builds need more disk (monorepo + layers)
diskSize := int32(20)
if label == "docker" {
diskSize = 40
}
out, err := client.RunInstances(ctx, &ec2.RunInstancesInput{
ImageId: aws.String(c.AMI),
InstanceType: ec2types.InstanceType(spot.InstanceType),
MinCount: aws.Int32(1),
MaxCount: aws.Int32(1),
UserData: aws.String(userData),
// Root volume — bigger for Docker builds
BlockDeviceMappings: []ec2types.BlockDeviceMapping{{
DeviceName: aws.String("/dev/xvda"),
Ebs: &ec2types.EbsBlockDevice{
VolumeSize: aws.Int32(diskSize),
VolumeType: ec2types.VolumeTypeGp3,
DeleteOnTermination: aws.Bool(true),
},
}},
// Spot request
InstanceMarketOptions: &ec2types.InstanceMarketOptionsRequest{
MarketType: ec2types.MarketTypeSpot,
SpotOptions: &ec2types.SpotMarketOptions{
MaxPrice: aws.String(spot.MaxPrice),
SpotInstanceType: ec2types.SpotInstanceTypeOneTime,
InstanceInterruptionBehavior: ec2types.InstanceInterruptionBehaviorTerminate,
},
},
// Network
NetworkInterfaces: []ec2types.InstanceNetworkInterfaceSpecification{{
DeviceIndex: aws.Int32(0),
SubnetId: aws.String(c.Subnet),
Groups: []string{c.SecurityGroup},
AssociatePublicIpAddress: aws.Bool(true),
}},
// IAM role (for AWS CLI, ECR, S3, ECS access)
IamInstanceProfile: &ec2types.IamInstanceProfileSpecification{
Arn: aws.String(c.InstanceProfile),
},
// Tags for identification and cleanup
TagSpecifications: []ec2types.TagSpecification{{
ResourceType: ec2types.ResourceTypeInstance,
Tags: []ec2types.Tag{
{Key: aws.String("Name"), Value: aws.String(runnerName)},
{Key: aws.String("tinqs-ci"), Value: aws.String("true")},
{Key: aws.String("tinqs-ci-run"), Value: aws.String(runID)},
{Key: aws.String("tinqs-ci-label"), Value: aws.String(label)},
},
}},
// Auto-terminate safety net (instance shuts down → terminates)
InstanceInitiatedShutdownBehavior: ec2types.ShutdownBehaviorTerminate,
})
if err != nil {
return "", fmt.Errorf("run instances: %w", err)
}
if len(out.Instances) == 0 {
return "", fmt.Errorf("no instances launched")
}
instanceID := *out.Instances[0].InstanceId
log.Printf("Started spot instance: %s (type=%s, label=%s, runner=%s)",
instanceID, spot.InstanceType, label, runnerName)
return instanceID, nil
}
// --- Cancel handler ---
func handleCancel(ctx context.Context, c cfg, body string) (events.APIGatewayProxyResponse, error) {
var event struct {
Action string `json:"action"`
Repository GiteaRepo `json:"repository"`
}
if err := json.Unmarshal([]byte(body), &event); err != nil {
return respond(400, `{"error":"invalid cancel event"}`)
}
if event.Action != "cancelled" {
return respond(200, `{"status":"ignored","reason":"not a cancel event"}`)
}
log.Printf("Cancel request for %s", event.Repository.FullName)
awsCfg, _ := config.LoadDefaultConfig(ctx)
ddb := dynamodb.NewFromConfig(awsCfg)
out, err := ddb.Query(ctx, &dynamodb.QueryInput{
TableName: aws.String(c.DDBTable),
KeyConditionExpression: aws.String("repo = :repo"),
FilterExpression: aws.String("#s = :running"),
ExpressionAttributeNames: map[string]string{"#s": "status"},
ExpressionAttributeValues: map[string]types.AttributeValue{
":repo": &types.AttributeValueMemberS{Value: event.Repository.FullName},
":running": &types.AttributeValueMemberS{Value: "running"},
},
})
if err != nil {
return respond(500, `{"error":"db query failed"}`)
}
ec2Client := ec2.NewFromConfig(awsCfg)
stopped := 0
// Collect instance IDs to terminate
var instanceIDs []string
var records []RunRecord
for _, item := range out.Items {
var rec RunRecord
if err := attributevalue.UnmarshalMap(item, &rec); err != nil || rec.InstanceID == "" {
continue
}
instanceIDs = append(instanceIDs, rec.InstanceID)
records = append(records, rec)
}
if len(instanceIDs) > 0 {
_, err := ec2Client.TerminateInstances(ctx, &ec2.TerminateInstancesInput{
InstanceIds: instanceIDs,
})
if err != nil {
log.Printf("Failed to terminate instances: %v", err)
} else {
stopped = len(instanceIDs)
for _, rec := range records {
updateRunStatus(ctx, ddb, c.DDBTable, rec.Repo, rec.RunID, "cancelled")
}
log.Printf("Terminated %d instances for %s", stopped, event.Repository.FullName)
}
}
return respond(200, fmt.Sprintf(`{"status":"cancelled","stopped":%d}`, stopped))
}
// --- Cleanup cron ---
func handleCleanup(ctx context.Context, c cfg) (events.APIGatewayProxyResponse, error) {
log.Println("Running cleanup...")
awsCfg, _ := config.LoadDefaultConfig(ctx)
ddb := dynamodb.NewFromConfig(awsCfg)
ec2Client := ec2.NewFromConfig(awsCfg)
cutoff := time.Now().Add(-30 * time.Minute).Unix()
out, err := ddb.Scan(ctx, &dynamodb.ScanInput{
TableName: aws.String(c.DDBTable),
FilterExpression: aws.String("#s = :running AND started_at < :cutoff"),
ExpressionAttributeNames: map[string]string{"#s": "status"},
ExpressionAttributeValues: map[string]types.AttributeValue{
":running": &types.AttributeValueMemberS{Value: "running"},
":cutoff": &types.AttributeValueMemberN{Value: fmt.Sprintf("%d", cutoff)},
},
})
if err != nil {
return respond(500, fmt.Sprintf(`{"error":"%s"}`, err))
}
// Batch terminate stale instances
var staleIDs []string
var staleRecs []RunRecord
for _, item := range out.Items {
var rec RunRecord
if err := attributevalue.UnmarshalMap(item, &rec); err != nil || rec.InstanceID == "" {
continue
}
staleIDs = append(staleIDs, rec.InstanceID)
staleRecs = append(staleRecs, rec)
}
killed := 0
if len(staleIDs) > 0 {
ec2Client.TerminateInstances(ctx, &ec2.TerminateInstancesInput{
InstanceIds: staleIDs,
})
for _, rec := range staleRecs {
updateRunStatus(ctx, ddb, c.DDBTable, rec.Repo, rec.RunID, "timeout")
}
killed = len(staleIDs)
log.Printf("Killed %d stale instances", killed)
}
// Also sweep any tinqs-ci tagged instances that somehow escaped DynamoDB
descOut, _ := ec2Client.DescribeInstances(ctx, &ec2.DescribeInstancesInput{
Filters: []ec2types.Filter{
{Name: aws.String("tag:tinqs-ci"), Values: []string{"true"}},
{Name: aws.String("instance-state-name"), Values: []string{"running", "pending"}},
},
})
var orphanIDs []string
for _, res := range descOut.Reservations {
for _, inst := range res.Instances {
if inst.LaunchTime != nil && time.Since(*inst.LaunchTime) > 30*time.Minute {
orphanIDs = append(orphanIDs, *inst.InstanceId)
}
}
}
if len(orphanIDs) > 0 {
ec2Client.TerminateInstances(ctx, &ec2.TerminateInstancesInput{
InstanceIds: orphanIDs,
})
log.Printf("Killed %d orphan instances (tag sweep)", len(orphanIDs))
killed += len(orphanIDs)
}
return respond(200, fmt.Sprintf(`{"status":"cleanup","killed":%d}`, killed))
}
// --- DynamoDB helpers ---
func trackRun(ctx context.Context, c cfg, repo, runID, instanceID, workflow, label string) error {
awsCfg, _ := config.LoadDefaultConfig(ctx)
ddb := dynamodb.NewFromConfig(awsCfg)
now := time.Now()
rec := RunRecord{
Repo: repo,
RunID: runID,
InstanceID: instanceID,
Status: "running",
Workflow: workflow,
Label: label,
StartedAt: now.Unix(),
TTL: now.Add(7 * 24 * time.Hour).Unix(),
}
item, err := attributevalue.MarshalMap(rec)
if err != nil {
return err
}
_, err = ddb.PutItem(ctx, &dynamodb.PutItemInput{
TableName: aws.String(c.DDBTable),
Item: item,
})
return err
}
func updateRunStatus(ctx context.Context, ddb *dynamodb.Client, table, repo, runID, status string) {
ddb.UpdateItem(ctx, &dynamodb.UpdateItemInput{
TableName: aws.String(table),
Key: map[string]types.AttributeValue{
"repo": &types.AttributeValueMemberS{Value: repo},
"run_id": &types.AttributeValueMemberS{Value: runID},
},
UpdateExpression: aws.String("SET #s = :status"),
ExpressionAttributeNames: map[string]string{"#s": "status"},
ExpressionAttributeValues: map[string]types.AttributeValue{
":status": &types.AttributeValueMemberS{Value: status},
},
})
}
// --- Gitea API ---
func fetchWorkflows(baseURL, token, repoFullName, branch string) (map[string]string, error) {
url := fmt.Sprintf("%s/api/v1/repos/%s/contents/.gitea/workflows?ref=%s",
baseURL, repoFullName, branch)
req, _ := http.NewRequest("GET", url, nil)
req.Header.Set("Authorization", "token "+token)
resp, err := (&http.Client{Timeout: 10 * time.Second}).Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode == 404 {
return nil, nil
}
var contents []struct {
Name string `json:"name"`
Path string `json:"path"`
}
if err := json.NewDecoder(resp.Body).Decode(&contents); err != nil {
return nil, err
}
workflows := make(map[string]string)
for _, c := range contents {
if !strings.HasSuffix(c.Name, ".yml") && !strings.HasSuffix(c.Name, ".yaml") {
continue
}
content, err := fetchRawFile(baseURL, token, repoFullName, c.Path, branch)
if err != nil {
log.Printf("Failed to fetch %s: %v", c.Path, err)
continue
}
workflows[c.Name] = content
}
return workflows, nil
}
func fetchRawFile(baseURL, token, repoFullName, path, branch string) (string, error) {
url := fmt.Sprintf("%s/api/v1/repos/%s/raw/%s?ref=%s", baseURL, repoFullName, path, branch)
req, _ := http.NewRequest("GET", url, nil)
req.Header.Set("Authorization", "token "+token)
resp, err := (&http.Client{Timeout: 10 * time.Second}).Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
body := make([]byte, 1<<20)
n, _ := resp.Body.Read(body)
return string(body[:n]), nil
}
// --- Trigger evaluation ---
func shouldTrigger(wf Workflow, branch string, changedFiles []string) bool {
onMap, ok := wf.On.(map[string]interface{})
if !ok {
return true
}
pushCfg, ok := onMap["push"]
if !ok {
return false
}
pushMap, ok := pushCfg.(map[string]interface{})
if !ok {
return true
}
if branches, ok := pushMap["branches"].([]interface{}); ok {
matched := false
for _, b := range branches {
if fmt.Sprint(b) == branch {
matched = true
break
}
}
if !matched {
return false
}
}
if paths, ok := pushMap["paths"].([]interface{}); ok && len(changedFiles) > 0 {
matched := false
for _, p := range paths {
pattern := fmt.Sprint(p)
for _, f := range changedFiles {
if matchPath(pattern, f) {
matched = true
break
}
}
if matched {
break
}
}
if !matched {
return false
}
}
if ignorePaths, ok := pushMap["paths-ignore"].([]interface{}); ok && len(changedFiles) > 0 {
allIgnored := true
for _, f := range changedFiles {
ignored := false
for _, p := range ignorePaths {
if matchPath(fmt.Sprint(p), f) {
ignored = true
break
}
}
if !ignored {
allIgnored = false
break
}
}
if allIgnored {
return false
}
}
return true
}
func matchPath(pattern, file string) bool {
if strings.HasSuffix(pattern, "/**") {
prefix := strings.TrimSuffix(pattern, "/**")
return strings.HasPrefix(file, prefix+"/") || file == prefix
}
if strings.HasPrefix(pattern, "*.") {
ext := strings.TrimPrefix(pattern, "*")
return strings.HasSuffix(file, ext)
}
return strings.HasPrefix(file, pattern) || file == pattern
}
func collectChangedFiles(commits []GiteaCommit) []string {
seen := make(map[string]bool)
var files []string
for _, c := range commits {
for _, f := range c.Added {
if !seen[f] {
files = append(files, f)
seen[f] = true
}
}
for _, f := range c.Modified {
if !seen[f] {
files = append(files, f)
seen[f] = true
}
}
for _, f := range c.Removed {
if !seen[f] {
files = append(files, f)
seen[f] = true
}
}
}
return files
}
func respond(status int, body string) (events.APIGatewayProxyResponse, error) {
return events.APIGatewayProxyResponse{
StatusCode: status,
Headers: map[string]string{"Content-Type": "application/json"},
Body: body,
}, nil
}
func main() {
lambda.Start(handler)
}