| Safe Haskell | None |
|---|---|
| Language | GHC2024 |
Arbiter.Core.SqlTemplates
Description
SQL query templates with parameter markers
Synopsis
- insertJobSQL :: Text -> Text -> Text
- insertJobReplaceSQL :: Text -> Text -> Text
- insertJobsBatchSQL :: Text -> Text -> Text
- insertJobsBatchSQL_ :: Text -> Text -> Text
- claimJobsSQL :: Text -> Text -> Int -> Int -> Text
- claimJobsBatchedSQL :: Text -> Text -> Int -> Int -> Int -> Text
- smartAckJobSQL :: Text -> Text -> Text
- setVisibilityTimeoutSQL :: Text -> Text -> Text
- setVisibilityTimeoutBatchSQL :: Text -> Text -> Text -> Text
- updateJobForRetrySQL :: Text -> Text -> Text
- moveToDLQSQL :: Text -> Text -> Text
- retryFromDLQSQL :: Text -> Text -> Text
- dlqJobExistsSQL :: Text -> Text -> Text
- deleteDLQJobSQL :: Text -> Text -> Text
- getJobByIdSQL :: Text -> Text -> Text
- cancelJobSQL :: Text -> Text -> Text
- promoteJobSQL :: Text -> Text -> Text
- getQueueStatsSQL :: Text -> Text -> Text
- ackJobsBulkSQL :: Text -> Text -> Text
- moveToDLQBatchSQL :: Text -> Text -> Text
- deleteDLQJobsBatchSQL :: Text -> Text -> Text
- pauseChildrenSQL :: Text -> Text -> Text
- resumeChildrenSQL :: Text -> Text -> Text
- cancelJobCascadeSQL :: Text -> Text -> Text
- cancelJobTreeSQL :: Text -> Text -> Text
- tryWakeAncestorSQL :: Text -> Text -> Text
- suspendJobSQL :: Text -> Text -> Text
- resumeJobSQL :: Text -> Text -> Text
- parentExistsSQL :: Text -> Text -> Text
- getParentIdSQL :: Text -> Text -> Text
- countChildrenBatchSQL :: Text -> Text -> Text
- countDLQChildrenBatchSQL :: Text -> Text -> Text
- cascadeChildrenToDLQSQL :: Text -> Text -> Text
- descendantRollupIdsSQL :: Text -> Text -> Text
- insertResultSQL :: Text -> Text -> Text
- getResultsByParentSQL :: Text -> Text -> Text
- getDLQChildErrorsByParentSQL :: Text -> Text -> Text
- persistParentStateSQL :: Text -> Text -> Text
- getParentStateSnapshotSQL :: Text -> Text -> Text
- readChildResultsSQL :: Text -> Text -> Text
- lockGroupsSQL :: Text -> Text -> Text
- refreshGroupsSQL :: Text -> Text -> Text
- checkReaperSeqSQL :: Text -> Text -> Text
- updateReaperSeqSQL :: Text -> Text -> Text
- data JobFilter
- listJobsFilteredSQL :: Text -> Text -> Text -> Text
- countJobsFilteredSQL :: Text -> Text -> Text -> Text
- listDLQFilteredSQL :: Text -> Text -> Text -> Text
- countDLQFilteredSQL :: Text -> Text -> Text -> Text
Job Queue Operations
insertJobSQL :: Text -> Text -> Text Source #
RETURNING clause with job columns | SQL template for inserting a job
Parameters (in order): payload, group_key, attempts, last_error, priority, dedup_key, dedup_strategy, max_attempts, parent_id, parent_state, suspended, not_visible_until
Returns: The inserted job row
insertJobReplaceSQL :: Text -> Text -> Text Source #
SQL template for replace deduplication strategy
Replaces existing job unless actively in-flight or has children (in either the main queue or the DLQ). A parent with no children yet can be replaced.
The groups table is maintained by triggers on the main job table.
ON CONFLICT DO UPDATE fires the UPDATE trigger, whose transition tables
contain the old and new rows -- handling cross-group moves automatically.
Parameters: same 12 as insertJobSQL
insertJobsBatchSQL :: Text -> Text -> Text Source #
SQL template for batch inserting jobs using array parameters
Uses unnest to expand parallel arrays into rows. Supports dedup keys:
jobs with IgnoreDuplicate are silently skipped on conflict, jobs with
ReplaceDuplicate update the existing row (unless actively in-flight).
Parameters (10 arrays): 1. payloads — jsonb[] 2. group_keys — text[] (with NULLs) 3. priorities — int[] 4. dedup_keys — text[] (with NULLs) 5. dedup_strategies — text[] (with NULLs) 6. max_attempts — int[] (with NULLs) 7. parent_ids — bigint[] (with NULLs) 8. parent_states — jsonb[] (with NULLs) 9. suspended — boolean[] 10. not_visible_untils — timestamptz[] (with NULLs)
claimJobsSQL :: Text -> Text -> Int -> Int -> Text Source #
Single-CTE claim for grouped jobs.
Uses the {queue}_groups table for fast candidate selection with
FOR UPDATE SKIP LOCKED for concurrency control.
The groups table row lock provides two guarantees:
SKIP LOCKED— groups currently being claimed by an in-flight transaction are skipped (no contention).EPQ re-evaluation— in READ COMMITTED,FOR UPDATEre-evaluates the WHERE clause against committed state. If a concurrent claim committed and the trigger updatedin_flight_until, we see the fresh value and correctly skip the group.
No ? parameters — all values are interpolated.
claimJobsBatchedSQL :: Text -> Text -> Int -> Int -> Int -> Text Source #
Batched single-CTE claim.
Uses the {queue}_groups table with FOR UPDATE SKIP LOCKED for
concurrency control (see claimJobsSQL for details on the EPQ
re-evaluation mechanism).
Excludes tree jobs (parent_id IS NULL AND parent_state IS NULL).
No ? parameters — all values are interpolated.
smartAckJobSQL :: Text -> Text -> Text Source #
Smart ack CTE for job dependencies.
- ack: DELETE the job only if it has no children. Returns deleted row.
- suspend: If ack returned nothing AND children exist, suspend the job (it becomes a finalizer waiting for children to complete).
- wake_parent: If ack deleted a child whose parent is suspended with no remaining siblings in the queue, resume the parent for its completion round.
Returns rows_affected (1 on success, 0 if stolen/gone).
Parameters: job_id, attempts, job_id, job_id, attempts, job_id
setVisibilityTimeoutSQL :: Text -> Text -> Text Source #
SQL template for setting visibility timeout
Parameters: timeout, job_id, attempts
Uses optimistic locking (attempts check) to prevent race conditions when another worker has reclaimed the job after visibility timeout expired.
setVisibilityTimeoutBatchSQL :: Text -> Text -> Text -> Text Source #
Atomically updates the visibility timeout for a batch of jobs and returns the detailed status of each job in a single query.
This is used for heartbeating. The query attempts to update all jobs, and then reports on which ones succeeded, which were missing (acked), and which had a different attempts count (stolen).
Parameters: timeout, then a placeholder for a VALUES list of (job_id, attempts) pairs
updateJobForRetrySQL :: Text -> Text -> Text Source #
SQL template for updating job for retry
Parameters: backoff, error, job_id, attempts
Uses optimistic locking (attempts check) to prevent race conditions when a job's visibility timeout expires and another worker claims it before the retry update completes.
moveToDLQSQL :: Text -> Text -> Text Source #
SQL template for moving job to DLQ atomically
This preserves ALL job fields (complete snapshot) plus DLQ metadata. The operation is atomic: the job is deleted from the main queue and inserted into the DLQ in a single statement. The final error message is passed as a parameter to capture the error that caused the DLQ move.
Parameters: job_id, attempts, last_error
Dead Letter Queue Operations
retryFromDLQSQL :: Text -> Text -> Text Source #
SQL template for retrying a job from DLQ (tree-aware)
Tree-aware retry behavior — retrying any member of a DLQ'd tree recovers the entire tree in a single operation:
- If the target is a child whose parent is in the DLQ (not in main queue),
the parent is auto-retried as
suspended = TRUE, and ALL DLQ'd siblings are auto-retried too. The parent waits for children to complete. - If the target is a rollup finalizer with DLQ'd children, ALL children
are auto-retried and the finalizer comes back as
suspended = TRUE(waits for children to complete). If no DLQ'd children exist, it comes back assuspended = FALSE(runs immediately with snapshot data). - Refuses to retry if the tree root's parent_id references a job that no longer exists in the main queue — prevents creating orphaned children.
Retried rollup finalizers get suspended = TRUE when they have children
being retried alongside them. dedup_key and dedup_strategy are
intentionally dropped on retry (columns omitted → NULL defaults).
Parameters: id (the DLQ primary key)
dlqJobExistsSQL :: Text -> Text -> Text Source #
Check whether a DLQ job exists by ID.
Parameters: dlq_id
deleteDLQJobSQL :: Text -> Text -> Text Source #
SQL template for deleting a DLQ job
Parameters: id (the DLQ primary key) Returns: parent_id of the deleted job (NULL if no parent)
Admin Operations
getJobByIdSQL :: Text -> Text -> Text Source #
SQL template for getting a job by ID
Parameters: job_id
Returns: Single job row if found
cancelJobSQL :: Text -> Text -> Text Source #
SQL template for canceling (deleting) a job by ID.
Refuses to delete a job that has children — use cancelJobCascadeSQL instead.
If the deleted job was a child and no siblings remain in the queue, resumes the parent for its completion round.
Returns rows_affected.
Parameters: job_id (for DELETE), job_id (for children guard)
promoteJobSQL :: Text -> Text -> Text Source #
Promote a delayed or retrying job to be immediately visible.
Refuses in-flight jobs (attempts > 0 with no last_error). Returns 0 if job doesn't exist, is already visible, or is in-flight.
getQueueStatsSQL :: Text -> Text -> Text Source #
SQL template for getting queue statistics
Returns: total_jobs, visible_jobs, oldest_job_age_seconds
Batch Operations
ackJobsBulkSQL :: Text -> Text -> Text Source #
Bulk ack for standalone jobs (no parent, no tree logic).
Deletes jobs matching both ID and attempts (optimistic locking).
Skips the full smart-ack CTE since batched-mode jobs are guaranteed
standalone (parent_id IS NULL AND parent_state IS NULL).
Parameters: Array of job IDs, array of expected attempts Returns: number of rows deleted
moveToDLQBatchSQL :: Text -> Text -> Text Source #
SQL template for moving multiple jobs to DLQ in a single operation
Uses unnest to process multiple (id, attempts, error_msg) tuples. Returns the number of jobs moved.
Parameters: Array of job IDs, array of attempts, array of error messages
deleteDLQJobsBatchSQL :: Text -> Text -> Text Source #
SQL template for deleting multiple DLQ jobs by ID
Parameters: Array of DLQ job IDs Returns: parent_id of each deleted job (NULL if no parent)
Job Dependency Operations
pauseChildrenSQL :: Text -> Text -> Text Source #
Pause all visible children of a parent job (set suspended = TRUE).
Only affects children that are currently claimable (not in-flight, not already suspended). In-flight children are left alone so their visibility timeout can expire normally if the worker crashes — pausing them would break crash recovery.
Parameters: parent_id
resumeChildrenSQL :: Text -> Text -> Text Source #
Resume all suspended children of a parent job.
Parameters: parent_id
cancelJobCascadeSQL :: Text -> Text -> Text Source #
Cancel a job and all its descendants recursively.
Parameters: job_id
cancelJobTreeSQL :: Text -> Text -> Text Source #
Cancel an entire job tree by walking up from any node to the root, then cascade-deleting everything from the root down.
Parameters: job_id
tryWakeAncestorSQL :: Text -> Text -> Text Source #
Try to wake a suspended ancestor when all its children are gone.
Resumes the parent for a completion round (sets suspended = FALSE). Only wakes if the parent is suspended and has no remaining children in the main queue.
Parameters: ancestor_id (repeated 2 times)
suspendJobSQL :: Text -> Text -> Text Source #
Suspend a job (make it unclaimable).
Only suspends non-in-flight jobs (not currently being processed by workers).
Parameters: job_id Returns: number of rows updated (0 if job doesn't exist, is in-flight, or already suspended)
resumeJobSQL :: Text -> Text -> Text Source #
Resume a suspended job (make it claimable again).
Refuses to resume a rollup finalizer that still has children in the main
queue, preventing premature handler execution. Children in the DLQ are
considered terminal — the finalizer's handler receives DLQ errors via
readChildResultsSQL and can decide how to handle them.
Parameters: job_id Returns: number of rows updated (0 if job doesn't exist, isn't suspended, or is a finalizer with remaining children in the main queue)
parentExistsSQL :: Text -> Text -> Text Source #
Check whether a parent job exists.
Parameters: parent_id Returns: single row with a boolean
getParentIdSQL :: Text -> Text -> Text Source #
Fetch just the parent_id for a given job.
Parameters: job_id Returns: single row with parent_id (NULL if no parent or job doesn't exist)
Parent-Child Operations
countChildrenBatchSQL :: Text -> Text -> Text Source #
Batch child count: returns (parent_id, total_count, suspended_count) for a set of job IDs
Parameters: array of job IDs
countDLQChildrenBatchSQL :: Text -> Text -> Text Source #
Batch DLQ child count: returns (parent_id, count) for a set of job IDs
Parameters: array of job IDs
cascadeChildrenToDLQSQL :: Text -> Text -> Text Source #
Cascade all descendants of a rollup parent to the DLQ.
Recursively finds all descendants and moves them from the main queue to the DLQ in a single operation. Used when a rollup parent is moved to DLQ to prevent orphaned children from hitting FK violations on the results table.
Parameters: parent_job_id, error_message
descendantRollupIdsSQL :: Text -> Text -> Text Source #
Find descendant rollup finalizer IDs for snapshot preservation.
Used before cascade-DLQ to identify intermediate rollup nodes that
need their results persisted into parent_state before deletion.
Parameters: parent_job_id
Results Table Operations
insertResultSQL :: Text -> Text -> Text Source #
Insert a child's result into the results table.
Parameters: parent_id (bigint), child_id (bigint), result (jsonb)
getResultsByParentSQL :: Text -> Text -> Text Source #
Get all child results for a parent from the results table.
Parameters: parent_id (bigint) Returns: rows of (child_id bigint, result jsonb)
getDLQChildErrorsByParentSQL :: Text -> Text -> Text Source #
Get DLQ child errors for a parent.
Returns rows of (job_id bigint, last_error text) for each DLQ'd child.
Parameters: parent_id (bigint)
persistParentStateSQL :: Text -> Text -> Text Source #
Snapshot results into parent_state before DLQ move.
Parameters: parent_state (jsonb), job_id (bigint)
getParentStateSnapshotSQL :: Text -> Text -> Text Source #
Read the raw parent_state snapshot from the DB.
Parameters: job_id (bigint) Returns: single row with parent_state (jsonb, may be NULL)
readChildResultsSQL :: Text -> Text -> Text Source #
Read all child result data for a rollup finalizer in a single query.
Combines results table, DLQ child errors, and parent_state snapshot
into a tagged UNION ALL. Tags: = result, r = DLQ error,
e = parent_state snapshot.s
Parameters: parent_id (bigint) × 3 Returns: tagged rows (source, child_id, result_jsonb, error_text, dlq_pk)
Groups Table Operations
lockGroupsSQL :: Text -> Text -> Text Source #
The advisory lock key expression for group serialization.
Used by both claim CTEs (pg_try_advisory_xact_lock) and insert
(pg_advisory_xact_lock) to ensure producers and consumers share
the same lock namespace.
refreshGroupsSQL :: Text -> Text -> Text Source #
Full recompute of the groups table from the main queue.
Caller must hold row locks on the groups table (via lockGroupsSQL)
to prevent trigger interleaving.
Filtered Query Operations
Filter predicates for job listing queries.
Constructors
| FilterGroupKey Text | |
| FilterParentId Int64 | |
| FilterSuspended Bool | |
| FilterInFlight |
listJobsFilteredSQL :: Text -> Text -> Text -> Text Source #
Generic SQL for listing jobs with dynamic WHERE clause.
Parameters (appended after filter params): limit, offset
countJobsFilteredSQL :: Text -> Text -> Text -> Text Source #
Generic SQL for counting jobs with dynamic WHERE clause.