Files
ci/orchestrator/exec/main.go
T
ozan e96c7c5bf1 feat: CI orchestrator — Lambda dispatch + Fargate routing + cancel
dispatch: receives Gitea webhook, routes by runs-on label to Fargate
  tasks (go/node/docker/godot) or Lambda executor (deploy).
  Path filter evaluation, DynamoDB run tracking, cancel via StopTask.
exec: lightweight Lambda for deploy-only jobs (S3 sync, ECS update).
SAM template: API Gateway + 2 Lambdas + DynamoDB + cleanup cron.
2026-05-22 18:47:47 +01:00

234 lines
5.9 KiB
Go

// tinqs/ci orchestrator — executor Lambda
//
// Handles deploy-only jobs directly in Lambda (no Fargate needed).
// Parses workflow YAML, runs shell steps, reports commit status to Gitea.
// 15 min timeout, 2 GB memory, 5 GB /tmp.
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"time"
"github.com/aws/aws-lambda-go/lambda"
"gopkg.in/yaml.v3"
)
type ExecPayload struct {
Repo GiteaRepo `json:"repo"`
Ref string `json:"ref"`
CommitSHA string `json:"commit_sha"`
Pusher string `json:"pusher"`
Workflow string `json:"workflow_name"`
WorkflowYAML string `json:"workflow_yaml"`
GiteaURL string `json:"gitea_url"`
GiteaToken string `json:"gitea_token"`
}
type GiteaRepo struct {
FullName string `json:"full_name"`
CloneURL string `json:"clone_url"`
}
type Workflow struct {
Name string `yaml:"name"`
Jobs map[string]WorkflowJob `yaml:"jobs"`
}
type WorkflowJob struct {
Steps []WorkflowStep `yaml:"steps"`
Env map[string]string `yaml:"env"`
}
type WorkflowStep struct {
Name string `yaml:"name"`
Run string `yaml:"run"`
Uses string `yaml:"uses"`
Env map[string]string `yaml:"env"`
}
type StepResult struct {
Name string `json:"name"`
Status string `json:"status"`
Duration string `json:"duration"`
}
type JobResult struct {
Workflow string `json:"workflow"`
Status string `json:"status"`
Steps []StepResult `json:"steps"`
Duration string `json:"duration"`
}
func handler(ctx context.Context, payload ExecPayload) (JobResult, error) {
sha := payload.CommitSHA
if len(sha) > 7 {
sha = sha[:7]
}
log.Printf("Executor: repo=%s workflow=%s sha=%s", payload.Repo.FullName, payload.Workflow, sha)
start := time.Now()
var wf Workflow
if err := yaml.Unmarshal([]byte(payload.WorkflowYAML), &wf); err != nil {
return JobResult{Workflow: payload.Workflow, Status: "failure"}, err
}
setCommitStatus(payload, "pending", "CI running...")
overallStatus := "success"
var steps []StepResult
for _, job := range wf.Jobs {
// Build environment
env := os.Environ()
env = append(env,
"CI=true",
fmt.Sprintf("GITHUB_REPOSITORY=%s", payload.Repo.FullName),
fmt.Sprintf("GITHUB_REF=%s", payload.Ref),
fmt.Sprintf("GITHUB_SHA=%s", payload.CommitSHA),
)
for k, v := range job.Env {
env = append(env, fmt.Sprintf("%s=%s", k, v))
}
workdir := "/tmp/workspace"
os.MkdirAll(workdir, 0755)
os.RemoveAll(workdir)
os.MkdirAll(workdir, 0755)
for i, step := range job.Steps {
stepStart := time.Now()
name := step.Name
if name == "" {
name = fmt.Sprintf("Step %d", i+1)
}
// Handle checkout action
if step.Uses != "" && contains(step.Uses, "checkout") {
cloneURL := payload.Repo.CloneURL
if payload.GiteaToken != "" {
cloneURL = fmt.Sprintf("https://token:%s@%s",
payload.GiteaToken, cloneURL[8:]) // strip https://
}
branch := payload.Ref
if len(branch) > 11 && branch[:11] == "refs/heads/" {
branch = branch[11:]
}
cmd := exec.CommandContext(ctx, "git", "clone", "--depth=1", "--branch", branch, cloneURL, workdir)
if out, err := cmd.CombinedOutput(); err != nil {
log.Printf("Checkout failed: %s", string(out))
steps = append(steps, StepResult{Name: name, Status: "failure", Duration: since(stepStart)})
overallStatus = "failure"
break
}
steps = append(steps, StepResult{Name: name, Status: "success", Duration: since(stepStart)})
continue
}
// Skip unknown actions
if step.Uses != "" {
steps = append(steps, StepResult{Name: name, Status: "skipped", Duration: since(stepStart)})
continue
}
if step.Run == "" {
continue
}
// Run shell step
stepEnv := make([]string, len(env))
copy(stepEnv, env)
for k, v := range step.Env {
stepEnv = append(stepEnv, fmt.Sprintf("%s=%s", k, v))
}
cmd := exec.CommandContext(ctx, "bash", "-c", step.Run)
cmd.Dir = workdir
cmd.Env = stepEnv
var output bytes.Buffer
cmd.Stdout = io.MultiWriter(&output, os.Stdout)
cmd.Stderr = io.MultiWriter(&output, os.Stderr)
if err := cmd.Run(); err != nil {
log.Printf("Step %d FAILED: %v", i+1, err)
steps = append(steps, StepResult{Name: name, Status: "failure", Duration: since(stepStart)})
overallStatus = "failure"
break
}
steps = append(steps, StepResult{Name: name, Status: "success", Duration: since(stepStart)})
}
if overallStatus == "failure" {
break
}
}
desc := fmt.Sprintf("CI %s in %s", overallStatus, time.Since(start).Round(time.Second))
setCommitStatus(payload, overallStatus, desc)
return JobResult{
Workflow: payload.Workflow,
Status: overallStatus,
Steps: steps,
Duration: time.Since(start).Round(time.Second).String(),
}, nil
}
func setCommitStatus(payload ExecPayload, state, description string) {
if payload.GiteaURL == "" || payload.GiteaToken == "" {
return
}
url := fmt.Sprintf("%s/api/v1/repos/%s/statuses/%s",
payload.GiteaURL, payload.Repo.FullName, payload.CommitSHA)
body, _ := json.Marshal(map[string]string{
"state": state,
"description": description,
"context": fmt.Sprintf("ci/%s", payload.Workflow),
})
req, _ := http.NewRequest("POST", url, bytes.NewReader(body))
req.Header.Set("Authorization", "token "+payload.GiteaToken)
req.Header.Set("Content-Type", "application/json")
resp, err := (&http.Client{Timeout: 10 * time.Second}).Do(req)
if err != nil {
log.Printf("Failed to set commit status: %v", err)
return
}
resp.Body.Close()
}
func contains(s, sub string) bool {
return len(s) >= len(sub) && (s == sub || len(s) > 0 && containsStr(s, sub))
}
func containsStr(s, sub string) bool {
for i := 0; i <= len(s)-len(sub); i++ {
if s[i:i+len(sub)] == sub {
return true
}
}
return false
}
func since(t time.Time) string {
return time.Since(t).Round(time.Millisecond).String()
}
func main() {
lambda.Start(handler)
}