Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 53 additions & 7 deletions apps/api/openapi/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -3635,6 +3635,52 @@
],
"type": "object"
},
"WorkflowRunJob": {
"properties": {
"jobAgentId": {
"description": "Job agent the job was dispatched to",
"type": "string"
},
"jobId": {
"description": "Job id; poll its status via GET /v1/workspaces/{workspaceId}/jobs/{jobId}",
"type": "string"
}
},
"required": [
"jobId",
"jobAgentId"
],
"type": "object"
},
"WorkflowRunResult": {
"properties": {
"id": {
"description": "Workflow run id",
"type": "string"
},
"inputs": {
"additionalProperties": true,
"type": "object"
},
"jobs": {
"description": "Jobs created and dispatched for this run",
"items": {
"$ref": "#/components/schemas/WorkflowRunJob"
},
"type": "array"
},
"workflowId": {
"type": "string"
}
},
"required": [
"id",
"workflowId",
"inputs",
"jobs"
],
"type": "object"
},
"WorkflowSelectorArrayInput": {
"properties": {
"key": {
Expand Down Expand Up @@ -6704,15 +6750,15 @@
"content": {
"application/json": {
"schema": {
"type": "object",
"required": [
"status"
],
"properties": {
"status": {
"$ref": "#/components/schemas/JobStatus"
}
}
},
"required": [
"status"
],
"type": "object"
}
}
},
Expand Down Expand Up @@ -10364,7 +10410,7 @@
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/WorkflowRun"
"$ref": "#/components/schemas/WorkflowRunResult"
}
}
},
Expand Down Expand Up @@ -10648,7 +10694,7 @@
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/WorkflowRun"
"$ref": "#/components/schemas/WorkflowRunResult"
}
}
},
Expand Down
4 changes: 2 additions & 2 deletions apps/api/openapi/paths/workflows.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ local openapi = import '../lib/openapi.libsonnet';
},
},
},
responses: openapi.createdResponse(openapi.schemaRef('WorkflowRun'))
responses: openapi.createdResponse(openapi.schemaRef('WorkflowRunResult'))
+ openapi.notFoundResponse()
+ openapi.badRequestResponse(),
},
Expand Down Expand Up @@ -180,7 +180,7 @@ local openapi = import '../lib/openapi.libsonnet';
},
},
},
responses: openapi.createdResponse(openapi.schemaRef('WorkflowRun'))
responses: openapi.createdResponse(openapi.schemaRef('WorkflowRunResult'))
+ openapi.notFoundResponse()
+ openapi.badRequestResponse(),
},
Expand Down
27 changes: 27 additions & 0 deletions apps/api/openapi/schemas/workflows.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,33 @@ local openapi = import '../lib/openapi.libsonnet';
},
},

WorkflowRunJob: {
type: 'object',
required: ['jobId', 'jobAgentId'],
properties: {
jobId: { type: 'string', description: 'Job id; poll its status via GET /v1/workspaces/{workspaceId}/jobs/{jobId}' },
jobAgentId: { type: 'string', description: 'Job agent the job was dispatched to' },
},
},

WorkflowRunResult: {
type: 'object',
required: ['id', 'workflowId', 'inputs', 'jobs'],
properties: {
id: { type: 'string', description: 'Workflow run id' },
workflowId: { type: 'string' },
inputs: {
type: 'object',
additionalProperties: true,
},
jobs: {
type: 'array',
description: 'Jobs created and dispatched for this run',
items: openapi.schemaRef('WorkflowRunJob'),
},
},
},

