CogniMesh Routines — Algorithm Extraction
Algorithmic content extracted from
projects/ckamal/src/domains/routines/(P0.4) for Colibri absorption. Implementation target:src/domains/routines/.
RoutineService API
class RoutineService {
// Create and register a new routine
async createRoutine(name, schedule, handler, options)
// name: unique routine identifier string
// schedule: { kind: TriggerKind, cron?: string, interval_ms?: number }
// handler: async function() — the work to execute
// options: { priority, concurrencyPolicy, catchUpPolicy, maxRetries, retryDelayMs }
// → Returns: { routineId, status: "ACTIVE" }
async pauseRoutine(id)
// → Sets status to PAUSED; scheduler stops triggering it
// → Returns: { routineId, status: "PAUSED" }
async resumeRoutine(id)
// → Sets status to ACTIVE; scheduler resumes triggering
// → Returns: { routineId, status: "ACTIVE" }
async runNow(id)
// → Immediately trigger one execution, regardless of schedule
// → Returns: { runId, status: "RUNNING" }
async listRuns(id, limit)
// → Returns last `limit` run records for the routine
// → Each run: { runId, status, startedAt, completedAt, error? }
}
RoutineScheduler API
class RoutineScheduler {
start()
// → Begins the scheduler tick loop
// → Loads all ACTIVE routines from DB
// → Schedules next fires based on cron/interval
stop()
// → Graceful shutdown: wait for in-flight runs to complete
// → Clears all scheduled timers
register(routine)
// → Add a routine to the active schedule
// → Computes next fire time and sets timer
unregister(id)
// → Remove routine from schedule; cancel pending timer
}
Enums
const RoutineStatus = {
ACTIVE: "ACTIVE", // running on schedule
PAUSED: "PAUSED", // temporarily suspended
ARCHIVED: "ARCHIVED" // permanently retired
};
const RoutinePriority = {
LOW: "LOW",
MEDIUM: "MEDIUM",
HIGH: "HIGH"
};
const ConcurrencyPolicy = {
ALLOW: "ALLOW", // allow overlapping runs (new run starts while previous still running)
FORBID: "FORBID", // skip new run if previous is still running
REPLACE: "REPLACE" // cancel previous run and start new one
};
const CatchUpPolicy = {
SKIP: "SKIP", // if server was down, skip missed runs
LAST: "LAST", // run once for the most recent missed trigger
ALL: "ALL" // run all missed triggers in sequence
};
const TriggerKind = {
CRON: "CRON", // standard cron expression (5-field or 6-field with seconds)
INTERVAL: "INTERVAL", // fixed millisecond interval
MANUAL: "MANUAL" // only triggered by runNow()
};
const RunStatus = {
PENDING: "PENDING",
RUNNING: "RUNNING",
SUCCESS: "SUCCESS",
FAILED: "FAILED",
SKIPPED: "SKIPPED" // skipped due to ConcurrencyPolicy.FORBID or CatchUpPolicy.SKIP
};
Cron Expression Parsing
Dependency: node-cron library.
// Standard 5-field cron: minute hour day month weekday
// Example: "0 3 * * 1" → every Monday at 03:00
// Example: "*/15 * * * *" → every 15 minutes
// Example: "0 0 1 * *" → first of every month at midnight
// 6-field cron with seconds (node-cron extension):
// second minute hour day month weekday
// Example: "0 */5 * * * *" → every 5 minutes, on the minute
function parseNextFire(cronExpression, fromDate):
// Uses node-cron internals to compute next trigger time
return node_cron.schedule(cronExpression).nextDate(fromDate)
Execution Model: Isolated Execution + Error Boundaries
async function executeRoutine(routine):
run = createRunRecord(routine.id, RunStatus.PENDING)
// Check concurrency policy
activeRun = getActiveRun(routine.id)
if activeRun:
if routine.concurrencyPolicy == ConcurrencyPolicy.FORBID:
updateRun(run.id, RunStatus.SKIPPED)
return
else if routine.concurrencyPolicy == ConcurrencyPolicy.REPLACE:
cancelRun(activeRun.id)
// falls through to execute new run
updateRun(run.id, RunStatus.RUNNING)
try:
// ISOLATED EXECUTION — error boundary wraps the handler
await Promise.race([
routine.handler(),
timeout(routine.options.timeoutMs || 300000)
])
updateRun(run.id, RunStatus.SUCCESS)
emit('routine.completed', { routineId: routine.id, runId: run.id })
catch error:
updateRun(run.id, RunStatus.FAILED, { error: error.message })
emit('routine.failed', { routineId: routine.id, runId: run.id, error })
// Retry on failure (if configured)
await scheduleRetry(routine, run, error)
Error boundary: Each routine handler runs inside try/catch. A handler failure does NOT propagate to the scheduler loop — the scheduler continues ticking and other routines are unaffected.
Retry on Failure: N Attempts with Backoff
async function scheduleRetry(routine, failedRun, error):
attempt = failedRun.attempt || 1
maxRetries = routine.options.maxRetries || 3
if attempt >= maxRetries:
// Exhausted — log final failure, notify
emit('routine.exhausted', { routineId: routine.id, attempts: attempt })
return
// Exponential backoff with jitter
baseDelay = routine.options.retryDelayMs || 1000
delay = baseDelay * Math.pow(2, attempt - 1)
jitter = delay * 0.2 * Math.random()
totalDelay = delay + jitter
setTimeout(async () => {
newRun = createRunRecord(routine.id, RunStatus.PENDING, { attempt: attempt + 1 })
await executeRoutine(routine, newRun)
}, totalDelay)
Retry delay sequence (default 1000ms base, 3 attempts):
- Attempt 1 failure → retry after ~1000ms
- Attempt 2 failure → retry after ~2000ms
- Attempt 3 failure → final failure, no more retries
Colibri Absorption Target
- Implementation:
src/domains/routines/ - Files to create:
routine-service.js,routine-scheduler.js - Database tables:
routines(id, name, trigger_kind, schedule, status, priority, concurrency_policy, catch_up_policy, options)routine_runs(id, routine_id, status, started_at, completed_at, error, attempt)
- Integration points:
- Maintenance tasks and periodic audits
- Epoch-boundary operations (Hot→Warm→Cold retention transitions)
- Task queue management (P0.14)
-
See implementation guide: [[guides/implementation/ckamal-extraction-guide CogniMesh Extraction Guide]] P0.4
See Also
-
[[extractions/ckamal-approvals-extraction CogniMesh Approvals Extraction]] — companion P0.3 module -
[[extractions/beta-task-pipeline-extraction β Pipeline Extraction]] — task pipeline that uses routines for periodic work -
[[guides/implementation/ckamal-extraction-guide CogniMesh Extraction Guide]] — full absorption plan