{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
module Arbiter.Core.Job.Schema
(
createSchemaSQL
, defaultSchemaName
, createJobQueueTableSQL
, createJobQueueDLQTableSQL
, createJobQueueGroupKeyIndexSQL
, createJobQueueUngroupedRankingIndexSQL
, createDLQGroupKeyIndexSQL
, createDLQFailedAtIndexSQL
, createDLQParentIdIndexSQL
, createDedupKeyIndexSQL
, createParentIdIndexSQL
, createNotifyFunctionSQL
, createNotifyTriggerSQL
, dropNotifyTriggerSQL
, dropNotifyFunctionSQL
, createEventStreamingFunctionSQL
, createEventStreamingTriggersSQL
, notificationChannelForTable
, eventStreamingChannel
, notifyFunctionName
, notifyTriggerName
, eventStreamingFunctionName
, eventStreamingTriggerName
, eventStreamingDLQTriggerName
, jobQueueTable
, jobQueueDLQTable
, jobQueueResultsTable
, jobQueueGroupsTable
, jobQueueReaperSeq
, createReaperSeqSQL
, createResultsTableSQL
, createGroupsTableSQL
, createGroupsIndexSQL
, createGroupsTriggerFunctionsSQL
, createGroupsTriggersSQL
, quoteIdentifier
) where
import Data.Text (Text)
import Data.Text qualified as T
import NeatInterpolation (text)
quoteIdentifier :: Text -> Text
quoteIdentifier :: Text -> Text
quoteIdentifier Text
ident =
Text
"\"" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> HasCallStack => Text -> Text -> Text -> Text
Text -> Text -> Text -> Text
T.replace Text
"\"" Text
"\"\"" Text
ident Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"\""
defaultSchemaName :: Text
defaultSchemaName :: Text
defaultSchemaName = Text
"arbiter"
notificationChannelForTable :: Text -> Text
notificationChannelForTable :: Text -> Text
notificationChannelForTable Text
tableName = Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_created"
eventStreamingChannel :: Text
eventStreamingChannel :: Text
eventStreamingChannel = Text
"arbiter_job_events"
notifyFunctionName :: Text -> Text
notifyFunctionName :: Text -> Text
notifyFunctionName Text
tableName = Text
"notify_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_created"
notifyTriggerName :: Text -> Text
notifyTriggerName :: Text -> Text
notifyTriggerName Text
tableName = Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_notify_trigger"
eventStreamingFunctionName :: Text
eventStreamingFunctionName :: Text
eventStreamingFunctionName = Text
"notify_job_event"
eventStreamingTriggerName :: Text -> Text
eventStreamingTriggerName :: Text -> Text
eventStreamingTriggerName Text
tableName = Text
"notify_job_event_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName
eventStreamingDLQTriggerName :: Text -> Text
eventStreamingDLQTriggerName :: Text -> Text
eventStreamingDLQTriggerName Text
tableName = Text
"notify_job_event_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_dlq"
jobQueueTable :: Text -> Text -> Text
jobQueueTable :: Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
tableName
jobQueueDLQTable :: Text -> Text -> Text
jobQueueDLQTable :: Text -> Text -> Text
jobQueueDLQTable Text
schemaName Text
tableName = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_dlq")
jobQueueResultsTable :: Text -> Text -> Text
jobQueueResultsTable :: Text -> Text -> Text
jobQueueResultsTable Text
schemaName Text
tableName = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_results")
jobQueueGroupsTable :: Text -> Text -> Text
jobQueueGroupsTable :: Text -> Text -> Text
jobQueueGroupsTable Text
schemaName Text
tableName = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_groups")
jobQueueReaperSeq :: Text -> Text -> Text
jobQueueReaperSeq :: Text -> Text -> Text
jobQueueReaperSeq Text
schemaName Text
tableName = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_reaper_seq")
createReaperSeqSQL :: Text -> Text -> Text
createReaperSeqSQL :: Text -> Text -> Text
createReaperSeqSQL Text
schemaName Text
tableName =
let seqName :: Text
seqName = Text -> Text -> Text
jobQueueReaperSeq Text
schemaName Text
tableName
in Text
"CREATE SEQUENCE IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
seqName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" START WITH 0 MINVALUE 0;"
createSchemaSQL :: Text -> Text
createSchemaSQL :: Text -> Text
createSchemaSQL Text
schemaName =
Text
"CREATE SCHEMA IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
";"
jobColumns :: [Text]
jobColumns :: [Text]
jobColumns =
[ Text
" id BIGSERIAL PRIMARY KEY,"
, Text
" payload JSONB NOT NULL,"
, Text
" group_key TEXT,"
, Text
" inserted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),"
, Text
" updated_at TIMESTAMPTZ,"
, Text
" last_attempted_at TIMESTAMPTZ,"
, Text
" not_visible_until TIMESTAMPTZ,"
, Text
" attempts INT NOT NULL DEFAULT 0,"
, Text
" last_error TEXT,"
, Text
" priority INT NOT NULL DEFAULT 0,"
, Text
" dedup_key TEXT,"
, Text
" dedup_strategy TEXT,"
, Text
" max_attempts INT,"
, Text
" parent_id BIGINT,"
, Text
" parent_state JSONB,"
, Text
" suspended BOOLEAN NOT NULL DEFAULT FALSE"
]
jobColumnsForDLQ :: Text
jobColumnsForDLQ :: Text
jobColumnsForDLQ =
[Text] -> Text
T.unlines
[ Text
" id BIGSERIAL PRIMARY KEY,"
, Text
" failed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),"
, Text
" job_id BIGINT NOT NULL,"
]
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> [Text] -> Text
T.unlines (Int -> [Text] -> [Text]
forall a. Int -> [a] -> [a]
drop Int
1 [Text]
jobColumns)
createJobQueueTableSQL :: Text -> Text -> Text
createJobQueueTableSQL :: Text -> Text -> Text
createJobQueueTableSQL Text
schemaName Text
tableName =
[Text] -> Text
T.unlines
[ Text
"CREATE TABLE IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ("
, [Text] -> Text
T.unlines [Text]
jobColumns
, Text
") WITH (fillfactor = 70);"
]
createJobQueueDLQTableSQL :: Text -> Text -> Text
createJobQueueDLQTableSQL :: Text -> Text -> Text
createJobQueueDLQTableSQL Text
schemaName Text
tableName =
[Text] -> Text
T.unlines
[ Text
"CREATE TABLE IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text -> Text
jobQueueDLQTable Text
schemaName Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ("
, Text
jobColumnsForDLQ
, Text
");"
]
createJobQueueGroupKeyIndexSQL :: Text -> Text -> Text
createJobQueueGroupKeyIndexSQL :: Text -> Text -> Text
createJobQueueGroupKeyIndexSQL Text
schemaName Text
tableName =
[Text] -> Text
T.unlines
[ Text
"CREATE INDEX IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
"idx_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_group_key")
, Text
"ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" (group_key, priority ASC, id ASC)"
, Text
"WHERE group_key IS NOT NULL;"
]
createJobQueueUngroupedRankingIndexSQL :: Text -> Text -> Text
createJobQueueUngroupedRankingIndexSQL :: Text -> Text -> Text
createJobQueueUngroupedRankingIndexSQL Text
schemaName Text
tableName =
[Text] -> Text
T.unlines
[ Text
"CREATE INDEX IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
"idx_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_ungrouped_ranking")
, Text
"ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" (priority ASC, id ASC)"
, Text
"WHERE group_key IS NULL;"
]
createDLQGroupKeyIndexSQL :: Text -> Text -> Text
createDLQGroupKeyIndexSQL :: Text -> Text -> Text
createDLQGroupKeyIndexSQL Text
schemaName Text
tableName =
[Text] -> Text
T.unlines
[ Text
"CREATE INDEX IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
"idx_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_dlq_group_key")
, Text
"ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text -> Text
jobQueueDLQTable Text
schemaName Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" (group_key);"
]
createDLQFailedAtIndexSQL :: Text -> Text -> Text
createDLQFailedAtIndexSQL :: Text -> Text -> Text
createDLQFailedAtIndexSQL Text
schemaName Text
tableName =
[Text] -> Text
T.unlines
[ Text
"CREATE INDEX IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
"idx_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_dlq_failed_at")
, Text
"ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text -> Text
jobQueueDLQTable Text
schemaName Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" (failed_at DESC);"
]
createDLQParentIdIndexSQL :: Text -> Text -> Text
createDLQParentIdIndexSQL :: Text -> Text -> Text
createDLQParentIdIndexSQL Text
schemaName Text
tableName =
[Text] -> Text
T.unlines
[ Text
"CREATE INDEX IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
"idx_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_dlq_parent_id")
, Text
"ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text -> Text
jobQueueDLQTable Text
schemaName Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" (parent_id)"
, Text
"WHERE parent_id IS NOT NULL;"
]
createDedupKeyIndexSQL :: Text -> Text -> Text
createDedupKeyIndexSQL :: Text -> Text -> Text
createDedupKeyIndexSQL Text
schemaName Text
tableName =
[Text] -> Text
T.unlines
[ Text
"CREATE UNIQUE INDEX IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
"idx_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_dedup_key")
, Text
"ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" (dedup_key)"
, Text
"WHERE dedup_key IS NOT NULL;"
]
createParentIdIndexSQL :: Text -> Text -> Text
createParentIdIndexSQL :: Text -> Text -> Text
createParentIdIndexSQL Text
schemaName Text
tableName =
[Text] -> Text
T.unlines
[ Text
"CREATE INDEX IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
"idx_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_parent_id")
, Text
"ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" (parent_id)"
, Text
"WHERE parent_id IS NOT NULL;"
]
createResultsTableSQL :: Text -> Text -> Text
createResultsTableSQL :: Text -> Text -> Text
createResultsTableSQL Text
schemaName Text
tableName =
let resultsTbl :: Text
resultsTbl = Text -> Text -> Text
jobQueueResultsTable Text
schemaName Text
tableName
mainTbl :: Text
mainTbl = Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName
in [Text] -> Text
T.unlines
[ Text
"CREATE TABLE IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
resultsTbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ("
, Text
" parent_id BIGINT NOT NULL REFERENCES " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
mainTbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"(id) ON DELETE CASCADE,"
, Text
" child_id BIGINT NOT NULL,"
, Text
" result JSONB NOT NULL,"
, Text
" PRIMARY KEY (parent_id, child_id)"
, Text
");"
]
createGroupsTableSQL :: Text -> Text -> Text
createGroupsTableSQL :: Text -> Text -> Text
createGroupsTableSQL Text
schemaName Text
tableName =
let groupsTbl :: Text
groupsTbl = Text -> Text -> Text
jobQueueGroupsTable Text
schemaName Text
tableName
in [Text] -> Text
T.unlines
[ Text
"CREATE TABLE IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
groupsTbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ("
, Text
" group_key TEXT PRIMARY KEY,"
, Text
" min_priority INT NOT NULL DEFAULT 0,"
, Text
" min_id BIGINT NOT NULL DEFAULT 0,"
, Text
" job_count INT NOT NULL DEFAULT 0,"
, Text
" in_flight_until TIMESTAMPTZ DEFAULT NULL"
, Text
");"
]
createGroupsIndexSQL :: Text -> Text -> Text
createGroupsIndexSQL :: Text -> Text -> Text
createGroupsIndexSQL Text
schemaName Text
tableName =
let groupsTbl :: Text
groupsTbl = Text -> Text -> Text
jobQueueGroupsTable Text
schemaName Text
tableName
in [Text] -> Text
T.unlines
[ Text
"CREATE INDEX IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
"idx_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_groups_ranking")
, Text
"ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
groupsTbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" (min_priority ASC, min_id ASC);"
]
createGroupsTriggerFunctionsSQL :: Text -> Text -> Text
createGroupsTriggerFunctionsSQL :: Text -> Text -> Text
createGroupsTriggerFunctionsSQL Text
schemaName Text
tableName =
let groupsTbl :: Text
groupsTbl = Text -> Text -> Text
jobQueueGroupsTable Text
schemaName Text
tableName
tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName
baseName :: Text
baseName = Text
"maintain_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_groups"
funcInsert :: Text
funcInsert = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
baseName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_insert")
funcDelete :: Text
funcDelete = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
baseName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_delete")
funcUpdate :: Text
funcUpdate = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
baseName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_update")
dd :: Text
dd = Text
"$$"
in [Text] -> Text
T.unlines
[ Text -> Text -> Text -> Text
groupsInsertFunction Text
funcInsert Text
groupsTbl Text
dd
, Text -> Text -> Text -> Text -> Text
groupsDeleteFunction Text
funcDelete Text
groupsTbl Text
tbl Text
dd
, Text -> Text -> Text -> Text -> Text
groupsUpdateFunction Text
funcUpdate Text
groupsTbl Text
tbl Text
dd
]
groupsInsertFunction :: Text -> Text -> Text -> Text
groupsInsertFunction :: Text -> Text -> Text -> Text
groupsInsertFunction Text
funcName Text
groupsTbl Text
dd =
[text|
CREATE OR REPLACE FUNCTION ${funcName}()
RETURNS TRIGGER AS ${dd}
BEGIN
IF NOT EXISTS (SELECT 1 FROM new_table WHERE group_key IS NOT NULL LIMIT 1) THEN
RETURN NULL;
END IF;
INSERT INTO ${groupsTbl} (group_key, min_priority, min_id, job_count)
SELECT group_key, MIN(priority), MIN(id), COUNT(*)
FROM new_table
WHERE group_key IS NOT NULL
GROUP BY group_key
ON CONFLICT (group_key) DO UPDATE SET
min_priority = LEAST(${groupsTbl}.min_priority, EXCLUDED.min_priority),
min_id = LEAST(${groupsTbl}.min_id, EXCLUDED.min_id),
job_count = ${groupsTbl}.job_count + EXCLUDED.job_count,
in_flight_until = CASE WHEN ${groupsTbl}.in_flight_until <= NOW()
THEN NULL ELSE ${groupsTbl}.in_flight_until END;
RETURN NULL;
END;
${dd} LANGUAGE plpgsql;
|]
groupsDeleteFunction :: Text -> Text -> Text -> Text -> Text
groupsDeleteFunction :: Text -> Text -> Text -> Text -> Text
groupsDeleteFunction Text
funcName Text
groupsTbl Text
tbl Text
dd =
[text|
CREATE OR REPLACE FUNCTION ${funcName}()
RETURNS TRIGGER AS ${dd}
BEGIN
IF NOT EXISTS (SELECT 1 FROM old_table WHERE group_key IS NOT NULL LIMIT 1) THEN
RETURN NULL;
END IF;
UPDATE ${groupsTbl} g
SET job_count = g.job_count - sub.removed_count,
min_priority = COALESCE(sub.new_min_priority, g.min_priority),
min_id = COALESCE(sub.new_min_id, g.min_id),
in_flight_until = CASE
WHEN sub.had_inflight THEN NULL
ELSE g.in_flight_until
END
FROM (
SELECT d.group_key, d.removed_count, d.had_inflight,
MIN(t.priority) AS new_min_priority, MIN(t.id) AS new_min_id
FROM (
SELECT group_key, COUNT(*) AS removed_count,
bool_or(not_visible_until > NOW() AND NOT suspended) AS had_inflight
FROM old_table
WHERE group_key IS NOT NULL
GROUP BY group_key
) d
LEFT JOIN ${tbl} t ON t.group_key = d.group_key
GROUP BY d.group_key, d.removed_count, d.had_inflight
) sub
WHERE g.group_key = sub.group_key;
DELETE FROM ${groupsTbl}
WHERE job_count <= 0
AND group_key IN (SELECT group_key FROM old_table WHERE group_key IS NOT NULL);
RETURN NULL;
END;
${dd} LANGUAGE plpgsql;
|]
groupsUpdateFunction :: Text -> Text -> Text -> Text -> Text
groupsUpdateFunction :: Text -> Text -> Text -> Text -> Text
groupsUpdateFunction Text
funcName Text
groupsTbl Text
tbl Text
dd =
[text|
CREATE OR REPLACE FUNCTION ${funcName}()
RETURNS TRIGGER AS ${dd}
BEGIN
IF NOT EXISTS (
SELECT 1 FROM new_table WHERE group_key IS NOT NULL LIMIT 1
) AND NOT EXISTS (
SELECT 1 FROM old_table WHERE group_key IS NOT NULL LIMIT 1
) THEN
RETURN NULL;
END IF;
-- Step 1: Fast path — extend in_flight_until when not_visible_until increases (claim, retry)
UPDATE ${groupsTbl} g
SET in_flight_until = GREATEST(g.in_flight_until, sub.new_ift)
FROM (
SELECT n.group_key, MAX(n.not_visible_until) AS new_ift
FROM new_table n
JOIN old_table o ON o.id = n.id
WHERE n.group_key IS NOT NULL
AND n.not_visible_until > NOW()
AND NOT n.suspended
AND (o.not_visible_until IS NULL OR o.not_visible_until <= NOW()
OR n.not_visible_until > o.not_visible_until)
GROUP BY n.group_key
) sub
WHERE g.group_key = sub.group_key;
-- Step 2: Full rescan — recompute in_flight_until when not_visible_until decreases or suspended changes
UPDATE ${groupsTbl} g
SET in_flight_until = sub.new_ift
FROM (
SELECT t.group_key,
MAX(t.not_visible_until) FILTER (
WHERE t.not_visible_until > NOW() AND NOT t.suspended
) AS new_ift
FROM ${tbl} t
WHERE t.group_key IN (
SELECT n.group_key FROM new_table n
JOIN old_table o ON o.id = n.id
WHERE n.group_key IS NOT NULL
AND (o.not_visible_until IS DISTINCT FROM n.not_visible_until
OR o.suspended IS DISTINCT FROM n.suspended)
AND (
n.not_visible_until > NOW() AND NOT n.suspended
AND (o.not_visible_until IS NULL OR o.not_visible_until <= NOW()
OR n.not_visible_until > o.not_visible_until)
) IS NOT TRUE
)
GROUP BY t.group_key
) sub
WHERE g.group_key = sub.group_key
AND g.in_flight_until IS DISTINCT FROM sub.new_ift;
-- Step 3: Handle group_key changes (dedup replace) — remove from old group
UPDATE ${groupsTbl} g
SET job_count = g.job_count - sub.cnt,
min_priority = COALESCE(sub.new_min_priority, g.min_priority),
min_id = COALESCE(sub.new_min_id, g.min_id),
in_flight_until = CASE
WHEN sub.had_inflight THEN NULL
ELSE g.in_flight_until
END
FROM (
SELECT d.group_key, d.cnt, d.had_inflight,
MIN(t.priority) AS new_min_priority, MIN(t.id) AS new_min_id
FROM (
SELECT o.group_key, COUNT(*) AS cnt,
bool_or(o.not_visible_until > NOW() AND NOT o.suspended) AS had_inflight
FROM old_table o
JOIN new_table n ON o.id = n.id
WHERE o.group_key IS NOT NULL
AND o.group_key IS DISTINCT FROM n.group_key
GROUP BY o.group_key
) d
LEFT JOIN ${tbl} t ON t.group_key = d.group_key
GROUP BY d.group_key, d.cnt, d.had_inflight
) sub
WHERE g.group_key = sub.group_key;
DELETE FROM ${groupsTbl}
WHERE job_count <= 0
AND group_key IN (
SELECT o.group_key FROM old_table o
JOIN new_table n ON o.id = n.id
WHERE o.group_key IS NOT NULL
AND o.group_key IS DISTINCT FROM n.group_key
);
-- Step 4: Handle group_key changes — add to new group
INSERT INTO ${groupsTbl} (group_key, min_priority, min_id, job_count)
SELECT n.group_key, MIN(n.priority), MIN(n.id), COUNT(*)
FROM new_table n
JOIN old_table o ON o.id = n.id
WHERE n.group_key IS NOT NULL
AND o.group_key IS DISTINCT FROM n.group_key
GROUP BY n.group_key
ON CONFLICT (group_key) DO UPDATE SET
min_priority = LEAST(${groupsTbl}.min_priority, EXCLUDED.min_priority),
min_id = LEAST(${groupsTbl}.min_id, EXCLUDED.min_id),
job_count = ${groupsTbl}.job_count + EXCLUDED.job_count;
RETURN NULL;
END;
${dd} LANGUAGE plpgsql;
|]
createGroupsTriggersSQL :: Text -> Text -> Text
Text
schemaName Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName
baseName :: Text
baseName = Text
"maintain_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_groups"
funcInsert :: Text
funcInsert = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
baseName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_insert")
funcDelete :: Text
funcDelete = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
baseName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_delete")
funcUpdate :: Text
funcUpdate = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
baseName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_update")
trigInsert :: Text
trigInsert = Text -> Text
quoteIdentifier (Text
baseName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_insert")
trigDelete :: Text
trigDelete = Text -> Text
quoteIdentifier (Text
baseName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_delete")
trigUpdate :: Text
trigUpdate = Text -> Text
quoteIdentifier (Text
baseName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_update")
in [Text] -> Text
T.unlines
[ Text
"DROP TRIGGER IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
trigInsert Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
";"
, Text
"CREATE TRIGGER " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
trigInsert
, Text
"AFTER INSERT ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl
, Text
"REFERENCING NEW TABLE AS new_table"
, Text
"FOR EACH STATEMENT EXECUTE FUNCTION " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
funcInsert Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"();"
, Text
""
, Text
"DROP TRIGGER IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
trigDelete Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
";"
, Text
"CREATE TRIGGER " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
trigDelete
, Text
"AFTER DELETE ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl
, Text
"REFERENCING OLD TABLE AS old_table"
, Text
"FOR EACH STATEMENT EXECUTE FUNCTION " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
funcDelete Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"();"
, Text
""
, Text
"DROP TRIGGER IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
trigUpdate Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
";"
, Text
"CREATE TRIGGER " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
trigUpdate
, Text
"AFTER UPDATE ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl
, Text
"REFERENCING OLD TABLE AS old_table NEW TABLE AS new_table"
, Text
"FOR EACH STATEMENT EXECUTE FUNCTION " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
funcUpdate Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"();"
]
createNotifyFunctionSQL :: Text -> Text -> Text
createNotifyFunctionSQL :: Text -> Text -> Text
createNotifyFunctionSQL Text
schemaName Text
tableName =
let functionName :: Text
functionName = Text -> Text
notifyFunctionName Text
tableName
channelName :: Text
channelName = Text -> Text
notificationChannelForTable Text
tableName
quotedChannel :: Text
quotedChannel = HasCallStack => Text -> Text -> Text -> Text
Text -> Text -> Text -> Text
T.replace Text
"'" Text
"''" Text
channelName
in [Text] -> Text
T.unlines
[ Text
"CREATE OR REPLACE FUNCTION " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
functionName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"()"
, Text
"RETURNS TRIGGER AS $$"
, Text
"BEGIN"
, Text
" PERFORM pg_notify('" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
quotedChannel Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"', '');"
, Text
" RETURN NEW;"
, Text
"END;"
, Text
"$$ LANGUAGE plpgsql;"
]
createNotifyTriggerSQL :: Text -> Text -> Text
createNotifyTriggerSQL :: Text -> Text -> Text
createNotifyTriggerSQL Text
schemaName Text
tableName =
let functionName :: Text
functionName = Text -> Text
notifyFunctionName Text
tableName
trigName :: Text
trigName = Text -> Text
quoteIdentifier (Text -> Text
notifyTriggerName Text
tableName)
tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName
in [Text] -> Text
T.unlines
[ Text
"DROP TRIGGER IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
trigName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
";"
, Text
"CREATE TRIGGER " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
trigName
, Text
"AFTER INSERT ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl
, Text
"FOR EACH ROW"
, Text
"EXECUTE FUNCTION " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
functionName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"();"
]
dropNotifyTriggerSQL :: Text -> Text -> Text
dropNotifyTriggerSQL :: Text -> Text -> Text
dropNotifyTriggerSQL Text
schemaName Text
tableName =
Text
"DROP TRIGGER IF EXISTS "
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text -> Text
notifyTriggerName Text
tableName)
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ON "
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName
Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
";"
dropNotifyFunctionSQL :: Text -> Text -> Text
dropNotifyFunctionSQL :: Text -> Text -> Text
dropNotifyFunctionSQL Text
schemaName Text
tableName =
let functionName :: Text
functionName = Text -> Text
notifyFunctionName Text
tableName
in Text
"DROP FUNCTION IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
functionName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"();"
createEventStreamingFunctionSQL :: Text -> Text
createEventStreamingFunctionSQL :: Text -> Text
createEventStreamingFunctionSQL Text
schemaName =
let funcName :: Text
funcName = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
eventStreamingFunctionName
in [Text] -> Text
T.unlines
[ Text
"CREATE OR REPLACE FUNCTION " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
funcName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"() RETURNS trigger AS $$"
, Text
"DECLARE"
, Text
" event_type text;"
, Text
" job_id bigint;"
, Text
"BEGIN"
, Text
" CASE TG_OP"
, Text
" WHEN 'INSERT' THEN"
, Text
" event_type := CASE"
, Text
" WHEN TG_TABLE_NAME LIKE '%_dlq' THEN 'job_dlq'"
, Text
" ELSE 'job_inserted'"
, Text
" END;"
, Text
" job_id := NEW.id;"
, Text
" WHEN 'UPDATE' THEN"
, Text
" event_type := 'job_updated';"
, Text
" job_id := NEW.id;"
, Text
" WHEN 'DELETE' THEN"
, Text
" event_type := 'job_deleted';"
, Text
" job_id := OLD.id;"
, Text
" END CASE;"
, Text
""
, Text
" PERFORM pg_notify('" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
eventStreamingChannel Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"',"
, Text
" json_build_object("
, Text
" 'event', event_type,"
, Text
" 'table', regexp_replace(TG_TABLE_NAME, '_dlq$', ''),"
, Text
" 'job_id', job_id"
, Text
" )::text);"
, Text
" RETURN NULL;"
, Text
"END;"
, Text
"$$ LANGUAGE plpgsql;"
]
createEventStreamingTriggersSQL :: Text -> Text -> Text
Text
schemaName Text
tableName =
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName
dlqTbl :: Text
dlqTbl = Text -> Text -> Text
jobQueueDLQTable Text
schemaName Text
tableName
funcName :: Text
funcName = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
eventStreamingFunctionName
trigName :: Text
trigName = Text -> Text
eventStreamingTriggerName Text
tableName
dlqTrigName :: Text
dlqTrigName = Text -> Text
eventStreamingDLQTriggerName Text
tableName
in [Text] -> Text
T.unlines
[
Text
"DROP TRIGGER IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
"notify_job_insert" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
";"
, Text
"DROP TRIGGER IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
"notify_job_update" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
";"
, Text
"DROP TRIGGER IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
"notify_job_delete" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
";"
, Text
"DROP TRIGGER IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
"notify_dlq_insert" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
dlqTbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
";"
, Text
""
,
Text
"DROP TRIGGER IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
trigName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
";"
, Text
"CREATE TRIGGER " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
trigName
, Text
"AFTER INSERT OR UPDATE OR DELETE ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl
, Text
"FOR EACH ROW EXECUTE FUNCTION " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
funcName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"();"
, Text
""
, Text
"DROP TRIGGER IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
dlqTrigName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
dlqTbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
";"
, Text
"CREATE TRIGGER " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
dlqTrigName
, Text
"AFTER INSERT ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
dlqTbl
, Text
"FOR EACH ROW EXECUTE FUNCTION " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
funcName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"();"
]