Claude API: Batch — Function Reference
Source files:
- batch-api/jobs.js (963 LOC): job lifecycle manager
- batch-api/client.js (892 LOC): Anthropic Batch API client wrapper
- batch-api/results.js (729 LOC): result processing and retrieval
- batch-api/webhooks.js (668 LOC): batch webhook integration
- batch/aggregator.js, batch/executor.js, batch/manager.js, batch/queue.js, batch/storage.js
Exported Class — BatchJobManager (batch-api/jobs.js)
extends EventEmitter
constructor(options)
Parameters:
- maxConcurrentJobs (number): Parallel job limit (default: 5)
- maxQueueSize (number): Queue capacity (default: 1000)
- defaultPriority (Priority): Default job priority (default: NORMAL)
- deduplicationWindowMs (number): Window for request-fingerprint deduplication (default: 5 minutes)
- defaultTimeoutMs (number): Job timeout (default: 24 hours)
- retryAttempts (number): Retry count per request (default: 3)
- retryDelayMs (number): Delay between retries in ms (default: 5000)
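The retryAttempts and retryDelayMs options imply a fixed-delay retry loop around each request. The sketch below is one way to express it, assuming retryAttempts counts retries after the first attempt (so at most retryAttempts + 1 calls) and that every error is retryable; withRetry is a hypothetical helper, not an export of jobs.js.

```javascript
// Hypothetical helper: retry an async operation with a fixed delay between attempts.
// Assumption: retryAttempts = number of retries AFTER the first attempt.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function withRetry(operation, { retryAttempts = 3, retryDelayMs = 5000 } = {}) {
  let lastError;
  for (let attempt = 0; attempt <= retryAttempts; attempt++) {
    try {
      return await operation(attempt);
    } catch (err) {
      lastError = err;
      // Wait before the next attempt, but not after the final failure.
      if (attempt < retryAttempts) await sleep(retryDelayMs);
    }
  }
  throw lastError;
}

// Usage: an operation that fails twice, then succeeds on the third call.
let calls = 0;
withRetry(async () => {
  calls += 1;
  if (calls < 3) throw new Error("transient");
  return "ok";
}, { retryAttempts: 3, retryDelayMs: 10 }).then((value) => console.log(value, calls)); // ok 3
```

A production version would likely retry only transient failures (for example HTTP 429 or 5xx responses) rather than every error.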
createJob(config): Promise<JobSummary>
Purpose: Create and queue a batch job with deduplication, priority, and a webhook callback.
Parameters:
- requests (object[]): Non-empty array of request objects
- priority (Priority): CRITICAL | HIGH | NORMAL | LOW | BACKGROUND
- metadata (object): Arbitrary metadata
- tags (string[]): Tag labels for filtering
- deduplicate (boolean): Enable request fingerprinting (default: true)
- timeoutMs (number): Job-level timeout
- callbackUrl (string | null): Webhook URL for terminal-status notifications
- webhookSecret (string | null): HMAC signing secret for the callback
Returns: { id, status, priority, totalRequests, uniqueRequests, deduplicatedRequests, createdAt, expiresAt }
Notes for rewrite: A request fingerprint is the SHA-256 hash of the stringified request. Deduplicated requests carry _deduplicated: true and an _originalJobId reference. createJob auto-starts the queue processor.
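The fingerprint-based deduplication described above can be sketched as follows. This is a minimal sketch, assuming the fingerprint is the hex SHA-256 digest of JSON.stringify(request) and that seen fingerprints are stored as RequestFingerprint records ({ jobId, timestamp }) in a Map; RequestDeduplicator, fingerprintRequest, and process are hypothetical names.

```javascript
const { createHash } = require("node:crypto");

// Hex SHA-256 of the stringified request.
function fingerprintRequest(request) {
  return createHash("sha256").update(JSON.stringify(request)).digest("hex");
}

// Hypothetical deduplicator that remembers fingerprints for `windowMs`.
class RequestDeduplicator {
  constructor(windowMs = 5 * 60 * 1000) {
    this.windowMs = windowMs;
    this.seen = new Map(); // fingerprint -> { jobId, timestamp }
  }

  // Split requests into unique ones and duplicates of recently seen requests
  // (including earlier entries in the same array).
  process(jobId, requests, now = Date.now()) {
    const unique = [];
    const duplicates = [];
    for (const request of requests) {
      const fp = fingerprintRequest(request);
      const prior = this.seen.get(fp);
      if (prior && now - prior.timestamp < this.windowMs) {
        // Mark the duplicate and point back at the job that first submitted it.
        duplicates.push({ ...request, _deduplicated: true, _originalJobId: prior.jobId });
      } else {
        this.seen.set(fp, { jobId, timestamp: now });
        unique.push(request);
      }
    }
    return { unique, duplicates };
  }
}
```

Because JSON.stringify preserves key insertion order, two requests with the same fields in a different order get different fingerprints and are not deduplicated; a canonical serialization would be needed if that case matters.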
getJob(jobId): object|null
Purpose: Retrieve sanitized job object (strips internal results Map).
updateJobStatus(jobId, status, updates): Promise<JobObject>
Purpose: Transition a job to a new status. Handles timestamp recording, statistics, worker cleanup, and the webhook trigger.
Parameters:
- status (JobStatus): See the JobStatus constants below
- updates (object): Additional fields to merge into the job
Notes for rewrite: Triggers callbackUrl webhook automatically on terminal status (completed/failed/cancelled/expired).
updateProgress(jobId, progress): void
Purpose: Update { completed, failed, cancelled, pending } counts. Auto-calculates percentComplete.
Emits job:progress event.
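A sketch of the progress update, assuming percentComplete counts every request that has reached a final state (completed, failed, or cancelled) against the ProgressObject total and is rounded to two decimals; the exact formula in jobs.js is not documented here, and applyProgress and the standalone updateProgress below are hypothetical.

```javascript
const { EventEmitter } = require("node:events");

const emitter = new EventEmitter();

// Merge a partial update into a ProgressObject and recompute percentComplete.
// Assumption: a request counts as done once it is completed, failed, or cancelled.
function applyProgress(current, update) {
  const next = { ...current, ...update };
  const done = next.completed + next.failed + next.cancelled;
  // Two-decimal percentage; 0 for an empty job to avoid dividing by zero.
  next.percentComplete = next.total > 0 ? Math.round((done / next.total) * 10000) / 100 : 0;
  return next;
}

// Hypothetical stand-in for BatchJobManager.updateProgress(jobId, progress).
function updateProgress(job, update) {
  job.progress = applyProgress(job.progress, update);
  emitter.emit("job:progress", {
    id: job.id,
    progress: job.progress,
    percentComplete: job.progress.percentComplete,
  });
}
```

With total: 3 and one completed request, percentComplete becomes 33.33.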
addResult(jobId, requestId, result): void
Purpose: Store a single request result; auto-updates progress counters.
addError(jobId, requestId, error): void
Purpose: Record a request error and push to job errors list.
cancelJob(jobId): Promise<CancelResult>
Purpose: Cancel job at any non-terminal state. Removes from queue if pending; emits cancel signal to worker if processing.
pauseJob(jobId): Promise<PauseResult>
Purpose: Pause a PROCESSING job (transitions to PAUSED).
resumeJob(jobId): Promise<ResumeResult>
Purpose: Resume a PAUSED job (transitions back to PROCESSING).
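The cancel, pause, and resume rules above reduce to a small transition check. A minimal sketch, assuming the JobStatus values are lowercase strings (the updateJobStatus notes use lowercase names, but the actual enum values are not listed); nextStatus and isTerminal are hypothetical helpers.

```javascript
// Assumed string values for the JobStatus constants.
const JobStatus = Object.freeze({
  PENDING: "pending", QUEUED: "queued", PROCESSING: "processing", PAUSED: "paused",
  COMPLETED: "completed", FAILED: "failed", CANCELLED: "cancelled", EXPIRED: "expired",
});

const TERMINAL = new Set([JobStatus.COMPLETED, JobStatus.FAILED, JobStatus.CANCELLED, JobStatus.EXPIRED]);
const isTerminal = (status) => TERMINAL.has(status);

// Return the target status for an action, or throw if the action is not allowed.
function nextStatus(current, action) {
  switch (action) {
    case "cancel": // allowed from any non-terminal state
      if (isTerminal(current)) break;
      return JobStatus.CANCELLED;
    case "pause": // allowed only while PROCESSING
      if (current !== JobStatus.PROCESSING) break;
      return JobStatus.PAUSED;
    case "resume": // allowed only while PAUSED
      if (current !== JobStatus.PAUSED) break;
      return JobStatus.PROCESSING;
  }
  throw new Error(`Cannot ${action} a job in status "${current}"`);
}
```

Note that PAUSED is non-terminal, so a paused job can still be cancelled.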
listJobs(filters): ListResult
Purpose: List jobs with filtering, sorting, and pagination.
Parameters:
- status (JobStatus | null): Filter by status
- priority (Priority | null): Filter by maximum priority level
- tags (string[]): Filter by tags (any match)
- limit (number): Maximum results (default: 50)
- offset (number): Pagination offset
- sortBy (string): Field to sort by (default: "createdAt")
- sortOrder (string): "asc" | "desc" (default: "desc")
Returns: { jobs, total, limit, offset }
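The filtering, sorting, and pagination of listJobs might look like the sketch below. Assumptions: priority is numeric (lower number = higher priority) and the "maximum priority level" filter keeps jobs whose number is less than or equal to the given level; a tag filter matches if the job has any requested tag; total counts filtered jobs before pagination. This standalone listJobs over an array is hypothetical.

```javascript
function listJobs(jobs, {
  status = null, priority = null, tags = [], limit = 50, offset = 0,
  sortBy = "createdAt", sortOrder = "desc",
} = {}) {
  const filtered = jobs.filter((job) =>
    (status === null || job.status === status) &&
    // Assumption: keep jobs at this priority level or more urgent (lower number).
    (priority === null || job.priority <= priority) &&
    // Any-match: at least one requested tag is present on the job.
    (tags.length === 0 || tags.some((tag) => (job.tags ?? []).includes(tag))));

  // filter() returned a new array, so sorting does not mutate the input.
  const direction = sortOrder === "asc" ? 1 : -1;
  filtered.sort((a, b) => {
    if (a[sortBy] < b[sortBy]) return -direction;
    if (a[sortBy] > b[sortBy]) return direction;
    return 0;
  });

  return { jobs: filtered.slice(offset, offset + limit), total: filtered.length, limit, offset };
}
```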
deleteJob(jobId): Promise<DeleteResult>
Purpose: Cancel if active, clean up fingerprints and trackers, remove from storage.
getJobResults(jobId): object
Purpose: Return all collected request results for a job.
getJobStats(): Stats
Purpose: Return manager-level statistics.
Returns: { totalCreated, totalCompleted, totalFailed, totalCancelled, totalDeduplicated, averageProcessingTime }
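averageProcessingTime can be maintained without storing every duration by updating a running mean each time a job finishes. A sketch, assuming processing time is completedAt minus startedAt in milliseconds and that only completed jobs contribute; createStats and recordCompletion are hypothetical.

```javascript
// Hypothetical tracker holding the getJobStats() fields.
function createStats() {
  return {
    totalCreated: 0, totalCompleted: 0, totalFailed: 0,
    totalCancelled: 0, totalDeduplicated: 0, averageProcessingTime: 0,
  };
}

// Running mean: avg_n = avg_(n-1) + (x_n - avg_(n-1)) / n, so no duration history is kept.
function recordCompletion(stats, job) {
  const durationMs = job.completedAt - job.startedAt; // works for numbers or Date objects
  stats.totalCompleted += 1;
  stats.averageProcessingTime += (durationMs - stats.averageProcessingTime) / stats.totalCompleted;
  return stats;
}

// Usage: three jobs taking 100, 200, and 600 ms.
const stats = createStats();
for (const [startedAt, completedAt] of [[0, 100], [0, 200], [1000, 1600]]) {
  recordCompletion(stats, { startedAt, completedAt });
}
console.log(stats.averageProcessingTime); // 300
```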
Events emitted
| Event | Payload |
|---|---|
| job:created | { id, priority, totalRequests, uniqueRequests, deduplicatedRequests } |
| job:status_changed | { id, oldStatus, newStatus, progress } |
| job:progress | { id, progress, percentComplete } |
| job:cancelled | { id } |
| job:paused | { id } |
| job:resumed | { id } |
| job:deleted | { id } |
| job:cancel_worker | { jobId, workerId } |
Constants: JobStatus
| Value | Terminal? |
|---|---|
| PENDING | No |
| QUEUED | No |
| PROCESSING | No |
| COMPLETED | Yes |
| FAILED | Yes |
| CANCELLED | Yes |
| EXPIRED | Yes |
| PAUSED | No |
Constants: Priority (lower number = higher priority)
| Value | Level |
|---|---|
| CRITICAL | 1 |
| HIGH | 2 |
| NORMAL | 3 |
| LOW | 4 |
| BACKGROUND | 5 |
Exported Class — PriorityQueue (batch-api/jobs.js, internal)
Min-heap based priority queue. Methods: enqueue(item, priority), dequeue(), peek(), remove(item), size(), isEmpty().
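A min-heap with the listed method names can be sketched as below. Assumptions not stated in this reference: ties between equal priorities are broken first-in-first-out using an insertion counter, and remove(item) matches by identity (===) in O(n) time. Priority numbers follow the Priority table, so 1 (CRITICAL) is dequeued first.

```javascript
class PriorityQueue {
  constructor() {
    this.heap = []; // entries: { item, priority, seq }
    this.seq = 0;   // insertion counter for FIFO tie-breaking (assumption)
  }

  size() { return this.heap.length; }
  isEmpty() { return this.heap.length === 0; }
  peek() { return this.heap.length ? this.heap[0].item : undefined; }

  enqueue(item, priority) {
    this.heap.push({ item, priority, seq: this.seq++ });
    this.#siftUp(this.heap.length - 1);
  }

  dequeue() {
    if (this.heap.length === 0) return undefined;
    const top = this.heap[0];
    const last = this.heap.pop();
    if (this.heap.length > 0) {
      this.heap[0] = last; // move the last leaf to the root, then restore order
      this.#siftDown(0);
    }
    return top.item;
  }

  // O(n) removal by identity, then restore the heap property at the hole.
  remove(item) {
    const i = this.heap.findIndex((e) => e.item === item);
    if (i === -1) return false;
    const last = this.heap.pop();
    if (i < this.heap.length) {
      this.heap[i] = last;
      this.#siftDown(i);
      this.#siftUp(i);
    }
    return true;
  }

  // Lower priority number wins; equal priorities fall back to insertion order.
  #less(a, b) {
    const x = this.heap[a], y = this.heap[b];
    return x.priority < y.priority || (x.priority === y.priority && x.seq < y.seq);
  }

  #swap(a, b) { [this.heap[a], this.heap[b]] = [this.heap[b], this.heap[a]]; }

  #siftUp(i) {
    while (i > 0) {
      const parent = (i - 1) >> 1;
      if (!this.#less(i, parent)) break;
      this.#swap(i, parent);
      i = parent;
    }
  }

  #siftDown(i) {
    const n = this.heap.length;
    for (;;) {
      const l = 2 * i + 1, r = l + 1;
      let smallest = i;
      if (l < n && this.#less(l, smallest)) smallest = l;
      if (r < n && this.#less(r, smallest)) smallest = r;
      if (smallest === i) break;
      this.#swap(i, smallest);
      i = smallest;
    }
  }
}
```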
batch-api/client.js — Key Exports
createBatch(requests, options): Promise<BatchResponse>
Purpose: Submit a batch of requests to the Anthropic Message Batches API.
Parameters:
- requests (BatchRequest[]): Each with a custom_id and message params
- model (string): Claude model ID
- maxTokens (number)
- system (string | null)
- metadata (object)
Returns: Anthropic batch response with id, processing_status, request_counts.
Anthropic SDK call: client.beta.messages.batches.create()
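createBatch has to produce BatchRequest items of the shape listed under Key Data Structures. A minimal sketch of that mapping, assuming each input request carries a custom_id and a messages array while model, maxTokens, and system come from the call options; toBatchRequests is a hypothetical helper. The Message Batches API requires custom_id to be unique within a batch, since it is how results are matched back to requests.

```javascript
// Hypothetical mapping from wrapper inputs to Message Batches request items.
function toBatchRequests(requests, { model, maxTokens, system = null }) {
  const seen = new Set();
  return requests.map((req) => {
    if (!req.custom_id) throw new Error("each request needs a custom_id");
    // custom_id must be unique within one batch.
    if (seen.has(req.custom_id)) throw new Error(`duplicate custom_id: ${req.custom_id}`);
    seen.add(req.custom_id);
    const params = { model, max_tokens: maxTokens, messages: req.messages };
    if (system !== null) params.system = system; // omit the field when not provided
    return { custom_id: req.custom_id, params };
  });
}
```

The resulting array is what would be passed as requests in client.beta.messages.batches.create({ requests }).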
getBatchStatus(batchId): Promise<BatchStatus>
Purpose: Poll batch processing status.
Anthropic SDK call: client.beta.messages.batches.retrieve(batchId)
cancelBatch(batchId): Promise<CancelResult>
Anthropic SDK call: client.beta.messages.batches.cancel(batchId)
listBatches(options): Promise<BatchList>
Anthropic SDK call: client.beta.messages.batches.list()
batch-api/results.js — Key Exports
getBatchResults(batchId): Promise<ResultsObject>
Purpose: Retrieve and parse batch results once processing completes.
Anthropic SDK call: client.beta.messages.batches.results(batchId) — returns JSONL stream.
parseResults(stream): Promise<ParsedResult[]>
Purpose: Parse JSONL result stream into structured array.
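A JSONL parser has to cope with chunks that split a line (or a multi-byte character) across chunk boundaries. A sketch, assuming the stream is an async iterable of string or byte chunks; recent versions of the TypeScript SDK decode the JSONL themselves when results() is iterated, so a manual parser like this mainly applies to the raw HTTP body. Each line is expected to look like { custom_id, result: { type, ... } }, where result.type is succeeded, errored, canceled, or expired.

```javascript
// Parse a JSONL stream delivered as an async iterable of string or Uint8Array chunks.
async function parseResults(stream) {
  const decoder = new TextDecoder(); // keeps partial multi-byte sequences between chunks
  const results = [];
  let buffer = "";

  const flushLine = (line) => {
    const trimmed = line.trim();
    if (trimmed) results.push(JSON.parse(trimmed)); // skip blank lines
  };

  for await (const chunk of stream) {
    buffer += typeof chunk === "string" ? chunk : decoder.decode(chunk, { stream: true });
    let newline;
    while ((newline = buffer.indexOf("\n")) !== -1) {
      flushLine(buffer.slice(0, newline));
      buffer = buffer.slice(newline + 1);
    }
  }
  buffer += decoder.decode(); // flush any bytes still held by the decoder
  flushLine(buffer);          // the last line may lack a trailing newline
  return results;
}
```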
waitForBatch(batchId, options): Promise<BatchStatus>
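Purpose: Poll batch status until processing ends (the Message Batches API reports processing_status: "ended"), giving up after maxWait.
Parameters:
- pollInterval (number): Milliseconds between polls (default: 5000)
- maxWait (number): Maximum total wait time in ms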
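A polling loop for waitForBatch, sketched under these assumptions: the status fetcher is passed in as a hypothetical getStatus parameter (in the wrapper it would presumably be getBatchStatus, i.e. client.beta.messages.batches.retrieve), the batch is finished when processing_status is "ended" (the API otherwise reports "in_progress" or "canceling"; per-request outcomes live in request_counts), maxWait has no default (Infinity here), and running out of time rejects with an error.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Poll getStatus(batchId) until processing_status === "ended" or maxWait would be exceeded.
async function waitForBatch(batchId, getStatus, { pollInterval = 5000, maxWait = Infinity } = {}) {
  const deadline = Date.now() + maxWait;
  for (;;) {
    const status = await getStatus(batchId);
    if (status.processing_status === "ended") return status;
    // Stop if the next poll would land past the deadline.
    if (Date.now() + pollInterval > deadline) {
      throw new Error(`Batch ${batchId} still "${status.processing_status}" after ${maxWait} ms`);
    }
    await sleep(pollInterval);
  }
}
```

In production, growing the interval between polls (for example exponential backoff on HTTP 429 responses) keeps the loop within API rate limits.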
batch-api/webhooks.js — Key Exports
registerBatchWebhook(batchId, config): object
Purpose: Register a webhook to fire when a batch completes.
Parameters:
- url (string): Callback URL
- secret (string): HMAC signing secret
- events (string[]): e.g. ["batch.completed", "batch.failed"]
triggerBatchWebhook(batchId, event, payload): Promise<void>
Purpose: Send signed HTTP POST to registered webhook URL.
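The signed POST can be sketched as an HMAC-SHA256 over the exact JSON body that is sent. Assumptions: the signature travels hex-encoded in a hypothetical X-Batch-Signature header, the body fields (batchId, event, payload, sentAt) are illustrative, and the receiver verifies with a constant-time comparison; the actual header name and signing scheme in webhooks.js are not documented here. Only signing and verification are shown.

```javascript
const { createHmac, timingSafeEqual } = require("node:crypto");

// Sign the exact string that will be sent as the request body.
function signPayload(body, secret) {
  return createHmac("sha256", secret).update(body).digest("hex");
}

// Hypothetical request builder for triggerBatchWebhook(batchId, event, payload).
function buildWebhookRequest(batchId, event, payload, secret) {
  const body = JSON.stringify({ batchId, event, payload, sentAt: new Date().toISOString() });
  return {
    method: "POST",
    headers: { "Content-Type": "application/json", "X-Batch-Signature": signPayload(body, secret) },
    body,
  };
}

// Receiver side: recompute the HMAC and compare in constant time.
function verifySignature(body, signature, secret) {
  const expected = Buffer.from(signPayload(body, secret), "hex");
  const received = Buffer.from(String(signature), "hex");
  // timingSafeEqual throws on length mismatch, so check lengths first.
  return received.length === expected.length && timingSafeEqual(received, expected);
}
```

The returned object has the shape fetch(url, init) accepts, so a caller on Node 18+ could send it with fetch(url, buildWebhookRequest(...)).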
Key Data Structures
| Structure | Fields | Purpose |
|---|---|---|
| JobObject | id, status, priority, requests, progress, results, errors, metadata, tags, timeoutMs, callbackUrl, createdAt, startedAt, completedAt, expiresAt, retryCount | Full job entity |
| BatchRequest | custom_id, params: {model, max_tokens, messages, system?} | Anthropic batch request item |
| ProgressObject | total, pending, processing, completed, failed, cancelled, percentComplete | Job progress tracker |
| RequestFingerprint | jobId, timestamp | Dedup record |
External Dependencies
- events (Node.js built-in): EventEmitter
- crypto (Node.js built-in): randomUUID, createHash
- Anthropic SDK: client.beta.messages.batches.*
Notes for Rewrite
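- BatchJobManager is in-memory only, with no persistence: on server restart, all jobs are lost.
- The deduplication window is per manager instance and clears on restart.
- The PriorityQueue is a min-heap: a lower priority number is dequeued first.
- The webhook trigger on terminal status fires asynchronously with .catch(() => {}), so delivery failures are silently discarded.
- waitForBatch polls the Anthropic API directly, so production use must handle rate limits.
- The client uses the beta namespace (client.beta.messages.batches.*), which is tied to the anthropic-beta: message-batches-2024-09-24 header. The Message Batches API is now generally available as client.messages.batches.*, which does not need the beta header.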