GSD Workflow Engine — Function Reference
⚠ HERITAGE EXTRACTION — donor AMS GSD engine (Wave 8 quarantine)
This file extracts the donor AMS GSD parallel-execution engine from `projects/unified-mcp/src/gsd/` (deleted R53). The DAG scheduler, agent-pool system, parallel-executor, bulkhead queue, and `maxParallel` runtime are all donor accretion. Phase 0 Colibri ships no `src/gsd/` directory, no agent pool, and no parallel executor. The Phase 0 β Task Pipeline is an 8-state FSM (INIT → GATHER → ANALYZE → PLAN → APPLY → VERIFY → DONE+CANCELLED) defined in `../../concepts/β-task-pipeline.md`. Sub-agent dispatch is via the host client's Task tool, not via Colibri MCP; agent spawning is deferred to Phase 1.5 per ADR-005. Read this file as donor genealogy only.
Core Algorithm
The GSD engine is a DAG-based workflow state machine with:
- In-memory registries (`workflowRegistry`, `phaseRegistry`) backed by JSON files on disk.
- Dependency-satisfied scheduling (phases run when all deps are `COMPLETED`).
- Parallel execution up to `maxParallel` (default 4) concurrent phases.
- Timeout protection per workflow (default 5 min).
- Agent pool system for parallel workload distribution.
createWorkflow(name, phases, config) # build + persist
↓
executeWorkflow(workflowId, inputs) # set RUNNING
↓
processWorkflowPhases(workflowId) # event-loop scheduler
→ getNextReadyPhases() # PENDING + deps COMPLETED
→ executePhase(phaseId) # RUNNING → COMPLETED|FAILED
→ aggregatePhaseOutputs() # merge phase.outputs in order
→ COMPLETED | FAILED
State Enumerations
WorkflowState
PENDING | RUNNING | PAUSED | COMPLETED | FAILED | ROLLED_BACK
PhaseState
PENDING | RUNNING | WAITING | COMPLETED | FAILED | SKIPPED
AgentState
IDLE | BUSY | ERROR | TERMINATED
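As a rough illustration, the three enumerations could be modelled as frozen objects. The lowercase string values are an assumption: the persisted schema only shows `"pending|running|..."`, so the remaining values are extrapolated by analogy. The set of terminal phase states and the `isTerminalPhaseState` helper are also hypothetical; the source does not list which states count as terminal.

```javascript
// Illustrative only. Lowercase values follow the "pending|running|..." hint in
// the persisted schema; every value beyond those two is assumed by analogy.
const WorkflowState = Object.freeze({
  PENDING: "pending",
  RUNNING: "running",
  PAUSED: "paused",
  COMPLETED: "completed",
  FAILED: "failed",
  ROLLED_BACK: "rolled_back",
});

const PhaseState = Object.freeze({
  PENDING: "pending",
  RUNNING: "running",
  WAITING: "waiting",
  COMPLETED: "completed",
  FAILED: "failed",
  SKIPPED: "skipped",
});

const AgentState = Object.freeze({
  IDLE: "idle",
  BUSY: "busy",
  ERROR: "error",
  TERMINATED: "terminated",
});

// Assumption: a phase is finished once it is COMPLETED, FAILED, or SKIPPED.
const TERMINAL_PHASE_STATES = new Set([
  PhaseState.COMPLETED,
  PhaseState.FAILED,
  PhaseState.SKIPPED,
]);

// Hypothetical helper, not part of the donor engine's documented exports.
function isTerminalPhaseState(state) {
  return TERMINAL_PHASE_STATES.has(state);
}
```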
Exported Functions
initializeEngine(): Promise<boolean>
File: src/gsd/engine.js
Purpose: One-time initialization. Creates WORKFLOW_STORAGE_DIR if absent, loads persisted JSON workflow files.
Idempotent: returns true immediately if already initialized.
Notes for rewrite: Storage path = AMS_ROOT/data/state/workflows/. Files named {workflowId}.json.
createWorkflow(name, phases, config): object
File: src/gsd/engine.js
Purpose: Instantiate a workflow with phases and persist to disk.
Algorithm:
- Generate `workflowId = "workflow-{UUID}"`.
- Map each `phaseDef` to a phase record:
  - `id = "phase-{UUID}"`, `order = index`, `state = PENDING`.
  - Register in `phaseRegistry`.
- Resolve `projectId` by checking: `config.projectId` → `config.project_id` → `config.inputs.projectId` → … (8 fallback paths via `resolveWorkflowProjectId`).
- `normalizePhaseDependencies(workflow)`: resolves phase name strings to phase IDs in `dependencies[]`.
- Persist via `persistWorkflow()` (write to `.tmp-{pid}`, then `renameSync`).

Config defaults:
| Key | Default |
|---|---|
| `maxParallel` | 4 |
| `timeoutMs` | 300000 (5 min) |
| `retryCount` | 3 |
| `checkpointInterval` | 60 (seconds) |
Returns: Full workflow object with metrics: { totalPhases, completedPhases:0, failedPhases:0, agentSpawnCount:0, checkpointCount:0 }
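The construction steps above can be sketched roughly as follows. This is a minimal sketch, not the donor implementation: `buildWorkflow` is a hypothetical name, registry registration and persistence are omitted, and `projectId` resolution is left out because the full fallback chain is not listed here. Leaving an unresolvable dependency name unchanged is also an assumption.

```javascript
const { randomUUID } = require("node:crypto");

// Defaults as listed in the config table above.
const CONFIG_DEFAULTS = {
  maxParallel: 4,
  timeoutMs: 300000,
  retryCount: 3,
  checkpointInterval: 60,
};

// Hypothetical sketch of the in-memory part of createWorkflow.
function buildWorkflow(name, phaseDefs, config = {}) {
  const workflowId = `workflow-${randomUUID()}`;

  // Map each phase definition to a phase record.
  const phases = phaseDefs.map((def, index) => ({
    id: `phase-${randomUUID()}`,
    workflowId,
    name: def.name,
    order: index,
    state: "pending",
    dependencies: [...(def.dependencies ?? [])],
    outputs: {},
  }));

  // Resolve dependency names to phase IDs. Assumption: names that do not
  // match any phase are kept unchanged rather than rejected.
  const idByName = new Map(phases.map((p) => [p.name, p.id]));
  for (const phase of phases) {
    phase.dependencies = phase.dependencies.map((dep) => idByName.get(dep) ?? dep);
  }

  return {
    id: workflowId,
    name,
    phases,
    config: { ...CONFIG_DEFAULTS, ...config },
    metrics: {
      totalPhases: phases.length,
      completedPhases: 0,
      failedPhases: 0,
      agentSpawnCount: 0,
      checkpointCount: 0,
    },
  };
}
```

Calling `buildWorkflow("demo", [{ name: "fetch" }, { name: "parse", dependencies: ["fetch"] }])` would yield a second phase whose `dependencies[0]` equals the generated ID of the first.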
executeWorkflow(workflowId, inputs): Promise<object>
File: src/gsd/engine.js
Purpose: Run all phases of a workflow.
Algorithm:
- Merge `inputs` into `workflow.inputs`.
- If re-running (FAILED/COMPLETED/ROLLED_BACK): reset all phases to PENDING, clear metrics.
- `updateWorkflowState(RUNNING)`.
- `processWorkflowPhases(workflowId)`: parallel scheduler.
- On completion: check for any FAILED phases → set FAILED; else `aggregatePhaseOutputs()` → set COMPLETED.
- On exception: `updateWorkflowState(FAILED, error.message)`.

Returns: Final workflow object.
processWorkflowPhases(workflowId): Promise<void>
File: src/gsd/engine.js
Purpose: Event-loop phase scheduler (internal).
Algorithm:
- Returns a Promise that resolves when all phases reach terminal state.
- `schedulePhases()`:
  - `getNextReadyPhases()` → PENDING phases with all deps COMPLETED, not already running.
  - Take up to `maxParallel - runningPhases.size` phases.
  - `executePhase(phase.id)` for each → on complete or error, call `checkCompletion()`.
- `checkCompletion()`:
  - All phases terminal → resolve.
  - Any FAILED and `!continueOnError` → resolve (stop early).
  - Otherwise → `schedulePhases()` again.
- Global timeout: `setTimeout(() => reject(...), workflow.config.timeoutMs)`.
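The scheduling loop described above might look something like the sketch below. It is an approximation under stated assumptions: `processPhases` and the injected `runPhase` callback are hypothetical stand-ins for `processWorkflowPhases` and `executePhase`, state is mutated directly instead of going through `updatePhaseState`, and the final "nothing running, nothing ready" guard is an addition that the source does not describe.

```javascript
// Hypothetical sketch of the scheduler; runPhase(phase) must return a Promise.
function processPhases(workflow, runPhase) {
  const { maxParallel = 4, timeoutMs = 300000, continueOnError = false } = workflow.config;
  const byId = new Map(workflow.phases.map((p) => [p.id, p]));
  const running = new Set();
  const isTerminal = (p) => ["completed", "failed", "skipped"].includes(p.state);

  return new Promise((resolve, reject) => {
    let settled = false;

    // Global timeout: reject if the workflow has not finished in time.
    const timer = setTimeout(() => {
      settled = true;
      reject(new Error(`workflow timed out after ${timeoutMs} ms`));
    }, timeoutMs);

    const finish = () => {
      if (settled) return;
      settled = true;
      clearTimeout(timer);
      resolve();
    };

    const schedule = () => {
      // PENDING phases whose dependencies are all COMPLETED and not yet running.
      const ready = workflow.phases.filter(
        (p) =>
          p.state === "pending" &&
          !running.has(p.id) &&
          p.dependencies.every((d) => byId.get(d)?.state === "completed")
      );
      const slots = Math.max(0, maxParallel - running.size);
      for (const phase of ready.slice(0, slots)) {
        running.add(phase.id);
        phase.state = "running";
        Promise.resolve()
          .then(() => runPhase(phase))
          .then((outputs) => {
            phase.state = "completed";
            phase.outputs = outputs ?? {};
          })
          .catch((err) => {
            phase.state = "failed";
            phase.error = err.message;
          })
          .finally(() => {
            running.delete(phase.id);
            check();
          });
      }
    };

    const check = () => {
      if (settled) return;
      const anyFailed = workflow.phases.some((p) => p.state === "failed");
      if (workflow.phases.every(isTerminal) || (anyFailed && !continueOnError)) {
        finish();
        return;
      }
      schedule();
      // Added guard (assumption): if nothing is running and nothing could be
      // scheduled, resolve instead of waiting for the timeout.
      if (running.size === 0) finish();
    };

    check();
  });
}
```

One consequence of stopping early on failure is that phases already in flight keep running after the Promise resolves; the sketch simply ignores their late results.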
executePhase(phaseId): Promise<void>
File: src/gsd/engine.js
Purpose: Execute a single phase.
Algorithm:
- `updatePhaseState(phaseId, RUNNING)`.
- `simulatePhaseExecution(phase)` ← PLACEHOLDER: replace with real agent dispatch.
  - Current: random 100–500 ms sleep, 5% failure rate.
- `updatePhaseState(phaseId, COMPLETED, { executionTime })`.
- On error: `updatePhaseState(phaseId, FAILED, null, error.message)`.

Notes for rewrite: This is the integration point. Replace `simulatePhaseExecution` with real agent spawn logic.
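A placeholder with the described behaviour (random 100–500 ms sleep, 5% failure rate) could be sketched as below. The injectable `random` and `execute` parameters are assumptions added so the integration point can be swapped or tested; this version also takes the phase object directly and mutates it, where the documented `executePhase` takes a `phaseId` and goes through `updatePhaseState`.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Stand-in for the placeholder: sleeps 100 to 500 ms, then fails about 5% of
// the time. `random` is injectable (an assumption) for deterministic tests.
async function simulatePhaseExecution(phase, random = Math.random) {
  const delayMs = 100 + Math.floor(random() * 401);
  await sleep(delayMs);
  if (random() < 0.05) {
    throw new Error(`simulated failure in phase ${phase.name}`);
  }
  return delayMs;
}

// Hypothetical sketch: `execute` is the integration point where real agent
// dispatch would replace the simulation.
async function executePhase(phase, execute = simulatePhaseExecution) {
  phase.state = "running";
  const startedMs = Date.now();
  try {
    await execute(phase);
    phase.state = "completed";
    phase.outputs = { ...phase.outputs, executionTime: Date.now() - startedMs };
  } catch (err) {
    phase.state = "failed";
    phase.error = err.message;
  }
  return phase;
}
```

Passing a custom `execute` function, for example `executePhase(phase, async (p) => dispatch(p))` with a hypothetical `dispatch`, is one way the placeholder could be replaced without touching the state handling.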
areDependenciesSatisfied(phaseId): boolean
File: src/gsd/engine.js
Purpose: Check if all dependency phases are COMPLETED.
Returns true if phase.dependencies.length === 0.
Requires each dep phase to be in COMPLETED state.
getNextReadyPhases(workflowId): Phase[]
File: src/gsd/engine.js
Purpose: Return phases that are PENDING and dependency-satisfied.
Filters workflow.phases where state === PENDING && areDependenciesSatisfied(id).
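The two checks above are small enough to sketch directly. In this version the phase registry is passed in as a `Map` argument instead of being a module-level registry, and the functions take objects where the originals take IDs; both are simplifications for illustration. Treating a dependency that is missing from the registry as unsatisfied is an assumption.

```javascript
// Sketch only: takes the phase object and a Map of phaseId -> phase.
function areDependenciesSatisfied(phase, phaseRegistry) {
  if (phase.dependencies.length === 0) return true;
  // Assumption: an unknown dependency ID counts as not satisfied.
  return phase.dependencies.every(
    (depId) => phaseRegistry.get(depId)?.state === "completed"
  );
}

// Returns the phases that are PENDING and whose dependencies are all COMPLETED.
function getNextReadyPhases(workflow, phaseRegistry) {
  return workflow.phases.filter(
    (phase) => phase.state === "pending" && areDependenciesSatisfied(phase, phaseRegistry)
  );
}
```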
updateWorkflowState(workflowId, newState, error?): object
File: src/gsd/engine.js
Purpose: Transition workflow to new state, set timestamps, persist.
Sets startedAt on first RUNNING. Sets completedAt on terminal states. Persists workflow JSON.
updatePhaseState(phaseId, newState, outputs?, error?): object
File: src/gsd/engine.js
Purpose: Transition phase to new state. Merges outputs, sets timestamps.
On COMPLETED/FAILED: increments workflow.metrics.completedPhases or failedPhases. Persists parent workflow.
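A rough sketch of the phase transition follows. `applyPhaseState` is a hypothetical name, persistence of the parent workflow is omitted, and setting `startedAt` on the first RUNNING transition is assumed by analogy with `updateWorkflowState`; the source only says that timestamps are set. The injectable `now` clock is also an addition for testability.

```javascript
// Hypothetical sketch of the in-memory part of updatePhaseState.
function applyPhaseState(
  workflow,
  phase,
  newState,
  outputs = null,
  error = null,
  now = () => new Date().toISOString()
) {
  phase.state = newState;
  if (outputs) phase.outputs = { ...phase.outputs, ...outputs }; // merge, not replace
  if (error) phase.error = error;

  // Assumption: startedAt is set once, on the first RUNNING transition.
  if (newState === "running" && !phase.startedAt) phase.startedAt = now();

  if (newState === "completed" || newState === "failed") {
    phase.completedAt = now();
    if (newState === "completed") workflow.metrics.completedPhases += 1;
    else workflow.metrics.failedPhases += 1;
  }

  workflow.updatedAt = now();
  return phase;
}
```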
assignAgentsToPhase(phaseId, agentIds): object
File: src/gsd/engine.js
Purpose: Register agent IDs to a phase (deduped). Persists.
aggregatePhaseOutputs(phases): object
File: src/gsd/engine.js
Purpose: Merge all phase outputs in phase-order into a single object.
Sorts by phase.order ascending, then Object.assign() accumulate.
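The merge described above can be sketched in a few lines. Copying the array before sorting is a choice made here so the caller's phase list is not reordered; whether the donor code did the same is not stated.

```javascript
// Merge phase outputs in ascending phase order into one object.
function aggregatePhaseOutputs(phases) {
  return [...phases] // copy so the caller's array is not reordered
    .sort((a, b) => a.order - b.order)
    .reduce((merged, phase) => Object.assign(merged, phase.outputs), {});
}
```

Because `Object.assign` overwrites existing keys, a later phase wins when two phases emit the same output key.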
parallelMap(items, agentType, config): Promise<object>
File: src/gsd/engine.js
Purpose: Batch-parallel execution of items via agents.
Algorithm:
- Process in batches of `maxParallel` (default 4).
- `Promise.all(batch.map(item → executeWithAgent(item, agentType, config)))`.
- Aggregate results, null-fill failures.

Returns: `{ results[], errors[], total, succeeded, failed }`

Notes for rewrite: `executeWithAgent` is a stub; replace with real agent dispatch.
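Batch-parallel mapping with null-filled failures might be sketched as follows. The signature differs from the documented one: a plain `worker` function is passed where the original takes `agentType` and calls the `executeWithAgent` stub, and the shape of each `errors[]` entry (`index`, `message`) is an assumption.

```javascript
// Sketch: `worker(item)` stands in for executeWithAgent(item, agentType, config).
async function parallelMap(items, worker, { maxParallel = 4 } = {}) {
  const results = [];
  const errors = [];

  for (let start = 0; start < items.length; start += maxParallel) {
    const batch = items.slice(start, start + maxParallel);

    // Each task catches its own error so Promise.all never rejects and one
    // failure cannot discard the rest of the batch.
    const settled = await Promise.all(
      batch.map(async (item, offset) => {
        try {
          return { ok: true, value: await worker(item) };
        } catch (err) {
          return { ok: false, index: start + offset, message: err.message };
        }
      })
    );

    for (const outcome of settled) {
      if (outcome.ok) {
        results.push(outcome.value);
      } else {
        results.push(null); // null-fill so results stay aligned with items
        errors.push({ index: outcome.index, message: outcome.message });
      }
    }
  }

  const failed = errors.length;
  return { results, errors, total: items.length, succeeded: items.length - failed, failed };
}
```

Note that batching waits for the slowest item in each batch before starting the next, so throughput is lower than with a sliding window of `maxParallel` in-flight tasks.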
persistWorkflow(workflow): void
File: src/gsd/engine.js
Purpose: Atomic JSON write to {workflowId}.json.
Writes to .tmp-{pid} first, then renameSync for atomicity. Cleans up tmp on failure.
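The write-then-rename pattern can be sketched with Node's `fs` module. The storage directory is passed as an argument here, and the exact temp-file name (`{workflowId}.json.tmp-{pid}`) is an assumption; the source only says the temp file carries a `.tmp-{pid}` suffix.

```javascript
const fs = require("node:fs");
const path = require("node:path");

// Sketch of an atomic JSON write: write a temp file, then rename over the target.
function persistWorkflow(workflow, storageDir) {
  const finalPath = path.join(storageDir, `${workflow.id}.json`);
  const tmpPath = `${finalPath}.tmp-${process.pid}`; // assumed naming
  try {
    fs.mkdirSync(storageDir, { recursive: true });
    fs.writeFileSync(tmpPath, JSON.stringify(workflow, null, 2));
    // Rename within one directory replaces the target in a single step, so
    // readers see either the old file or the new one, never a partial write.
    fs.renameSync(tmpPath, finalPath);
  } catch (err) {
    fs.rmSync(tmpPath, { force: true }); // clean up the temp file on failure
    throw err;
  }
  return finalPath;
}
```

Keeping the temp file in the same directory as the target matters: a rename across filesystems is not atomic and may fail outright.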
GSD Concurrency & Parallel Execution Modules
ParallelExecutor (src/gsd/parallel-executor.js)
Multi-worker parallel task execution.
WorkerPool (src/gsd/parallel-executor.js)
Fixed-size worker pool for bounded concurrency.
Semaphore (src/gsd/concurrency.js)
Classic counting semaphore for concurrency limiting.
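For readers unfamiliar with the primitive, a counting semaphore for Promise-based code can be sketched as below. This is a generic illustration, not the donor `Semaphore` class: the method names `acquire`, `release`, and `run` are assumptions, since the source does not document the class's API.

```javascript
// Generic counting semaphore sketch; API names are assumed, not the donor's.
class Semaphore {
  constructor(permits) {
    this.permits = permits;
    this.waiters = []; // FIFO queue of resolve callbacks
  }

  // Resolves once a permit is available.
  acquire() {
    if (this.permits > 0) {
      this.permits -= 1;
      return Promise.resolve();
    }
    return new Promise((resolve) => this.waiters.push(resolve));
  }

  // Hands the permit straight to the next waiter, or returns it to the pool.
  release() {
    const next = this.waiters.shift();
    if (next) next();
    else this.permits += 1;
  }

  // Runs `task` while holding a permit; always releases afterwards.
  async run(task) {
    await this.acquire();
    try {
      return await task();
    } finally {
      this.release();
    }
  }
}
```

With `new Semaphore(2)`, at most two tasks passed to `run` execute at once; the rest wait in arrival order.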
ConcurrencyController (src/gsd/concurrency.js)
Higher-level wrapper: acquire/release + queue.
Barrier / Latch / ReadWriteLock (src/gsd/concurrency.js)
Synchronization primitives for phase coordination.
TaskQueue / DelayedTaskQueue / BulkheadQueue (src/gsd/task-queue.js)
Priority-based task queuing with bulkhead isolation.
ResultAggregator / StreamAggregator (src/gsd/aggregator.js)
Accumulate results from parallel workers, including streaming variants.
DistributedLock / AutoRenewingLock / LockRegistry (src/gsd/lock.js)
Distributed locking with auto-renewal for long operations.
Agent Pool System
Key Files
- `src/gsd/agent.js`: Agent base class
- `src/gsd/agent-pool.js`: Pool management (acquire/release)
- `src/gsd/agent-pool-manager.js`: Lifecycle and health checks
- `src/gsd/agent-pool-index.js`: Pool registry
- `src/gsd/agent-types.js`: Type definitions
- `src/gsd/auto-scaler.js`: Auto-scaling by load
- `src/gsd/planner.js`: Task planning
- `src/gsd/verifier.js`: Result verification
- `src/gsd/checkpoint.js`: Checkpoint creation/restore
Configuration
| Config key | Default | Purpose |
|---|---|---|
| `WORKFLOW_STORAGE_DIR` | `AMS_ROOT/data/state/workflows` | JSON persistence dir |
| `DEFAULT_MAX_PARALLEL` | 4 | Max concurrent phases |
| `DEFAULT_TIMEOUT_MS` | 300000 (5 min) | Workflow timeout |
| `config.retryCount` | 3 | Phase retry limit |
| `config.checkpointInterval` | 60 | Seconds between checkpoints |
| `config.continueOnError` | undefined (falsy) | Stop on first phase failure |
Workflow JSON Schema (persisted)
{
  "id": "workflow-{UUID}",
  "name": "string",
  "projectId": "string|null",
  "state": "pending|running|...",
  "phases": [
    {
      "id": "phase-{UUID}",
      "workflowId": "workflow-{UUID}",
      "name": "string",
      "order": 0,
      "state": "pending",
      "config": {},
      "agents": [],
      "dependencies": [],
      "inputs": {},
      "outputs": {},
      "startedAt": null,
      "completedAt": null,
      "error": null
    }
  ],
  "config": { "maxParallel": 4, "timeoutMs": 300000, "retryCount": 3 },
  "inputs": {},
  "outputs": {},
  "currentPhaseIndex": -1,
  "createdAt": "ISO",
  "updatedAt": "ISO",
  "startedAt": null,
  "completedAt": null,
  "error": null,
  "metrics": {
    "totalPhases": 0,
    "completedPhases": 0,
    "failedPhases": 0,
    "agentSpawnCount": 0,
    "checkpointCount": 0
  }
}