Workflow
Most teams manage multi-step processes with sequential await chains. That works until one step fails mid-way, the process restarts, the server crashes, or two workers try to process the same job simultaneously. Then you need retries, idempotency, state persistence, and compensation logic — all bolted on after the fact.
@pgshift/workflow gives you durable workflow orchestration backed by PostgreSQL. Define your steps and their dependencies as a DAG. The engine handles execution order, parallelism, retries, and saga-pattern compensation when things go wrong.
Install
```sh
npm install @pgshift/workflow
```

```typescript
import { createClient } from '@pgshift/workflow'

const db = createClient({ url: process.env.DATABASE_URL })
```

Core concepts
DAG: a directed acyclic graph where each step declares its dependencies. Steps with no dependencies run immediately and in parallel. Steps with dependencies wait until all their dependencies are completed.
Compensation — each step can declare a compensate handler. If a step fails permanently after exhausting retries, PgShift runs the compensation handlers of all completed steps in reverse order, leaving the system in a consistent state.
At-least-once execution — steps are dispatched via SKIP LOCKED. A step may be retried if a worker crashes mid-execution. Make your handlers idempotent.
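Because a step may run more than once, a handler should be safe to repeat. The following is a minimal sketch of one way to do that, assuming an idempotency key built from the run ID and step name. `chargeOnce`, `processed`, and `fakeCharge` are hypothetical names used for illustration and are not part of @pgshift/workflow; the in-memory map stands in for a durable store.

```typescript
// Hypothetical subset of the handler context needed by this sketch.
interface Ctx {
  runId: string
  step: string
  input: { amount: number }
}

interface ChargeResult {
  chargeId: string
  amount: number
}

// In-memory stand-in for a durable idempotency store (assumption: a real
// system would persist this, for example in a table with a unique key).
const processed = new Map<string, ChargeResult>()
let chargeCalls = 0

// Stand-in for a side effect that must not happen twice.
async function fakeCharge(amount: number): Promise<ChargeResult> {
  chargeCalls += 1
  return { chargeId: `ch-${chargeCalls}`, amount }
}

// Idempotent wrapper: a retry of the same run and step returns the stored result
// instead of repeating the side effect.
async function chargeOnce(ctx: Ctx): Promise<ChargeResult> {
  const key = `${ctx.runId}:${ctx.step}`
  const existing = processed.get(key)
  if (existing) return existing
  const result = await fakeCharge(ctx.input.amount)
  processed.set(key, result)
  return result
}
```

This only protects retries that happen after the result was recorded. A crash between the side effect and the record would still repeat the charge, so in practice the key is usually also passed to the external service or written in the same transaction as the effect.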
db.workflow(name).define(config)
Registers the workflow definition. Idempotent, safe to call on every startup.

```typescript
await db.workflow('order-fulfillment').define({
  steps: {
    validate_stock: { handler: 'validateStock', retries: 3 },
    validate_fraud: { handler: 'validateFraud', retries: 3 },
    charge_card: { handler: 'chargeCard', retries: 1, compensate: 'refundCard' },
    emit_invoice: { handler: 'emitInvoice', retries: 3, compensate: 'voidInvoice' },
    send_email: { handler: 'sendEmail', retries: 5 },
    update_analytics: { handler: 'updateAnalytics', retries: 5 },
  },
  dag: {
    validate_stock: [], // starts immediately
    validate_fraud: [], // starts immediately (parallel)
    charge_card: ['validate_stock', 'validate_fraud'], // waits for both
    emit_invoice: ['charge_card'],
    send_email: ['emit_invoice'], // parallel with update_analytics
    update_analytics: ['emit_invoice'], // parallel with send_email
  },
})
```

Step config:
| Option | Type | Default | Description |
|---|---|---|---|
| handler | string | required | Name of the handler function registered via .handlers() |
| retries | number | 3 | Max retry attempts before the step fails permanently |
| compensate | string | none | Name of the compensation handler to run on workflow failure |
DAG config: each key is a step name, each value is a list of step names it depends on. Steps with an empty list start immediately.
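To make the dependency rule concrete, here is a small sketch that groups the steps of a `dag` object into "waves", where every step in a wave has all of its dependencies in earlier waves. `executionWaves` is a hypothetical helper written for this illustration; it is not the engine's scheduler, which starts each step as soon as its own dependencies complete rather than wave by wave.

```typescript
type Dag = Record<string, string[]>

// Groups steps so that each wave depends only on earlier waves.
// Throws on unknown dependencies and on cycles.
function executionWaves(dag: Dag): string[][] {
  const names = new Set(Object.keys(dag))
  for (const [step, deps] of Object.entries(dag)) {
    for (const dep of deps) {
      if (!names.has(dep)) throw new Error(`${step} depends on unknown step ${dep}`)
    }
  }

  const done = new Set<string>()
  const waves: string[][] = []
  while (done.size < names.size) {
    const ready = Object.entries(dag)
      .filter(([step, deps]) => !done.has(step) && deps.every((dep) => done.has(dep)))
      .map(([step]) => step)
    // Steps remain but none is ready: the graph contains a cycle.
    if (ready.length === 0) throw new Error('dependency cycle detected')
    for (const step of ready) done.add(step)
    waves.push(ready)
  }
  return waves
}

const waves = executionWaves({
  validate_stock: [],
  validate_fraud: [],
  charge_card: ['validate_stock', 'validate_fraud'],
  emit_invoice: ['charge_card'],
  send_email: ['emit_invoice'],
  update_analytics: ['emit_invoice'],
})
// waves:
// [['validate_stock', 'validate_fraud'], ['charge_card'], ['emit_invoice'],
//  ['send_email', 'update_analytics']]
```

Running a check like this before calling define() is one way to catch a misspelled dependency or an accidental cycle early.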
db.workflow(name).handlers(handlers)
Registers the handler functions for a workflow. Must be called before .work().

```typescript
await db.workflow('order-fulfillment').handlers({
  validateStock: async (ctx) => {
    const { items } = ctx.input as { items: string[] }
    // returns value available to subsequent steps via ctx.previousSteps
    return { reservationId: 'res-123' }
  },

  chargeCard: async (ctx) => {
    const { reservationId } = ctx.previousSteps['validate_stock'] as { reservationId: string }
    return { chargeId: 'ch-456', amount: ctx.input.amount }
  },

  refundCard: async (ctx) => {
    // compensation: undo chargeCard
    const { chargeId } = ctx.previousSteps['charge_card'] as { chargeId: string }
    await refund(chargeId)
  },

  // ... other handlers
})
```

The ctx object:
| Field | Type | Description |
|---|---|---|
| ctx.runId | string | Current run ID |
| ctx.step | string | Current step name |
| ctx.input | object | Payload passed to run() |
| ctx.attempt | number | Current attempt number (1-based) |
| ctx.previousSteps | object | Output of completed steps, keyed by step name |
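The handler examples cast `ctx.previousSteps[...]` inline. A small typed accessor can make a missing output fail with a clear message instead. This is a sketch only: `StepContext` mirrors the fields in the table above but is a hypothetical type, not one exported by @pgshift/workflow, and `outputOf` is a hypothetical helper.

```typescript
// Hypothetical type mirroring the documented ctx fields.
interface StepContext {
  runId: string
  step: string
  input: Record<string, unknown>
  attempt: number
  previousSteps: Record<string, unknown>
}

// Reads the output of an earlier step and throws if it is absent.
// The cast is unchecked: T describes what the caller expects, nothing validates it.
function outputOf<T>(ctx: StepContext, stepName: string): T {
  const output = ctx.previousSteps[stepName]
  if (output === undefined) {
    throw new Error(`step "${ctx.step}" found no output for "${stepName}"`)
  }
  return output as T
}

// Example context, built by hand for illustration.
const exampleCtx: StepContext = {
  runId: 'run-1',
  step: 'charge_card',
  input: { amount: 299.99 },
  attempt: 1,
  previousSteps: { validate_stock: { reservationId: 'res-123' } },
}

const { reservationId } = outputOf<{ reservationId: string }>(exampleCtx, 'validate_stock')
```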
db.workflow(name).work()
Starts the polling worker. The worker polls the database for active runs and advances them.

```typescript
await db.workflow('order-fulfillment').work()
```

Call this once on startup, after define() and handlers().
db.workflow(name).run(input?)
Creates a new workflow run and returns the run ID.

```typescript
const runId = await db.workflow('order-fulfillment').run({
  orderId: 'order-123',
  amount: 299.99,
  items: ['ITEM_A', 'ITEM_B'],
})
```

db.workflow(name).status(runId)
Returns the current status of a run and all its steps.

```typescript
const status = await db.workflow('order-fulfillment').status(runId)
```

```typescript
interface WorkflowRunStatus {
  runId: string
  workflow: string
  status: 'running' | 'completed' | 'failed' | 'compensating' | 'compensated'
  input: Record<string, unknown>
  startedAt: Date
  finishedAt?: Date
  steps: Record<string, {
    status: string
    attempts: number
    output?: Record<string, unknown>
    error?: string
    startedAt?: Date
    completedAt?: Date
  }>
}
```

Execution model
```
Run starts
  │
  ├─ validate_stock (pending → running → completed) ─┐
  │                                                  ├─ charge_card (pending → running → completed)
  └─ validate_fraud (pending → running → completed) ─┘       │
                                                        emit_invoice
                                                             │
                                                   ┌─────────┴─────────┐
                                              send_email       update_analytics
                                                   │                   │
                                               completed           completed
                                                             │
                                                      run: completed
```

Compensation (saga pattern)
When a step exhausts its retries, the run enters compensating status. PgShift runs the compensation handlers of completed steps in reverse execution order.

```
charge_card   completed ✓  (has compensate: refundCard)
emit_invoice  failed ✗     (exhausted 3 retries)

Compensation order:
  1. voidInvoice  ← emit_invoice's compensate (failed, so no-op)
  2. refundCard   ← charge_card's compensate
```

Only steps that are completed and have a compensate handler defined are compensated. Steps without compensate are skipped in the compensation chain.
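The selection rule can be sketched as a pure function. This is an illustration of the rule as described here, not PgShift's implementation; `StepState` and `compensationPlan` are hypothetical names, and the input is assumed to be listed in execution order.

```typescript
interface StepState {
  name: string
  status: 'pending' | 'running' | 'completed' | 'failed'
  compensate?: string // name of the compensation handler, if any
}

// Returns the compensation handlers to run: completed steps that declare
// `compensate`, most recently executed first.
function compensationPlan(stepsInExecutionOrder: StepState[]): string[] {
  return stepsInExecutionOrder
    .filter((s) => s.status === 'completed' && s.compensate !== undefined)
    .map((s) => s.compensate as string)
    .reverse()
}

const plan = compensationPlan([
  { name: 'validate_stock', status: 'completed' },
  { name: 'validate_fraud', status: 'completed' },
  { name: 'charge_card', status: 'completed', compensate: 'refundCard' },
  { name: 'emit_invoice', status: 'failed', compensate: 'voidInvoice' },
])
// plan: ['refundCard']
```

In this sketch voidInvoice is simply left out of the plan, which has the same effect as the no-op entry in the example above: emit_invoice never completed, so there is nothing to void.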
Retry behavior
Failed steps are retried with exponential backoff:
| Attempt | Backoff |
|---|---|
| 1 | 1 second |
| 2 | 2 seconds |
| 3 | 4 seconds |
| N | min(2^(N-1) seconds, 30 seconds) |
After all retry attempts are exhausted, the step moves to failed and compensation begins.
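The first three rows of the table describe a delay that starts at 1 second and doubles with each attempt, up to a 30 second cap. A short sketch of that schedule follows; `backoffSeconds` is a hypothetical helper, and since jitter is not mentioned here, none is assumed.

```typescript
// Delay after failed attempt `attempt` (1-based): 1s, 2s, 4s, ... capped at 30s.
function backoffSeconds(attempt: number): number {
  if (!Number.isInteger(attempt) || attempt < 1) {
    throw new RangeError('attempt must be a positive integer')
  }
  return Math.min(2 ** (attempt - 1), 30)
}

// Delays for attempts 1 through 7.
const schedule = [1, 2, 3, 4, 5, 6, 7].map(backoffSeconds)
// schedule: [1, 2, 4, 8, 16, 30, 30]
```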
Complete example
```typescript
import { createClient } from '@pgshift/workflow'

const db = createClient({ url: process.env.DATABASE_URL })

await db.workflow('order-fulfillment').define({
  steps: {
    validate_stock: { handler: 'validateStock', retries: 3 },
    validate_fraud: { handler: 'validateFraud', retries: 3 },
    charge_card: { handler: 'chargeCard', retries: 1, compensate: 'refundCard' },
    emit_invoice: { handler: 'emitInvoice', retries: 3, compensate: 'voidInvoice' },
    send_email: { handler: 'sendEmail', retries: 5 },
    update_analytics: { handler: 'updateAnalytics', retries: 5 },
  },
  dag: {
    validate_stock: [],
    validate_fraud: [],
    charge_card: ['validate_stock', 'validate_fraud'],
    emit_invoice: ['charge_card'],
    send_email: ['emit_invoice'],
    update_analytics: ['emit_invoice'],
  },
})

await db.workflow('order-fulfillment').handlers({
  validateStock: async (ctx) => { /* check inventory */ return { available: true } },
  validateFraud: async (ctx) => { /* run fraud check */ return { approved: true } },
  chargeCard: async (ctx) => { /* charge payment */ return { chargeId: 'ch_123' } },
  refundCard: async (ctx) => { /* refund payment */ },
  emitInvoice: async (ctx) => { /* create invoice */ return { invoiceId: 'inv_456' } },
  voidInvoice: async (ctx) => { /* void invoice */ },
  sendEmail: async (ctx) => { /* send confirmation */ },
  updateAnalytics: async (ctx) => { /* record event */ },
})

await db.workflow('order-fulfillment').work()

const runId = await db.workflow('order-fulfillment').run({
  orderId: 'order-123',
  amount: 299.99,
})

process.on('SIGTERM', () => db.destroy())
```

Internal tables
| Table | Purpose |
|---|---|
| _pgshift_workflow_definitions | Stores workflow step config and DAG |
| _pgshift_workflow_runs | One row per workflow execution |
| _pgshift_workflow_steps | One row per step per run; the unit of work dispatched via SKIP LOCKED |