Cron
Recurring tasks are everywhere. Send weekly digests. Clean up expired records. Generate monthly reports. Most teams pay for an external scheduler or wire up their own cron infrastructure. If you already have Postgres and a queue, you have everything you need.
@pgshift/cron uses the pg_cron extension to schedule recurring jobs. When a job fires, PgShift inserts a payload into a @pgshift/queue table so your existing worker processes it. No extra process. No new service. The same Postgres you already run.
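To make the mechanism above concrete, the sketch below builds the kind of SQL statement a pg_cron job could run to push a payload into a queue table. Only `cron.schedule(job_name, schedule, command)` is pg_cron's own function. The table name `tasks_queue`, the `payload` column, and the helpers `quoteLiteral` and `buildScheduleSql` are assumptions made for illustration; they are not PgShift's actual schema or internals.

```typescript
// Sketch only: table and column names are hypothetical, not PgShift's schema.

// Escape a value as a standard SQL string literal by doubling single quotes.
function quoteLiteral(value: string): string {
  return `'${value.replace(/'/g, "''")}'`
}

// Build a statement that registers a named pg_cron job whose command inserts
// a JSON payload into a queue table. `queueTable` is interpolated as-is, so it
// must come from trusted code, never from user input.
function buildScheduleSql(
  jobName: string,
  cronExpr: string,
  queueTable: string,
  payload: unknown,
): string {
  const json = JSON.stringify(payload)
  const command = `INSERT INTO ${queueTable} (payload) VALUES (${quoteLiteral(json)}::jsonb)`
  return `SELECT cron.schedule(${quoteLiteral(jobName)}, ${quoteLiteral(cronExpr)}, ${quoteLiteral(command)})`
}

console.log(buildScheduleSql('cleanup', '0 0 * * *', 'tasks_queue', { type: 'cleanup-sessions' }))
```

Because the job is just a row insert executed by Postgres itself, no application process has to be running at the moment the schedule fires; the worker picks the row up whenever it next polls.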
Requirements
@pgshift/cron works together with @pgshift/queue. You need both installed and configured.

    npm install @pgshift/cron @pgshift/queue
Create the cron client

    import { createClient } from '@pgshift/cron'

    const cron = createClient({
      url: process.env.DATABASE_URL,
      queue: 'tasks', // default queue for all cron jobs
    })
Enable pg_cron

    await cron.cron.setup()

This runs CREATE EXTENSION IF NOT EXISTS pg_cron. Requires superuser or rds_superuser privileges, and pg_cron must already be listed in shared_preload_libraries on the server. Safe to call on every startup.
Create the queue

    import { createClient as createQueueClient } from '@pgshift/queue'

    const queue = createQueueClient({ url: process.env.DATABASE_URL })
    await queue.queue('tasks').setup()
Schedule your jobs and start the worker

    import { schedule } from '@pgshift/cron'

    await cron.cron('cleanup').schedule(schedule.daily({ hour: 0 }), {
      payload: { type: 'cleanup-sessions' },
    })

    await queue.queue('tasks').process(async (job) => {
      const { type } = job.payload as { type: string }
      if (type === 'cleanup-sessions') await cleanupSessions()
    })
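As more job types share one queue, the `if` check in the worker above grows into a long chain, and an unrecognized `type` is silently ignored. One possible alternative is a lookup table of handlers. The sketch below is self-contained and uses hypothetical names (`Handler`, `createDispatcher`, and the stub handlers); none of them are part of @pgshift/cron or @pgshift/queue.

```typescript
// Hypothetical names throughout: nothing here is a PgShift API.
type Handler = (payload: Record<string, unknown>) => Promise<void>

function createDispatcher(handlers: Map<string, Handler>) {
  // Look up the handler for a job type, failing loudly on unknown types
  // instead of silently dropping the job.
  function resolve(type: string): Handler {
    const handler = handlers.get(type)
    if (!handler) throw new Error(`No handler registered for job type "${type}"`)
    return handler
  }

  // Shaped like a worker callback; assumes the job exposes its JSON payload
  // as `job.payload` and that the payload carries a `type` field.
  async function dispatch(job: { payload: unknown }): Promise<void> {
    const payload = job.payload as Record<string, unknown>
    await resolve(String(payload.type))(payload)
  }

  return { resolve, dispatch }
}

const dispatcher = createDispatcher(
  new Map<string, Handler>([
    ['cleanup-sessions', async () => { /* delete expired sessions here */ }],
    ['weekly-digest', async () => { /* build and send the digest here */ }],
  ]),
)
```

Assuming `process()` accepts any callback of this shape, `dispatcher.dispatch` could be passed to it directly. Whether a thrown error leads to a retry or a failed job depends on the queue's own semantics, which this page does not describe.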
db.cron.setup()
Ensures the pg_cron extension is installed. Idempotent, safe to call on every startup.

    await cron.cron.setup()

db.cron(name).schedule(expr, options)
Creates or replaces a cron job. When the job fires, a payload is inserted into the target queue.

    await cron.cron('weekly-digest').schedule(schedule.weekly({ day: 'monday', hour: 8 }), {
      payload: { type: 'weekly-digest' },
    })

Use the queue option to send a specific job to a different queue:

    await cron.cron('monthly-report').schedule(schedule.monthly({ day: 1, hour: 9 }), {
      queue: 'reports',
      payload: { type: 'monthly-report' },
    })

Raw cron strings are also accepted:

    await cron.cron('every-five-minutes').schedule('*/5 * * * *', {
      payload: { type: 'poll' },
    })

| Option | Type | Description |
|---|---|---|
| payload | object | Arbitrary JSON passed to the queue job |
| queue | string | Queue to push into. Defaults to the client-level queue |
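When passing a raw cron string, it helps to remember the field order: minute, hour, day of month, month, day of week. The helper below is a hypothetical sketch (`parseCronFields` is not part of @pgshift/cron) that names each field of a classic five-field expression and rejects strings with the wrong field count. It does not validate the values inside each field, and pg_cron may accept forms beyond the five-field syntax that this sketch would reject.

```typescript
// Hypothetical helper, not a PgShift API: splits a classic five-field cron
// expression into named parts so the string is easier to read and check.
interface CronFields {
  minute: string
  hour: string
  dayOfMonth: string
  month: string
  dayOfWeek: string
}

function parseCronFields(expr: string): CronFields {
  const parts = expr.trim().split(/\s+/)
  if (parts.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${parts.length}: "${expr}"`)
  }
  // Safe after the length check above.
  const [minute, hour, dayOfMonth, month, dayOfWeek] =
    parts as [string, string, string, string, string]
  return { minute, hour, dayOfMonth, month, dayOfWeek }
}

console.log(parseCronFields('*/5 * * * *'))
```

A check like this could run before calling `schedule()` so that a typo such as a missing field surfaces in application code rather than as a database error.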
db.cron(name).unschedule()
Removes a scheduled job by name.

    await cron.cron('weekly-digest').unschedule()

db.cron.list()
Returns all PgShift-managed cron jobs.

    const jobs = await cron.cron.list()

    interface CronJobInfo {
      name: string
      schedule: string
      active: boolean
      jobId: number
    }

Schedule builder
The schedule helper builds cron expressions from readable options.

    import { schedule } from '@pgshift/cron'

    schedule.every({ minutes: 5 })              // "*/5 * * * *"
    schedule.every({ hours: 2 })                // "0 */2 * * *"
    schedule.hourly({ minute: 30 })             // "30 * * * *"
    schedule.daily({ hour: 8 })                 // "0 8 * * *"
    schedule.daily({ hour: 8, minute: 30 })     // "30 8 * * *"
    schedule.weekly({ day: 'monday', hour: 9 }) // "0 9 * * 1"
    schedule.monthly({ day: 1, hour: 0 })       // "0 0 1 * *"

schedule.every(options)
| Option | Type | Description |
|---|---|---|
| minutes | number | Run every N minutes |
| hours | number | Run every N hours |
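Since `every()` produces step expressions such as `*/5`, one property of standard cron syntax is worth keeping in mind: `*/N` matches the values in the field's range that are multiples of N, and the count restarts at the top of each hour (for minutes) or day (for hours). When N does not divide 60 or 24 evenly, the final gap is shorter than N. The helper below is an illustrative sketch (`stepValues` is a hypothetical name, not part of @pgshift/cron) that lists the matched values.

```typescript
// Illustrative helper, not a PgShift API: lists the values that a cron step
// expression "*/step" matches in a field of `rangeSize` values starting at 0
// (60 for the minute field, 24 for the hour field).
function stepValues(step: number, rangeSize: number): number[] {
  if (!Number.isInteger(step) || step < 1) {
    throw new RangeError(`step must be a positive integer, got ${step}`)
  }
  const values: number[] = []
  for (let v = 0; v < rangeSize; v += step) values.push(v)
  return values
}

console.log(stepValues(5, 60))  // minutes 0, 5, 10, ... 55: evenly spaced
console.log(stepValues(45, 60)) // minutes 0 and 45: a 45-minute gap, then 15
console.log(stepValues(5, 24))  // hours 0, 5, 10, 15, 20: then only 4 hours to midnight
```

For intervals that must be exact, choosing a value that divides 60 (minutes) or 24 (hours) avoids the uneven last gap.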
schedule.hourly(options?)
| Option | Type | Default | Description |
|---|---|---|---|
| minute | number | 0 | Minute of the hour to run |
schedule.daily(options?)
| Option | Type | Default | Description |
|---|---|---|---|
| hour | number | 0 | Hour of the day in UTC |
| minute | number | 0 | Minute of the hour |
schedule.weekly(options)
| Option | Type | Default | Description |
|---|---|---|---|
| day | DayName | required | Day of the week ('monday', 'tuesday', etc.) |
| hour | number | 0 | Hour of the day in UTC |
| minute | number | 0 | Minute of the hour |
schedule.monthly(options)
| Option | Type | Default | Description |
|---|---|---|---|
| day | number | required | Day of the month (1 to 31) |
| hour | number | 0 | Hour of the day in UTC |
| minute | number | 0 | Minute of the hour |
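To show how the options in the tables above map onto the five cron fields, here is a minimal sketch of a builder with the same shape. It is not the library's source: `sketchSchedule`, `inRange`, and `DAY_INDEX` are hypothetical names, and the range checks are assumptions about reasonable validation. It is written to reproduce the expressions listed earlier in this section.

```typescript
// A sketch of a schedule builder, not the library's implementation.
type DayName =
  | 'sunday' | 'monday' | 'tuesday' | 'wednesday'
  | 'thursday' | 'friday' | 'saturday'

// Cron numbers the days of the week from 0 (Sunday) to 6 (Saturday).
const DAY_INDEX: Record<DayName, number> = {
  sunday: 0, monday: 1, tuesday: 2, wednesday: 3,
  thursday: 4, friday: 5, saturday: 6,
}

// Assumed validation: reject non-integers and out-of-range values early.
function inRange(label: string, value: number, min: number, max: number): number {
  if (!Number.isInteger(value) || value < min || value > max) {
    throw new RangeError(`${label} must be an integer from ${min} to ${max}, got ${value}`)
  }
  return value
}

const sketchSchedule = {
  every(opts: { minutes?: number; hours?: number }): string {
    if (opts.minutes !== undefined) return `*/${inRange('minutes', opts.minutes, 1, 59)} * * * *`
    if (opts.hours !== undefined) return `0 */${inRange('hours', opts.hours, 1, 23)} * * *`
    throw new Error('every() needs either minutes or hours')
  },
  hourly({ minute = 0 }: { minute?: number } = {}): string {
    return `${inRange('minute', minute, 0, 59)} * * * *`
  },
  daily({ hour = 0, minute = 0 }: { hour?: number; minute?: number } = {}): string {
    return `${inRange('minute', minute, 0, 59)} ${inRange('hour', hour, 0, 23)} * * *`
  },
  weekly({ day, hour = 0, minute = 0 }: { day: DayName; hour?: number; minute?: number }): string {
    return `${inRange('minute', minute, 0, 59)} ${inRange('hour', hour, 0, 23)} * * ${DAY_INDEX[day]}`
  },
  monthly({ day, hour = 0, minute = 0 }: { day: number; hour?: number; minute?: number }): string {
    return `${inRange('minute', minute, 0, 59)} ${inRange('hour', hour, 0, 23)} ${inRange('day', day, 1, 31)} * *`
  },
}

console.log(sketchSchedule.weekly({ day: 'monday', hour: 9 }))
```

One consequence of the monthly form: in cron, a day-of-month value of 29, 30, or 31 only matches in months that contain that day, so a job scheduled for day 31 is skipped in shorter months.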
Complete example

    import { createClient, schedule } from '@pgshift/cron'
    import { createClient as createQueueClient } from '@pgshift/queue'

    const DATABASE_URL = process.env.DATABASE_URL

    const cron = createClient({ url: DATABASE_URL, queue: 'tasks' })
    const queue = createQueueClient({ url: DATABASE_URL })

    // Setup
    await cron.cron.setup()
    await queue.queue('tasks').setup()

    // Schedule recurring jobs
    await cron.cron('cleanup').schedule(schedule.daily({ hour: 0 }), {
      payload: { type: 'cleanup-sessions' },
    })

    await cron.cron('weekly-digest').schedule(schedule.weekly({ day: 'monday', hour: 8 }), {
      payload: { type: 'weekly-digest' },
    })

    // This job goes to the 'reports' queue, which needs its own setup() and worker (not shown here)
    await cron.cron('monthly-report').schedule(schedule.monthly({ day: 1, hour: 9 }), {
      queue: 'reports',
      payload: { type: 'monthly-report' },
    })

    // Worker processes jobs when they fire
    await queue.queue('tasks').process(async (job) => {
      const { type } = job.payload as { type: string }

      switch (type) {
        case 'cleanup-sessions':
          await cleanupExpiredSessions()
          break
        case 'weekly-digest':
          await sendWeeklyDigest()
          break
      }
    })

    // Graceful shutdown
    process.on('SIGTERM', async () => {
      await cron.destroy()
      await queue.destroy()
      process.exit(0)
    })

All schedules are UTC
pg_cron runs in UTC. Convert your intended local times before scheduling.
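The conversion can be sketched as a small helper. `localToUtc` and `UtcTime` are hypothetical names, not part of @pgshift/cron, and the helper assumes the caller supplies a fixed offset in minutes east of UTC (for example -300 for UTC-5). It does not know about daylight saving time.

```typescript
// Hypothetical helper, not a PgShift API. Converts a local wall-clock time to
// UTC using a fixed offset in minutes east of UTC (e.g. -300 for UTC-5,
// 120 for UTC+2). Daylight saving changes are NOT handled.
interface UtcTime {
  hour: number
  minute: number
  dayShift: number // -1: previous UTC day, 0: same day, 1: next UTC day
}

function localToUtc(hour: number, minute: number, utcOffsetMinutes: number): UtcTime {
  const MINUTES_PER_DAY = 24 * 60
  // UTC = local time minus the offset.
  const total = hour * 60 + minute - utcOffsetMinutes
  const dayShift = Math.floor(total / MINUTES_PER_DAY)
  // Wrap into 0..1439 even when `total` is negative.
  const wrapped = ((total % MINUTES_PER_DAY) + MINUTES_PER_DAY) % MINUTES_PER_DAY
  return { hour: Math.floor(wrapped / 60), minute: wrapped % 60, dayShift }
}

// 08:00 in UTC-5 is 13:00 UTC on the same day.
console.log(localToUtc(8, 0, -300))
// 01:00 in UTC+2 is 23:00 UTC on the previous day.
console.log(localToUtc(1, 0, 120))
```

With this sketch, a digest meant for Monday 08:00 in UTC-5 would be scheduled with `hour: 13`. When `dayShift` is not zero, the `day` passed to a weekly or monthly schedule has to move by the same amount. Because the offset is fixed, a job scheduled this way shifts by an hour in local terms whenever the region enters or leaves daylight saving time.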