{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
module Arbiter.Core.SqlTemplates
(
insertJobSQL
, insertJobReplaceSQL
, insertJobsBatchSQL
, insertJobsBatchSQL_
, claimJobsSQL
, claimJobsBatchedSQL
, smartAckJobSQL
, setVisibilityTimeoutSQL
, setVisibilityTimeoutBatchSQL
, updateJobForRetrySQL
, moveToDLQSQL
, retryFromDLQSQL
, dlqJobExistsSQL
, deleteDLQJobSQL
, getJobByIdSQL
, cancelJobSQL
, promoteJobSQL
, getQueueStatsSQL
, ackJobsBulkSQL
, moveToDLQBatchSQL
, deleteDLQJobsBatchSQL
, pauseChildrenSQL
, resumeChildrenSQL
, cancelJobCascadeSQL
, cancelJobTreeSQL
, tryWakeAncestorSQL
, suspendJobSQL
, resumeJobSQL
, parentExistsSQL
, getParentIdSQL
, countChildrenBatchSQL
, countDLQChildrenBatchSQL
, cascadeChildrenToDLQSQL
, descendantRollupIdsSQL
, insertResultSQL
, getResultsByParentSQL
, getDLQChildErrorsByParentSQL
, persistParentStateSQL
, getParentStateSnapshotSQL
, readChildResultsSQL
, lockGroupsSQL
, refreshGroupsSQL
, checkReaperSeqSQL
, updateReaperSeqSQL
, JobFilter (..)
, listJobsFilteredSQL
, countJobsFilteredSQL
, listDLQFilteredSQL
, countDLQFilteredSQL
) where
import Data.Int (Int64)
import Data.Text (Text)
import Data.Text qualified as T
import NeatInterpolation (text)
import Arbiter.Core.Codec (codecColumns, dlqRowCodec, jobRowCodec)
import Arbiter.Core.Job.Schema
( jobQueueDLQTable
, jobQueueGroupsTable
, jobQueueReaperSeq
, jobQueueResultsTable
, jobQueueTable
)
data JobFilter
= FilterGroupKey Text
| FilterParentId Int64
| FilterSuspended Bool
| FilterInFlight
deriving stock (JobFilter -> JobFilter -> Bool
(JobFilter -> JobFilter -> Bool)
-> (JobFilter -> JobFilter -> Bool) -> Eq JobFilter
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: JobFilter -> JobFilter -> Bool
== :: JobFilter -> JobFilter -> Bool
$c/= :: JobFilter -> JobFilter -> Bool
/= :: JobFilter -> JobFilter -> Bool
Eq, Int -> JobFilter -> ShowS
[JobFilter] -> ShowS
JobFilter -> String
(Int -> JobFilter -> ShowS)
-> (JobFilter -> String)
-> ([JobFilter] -> ShowS)
-> Show JobFilter
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> JobFilter -> ShowS
showsPrec :: Int -> JobFilter -> ShowS
$cshow :: JobFilter -> String
show :: JobFilter -> String
$cshowList :: [JobFilter] -> ShowS
showList :: [JobFilter] -> ShowS
Show)
listJobsFilteredSQL :: Text -> Text -> Text -> Text
listJobsFilteredSQL :: Text -> Text -> Text -> Text
listJobsFilteredSQL Text
schema Text
tableName Text
whereClause =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
columns :: Text
columns = Maybe Text -> Text
jobColumns Maybe Text
forall a. Maybe a
Nothing
in [text|
SELECT ${columns}
FROM ${tbl}
${whereClause}
ORDER BY id DESC LIMIT ? OFFSET ?
|]
countJobsFilteredSQL :: Text -> Text -> Text -> Text
countJobsFilteredSQL :: Text -> Text -> Text -> Text
countJobsFilteredSQL Text
schema Text
tableName Text
whereClause =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
in [text|SELECT COUNT(*) FROM ${tbl} ${whereClause}|]
listDLQFilteredSQL :: Text -> Text -> Text -> Text
listDLQFilteredSQL :: Text -> Text -> Text -> Text
listDLQFilteredSQL Text
schema Text
tableName Text
whereClause =
let dlqTbl :: Text
dlqTbl = Text -> Text -> Text
jobQueueDLQTable Text
schema Text
tableName
columns :: Text
columns = Text -> [Text] -> Text
T.intercalate Text
", " [Text]
allDLQColumns
in [text|
SELECT ${columns}
FROM ${dlqTbl}
${whereClause}
ORDER BY failed_at DESC
LIMIT ? OFFSET ?
|]
countDLQFilteredSQL :: Text -> Text -> Text -> Text
countDLQFilteredSQL :: Text -> Text -> Text -> Text
countDLQFilteredSQL Text
schema Text
tableName Text
whereClause =
let dlqTbl :: Text
dlqTbl = Text -> Text -> Text
jobQueueDLQTable Text
schema Text
tableName
in [text|SELECT COUNT(*) FROM ${dlqTbl} ${whereClause}|]
allJobColumns :: [Text]
allJobColumns :: [Text]
allJobColumns = RowCodec (Job Value Int64 Text UTCTime) -> [Text]
forall a. RowCodec a -> [Text]
codecColumns (Text -> RowCodec (Job Value Int64 Text UTCTime)
jobRowCodec Text
"")
allDLQColumns :: [Text]
allDLQColumns :: [Text]
allDLQColumns = RowCodec (Int64, UTCTime, Job Value Int64 Text UTCTime) -> [Text]
forall a. RowCodec a -> [Text]
codecColumns (Text -> RowCodec (Int64, UTCTime, Job Value Int64 Text UTCTime)
dlqRowCodec Text
"")
jobColsExceptError :: Text
jobColsExceptError :: Text
jobColsExceptError = Text -> [Text] -> Text
T.intercalate Text
", " ([Text] -> Text) -> [Text] -> Text
forall a b. (a -> b) -> a -> b
$ (Text -> Bool) -> [Text] -> [Text]
forall a. (a -> Bool) -> [a] -> [a]
filter (Text -> Text -> Bool
forall a. Eq a => a -> a -> Bool
/= Text
"last_error") (Int -> [Text] -> [Text]
forall a. Int -> [a] -> [a]
drop Int
1 [Text]
allJobColumns)
jobColumns :: Maybe Text -> Text
jobColumns :: Maybe Text -> Text
jobColumns Maybe Text
mAlias = Text -> [Text] -> Text
T.intercalate Text
", " ([Text] -> Text) -> [Text] -> Text
forall a b. (a -> b) -> a -> b
$ (Text -> Text) -> [Text] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map Text -> Text
withAlias [Text]
allJobColumns
where
withAlias :: Text -> Text
withAlias Text
name = Text -> (Text -> Text) -> Maybe Text -> Text
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Text
name (\Text
alias -> Text
alias Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
name) Maybe Text
mAlias
insertJobSQL :: Text -> Text -> Text
insertJobSQL :: Text -> Text -> Text
insertJobSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
columns :: Text
columns = Maybe Text -> Text
jobColumns Maybe Text
forall a. Maybe a
Nothing
in [text|
INSERT INTO ${tbl} (payload, group_key, attempts, last_error, priority, dedup_key, dedup_strategy, max_attempts, parent_id, parent_state, suspended, not_visible_until)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (dedup_key) WHERE dedup_key IS NOT NULL DO NOTHING
RETURNING ${columns}
|]
insertJobReplaceSQL :: Text -> Text -> Text
insertJobReplaceSQL :: Text -> Text -> Text
insertJobReplaceSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
dlqTbl :: Text
dlqTbl = Text -> Text -> Text
jobQueueDLQTable Text
schema Text
tableName
columns :: Text
columns = Maybe Text -> Text
jobColumns Maybe Text
forall a. Maybe a
Nothing
in [text|
INSERT INTO ${tbl} (payload, group_key, attempts, last_error, priority, dedup_key, dedup_strategy, max_attempts, parent_id, parent_state, suspended, not_visible_until)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (dedup_key) WHERE dedup_key IS NOT NULL DO UPDATE SET
payload = EXCLUDED.payload,
group_key = EXCLUDED.group_key,
attempts = 0,
last_error = NULL,
priority = EXCLUDED.priority,
dedup_strategy = EXCLUDED.dedup_strategy,
max_attempts = EXCLUDED.max_attempts,
updated_at = NOW(),
parent_id = EXCLUDED.parent_id,
parent_state = EXCLUDED.parent_state,
suspended = EXCLUDED.suspended,
not_visible_until = EXCLUDED.not_visible_until,
last_attempted_at = NULL
WHERE (${tbl}.attempts = 0
OR ${tbl}.not_visible_until IS NULL
OR ${tbl}.not_visible_until <= NOW()
OR ${tbl}.last_error IS NOT NULL)
AND NOT EXISTS (SELECT 1 FROM ${tbl} c WHERE c.parent_id = ${tbl}.id)
AND NOT EXISTS (SELECT 1 FROM ${dlqTbl} d WHERE d.parent_id = ${tbl}.id)
RETURNING ${columns}
|]
insertJobsBatchSQL :: Text -> Text -> Text
insertJobsBatchSQL :: Text -> Text -> Text
insertJobsBatchSQL Text
schema Text
tableName =
let columns :: Text
columns = Maybe Text -> Text
jobColumns Maybe Text
forall a. Maybe a
Nothing
returning :: Text
returning = Text
"RETURNING " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
columns
in Text -> Text -> Text -> Text
insertJobsBatchBase Text
schema Text
tableName Text
returning
insertJobsBatchSQL_ :: Text -> Text -> Text
insertJobsBatchSQL_ :: Text -> Text -> Text
insertJobsBatchSQL_ Text
schema Text
tableName =
Text -> Text -> Text -> Text
insertJobsBatchBase Text
schema Text
tableName Text
""
insertJobsBatchBase :: Text -> Text -> Text -> Text
insertJobsBatchBase :: Text -> Text -> Text -> Text
insertJobsBatchBase Text
schema Text
tableName Text
returning =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
dlqTbl :: Text
dlqTbl = Text -> Text -> Text
jobQueueDLQTable Text
schema Text
tableName
in [text|
INSERT INTO ${tbl} (payload, group_key, attempts, last_error, priority, dedup_key, dedup_strategy, max_attempts, parent_id, parent_state, suspended, not_visible_until)
SELECT
payload, group_key, 0, NULL, priority, dedup_key, dedup_strategy, max_attempts, parent_id,
parent_state, suspended, not_visible_until
FROM (
SELECT
unnest(?::jsonb[]) AS payload,
unnest(?::text[]) AS group_key,
unnest(?::int[]) AS priority,
unnest(?::text[]) AS dedup_key,
unnest(?::text[]) AS dedup_strategy,
unnest(?::int[]) AS max_attempts,
unnest(?::bigint[]) AS parent_id,
unnest(?::jsonb[]) AS parent_state,
unnest(?::boolean[]) AS suspended,
unnest(?::timestamptz[]) AS not_visible_until
) src
WHERE (src.parent_id IS NULL
OR EXISTS (SELECT 1 FROM ${tbl} p WHERE p.id = src.parent_id))
ON CONFLICT (dedup_key) WHERE dedup_key IS NOT NULL DO UPDATE SET
payload = EXCLUDED.payload,
group_key = EXCLUDED.group_key,
attempts = 0,
last_error = NULL,
priority = EXCLUDED.priority,
dedup_strategy = EXCLUDED.dedup_strategy,
max_attempts = EXCLUDED.max_attempts,
updated_at = NOW(),
parent_id = EXCLUDED.parent_id,
parent_state = EXCLUDED.parent_state,
suspended = EXCLUDED.suspended,
not_visible_until = EXCLUDED.not_visible_until,
last_attempted_at = NULL
WHERE EXCLUDED.dedup_strategy = 'replace'
AND (${tbl}.attempts = 0
OR ${tbl}.not_visible_until IS NULL
OR ${tbl}.not_visible_until <= NOW()
OR ${tbl}.last_error IS NOT NULL)
AND NOT EXISTS (SELECT 1 FROM ${tbl} c WHERE c.parent_id = ${tbl}.id)
AND NOT EXISTS (SELECT 1 FROM ${dlqTbl} d WHERE d.parent_id = ${tbl}.id)
${returning}
|]
claimJobsSQL :: Text -> Text -> Int -> Int -> Text
claimJobsSQL :: Text -> Text -> Int -> Int -> Text
claimJobsSQL Text
schema Text
tableName Int
maxJobs Int
timeoutSeconds =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
groupsTbl :: Text
groupsTbl = Text -> Text -> Text
jobQueueGroupsTable Text
schema Text
tableName
columns :: Text
columns = Maybe Text -> Text
jobColumns Maybe Text
forall a. Maybe a
Nothing
overfetch :: Text
overfetch = String -> Text
T.pack (Int -> String
forall a. Show a => a -> String
show (Int
maxJobs Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
10))
limit :: Text
limit = String -> Text
T.pack (Int -> String
forall a. Show a => a -> String
show Int
maxJobs)
timeout :: Text
timeout = String -> Text
T.pack (Int -> String
forall a. Show a => a -> String
show Int
timeoutSeconds)
in [text|
WITH
eligible_groups AS (
SELECT group_key FROM ${groupsTbl}
WHERE job_count > 0
AND (in_flight_until IS NULL OR in_flight_until <= NOW())
ORDER BY min_priority ASC, min_id ASC
LIMIT ${overfetch}
FOR UPDATE SKIP LOCKED
),
grouped_candidates AS (
SELECT j.id
FROM eligible_groups el
CROSS JOIN LATERAL (
SELECT id
FROM ${tbl}
WHERE group_key = el.group_key
AND NOT suspended
AND (not_visible_until IS NULL OR not_visible_until <= NOW())
ORDER BY attempts DESC, priority ASC, id ASC
LIMIT 1
) j
),
ungrouped_candidates AS (
SELECT id FROM ${tbl}
WHERE group_key IS NULL
AND NOT suspended
AND (not_visible_until IS NULL OR not_visible_until <= NOW())
ORDER BY priority ASC, id ASC
LIMIT ${overfetch}
),
locked AS (
SELECT j.id
FROM (
SELECT id FROM grouped_candidates
UNION ALL
SELECT id FROM ungrouped_candidates
) c
INNER JOIN ${tbl} j ON j.id = c.id
WHERE NOT j.suspended
AND (j.not_visible_until IS NULL OR j.not_visible_until <= NOW())
ORDER BY j.priority ASC, j.id ASC
FOR UPDATE OF j SKIP LOCKED
LIMIT ${limit}
),
claimed AS (
UPDATE ${tbl} j
SET not_visible_until = NOW() + (${timeout} * interval '1 second'),
attempts = j.attempts + 1,
last_attempted_at = NOW(),
updated_at = NOW()
FROM locked l
WHERE j.id = l.id
RETURNING j.*
)
SELECT ${columns} FROM claimed ORDER BY priority ASC, id ASC
|]
claimJobsBatchedSQL :: Text -> Text -> Int -> Int -> Int -> Text
claimJobsBatchedSQL :: Text -> Text -> Int -> Int -> Int -> Text
claimJobsBatchedSQL Text
schema Text
tableName Int
batchSize Int
maxBatches Int
timeoutSeconds =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
groupsTbl :: Text
groupsTbl = Text -> Text -> Text
jobQueueGroupsTable Text
schema Text
tableName
columns :: Text
columns = Maybe Text -> Text
jobColumns Maybe Text
forall a. Maybe a
Nothing
bs :: Text
bs = String -> Text
T.pack (Int -> String
forall a. Show a => a -> String
show Int
batchSize)
mb :: Text
mb = String -> Text
T.pack (Int -> String
forall a. Show a => a -> String
show Int
maxBatches)
timeout :: Text
timeout = String -> Text
T.pack (Int -> String
forall a. Show a => a -> String
show Int
timeoutSeconds)
ungroupedLimit :: Text
ungroupedLimit = String -> Text
T.pack (Int -> String
forall a. Show a => a -> String
show (Int
maxBatches Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
batchSize))
overfetch :: Text
overfetch = String -> Text
T.pack (Int -> String
forall a. Show a => a -> String
show (Int
maxBatches Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
10))
in [text|
WITH
eligible_groups AS (
SELECT group_key FROM ${groupsTbl}
WHERE job_count > 0
AND (in_flight_until IS NULL OR in_flight_until <= NOW())
ORDER BY min_priority ASC, min_id ASC
LIMIT ${overfetch}
FOR UPDATE SKIP LOCKED
),
eligible_heads AS (
SELECT el.group_key, h.min_priority, h.min_id
FROM eligible_groups el
CROSS JOIN LATERAL (
SELECT t.priority AS min_priority, t.id AS min_id
FROM ${tbl} t
WHERE t.group_key = el.group_key
AND NOT t.suspended
AND t.parent_id IS NULL
AND t.parent_state IS NULL
AND (t.not_visible_until IS NULL OR t.not_visible_until <= NOW())
ORDER BY t.priority ASC, t.id ASC
LIMIT 1
) h
),
ungrouped_numbered AS (
SELECT id, priority,
((ROW_NUMBER() OVER (ORDER BY priority ASC, id ASC) - 1)
/ ${bs}) + 1 AS batch_num
FROM ${tbl}
WHERE group_key IS NULL
AND NOT suspended
AND parent_id IS NULL
AND parent_state IS NULL
AND (not_visible_until IS NULL OR not_visible_until <= NOW())
ORDER BY priority ASC, id ASC
LIMIT ${ungroupedLimit}
),
ungrouped_batch_info AS (
SELECT batch_num, MIN(priority) AS min_priority, MIN(id) AS min_id
FROM ungrouped_numbered
GROUP BY batch_num
),
allocated_slots AS (
SELECT s.group_key, s.ungrouped_batch
FROM (
SELECT group_key, NULL::bigint AS ungrouped_batch, min_priority, min_id
FROM eligible_heads
UNION ALL
SELECT NULL::text, batch_num, min_priority, min_id
FROM ungrouped_batch_info
ORDER BY min_priority ASC, min_id ASC
) s
LIMIT ${mb}
),
final_locked_groups AS (
SELECT group_key FROM allocated_slots WHERE group_key IS NOT NULL
),
grouped_candidates AS (
SELECT j.id
FROM final_locked_groups flg
CROSS JOIN LATERAL (
SELECT id
FROM ${tbl}
WHERE group_key = flg.group_key
AND NOT suspended
AND parent_id IS NULL
AND parent_state IS NULL
AND (not_visible_until IS NULL OR not_visible_until <= NOW())
ORDER BY attempts DESC, priority ASC, id ASC
LIMIT ${bs}
) j
),
ungrouped_candidates AS (
SELECT id
FROM ungrouped_numbered
WHERE batch_num IN (
SELECT ungrouped_batch
FROM allocated_slots
WHERE ungrouped_batch IS NOT NULL
)
),
locked AS (
SELECT j.id
FROM (
SELECT id FROM grouped_candidates
UNION ALL
SELECT id FROM ungrouped_candidates
) i
INNER JOIN ${tbl} j ON j.id = i.id
WHERE NOT j.suspended
AND j.parent_id IS NULL
AND j.parent_state IS NULL
AND (j.not_visible_until IS NULL OR j.not_visible_until <= NOW())
ORDER BY j.priority ASC, j.id ASC
FOR UPDATE OF j SKIP LOCKED
LIMIT ${ungroupedLimit}
),
claimed AS (
UPDATE ${tbl} j
SET not_visible_until = NOW() + (${timeout} * interval '1 second'),
attempts = j.attempts + 1,
last_attempted_at = NOW(),
updated_at = NOW()
FROM locked l
WHERE j.id = l.id
RETURNING j.*
)
SELECT ${columns} FROM claimed ORDER BY priority ASC, id ASC
|]
smartAckJobSQL :: Text -> Text -> Text
smartAckJobSQL :: Text -> Text -> Text
smartAckJobSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
in [text|
WITH ack AS (
DELETE FROM ${tbl}
WHERE id = ? AND attempts = ?
AND NOT EXISTS (SELECT 1 FROM ${tbl} WHERE parent_id = ?)
RETURNING id, parent_id
),
suspend AS (
UPDATE ${tbl}
SET suspended = TRUE, not_visible_until = NULL, updated_at = NOW()
WHERE id = ? AND attempts = ?
AND NOT EXISTS (SELECT 1 FROM ack)
AND EXISTS (SELECT 1 FROM ${tbl} WHERE parent_id = ?)
RETURNING id
),
wake_parent AS (
UPDATE ${tbl}
SET suspended = FALSE, updated_at = NOW()
WHERE id = (SELECT parent_id FROM ack WHERE parent_id IS NOT NULL)
AND suspended = TRUE
AND NOT EXISTS (
SELECT 1 FROM ${tbl} c
WHERE c.parent_id = (SELECT parent_id FROM ack WHERE parent_id IS NOT NULL)
AND c.id NOT IN (SELECT id FROM ack)
)
RETURNING id
)
SELECT
(SELECT count(*) FROM ack) + (SELECT count(*) FROM suspend) AS result
|]
setVisibilityTimeoutSQL :: Text -> Text -> Text
setVisibilityTimeoutSQL :: Text -> Text -> Text
setVisibilityTimeoutSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
in [text|
UPDATE ${tbl}
SET not_visible_until = NOW() + (? * interval '1 second'),
updated_at = NOW()
WHERE id = ? AND attempts = ?
|]
setVisibilityTimeoutBatchSQL :: Text -> Text -> Text -> Text
setVisibilityTimeoutBatchSQL :: Text -> Text -> Text -> Text
setVisibilityTimeoutBatchSQL Text
schema Text
tableName Text
valuesPlaceholder =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
in [text|
WITH input_jobs AS (
SELECT v.id::bigint AS id, v.expected_attempts::int AS expected_attempts
FROM (VALUES ${valuesPlaceholder}) AS v(id, expected_attempts)
),
updated AS (
UPDATE ${tbl} j
SET not_visible_until = NOW() + (? * interval '1 second'),
updated_at = NOW()
FROM input_jobs ij
WHERE j.id = ij.id AND j.attempts = ij.expected_attempts
RETURNING j.id
)
SELECT
ij.id,
(u.id IS NOT NULL) as was_heartbeated,
j.attempts as current_db_attempts
FROM input_jobs ij
LEFT JOIN updated u ON ij.id = u.id
LEFT JOIN ${tbl} j ON j.id = ij.id
|]
updateJobForRetrySQL :: Text -> Text -> Text
updateJobForRetrySQL :: Text -> Text -> Text
updateJobForRetrySQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
in [text|
UPDATE ${tbl}
SET not_visible_until = NOW() + (? * interval '1 second'),
last_error = ?,
updated_at = NOW()
WHERE id = ? AND attempts = ?
|]
promoteJobSQL :: Text -> Text -> Text
promoteJobSQL :: Text -> Text -> Text
promoteJobSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
in [text|
UPDATE ${tbl}
SET not_visible_until = NULL,
updated_at = NOW()
WHERE id = ?
AND not_visible_until IS NOT NULL
AND not_visible_until > NOW()
AND (attempts = 0 OR last_error IS NOT NULL)
|]
moveToDLQSQL :: Text -> Text -> Text
moveToDLQSQL :: Text -> Text -> Text
moveToDLQSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
dlqTbl :: Text
dlqTbl = Text -> Text -> Text
jobQueueDLQTable Text
schema Text
tableName
cols :: Text
cols = Text
jobColsExceptError
in [text|
WITH deleted_job AS (
DELETE FROM ${tbl}
WHERE id = ? AND attempts = ?
RETURNING *
),
inserted_dlq AS (
INSERT INTO ${dlqTbl} (
job_id, ${cols}, last_error
)
SELECT
id, ${cols}, ?
FROM deleted_job
)
SELECT count(*) FROM deleted_job
|]
retryFromDLQSQL :: Text -> Text -> Text
retryFromDLQSQL :: Text -> Text -> Text
retryFromDLQSQL Text
schema Text
tableName =
let dlqTbl :: Text
dlqTbl = Text -> Text -> Text
jobQueueDLQTable Text
schema Text
tableName
tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
columns :: Text
columns = Maybe Text -> Text
jobColumns Maybe Text
forall a. Maybe a
Nothing
in [text|
WITH RECURSIVE
target AS (
SELECT * FROM ${dlqTbl} WHERE id = ?
),
-- Walk up through DLQ ancestors to find the root of the tree.
-- Stops when parent_id IS NULL, parent is in main queue, or
-- parent is not found in DLQ (orphaned).
ancestors AS (
SELECT d.job_id, d.parent_id, 0 AS depth
FROM ${dlqTbl} d
WHERE d.job_id = (SELECT parent_id FROM target)
AND (SELECT parent_id FROM target) IS NOT NULL
AND NOT EXISTS (SELECT 1 FROM ${tbl} WHERE id = (SELECT parent_id FROM target))
UNION ALL
SELECT d.job_id, d.parent_id, a.depth + 1
FROM ${dlqTbl} d
JOIN ancestors a ON d.job_id = a.parent_id
WHERE a.parent_id IS NOT NULL
AND NOT EXISTS (SELECT 1 FROM ${tbl} WHERE id = a.parent_id)
),
-- Root is the topmost DLQ ancestor, or the target itself
root_job_id AS (
SELECT COALESCE(
(SELECT job_id FROM ancestors ORDER BY depth DESC LIMIT 1),
(SELECT job_id FROM target)
) AS job_id
),
-- Guard: root's parent must be NULL or exist in main queue
can_retry AS (
SELECT EXISTS (
SELECT 1
FROM root_job_id r
JOIN ${dlqTbl} d ON d.job_id = r.job_id
WHERE d.parent_id IS NULL
OR EXISTS (SELECT 1 FROM ${tbl} WHERE id = d.parent_id)
) AS val
),
-- Walk down from root to collect all DLQ tree members
tree AS (
SELECT d.id AS dlq_id, d.job_id, d.payload, d.group_key, d.priority,
d.max_attempts, d.parent_id, d.parent_state
FROM ${dlqTbl} d
WHERE d.job_id = (SELECT job_id FROM root_job_id)
UNION ALL
SELECT d.id AS dlq_id, d.job_id, d.payload, d.group_key, d.priority,
d.max_attempts, d.parent_id, d.parent_state
FROM ${dlqTbl} d
JOIN tree t ON d.parent_id = t.job_id
),
-- Delete all tree members from DLQ (guarded by can_retry)
deleted AS (
DELETE FROM ${dlqTbl}
WHERE id IN (SELECT dlq_id FROM tree)
AND (SELECT val FROM can_retry)
RETURNING job_id, payload, group_key, priority, max_attempts, parent_id, parent_state
),
-- Re-insert into main queue with computed suspended state:
-- rollup finalizers are suspended if they have children (in this
-- retry batch OR already in the main queue).
inserted AS (
INSERT INTO ${tbl} (id, payload, group_key, attempts, priority, max_attempts,
parent_id, parent_state, suspended)
SELECT d.job_id, d.payload, d.group_key, 0, d.priority, d.max_attempts,
d.parent_id, d.parent_state,
CASE WHEN d.parent_state IS NOT NULL
THEN EXISTS (SELECT 1 FROM deleted c WHERE c.parent_id = d.job_id)
OR EXISTS (SELECT 1 FROM ${tbl} WHERE parent_id = d.job_id)
ELSE FALSE
END
FROM deleted d
RETURNING *
)
SELECT ${columns} FROM inserted WHERE id = (SELECT job_id FROM target)
|]
dlqJobExistsSQL :: Text -> Text -> Text
dlqJobExistsSQL :: Text -> Text -> Text
dlqJobExistsSQL Text
schema Text
tableName =
let dlqTbl :: Text
dlqTbl = Text -> Text -> Text
jobQueueDLQTable Text
schema Text
tableName
in [text|SELECT EXISTS (SELECT 1 FROM ${dlqTbl} WHERE id = ?) AS result|]
deleteDLQJobSQL :: Text -> Text -> Text
deleteDLQJobSQL :: Text -> Text -> Text
deleteDLQJobSQL Text
schema Text
tableName =
let dlqTbl :: Text
dlqTbl = Text -> Text -> Text
jobQueueDLQTable Text
schema Text
tableName
in [text|DELETE FROM ${dlqTbl} WHERE id = ? RETURNING parent_id|]
getJobByIdSQL :: Text -> Text -> Text
getJobByIdSQL :: Text -> Text -> Text
getJobByIdSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
columns :: Text
columns = Maybe Text -> Text
jobColumns Maybe Text
forall a. Maybe a
Nothing
in [text|
SELECT ${columns}
FROM ${tbl}
WHERE id = ?
|]
cancelJobSQL :: Text -> Text -> Text
cancelJobSQL :: Text -> Text -> Text
cancelJobSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
in [text|
WITH cancel AS (
DELETE FROM ${tbl}
WHERE id = ?
AND NOT EXISTS (SELECT 1 FROM ${tbl} c WHERE c.parent_id = ?)
RETURNING id, parent_id
),
wake_parent AS (
UPDATE ${tbl}
SET suspended = FALSE, updated_at = NOW()
WHERE id = (SELECT parent_id FROM cancel WHERE parent_id IS NOT NULL)
AND suspended = TRUE
AND NOT EXISTS (
SELECT 1 FROM ${tbl} c
WHERE c.parent_id = (SELECT parent_id FROM cancel WHERE parent_id IS NOT NULL)
AND c.id NOT IN (SELECT id FROM cancel)
)
RETURNING id
)
SELECT (SELECT count(*) FROM cancel) AS result
|]
getQueueStatsSQL :: Text -> Text -> Text
getQueueStatsSQL :: Text -> Text -> Text
getQueueStatsSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
in [text|
SELECT
COUNT(*) as total_jobs,
COUNT(*) FILTER (WHERE NOT suspended AND (not_visible_until IS NULL OR not_visible_until <= NOW())) as visible_jobs,
EXTRACT(EPOCH FROM (NOW() - MIN(inserted_at)))::float8 as oldest_job_age_seconds
FROM ${tbl}
|]
countChildrenBatchSQL :: Text -> Text -> Text
countChildrenBatchSQL :: Text -> Text -> Text
countChildrenBatchSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
in [text|
SELECT parent_id, COUNT(*),
COUNT(*) FILTER (WHERE suspended)
FROM ${tbl}
WHERE parent_id = ANY(?)
GROUP BY parent_id
|]
countDLQChildrenBatchSQL :: Text -> Text -> Text
countDLQChildrenBatchSQL :: Text -> Text -> Text
countDLQChildrenBatchSQL Text
schema Text
tableName =
let dlqTbl :: Text
dlqTbl = Text -> Text -> Text
jobQueueDLQTable Text
schema Text
tableName
in [text|
SELECT parent_id, COUNT(*)
FROM ${dlqTbl}
WHERE parent_id = ANY(?)
GROUP BY parent_id
|]
ackJobsBulkSQL :: Text -> Text -> Text
ackJobsBulkSQL :: Text -> Text -> Text
ackJobsBulkSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
in [text|
WITH input AS (
SELECT unnest(?::bigint[]) AS id,
unnest(?::int[]) AS expected_attempts
),
deleted AS (
DELETE FROM ${tbl} j USING input i
WHERE j.id = i.id AND j.attempts = i.expected_attempts
RETURNING j.id
)
SELECT count(*) FROM deleted
|]
moveToDLQBatchSQL :: Text -> Text -> Text
moveToDLQBatchSQL :: Text -> Text -> Text
moveToDLQBatchSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
dlqTbl :: Text
dlqTbl = Text -> Text -> Text
jobQueueDLQTable Text
schema Text
tableName
cols :: Text
cols = Text
jobColsExceptError
in [text|
WITH input_jobs AS (
SELECT unnest(?::bigint[]) AS id,
unnest(?::int[]) AS expected_attempts,
unnest(?::text[]) AS error_msg
),
deleted_jobs AS (
DELETE FROM ${tbl} j
USING input_jobs ij
WHERE j.id = ij.id AND j.attempts = ij.expected_attempts
RETURNING j.*, ij.error_msg AS new_error
),
inserted_dlq AS (
INSERT INTO ${dlqTbl} (job_id, failed_at, ${cols}, last_error)
SELECT id, NOW(), ${cols}, new_error
FROM deleted_jobs
)
SELECT count(*) FROM deleted_jobs
|]
deleteDLQJobsBatchSQL :: Text -> Text -> Text
deleteDLQJobsBatchSQL :: Text -> Text -> Text
deleteDLQJobsBatchSQL Text
schema Text
tableName =
let dlqTbl :: Text
dlqTbl = Text -> Text -> Text
jobQueueDLQTable Text
schema Text
tableName
in [text|DELETE FROM ${dlqTbl} WHERE id = ANY(?) RETURNING parent_id|]
pauseChildrenSQL :: Text -> Text -> Text
pauseChildrenSQL :: Text -> Text -> Text
pauseChildrenSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
in [text|
UPDATE ${tbl}
SET suspended = TRUE, updated_at = NOW()
WHERE parent_id = ?
AND NOT suspended
AND (not_visible_until IS NULL OR not_visible_until <= NOW())
|]
resumeChildrenSQL :: Text -> Text -> Text
resumeChildrenSQL :: Text -> Text -> Text
resumeChildrenSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
in [text|
UPDATE ${tbl}
SET suspended = FALSE, updated_at = NOW()
WHERE parent_id = ? AND suspended = TRUE
|]
cancelJobCascadeSQL :: Text -> Text -> Text
cancelJobCascadeSQL :: Text -> Text -> Text
cancelJobCascadeSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
in [text|
WITH RECURSIVE descendants AS (
SELECT id FROM ${tbl} WHERE id = ?
UNION ALL
SELECT j.id FROM ${tbl} j JOIN descendants d ON j.parent_id = d.id
),
deleted AS (
DELETE FROM ${tbl} WHERE id IN (SELECT id FROM descendants)
RETURNING id
)
SELECT count(*) FROM deleted
|]
cancelJobTreeSQL :: Text -> Text -> Text
cancelJobTreeSQL :: Text -> Text -> Text
cancelJobTreeSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
in [text|
WITH RECURSIVE
ancestors AS (
SELECT id, parent_id FROM ${tbl} WHERE id = ?
UNION ALL
SELECT j.id, j.parent_id FROM ${tbl} j JOIN ancestors a ON j.id = a.parent_id
),
root AS (
SELECT id FROM ancestors WHERE parent_id IS NULL
),
descendants AS (
SELECT id FROM ${tbl} WHERE id = (SELECT id FROM root)
UNION ALL
SELECT j.id FROM ${tbl} j JOIN descendants d ON j.parent_id = d.id
),
deleted AS (
DELETE FROM ${tbl} WHERE id IN (SELECT id FROM descendants)
RETURNING id
)
SELECT count(*) FROM deleted
|]
tryWakeAncestorSQL :: Text -> Text -> Text
tryWakeAncestorSQL :: Text -> Text -> Text
tryWakeAncestorSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
in [text|
UPDATE ${tbl}
SET suspended = FALSE, updated_at = NOW()
WHERE id = ?
AND suspended = TRUE
AND NOT EXISTS (SELECT 1 FROM ${tbl} c WHERE c.parent_id = ?)
|]
cascadeChildrenToDLQSQL :: Text -> Text -> Text
cascadeChildrenToDLQSQL :: Text -> Text -> Text
cascadeChildrenToDLQSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
dlqTbl :: Text
dlqTbl = Text -> Text -> Text
jobQueueDLQTable Text
schema Text
tableName
cols :: Text
cols = Text
jobColsExceptError
in [text|
WITH RECURSIVE descendants AS (
SELECT id FROM ${tbl} WHERE parent_id = ?
UNION ALL
SELECT j.id FROM ${tbl} j JOIN descendants d ON j.parent_id = d.id
),
deleted AS (
DELETE FROM ${tbl}
WHERE id IN (SELECT id FROM descendants)
RETURNING id, ${cols}
),
inserted_dlq AS (
INSERT INTO ${dlqTbl} (job_id, ${cols}, last_error)
SELECT id, ${cols}, ?
FROM deleted
)
SELECT count(*) FROM deleted
|]
descendantRollupIdsSQL :: Text -> Text -> Text
descendantRollupIdsSQL :: Text -> Text -> Text
descendantRollupIdsSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
in [text|
WITH RECURSIVE descendants AS (
SELECT id, parent_state FROM ${tbl} WHERE parent_id = ?
UNION ALL
SELECT j.id, j.parent_state FROM ${tbl} j JOIN descendants d ON j.parent_id = d.id
)
SELECT id AS result FROM descendants WHERE parent_state IS NOT NULL
|]
suspendJobSQL :: Text -> Text -> Text
suspendJobSQL :: Text -> Text -> Text
suspendJobSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
in [text|
UPDATE ${tbl}
SET suspended = TRUE, updated_at = NOW()
WHERE id = ?
AND NOT suspended
AND NOT (attempts > 0 AND not_visible_until IS NOT NULL AND not_visible_until > NOW())
|]
resumeJobSQL :: Text -> Text -> Text
resumeJobSQL :: Text -> Text -> Text
resumeJobSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
in [text|
UPDATE ${tbl}
SET suspended = FALSE, updated_at = NOW()
WHERE id = ? AND suspended = TRUE
AND NOT (
parent_state IS NOT NULL
AND EXISTS (SELECT 1 FROM ${tbl} c WHERE c.parent_id = ${tbl}.id)
)
|]
parentExistsSQL :: Text -> Text -> Text
parentExistsSQL :: Text -> Text -> Text
parentExistsSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
in [text|SELECT EXISTS (SELECT 1 FROM ${tbl} WHERE id = ?) AS result|]
getParentIdSQL :: Text -> Text -> Text
getParentIdSQL :: Text -> Text -> Text
getParentIdSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
in [text|SELECT parent_id FROM ${tbl} WHERE id = ?|]
insertResultSQL :: Text -> Text -> Text
insertResultSQL :: Text -> Text -> Text
insertResultSQL Text
schema Text
tableName =
let resultsTbl :: Text
resultsTbl = Text -> Text -> Text
jobQueueResultsTable Text
schema Text
tableName
in [text|
INSERT INTO ${resultsTbl} (parent_id, child_id, result)
VALUES (?, ?, ?)
ON CONFLICT (parent_id, child_id) DO UPDATE SET result = EXCLUDED.result
|]
getResultsByParentSQL :: Text -> Text -> Text
getResultsByParentSQL :: Text -> Text -> Text
getResultsByParentSQL Text
schema Text
tableName =
let resultsTbl :: Text
resultsTbl = Text -> Text -> Text
jobQueueResultsTable Text
schema Text
tableName
in [text|
SELECT child_id, result FROM ${resultsTbl} WHERE parent_id = ?
|]
getDLQChildErrorsByParentSQL :: Text -> Text -> Text
getDLQChildErrorsByParentSQL :: Text -> Text -> Text
getDLQChildErrorsByParentSQL Text
schema Text
tableName =
let dlqTbl :: Text
dlqTbl = Text -> Text -> Text
jobQueueDLQTable Text
schema Text
tableName
in [text|
SELECT job_id, last_error FROM ${dlqTbl} WHERE parent_id = ?
|]
persistParentStateSQL :: Text -> Text -> Text
persistParentStateSQL :: Text -> Text -> Text
persistParentStateSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
in [text|
UPDATE ${tbl} SET parent_state = ?, updated_at = NOW() WHERE id = ?
|]
getParentStateSnapshotSQL :: Text -> Text -> Text
getParentStateSnapshotSQL :: Text -> Text -> Text
getParentStateSnapshotSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
in [text|SELECT parent_state FROM ${tbl} WHERE id = ?|]
readChildResultsSQL :: Text -> Text -> Text
readChildResultsSQL :: Text -> Text -> Text
readChildResultsSQL Text
schema Text
tableName =
let resultsTbl :: Text
resultsTbl = Text -> Text -> Text
jobQueueResultsTable Text
schema Text
tableName
dlqTbl :: Text
dlqTbl = Text -> Text -> Text
jobQueueDLQTable Text
schema Text
tableName
tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
in [text|
SELECT 'r'::text AS source, child_id, result, NULL::text AS error, NULL::bigint AS dlq_pk FROM ${resultsTbl} WHERE parent_id = ?
UNION ALL
SELECT 'e' AS source, job_id AS child_id, NULL::jsonb AS result, last_error AS error, id AS dlq_pk FROM ${dlqTbl} WHERE parent_id = ?
UNION ALL
SELECT 's' AS source, NULL::bigint AS child_id, parent_state AS result, NULL::text AS error, NULL::bigint AS dlq_pk FROM ${tbl} WHERE id = ? AND parent_state IS NOT NULL
|]
lockGroupsSQL :: Text -> Text -> Text
lockGroupsSQL :: Text -> Text -> Text
lockGroupsSQL Text
schema Text
tableName =
let groupsTbl :: Text
groupsTbl = Text -> Text -> Text
jobQueueGroupsTable Text
schema Text
tableName
in [text|SELECT 1::bigint AS result FROM ${groupsTbl} FOR UPDATE SKIP LOCKED|]
checkReaperSeqSQL :: Text -> Text -> Text
checkReaperSeqSQL :: Text -> Text -> Text
checkReaperSeqSQL Text
schema Text
tableName =
let seqName :: Text
seqName = Text -> Text -> Text
jobQueueReaperSeq Text
schema Text
tableName
in [text|SELECT last_value AS result FROM ${seqName}|]
updateReaperSeqSQL :: Text -> Text -> Text
updateReaperSeqSQL :: Text -> Text -> Text
updateReaperSeqSQL Text
schema Text
tableName =
let seqName :: Text
seqName = Text -> Text -> Text
jobQueueReaperSeq Text
schema Text
tableName
in [text|SELECT setval('${seqName}', extract(epoch FROM now())::bigint) AS result|]
refreshGroupsSQL :: Text -> Text -> Text
refreshGroupsSQL :: Text -> Text -> Text
refreshGroupsSQL Text
schema Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schema Text
tableName
groupsTbl :: Text
groupsTbl = Text -> Text -> Text
jobQueueGroupsTable Text
schema Text
tableName
in [text|
WITH current AS (
SELECT group_key,
MIN(priority) AS min_priority,
MIN(id) AS min_id,
COUNT(*) AS job_count,
MAX(not_visible_until) FILTER (WHERE not_visible_until > NOW() AND NOT suspended) AS in_flight_until
FROM ${tbl}
WHERE group_key IS NOT NULL
GROUP BY group_key
),
deleted AS (
DELETE FROM ${groupsTbl} g
WHERE NOT EXISTS (SELECT 1 FROM current c WHERE c.group_key = g.group_key)
),
updated AS (
UPDATE ${groupsTbl} g
SET min_priority = c.min_priority,
min_id = c.min_id,
job_count = c.job_count,
in_flight_until = c.in_flight_until
FROM current c
WHERE g.group_key = c.group_key
AND (g.min_priority <> c.min_priority OR g.min_id <> c.min_id
OR g.job_count <> c.job_count
OR g.in_flight_until IS DISTINCT FROM c.in_flight_until)
)
INSERT INTO ${groupsTbl} (group_key, min_priority, min_id, job_count, in_flight_until)
SELECT c.group_key, c.min_priority, c.min_id, c.job_count, c.in_flight_until
FROM current c
WHERE NOT EXISTS (SELECT 1 FROM ${groupsTbl} g WHERE g.group_key = c.group_key)
ON CONFLICT (group_key) DO NOTHING
|]