feat: switch orchestrator from Fargate to EC2 Spot
Dispatcher now launches Spot instances instead of Fargate tasks: - t3.small for go/node builds ($0.005/hr) - t3.medium for docker/godot builds ($0.01/hr) - t3.micro for deploy jobs ($0.004/hr) Instances self-terminate via user-data trap on exit. Cancel: ec2:TerminateInstances instead of ecs:StopTask. Cleanup cron also sweeps orphan instances by tinqs-ci tag. Pre-baked AMI with act_runner + tools = instant boot, no install.
This commit is contained in:
@@ -1,6 +1,6 @@
|
|||||||
AWSTemplateFormatVersion: '2010-09-09'
|
AWSTemplateFormatVersion: '2010-09-09'
|
||||||
Transform: AWS::Serverless-2016-10-31
|
Transform: AWS::Serverless-2016-10-31
|
||||||
Description: Tinqs CI Orchestrator — webhook dispatch, Fargate routing, cancel support
|
Description: Tinqs CI Orchestrator — Lambda dispatch + EC2 Spot runners
|
||||||
|
|
||||||
Parameters:
|
Parameters:
|
||||||
GiteaURL:
|
GiteaURL:
|
||||||
@@ -9,24 +9,18 @@ Parameters:
|
|||||||
GiteaToken:
|
GiteaToken:
|
||||||
Type: String
|
Type: String
|
||||||
NoEcho: true
|
NoEcho: true
|
||||||
ECSCluster:
|
RunnerAMI:
|
||||||
Type: String
|
Type: AWS::EC2::Image::Id
|
||||||
Default: tinqs-git
|
Description: Pre-baked AMI with Go, Node, Docker, AWS CLI, act_runner
|
||||||
Subnets:
|
Subnet:
|
||||||
Type: CommaDelimitedList
|
Type: AWS::EC2::Subnet::Id
|
||||||
Description: VPC subnet IDs for Fargate tasks
|
Description: Public subnet for spot instances
|
||||||
SecurityGroup:
|
SecurityGroup:
|
||||||
|
Type: AWS::EC2::SecurityGroup::Id
|
||||||
|
Description: Security group for spot instances
|
||||||
|
InstanceProfileArn:
|
||||||
Type: String
|
Type: String
|
||||||
Description: Security group for Fargate tasks
|
Description: IAM instance profile ARN for runners
|
||||||
ECRBase:
|
|
||||||
Type: String
|
|
||||||
Default: 149751500842.dkr.ecr.eu-west-1.amazonaws.com
|
|
||||||
TaskRoleArn:
|
|
||||||
Type: String
|
|
||||||
Default: arn:aws:iam::149751500842:role/tinqs-git-task
|
|
||||||
ExecRoleArn:
|
|
||||||
Type: String
|
|
||||||
Default: arn:aws:iam::149751500842:role/ecsTaskExecutionRole
|
|
||||||
|
|
||||||
Globals:
|
Globals:
|
||||||
Function:
|
Function:
|
||||||
@@ -71,7 +65,7 @@ Resources:
|
|||||||
FunctionName: tinqs-ci-dispatch
|
FunctionName: tinqs-ci-dispatch
|
||||||
Handler: bootstrap
|
Handler: bootstrap
|
||||||
CodeUri: ../dispatch/
|
CodeUri: ../dispatch/
|
||||||
Description: Receives webhook, routes to Fargate or Lambda by runs-on label
|
Description: Receives webhook, starts Spot instances or invokes Lambda executor
|
||||||
Timeout: 60
|
Timeout: 60
|
||||||
MemorySize: 256
|
MemorySize: 256
|
||||||
Environment:
|
Environment:
|
||||||
@@ -79,13 +73,11 @@ Resources:
|
|||||||
GITEA_URL: !Ref GiteaURL
|
GITEA_URL: !Ref GiteaURL
|
||||||
GITEA_TOKEN: !Ref GiteaToken
|
GITEA_TOKEN: !Ref GiteaToken
|
||||||
EXECUTOR_FUNCTION_NAME: !Ref ExecFunction
|
EXECUTOR_FUNCTION_NAME: !Ref ExecFunction
|
||||||
ECS_CLUSTER: !Ref ECSCluster
|
RUNNER_AMI: !Ref RunnerAMI
|
||||||
SUBNETS: !Join [",", !Ref Subnets]
|
SUBNET: !Ref Subnet
|
||||||
SECURITY_GROUP: !Ref SecurityGroup
|
SECURITY_GROUP: !Ref SecurityGroup
|
||||||
ECR_BASE: !Ref ECRBase
|
|
||||||
DDB_TABLE: !Ref RunsTable
|
DDB_TABLE: !Ref RunsTable
|
||||||
TASK_ROLE_ARN: !Ref TaskRoleArn
|
INSTANCE_PROFILE: !Ref InstanceProfileArn
|
||||||
EXEC_ROLE_ARN: !Ref ExecRoleArn
|
|
||||||
Policies:
|
Policies:
|
||||||
- LambdaInvokePolicy:
|
- LambdaInvokePolicy:
|
||||||
FunctionName: !Ref ExecFunction
|
FunctionName: !Ref ExecFunction
|
||||||
@@ -95,16 +87,14 @@ Resources:
|
|||||||
Statement:
|
Statement:
|
||||||
- Effect: Allow
|
- Effect: Allow
|
||||||
Action:
|
Action:
|
||||||
- ecs:RunTask
|
- ec2:RunInstances
|
||||||
- ecs:StopTask
|
- ec2:TerminateInstances
|
||||||
- ecs:RegisterTaskDefinition
|
- ec2:DescribeInstances
|
||||||
- ecs:DescribeTasks
|
- ec2:CreateTags
|
||||||
Resource: '*'
|
Resource: '*'
|
||||||
- Effect: Allow
|
- Effect: Allow
|
||||||
Action: iam:PassRole
|
Action: iam:PassRole
|
||||||
Resource:
|
Resource: !Ref InstanceProfileArn
|
||||||
- !Ref TaskRoleArn
|
|
||||||
- !Ref ExecRoleArn
|
|
||||||
Events:
|
Events:
|
||||||
Webhook:
|
Webhook:
|
||||||
Type: Api
|
Type: Api
|
||||||
@@ -172,12 +162,12 @@ Resources:
|
|||||||
CILogGroup:
|
CILogGroup:
|
||||||
Type: AWS::Logs::LogGroup
|
Type: AWS::Logs::LogGroup
|
||||||
Properties:
|
Properties:
|
||||||
LogGroupName: /ecs/tinqs-ci
|
LogGroupName: /tinqs/ci
|
||||||
RetentionInDays: 14
|
RetentionInDays: 14
|
||||||
|
|
||||||
Outputs:
|
Outputs:
|
||||||
WebhookURL:
|
WebhookURL:
|
||||||
Description: Configure this as Gitea system webhook
|
Description: Configure as Gitea system webhook
|
||||||
Value: !Sub 'https://${WebhookApi}.execute-api.${AWS::Region}.amazonaws.com/prod/webhook'
|
Value: !Sub 'https://${WebhookApi}.execute-api.${AWS::Region}.amazonaws.com/prod/webhook'
|
||||||
DispatchArn:
|
DispatchArn:
|
||||||
Value: !GetAtt DispatchFunction.Arn
|
Value: !GetAtt DispatchFunction.Arn
|
||||||
|
|||||||
@@ -8,7 +8,7 @@ require (
|
|||||||
github.com/aws/aws-sdk-go-v2/config v1.28.0
|
github.com/aws/aws-sdk-go-v2/config v1.28.0
|
||||||
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.12
|
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.15.12
|
||||||
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.38.0
|
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.38.0
|
||||||
github.com/aws/aws-sdk-go-v2/service/ecs v1.52.0
|
github.com/aws/aws-sdk-go-v2/service/ec2 v1.198.0
|
||||||
github.com/aws/aws-sdk-go-v2/service/lambda v1.69.0
|
github.com/aws/aws-sdk-go-v2/service/lambda v1.69.0
|
||||||
gopkg.in/yaml.v3 v3.0.1
|
gopkg.in/yaml.v3 v3.0.1
|
||||||
)
|
)
|
||||||
|
|||||||
+182
-144
@@ -3,16 +3,17 @@
|
|||||||
// Receives Gitea system webhooks, determines which workflows to run,
|
// Receives Gitea system webhooks, determines which workflows to run,
|
||||||
// and routes each job to the right execution environment based on runs-on label:
|
// and routes each job to the right execution environment based on runs-on label:
|
||||||
//
|
//
|
||||||
// go, node, docker, godot → Fargate task with matching runner image
|
// go, node, docker, godot → EC2 Spot instance (pre-baked AMI, self-terminates)
|
||||||
// deploy → Lambda direct execution (ci-exec)
|
// deploy → Lambda direct execution (ci-exec)
|
||||||
// host → skip (handled by registered always-on runner)
|
// host → skip (handled by registered always-on runner)
|
||||||
//
|
//
|
||||||
// Also handles cancel events (stop Fargate tasks) and cleanup cron.
|
// Also handles cancel events (terminate instances) and cleanup cron.
|
||||||
|
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/base64"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
@@ -28,7 +29,8 @@ import (
|
|||||||
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
|
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
|
||||||
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
|
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
|
||||||
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
|
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
|
||||||
"github.com/aws/aws-sdk-go-v2/service/ecs"
|
"github.com/aws/aws-sdk-go-v2/service/ec2"
|
||||||
|
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
|
||||||
awslambda "github.com/aws/aws-sdk-go-v2/service/lambda"
|
awslambda "github.com/aws/aws-sdk-go-v2/service/lambda"
|
||||||
"gopkg.in/yaml.v3"
|
"gopkg.in/yaml.v3"
|
||||||
)
|
)
|
||||||
@@ -82,7 +84,7 @@ type WorkflowJob struct {
|
|||||||
type RunRecord struct {
|
type RunRecord struct {
|
||||||
Repo string `dynamodbav:"repo"`
|
Repo string `dynamodbav:"repo"`
|
||||||
RunID string `dynamodbav:"run_id"`
|
RunID string `dynamodbav:"run_id"`
|
||||||
TaskArn string `dynamodbav:"task_arn"`
|
InstanceID string `dynamodbav:"instance_id"`
|
||||||
Status string `dynamodbav:"status"`
|
Status string `dynamodbav:"status"`
|
||||||
Workflow string `dynamodbav:"workflow"`
|
Workflow string `dynamodbav:"workflow"`
|
||||||
Label string `dynamodbav:"label"`
|
Label string `dynamodbav:"label"`
|
||||||
@@ -90,22 +92,19 @@ type RunRecord struct {
|
|||||||
TTL int64 `dynamodbav:"ttl"`
|
TTL int64 `dynamodbav:"ttl"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Image routing ---
|
// --- Spot instance config per label ---
|
||||||
|
|
||||||
var labelToImage = map[string]string{
|
type spotConfig struct {
|
||||||
"go": "tinqs-runner-go",
|
InstanceType string
|
||||||
"node": "tinqs-runner-node",
|
MaxPrice string // spot bid (empty = on-demand price cap)
|
||||||
"docker": "tinqs-runner-docker",
|
|
||||||
"deploy": "tinqs-runner-deploy",
|
|
||||||
"godot": "tinqs-runner-godot",
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var labelToResources = map[string][2]string{
|
var labelToSpot = map[string]spotConfig{
|
||||||
"go": {"1024", "2048"}, // 1 vCPU, 2 GB
|
"go": {InstanceType: "t3.small", MaxPrice: "0.008"},
|
||||||
"node": {"1024", "2048"},
|
"node": {InstanceType: "t3.small", MaxPrice: "0.008"},
|
||||||
"docker": {"2048", "4096"}, // 2 vCPU, 4 GB (Docker builds are heavy)
|
"docker": {InstanceType: "t3.medium", MaxPrice: "0.016"},
|
||||||
"deploy": {"512", "1024"}, // 0.5 vCPU, 1 GB (lightweight)
|
"deploy": {InstanceType: "t3.micro", MaxPrice: "0.004"},
|
||||||
"godot": {"2048", "4096"},
|
"godot": {InstanceType: "t3.medium", MaxPrice: "0.016"},
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Config from env ---
|
// --- Config from env ---
|
||||||
@@ -114,13 +113,12 @@ type cfg struct {
|
|||||||
GiteaURL string
|
GiteaURL string
|
||||||
GiteaToken string
|
GiteaToken string
|
||||||
ExecFnName string
|
ExecFnName string
|
||||||
ECSCluster string
|
AMI string // pre-baked AMI with Go, Node, Docker, AWS CLI, act_runner
|
||||||
Subnets []string
|
Subnet string
|
||||||
SecurityGroup string
|
SecurityGroup string
|
||||||
ECRBase string
|
|
||||||
DDBTable string
|
DDBTable string
|
||||||
TaskRoleArn string
|
InstanceProfile string
|
||||||
ExecRoleArn string
|
Region string
|
||||||
}
|
}
|
||||||
|
|
||||||
func loadCfg() cfg {
|
func loadCfg() cfg {
|
||||||
@@ -128,20 +126,57 @@ func loadCfg() cfg {
|
|||||||
GiteaURL: os.Getenv("GITEA_URL"),
|
GiteaURL: os.Getenv("GITEA_URL"),
|
||||||
GiteaToken: os.Getenv("GITEA_TOKEN"),
|
GiteaToken: os.Getenv("GITEA_TOKEN"),
|
||||||
ExecFnName: os.Getenv("EXECUTOR_FUNCTION_NAME"),
|
ExecFnName: os.Getenv("EXECUTOR_FUNCTION_NAME"),
|
||||||
ECSCluster: os.Getenv("ECS_CLUSTER"),
|
AMI: os.Getenv("RUNNER_AMI"),
|
||||||
Subnets: strings.Split(os.Getenv("SUBNETS"), ","),
|
Subnet: os.Getenv("SUBNET"),
|
||||||
SecurityGroup: os.Getenv("SECURITY_GROUP"),
|
SecurityGroup: os.Getenv("SECURITY_GROUP"),
|
||||||
ECRBase: os.Getenv("ECR_BASE"), // e.g. 149751500842.dkr.ecr.eu-west-1.amazonaws.com
|
|
||||||
DDBTable: os.Getenv("DDB_TABLE"),
|
DDBTable: os.Getenv("DDB_TABLE"),
|
||||||
TaskRoleArn: os.Getenv("TASK_ROLE_ARN"),
|
InstanceProfile: os.Getenv("INSTANCE_PROFILE"), // IAM instance profile ARN
|
||||||
ExecRoleArn: os.Getenv("EXEC_ROLE_ARN"),
|
Region: os.Getenv("AWS_REGION"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- User-data script template ---
|
||||||
|
// The instance boots, registers as ephemeral runner, picks one job, then terminates itself.
|
||||||
|
|
||||||
|
func userDataScript(c cfg, label, runnerName string) string {
|
||||||
|
script := fmt.Sprintf(`#!/bin/bash
|
||||||
|
set -euo pipefail
|
||||||
|
exec > /var/log/tinqs-ci.log 2>&1
|
||||||
|
|
||||||
|
echo "=== Tinqs CI Runner: %s ==="
|
||||||
|
INSTANCE_ID=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)
|
||||||
|
REGION=%s
|
||||||
|
|
||||||
|
# Self-termination trap — kill instance on exit (success or failure)
|
||||||
|
cleanup() {
|
||||||
|
echo "=== Job done, terminating $INSTANCE_ID ==="
|
||||||
|
aws ec2 terminate-instances --instance-ids "$INSTANCE_ID" --region "$REGION" || true
|
||||||
|
}
|
||||||
|
trap cleanup EXIT
|
||||||
|
|
||||||
|
# act_runner is pre-installed in the AMI at /usr/local/bin/act_runner
|
||||||
|
cd /tmp
|
||||||
|
|
||||||
|
# Register as ephemeral runner (picks one job, then exits)
|
||||||
|
act_runner register --no-interactive \
|
||||||
|
--instance %s \
|
||||||
|
--token %s \
|
||||||
|
--name %s \
|
||||||
|
--labels "%s:host"
|
||||||
|
|
||||||
|
# Run — blocks until the job completes, then exits (ephemeral mode)
|
||||||
|
act_runner daemon
|
||||||
|
|
||||||
|
echo "=== Runner exited, cleanup will terminate instance ==="
|
||||||
|
`, runnerName, c.Region, c.GiteaURL, c.GiteaToken, runnerName, label)
|
||||||
|
|
||||||
|
return base64.StdEncoding.EncodeToString([]byte(script))
|
||||||
|
}
|
||||||
|
|
||||||
func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
|
func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events.APIGatewayProxyResponse, error) {
|
||||||
c := loadCfg()
|
c := loadCfg()
|
||||||
|
|
||||||
// Handle cleanup cron (EventBridge invokes with action=cleanup)
|
// Handle cleanup cron
|
||||||
if request.Body == "" || strings.Contains(request.Body, `"action":"cleanup"`) {
|
if request.Body == "" || strings.Contains(request.Body, `"action":"cleanup"`) {
|
||||||
return handleCleanup(ctx, c)
|
return handleCleanup(ctx, c)
|
||||||
}
|
}
|
||||||
@@ -162,7 +197,6 @@ func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events
|
|||||||
return respond(400, `{"error":"invalid payload"}`)
|
return respond(400, `{"error":"invalid payload"}`)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only process branch pushes
|
|
||||||
if !strings.HasPrefix(push.Ref, "refs/heads/") {
|
if !strings.HasPrefix(push.Ref, "refs/heads/") {
|
||||||
return respond(200, `{"status":"skipped","reason":"not a branch push"}`)
|
return respond(200, `{"status":"skipped","reason":"not a branch push"}`)
|
||||||
}
|
}
|
||||||
@@ -171,16 +205,13 @@ func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events
|
|||||||
log.Printf("Push to %s branch=%s by=%s commits=%d",
|
log.Printf("Push to %s branch=%s by=%s commits=%d",
|
||||||
push.Repository.FullName, branch, push.Pusher.Login, len(push.Commits))
|
push.Repository.FullName, branch, push.Pusher.Login, len(push.Commits))
|
||||||
|
|
||||||
// Collect all changed files from commits
|
|
||||||
changedFiles := collectChangedFiles(push.Commits)
|
changedFiles := collectChangedFiles(push.Commits)
|
||||||
|
|
||||||
// Fetch and filter workflows
|
|
||||||
workflows, err := fetchWorkflows(c.GiteaURL, c.GiteaToken, push.Repository.FullName, branch)
|
workflows, err := fetchWorkflows(c.GiteaURL, c.GiteaToken, push.Repository.FullName, branch)
|
||||||
if err != nil || len(workflows) == 0 {
|
if err != nil || len(workflows) == 0 {
|
||||||
return respond(200, `{"status":"no_workflows"}`)
|
return respond(200, `{"status":"no_workflows"}`)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Route each workflow
|
|
||||||
dispatched := 0
|
dispatched := 0
|
||||||
for name, content := range workflows {
|
for name, content := range workflows {
|
||||||
var wf Workflow
|
var wf Workflow
|
||||||
@@ -189,13 +220,11 @@ func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if workflow triggers on this branch + paths
|
|
||||||
if !shouldTrigger(wf, branch, changedFiles) {
|
if !shouldTrigger(wf, branch, changedFiles) {
|
||||||
log.Printf("Skipping %s (no matching trigger)", name)
|
log.Printf("Skipping %s (no matching trigger)", name)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get the first job's runs-on label
|
|
||||||
label := "host"
|
label := "host"
|
||||||
for _, job := range wf.Jobs {
|
for _, job := range wf.Jobs {
|
||||||
if job.RunsOn != "" {
|
if job.RunsOn != "" {
|
||||||
@@ -208,24 +237,20 @@ func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events
|
|||||||
|
|
||||||
switch label {
|
switch label {
|
||||||
case "deploy":
|
case "deploy":
|
||||||
// Lightweight: invoke Lambda executor directly
|
|
||||||
if err := invokeLambdaExec(ctx, c, push, name, content); err != nil {
|
if err := invokeLambdaExec(ctx, c, push, name, content); err != nil {
|
||||||
log.Printf("Failed to invoke executor for %s: %v", name, err)
|
log.Printf("Failed to invoke executor for %s: %v", name, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
case "host":
|
case "host":
|
||||||
// Legacy: skip, the always-on runner handles it
|
log.Printf("Skipping %s (runs-on: host — registered runner)", name)
|
||||||
log.Printf("Skipping %s (runs-on: host — handled by registered runner)", name)
|
|
||||||
continue
|
continue
|
||||||
default:
|
default:
|
||||||
// Fargate: start ephemeral runner with matching image
|
instanceID, err := startSpotRunner(ctx, c, label, runID)
|
||||||
taskArn, err := startFargateRunner(ctx, c, push, label, name, runID)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Failed to start Fargate for %s: %v", name, err)
|
log.Printf("Failed to start spot for %s: %v", name, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Track in DynamoDB
|
if err := trackRun(ctx, c, push.Repository.FullName, runID, instanceID, name, label); err != nil {
|
||||||
if err := trackRun(ctx, c, push.Repository.FullName, runID, taskArn, name, label); err != nil {
|
|
||||||
log.Printf("Warning: failed to track run: %v", err)
|
log.Printf("Warning: failed to track run: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -238,87 +263,80 @@ func handler(ctx context.Context, request events.APIGatewayProxyRequest) (events
|
|||||||
len(workflows), dispatched, push.Repository.FullName, branch))
|
len(workflows), dispatched, push.Repository.FullName, branch))
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Fargate runner ---
|
// --- EC2 Spot runner ---
|
||||||
|
|
||||||
func startFargateRunner(ctx context.Context, c cfg, push GiteaPushEvent, label, workflow, runID string) (string, error) {
|
func startSpotRunner(ctx context.Context, c cfg, label, runID string) (string, error) {
|
||||||
awsCfg, err := config.LoadDefaultConfig(ctx)
|
awsCfg, err := config.LoadDefaultConfig(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", fmt.Errorf("aws config: %w", err)
|
return "", fmt.Errorf("aws config: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
imageName := labelToImage[label]
|
spot, ok := labelToSpot[label]
|
||||||
if imageName == "" {
|
if !ok {
|
||||||
return "", fmt.Errorf("unknown label: %s", label)
|
return "", fmt.Errorf("unknown label: %s", label)
|
||||||
}
|
}
|
||||||
|
|
||||||
image := fmt.Sprintf("%s/%s:latest", c.ECRBase, imageName)
|
runnerName := fmt.Sprintf("spot-%s-%s", label, runID[:12])
|
||||||
resources := labelToResources[label]
|
userData := userDataScript(c, label, runnerName)
|
||||||
cpu, memory := resources[0], resources[1]
|
|
||||||
|
|
||||||
client := ecs.NewFromConfig(awsCfg)
|
client := ec2.NewFromConfig(awsCfg)
|
||||||
|
|
||||||
// Register a one-off task definition for this run
|
out, err := client.RunInstances(ctx, &ec2.RunInstancesInput{
|
||||||
taskDefOut, err := client.RegisterTaskDefinition(ctx, &ecs.RegisterTaskDefinitionInput{
|
ImageId: aws.String(c.AMI),
|
||||||
Family: aws.String(fmt.Sprintf("tinqs-ci-%s", label)),
|
InstanceType: ec2types.InstanceType(spot.InstanceType),
|
||||||
RequiresCompatibilities: []ecs.Compatibility{ecs.CompatibilityFargate},
|
MinCount: aws.Int32(1),
|
||||||
NetworkMode: ecs.NetworkModeAwsvpc,
|
MaxCount: aws.Int32(1),
|
||||||
Cpu: aws.String(cpu),
|
UserData: aws.String(userData),
|
||||||
Memory: aws.String(memory),
|
|
||||||
TaskRoleArn: aws.String(c.TaskRoleArn),
|
// Spot request
|
||||||
ExecutionRoleArn: aws.String(c.ExecRoleArn),
|
InstanceMarketOptions: &ec2types.InstanceMarketOptionsRequest{
|
||||||
ContainerDefinitions: []ecs.ContainerDefinition{{
|
MarketType: ec2types.MarketTypeSpot,
|
||||||
Name: aws.String("runner"),
|
SpotOptions: &ec2types.SpotMarketOptions{
|
||||||
Image: aws.String(image),
|
MaxPrice: aws.String(spot.MaxPrice),
|
||||||
Essential: aws.Bool(true),
|
SpotInstanceType: ec2types.SpotInstanceTypeOneTime,
|
||||||
Environment: []ecs.KeyValuePair{
|
InstanceInterruptionBehavior: ec2types.InstanceInterruptionBehaviorTerminate,
|
||||||
{Name: aws.String("GITEA_INSTANCE_URL"), Value: aws.String(c.GiteaURL)},
|
|
||||||
{Name: aws.String("GITEA_RUNNER_REGISTRATION_TOKEN"), Value: aws.String(c.GiteaToken)},
|
|
||||||
{Name: aws.String("GITEA_RUNNER_NAME"), Value: aws.String(fmt.Sprintf("ephemeral-%s-%s", label, runID[:8]))},
|
|
||||||
{Name: aws.String("GITEA_RUNNER_LABELS"), Value: aws.String(fmt.Sprintf("%s:host", label))},
|
|
||||||
{Name: aws.String("GITEA_RUNNER_EPHEMERAL"), Value: aws.String("true")},
|
|
||||||
},
|
},
|
||||||
LogConfiguration: &ecs.LogConfiguration{
|
|
||||||
LogDriver: ecs.LogDriverAwslogs,
|
|
||||||
Options: map[string]string{
|
|
||||||
"awslogs-group": "/ecs/tinqs-ci",
|
|
||||||
"awslogs-region": "eu-west-1",
|
|
||||||
"awslogs-stream-prefix": label,
|
|
||||||
},
|
},
|
||||||
|
|
||||||
|
// Network
|
||||||
|
NetworkInterfaces: []ec2types.InstanceNetworkInterfaceSpecification{{
|
||||||
|
DeviceIndex: aws.Int32(0),
|
||||||
|
SubnetId: aws.String(c.Subnet),
|
||||||
|
Groups: []string{c.SecurityGroup},
|
||||||
|
AssociatePublicIpAddress: aws.Bool(true),
|
||||||
|
}},
|
||||||
|
|
||||||
|
// IAM role (for AWS CLI, ECR, S3, ECS access)
|
||||||
|
IamInstanceProfile: &ec2types.IamInstanceProfileSpecification{
|
||||||
|
Arn: aws.String(c.InstanceProfile),
|
||||||
|
},
|
||||||
|
|
||||||
|
// Tags for identification and cleanup
|
||||||
|
TagSpecifications: []ec2types.TagSpecification{{
|
||||||
|
ResourceType: ec2types.ResourceTypeInstance,
|
||||||
|
Tags: []ec2types.Tag{
|
||||||
|
{Key: aws.String("Name"), Value: aws.String(runnerName)},
|
||||||
|
{Key: aws.String("tinqs-ci"), Value: aws.String("true")},
|
||||||
|
{Key: aws.String("tinqs-ci-run"), Value: aws.String(runID)},
|
||||||
|
{Key: aws.String("tinqs-ci-label"), Value: aws.String(label)},
|
||||||
},
|
},
|
||||||
}},
|
}},
|
||||||
|
|
||||||
|
// Auto-terminate safety net (instance shuts down → terminates)
|
||||||
|
InstanceInitiatedShutdownBehavior: ec2types.ShutdownBehaviorTerminate,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", fmt.Errorf("register task def: %w", err)
|
return "", fmt.Errorf("run instances: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
taskDef := fmt.Sprintf("%s:%d", *taskDefOut.TaskDefinition.Family,
|
if len(out.Instances) == 0 {
|
||||||
taskDefOut.TaskDefinition.Revision)
|
return "", fmt.Errorf("no instances launched")
|
||||||
|
|
||||||
// Run the task
|
|
||||||
runOut, err := client.RunTask(ctx, &ecs.RunTaskInput{
|
|
||||||
Cluster: aws.String(c.ECSCluster),
|
|
||||||
TaskDefinition: aws.String(taskDef),
|
|
||||||
LaunchType: ecs.LaunchTypeFargate,
|
|
||||||
Count: aws.Int32(1),
|
|
||||||
NetworkConfiguration: &ecs.NetworkConfiguration{
|
|
||||||
AwsvpcConfiguration: &ecs.AwsVpcConfiguration{
|
|
||||||
Subnets: c.Subnets,
|
|
||||||
SecurityGroups: []string{c.SecurityGroup},
|
|
||||||
AssignPublicIp: ecs.AssignPublicIpEnabled,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return "", fmt.Errorf("run task: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(runOut.Tasks) == 0 {
|
instanceID := *out.Instances[0].InstanceId
|
||||||
return "", fmt.Errorf("no tasks started")
|
log.Printf("Started spot instance: %s (type=%s, label=%s, runner=%s)",
|
||||||
}
|
instanceID, spot.InstanceType, label, runnerName)
|
||||||
|
return instanceID, nil
|
||||||
taskArn := *runOut.Tasks[0].TaskArn
|
|
||||||
log.Printf("Started Fargate task: %s (image=%s, label=%s)", taskArn, imageName, label)
|
|
||||||
return taskArn, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Lambda executor (for deploy-only jobs) ---
|
// --- Lambda executor (for deploy-only jobs) ---
|
||||||
@@ -357,7 +375,6 @@ func invokeLambdaExec(ctx context.Context, c cfg, push GiteaPushEvent, workflow,
|
|||||||
// --- Cancel handler ---
|
// --- Cancel handler ---
|
||||||
|
|
||||||
func handleCancel(ctx context.Context, c cfg, body string) (events.APIGatewayProxyResponse, error) {
|
func handleCancel(ctx context.Context, c cfg, body string) (events.APIGatewayProxyResponse, error) {
|
||||||
// Parse workflow_job event to find the run to cancel
|
|
||||||
var event struct {
|
var event struct {
|
||||||
Action string `json:"action"`
|
Action string `json:"action"`
|
||||||
Repository GiteaRepo `json:"repository"`
|
Repository GiteaRepo `json:"repository"`
|
||||||
@@ -372,7 +389,6 @@ func handleCancel(ctx context.Context, c cfg, body string) (events.APIGatewayPro
|
|||||||
|
|
||||||
log.Printf("Cancel request for %s", event.Repository.FullName)
|
log.Printf("Cancel request for %s", event.Repository.FullName)
|
||||||
|
|
||||||
// Find active runs for this repo in DynamoDB
|
|
||||||
awsCfg, _ := config.LoadDefaultConfig(ctx)
|
awsCfg, _ := config.LoadDefaultConfig(ctx)
|
||||||
ddb := dynamodb.NewFromConfig(awsCfg)
|
ddb := dynamodb.NewFromConfig(awsCfg)
|
||||||
|
|
||||||
@@ -387,36 +403,37 @@ func handleCancel(ctx context.Context, c cfg, body string) (events.APIGatewayPro
|
|||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("DynamoDB query failed: %v", err)
|
|
||||||
return respond(500, `{"error":"db query failed"}`)
|
return respond(500, `{"error":"db query failed"}`)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop all running Fargate tasks for this repo
|
ec2Client := ec2.NewFromConfig(awsCfg)
|
||||||
ecsClient := ecs.NewFromConfig(awsCfg)
|
|
||||||
stopped := 0
|
stopped := 0
|
||||||
|
|
||||||
|
// Collect instance IDs to terminate
|
||||||
|
var instanceIDs []string
|
||||||
|
var records []RunRecord
|
||||||
for _, item := range out.Items {
|
for _, item := range out.Items {
|
||||||
var rec RunRecord
|
var rec RunRecord
|
||||||
if err := attributevalue.UnmarshalMap(item, &rec); err != nil {
|
if err := attributevalue.UnmarshalMap(item, &rec); err != nil || rec.InstanceID == "" {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if rec.TaskArn == "" {
|
instanceIDs = append(instanceIDs, rec.InstanceID)
|
||||||
continue
|
records = append(records, rec)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := ecsClient.StopTask(ctx, &ecs.StopTaskInput{
|
if len(instanceIDs) > 0 {
|
||||||
Cluster: aws.String(c.ECSCluster),
|
_, err := ec2Client.TerminateInstances(ctx, &ec2.TerminateInstancesInput{
|
||||||
Task: aws.String(rec.TaskArn),
|
InstanceIds: instanceIDs,
|
||||||
Reason: aws.String("Cancelled via Gitea webhook"),
|
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Failed to stop task %s: %v", rec.TaskArn, err)
|
log.Printf("Failed to terminate instances: %v", err)
|
||||||
continue
|
} else {
|
||||||
}
|
stopped = len(instanceIDs)
|
||||||
|
for _, rec := range records {
|
||||||
// Update status in DynamoDB
|
|
||||||
updateRunStatus(ctx, ddb, c.DDBTable, rec.Repo, rec.RunID, "cancelled")
|
updateRunStatus(ctx, ddb, c.DDBTable, rec.Repo, rec.RunID, "cancelled")
|
||||||
stopped++
|
}
|
||||||
log.Printf("Stopped task %s for %s", rec.TaskArn, rec.Repo)
|
log.Printf("Terminated %d instances for %s", stopped, event.Repository.FullName)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return respond(200, fmt.Sprintf(`{"status":"cancelled","stopped":%d}`, stopped))
|
return respond(200, fmt.Sprintf(`{"status":"cancelled","stopped":%d}`, stopped))
|
||||||
@@ -429,9 +446,8 @@ func handleCleanup(ctx context.Context, c cfg) (events.APIGatewayProxyResponse,
|
|||||||
|
|
||||||
awsCfg, _ := config.LoadDefaultConfig(ctx)
|
awsCfg, _ := config.LoadDefaultConfig(ctx)
|
||||||
ddb := dynamodb.NewFromConfig(awsCfg)
|
ddb := dynamodb.NewFromConfig(awsCfg)
|
||||||
ecsClient := ecs.NewFromConfig(awsCfg)
|
ec2Client := ec2.NewFromConfig(awsCfg)
|
||||||
|
|
||||||
// Scan for runs older than 30 minutes that are still "running"
|
|
||||||
cutoff := time.Now().Add(-30 * time.Minute).Unix()
|
cutoff := time.Now().Add(-30 * time.Minute).Unix()
|
||||||
|
|
||||||
out, err := ddb.Scan(ctx, &dynamodb.ScanInput{
|
out, err := ddb.Scan(ctx, &dynamodb.ScanInput{
|
||||||
@@ -447,24 +463,51 @@ func handleCleanup(ctx context.Context, c cfg) (events.APIGatewayProxyResponse,
|
|||||||
return respond(500, fmt.Sprintf(`{"error":"%s"}`, err))
|
return respond(500, fmt.Sprintf(`{"error":"%s"}`, err))
|
||||||
}
|
}
|
||||||
|
|
||||||
killed := 0
|
// Batch terminate stale instances
|
||||||
|
var staleIDs []string
|
||||||
|
var staleRecs []RunRecord
|
||||||
for _, item := range out.Items {
|
for _, item := range out.Items {
|
||||||
var rec RunRecord
|
var rec RunRecord
|
||||||
if err := attributevalue.UnmarshalMap(item, &rec); err != nil {
|
if err := attributevalue.UnmarshalMap(item, &rec); err != nil || rec.InstanceID == "" {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
staleIDs = append(staleIDs, rec.InstanceID)
|
||||||
if rec.TaskArn != "" {
|
staleRecs = append(staleRecs, rec)
|
||||||
ecsClient.StopTask(ctx, &ecs.StopTaskInput{
|
|
||||||
Cluster: aws.String(c.ECSCluster),
|
|
||||||
Task: aws.String(rec.TaskArn),
|
|
||||||
Reason: aws.String("Timed out (cleanup cron)"),
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
killed := 0
|
||||||
|
if len(staleIDs) > 0 {
|
||||||
|
ec2Client.TerminateInstances(ctx, &ec2.TerminateInstancesInput{
|
||||||
|
InstanceIds: staleIDs,
|
||||||
|
})
|
||||||
|
for _, rec := range staleRecs {
|
||||||
updateRunStatus(ctx, ddb, c.DDBTable, rec.Repo, rec.RunID, "timeout")
|
updateRunStatus(ctx, ddb, c.DDBTable, rec.Repo, rec.RunID, "timeout")
|
||||||
killed++
|
}
|
||||||
log.Printf("Killed stale run: %s/%s (started %d)", rec.Repo, rec.RunID, rec.StartedAt)
|
killed = len(staleIDs)
|
||||||
|
log.Printf("Killed %d stale instances", killed)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Also sweep any tinqs-ci tagged instances that somehow escaped DynamoDB
|
||||||
|
descOut, _ := ec2Client.DescribeInstances(ctx, &ec2.DescribeInstancesInput{
|
||||||
|
Filters: []ec2types.Filter{
|
||||||
|
{Name: aws.String("tag:tinqs-ci"), Values: []string{"true"}},
|
||||||
|
{Name: aws.String("instance-state-name"), Values: []string{"running", "pending"}},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
var orphanIDs []string
|
||||||
|
for _, res := range descOut.Reservations {
|
||||||
|
for _, inst := range res.Instances {
|
||||||
|
if inst.LaunchTime != nil && time.Since(*inst.LaunchTime) > 30*time.Minute {
|
||||||
|
orphanIDs = append(orphanIDs, *inst.InstanceId)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(orphanIDs) > 0 {
|
||||||
|
ec2Client.TerminateInstances(ctx, &ec2.TerminateInstancesInput{
|
||||||
|
InstanceIds: orphanIDs,
|
||||||
|
})
|
||||||
|
log.Printf("Killed %d orphan instances (tag sweep)", len(orphanIDs))
|
||||||
|
killed += len(orphanIDs)
|
||||||
}
|
}
|
||||||
|
|
||||||
return respond(200, fmt.Sprintf(`{"status":"cleanup","killed":%d}`, killed))
|
return respond(200, fmt.Sprintf(`{"status":"cleanup","killed":%d}`, killed))
|
||||||
@@ -472,7 +515,7 @@ func handleCleanup(ctx context.Context, c cfg) (events.APIGatewayProxyResponse,
|
|||||||
|
|
||||||
// --- DynamoDB helpers ---
|
// --- DynamoDB helpers ---
|
||||||
|
|
||||||
func trackRun(ctx context.Context, c cfg, repo, runID, taskArn, workflow, label string) error {
|
func trackRun(ctx context.Context, c cfg, repo, runID, instanceID, workflow, label string) error {
|
||||||
awsCfg, _ := config.LoadDefaultConfig(ctx)
|
awsCfg, _ := config.LoadDefaultConfig(ctx)
|
||||||
ddb := dynamodb.NewFromConfig(awsCfg)
|
ddb := dynamodb.NewFromConfig(awsCfg)
|
||||||
|
|
||||||
@@ -480,7 +523,7 @@ func trackRun(ctx context.Context, c cfg, repo, runID, taskArn, workflow, label
|
|||||||
rec := RunRecord{
|
rec := RunRecord{
|
||||||
Repo: repo,
|
Repo: repo,
|
||||||
RunID: runID,
|
RunID: runID,
|
||||||
TaskArn: taskArn,
|
InstanceID: instanceID,
|
||||||
Status: "running",
|
Status: "running",
|
||||||
Workflow: workflow,
|
Workflow: workflow,
|
||||||
Label: label,
|
Label: label,
|
||||||
@@ -576,23 +619,21 @@ func fetchRawFile(baseURL, token, repoFullName, path, branch string) (string, er
|
|||||||
// --- Trigger evaluation ---
|
// --- Trigger evaluation ---
|
||||||
|
|
||||||
func shouldTrigger(wf Workflow, branch string, changedFiles []string) bool {
|
func shouldTrigger(wf Workflow, branch string, changedFiles []string) bool {
|
||||||
// Parse the on: section
|
|
||||||
onMap, ok := wf.On.(map[string]interface{})
|
onMap, ok := wf.On.(map[string]interface{})
|
||||||
if !ok {
|
if !ok {
|
||||||
return true // simple trigger like on: push — always matches
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
pushCfg, ok := onMap["push"]
|
pushCfg, ok := onMap["push"]
|
||||||
if !ok {
|
if !ok {
|
||||||
return false // no push trigger
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
pushMap, ok := pushCfg.(map[string]interface{})
|
pushMap, ok := pushCfg.(map[string]interface{})
|
||||||
if !ok {
|
if !ok {
|
||||||
return true // on: push without config — matches all
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check branches
|
|
||||||
if branches, ok := pushMap["branches"].([]interface{}); ok {
|
if branches, ok := pushMap["branches"].([]interface{}); ok {
|
||||||
matched := false
|
matched := false
|
||||||
for _, b := range branches {
|
for _, b := range branches {
|
||||||
@@ -606,7 +647,6 @@ func shouldTrigger(wf Workflow, branch string, changedFiles []string) bool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check paths (if specified, at least one changed file must match)
|
|
||||||
if paths, ok := pushMap["paths"].([]interface{}); ok && len(changedFiles) > 0 {
|
if paths, ok := pushMap["paths"].([]interface{}); ok && len(changedFiles) > 0 {
|
||||||
matched := false
|
matched := false
|
||||||
for _, p := range paths {
|
for _, p := range paths {
|
||||||
@@ -626,7 +666,6 @@ func shouldTrigger(wf Workflow, branch string, changedFiles []string) bool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check paths-ignore
|
|
||||||
if ignorePaths, ok := pushMap["paths-ignore"].([]interface{}); ok && len(changedFiles) > 0 {
|
if ignorePaths, ok := pushMap["paths-ignore"].([]interface{}); ok && len(changedFiles) > 0 {
|
||||||
allIgnored := true
|
allIgnored := true
|
||||||
for _, f := range changedFiles {
|
for _, f := range changedFiles {
|
||||||
@@ -651,7 +690,6 @@ func shouldTrigger(wf Workflow, branch string, changedFiles []string) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func matchPath(pattern, file string) bool {
|
func matchPath(pattern, file string) bool {
|
||||||
// Simple glob matching: foo/** matches foo/bar/baz, *.go matches main.go
|
|
||||||
if strings.HasSuffix(pattern, "/**") {
|
if strings.HasSuffix(pattern, "/**") {
|
||||||
prefix := strings.TrimSuffix(pattern, "/**")
|
prefix := strings.TrimSuffix(pattern, "/**")
|
||||||
return strings.HasPrefix(file, prefix+"/") || file == prefix
|
return strings.HasPrefix(file, prefix+"/") || file == prefix
|
||||||
|
|||||||
Reference in New Issue
Block a user