State
Every application has business rules. Most of them live in the API layer.
The problem is that the API is not the only path into your database. Scripts bypass it. Migrations bypass it. Admins with psql bypass it. Internal workers with their own database connections bypass it. Every one of those paths is an opportunity for data to end up in an invalid state.
@pgshift/state moves the rules down into the database itself, via triggers. It does not matter who writes to the table — the database enforces the rules on every write, from every source, unconditionally.
Install
Section titled “Install”npm install @pgshift/stateFour independent capabilities
Section titled “Four independent capabilities”Each method installs an independent trigger. Use only what you need, in any order, on any table.
| Method | What it does | Postgres primitive |
|---|---|---|
.define() | Enforces valid state transitions | BEFORE UPDATE trigger |
.normalize() | Normalizes field values on every write | BEFORE INSERT OR UPDATE trigger |
.audit() | Writes an immutable log of every change | AFTER INSERT OR UPDATE trigger |
.consensus() | Blocks a transition until N approvals are recorded | BEFORE UPDATE trigger |
import { createClient } from '@pgshift/state'
const db = createClient({ url: process.env.DATABASE_URL }).define() — State machine
Section titled “.define() — State machine”Installs a trigger that enforces valid state transitions. Invalid transitions are rejected at the database level with a clear error message, regardless of where the write originates.
await db.state('loans').define({ field: 'status', states: ['pending', 'approved', 'rejected', 'paid'], transitions: { pending: ['approved', 'rejected'], approved: ['paid'], rejected: [], paid: [], }, initial: 'pending',})| Option | Type | Description |
|---|---|---|
field | string | The column that holds the state value |
states | string[] | All valid state values |
transitions | Record<string, string[]> | Allowed transitions per state. Empty array means terminal state. |
initial | string | Default value set on INSERT if the field is null |
What gets blocked:
-- loan is 'approved' — this is rejectedUPDATE loans SET status = 'pending' WHERE id = '123';-- ERROR: [PgShift] Invalid state transition on table "loans": "approved" -> "pending" is not allowed.
-- loan is 'paid' — terminal state, nothing is allowedUPDATE loans SET status = 'approved' WHERE id = '123';-- ERROR: [PgShift] Invalid state transition on table "loans": "paid" -> "approved" is not allowed..normalize() — Data normalization
Section titled “.normalize() — Data normalization”Installs a trigger that normalizes field values on every INSERT or UPDATE. PgShift ships with built-in normalizers for common cases, and accepts any SQL expression for custom rules.
import { createClient, normalizers } from '@pgshift/state'
await db.state('users').normalize({ email: normalizers.email, // LOWER(TRIM(value)) name: normalizers.name, // TRIM + collapse multiple spaces phone: normalizers.phone, // remove non-digit characters})Built-in normalizers:
| Name | SQL expression | Example |
|---|---|---|
normalizers.email | LOWER(TRIM(value)) | ' USER@EXAMPLE.COM ' → 'user@example.com' |
normalizers.name | TRIM + collapse spaces | ' John Doe ' → 'John Doe' |
normalizers.phone | remove non-digits | '(11) 99999-8888' → '11999998888' |
normalizers.trim | TRIM(value) | ' hello ' → 'hello' |
normalizers.lowercase | LOWER(value) | 'Hello' → 'hello' |
normalizers.uppercase | UPPER(value) | 'hello' → 'HELLO' |
Custom SQL expressions using {value} as placeholder:
await db.state('products').normalize({ slug: "LOWER(REGEXP_REPLACE(TRIM({value}), '[^a-z0-9]+', '-', 'g'))", price: 'ROUND({value}::NUMERIC, 2)',})Why this matters:
// API always trims and lowercases emailawait pool.query(`INSERT INTO users (email) VALUES ($1)`, [' USER@EXAMPLE.COM '])// stored as: user@example.com ✓
// Direct SQL from a migration or admin — same resultawait pool.query(`INSERT INTO users (email) VALUES (' ADMIN@EXAMPLE.COM ')`)// stored as: admin@example.com ✓.audit() — Immutable audit log
Section titled “.audit() — Immutable audit log”Installs a trigger that writes an immutable entry to _pgshift_state_audit for every change. Track all columns or specify which fields matter.
// Track specific fieldsawait db.state('loans').audit({ track: ['status', 'amount'],})
// Track all columnsawait db.state('users').audit()| Option | Type | Default | Description |
|---|---|---|---|
track | string[] | all columns | Fields to include in the audit log |
Reading the history:
const history = await db.state('loans').history('loan-123')
history.forEach((entry) => { console.log(`${entry.field}: ${entry.fromValue} → ${entry.toValue}`) console.log(`changed at: ${entry.changedAt.toISOString()}`)})interface StateHistoryEntry { id: string entityId: string field: string fromValue: string | null // null on INSERT toValue: string changedBy: string | null changedAt: Date}The audit table is append-only. No rows are ever deleted.
.consensus() — Consensus gate
Section titled “.consensus() — Consensus gate”Installs a trigger that blocks a specific transition until the required number of approvals have been recorded. Optionally applies only when a SQL condition is true.
await db.state('loans').consensus({ transition: 'approved', require: 2, roles: ['finance', 'manager'], when: 'NEW.amount > 10000000', // only for loans over 10M})| Option | Type | Description |
|---|---|---|
transition | string | The target state that requires consensus |
require | number | Number of approvals required |
roles | string[] | Optional. Which roles are allowed to approve. |
when | string | Optional SQL condition. If false, consensus is skipped. |
Recording approvals:
await db.state('loans').approve('loan-123', { by: 'alice', role: 'finance',})
await db.state('loans').approve('loan-123', { by: 'bob', role: 'manager',})
// Now the transition is allowedawait pool.query(`UPDATE loans SET status = 'approved' WHERE id = 'loan-123'`)Inspecting pending approvals:
const approvals = await db.state('loans').pendingApprovals('loan-123')
approvals.forEach((a) => { console.log(`${a.approvedBy} (${a.role}) approved at ${a.approvedAt.toISOString()}`)})The when condition is evaluated inside the trigger with access to NEW.* — the full row after the update. This means you can gate consensus on any column value:
// Only large expenses require dual approvalwhen: 'NEW.amount > 10000000'
// Only production deployments require approvalwhen: "NEW.environment = 'production'"
// Always require approval// omit `when` entirelyCombining capabilities
Section titled “Combining capabilities”Each method is independent and composable. Any combination works.
await db.state('loans') .define({ field: 'status', states: ['pending', 'approved', 'rejected'], transitions: { pending: ['approved', 'rejected'], approved: [], rejected: [] }, }) .audit({ track: ['status'] })await db.state('users').normalize({ email: normalizers.email, name: normalizers.name,})await db.state('loans') .normalize({ amount: 'ABS({value})' }) .define({ field: 'status', states: ['pending', 'approved', 'rejected', 'paid'], transitions: { pending: ['approved', 'rejected'], approved: ['paid'], rejected: [], paid: [], }, initial: 'pending', }) .audit({ track: ['status', 'amount'] }) .consensus({ transition: 'approved', require: 2, roles: ['finance', 'manager'], when: 'NEW.amount > 10000000', })Complete example
Section titled “Complete example”import { createClient, normalizers } from '@pgshift/state'import { Pool } from 'pg'
const DATABASE_URL = process.env.DATABASE_URLconst pool = new Pool({ connectionString: DATABASE_URL })const db = createClient({ url: DATABASE_URL })
// Create the loans tableawait pool.query(` CREATE TABLE IF NOT EXISTS loans ( id TEXT PRIMARY KEY DEFAULT gen_random_uuid()::TEXT, amount NUMERIC NOT NULL, status TEXT NOT NULL DEFAULT 'pending' )`)
// Install all capabilitiesawait db.state('loans') .define({ field: 'status', states: ['pending', 'approved', 'rejected', 'paid'], transitions: { pending: ['approved', 'rejected'], approved: ['paid'], rejected: [], paid: [], }, initial: 'pending', }) .audit({ track: ['status', 'amount'] }) .consensus({ transition: 'approved', require: 2, roles: ['finance', 'manager'], when: 'NEW.amount > 10000000', })
// Create a large loanconst { rows } = await pool.query( `INSERT INTO loans (amount) VALUES (15000000) RETURNING id`,)const loanId = rows[0].id
// Approve itawait db.state('loans').approve(loanId, { by: 'alice', role: 'finance' })await db.state('loans').approve(loanId, { by: 'bob', role: 'manager' })
await pool.query(`UPDATE loans SET status = 'approved' WHERE id = $1`, [loanId])
// Inspect historyconst history = await db.state('loans').history(loanId)console.log('Audit trail:')history.forEach((h) => console.log(` ${h.field}: ${h.fromValue} → ${h.toValue}`))
await db.destroy()await pool.end()Internal tables
Section titled “Internal tables”| Table | Purpose |
|---|---|
_pgshift_state_audit | Shared append-only audit log for all audited tables |
_pgshift_consensus_{table}_{transition} | Approval records per table and transition |