arbiter-core-0.1.0.0: Core types and logic for PostgreSQL-backed job queue
Safe HaskellNone
LanguageGHC2024

Arbiter.Core.SqlTemplates

Description

SQL query templates with parameter markers

Synopsis

Job Queue Operations

insertJobSQL :: Text -> Text -> Text Source #

RETURNING clause with job columns | SQL template for inserting a job

Parameters (in order): payload, group_key, attempts, last_error, priority, dedup_key, dedup_strategy, max_attempts, parent_id, parent_state, suspended, not_visible_until

Returns: The inserted job row

insertJobReplaceSQL :: Text -> Text -> Text Source #

SQL template for replace deduplication strategy

Replaces existing job unless actively in-flight or has children (in either the main queue or the DLQ). A parent with no children yet can be replaced.

The groups table is maintained by triggers on the main job table. ON CONFLICT DO UPDATE fires the UPDATE trigger, whose transition tables contain the old and new rows -- handling cross-group moves automatically.

Parameters: same 12 as insertJobSQL

insertJobsBatchSQL :: Text -> Text -> Text Source #

SQL template for batch inserting jobs using array parameters

Uses unnest to expand parallel arrays into rows. Supports dedup keys: jobs with IgnoreDuplicate are silently skipped on conflict, jobs with ReplaceDuplicate update the existing row (unless actively in-flight).

Parameters (10 arrays): 1. payloads — jsonb[] 2. group_keys — text[] (with NULLs) 3. priorities — int[] 4. dedup_keys — text[] (with NULLs) 5. dedup_strategies — text[] (with NULLs) 6. max_attempts — int[] (with NULLs) 7. parent_ids — bigint[] (with NULLs) 8. parent_states — jsonb[] (with NULLs) 9. suspended — boolean[] 10. not_visible_untils — timestamptz[] (with NULLs)

claimJobsSQL :: Text -> Text -> Int -> Int -> Text Source #

Single-CTE claim for grouped jobs.

Uses the {queue}_groups table for fast candidate selection with FOR UPDATE SKIP LOCKED for concurrency control.

The groups table row lock provides two guarantees:

  1. SKIP LOCKED — groups currently being claimed by an in-flight transaction are skipped (no contention).
  2. EPQ re-evaluation — in READ COMMITTED, FOR UPDATE re-evaluates the WHERE clause against committed state. If a concurrent claim committed and the trigger updated in_flight_until, we see the fresh value and correctly skip the group.

No ? parameters — all values are interpolated.

claimJobsBatchedSQL :: Text -> Text -> Int -> Int -> Int -> Text Source #

Batched single-CTE claim.

Uses the {queue}_groups table with FOR UPDATE SKIP LOCKED for concurrency control (see claimJobsSQL for details on the EPQ re-evaluation mechanism). Excludes tree jobs (parent_id IS NULL AND parent_state IS NULL). No ? parameters — all values are interpolated.

smartAckJobSQL :: Text -> Text -> Text Source #

Smart ack CTE for job dependencies.

  1. ack: DELETE the job only if it has no children. Returns deleted row.
  2. suspend: If ack returned nothing AND children exist, suspend the job (it becomes a finalizer waiting for children to complete).
  3. wake_parent: If ack deleted a child whose parent is suspended with no remaining siblings in the queue, resume the parent for its completion round.

Returns rows_affected (1 on success, 0 if stolen/gone). Parameters: job_id, attempts, job_id, job_id, attempts, job_id

setVisibilityTimeoutSQL :: Text -> Text -> Text Source #

SQL template for setting visibility timeout

Parameters: timeout, job_id, attempts

Uses optimistic locking (attempts check) to prevent race conditions when another worker has reclaimed the job after visibility timeout expired.

setVisibilityTimeoutBatchSQL :: Text -> Text -> Text -> Text Source #

Atomically updates the visibility timeout for a batch of jobs and returns the detailed status of each job in a single query.

This is used for heartbeating. The query attempts to update all jobs, and then reports on which ones succeeded, which were missing (acked), and which had a different attempts count (stolen).

Parameters: timeout, then a placeholder for a VALUES list of (job_id, attempts) pairs

updateJobForRetrySQL :: Text -> Text -> Text Source #

SQL template for updating job for retry

Parameters: backoff, error, job_id, attempts

Uses optimistic locking (attempts check) to prevent race conditions when a job's visibility timeout expires and another worker claims it before the retry update completes.

moveToDLQSQL :: Text -> Text -> Text Source #

SQL template for moving job to DLQ atomically

This preserves ALL job fields (complete snapshot) plus DLQ metadata. The operation is atomic: the job is deleted from the main queue and inserted into the DLQ in a single statement. The final error message is passed as a parameter to capture the error that caused the DLQ move.

