Database Functions Reference

⚠ HERITAGE EXTRACTION — donor AMS database layer (Wave 8 quarantine)

This file extracts the donor AMS database façade from projects/unified-mcp/src/db/ (deleted R53). Every src/db/* path on this page describes pre-R53 donor code. Phase 0 Colibri targets src/db/index.ts (P0.2.2) and data/colibri.db, written from a clean low-teens-table schema, not the donor ~78-table accretion. The Phase 0 database canon is in ../../architecture/database.md.

Read this file as donor genealogy only.

Extracted from src/db/index.js (~950 LOC) and related modules. index.js is the public façade; all actual SQL lives in src/db/repositories/. All functions are async unless noted otherwise.


Initialization

initDatabase() → Database

Calls initDatabaseConnection({}) from bootstrap.js. Returns the SQLite database instance. Called once at server startup.

getDb() → Database

Returns the singleton database instance via getDatabase() from bootstrap.js. Throws if not initialized.

dbGetDatabasePath() → string

Returns the resolved database file path for the active provider (AMS_DB_PROVIDER). Uses resolveDatabasePathForProvider().

closeDatabase() → Promise<void>

Delegates to closeDatabaseConnection(). Called on graceful shutdown.


Transaction Controller

Lazily initialized via createTransactionController() with these config values:

  • AMS_DB_RETRY_BASE_DELAY_MS, AMS_DB_RETRY_MAX_RETRIES, AMS_DB_RETRY_BUDGET_MS
  • AMS_DB_TXN_LOCK_TIMEOUT_MS, AMS_DB_TXN_LOCK_POLL_MS, AMS_DB_TXN_LOCK_STALE_MS

beginTransaction() → Promise<void>

commitTransaction() → Promise<void>

rollbackTransaction() → Promise<void>

withTransaction(fn) → Promise<T>

Preferred entry point. Wraps fn in begin/commit/rollback. Uses AsyncLocalStorage to track nesting depth, which prevents the double-begin race condition when transactional helpers nest.

waitForDbRetry(attempt, startedAt) → Promise<boolean>

Exponential backoff helper used inside transaction retry loops. Returns false once the retry budget is exceeded.
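The backoff contract can be sketched as follows; the constant names mirror the AMS_DB_RETRY_* config values above, but the default values and the exact cutoff logic are illustrative assumptions:

```javascript
// Hypothetical sketch of waitForDbRetry's semantics. Constants are stand-ins
// for AMS_DB_RETRY_BASE_DELAY_MS / _MAX_RETRIES / _BUDGET_MS.
const RETRY_BASE_DELAY_MS = 25;
const RETRY_MAX_RETRIES = 5;
const RETRY_BUDGET_MS = 2000;

function retryDelayMs(attempt) {
  // Exponential backoff: base * 2^attempt.
  return RETRY_BASE_DELAY_MS * 2 ** attempt;
}

async function waitForDbRetrySketch(attempt, startedAt,
                                    sleep = ms => new Promise(r => setTimeout(r, ms))) {
  const elapsed = Date.now() - startedAt;
  const delay = retryDelayMs(attempt);
  // Give up when either the retry count or the time budget is exhausted.
  if (attempt >= RETRY_MAX_RETRIES || elapsed + delay > RETRY_BUDGET_MS) return false;
  await sleep(delay);
  return true;
}
```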

isRetryableSqliteError(error) → boolean

isInsideTransaction() → boolean


Tasks CRUD

dbCreateTask(task) → Promise<Task>

Wraps repoCreateTask. Uses withTransaction.
Parameters: { title, description, projectId, status, priority, roadmapId, nodeId, phase, progress, sessionId, eisenhowerQuadrant }
Tables: tasks, gsd_projects (FK)
Returns the full task object with a generated UUID id.

dbGetTask(id) → Promise<Task|null>

Direct read, no transaction. SQL pattern: SELECT * FROM tasks WHERE id = ?

dbUpdateTask(id, updates) → Promise<Task>

Uses withTransaction. Accepts any subset of task fields.
Tables: tasks
Trigger: update_tasks_timestamp fires automatically.

dbDeleteTask(id) → Promise<void>

Uses withTransaction.

dbListTasks(filters) → Promise<{items, total, limit, offset, hasMore}>

CRITICAL: Returns a wrapper object. Callers must use .items, not treat the result as an array.
Parameters: filters = { status, priority, roadmapId, projectId, sessionId, limit, offset, sortBy }
SQL pattern: dynamic SELECT with WHERE clauses built from truthy filter fields, plus LIMIT/OFFSET
Tables: tasks
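A hypothetical sketch of the dynamic filter build-up and the wrapper envelope; the column mapping follows the parameter list above, while the default page size and helper names are illustrative:

```javascript
// Sketch of the truthy-filter -> WHERE clause pattern and the { items, ... }
// envelope dbListTasks is described as returning. Not the donor implementation.
function buildTaskQuery(filters = {}) {
  const allowed = { status: 'status', priority: 'priority', roadmapId: 'roadmap_id',
                    projectId: 'project_id', sessionId: 'session_id' };
  const clauses = [];
  const params = [];
  for (const [key, column] of Object.entries(allowed)) {
    if (filters[key]) {              // only truthy filter fields become clauses
      clauses.push(`${column} = ?`);
      params.push(filters[key]);
    }
  }
  const where = clauses.length ? ` WHERE ${clauses.join(' AND ')}` : '';
  const limit = filters.limit ?? 50; // assumed default page size
  const offset = filters.offset ?? 0;
  return { sql: `SELECT * FROM tasks${where} LIMIT ? OFFSET ?`,
           params: [...params, limit, offset] };
}

function wrapPage(items, total, limit, offset) {
  // Callers must read .items: the result is an envelope, not an array.
  return { items, total, limit, offset, hasMore: offset + items.length < total };
}
```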

dbGetTaskStats(projectId?) → Promise<Stats>

SQL pattern: SELECT status, priority, COUNT(*) FROM tasks GROUP BY status, priority
Tables: tasks

dbBatchUpdateTasks(updates) → Promise<Task[]>

Single withTransaction wrapping multiple dbUpdateTask calls.

dbBatchCreateTasks(tasks) → Promise<Task[]>

Single withTransaction wrapping multiple dbCreateTask calls.


Task Dependencies

dbAddTaskDependency(taskId, dependsOn) → Promise<void>

Tables: task_dependencies (PK: task_id, depends_on)

dbRemoveTaskDependency(taskId, dependsOn) → Promise<void>

dbGetTaskDependencies(taskId) → Promise<string[]>

Returns list of task IDs this task depends on.

dbGetTasksBlockedBy(taskId) → Promise<string[]>

Returns list of task IDs blocked by this task.

dbGetTaskDependencyChain(taskId, visited?) → Promise<string[]>

Recursive traversal of full dependency chain. visited = Set<string> to prevent cycles.
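A cycle-safe traversal in this shape might look like the following; getDeps is a stand-in for the per-task dependency lookup (dbGetTaskDependencies in the donor API):

```javascript
// Hypothetical sketch of dbGetTaskDependencyChain's visited-set cycle guard.
async function dependencyChainSketch(taskId, getDeps, visited = new Set()) {
  if (visited.has(taskId)) return []; // already expanded: stop, avoid cycles
  visited.add(taskId);
  const chain = [];
  for (const dep of await getDeps(taskId)) {
    if (!visited.has(dep)) {
      // Record the direct dependency, then its transitive dependencies.
      chain.push(dep, ...await dependencyChainSketch(dep, getDeps, visited));
    }
  }
  return chain;
}
```

Even a mutual dependency (a depends on b, b depends on a) terminates, because each task is expanded at most once.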


GSD Projects

dbCreateGsdProject(project) → Promise<Project>

Tables: gsd_projects
Parameters: { id, name, roadmapId, path, pace, ownerId }

dbGetGsdProject(id) → Promise<Project|null>

dbGetGsdProjectByName(name) → Promise<Project|null>

dbListGsdProjects(options) → Promise<Project[]>

Parameters: options = { limit, offset }. Returns plain array (not wrapped).

dbCountGsdProjects() → Promise<number>

dbResolveProjectByIdOrName(nameOrId) → Promise<Project|null>

Tries lookup by ID first, then by name. Used by ACL and orchestration.


Roadmap Progress

dbUpdateRoadmapProgress(roadmapId, nodeId, status) → Promise<void>

SQL pattern: INSERT OR REPLACE INTO roadmap_progress (roadmap_id, node_id, status, updated_at) VALUES (...)
Tables: roadmap_progress
Status values: not_started | started | in_progress | completed | mastered

dbGetRoadmapProgress(roadmapId, limit?, offset?) → Promise<ProgressRow[]>

SQL pattern: SELECT * FROM roadmap_progress WHERE roadmap_id = ? LIMIT ? OFFSET ?


Sync Log

dbLogSync(projectId, filePath, fileHash, direction) → Promise<void>

Tables: sync_log
direction: mcp_to_gsd | gsd_to_mcp

dbGetLastSync(projectId, filePath) → Promise<SyncRow|null>


MCP Context Snapshots

dbCreateContext(roadmapId, contentHash, snapshot, metadata?) → Promise<Context>

Also queues RAG and memory indexing via queueRagContextIndex and queueMemoryContextIndex.
Tables: mcp_context
Fields: context_id (autoincrement), roadmap_id, content_hash (SHA256 of roadmap.md), snapshot (JSON), metadata, created_at

dbGetContext(contextId) → Promise<Context|null>

dbGetLatestContext(roadmapId) → Promise<Context|null>

SQL pattern: SELECT ... ORDER BY created_at DESC LIMIT 1

dbListContexts(roadmapId, limit?) → Promise<Context[]>

dbGetContextRoadmap(contextId) → Promise<string|null>

dbUpsertSessionContext(sessionId, contextId, source, roadmapId?) → Promise<void>

Tables: mcp_session_context
source: explicit | inferred | session_history | fallback | backfill
Uses withTransaction.

dbGetSessionContext(sessionId) → Promise<SessionContext|null>


Audit Action Log

dbLogAction(contextId, sessionId, toolName, payload, resultHash, maxRetries?, userId?, projectId?) → Promise<Action>

Serialized via a module-level auditQueue Promise chain (no concurrent step_index races).

SQL pattern:

INSERT INTO mcp_action (context_id, session_id, tool_name, payload, result_hash,
  step_index, previous_hash, current_hash, timestamp, user_id, project_id)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
  • step_index = MAX(step_index) + 1 within same session_id
  • previous_hash = current_hash of prior row
  • current_hash = calculateChainHash(previousHash, toolName, payload, timestamp) — SHA256-based chain

Also queues RAG and memory indexing after commit. UNIQUE constraint: (session_id, step_index) — retried on conflict.

dbGetActionsByContext(contextId, limit?) → Promise<Action[]>

dbGetActionsBySession(sessionId, limit?) → Promise<Action[]>

dbGetLatestAction(sessionId) → Promise<Action|null>

dbGetNextStepIndex(sessionId) → Promise<number>

dbVerifyActionChain(sessionId, batchSize?) → Promise<VerifyResult>

Delegates to repoVerifyActionChain. Walks the chain in batches and checks H(prev_hash||tool||payload||ts) === current_hash.


Thought Chain

dbLogThought(contextId, actionId, sessionId, thoughtType, content, parentId?, maxRetries?) → Promise<Thought>

Serialized via thoughtQueue. Same queue-chain pattern as dbLogAction.

SQL pattern:

INSERT INTO mcp_thought (parent_id, context_id, action_id, session_id,
  thought_type, content_hash, content, chain_hash, step_index, timestamp)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
  • content_hash = calculateThoughtHash(content, null)
  • chain_hash = calculateThoughtHash(content_hash, parentChainHash)
  • step_index = MAX(step_index) + 1 per session

FTS5 trigger: mcp_thought_fts is auto-updated via INSERT/UPDATE/DELETE triggers.

thoughtType values: plan | analysis | decision | reflection

dbGetThoughtsBySession(sessionId, limit?) → Promise<Thought[]>

dbGetThoughtsByAction(actionId) → Promise<Thought[]>

dbVerifyThoughtChain(sessionId) → Promise<VerifyResult>


Merkle Tree

dbBuildMerkleTree(sessionId) → Promise<MerkleResult>

Delegates to repoBuildMerkleTree(getDb(), withTransaction, sessionId).
Tables: mcp_merkle (level/position/node_hash), mcp_session_root
Leaf hashes = action current_hash values; parents = SHA256 of left+right.

dbGetMerkleRoot(sessionId) → Promise<{rootHash, actionCount, finalizedAt}|null>

SQL: SELECT * FROM mcp_session_root WHERE session_id = ?

dbGetInclusionProof(sessionId, actionId) → Promise<Proof>

Returns the Merkle path from the leaf (action) to the root.

dbVerifyInclusionProof(actionHash, proof, expectedRoot) → boolean

Pure function — no DB access. Also exported as verifyInclusionProof.


Memory Frames (AMS-QR)

dbUpsertMemoryFrame(frame) → Promise<Frame>

dbCreateMemoryFrame(frame) → Promise<Frame>

Both wrappers call repoUpsertMemoryFrame and use withTransaction.
Tables: mcp_memory_frame
frame structure: { frameId, sourceType, sourceId, sessionId, contextId, level (L0/L1/L2), zoneHeader, zoneCore, zoneLinks, zoneRecovery, frameHash, previousFrameHash, tokenEstimate }

dbGetMemoryFrame(frameId, includeZones?) → Promise<Frame|null>

dbGetMemoryFrameBySource(sourceType, sourceId, includeZones?) → Promise<Frame|null>

dbListMemoryFrames(filters) → Promise<Frame[]>

Parameters: { sessionId, contextId, sourceType, level, limit, offset }

dbUpsertMemoryEdges(edges) → Promise<void>

Tables: mcp_memory_edge
edge: { fromFrameId, toFrameId, relation, weight }

dbGetMemoryGraph(filters) → Promise<{frames, edges}>

dbVerifyMemoryFrames(filters) → Promise<VerifyReport>

dbGcMemoryFrames(options) → Promise<GcResult>


RAG (Retrieval-Augmented Generation)

dbUpsertRagDocument(document) → Promise<RagDoc>

Tables: rag_documents
document: { id, sourceType, sourceId, sessionId, contextId, title, body, contentHash, metadata }

dbGetRagDocumentBySource(sourceType, sourceId) → Promise<RagDoc|null>

dbPruneRagContextDocuments(roadmapId, keepContextId) → Promise<number>

dbReplaceRagChunks(documentId, sourceType, chunks) → Promise<void>

Deletes the existing chunks for the document, then inserts the new ones. Uses withTransaction.
Tables: rag_chunks
chunk: { id, documentId, chunkIndex, chunkText, tokenEstimate, startOffset, endOffset, keywords }

dbUpsertRagEmbedding(chunkId, model, dimensions, embedding) → Promise<void>

Tables: rag_embeddings
embedding = BLOB (packed float array)
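One plausible encoding for such a BLOB is little-endian float32, sketched here; the donor's actual packing format is not documented, so treat this as an assumption:

```javascript
// Hypothetical sketch of packing a float embedding into a BLOB and back,
// assuming a flat float32 array (4 bytes per dimension).
function packEmbedding(values) {
  return Buffer.from(Float32Array.from(values).buffer);
}

function unpackEmbedding(blob) {
  // Respect the Buffer's view offset so slices of larger buffers also work.
  return Array.from(new Float32Array(blob.buffer, blob.byteOffset, blob.byteLength / 4));
}
```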

dbSearchRagChunks(query, limit?, filters?) → Promise<ChunkResult[]>

Hybrid search: FTS5 keyword match on rag_chunks_fts plus optional vector cosine similarity.
Parameters: filters = { sourceType, sessionId, contextId, strictQuality }
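The vector half of that hybrid ranking reduces to cosine similarity between the query embedding and each candidate chunk embedding, e.g.:

```javascript
// Standard cosine similarity over plain number arrays; assumes equal-length,
// non-zero vectors. How the donor combines this with the FTS5 score is not
// documented here.
function cosineSimilarity(a, b) {
  let dot = 0, na = 0, nb = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    na += a[i] * a[i];
    nb += b[i] * b[i];
  }
  return dot / (Math.sqrt(na) * Math.sqrt(nb));
}
```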

dbListRagChunks(limit?, filters?) → Promise<Chunk[]>

dbGetNeighborRagChunks(documentId, chunkIndex, windowSize?) → Promise<Chunk[]>

dbGetRagEmbeddingsByChunkIds(chunkIds, model?) → Promise<Embedding[]>

dbGetRagStats() → Promise<RagStats>

dbGcRagData(options) → Promise<GcResult>


ACL (Project Members)

dbAddProjectMember(projectId, userId, role, grantedBy?) → Promise<void>

Tables: project_members
role: owner | admin | member | viewer

dbRemoveProjectMember(projectId, userId) → Promise<void>

dbGetProjectMember(projectId, userId) → Promise<Member|null>

dbGetProjectMembers(projectId) → Promise<Member[]>

dbGetUserProjects(userId) → Promise<Project[]>

dbGetUserRole(projectId, userId) → Promise<string|null>


Index Queue Exports

Re-exported from src/db/indexing/queue.js:

  • dbWaitForRagIndexQueue() — waits until RAG index queue drains
  • dbWaitForMemoryIndexQueue() — waits until memory index queue drains
  • dbGetIndexQueueStatus() — returns { ragPending, memoryPending }

Time-Series (src/db/timeseries.js)

Re-exported from timeseries.js:

  • initTimeSeries() — initialize time-series tables
  • storeMetric(name, value, tags) — insert a single metric point
  • storeMetricsBatch(metrics[]) — batch insert
  • queryMetrics(name, from, to, tags) — range query
  • getMetricStats(name, from, to) — min/max/avg/count
  • downsampleMetrics(name, interval, fn) — downsample by time interval
  • applyRetentionPolicies() — delete old data per configured policies
  • setRetentionPolicy(name, daysToKeep) — set per-metric retention
  • getRetentionPolicies() — list all policies
  • getTimeSeriesStats() — storage stats
  • resetTimeSeries() — drop all time-series data (test use)

Internal Helpers

safeParseJson(jsonStr, id, type) → any|null

Used to parse stored JSON payloads. Returns null on parse error, logs warning to stderr.
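Its contract can be sketched as follows; the warning wording is illustrative:

```javascript
// Hypothetical sketch of safeParseJson: null on failure, with a stderr
// warning identifying the offending row by id and type.
function safeParseJsonSketch(jsonStr, id, type) {
  if (jsonStr == null) return null;
  try {
    return JSON.parse(jsonStr);
  } catch (err) {
    process.stderr.write(`warn: invalid JSON in ${type} ${id}: ${err.message}\n`);
    return null;
  }
}
```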


Migration / Bootstrap Notes

Schema loaded from src/db/schema.sql (single file) plus:

  • src/db/schema-batch.sql
  • src/db/message-index-schema.sql
  • src/db/thread-schema.sql
  • src/db/vector-schema.sql

All ALTER TABLE statements in schema (adding session_id, eisenhower_quadrant, applied_at to tasks) are idempotent — errors on “column already exists” are swallowed by the bootstrap loader.
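The swallowing pattern amounts to something like the following sketch; the error-message match is an assumption about SQLite's wording, and exec stands in for the real statement runner:

```javascript
// Hypothetical sketch of the idempotent-ALTER loader: run each statement and
// swallow only the "column already exists" class of errors, rethrowing the rest.
function applyIdempotentAlters(exec, statements) {
  for (const sql of statements) {
    try {
      exec(sql);
    } catch (err) {
      // Assumed match for SQLite's duplicate-column error text.
      if (!/duplicate column name|column .* already exists/i.test(String(err.message))) {
        throw err;
      }
    }
  }
}
```

Rerunning the same ALTER is then a no-op, while genuinely broken statements (e.g. against a missing table) still fail loudly.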

Migration runner: src/db/migration-runner.js (sequenced numeric migrations in src/db/migrations/).



Colibri — documentation-first MCP runtime. Apache 2.0 + Commons Clause.