WorkflowSlugConflictResponse: {
type: 'object',
required: ['message', 'code', 'details'],
Expand Down
28 changes: 28 additions & 0 deletions apps/api/src/routes/v1/workspaces/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,32 @@ import { db } from "@ctrlplane/db/client";
import * as schema from "@ctrlplane/db/schema";
import { getClientFor } from "@ctrlplane/workspace-engine-sdk";

import { validResourceSelector } from "../valid-selector.js";

const RESOURCE_SELECTOR_INPUT_KEY = "resourceSelector";

type WorkflowCelBody = {
jobAgents?: { name?: string; selector?: string | null }[];
inputs?: { key?: string; default?: unknown; selector?: { default?: unknown } | null }[];
};

const assertValidWorkflowCel = (body: WorkflowCelBody) => {
const check = (expr: unknown, label: string) => {
if (typeof expr !== "string" || expr.trim() === "") return;
if (!validResourceSelector(expr))
throw new ApiError(`Invalid CEL expression for ${label}`, 400, "INVALID_CEL");
};

for (const agent of body.jobAgents ?? [])
check(agent.selector, `job agent '${agent.name ?? "?"}'`);

for (const input of body.inputs ?? []) {
if (input.key === RESOURCE_SELECTOR_INPUT_KEY)
check(input.default, `input '${input.key}'`);
check(input.selector?.default, `input '${input.key ?? "?"}' selector`);
}
};

const slugifyWorkflowName = (name: string) =>
name
.toLowerCase()
Expand Down Expand Up @@ -105,6 +131,7 @@ const createWorkflow: AsyncTypedHandler<
"post"
> = async (req, res) => {
const { workspaceId } = req.params;
assertValidWorkflowCel(req.body);
const slug = resolveSlug(req.body.slug, req.body.name);

const created = await db
Expand Down Expand Up @@ -167,6 +194,7 @@ const updateWorkflow: AsyncTypedHandler<
"put"
> = async (req, res) => {
const { workflowId, workspaceId } = req.params;
assertValidWorkflowCel(req.body);

if (req.body.slug != null) {
const result = slugSchema.safeParse(req.body.slug);
Expand Down
20 changes: 18 additions & 2 deletions apps/api/src/types/openapi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2499,6 +2499,22 @@ export interface components {
};
workflowId: string;
};
WorkflowRunJob: {
/** @description Job agent the job was dispatched to */
jobAgentId: string;
/** @description Job id; poll its status via GET /v1/workspaces/{workspaceId}/jobs/{jobId} */
jobId: string;
};
WorkflowRunResult: {
/** @description Workflow run id */
id: string;
inputs: {
[key: string]: unknown;
};
/** @description Jobs created and dispatched for this run */
jobs: components["schemas"]["WorkflowRunJob"][];
workflowId: string;
};
WorkflowSelectorArrayInput: {
key: string;
selector: {
Expand Down Expand Up @@ -6803,7 +6819,7 @@ export interface operations {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["WorkflowRun"];
"application/json": components["schemas"]["WorkflowRunResult"];
};
};
/** @description Invalid request */
Expand Down Expand Up @@ -6997,7 +7013,7 @@ export interface operations {
[name: string]: unknown;
};
content: {
"application/json": components["schemas"]["WorkflowRun"];
"application/json": components["schemas"]["WorkflowRunResult"];
};
};
/** @description Invalid request */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package workflows

import (
"context"
"fmt"

"workspace-engine/pkg/db"
"workspace-engine/pkg/oapi"
"workspace-engine/pkg/selector"
)

type plannedDispatch struct {
runner db.JobAgent
mergedConfig oapi.JobAgentConfig
dispatchCtx *oapi.DispatchContext
}

func buildDispatchContext(workflow *oapi.Workflow, inputs map[string]any) *oapi.DispatchContext {
return &oapi.DispatchContext{
Workflow: workflow,
Inputs: &inputs,
}
}

func planDispatches(
ctx context.Context,
base *oapi.DispatchContext,
resources []*oapi.Resource,
jobAgents []oapi.WorkflowJobAgent,
runners map[string]db.JobAgent,
) ([]plannedDispatch, error) {
planned := make([]plannedDispatch, 0, len(resources)*len(jobAgents))
for _, resource := range resources {
resourceCtx := *base
resourceCtx.Resource = resource

matchResource := resource
if matchResource == nil {
matchResource = &oapi.Resource{}
}

for _, jobAgent := range jobAgents {
runner := runners[jobAgent.Ref]
mergedConfig := mergeWorkflowJobAgentConfig(runner.Config, jobAgent.Config)
dispatchCtx := buildJobDispatchContext(&resourceCtx, runner, mergedConfig)

matched, err := selector.MatchJobAgentsWithResource(
ctx,
jobAgent.Selector,
[]oapi.JobAgent{dispatchCtx.JobAgent},
matchResource,
)
if err != nil {
return nil, fmt.Errorf("match selector: %w", err)
}
if len(matched) == 0 {
continue
}

planned = append(planned, plannedDispatch{
runner: runner,
mergedConfig: mergedConfig,
dispatchCtx: dispatchCtx,
})
}
}
return planned, nil
}

func mergeWorkflowJobAgentConfig(
runnerConfig, perJobConfig oapi.JobAgentConfig,
) oapi.JobAgentConfig {
return oapi.DeepMergeConfigs(runnerConfig, perJobConfig)
}

func buildJobDispatchContext(
base *oapi.DispatchContext,
runner db.JobAgent,
mergedConfig oapi.JobAgentConfig,
) *oapi.DispatchContext {
out := *base
out.JobAgent = oapi.JobAgent{
Id: runner.ID.String(),
WorkspaceId: runner.WorkspaceID.String(),
Name: runner.Name,
Type: runner.Type,
Config: runner.Config,
}
out.JobAgentConfig = mergedConfig
return &out
}
53 changes: 53 additions & 0 deletions apps/workspace-engine/svc/http/server/openapi/workflows/getters.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,17 @@ import (
"github.com/google/uuid"
"workspace-engine/pkg/db"
"workspace-engine/pkg/oapi"
"workspace-engine/pkg/store/resources"
)

type Getter interface {
GetWorkflowByID(ctx context.Context, workflowID string) (*oapi.Workflow, error)
GetResourcesMatching(ctx context.Context, workspaceID, sel string) ([]*oapi.Resource, error)
GetJobAgentsByRef(
ctx context.Context,
workspaceID string,
jobAgents []oapi.WorkflowJobAgent,
) (map[string]db.JobAgent, error)
}

type PostgresGetter struct{}
Expand Down Expand Up @@ -65,3 +72,49 @@ func (g *PostgresGetter) GetWorkflowByID(
Jobs: jobs,
}, nil
}

func (g *PostgresGetter) GetResourcesMatching(
ctx context.Context,
workspaceID, sel string,
) ([]*oapi.Resource, error) {
if sel == "" {
return []*oapi.Resource{nil}, nil
}
Comment on lines +80 to +82
store := &resources.PostgresGetResources{}
return store.GetResources(ctx, workspaceID, resources.GetResourcesOptions{CEL: sel})
}

func (g *PostgresGetter) GetJobAgentsByRef(
ctx context.Context,
workspaceID string,
jobAgents []oapi.WorkflowJobAgent,
) (map[string]db.JobAgent, error) {
workspaceUUID, err := uuid.Parse(workspaceID)
if err != nil {
return nil, fmt.Errorf("parse workspace id: %w", err)
}

queries := db.GetQueries(ctx)
runners := make(map[string]db.JobAgent, len(jobAgents))
for _, jobAgent := range jobAgents {
if _, ok := runners[jobAgent.Ref]; ok {
continue
}
runnerID, err := uuid.Parse(jobAgent.Ref)
if err != nil {
return nil, fmt.Errorf("parse job agent id: %w", err)
}
runner, err := queries.GetJobAgentByID(ctx, runnerID)
if err != nil {
return nil, fmt.Errorf("get job agent: %w", err)
}
if runner.WorkspaceID != workspaceUUID {
return nil, fmt.Errorf(
"job agent %s does not belong to workspace %s",
runnerID, workspaceUUID,
)
}
runners[jobAgent.Ref] = runner
}
return runners, nil
}
Loading
Loading