Parameters: job_id, attempts, last_error

Dead Letter Queue Operations

retryFromDLQSQL :: Text -> Text -> Text Source #

SQL template for retrying a job from DLQ (tree-aware)

Tree-aware retry behavior — retrying any member of a DLQ'd tree recovers the entire tree in a single operation:

  1. If the target is a child whose parent is in the DLQ (not in main queue), the parent is auto-retried as suspended = TRUE, and ALL DLQ'd siblings are auto-retried too. The parent waits for children to complete.
  2. If the target is a rollup finalizer with DLQ'd children, ALL children are auto-retried and the finalizer comes back as suspended = TRUE (waits for children to complete). If no DLQ'd children exist, it comes back as suspended = FALSE (runs immediately with snapshot data).
  3. Refuses to retry if the tree root's parent_id references a job that no longer exists in the main queue — prevents creating orphaned children.

Retried rollup finalizers get suspended = TRUE when they have children being retried alongside them. dedup_key and dedup_strategy are intentionally dropped on retry (columns omitted → NULL defaults).

Parameters: id (the DLQ primary key)

dlqJobExistsSQL :: Text -> Text -> Text Source #

Check whether a DLQ job exists by ID.

Parameters: dlq_id

deleteDLQJobSQL :: Text -> Text -> Text Source #

SQL template for deleting a DLQ job

Parameters: id (the DLQ primary key) Returns: parent_id of the deleted job (NULL if no parent)

Admin Operations

getJobByIdSQL :: Text -> Text -> Text Source #

SQL template for getting a job by ID

Parameters: job_id

Returns: Single job row if found

cancelJobSQL :: Text -> Text -> Text Source #

SQL template for canceling (deleting) a job by ID.

Refuses to delete a job that has children — use cancelJobCascadeSQL instead.

If the deleted job was a child and no siblings remain in the queue, resumes the parent for its completion round.

Returns rows_affected.

Parameters: job_id (for DELETE), job_id (for children guard)

promoteJobSQL :: Text -> Text -> Text Source #

Promote a delayed or retrying job to be immediately visible.

Refuses in-flight jobs (attempts > 0 with no last_error). Returns 0 if job doesn't exist, is already visible, or is in-flight.

getQueueStatsSQL :: Text -> Text -> Text Source #

SQL template for getting queue statistics

Returns: total_jobs, visible_jobs, oldest_job_age_seconds

Batch Operations

ackJobsBulkSQL :: Text -> Text -> Text Source #

Bulk ack for standalone jobs (no parent, no tree logic).

Deletes jobs matching both ID and attempts (optimistic locking). Skips the full smart-ack CTE since batched-mode jobs are guaranteed standalone (parent_id IS NULL AND parent_state IS NULL).

Parameters: Array of job IDs, array of expected attempts Returns: number of rows deleted

moveToDLQBatchSQL :: Text -> Text -> Text Source #

SQL template for moving multiple jobs to DLQ in a single operation

Uses unnest to process multiple (id, attempts, error_msg) tuples. Returns the number of jobs moved.

Parameters: Array of job IDs, array of attempts, array of error messages

deleteDLQJobsBatchSQL :: Text -> Text -> Text Source #

SQL template for deleting multiple DLQ jobs by ID

Parameters: Array of DLQ job IDs Returns: parent_id of each deleted job (NULL if no parent)

Job Dependency Operations

pauseChildrenSQL :: Text -> Text -> Text Source #

Pause all visible children of a parent job (set suspended = TRUE).

Only affects children that are currently claimable (not in-flight, not already suspended). In-flight children are left alone so their visibility timeout can expire normally if the worker crashes — pausing them would break crash recovery.

Parameters: parent_id

resumeChildrenSQL :: Text -> Text -> Text Source #

Resume all suspended children of a parent job.

Parameters: parent_id

cancelJobCascadeSQL :: Text -> Text -> Text Source #

Cancel a job and all its descendants recursively.

Parameters: job_id

cancelJobTreeSQL :: Text -> Text -> Text Source #

Cancel an entire job tree by walking up from any node to the root, then cascade-deleting everything from the root down.

Parameters: job_id

tryWakeAncestorSQL :: Text -> Text -> Text Source #

Try to wake a suspended ancestor when all its children are gone.

Resumes the parent for a completion round (sets suspended = FALSE). Only wakes if the parent is suspended and has no remaining children in the main queue.

Parameters: ancestor_id (repeated 2 times)

suspendJobSQL :: Text -> Text -> Text Source #

Suspend a job (make it unclaimable).

Only suspends non-in-flight jobs (not currently being processed by workers).

