GSD Workflow Engine — Function Reference

⚠ HERITAGE EXTRACTION — donor AMS GSD engine (Wave 8 quarantine)

This file extracts the donor AMS GSD parallel-execution engine from projects/unified-mcp/src/gsd/ (deleted R53). The DAG scheduler, agent-pool system, parallel-executor, bulkhead queue, and maxParallel runtime are all donor accretion. Phase 0 Colibri ships no src/gsd/ directory, no agent pool, and no parallel executor. The Phase 0 β Task Pipeline is an 8-state FSM (INIT → GATHER → ANALYZE → PLAN → APPLY → VERIFY → DONE + CANCELLED) defined in ../../concepts/β-task-pipeline.md. Sub-agent dispatch is via the host client’s Task tool, not via Colibri MCP — agent spawning is deferred to Phase 1.5 per ADR-005.

Read this file as donor genealogy only.

Core Algorithm

The GSD engine is a DAG-based workflow state machine with:

  • In-memory registries (workflowRegistry, phaseRegistry) backed by JSON files on disk.
  • Dependency-satisfied scheduling (phases run when all deps are COMPLETED).
  • Parallel execution up to maxParallel (default 4) concurrent phases.
  • Timeout protection per workflow (default 5 min).
  • Agent pool system for parallel workload distribution.

createWorkflow(name, phases, config)   # build + persist
  ↓
executeWorkflow(workflowId, inputs)    # set RUNNING
  ↓
processWorkflowPhases(workflowId)      # event-loop scheduler
  → getNextReadyPhases()               # PENDING + deps COMPLETED
  → executePhase(phaseId)              # RUNNING → COMPLETED|FAILED
  → aggregatePhaseOutputs()            # merge phase.outputs in order
  → COMPLETED | FAILED

State Enumerations

WorkflowState

PENDING | RUNNING | PAUSED | COMPLETED | FAILED | ROLLED_BACK

PhaseState

PENDING | RUNNING | WAITING | COMPLETED | FAILED | SKIPPED

AgentState

IDLE | BUSY | ERROR | TERMINATED
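The three enumerations can be sketched in JavaScript as frozen lookup objects. The sketch assumes the lowercase string values visible in the persisted JSON schema ("pending", ...). The constant spellings, the "rolled_back" value, and the choice of COMPLETED/FAILED/SKIPPED as the terminal phase states are assumptions; the donor source is not reproduced here.

```javascript
// Assumption: values are lowercase strings, matching the persisted JSON ("pending", ...).
const WorkflowState = Object.freeze({
  PENDING: "pending",
  RUNNING: "running",
  PAUSED: "paused",
  COMPLETED: "completed",
  FAILED: "failed",
  ROLLED_BACK: "rolled_back", // assumed spelling
});

const PhaseState = Object.freeze({
  PENDING: "pending",
  RUNNING: "running",
  WAITING: "waiting",
  COMPLETED: "completed",
  FAILED: "failed",
  SKIPPED: "skipped",
});

const AgentState = Object.freeze({
  IDLE: "idle",
  BUSY: "busy",
  ERROR: "error",
  TERMINATED: "terminated",
});

// Assumption: a phase is terminal once it can no longer change state.
const TERMINAL_PHASE_STATES = new Set([
  PhaseState.COMPLETED,
  PhaseState.FAILED,
  PhaseState.SKIPPED,
]);

function isPhaseTerminal(state) {
  return TERMINAL_PHASE_STATES.has(state);
}
```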


Exported Functions

initializeEngine(): Promise<boolean>

File: src/gsd/engine.js
Purpose: One-time initialization. Creates WORKFLOW_STORAGE_DIR if it is absent and loads the persisted JSON workflow files. Idempotent: returns true immediately if the engine is already initialized.
Notes for rewrite: Storage path = AMS_ROOT/data/state/workflows/. Files are named {workflowId}.json.
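A minimal sketch of the idempotent initialization follows. For testability the storage directory is a parameter (the donor used the WORKFLOW_STORAGE_DIR constant), and only workflowRegistry is filled. A fuller version would presumably also repopulate phaseRegistry. Files not ending in .json, such as leftover temp files, are skipped.

```javascript
const fs = require("fs");
const path = require("path");

// Hypothetical module state standing in for the donor's registries.
const workflowRegistry = new Map();
let initialized = false;

// Sketch of initializeEngine(): create the storage dir, load {workflowId}.json files.
async function initializeEngine(storageDir) {
  if (initialized) return true; // idempotent: second call returns immediately

  fs.mkdirSync(storageDir, { recursive: true }); // no error if it already exists

  for (const file of fs.readdirSync(storageDir)) {
    if (!file.endsWith(".json")) continue; // ignore temp files and other debris
    const raw = fs.readFileSync(path.join(storageDir, file), "utf8");
    const workflow = JSON.parse(raw);
    workflowRegistry.set(workflow.id, workflow);
  }

  initialized = true;
  return true;
}
```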


createWorkflow(name, phases, config): object

File: src/gsd/engine.js
Purpose: Instantiate a workflow with phases and persist it to disk.
Algorithm:

  1. Generate workflowId = "workflow-{UUID}".
  2. Map each phaseDef to a phase record:
    • id = "phase-{UUID}", order = index, state = PENDING.
    • Register in phaseRegistry.
  3. Resolve projectId by checking config.projectId → config.project_id → config.inputs.projectId → … (8 fallback paths via resolveWorkflowProjectId).
  4. normalizePhaseDependencies(workflow) — resolves phase name strings to phase IDs in dependencies[].
  5. Persist via persistWorkflow() (write to .tmp-{pid}, then renameSync).

Config defaults:

Key                  Default
maxParallel          4
timeoutMs            300000 (5 min)
retryCount           3
checkpointInterval   60 (seconds)

Returns: Full workflow object with metrics: { totalPhases, completedPhases:0, failedPhases:0, agentSpawnCount:0, checkpointCount:0 }
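The build half of createWorkflow() can be sketched without the registries or persistence. This sketch makes several assumptions. Each phaseDef is taken to look like { name, dependencies?, config?, inputs? }. Only the three documented projectId fallbacks are shown; the remaining donor paths are not listed in the description. metrics.totalPhases is assumed to equal the phase count.

```javascript
const { randomUUID } = require("crypto");

// Defaults from the createWorkflow config table.
const DEFAULT_CONFIG = Object.freeze({
  maxParallel: 4,
  timeoutMs: 300000,
  retryCount: 3,
  checkpointInterval: 60,
});

// Hypothetical helper: builds the workflow object; persistence is left out.
function buildWorkflow(name, phaseDefs, config = {}) {
  const workflowId = `workflow-${randomUUID()}`;

  const phases = phaseDefs.map((def, index) => ({
    id: `phase-${randomUUID()}`,
    workflowId,
    name: def.name,
    order: index,
    state: "pending",
    config: def.config ?? {},
    agents: [],
    dependencies: [...(def.dependencies ?? [])],
    inputs: def.inputs ?? {},
    outputs: {},
    startedAt: null,
    completedAt: null,
    error: null,
  }));

  // normalizePhaseDependencies: rewrite phase-name references to phase IDs.
  const idByName = new Map(phases.map((p) => [p.name, p.id]));
  for (const phase of phases) {
    phase.dependencies = phase.dependencies.map((dep) => idByName.get(dep) ?? dep);
  }

  return {
    id: workflowId,
    name,
    // Only the documented first three fallbacks; the donor had more.
    projectId: config.projectId ?? config.project_id ?? config.inputs?.projectId ?? null,
    state: "pending",
    phases,
    config: { ...DEFAULT_CONFIG, ...config },
    metrics: {
      totalPhases: phases.length, // assumption: equals the number of phases
      completedPhases: 0,
      failedPhases: 0,
      agentSpawnCount: 0,
      checkpointCount: 0,
    },
  };
}
```

Resolving names to IDs after all phases exist lets a phaseDef name a sibling that appears later in the list.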


executeWorkflow(workflowId, inputs): Promise<object>

File: src/gsd/engine.js
Purpose: Run all phases of a workflow.
Algorithm:

  1. Merge inputs into workflow.inputs.
  2. If re-running (FAILED/COMPLETED/ROLLED_BACK): reset all phases to PENDING, clear metrics.
  3. updateWorkflowState(RUNNING).
  4. processWorkflowPhases(workflowId) — parallel scheduler.
  5. On completion: check for any FAILED phases → set FAILED; else aggregatePhaseOutputs() → set COMPLETED.
  6. On exception: updateWorkflowState(FAILED, error.message).

Returns: the final workflow object.
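The six steps above can be sketched as follows. Some details are assumptions. The workflow is a plain object with lowercase state strings. The scheduler is injected as runScheduler(workflow) in place of processWorkflowPhases. "Clear metrics" is read as zeroing the completed and failed counters, and each phase's error is also cleared on re-run.

```javascript
// Workflow states that trigger a reset before re-running (from step 2).
const RERUNNABLE = new Set(["failed", "completed", "rolled_back"]);

async function executeWorkflow(workflow, inputs, runScheduler) {
  workflow.inputs = { ...workflow.inputs, ...inputs }; // 1. merge inputs

  if (RERUNNABLE.has(workflow.state)) {
    // 2. re-run: every phase back to pending, counters cleared
    for (const phase of workflow.phases) {
      phase.state = "pending";
      phase.error = null; // assumption: stale errors are cleared too
    }
    workflow.metrics.completedPhases = 0;
    workflow.metrics.failedPhases = 0;
  }

  workflow.state = "running"; // 3.
  workflow.error = null;

  try {
    await runScheduler(workflow); // 4. parallel scheduler
    if (workflow.phases.some((p) => p.state === "failed")) {
      workflow.state = "failed"; // 5a. any failed phase fails the workflow
    } else {
      // 5b. aggregate outputs in phase order, later phases win on key clashes
      const ordered = [...workflow.phases].sort((a, b) => a.order - b.order);
      workflow.outputs = Object.assign({}, ...ordered.map((p) => p.outputs));
      workflow.state = "completed";
    }
  } catch (error) {
    workflow.state = "failed"; // 6. exception (e.g. timeout) fails the workflow
    workflow.error = error.message;
  }
  return workflow;
}
```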

processWorkflowPhases(workflowId): Promise<void>

File: src/gsd/engine.js
Purpose: Event-loop phase scheduler (internal).
Algorithm:

  • Returns a Promise that resolves when all phases reach terminal state.
  • schedulePhases():
    • getNextReadyPhases() → PENDING phases with all deps COMPLETED, not already running.
    • Take up to maxParallel - runningPhases.size phases.
    • executePhase(phase.id) for each → on complete or error, call checkCompletion().
  • checkCompletion():
    • All phases terminal → resolve.
    • Any FAILED and !continueOnError → resolve (stop early).
    • Otherwise → schedulePhases() again.
  • Global timeout: setTimeout(() => reject(...), workflow.config.timeoutMs).
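The scheduler loop can be sketched as a single promise driven by completion callbacks. The sketch assumes phases are plain objects { id, state, dependencies } and that the injected executePhase(phase) sets the phase to "completed" or "failed" itself. A thrown error is also recorded as "failed" defensively. The terminal set completed/failed/skipped is an assumption.

```javascript
function processWorkflowPhases(workflow, executePhase) {
  const { maxParallel = 4, timeoutMs = 300000, continueOnError = false } = workflow.config ?? {};
  const running = new Set(); // IDs of phases currently executing
  const byId = new Map(workflow.phases.map((p) => [p.id, p]));
  const isTerminal = (p) => ["completed", "failed", "skipped"].includes(p.state);

  return new Promise((resolve, reject) => {
    let settled = false;
    const finish = (settle, value) => {
      if (settled) return;
      settled = true;
      clearTimeout(timer);
      settle(value);
    };
    // Global timeout for the whole workflow.
    const timer = setTimeout(
      () => finish(reject, new Error(`workflow timed out after ${timeoutMs} ms`)),
      timeoutMs,
    );

    // PENDING, not already running, and every dependency COMPLETED.
    const getNextReadyPhases = () =>
      workflow.phases.filter(
        (p) =>
          p.state === "pending" &&
          !running.has(p.id) &&
          p.dependencies.every((dep) => byId.get(dep)?.state === "completed"),
      );

    function checkCompletion() {
      if (settled) return;
      if (workflow.phases.every(isTerminal)) return finish(resolve);
      if (!continueOnError && workflow.phases.some((p) => p.state === "failed")) {
        return finish(resolve); // stop early on first failure
      }
      schedulePhases();
    }

    function schedulePhases() {
      const slots = Math.max(0, maxParallel - running.size);
      for (const phase of getNextReadyPhases().slice(0, slots)) {
        running.add(phase.id);
        Promise.resolve()
          .then(() => executePhase(phase))
          .catch(() => {
            phase.state = "failed"; // defensive: executePhase should do this itself
          })
          .finally(() => {
            running.delete(phase.id);
            checkCompletion();
          });
      }
    }

    checkCompletion();
  });
}
```

As in the description, a graph that can make no further progress only ends when the global timeout fires. One example is a pending phase whose dependency FAILED while continueOnError is set.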

executePhase(phaseId): Promise<void>

File: src/gsd/engine.js
Purpose: Execute a single phase.
Algorithm:

  1. updatePhaseState(phaseId, RUNNING).
  2. simulatePhaseExecution(phase): PLACEHOLDER, to be replaced with real agent dispatch.
    • Current: random 100–500 ms sleep, 5% failure rate.
  3. updatePhaseState(phaseId, COMPLETED, { executionTime }).
  4. On error: updatePhaseState(phaseId, FAILED, null, error.message).

Notes for rewrite: This is the integration point. Replace simulatePhaseExecution with real agent spawn logic.
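A sketch of executePhase() around the placeholder follows. It operates on a plain phase object rather than going through updatePhaseState(). The random source rng is an injectable parameter, which is an assumption made so tests can force either outcome. simulatePhaseExecution is the call to replace with real agent dispatch.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// PLACEHOLDER per the description: 100-500 ms sleep, 5% failure rate.
async function simulatePhaseExecution(phase, rng = Math.random) {
  await sleep(100 + Math.floor(rng() * 401)); // 100..500 ms inclusive
  if (rng() < 0.05) throw new Error(`simulated failure in ${phase.name}`);
  return {};
}

async function executePhase(phase, rng = Math.random) {
  phase.state = "running";
  phase.startedAt = new Date().toISOString();
  const start = Date.now();
  try {
    // Integration point: swap in real agent dispatch here.
    const outputs = await simulatePhaseExecution(phase, rng);
    phase.state = "completed";
    phase.outputs = { ...phase.outputs, ...outputs, executionTime: Date.now() - start };
  } catch (error) {
    phase.state = "failed";
    phase.error = error.message;
  }
  phase.completedAt = new Date().toISOString();
  return phase;
}
```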

areDependenciesSatisfied(phaseId): boolean

File: src/gsd/engine.js
Purpose: Check whether all dependency phases are COMPLETED. Returns true when phase.dependencies is empty; otherwise every dependency phase must be in COMPLETED state.


getNextReadyPhases(workflowId): Phase[]

File: src/gsd/engine.js
Purpose: Return the phases that are PENDING and dependency-satisfied. Filters workflow.phases where state === PENDING && areDependenciesSatisfied(id).
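The two readiness checks can be sketched against a phaseRegistry Map keyed by phase ID, using lowercase state strings. Treating an unknown phase ID as "not satisfied" is an assumption, not documented donor behavior.

```javascript
// True when the phase has no dependencies, or every dependency is completed.
function areDependenciesSatisfied(phaseRegistry, phaseId) {
  const phase = phaseRegistry.get(phaseId);
  if (!phase) return false; // assumption: unknown phases are never ready
  if (phase.dependencies.length === 0) return true;
  return phase.dependencies.every(
    (depId) => phaseRegistry.get(depId)?.state === "completed",
  );
}

// PENDING phases whose dependencies are all completed.
function getNextReadyPhases(phaseRegistry, workflow) {
  return workflow.phases.filter(
    (phase) => phase.state === "pending" && areDependenciesSatisfied(phaseRegistry, phase.id),
  );
}
```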


updateWorkflowState(workflowId, newState, error?): object

File: src/gsd/engine.js
Purpose: Transition a workflow to a new state, set timestamps, and persist. Sets startedAt on the first transition to RUNNING and completedAt on terminal states, then persists the workflow JSON.
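A sketch of the timestamp rules follows. The set of terminal workflow states (completed, failed, rolled_back) is an assumption. Persistence is an injected callback rather than the donor's persistWorkflow().

```javascript
// Assumption: these are the terminal workflow states.
const TERMINAL_WORKFLOW_STATES = new Set(["completed", "failed", "rolled_back"]);

function updateWorkflowState(workflow, newState, error = null, persist = () => {}) {
  const now = new Date().toISOString();
  workflow.state = newState;
  workflow.updatedAt = now;

  // startedAt is set only on the first transition to RUNNING.
  if (newState === "running" && !workflow.startedAt) workflow.startedAt = now;
  // completedAt is stamped whenever a terminal state is reached.
  if (TERMINAL_WORKFLOW_STATES.has(newState)) workflow.completedAt = now;
  if (error) workflow.error = error;

  persist(workflow);
  return workflow;
}
```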


updatePhaseState(phaseId, newState, outputs?, error?): object

File: src/gsd/engine.js
Purpose: Transition a phase to a new state. Merges outputs and sets timestamps. On COMPLETED or FAILED, increments workflow.metrics.completedPhases or failedPhases respectively. Persists the parent workflow.
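A sketch of updatePhaseState() follows. The parent workflow is passed in directly (the donor looked it up through phaseRegistry), and persistence is an injected callback. Which timestamps are set on which state is an assumption based on the description.

```javascript
function updatePhaseState(workflow, phaseId, newState, outputs = null, error = null, persist = () => {}) {
  const phase = workflow.phases.find((p) => p.id === phaseId);
  if (!phase) throw new Error(`unknown phase ${phaseId}`);

  const now = new Date().toISOString();
  phase.state = newState;
  if (outputs) phase.outputs = { ...phase.outputs, ...outputs }; // merge, not replace
  if (error) phase.error = error;

  if (newState === "running") phase.startedAt = now;
  if (newState === "completed" || newState === "failed") {
    phase.completedAt = now;
    // Per-workflow counters used by the metrics block.
    if (newState === "completed") workflow.metrics.completedPhases += 1;
    else workflow.metrics.failedPhases += 1;
  }

  persist(workflow); // the phase lives inside the workflow JSON
  return phase;
}
```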


assignAgentsToPhase(phaseId, agentIds): object

File: src/gsd/engine.js
Purpose: Register agent IDs on a phase (deduplicated) and persist.


aggregatePhaseOutputs(phases): object

File: src/gsd/engine.js
Purpose: Merge all phase outputs into a single object in phase order. Sorts phases by phase.order ascending, then accumulates their outputs with Object.assign(), so a later phase overwrites any key an earlier phase also set.
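The sort-then-assign merge fits in a few lines. The sketch sorts a copy, so the caller's phases array keeps its original order.

```javascript
// Sort a copy by phase.order, then fold outputs left to right with Object.assign.
function aggregatePhaseOutputs(phases) {
  return [...phases]
    .sort((a, b) => a.order - b.order)
    .reduce((acc, phase) => Object.assign(acc, phase.outputs), {});
}
```

For example, aggregatePhaseOutputs([{ order: 1, outputs: { a: 2, c: 3 } }, { order: 0, outputs: { a: 1, b: 1 } }]) returns { a: 2, b: 1, c: 3 }. The order-1 phase's a overwrites the order-0 value.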


parallelMap(items, agentType, config): Promise<object>

File: src/gsd/engine.js
Purpose: Batch-parallel execution of items via agents.
Algorithm:

  • Process in batches of maxParallel (default 4).
  • Promise.all(batch.map(item → executeWithAgent(item, agentType, config))).
  • Aggregate results, null-filling the slots of failed items.

Returns: { results[], errors[], total, succeeded, failed }

Notes for rewrite: executeWithAgent is a stub; replace it with real agent dispatch.
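A sketch of the batching follows, with executeWithAgent injected because the donor's version was a stub. Each call is wrapped in its own catch so one failure cannot reject the whole Promise.all; this is what makes null-filling possible. The { index, error } shape of errors[] entries is an assumption.

```javascript
async function parallelMap(items, agentType, config = {}, executeWithAgent) {
  const maxParallel = config.maxParallel ?? 4;
  const results = [];
  const errors = [];

  // Fixed-size batches: the next batch starts only after the whole current one settles.
  for (let start = 0; start < items.length; start += maxParallel) {
    const batch = items.slice(start, start + maxParallel);
    const settled = await Promise.all(
      batch.map((item, offset) =>
        Promise.resolve()
          .then(() => executeWithAgent(item, agentType, config))
          .catch((error) => {
            errors.push({ index: start + offset, error: error.message });
            return null; // null-fill the failed slot
          }),
      ),
    );
    results.push(...settled);
  }

  return {
    results,
    errors,
    total: items.length,
    succeeded: items.length - errors.length,
    failed: errors.length,
  };
}
```

Batching is simpler than a sliding worker pool, but one slow item holds back its whole batch.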

persistWorkflow(workflow): void

File: src/gsd/engine.js
Purpose: Atomic JSON write to {workflowId}.json. Writes to .tmp-{pid} first, then calls renameSync for atomicity. Cleans up the temp file on failure.
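A sketch of the write-then-rename pattern follows. Two details are assumptions: storageDir is a parameter (the donor used WORKFLOW_STORAGE_DIR), and the temp file is named {workflowId}.json.tmp-{pid}. On POSIX filesystems, rename(2) replaces the target atomically when both paths are on the same filesystem. Readers therefore see either the old file or the new one, never a partial write.

```javascript
const fs = require("fs");
const path = require("path");

function persistWorkflow(workflow, storageDir) {
  const target = path.join(storageDir, `${workflow.id}.json`);
  const tmp = `${target}.tmp-${process.pid}`; // same directory as the target

  try {
    fs.writeFileSync(tmp, JSON.stringify(workflow, null, 2));
    fs.renameSync(tmp, target); // replace the old file in one step
  } catch (error) {
    fs.rmSync(tmp, { force: true }); // remove any partial temp file, then re-throw
    throw error;
  }
}
```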


GSD Concurrency & Parallel Execution Modules

ParallelExecutor (src/gsd/parallel-executor.js)

Multi-worker parallel task execution.

WorkerPool (src/gsd/parallel-executor.js)

Fixed-size worker pool for bounded concurrency.

Semaphore (src/gsd/concurrency.js)

Classic counting semaphore for concurrency limiting.
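A minimal promise-based counting semaphore follows. The acquire()/release()/run() method names are assumptions; the API of the donor's concurrency.js is not shown in this file. On release, a waiting caller receives the permit directly, so waiters are served in FIFO order.

```javascript
class Semaphore {
  constructor(permits) {
    this.permits = permits; // free permits right now
    this.waiters = []; // FIFO queue of resolve callbacks
  }

  acquire() {
    if (this.permits > 0) {
      this.permits -= 1;
      return Promise.resolve();
    }
    return new Promise((resolve) => this.waiters.push(resolve));
  }

  release() {
    const next = this.waiters.shift();
    if (next) next(); // hand the permit straight to the next waiter
    else this.permits += 1;
  }

  // Convenience wrapper: always releases, even if the task throws.
  async run(task) {
    await this.acquire();
    try {
      return await task();
    } finally {
      this.release();
    }
  }
}
```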

ConcurrencyController (src/gsd/concurrency.js)

Higher-level wrapper: acquire/release + queue.

Barrier / Latch / ReadWriteLock (src/gsd/concurrency.js)

Synchronization primitives for phase coordination.

TaskQueue / DelayedTaskQueue / BulkheadQueue (src/gsd/task-queue.js)

Priority-based task queuing with bulkhead isolation.

ResultAggregator / StreamAggregator (src/gsd/aggregator.js)

Accumulate results from parallel workers, including streaming variants.

DistributedLock / AutoRenewingLock / LockRegistry (src/gsd/lock.js)

Distributed locking with auto-renewal for long operations.


Agent Pool System

Key Files

  • src/gsd/agent.js — Agent base class
  • src/gsd/agent-pool.js — Pool management (acquire/release)
  • src/gsd/agent-pool-manager.js — Lifecycle and health checks
  • src/gsd/agent-pool-index.js — Pool registry
  • src/gsd/agent-types.js — Type definitions
  • src/gsd/auto-scaler.js — Auto-scaling by load
  • src/gsd/planner.js — Task planning
  • src/gsd/verifier.js — Result verification
  • src/gsd/checkpoint.js — Checkpoint creation/restore

Configuration

Config key                  Default                          Purpose
WORKFLOW_STORAGE_DIR        AMS_ROOT/data/state/workflows    JSON persistence directory
DEFAULT_MAX_PARALLEL        4                                Max concurrent phases
DEFAULT_TIMEOUT_MS          300000 (5 min)                   Workflow timeout
config.retryCount           3                                Phase retry limit
config.checkpointInterval   60                               Seconds between checkpoints
config.continueOnError      undefined (falsy)                If falsy, stop at the first phase failure

Workflow JSON Schema (persisted)

{
  "id": "workflow-{UUID}",
  "name": "string",
  "projectId": "string|null",
  "state": "pending|running|...",
  "phases": [
    {
      "id": "phase-{UUID}",
      "workflowId": "workflow-{UUID}",
      "name": "string",
      "order": 0,
      "state": "pending",
      "config": {},
      "agents": [],
      "dependencies": [],
      "inputs": {},
      "outputs": {},
      "startedAt": null,
      "completedAt": null,
      "error": null
    }
  ],
  "config": { "maxParallel": 4, "timeoutMs": 300000, "retryCount": 3 },
  "inputs": {},
  "outputs": {},
  "currentPhaseIndex": -1,
  "createdAt": "ISO",
  "updatedAt": "ISO",
  "startedAt": null,
  "completedAt": null,
  "error": null,
  "metrics": {
    "totalPhases": 0,
    "completedPhases": 0,
    "failedPhases": 0,
    "agentSpawnCount": 0,
    "checkpointCount": 0
  }
}


Colibri — documentation-first MCP runtime. Apache 2.0 + Commons Clause.
