Job Queue
Durable background processing backed by PostgreSQL. Jobs survive Core restarts and are automatically retried if a worker crashes or takes too long.
How it works
- You enqueue a job via the API or from a backend handler.
- The job is immediately persisted in PostgreSQL (via pgmq).
- The scheduler polls the queue every 500ms and dispatches the job to the appropriate backend worker via the onJob lifecycle hook.
- The worker processes the job and returns a result or throws an error.
- On success: the job is archived. On failure: the job is deleted.
If the worker crashes or does not acknowledge the job within the visibility timeout (120 seconds), the job becomes visible again and is re-dispatched. Handlers should be idempotent because a job may be delivered more than once.
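The redelivery rule above can be sketched as a small predicate. This is only an illustration of the described behavior, not the scheduler's actual code: the helper name isVisibleAgain is hypothetical, and treating exactly 120 seconds as already expired is an assumption.

```javascript
// Visibility timeout stated in the text above: 120 seconds.
const VISIBILITY_TIMEOUT_MS = 120_000;

// Hypothetical helper: returns true if a dispatched job would be visible
// again (and therefore eligible for re-dispatch) at time nowMs.
function isVisibleAgain(dispatchedAtMs, nowMs, acknowledged) {
  // An acknowledged job was archived or deleted, so it is never redelivered.
  if (acknowledged) return false;
  return nowMs - dispatchedAtMs >= VISIBILITY_TIMEOUT_MS;
}
```

A worker that takes 121 seconds without acknowledging would therefore see the same job dispatched a second time, which is why handlers need to tolerate duplicates.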
Job lifecycle
| Status | Meaning |
|---|---|
| Queued | Persisted in the queue, waiting for the scheduler to pick it up. |
| Processing | Dispatched to a backend worker. Invisible in the queue for 120 seconds. |
| Completed | Handler returned successfully. Job archived (queryable via ?archived=true). |
| Failed | Handler threw an error. Job deleted from the queue. |
Enqueue a job
POST /api/v1/apps/{appId}/jobs
{ "payload": { "type": "send_report", "userId": "..." } }
Response (201):
{ "msg_id": 42 }
The job is persisted immediately. The scheduler is woken instantly and dispatches the job without waiting for the next poll cycle.
Optional user_id
You can specify which user the job runs as:
{ "payload": { "type": "send_report" }, "user_id": "target-user-uuid" }
If user_id differs from the authenticated caller, admin permission is required. The caller object in the worker's onJob hook will contain this user's identity and a fresh JWT token.
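As a rough sketch, the enqueue request could be assembled with a helper like the one below. The helper name, the base URL, and the authHeaders option are assumptions for illustration; this page does not specify how requests are authenticated.

```javascript
// Hypothetical helper: builds the URL and fetch options for enqueueing a job.
// baseUrl and authHeaders are assumptions; supply whatever your deployment uses.
function buildEnqueueRequest(baseUrl, appId, payload, { userId, authHeaders = {} } = {}) {
  const body = { payload };
  // Only include user_id when the job should run as a specific user.
  if (userId !== undefined) body.user_id = userId;
  return {
    url: `${baseUrl}/api/v1/apps/${encodeURIComponent(appId)}/jobs`,
    init: {
      method: "POST",
      headers: { "Content-Type": "application/json", ...authHeaders },
      body: JSON.stringify(body),
    },
  };
}

// Example: enqueue a report job that runs as another user (admin permission required).
const enqueueReq = buildEnqueueRequest(
  "https://core.example.com", // placeholder host
  "crm",
  { type: "send_report" },
  { userId: "target-user-uuid" }
);
```

Passing enqueueReq.url and enqueueReq.init to fetch() sends the request; on success the response is a 201 with a body of the form { "msg_id": ... }.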
Handle jobs in your backend
Jobs arrive in the onJob lifecycle hook:
serve({}, {
  onJob: async (payload, caller) => {
    if (payload.type === "send_report") {
      // caller.userId, caller.email, caller.authToken available
      await generateAndSendReport(payload.userId);
      return { sent: true };
    }
  },
});
- Return a value to mark the job as completed (archived).
- Throw an error to mark the job as failed (deleted).
- The caller parameter contains the user who enqueued the job (userId, email, authToken), or a system identity if no user was specified.
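When a worker handles several job types, one possible structure is a lookup table keyed by payload.type. The sketch below is an assumption about how you might organize your own code; the handler names and return values are illustrative only. It throws for unrecognized types so that such jobs are marked as failed rather than returning without doing anything.

```javascript
// Hypothetical per-type handlers; names and results are illustrative only.
const jobHandlers = new Map([
  ["send_report", async (payload, caller) => ({ sent: true, to: payload.userId })],
  ["deal_won", async (payload, caller) => ({ notified: true, dealId: payload.dealId })],
]);

// An onJob implementation that routes on payload.type.
async function onJob(payload, caller) {
  const handler = jobHandlers.get(payload.type);
  // Throwing marks the job as failed (deleted), per the rules above.
  if (!handler) throw new Error(`Unknown job type: ${payload.type}`);
  return handler(payload, caller);
}
```

The function can then be passed as the onJob hook, for example serve({}, { onJob }).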
List jobs
GET /api/v1/apps/{appId}/jobs
Returns active jobs in the queue (not yet completed or failed).
GET /api/v1/apps/{appId}/jobs?archived=true
Returns completed (archived) jobs.
Optional parameter: limit (default 100, max 1000).
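A small URL builder illustrates the two listing modes and the limit bounds. The function name and base URL are hypothetical, and clamping on the client (including the lower bound of 1) is an assumption; the server may enforce the limits itself.

```javascript
// Hypothetical helper: builds the list URL for active or archived jobs.
function buildListJobsUrl(baseUrl, appId, { archived = false, limit = 100 } = {}) {
  // Documented bounds: default 100, max 1000. The minimum of 1 is an assumption.
  const clamped = Math.min(Math.max(Math.trunc(limit), 1), 1000);
  const params = new URLSearchParams();
  if (archived) params.set("archived", "true");
  params.set("limit", String(clamped));
  return `${baseUrl}/api/v1/apps/${encodeURIComponent(appId)}/jobs?${params}`;
}

const activeJobsUrl = buildListJobsUrl("https://core.example.com", "crm");
const archivedJobsUrl = buildListJobsUrl("https://core.example.com", "crm", { archived: true, limit: 5000 });
```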
Job object
{
  "msg_id": 42,
  "app_id": "crm",
  "payload": { "type": "send_report", "userId": "..." },
  "user_id": "3fa85f64-...",
  "read_ct": 1,
  "enqueued_at": "2024-12-01T10:30:00Z"
}
| Field | Description |
|---|---|
| msg_id | Unique message ID in the queue. |
| app_id | Which app this job belongs to. |
| payload | The data you passed when enqueueing. |
| user_id | The user who enqueued the job (or null for system jobs). |
| read_ct | Number of times this job has been read (delivery attempts). |
| enqueued_at | When the job was created. |
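The fields above are enough to spot redeliveries and system jobs when inspecting a listing. The following is a minimal sketch; the helper name and the shape of its return value are hypothetical.

```javascript
// Hypothetical helper: summarizes a job object returned by the list endpoint.
// nowMs is passed in so the result is deterministic.
function describeJob(job, nowMs) {
  return {
    id: job.msg_id,
    type: job.payload ? job.payload.type : undefined,
    isSystemJob: job.user_id === null,
    // read_ct counts delivery attempts, so anything above 1 is a redelivery.
    isRedelivery: job.read_ct > 1,
    ageMs: nowMs - Date.parse(job.enqueued_at),
  };
}

const jobSummary = describeJob(
  {
    msg_id: 42,
    app_id: "crm",
    payload: { type: "send_report", userId: "..." },
    user_id: "3fa85f64-...",
    read_ct: 1,
    enqueued_at: "2024-12-01T10:30:00Z",
  },
  Date.parse("2024-12-01T10:31:00Z")
);
```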
Enqueue from a backend handler
From inside an RPC handler, enqueue a job via the Core API using the ctx object (third argument):
serve({
  closeDeal: async (params, caller, ctx) => {
    // Enqueue background work instead of blocking the RPC response
    const res = await fetch(`${ctx.runtimeUrl}/api/v1/apps/${ctx.appId}/jobs`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ payload: { type: "deal_won", dealId: params.dealId } }),
    });
    // Surface enqueue failures instead of reporting success
    if (!res.ok) throw new Error(`Failed to enqueue job: ${res.status}`);
    return { queued: true };
  },
});
Visibility timeout
| Property | Value |
|---|---|
| Visibility timeout | 120 seconds |
| Behavior | After dispatch, the job is invisible in the queue for 120 seconds. If the worker does not acknowledge (complete or fail) within that window, the job becomes visible again and is re-dispatched. |
| Implication | Handlers must be idempotent. A job may be delivered more than once. |
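One way to make a handler tolerate duplicate delivery is to record a key for each processed job and skip repeats. The sketch below is an assumption about how you might do this, not part of the platform: the wrapper name, the key function, and the in-memory Set are all hypothetical, and a real worker would need a durable store so the record survives restarts and concurrent deliveries.

```javascript
// In-memory record of processed job keys, for illustration only.
const processedKeys = new Set();

// Hypothetical wrapper: runs handler at most once per key derived from the payload.
function makeIdempotent(keyOf, handler) {
  return async (payload, caller) => {
    const key = keyOf(payload);
    // A redelivered job whose work already finished is acknowledged without redoing it.
    if (processedKeys.has(key)) return { skipped: true };
    const result = await handler(payload, caller);
    // Record the key only after the work succeeded, so failures are not masked.
    processedKeys.add(key);
    return result;
  };
}

// Example: deduplicate deal notifications by deal ID.
let notificationsSent = 0;
const onDealWon = makeIdempotent(
  (payload) => `deal_won:${payload.dealId}`,
  async (payload) => {
    notificationsSent += 1;
    return { notified: true };
  }
);
```

Deriving the key from the payload is a design choice made here because this page does not say whether the job's msg_id is passed to onJob.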
Scheduler
The scheduler is an internal loop that polls the pgmq queue every 500ms. When a job is read:
- If the job has _hook: true and action_type: "agent": it is dispatched as an agent invocation (entity trigger).
- If the job has cron_id and the app is an agent: it is dispatched as an agent invocation with the payload's message field.
- Otherwise: it is dispatched to the app's worker via the onJob hook.
The scheduler can also be woken immediately (bypass the 500ms poll) when a new job is enqueued.
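The three dispatch rules can be expressed as a single decision function. This is only a sketch of the rules as described, not the scheduler's implementation: the function name, the app.isAgent flag, and the returned object shape are hypothetical, and it assumes the _hook, action_type, and cron_id fields live in the job payload.

```javascript
// Hypothetical routing function mirroring the rules above, in order.
function routeJob(payload, app) {
  if (payload._hook === true && payload.action_type === "agent") {
    // Entity trigger: dispatched as an agent invocation.
    return { target: "agent", kind: "entity_trigger" };
  }
  if (payload.cron_id !== undefined && app.isAgent) {
    // Cron on an agent app: invoked with the payload's message field.
    return { target: "agent", kind: "cron", message: payload.message };
  }
  // Everything else goes to the app's worker via onJob.
  return { target: "onJob" };
}
```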
Crons and jobs
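Scheduled jobs (crons) use the same pgmq queue. When a cron fires, pg_cron enqueues a job with a cron_id field in the payload. The scheduler picks it up and dispatches it to the worker's onJob hook, same as a manually enqueued job. The exception is an agent app, where a job with cron_id is dispatched as an agent invocation instead (see Scheduler).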
Your worker does not need to distinguish between cron jobs and manual jobs unless you want to. The payload tells it what to do.
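If a worker does want to tell the two apart, checking for cron_id in the payload is enough. A minimal sketch, with a hypothetical helper name:

```javascript
// Hypothetical helper: a job enqueued by pg_cron carries cron_id in its payload.
function isCronJob(payload) {
  return payload != null && payload.cron_id != null;
}
```

Inside onJob, this could be used to log the origin of a job or to skip work that only makes sense for manual requests.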
See Scheduled Jobs (Crons) for cron-specific configuration.
Technical details
| Property | Value |
|---|---|
| Queue backend | pgmq (PostgreSQL extension) |
| Queue name | jobs |
| Queue table | pgmq.q_jobs (active), pgmq.a_jobs (archived) |
| Scheduler poll interval | 500ms |
| Visibility timeout | 120 seconds |
| On success | pgmq.archive() (moves to archive table) |
| On failure | pgmq.delete() (removed permanently) |
| Persistence | Jobs survive Core restarts (PostgreSQL-backed) |
API endpoints summary
| Method | Path | Auth | Description |
|---|---|---|---|
| POST | /api/v1/apps/{appId}/jobs | Yes | Enqueue a job |
| GET | /api/v1/apps/{appId}/jobs | Yes | List active jobs |
| GET | /api/v1/apps/{appId}/jobs?archived=true | Yes | List completed (archived) jobs |