Parameters: job_id Returns: number of rows updated (0 if job doesn't exist, is in-flight, or already suspended)

resumeJobSQL :: Text -> Text -> Text Source #

Resume a suspended job (make it claimable again).

Refuses to resume a rollup finalizer that still has children in the main queue, preventing premature handler execution. Children in the DLQ are considered terminal — the finalizer's handler receives DLQ errors via readChildResultsSQL and can decide how to handle them.

Parameters: job_id Returns: number of rows updated (0 if job doesn't exist, isn't suspended, or is a finalizer with remaining children in the main queue)

parentExistsSQL :: Text -> Text -> Text Source #

Check whether a parent job exists.

Parameters: parent_id Returns: single row with a boolean

getParentIdSQL :: Text -> Text -> Text Source #

Fetch just the parent_id for a given job.

Parameters: job_id Returns: single row with parent_id (NULL if no parent or job doesn't exist)

Parent-Child Operations

countChildrenBatchSQL :: Text -> Text -> Text Source #

Batch child count: returns (parent_id, total_count, suspended_count) for a set of job IDs

Parameters: array of job IDs

countDLQChildrenBatchSQL :: Text -> Text -> Text Source #

Batch DLQ child count: returns (parent_id, count) for a set of job IDs

Parameters: array of job IDs

cascadeChildrenToDLQSQL :: Text -> Text -> Text Source #

Cascade all descendants of a rollup parent to the DLQ.

Recursively finds all descendants and moves them from the main queue to the DLQ in a single operation. Used when a rollup parent is moved to DLQ to prevent orphaned children from hitting FK violations on the results table.

Parameters: parent_job_id, error_message

descendantRollupIdsSQL :: Text -> Text -> Text Source #

Find descendant rollup finalizer IDs for snapshot preservation.

Used before cascade-DLQ to identify intermediate rollup nodes that need their results persisted into parent_state before deletion.

Parameters: parent_job_id

Results Table Operations

insertResultSQL :: Text -> Text -> Text Source #

Insert a child's result into the results table.

Parameters: parent_id (bigint), child_id (bigint), result (jsonb)

getResultsByParentSQL :: Text -> Text -> Text Source #

Get all child results for a parent from the results table.

Parameters: parent_id (bigint) Returns: rows of (child_id bigint, result jsonb)

getDLQChildErrorsByParentSQL :: Text -> Text -> Text Source #

Get DLQ child errors for a parent.

Returns rows of (job_id bigint, last_error text) for each DLQ'd child.

Parameters: parent_id (bigint)

persistParentStateSQL :: Text -> Text -> Text Source #

Snapshot results into parent_state before DLQ move.

Parameters: parent_state (jsonb), job_id (bigint)

getParentStateSnapshotSQL :: Text -> Text -> Text Source #

Read the raw parent_state snapshot from the DB.

Parameters: job_id (bigint) Returns: single row with parent_state (jsonb, may be NULL)

readChildResultsSQL :: Text -> Text -> Text Source #

Read all child result data for a rollup finalizer in a single query.

Combines results table, DLQ child errors, and parent_state snapshot into a tagged UNION ALL. Tags: r = result, e = DLQ error, s = parent_state snapshot.

Parameters: parent_id (bigint) × 3 Returns: tagged rows (source, child_id, result_jsonb, error_text, dlq_pk)

Groups Table Operations

lockGroupsSQL :: Text -> Text -> Text Source #

The advisory lock key expression for group serialization.

Used by both claim CTEs (pg_try_advisory_xact_lock) and insert (pg_advisory_xact_lock) to ensure producers and consumers share the same lock namespace.

refreshGroupsSQL :: Text -> Text -> Text Source #

Full recompute of the groups table from the main queue. Caller must hold row locks on the groups table (via lockGroupsSQL) to prevent trigger interleaving.

Filtered Query Operations

data JobFilter Source #

Filter predicates for job listing queries.

Instances

Instances details
Show JobFilter Source # 
Instance details

Defined in Arbiter.Core.SqlTemplates

Eq JobFilter Source # 
Instance details

Defined in Arbiter.Core.SqlTemplates

listJobsFilteredSQL :: Text -> Text -> Text -> Text Source #

Generic SQL for listing jobs with dynamic WHERE clause.

Parameters (appended after filter params): limit, offset

countJobsFilteredSQL :: Text -> Text -> Text -> Text Source #

Generic SQL for counting jobs with dynamic WHERE clause.

listDLQFilteredSQL :: Text -> Text -> Text -> Text Source #

Generic SQL for listing DLQ jobs with dynamic WHERE clause.

Parameters (appended after filter params): limit, offset

countDLQFilteredSQL :: Text -> Text -> Text -> Text Source #

Generic SQL for counting DLQ jobs with dynamic WHERE clause.