diff --git a/apps/api/openapi/openapi.json b/apps/api/openapi/openapi.json index f30995efb..9e14aaad4 100644 --- a/apps/api/openapi/openapi.json +++ b/apps/api/openapi/openapi.json @@ -3635,6 +3635,52 @@ ], "type": "object" }, + "WorkflowRunJob": { + "properties": { + "jobAgentId": { + "description": "Job agent the job was dispatched to", + "type": "string" + }, + "jobId": { + "description": "Job id; poll its status via GET /v1/workspaces/{workspaceId}/jobs/{jobId}", + "type": "string" + } + }, + "required": [ + "jobId", + "jobAgentId" + ], + "type": "object" + }, + "WorkflowRunResult": { + "properties": { + "id": { + "description": "Workflow run id", + "type": "string" + }, + "inputs": { + "additionalProperties": true, + "type": "object" + }, + "jobs": { + "description": "Jobs created and dispatched for this run", + "items": { + "$ref": "#/components/schemas/WorkflowRunJob" + }, + "type": "array" + }, + "workflowId": { + "type": "string" + } + }, + "required": [ + "id", + "workflowId", + "inputs", + "jobs" + ], + "type": "object" + }, "WorkflowSelectorArrayInput": { "properties": { "key": { @@ -6704,15 +6750,15 @@ "content": { "application/json": { "schema": { - "type": "object", - "required": [ - "status" - ], "properties": { "status": { "$ref": "#/components/schemas/JobStatus" } - } + }, + "required": [ + "status" + ], + "type": "object" } } }, @@ -10364,7 +10410,7 @@ "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/WorkflowRun" + "$ref": "#/components/schemas/WorkflowRunResult" } } }, @@ -10648,7 +10694,7 @@ "content": { "application/json": { "schema": { - "$ref": "#/components/schemas/WorkflowRun" + "$ref": "#/components/schemas/WorkflowRunResult" } } }, diff --git a/apps/api/openapi/paths/workflows.jsonnet b/apps/api/openapi/paths/workflows.jsonnet index d4916741d..053c0b517 100644 --- a/apps/api/openapi/paths/workflows.jsonnet +++ b/apps/api/openapi/paths/workflows.jsonnet @@ -88,7 +88,7 @@ local openapi = import '../lib/openapi.libsonnet'; }, }, }, - responses: openapi.createdResponse(openapi.schemaRef('WorkflowRun')) + responses: openapi.createdResponse(openapi.schemaRef('WorkflowRunResult')) + openapi.notFoundResponse() + openapi.badRequestResponse(), }, @@ -180,7 +180,7 @@ local openapi = import '../lib/openapi.libsonnet'; }, }, }, - responses: openapi.createdResponse(openapi.schemaRef('WorkflowRun')) + responses: openapi.createdResponse(openapi.schemaRef('WorkflowRunResult')) + openapi.notFoundResponse() + openapi.badRequestResponse(), }, diff --git a/apps/api/openapi/schemas/workflows.jsonnet b/apps/api/openapi/schemas/workflows.jsonnet index d3e4118f5..d5d12c8db 100644 --- a/apps/api/openapi/schemas/workflows.jsonnet +++ b/apps/api/openapi/schemas/workflows.jsonnet @@ -175,6 +175,33 @@ local openapi = import '../lib/openapi.libsonnet'; }, }, + WorkflowRunJob: { + type: 'object', + required: ['jobId', 'jobAgentId'], + properties: { + jobId: { type: 'string', description: 'Job id; poll its status via GET /v1/workspaces/{workspaceId}/jobs/{jobId}' }, + jobAgentId: { type: 'string', description: 'Job agent the job was dispatched to' }, + }, + }, + + WorkflowRunResult: { + type: 'object', + required: ['id', 'workflowId', 'inputs', 'jobs'], + properties: { + id: { type: 'string', description: 'Workflow run id' }, + workflowId: { type: 'string' }, + inputs: { + type: 'object', + additionalProperties: true, + }, + jobs: { + type: 'array', + description: 'Jobs created and dispatched for this run', + items: openapi.schemaRef('WorkflowRunJob'), + }, + }, + }, + WorkflowSlugConflictResponse: { type: 'object', required: ['message', 'code', 'details'], diff --git a/apps/api/src/routes/v1/workspaces/workflows.ts b/apps/api/src/routes/v1/workspaces/workflows.ts index b47bf55ab..2d19cbf14 100644 --- a/apps/api/src/routes/v1/workspaces/workflows.ts +++ b/apps/api/src/routes/v1/workspaces/workflows.ts @@ -8,6 +8,32 @@ import { db } from "@ctrlplane/db/client"; import * as schema from "@ctrlplane/db/schema"; import { getClientFor } from "@ctrlplane/workspace-engine-sdk"; +import { validResourceSelector } from "../valid-selector.js"; + +const RESOURCE_SELECTOR_INPUT_KEY = "resourceSelector"; + +type WorkflowCelBody = { + jobAgents?: { name?: string; selector?: string | null }[]; + inputs?: { key?: string; default?: unknown; selector?: { default?: unknown } | null }[]; +}; + +const assertValidWorkflowCel = (body: WorkflowCelBody) => { + const check = (expr: unknown, label: string) => { + if (typeof expr !== "string" || expr.trim() === "") return; + if (!validResourceSelector(expr)) + throw new ApiError(`Invalid CEL expression for ${label}`, 400, "INVALID_CEL"); + }; + + for (const agent of body.jobAgents ?? []) + check(agent.selector, `job agent '${agent.name ?? "?"}'`); + + for (const input of body.inputs ?? []) { + if (input.key === RESOURCE_SELECTOR_INPUT_KEY) + check(input.default, `input '${input.key}'`); + check(input.selector?.default, `input '${input.key ?? "?"}' selector`); + } +}; + const slugifyWorkflowName = (name: string) => name .toLowerCase() @@ -105,6 +131,7 @@ const createWorkflow: AsyncTypedHandler< "post" > = async (req, res) => { const { workspaceId } = req.params; + assertValidWorkflowCel(req.body); const slug = resolveSlug(req.body.slug, req.body.name); const created = await db @@ -167,6 +194,7 @@ const updateWorkflow: AsyncTypedHandler< "put" > = async (req, res) => { const { workflowId, workspaceId } = req.params; + assertValidWorkflowCel(req.body); if (req.body.slug != null) { const result = slugSchema.safeParse(req.body.slug); diff --git a/apps/api/src/types/openapi.ts b/apps/api/src/types/openapi.ts index ac944cde5..af25de2d3 100644 --- a/apps/api/src/types/openapi.ts +++ b/apps/api/src/types/openapi.ts @@ -2499,6 +2499,22 @@ export interface components { }; workflowId: string; }; + WorkflowRunJob: { + /** @description Job agent the job was dispatched to */ + jobAgentId: string; + /** @description Job id; poll its status via GET /v1/workspaces/{workspaceId}/jobs/{jobId} */ + jobId: string; + }; + WorkflowRunResult: { + /** @description Workflow run id */ + id: string; + inputs: { + [key: string]: unknown; + }; + /** @description Jobs created and dispatched for this run */ + jobs: components["schemas"]["WorkflowRunJob"][]; + workflowId: string; + }; WorkflowSelectorArrayInput: { key: string; selector: { @@ -6803,7 +6819,7 @@ export interface operations { [name: string]: unknown; }; content: { - "application/json": components["schemas"]["WorkflowRun"]; + "application/json": components["schemas"]["WorkflowRunResult"]; }; }; /** @description Invalid request */ @@ -6997,7 +7013,7 @@ export interface operations { [name: string]: unknown; }; content: { - "application/json": components["schemas"]["WorkflowRun"]; + "application/json": components["schemas"]["WorkflowRunResult"]; }; }; /** @description Invalid request */ diff --git a/apps/workspace-engine/svc/http/server/openapi/workflows/dispatch.go b/apps/workspace-engine/svc/http/server/openapi/workflows/dispatch.go new file mode 100644 index 000000000..7ab3e7506 --- /dev/null +++ b/apps/workspace-engine/svc/http/server/openapi/workflows/dispatch.go @@ -0,0 +1,91 @@ +package workflows + +import ( + "context" + "fmt" + + "workspace-engine/pkg/db" + "workspace-engine/pkg/oapi" + "workspace-engine/pkg/selector" +) + +type plannedDispatch struct { + runner db.JobAgent + mergedConfig oapi.JobAgentConfig + dispatchCtx *oapi.DispatchContext +} + +func buildDispatchContext(workflow *oapi.Workflow, inputs map[string]any) *oapi.DispatchContext { + return &oapi.DispatchContext{ + Workflow: workflow, + Inputs: &inputs, + } +} + +func planDispatches( + ctx context.Context, + base *oapi.DispatchContext, + resources []*oapi.Resource, + jobAgents []oapi.WorkflowJobAgent, + runners map[string]db.JobAgent, +) ([]plannedDispatch, error) { + planned := make([]plannedDispatch, 0, len(resources)*len(jobAgents)) + for _, resource := range resources { + resourceCtx := *base + resourceCtx.Resource = resource + + matchResource := resource + if matchResource == nil { + matchResource = &oapi.Resource{} + } + + for _, jobAgent := range jobAgents { + runner := runners[jobAgent.Ref] + mergedConfig := mergeWorkflowJobAgentConfig(runner.Config, jobAgent.Config) + dispatchCtx := buildJobDispatchContext(&resourceCtx, runner, mergedConfig) + + matched, err := selector.MatchJobAgentsWithResource( + ctx, + jobAgent.Selector, + []oapi.JobAgent{dispatchCtx.JobAgent}, + matchResource, + ) + if err != nil { + return nil, fmt.Errorf("match selector: %w", err) + } + if len(matched) == 0 { + continue + } + + planned = append(planned, plannedDispatch{ + runner: runner, + mergedConfig: mergedConfig, + dispatchCtx: dispatchCtx, + }) + } + } + return planned, nil +} + +func mergeWorkflowJobAgentConfig( + runnerConfig, perJobConfig oapi.JobAgentConfig, +) oapi.JobAgentConfig { + return oapi.DeepMergeConfigs(runnerConfig, perJobConfig) +} + +func buildJobDispatchContext( + base *oapi.DispatchContext, + runner db.JobAgent, + mergedConfig oapi.JobAgentConfig, +) *oapi.DispatchContext { + out := *base + out.JobAgent = oapi.JobAgent{ + Id: runner.ID.String(), + WorkspaceId: runner.WorkspaceID.String(), + Name: runner.Name, + Type: runner.Type, + Config: runner.Config, + } + out.JobAgentConfig = mergedConfig + return &out +} diff --git a/apps/workspace-engine/svc/http/server/openapi/workflows/getters.go b/apps/workspace-engine/svc/http/server/openapi/workflows/getters.go index 010eef427..7e9929c42 100644 --- a/apps/workspace-engine/svc/http/server/openapi/workflows/getters.go +++ b/apps/workspace-engine/svc/http/server/openapi/workflows/getters.go @@ -8,10 +8,17 @@ import ( "github.com/google/uuid" "workspace-engine/pkg/db" "workspace-engine/pkg/oapi" + "workspace-engine/pkg/store/resources" ) type Getter interface { GetWorkflowByID(ctx context.Context, workflowID string) (*oapi.Workflow, error) + GetResourcesMatching(ctx context.Context, workspaceID, sel string) ([]*oapi.Resource, error) + GetJobAgentsByRef( + ctx context.Context, + workspaceID string, + jobAgents []oapi.WorkflowJobAgent, + ) (map[string]db.JobAgent, error) } type PostgresGetter struct{} @@ -65,3 +72,49 @@ func (g *PostgresGetter) GetWorkflowByID( Jobs: jobs, }, nil } + +func (g *PostgresGetter) GetResourcesMatching( + ctx context.Context, + workspaceID, sel string, +) ([]*oapi.Resource, error) { + if sel == "" { + return []*oapi.Resource{nil}, nil + } + store := &resources.PostgresGetResources{} + return store.GetResources(ctx, workspaceID, resources.GetResourcesOptions{CEL: sel}) +} + +func (g *PostgresGetter) GetJobAgentsByRef( + ctx context.Context, + workspaceID string, + jobAgents []oapi.WorkflowJobAgent, +) (map[string]db.JobAgent, error) { + workspaceUUID, err := uuid.Parse(workspaceID) + if err != nil { + return nil, fmt.Errorf("parse workspace id: %w", err) + } + + queries := db.GetQueries(ctx) + runners := make(map[string]db.JobAgent, len(jobAgents)) + for _, jobAgent := range jobAgents { + if _, ok := runners[jobAgent.Ref]; ok { + continue + } + runnerID, err := uuid.Parse(jobAgent.Ref) + if err != nil { + return nil, fmt.Errorf("parse job agent id: %w", err) + } + runner, err := queries.GetJobAgentByID(ctx, runnerID) + if err != nil { + return nil, fmt.Errorf("get job agent: %w", err) + } + if runner.WorkspaceID != workspaceUUID { + return nil, fmt.Errorf( + "job agent %s does not belong to workspace %s", + runnerID, workspaceUUID, + ) + } + runners[jobAgent.Ref] = runner + } + return runners, nil +} diff --git a/apps/workspace-engine/svc/http/server/openapi/workflows/setters.go b/apps/workspace-engine/svc/http/server/openapi/workflows/setters.go index adfb9330c..9f0b6c186 100644 --- a/apps/workspace-engine/svc/http/server/openapi/workflows/setters.go +++ b/apps/workspace-engine/svc/http/server/openapi/workflows/setters.go @@ -11,15 +11,15 @@ import ( "workspace-engine/pkg/db" "workspace-engine/pkg/oapi" "workspace-engine/pkg/reconcile" - "workspace-engine/pkg/selector" ) type Setter interface { - CreateWorkflowRun( + PersistWorkflowRun( ctx context.Context, workspaceID string, - workflow *oapi.Workflow, + workflowID string, inputs map[string]any, + dispatches []plannedDispatch, ) (*oapi.WorkflowRunResult, error) } @@ -33,25 +33,14 @@ func NewPostgresSetter(queue reconcile.Queue) *PostgresSetter { return &PostgresSetter{queue: queue} } -func (s *PostgresSetter) buildDispatchContext( - workflow *oapi.Workflow, - inputs map[string]any, -) *oapi.DispatchContext { - return &oapi.DispatchContext{ - Workflow: workflow, - Inputs: &inputs, - } -} - -func (s *PostgresSetter) CreateWorkflowRun( +func (s *PostgresSetter) PersistWorkflowRun( ctx context.Context, workspaceID string, - workflow *oapi.Workflow, + workflowID string, inputs map[string]any, + dispatches []plannedDispatch, ) (*oapi.WorkflowRunResult, error) { - dispatchContext := s.buildDispatchContext(workflow, inputs) - - workflowIDUUID, err := uuid.Parse(workflow.Id) + workflowIDUUID, err := uuid.Parse(workflowID) if err != nil { return nil, fmt.Errorf("parse workflow id: %w", err) } @@ -72,23 +61,9 @@ func (s *PostgresSetter) CreateWorkflowRun( return nil, fmt.Errorf("insert workflow run: %w", err) } - jobs := make([]oapi.WorkflowRunJob, 0, len(workflow.Jobs)) - for _, jobAgent := range workflow.Jobs { - isMatchingSelector, err := selector.Match(ctx, jobAgent.Selector, *dispatchContext) - if err != nil { - return nil, fmt.Errorf("match selector: %w", err) - } - if !isMatchingSelector { - continue - } - job, err := s.dispatchJobForAgent( - ctx, - queries, - jobAgent, - workflowRun.ID, - dispatchContext, - workspaceID, - ) + jobs := make([]oapi.WorkflowRunJob, 0, len(dispatches)) + for _, d := range dispatches { + job, err := insertJob(ctx, queries, d, workflowRun.ID) if err != nil { return nil, err } @@ -99,81 +74,36 @@ func (s *PostgresSetter) CreateWorkflowRun( return nil, fmt.Errorf("commit transaction: %w", err) } + for _, job := range jobs { + if err := s.queue.Enqueue(ctx, reconcile.EnqueueParams{ + WorkspaceID: workspaceID, + Kind: "job-dispatch", + ScopeType: "job", + ScopeID: job.JobId, + }); err != nil { + return nil, fmt.Errorf("enqueue job dispatch: %w", err) + } + } + return &oapi.WorkflowRunResult{ Id: workflowRun.ID.String(), - WorkflowId: workflow.Id, + WorkflowId: workflowID, Inputs: inputs, Jobs: jobs, }, nil } -// mergeWorkflowJobAgentConfig builds the JobAgentConfig that ends up on a -// workflow-triggered job. The runner row holds shared credentials (e.g. -// serverUrl, apiKey); the per-job WorkflowJobAgent.Config holds the -// per-invocation payload (e.g. template, name). Per-job values win on -// conflict, mirroring the deployment flow's runner < deployment < version -// precedence in jobeligibility. -func mergeWorkflowJobAgentConfig( - runnerConfig, perJobConfig oapi.JobAgentConfig, -) oapi.JobAgentConfig { - return oapi.DeepMergeConfigs(runnerConfig, perJobConfig) -} - -// buildJobDispatchContext returns a per-job copy of the base dispatch context -// with the resolved JobAgent and merged JobAgentConfig populated. Dispatchers -// (argo-workflow, argo-cd, github, …) read these fields off DispatchContext, -// not off the Job row, so they must be set here before the context is -// persisted alongside the job. -func buildJobDispatchContext( - base *oapi.DispatchContext, - runner db.JobAgent, - mergedConfig oapi.JobAgentConfig, -) *oapi.DispatchContext { - out := *base - out.JobAgent = oapi.JobAgent{ - Id: runner.ID.String(), - WorkspaceId: runner.WorkspaceID.String(), - Name: runner.Name, - Type: runner.Type, - Config: runner.Config, - } - out.JobAgentConfig = mergedConfig - return &out -} - -func (s *PostgresSetter) dispatchJobForAgent( +func insertJob( ctx context.Context, queries *db.Queries, - jobAgent oapi.WorkflowJobAgent, + d plannedDispatch, workflowRunID uuid.UUID, - dispatchContext *oapi.DispatchContext, - workspaceID string, ) (oapi.WorkflowRunJob, error) { - jobAgentIDUUID, err := uuid.Parse(jobAgent.Ref) - if err != nil { - return oapi.WorkflowRunJob{}, fmt.Errorf("parse job agent id: %w", err) - } - workspaceIDUUID, err := uuid.Parse(workspaceID) - if err != nil { - return oapi.WorkflowRunJob{}, fmt.Errorf("parse workspace id: %w", err) - } - runner, err := queries.GetJobAgentByID(ctx, jobAgentIDUUID) - if err != nil { - return oapi.WorkflowRunJob{}, fmt.Errorf("get job agent: %w", err) - } - if runner.WorkspaceID != workspaceIDUUID { - return oapi.WorkflowRunJob{}, fmt.Errorf( - "job agent %s does not belong to workspace %s", - jobAgentIDUUID, workspaceIDUUID, - ) - } - mergedConfig := mergeWorkflowJobAgentConfig(runner.Config, jobAgent.Config) - jobAgentConfig, err := json.Marshal(mergedConfig) + jobAgentConfig, err := json.Marshal(d.mergedConfig) if err != nil { return oapi.WorkflowRunJob{}, fmt.Errorf("marshal job agent config: %w", err) } - jobDispatchContext := buildJobDispatchContext(dispatchContext, runner, mergedConfig) - dispatchContextJSON, err := json.Marshal(jobDispatchContext) + dispatchContextJSON, err := json.Marshal(d.dispatchCtx) if err != nil { return oapi.WorkflowRunJob{}, fmt.Errorf("marshal dispatch context: %w", err) } @@ -184,7 +114,7 @@ func (s *PostgresSetter) dispatchJobForAgent( if err := queries.InsertJob(ctx, db.InsertJobParams{ ID: jobID, Status: db.JobStatusPending, - JobAgentID: pgtype.UUID{Bytes: jobAgentIDUUID, Valid: true}, + JobAgentID: pgtype.UUID{Bytes: d.runner.ID, Valid: true}, JobAgentConfig: jobAgentConfig, DispatchContext: dispatchContextJSON, CreatedAt: now, @@ -200,17 +130,8 @@ func (s *PostgresSetter) dispatchJobForAgent( return oapi.WorkflowRunJob{}, fmt.Errorf("insert workflow job: %w", err) } - if err := s.queue.Enqueue(ctx, reconcile.EnqueueParams{ - WorkspaceID: workspaceID, - Kind: "job-dispatch", - ScopeType: "job", - ScopeID: jobID.String(), - }); err != nil { - return oapi.WorkflowRunJob{}, fmt.Errorf("enqueue job dispatch: %w", err) - } - return oapi.WorkflowRunJob{ JobId: jobID.String(), - JobAgentId: jobAgentIDUUID.String(), + JobAgentId: d.runner.ID.String(), }, nil } diff --git a/apps/workspace-engine/svc/http/server/openapi/workflows/workflows.go b/apps/workspace-engine/svc/http/server/openapi/workflows/workflows.go index 2b0215db5..74ed572b8 100644 --- a/apps/workspace-engine/svc/http/server/openapi/workflows/workflows.go +++ b/apps/workspace-engine/svc/http/server/openapi/workflows/workflows.go @@ -11,6 +11,12 @@ import ( "workspace-engine/pkg/reconcile/postgres" ) +// resourceSelectorInputKey is the reserved input key whose value is a CEL +// expression selecting the resources a run fans out over. One set of jobs is +// dispatched per matched resource. Absent or empty means a single, non-fanned- +// out run. +const resourceSelectorInputKey = "resourceSelector" + type Workflows struct { getter Getter setter Setter @@ -76,7 +82,9 @@ func (w *Workflows) CreateWorkflowRun( workspaceId string, workflowId string, ) { - workflow, err := w.getter.GetWorkflowByID(c.Request.Context(), workflowId) + ctx := c.Request.Context() + + workflow, err := w.getter.GetWorkflowByID(ctx, workflowId) if err != nil { c.JSON(http.StatusNotFound, gin.H{"error": err.Error()}) return @@ -94,16 +102,46 @@ func (w *Workflows) CreateWorkflowRun( return } - result, err := w.setter.CreateWorkflowRun( - c.Request.Context(), - workspaceId, - workflow, - inputs, + resourceSelector := "" + if raw, ok := inputs[resourceSelectorInputKey]; ok { + sel, isString := raw.(string) + if !isString { + c.JSON(http.StatusBadRequest, gin.H{ + "error": fmt.Sprintf("%s must be a string", resourceSelectorInputKey), + }) + return + } + resourceSelector = sel + } + resources, err := w.getter.GetResourcesMatching(ctx, workspaceId, resourceSelector) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + runners, err := w.getter.GetJobAgentsByRef(ctx, workspaceId, workflow.Jobs) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + + dispatches, err := planDispatches( + ctx, + buildDispatchContext(workflow, inputs), + resources, + workflow.Jobs, + runners, ) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } + result, err := w.setter.PersistWorkflowRun(ctx, workspaceId, workflow.Id, inputs, dispatches) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusOK, result) } diff --git a/apps/workspace-engine/svc/http/server/openapi/workflows/workflows_test.go b/apps/workspace-engine/svc/http/server/openapi/workflows/workflows_test.go index 7f5df5488..c0e81beba 100644 --- a/apps/workspace-engine/svc/http/server/openapi/workflows/workflows_test.go +++ b/apps/workspace-engine/svc/http/server/openapi/workflows/workflows_test.go @@ -1,6 +1,7 @@ package workflows import ( + "context" "testing" "github.com/google/uuid" @@ -10,6 +11,109 @@ import ( "workspace-engine/pkg/oapi" ) +func TestGetResourcesMatching_EmptySelectorReturnsSingleNil(t *testing.T) { + getter := &PostgresGetter{} + resources, err := getter.GetResourcesMatching(context.Background(), uuid.New().String(), "") + + require.NoError(t, err) + require.Len(t, resources, 1) + assert.Nil(t, resources[0]) +} + +const argoRoutingSelector = "resource.config.argo.server.contains(jobAgent.config.serverUrl)" + +func resourceOnServer(name, server string) *oapi.Resource { + return &oapi.Resource{ + Id: uuid.New().String(), + Name: name, + Config: map[string]any{ + "argo": map[string]any{"server": server}, + }, + } +} + +func argoAgent(serverURL string) (oapi.WorkflowJobAgent, db.JobAgent) { + ref := uuid.New() + agent := oapi.WorkflowJobAgent{ + Ref: ref.String(), + Name: "delete-on-" + serverURL, + Selector: argoRoutingSelector, + Config: map[string]any{}, + } + runner := db.JobAgent{ + ID: ref, + Config: oapi.JobAgentConfig{"serverUrl": serverURL}, + } + return agent, runner +} + +func TestPlanDispatches_RoutesEachResourceToItsServer(t *testing.T) { + prodAgent, prodRunner := argoAgent("argocd.prod.example.com") + stagingAgent, stagingRunner := argoAgent("argocd.staging.example.com") + + resources := []*oapi.Resource{ + resourceOnServer("r1", "https://argocd.prod.example.com"), + resourceOnServer("r2", "https://argocd.prod.example.com"), + resourceOnServer("r3", "https://argocd.staging.example.com"), + } + runners := map[string]db.JobAgent{ + prodAgent.Ref: prodRunner, + stagingAgent.Ref: stagingRunner, + } + base := &oapi.DispatchContext{Workflow: &oapi.Workflow{Id: uuid.New().String()}} + + dispatches, err := planDispatches( + context.Background(), base, resources, + []oapi.WorkflowJobAgent{prodAgent, stagingAgent}, runners, + ) + + require.NoError(t, err) + require.Len(t, dispatches, 3) // each resource matches exactly one server + + for _, d := range dispatches { + server := d.dispatchCtx.Resource.Config["argo"].(map[string]any)["server"].(string) + serverURL := d.runner.Config["serverUrl"].(string) + assert.Contains(t, server, serverURL, "resource routed to the wrong server") + } +} + +func TestPlanDispatches_NoMatchingServerYieldsNoDispatches(t *testing.T) { + prodAgent, prodRunner := argoAgent("argocd.prod.example.com") + + resources := []*oapi.Resource{resourceOnServer("r1", "https://argocd.other.example.com")} + runners := map[string]db.JobAgent{prodAgent.Ref: prodRunner} + base := &oapi.DispatchContext{Workflow: &oapi.Workflow{Id: uuid.New().String()}} + + dispatches, err := planDispatches( + context.Background(), base, resources, + []oapi.WorkflowJobAgent{prodAgent}, runners, + ) + + require.NoError(t, err) + assert.Empty(t, dispatches) +} + +func TestPlanDispatches_NilResourceRunsGateOnce(t *testing.T) { + ref := uuid.New() + agent := oapi.WorkflowJobAgent{ + Ref: ref.String(), + Name: "always", + Selector: "true", + Config: map[string]any{}, + } + runners := map[string]db.JobAgent{ref.String(): {ID: ref}} + base := &oapi.DispatchContext{Workflow: &oapi.Workflow{Id: uuid.New().String()}} + + dispatches, err := planDispatches( + context.Background(), base, + []*oapi.Resource{nil}, []oapi.WorkflowJobAgent{agent}, runners, + ) + + require.NoError(t, err) + require.Len(t, dispatches, 1) + assert.Nil(t, dispatches[0].dispatchCtx.Resource) +} + func stringInput(key string, def *string) oapi.WorkflowInput { var input oapi.WorkflowInput _ = input.FromWorkflowStringInput(oapi.WorkflowStringInput{ diff --git a/e2e/api/schema.ts b/e2e/api/schema.ts index ac944cde5..af25de2d3 100644 --- a/e2e/api/schema.ts +++ b/e2e/api/schema.ts @@ -2499,6 +2499,22 @@ export interface components { }; workflowId: string; }; + WorkflowRunJob: { + /** @description Job agent the job was dispatched to */ + jobAgentId: string; + /** @description Job id; poll its status via GET /v1/workspaces/{workspaceId}/jobs/{jobId} */ + jobId: string; + }; + WorkflowRunResult: { + /** @description Workflow run id */ + id: string; + inputs: { + [key: string]: unknown; + }; + /** @description Jobs created and dispatched for this run */ + jobs: components["schemas"]["WorkflowRunJob"][]; + workflowId: string; + }; WorkflowSelectorArrayInput: { key: string; selector: { @@ -6803,7 +6819,7 @@ export interface operations { [name: string]: unknown; }; content: { - "application/json": components["schemas"]["WorkflowRun"]; + "application/json": components["schemas"]["WorkflowRunResult"]; }; }; /** @description Invalid request */ @@ -6997,7 +7013,7 @@ export interface operations { [name: string]: unknown; }; content: { - "application/json": components["schemas"]["WorkflowRun"]; + "application/json": components["schemas"]["WorkflowRunResult"]; }; }; /** @description Invalid request */ diff --git a/e2e/tests/api/workflows.spec.ts b/e2e/tests/api/workflows.spec.ts index 0fc3814fc..55c58627e 100644 --- a/e2e/tests/api/workflows.spec.ts +++ b/e2e/tests/api/workflows.spec.ts @@ -1,5 +1,6 @@ import { faker } from "@faker-js/faker"; import { expect } from "@playwright/test"; +import { v4 as uuidv4 } from "uuid"; import type { components } from "../../api/schema"; import { test } from "../fixtures"; @@ -402,4 +403,257 @@ test.describe("Workflow API", () => { ); expect(getRes.response.status).toBe(404); }); + + test("should reject creating a workflow with an invalid CEL job agent selector", async ({ + api, + workspace, + }) => { + const createRes = await api.POST( + "/v1/workspaces/{workspaceId}/workflows", + { + params: { path: { workspaceId: workspace.id } }, + body: { + name: `Bad CEL ${faker.string.alphanumeric(8)}`, + inputs: [], + jobAgents: [ + { name: "agent", ref: uuidv4(), config: {}, selector: "((( not valid" }, + ], + }, + }, + ); + expect(createRes.response.status).toBe(400); + }); + + test("should reject creating a workflow with an invalid CEL resourceSelector input", async ({ + api, + workspace, + }) => { + const createRes = await api.POST( + "/v1/workspaces/{workspaceId}/workflows", + { + params: { path: { workspaceId: workspace.id } }, + body: { + name: `Bad CEL ${faker.string.alphanumeric(8)}`, + inputs: [ + { key: "resourceSelector", type: "string", default: "((( not valid" }, + ], + jobAgents: [], + }, + }, + ); + expect(createRes.response.status).toBe(400); + }); + + test("should reject updating a workflow with an invalid CEL job agent selector", async ({ + api, + workspace, + }) => { + const createRes = await api.POST( + "/v1/workspaces/{workspaceId}/workflows", + { + params: { path: { workspaceId: workspace.id } }, + body: { + name: `Update CEL ${faker.string.alphanumeric(8)}`, + inputs: [], + jobAgents: [{ name: "agent", ref: uuidv4(), config: {}, selector: "true" }], + }, + }, + ); + expect(createRes.response.status).toBe(201); + const workflowId = createRes.data!.id; + + try { + const updateRes = await api.PUT( + "/v1/workspaces/{workspaceId}/workflows/{workflowId}", + { + params: { path: { workspaceId: workspace.id, workflowId } }, + body: { + name: createRes.data!.name, + inputs: [], + jobAgents: [ + { name: "agent", ref: uuidv4(), config: {}, selector: "((( not valid" }, + ], + }, + }, + ); + expect(updateRes.response.status).toBe(400); + } finally { + await api.DELETE( + "/v1/workspaces/{workspaceId}/workflows/{workflowId}", + { params: { path: { workspaceId: workspace.id, workflowId } } }, + ); + } + }); + + test("should accept a workflow with a valid CEL job agent selector", async ({ + api, + workspace, + }) => { + const createRes = await api.POST( + "/v1/workspaces/{workspaceId}/workflows", + { + params: { path: { workspaceId: workspace.id } }, + body: { + name: `Good CEL ${faker.string.alphanumeric(8)}`, + inputs: [], + jobAgents: [ + { + name: "agent", + ref: uuidv4(), + config: {}, + selector: + "resource.config.argo.server.contains(jobAgent.config.serverUrl)", + }, + ], + }, + }, + ); + expect(createRes.response.status).toBe(201); + await api.DELETE( + "/v1/workspaces/{workspaceId}/workflows/{workflowId}", + { + params: { + path: { workspaceId: workspace.id, workflowId: createRes.data!.id }, + }, + }, + ); + }); + + test("should fan a run out to one job per matched resource, routed by server", async ({ + api, + workspace, + }) => { + const suffix = faker.string.alphanumeric(8); + const prodServer = `argocd-prod-${suffix}.example.com`; + const stagingServer = `argocd-staging-${suffix}.example.com`; + const kind = `ArgoApp${suffix}`; + const routingSelector = + "resource.config.argo.server.contains(jobAgent.config.serverUrl)"; + + // Two registered job agents, each holding one server's URL in its config — + // the selector routes each resource to the agent whose serverUrl it matches. + const prodAgentId = uuidv4(); + const stagingAgentId = uuidv4(); + for (const [jobAgentId, serverUrl] of [ + [prodAgentId, prodServer], + [stagingAgentId, stagingServer], + ] as const) { + const agentRes = await api.PUT( + "/v1/workspaces/{workspaceId}/job-agents/{jobAgentId}", + { + params: { path: { workspaceId: workspace.id, jobAgentId } }, + body: { + name: `argo-${jobAgentId.slice(0, 8)}`, + type: "argo-cd", + config: { serverUrl }, + }, + }, + ); + expect(agentRes.response.status).toBe(202); + } + + const providerName = `wf-fanout-${suffix}`; + const providerRes = await api.PUT( + "/v1/workspaces/{workspaceId}/resource-providers", + { + params: { path: { workspaceId: workspace.id } }, + body: { id: uuidv4(), name: providerName }, + }, + ); + expect(providerRes.response.status).toBe(202); + const providerId = providerRes.data!.id; + + let workflowId: string | undefined; + try { + const resource = (label: string, server: string) => ({ + createdAt: new Date().toISOString(), + identifier: `${kind}-${label}-${suffix}`, + name: `${kind}-${label}`, + kind, + version: "1.0.0", + config: { argo: { server: `https://${server}` } }, + metadata: {}, + }); + const setRes = await api.PUT( + "/v1/workspaces/{workspaceId}/resource-providers/{providerId}/set", + { + params: { path: { workspaceId: workspace.id, providerId } }, + body: { + resources: [ + resource("a", prodServer), + resource("b", prodServer), + resource("c", stagingServer), + ], + }, + }, + ); + expect(setRes.response.status).toBe(202); + + // Barrier: ensure the three resources are queryable before the run. + const listRes = await api.GET( + "/v1/workspaces/{workspaceId}/resource-providers/name/{name}/resources", + { params: { path: { workspaceId: workspace.id, name: providerName } } }, + ); + expect(listRes.data!.items).toHaveLength(3); + + const createRes = await api.POST( + "/v1/workspaces/{workspaceId}/workflows", + { + params: { path: { workspaceId: workspace.id } }, + body: { + name: `Delete Argo ${suffix}`, + inputs: [ + { + key: "resourceSelector", + type: "string", + default: `resource.kind == '${kind}'`, + }, + ], + jobAgents: [ + { name: "prod", ref: prodAgentId, config: {}, selector: routingSelector }, + { name: "staging", ref: stagingAgentId, config: {}, selector: routingSelector }, + ], + }, + }, + ); + expect(createRes.response.status).toBe(201); + workflowId = createRes.data!.id; + + const runRes = await api.POST( + "/v1/workspaces/{workspaceId}/workflows/{workflowId}/runs", + { + params: { path: { workspaceId: workspace.id, workflowId } }, + body: { inputs: {} }, + }, + ); + expect(runRes.response.status).toBe(201); + + const jobs = runRes.data!.jobs; + expect(jobs).toHaveLength(3); // one per matched resource + + const jobsPerAgent = jobs.reduce>((acc, job) => { + acc[job.jobAgentId] = (acc[job.jobAgentId] ?? 0) + 1; + return acc; + }, {}); + expect(jobsPerAgent[prodAgentId]).toBe(2); // two prod resources + expect(jobsPerAgent[stagingAgentId]).toBe(1); // one staging resource + } finally { + if (workflowId != null) + await api.DELETE( + "/v1/workspaces/{workspaceId}/workflows/{workflowId}", + { params: { path: { workspaceId: workspace.id, workflowId } } }, + ); + await api.PUT( + "/v1/workspaces/{workspaceId}/resource-providers/{providerId}/set", + { + params: { path: { workspaceId: workspace.id, providerId } }, + body: { resources: [] }, + }, + ); + await api.DELETE( + "/v1/workspaces/{workspaceId}/resource-providers/name/{name}", + { params: { path: { workspaceId: workspace.id, name: providerName } } }, + ); + } + }); }); diff --git a/packages/workspace-engine-sdk/src/schema.ts b/packages/workspace-engine-sdk/src/schema.ts index 10ce93fd5..6ad613231 100644 --- a/packages/workspace-engine-sdk/src/schema.ts +++ b/packages/workspace-engine-sdk/src/schema.ts @@ -1413,6 +1413,22 @@ export interface components { }; workflowId: string; }; + WorkflowRunJob: { + /** @description Job agent the job was dispatched to */ + jobAgentId: string; + /** @description Job id; poll its status via GET /v1/workspaces/{workspaceId}/jobs/{jobId} */ + jobId: string; + }; + WorkflowRunResult: { + /** @description Workflow run id */ + id: string; + inputs: { + [key: string]: unknown; + }; + /** @description Jobs created and dispatched for this run */ + jobs: components["schemas"]["WorkflowRunJob"][]; + workflowId: string; + }; WorkflowStringInput: { default?: string; key: string; @@ -1868,7 +1884,7 @@ export interface operations { [name: string]: unknown; }; content: { - "application/json": components["schemas"]["WorkflowRun"]; + "application/json": components["schemas"]["WorkflowRunResult"]; }; }; /** @description Invalid request */