Job Queue

Durable background processing backed by PostgreSQL. Jobs survive Core restarts and are automatically retried if a worker crashes or takes too long.


How it works

  1. You enqueue a job via the API or from a backend handler.
  2. The job is immediately persisted in PostgreSQL (via pgmq).
  3. The scheduler polls the queue every 500ms and dispatches the job to the appropriate backend worker via the onJob lifecycle hook.
  4. The worker processes the job and returns a result or throws an error.
  5. On success, the job is archived. On failure (the handler throws an error), the job is deleted from the queue and is not retried.

If the worker crashes or does not acknowledge the job within the visibility timeout (120 seconds), the job becomes visible again and is re-dispatched. Handlers should be idempotent because a job may be delivered more than once.
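
The redelivery behavior described above can be illustrated with a small in-memory model. This is only a sketch of the visibility-timeout idea, not how pgmq or the scheduler is implemented: the ToyQueue class, its method names, and the explicit `now` argument are hypothetical and exist only for this illustration.

```javascript
// Toy in-memory model of a visibility-timeout queue (hypothetical, illustration only).
const VISIBILITY_TIMEOUT_MS = 120 * 1000; // 120 seconds, as documented above

class ToyQueue {
  constructor() {
    this.nextId = 1;
    this.active = new Map(); // msg_id -> { payload, read_ct, visibleAt }
    this.archived = [];
  }

  // Persist a job; it is visible to readers immediately.
  enqueue(payload, now) {
    const msgId = this.nextId++;
    this.active.set(msgId, { payload, read_ct: 0, visibleAt: now });
    return msgId;
  }

  // Return the first visible job and hide it for the visibility timeout.
  read(now) {
    for (const [msgId, job] of this.active) {
      if (job.visibleAt <= now) {
        job.visibleAt = now + VISIBILITY_TIMEOUT_MS;
        job.read_ct += 1;
        return { msg_id: msgId, payload: job.payload, read_ct: job.read_ct };
      }
    }
    return null;
  }

  // Success: move the job to the archive.
  archive(msgId) {
    const job = this.active.get(msgId);
    if (!job) return false;
    this.active.delete(msgId);
    this.archived.push({ msg_id: msgId, ...job });
    return true;
  }

  // Failure: remove the job permanently.
  delete(msgId) {
    return this.active.delete(msgId);
  }
}

const queue = new ToyQueue();
const id = queue.enqueue({ type: "send_report" }, 0);
const first = queue.read(0);        // dispatched; the job is now invisible
const hidden = queue.read(60000);   // 60 s later: still invisible, nothing to read
const second = queue.read(120000);  // never acknowledged: the job is delivered again
queue.archive(id);                  // the second attempt acknowledges success
```

In this model the second read returns the same job with read_ct incremented, which is why a handler may see the same payload twice.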


Job lifecycle

| Status | Meaning |
|---|---|
| Queued | Persisted in the queue, waiting for the scheduler to pick it up. |
| Processing | Dispatched to a backend worker. Invisible in the queue for 120 seconds. |
| Completed | Handler returned successfully. Job archived (queryable via ?archived=true). |
| Failed | Handler threw an error. Job deleted from the queue. |

Enqueue a job

POST /api/v1/apps/{appId}/jobs
{ "payload": { "type": "send_report", "userId": "..." } }

Response (201):

{ "msg_id": 42 }

The job is persisted immediately. The scheduler is woken instantly and dispatches the job without waiting for the next poll cycle.

Optional user_id

You can specify which user the job runs as:

{ "payload": { "type": "send_report" }, "user_id": "target-user-uuid" }

If user_id differs from the authenticated caller, admin permission is required. The caller object in the worker's onJob hook will contain this user's identity and a fresh JWT token.
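
A minimal client-side sketch of the enqueue call, including the optional user_id. The function names, the base URL argument, and the use of a bearer token in the Authorization header are assumptions made for this example; the page only states that the endpoint requires authentication, not how credentials are passed.

```javascript
// Build the enqueue request. Kept as a pure function so it can be inspected without a network.
// Assumption: the API accepts a bearer token in the Authorization header.
function buildEnqueueRequest(baseUrl, appId, payload, { userId, token } = {}) {
  const body = { payload };
  // Optional: run the job as another user (admin permission is required
  // if this differs from the authenticated caller).
  if (userId !== undefined) body.user_id = userId;

  const headers = { "Content-Type": "application/json" };
  if (token) headers.Authorization = `Bearer ${token}`;

  return {
    url: `${baseUrl}/api/v1/apps/${encodeURIComponent(appId)}/jobs`,
    init: { method: "POST", headers, body: JSON.stringify(body) },
  };
}

// Send the request and return the msg_id from the 201 response.
async function enqueueJob(baseUrl, appId, payload, options) {
  const { url, init } = buildEnqueueRequest(baseUrl, appId, payload, options);
  const res = await fetch(url, init);
  if (res.status !== 201) {
    throw new Error(`Enqueue failed with status ${res.status}`);
  }
  const { msg_id } = await res.json();
  return msg_id;
}
```

A call such as enqueueJob(baseUrl, "crm", { type: "send_report" }, { userId: "target-user-uuid", token }) would then resolve to the new message ID.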


Handle jobs in your backend

Jobs arrive in the onJob lifecycle hook:

serve({}, {
  onJob: async (payload, caller) => {
    if (payload.type === "send_report") {
      // caller.userId, caller.email, caller.authToken available
      await generateAndSendReport(payload.userId);
      return { sent: true };
    }
  },
});

  • Return a value to mark the job as completed (archived).
  • Throw an error to mark the job as failed (deleted).
  • The caller parameter contains the user the job runs as (userId, email, authToken): by default the user who enqueued the job, or the user given in user_id. If no user was specified, it contains a system identity.
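
As the number of job types grows, one way to keep the hook readable is to route on payload.type through a lookup table. The sketch below is one possible pattern, not something the platform requires; makeOnJob and the handler table are hypothetical names. It always returns a value on success, since this page does not say how an undefined return value is treated.

```javascript
// Route jobs to per-type handlers. Unknown types throw, so the job is marked failed.
function makeOnJob(handlersByType) {
  // A Map avoids accidental matches on inherited object keys such as "toString".
  const handlers = new Map(Object.entries(handlersByType));

  return async (payload, caller) => {
    const handler = handlers.get(payload.type);
    if (typeof handler !== "function") {
      throw new Error(`No handler registered for job type: ${payload.type}`);
    }
    const result = await handler(payload, caller);
    // Always return a value so a successful job is unambiguous.
    return result === undefined ? { ok: true } : result;
  };
}

const onJob = makeOnJob({
  // Stand-in for real work such as generating and sending a report.
  send_report: async (payload) => ({ sent: true, userId: payload.userId }),
});
```

The resulting function can be passed as the onJob hook, for example serve({}, { onJob }).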

List jobs

GET /api/v1/apps/{appId}/jobs

Returns active jobs in the queue (not yet completed or failed).

GET /api/v1/apps/{appId}/jobs?archived=true

Returns completed (archived) jobs.

Optional query parameter: limit (default 100, max 1000).
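
A small helper for building the list URLs, offered as a sketch. The function name is hypothetical, baseUrl is assumed to be a bare origin with no path prefix, and clamping limit to a minimum of 1 is an assumption; only the maximum of 1000 comes from this page.

```javascript
// Build the URL for listing jobs. The limit is clamped to the documented maximum of 1000.
function buildListJobsUrl(baseUrl, appId, { archived = false, limit } = {}) {
  const url = new URL(`/api/v1/apps/${encodeURIComponent(appId)}/jobs`, baseUrl);
  if (archived) url.searchParams.set("archived", "true");
  if (limit !== undefined) {
    // Lower bound of 1 is an assumption; the upper bound is documented.
    const clamped = Math.min(Math.max(Math.trunc(limit), 1), 1000);
    url.searchParams.set("limit", String(clamped));
  }
  return url.toString();
}

const activeUrl = buildListJobsUrl("https://core.example.com", "crm");
const archivedUrl = buildListJobsUrl("https://core.example.com", "crm", { archived: true, limit: 5000 });
```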

Job object

{
  "msg_id": 42,
  "app_id": "crm",
  "payload": { "type": "send_report", "userId": "..." },
  "user_id": "3fa85f64-...",
  "read_ct": 1,
  "enqueued_at": "2024-12-01T10:30:00Z"
}

| Field | Description |
|---|---|
| msg_id | Unique message ID in the queue. |
| app_id | Which app this job belongs to. |
| payload | The data you passed when enqueueing. |
| user_id | The user who enqueued the job (or null for system jobs). |
| read_ct | Number of times this job has been read (delivery attempts). |
| enqueued_at | When the job was created. |
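
Because read_ct counts delivery attempts, it can be used to spot jobs that were redelivered after a crash or timeout. A minimal sketch over an already-fetched list of job objects; the function name and the sample data are made up for this example.

```javascript
// Return the msg_id of every job that has been delivered more than once.
function findRedelivered(jobs) {
  return jobs.filter((job) => job.read_ct > 1).map((job) => job.msg_id);
}

// Sample data shaped like the job object above (values are illustrative).
const sampleJobs = [
  { msg_id: 42, app_id: "crm", read_ct: 1 },
  { msg_id: 43, app_id: "crm", read_ct: 3 },
  { msg_id: 44, app_id: "crm", read_ct: 0 },
];
const redelivered = findRedelivered(sampleJobs);
```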

Enqueue from a backend handler

From inside an RPC handler, enqueue a job via the Core API using the ctx object (third argument):

serve({
  closeDeal: async (params, caller, ctx) => {
    // Enqueue background work instead of blocking the RPC response
    const res = await fetch(`${ctx.runtimeUrl}/api/v1/apps/${ctx.appId}/jobs`, {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ payload: { type: "deal_won", dealId: params.dealId } }),
    });
    // Surface enqueue failures instead of reporting success unconditionally
    if (!res.ok) {
      throw new Error(`Failed to enqueue job: HTTP ${res.status}`);
    }
    return { queued: true };
  },
});

Visibility timeout

| Property | Value |
|---|---|
| Visibility timeout | 120 seconds |
| Behavior | After dispatch, the job is invisible in the queue for 120 seconds. If the worker does not acknowledge (complete or fail) within that window, the job becomes visible again and is re-dispatched. |
| Implication | Handlers must be idempotent. A job may be delivered more than once. |
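
One common way to make a handler idempotent is to record a key per unit of work and skip the work when the key has been seen. The sketch below assumes the enqueuer puts an idempotencyKey field in the payload; that field, the makeIdempotent helper, and the in-memory Map are all hypothetical. A Map does not survive a worker crash, so a real handler would need a durable store (for example a database table with a unique constraint).

```javascript
// Wrap a handler so repeated deliveries of the same job do the work only once.
// Assumption: each payload carries a caller-chosen `idempotencyKey` (hypothetical field).
function makeIdempotent(handler, results = new Map()) {
  return async (payload, caller) => {
    const key = payload.idempotencyKey;
    // No key: nothing to deduplicate on, so run the handler as usual.
    if (key === undefined) return handler(payload, caller);
    // Already processed: return the stored result without repeating the work.
    if (results.has(key)) return results.get(key);

    const result = await handler(payload, caller);
    results.set(key, result);
    return result;
  };
}

let reportsSent = 0;
const sendReportOnce = makeIdempotent(async () => {
  reportsSent += 1; // stand-in for the real side effect
  return { sent: true };
});
```

This sketch does not guard against two deliveries running concurrently; it only covers the sequential redelivery case described above.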

Scheduler

The scheduler is an internal loop that polls the pgmq queue every 500ms. When a job is read:

  1. If the job has _hook: true and action_type: "agent", it is dispatched as an agent invocation (entity trigger).
  2. If the job has cron_id and the app is an agent, it is dispatched as an agent invocation with the payload's message field.
  3. Otherwise, it is dispatched to the app's worker via the onJob hook.

The scheduler can also be woken immediately (bypass the 500ms poll) when a new job is enqueued.
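
The three routing rules can be written as a small decision function. This is a sketch of the rules as stated above, not the scheduler's actual code: it assumes _hook, action_type, and cron_id are read from the job payload, and app.isAgent is a hypothetical flag standing in for however Core knows that an app is an agent.

```javascript
// Decide where a job is dispatched, checking the rules in the documented order.
function routeJob(payload, app) {
  // Rule 1: entity trigger for an agent.
  if (payload._hook === true && payload.action_type === "agent") {
    return { target: "agent", reason: "entity trigger" };
  }
  // Rule 2: cron job on an agent app; the payload's message field is passed along.
  if (payload.cron_id !== undefined && app.isAgent) {
    return { target: "agent", reason: "cron", message: payload.message };
  }
  // Rule 3: everything else goes to the worker's onJob hook.
  return { target: "worker", hook: "onJob" };
}
```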


Crons and jobs

Scheduled jobs (crons) use the same pgmq queue. When a cron fires, pg_cron enqueues a job with a cron_id field in the payload. The scheduler picks it up and dispatches it like a manually enqueued job: to the worker's onJob hook or, if the app is an agent, as an agent invocation.

Your worker does not need to distinguish between cron jobs and manual jobs unless you want to. The payload tells it what to do.
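
If a worker does want to tell the two apart, for example for logging, the cron_id field is the only signal this page mentions. A minimal sketch with a hypothetical helper name:

```javascript
// Label a job by origin. Cron-fired jobs carry a cron_id field in the payload.
function jobSource(payload) {
  const hasCronId = payload.cron_id !== undefined && payload.cron_id !== null;
  return hasCronId ? "cron" : "manual";
}
```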

See Scheduled Jobs (Crons) for cron-specific configuration.


Technical details

| Property | Value |
|---|---|
| Queue backend | pgmq (PostgreSQL extension) |
| Queue name | jobs |
| Queue table | pgmq.q_jobs (active), pgmq.a_jobs (archived) |
| Scheduler poll interval | 500ms |
| Visibility timeout | 120 seconds |
| On success | pgmq.archive() (moves to archive table) |
| On failure | pgmq.delete() (removed permanently) |
| Persistence | Jobs survive Core restarts (PostgreSQL-backed) |

API endpoints summary

| Method | Path | Auth | Description |
|---|---|---|---|
| POST | /api/v1/apps/{appId}/jobs | Yes | Enqueue a job |
| GET | /api/v1/apps/{appId}/jobs | Yes | List active jobs |
| GET | /api/v1/apps/{appId}/jobs?archived=true | Yes | List completed (archived) jobs |