CogniMesh Routines — Algorithm Extraction

Algorithmic content extracted from projects/ckamal/src/domains/routines/ (P0.4) for Colibri absorption. Implementation target: src/domains/routines/.

RoutineService API

class RoutineService {

  // Create and register a new routine
  async createRoutine(name, schedule, handler, options)
  // name:     unique routine identifier string
  // schedule: { kind: TriggerKind, cron?: string, interval_ms?: number }
  // handler:  async function() — the work to execute
  // options:  { priority, concurrencyPolicy, catchUpPolicy, maxRetries, retryDelayMs }
  // → Returns: { routineId, status: "ACTIVE" }

  async pauseRoutine(id)
  // → Sets status to PAUSED; scheduler stops triggering it
  // → Returns: { routineId, status: "PAUSED" }

  async resumeRoutine(id)
  // → Sets status to ACTIVE; scheduler resumes triggering
  // → Returns: { routineId, status: "ACTIVE" }

  async runNow(id)
  // → Immediately trigger one execution, regardless of schedule
  // → Returns: { runId, status: "RUNNING" }

  async listRuns(id, limit)
  // → Returns last `limit` run records for the routine
  // → Each run: { runId, status, startedAt, completedAt, error? }
}

RoutineScheduler API

class RoutineScheduler {

  start()
  // → Begins the scheduler tick loop
  // → Loads all ACTIVE routines from DB
  // → Schedules next fires based on cron/interval

  stop()
  // → Graceful shutdown: wait for in-flight runs to complete
  // → Clears all scheduled timers

  register(routine)
  // → Add a routine to the active schedule
  // → Computes next fire time and sets timer

  unregister(id)
  // → Remove routine from schedule; cancel pending timer
}

Enums

const RoutineStatus = {
  ACTIVE:   "ACTIVE",   // running on schedule
  PAUSED:   "PAUSED",   // temporarily suspended
  ARCHIVED: "ARCHIVED"  // permanently retired
};

const RoutinePriority = {
  LOW:    "LOW",
  MEDIUM: "MEDIUM",
  HIGH:   "HIGH"
};

const ConcurrencyPolicy = {
  ALLOW:   "ALLOW",   // allow overlapping runs (new run starts while previous still running)
  FORBID:  "FORBID",  // skip new run if previous is still running
  REPLACE: "REPLACE"  // cancel previous run and start new one
};

const CatchUpPolicy = {
  SKIP: "SKIP",  // if server was down, skip missed runs
  LAST: "LAST",  // run once for the most recent missed trigger
  ALL:  "ALL"    // run all missed triggers in sequence
};

const TriggerKind = {
  CRON:     "CRON",     // standard cron expression (5-field or 6-field with seconds)
  INTERVAL: "INTERVAL", // fixed millisecond interval
  MANUAL:   "MANUAL"    // only triggered by runNow()
};

const RunStatus = {
  PENDING:   "PENDING",
  RUNNING:   "RUNNING",
  SUCCESS:   "SUCCESS",
  FAILED:    "FAILED",
  SKIPPED:   "SKIPPED"  // skipped due to ConcurrencyPolicy.FORBID or CatchUpPolicy.SKIP
};

Cron Expression Parsing

Dependency: node-cron library.

// Standard 5-field cron: minute hour day month weekday
// Example: "0 3 * * 1"  → every Monday at 03:00
// Example: "*/15 * * * *"  → every 15 minutes
// Example: "0 0 1 * *"  → first of every month at midnight

// 6-field cron with seconds (node-cron extension):
// second minute hour day month weekday
// Example: "0 */5 * * * *"  → every 5 minutes, on the minute

function parseNextFire(cronExpression, fromDate):
  // Uses node-cron internals to compute next trigger time
  return node_cron.schedule(cronExpression).nextDate(fromDate)

Execution Model: Isolated Execution + Error Boundaries

async function executeRoutine(routine):
  run = createRunRecord(routine.id, RunStatus.PENDING)

  // Check concurrency policy
  activeRun = getActiveRun(routine.id)
  if activeRun:
    if routine.concurrencyPolicy == ConcurrencyPolicy.FORBID:
      updateRun(run.id, RunStatus.SKIPPED)
      return
    else if routine.concurrencyPolicy == ConcurrencyPolicy.REPLACE:
      cancelRun(activeRun.id)
      // falls through to execute new run

  updateRun(run.id, RunStatus.RUNNING)

  try:
    // ISOLATED EXECUTION — error boundary wraps the handler
    await Promise.race([
      routine.handler(),
      timeout(routine.options.timeoutMs || 300000)
    ])
    updateRun(run.id, RunStatus.SUCCESS)
    emit('routine.completed', { routineId: routine.id, runId: run.id })

  catch error:
    updateRun(run.id, RunStatus.FAILED, { error: error.message })
    emit('routine.failed', { routineId: routine.id, runId: run.id, error })
    
    // Retry on failure (if configured)
    await scheduleRetry(routine, run, error)

Error boundary: Each routine handler runs inside try/catch. A handler failure does NOT propagate to the scheduler loop — the scheduler continues ticking and other routines are unaffected.

Retry on Failure: N Attempts with Backoff

async function scheduleRetry(routine, failedRun, error):
  attempt = failedRun.attempt || 1
  maxRetries = routine.options.maxRetries || 3

  if attempt >= maxRetries:
    // Exhausted — log final failure, notify
    emit('routine.exhausted', { routineId: routine.id, attempts: attempt })
    return

  // Exponential backoff with jitter
  baseDelay = routine.options.retryDelayMs || 1000
  delay = baseDelay * Math.pow(2, attempt - 1)
  jitter = delay * 0.2 * Math.random()
  totalDelay = delay + jitter

  setTimeout(async () => {
    newRun = createRunRecord(routine.id, RunStatus.PENDING, { attempt: attempt + 1 })
    await executeRoutine(routine, newRun)
  }, totalDelay)

Retry delay sequence (default 1000ms base, 3 attempts):

  • Attempt 1 failure → retry after ~1000ms
  • Attempt 2 failure → retry after ~2000ms
  • Attempt 3 failure → final failure, no more retries

Colibri Absorption Target

  • Implementation: src/domains/routines/
  • Files to create: routine-service.js, routine-scheduler.js
  • Database tables:
    • routines (id, name, trigger_kind, schedule, status, priority, concurrency_policy, catch_up_policy, options)
    • routine_runs (id, routine_id, status, started_at, completed_at, error, attempt)
  • Integration points:
    • Maintenance tasks and periodic audits
    • Epoch-boundary operations (Hot→Warm→Cold retention transitions)
    • Task queue management (P0.14)
  • See implementation guide: [[guides/implementation/ckamal-extraction-guide CogniMesh Extraction Guide]] P0.4

See Also

  • [[extractions/ckamal-approvals-extraction CogniMesh Approvals Extraction]] — companion P0.3 module
  • [[extractions/beta-task-pipeline-extraction β Pipeline Extraction]] — task pipeline that uses routines for periodic work
  • [[guides/implementation/ckamal-extraction-guide CogniMesh Extraction Guide]] — full absorption plan

Back to top

Colibri — documentation-first MCP runtime. Apache 2.0 + Commons Clause.

This site uses Just the Docs, a documentation theme for Jekyll.