{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}

-- | SQL generation functions for job queue schemas. No database execution happens here.
module Arbiter.Core.Job.Schema
  ( -- * Schema Creation
    createSchemaSQL
  , defaultSchemaName

    -- * Table Creation SQL
  , createJobQueueTableSQL
  , createJobQueueDLQTableSQL

    -- * Index Creation SQL
  , createJobQueueGroupKeyIndexSQL
  , createJobQueueUngroupedRankingIndexSQL
  , createDLQGroupKeyIndexSQL
  , createDLQFailedAtIndexSQL
  , createDLQParentIdIndexSQL
  , createDedupKeyIndexSQL
  , createParentIdIndexSQL

    -- * NOTIFY Trigger SQL
  , createNotifyFunctionSQL
  , createNotifyTriggerSQL
  , dropNotifyTriggerSQL
  , dropNotifyFunctionSQL

    -- * Event Streaming Trigger SQL
  , createEventStreamingFunctionSQL
  , createEventStreamingTriggersSQL

    -- * Notification Channel Helpers
  , notificationChannelForTable
  , eventStreamingChannel

    -- * Trigger / Function Name Helpers
  , notifyFunctionName
  , notifyTriggerName
  , eventStreamingFunctionName
  , eventStreamingTriggerName
  , eventStreamingDLQTriggerName

    -- * Table Name Helpers
  , jobQueueTable
  , jobQueueDLQTable
  , jobQueueResultsTable
  , jobQueueGroupsTable
  , jobQueueReaperSeq

    -- * Reaper Coordination
  , createReaperSeqSQL

    -- * Results Table
  , createResultsTableSQL

    -- * Groups Table
  , createGroupsTableSQL
  , createGroupsIndexSQL

    -- * Groups Trigger SQL
  , createGroupsTriggerFunctionsSQL
  , createGroupsTriggersSQL

    -- * Identifier Quoting
  , quoteIdentifier
  ) where

import Data.Text (Text)
import Data.Text qualified as T
import NeatInterpolation (text)

-- | Quote a PostgreSQL identifier (schema name, table name, column name).
--
-- This escapes double quotes by doubling them and wraps the identifier in quotes.
-- This prevents SQL injection when using dynamic identifiers.
quoteIdentifier :: Text -> Text
quoteIdentifier :: Text -> Text
quoteIdentifier Text
ident =
  Text
"\"" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> HasCallStack => Text -> Text -> Text -> Text
Text -> Text -> Text -> Text
T.replace Text
"\"" Text
"\"\"" Text
ident Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"\""

-- | Default PostgreSQL schema name for Arbiter tables
--
-- Using a dedicated schema prevents namespace pollution in the user's public schema.
defaultSchemaName :: Text
defaultSchemaName :: Text
defaultSchemaName = Text
"arbiter"

-- | Generate notification channel name for a table
--
-- Each table gets its own NOTIFY channel for job insertions.
-- Example: notificationChannelForTable "email_jobs" -> "email_jobs_created"
notificationChannelForTable :: Text -> Text
notificationChannelForTable :: Text -> Text
notificationChannelForTable Text
tableName = Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_created"

-- | Channel name used by the event streaming (SSE) system.
eventStreamingChannel :: Text
eventStreamingChannel :: Text
eventStreamingChannel = Text
"arbiter_job_events"

-- | Per-table NOTIFY trigger function name.
notifyFunctionName :: Text -> Text
notifyFunctionName :: Text -> Text
notifyFunctionName Text
tableName = Text
"notify_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_created"

-- | Per-table NOTIFY trigger name.
notifyTriggerName :: Text -> Text
notifyTriggerName :: Text -> Text
notifyTriggerName Text
tableName = Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_notify_trigger"

-- | Shared event streaming trigger function name (one per schema).
eventStreamingFunctionName :: Text
eventStreamingFunctionName :: Text
eventStreamingFunctionName = Text
"notify_job_event"

-- | Per-table event streaming trigger name.
eventStreamingTriggerName :: Text -> Text
eventStreamingTriggerName :: Text -> Text
eventStreamingTriggerName Text
tableName = Text
"notify_job_event_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName

-- | Per-table DLQ event streaming trigger name.
eventStreamingDLQTriggerName :: Text -> Text
eventStreamingDLQTriggerName :: Text -> Text
eventStreamingDLQTriggerName Text
tableName = Text
"notify_job_event_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_dlq"

-- | Qualified table name: @jobQueueTable "arbiter" "email_jobs"@ -> @"arbiter"."email_jobs"@
jobQueueTable :: Text -> Text -> Text
jobQueueTable :: Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
tableName

-- | Qualified DLQ table name: @jobQueueDLQTable "arbiter" "email_jobs"@ -> @"arbiter"."email_jobs_dlq"@
jobQueueDLQTable :: Text -> Text -> Text
jobQueueDLQTable :: Text -> Text -> Text
jobQueueDLQTable Text
schemaName Text
tableName = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_dlq")

-- | Qualified results table name: @jobQueueResultsTable "arbiter" "email_jobs"@ -> @"arbiter"."email_jobs_results"@
jobQueueResultsTable :: Text -> Text -> Text
jobQueueResultsTable :: Text -> Text -> Text
jobQueueResultsTable Text
schemaName Text
tableName = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_results")

-- | Qualified groups table name: @jobQueueGroupsTable "arbiter" "email_jobs"@ -> @"arbiter"."email_jobs_groups"@
jobQueueGroupsTable :: Text -> Text -> Text
jobQueueGroupsTable :: Text -> Text -> Text
jobQueueGroupsTable Text
schemaName Text
tableName = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_groups")

-- | Qualified reaper sequence name.
jobQueueReaperSeq :: Text -> Text -> Text
jobQueueReaperSeq :: Text -> Text -> Text
jobQueueReaperSeq Text
schemaName Text
tableName = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_reaper_seq")

-- | SQL to create the reaper coordination sequence.
-- Stores the epoch (seconds) of the last reaper run.
createReaperSeqSQL :: Text -> Text -> Text
createReaperSeqSQL :: Text -> Text -> Text
createReaperSeqSQL Text
schemaName Text
tableName =
  let seqName :: Text
seqName = Text -> Text -> Text
jobQueueReaperSeq Text
schemaName Text
tableName
   in Text
"CREATE SEQUENCE IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
seqName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" START WITH 0 MINVALUE 0;"

-- | SQL to create the schema for Arbiter tables
createSchemaSQL :: Text -> Text
createSchemaSQL :: Text -> Text
createSchemaSQL Text
schemaName =
  Text
"CREATE SCHEMA IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
";"

-- | Common job column definitions (matches the Job type structure)
--
-- These columns are shared between job_queue and dead_letter_queue tables.
jobColumns :: [Text]
jobColumns :: [Text]
jobColumns =
  [ Text
"  id BIGSERIAL PRIMARY KEY,"
  , Text
"  payload JSONB NOT NULL,"
  , Text
"  group_key TEXT,"
  , Text
"  inserted_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),"
  , Text
"  updated_at TIMESTAMPTZ,"
  , Text
"  last_attempted_at TIMESTAMPTZ,"
  , Text
"  not_visible_until TIMESTAMPTZ,"
  , Text
"  attempts INT NOT NULL DEFAULT 0,"
  , Text
"  last_error TEXT,"
  , Text
"  priority INT NOT NULL DEFAULT 0,"
  , Text
"  dedup_key TEXT,"
  , Text
"  dedup_strategy TEXT,"
  , Text
"  max_attempts INT,"
  , Text
"  parent_id BIGINT,"
  , Text
"  parent_state JSONB,"
  , Text
"  suspended BOOLEAN NOT NULL DEFAULT FALSE"
  ]

-- | Job column definitions for DLQ table (with job_id instead of id)
jobColumnsForDLQ :: Text
jobColumnsForDLQ :: Text
jobColumnsForDLQ =
  [Text] -> Text
T.unlines
    [ Text
"  id BIGSERIAL PRIMARY KEY,"
    , Text
"  failed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),"
    , Text
"  job_id BIGINT NOT NULL,"
    ]
    Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> [Text] -> Text
T.unlines (Int -> [Text] -> [Text]
forall a. Int -> [a] -> [a]
drop Int
1 [Text]
jobColumns)

-- | SQL to create the main job queue table within a schema
--
-- This table stores pending and in-progress jobs.
-- Completed jobs are deleted, failed jobs are moved to the DLQ.
createJobQueueTableSQL :: Text -> Text -> Text
createJobQueueTableSQL :: Text -> Text -> Text
createJobQueueTableSQL Text
schemaName Text
tableName =
  [Text] -> Text
T.unlines
    [ Text
"CREATE TABLE IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ("
    , [Text] -> Text
T.unlines [Text]
jobColumns
    , Text
") WITH (fillfactor = 70);"
    ]

-- | SQL to create the dead letter queue table within a schema
--
-- Jobs that fail repeatedly (exceed max attempts) are moved here for inspection.
-- This table contains ALL the Job fields (complete snapshot) plus DLQ-specific metadata.
createJobQueueDLQTableSQL :: Text -> Text -> Text
createJobQueueDLQTableSQL :: Text -> Text -> Text
createJobQueueDLQTableSQL Text
schemaName Text
tableName =
  [Text] -> Text
T.unlines
    [ Text
"CREATE TABLE IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text -> Text
jobQueueDLQTable Text
schemaName Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ("
    , Text
jobColumnsForDLQ
    , Text
");"
    ]

-- | SQL to create a partial index on group_key for efficient per-group lookups.
--
-- Used by claim queries' LATERAL subqueries and by the DELETE/UPDATE triggers
-- when recomputing @in_flight_until@. Composite @(group_key, priority, id)@
-- so the DELETE trigger can recompute min values via index-only lookup.
createJobQueueGroupKeyIndexSQL :: Text -> Text -> Text
createJobQueueGroupKeyIndexSQL :: Text -> Text -> Text
createJobQueueGroupKeyIndexSQL Text
schemaName Text
tableName =
  [Text] -> Text
T.unlines
    [ Text
"CREATE INDEX IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
"idx_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_group_key")
    , Text
"ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" (group_key, priority ASC, id ASC)"
    , Text
"WHERE group_key IS NOT NULL;"
    ]

createJobQueueUngroupedRankingIndexSQL :: Text -> Text -> Text
createJobQueueUngroupedRankingIndexSQL :: Text -> Text -> Text
createJobQueueUngroupedRankingIndexSQL Text
schemaName Text
tableName =
  [Text] -> Text
T.unlines
    [ Text
"CREATE INDEX IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
"idx_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_ungrouped_ranking")
    , Text
"ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" (priority ASC, id ASC)"
    , Text
"WHERE group_key IS NULL;"
    ]

-- | SQL to create index on DLQ group_key for querying failed jobs by group
createDLQGroupKeyIndexSQL :: Text -> Text -> Text
createDLQGroupKeyIndexSQL :: Text -> Text -> Text
createDLQGroupKeyIndexSQL Text
schemaName Text
tableName =
  [Text] -> Text
T.unlines
    [ Text
"CREATE INDEX IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
"idx_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_dlq_group_key")
    , Text
"ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text -> Text
jobQueueDLQTable Text
schemaName Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" (group_key);"
    ]

-- | SQL to create index on DLQ failed_at for time-based queries
createDLQFailedAtIndexSQL :: Text -> Text -> Text
createDLQFailedAtIndexSQL :: Text -> Text -> Text
createDLQFailedAtIndexSQL Text
schemaName Text
tableName =
  [Text] -> Text
T.unlines
    [ Text
"CREATE INDEX IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
"idx_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_dlq_failed_at")
    , Text
"ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text -> Text
jobQueueDLQTable Text
schemaName Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" (failed_at DESC);"
    ]

-- | SQL to create index on DLQ parent_id for efficient child lookups (DLQ child counts)
createDLQParentIdIndexSQL :: Text -> Text -> Text
createDLQParentIdIndexSQL :: Text -> Text -> Text
createDLQParentIdIndexSQL Text
schemaName Text
tableName =
  [Text] -> Text
T.unlines
    [ Text
"CREATE INDEX IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
"idx_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_dlq_parent_id")
    , Text
"ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text -> Text
jobQueueDLQTable Text
schemaName Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" (parent_id)"
    , Text
"WHERE parent_id IS NOT NULL;"
    ]

-- | SQL to create unique index on dedup_key for job deduplication
createDedupKeyIndexSQL :: Text -> Text -> Text
createDedupKeyIndexSQL :: Text -> Text -> Text
createDedupKeyIndexSQL Text
schemaName Text
tableName =
  [Text] -> Text
T.unlines
    [ Text
"CREATE UNIQUE INDEX IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
"idx_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_dedup_key")
    , Text
"ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" (dedup_key)"
    , Text
"WHERE dedup_key IS NOT NULL;"
    ]

-- | SQL to create a partial index on parent_id for efficient child lookups.
createParentIdIndexSQL :: Text -> Text -> Text
createParentIdIndexSQL :: Text -> Text -> Text
createParentIdIndexSQL Text
schemaName Text
tableName =
  [Text] -> Text
T.unlines
    [ Text
"CREATE INDEX IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
"idx_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_parent_id")
    , Text
"ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" (parent_id)"
    , Text
"WHERE parent_id IS NOT NULL;"
    ]

-- | SQL to create the results table for storing child job results.
--
-- Child results are stored as individual rows (one per child), keyed by
-- @(parent_id, child_id)@. The @ON DELETE CASCADE@ foreign key ensures
-- cleanup when the parent is acked (deleted).
createResultsTableSQL :: Text -> Text -> Text
createResultsTableSQL :: Text -> Text -> Text
createResultsTableSQL Text
schemaName Text
tableName =
  let resultsTbl :: Text
resultsTbl = Text -> Text -> Text
jobQueueResultsTable Text
schemaName Text
tableName
      mainTbl :: Text
mainTbl = Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName
   in [Text] -> Text
T.unlines
        [ Text
"CREATE TABLE IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
resultsTbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ("
        , Text
"  parent_id BIGINT NOT NULL REFERENCES " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
mainTbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"(id) ON DELETE CASCADE,"
        , Text
"  child_id BIGINT NOT NULL,"
        , Text
"  result JSONB NOT NULL,"
        , Text
"  PRIMARY KEY (parent_id, child_id)"
        , Text
");"
        ]

-- | SQL to create the groups table for fast grouped claims.
--
-- Stores one row per distinct @group_key@ with pre-computed @min_priority@,
-- @min_id@, @job_count@, and @in_flight_until@. Maintained by statement-level
-- AFTER triggers on the main job table (see 'createGroupsTriggerFunctionsSQL').
createGroupsTableSQL :: Text -> Text -> Text
createGroupsTableSQL :: Text -> Text -> Text
createGroupsTableSQL Text
schemaName Text
tableName =
  let groupsTbl :: Text
groupsTbl = Text -> Text -> Text
jobQueueGroupsTable Text
schemaName Text
tableName
   in [Text] -> Text
T.unlines
        [ Text
"CREATE TABLE IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
groupsTbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ("
        , Text
"  group_key TEXT PRIMARY KEY,"
        , Text
"  min_priority INT NOT NULL DEFAULT 0,"
        , Text
"  min_id BIGINT NOT NULL DEFAULT 0,"
        , Text
"  job_count INT NOT NULL DEFAULT 0,"
        , Text
"  in_flight_until TIMESTAMPTZ DEFAULT NULL"
        , Text
");"
        ]

-- | SQL to create a ranking index on the groups table.
--
-- Non-partial so that @UPDATE SET job_count@, @in_flight_until@ etc. never
-- change indexed columns — enabling HOT updates on every ack and claim.
createGroupsIndexSQL :: Text -> Text -> Text
createGroupsIndexSQL :: Text -> Text -> Text
createGroupsIndexSQL Text
schemaName Text
tableName =
  let groupsTbl :: Text
groupsTbl = Text -> Text -> Text
jobQueueGroupsTable Text
schemaName Text
tableName
   in [Text] -> Text
T.unlines
        [ Text
"CREATE INDEX IF NOT EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
"idx_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_groups_ranking")
        , Text
"ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
groupsTbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" (min_priority ASC, min_id ASC);"
        ]

-- ---------------------------------------------------------------------------
-- Groups Maintenance Triggers
-- ---------------------------------------------------------------------------

-- | Three trigger functions maintaining the groups table via statement-level
-- AFTER triggers. Uses incremental operations where possible.
createGroupsTriggerFunctionsSQL :: Text -> Text -> Text
createGroupsTriggerFunctionsSQL :: Text -> Text -> Text
createGroupsTriggerFunctionsSQL Text
schemaName Text
tableName =
  let groupsTbl :: Text
groupsTbl = Text -> Text -> Text
jobQueueGroupsTable Text
schemaName Text
tableName
      tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName
      baseName :: Text
baseName = Text
"maintain_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_groups"
      funcInsert :: Text
funcInsert = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
baseName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_insert")
      funcDelete :: Text
funcDelete = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
baseName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_delete")
      funcUpdate :: Text
funcUpdate = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
baseName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_update")
      dd :: Text
dd = Text
"$$"
   in [Text] -> Text
T.unlines
        [ Text -> Text -> Text -> Text
groupsInsertFunction Text
funcInsert Text
groupsTbl Text
dd
        , Text -> Text -> Text -> Text -> Text
groupsDeleteFunction Text
funcDelete Text
groupsTbl Text
tbl Text
dd
        , Text -> Text -> Text -> Text -> Text
groupsUpdateFunction Text
funcUpdate Text
groupsTbl Text
tbl Text
dd
        ]

groupsInsertFunction :: Text -> Text -> Text -> Text
groupsInsertFunction :: Text -> Text -> Text -> Text
groupsInsertFunction Text
funcName Text
groupsTbl Text
dd =
  [text|
    CREATE OR REPLACE FUNCTION ${funcName}()
    RETURNS TRIGGER AS ${dd}
    BEGIN
      IF NOT EXISTS (SELECT 1 FROM new_table WHERE group_key IS NOT NULL LIMIT 1) THEN
        RETURN NULL;
      END IF;

      INSERT INTO ${groupsTbl} (group_key, min_priority, min_id, job_count)
      SELECT group_key, MIN(priority), MIN(id), COUNT(*)
      FROM new_table
      WHERE group_key IS NOT NULL
      GROUP BY group_key
      ON CONFLICT (group_key) DO UPDATE SET
        min_priority = LEAST(${groupsTbl}.min_priority, EXCLUDED.min_priority),
        min_id = LEAST(${groupsTbl}.min_id, EXCLUDED.min_id),
        job_count = ${groupsTbl}.job_count + EXCLUDED.job_count,
        in_flight_until = CASE WHEN ${groupsTbl}.in_flight_until <= NOW()
          THEN NULL ELSE ${groupsTbl}.in_flight_until END;

      RETURN NULL;
    END;
    ${dd} LANGUAGE plpgsql;
  |]

groupsDeleteFunction :: Text -> Text -> Text -> Text -> Text
groupsDeleteFunction :: Text -> Text -> Text -> Text -> Text
groupsDeleteFunction Text
funcName Text
groupsTbl Text
tbl Text
dd =
  [text|
    CREATE OR REPLACE FUNCTION ${funcName}()
    RETURNS TRIGGER AS ${dd}
    BEGIN
      IF NOT EXISTS (SELECT 1 FROM old_table WHERE group_key IS NOT NULL LIMIT 1) THEN
        RETURN NULL;
      END IF;

      UPDATE ${groupsTbl} g
      SET job_count = g.job_count - sub.removed_count,
          min_priority = COALESCE(sub.new_min_priority, g.min_priority),
          min_id = COALESCE(sub.new_min_id, g.min_id),
          in_flight_until = CASE
            WHEN sub.had_inflight THEN NULL
            ELSE g.in_flight_until
          END
      FROM (
        SELECT d.group_key, d.removed_count, d.had_inflight,
          MIN(t.priority) AS new_min_priority, MIN(t.id) AS new_min_id
        FROM (
          SELECT group_key, COUNT(*) AS removed_count,
            bool_or(not_visible_until > NOW() AND NOT suspended) AS had_inflight
          FROM old_table
          WHERE group_key IS NOT NULL
          GROUP BY group_key
        ) d
        LEFT JOIN ${tbl} t ON t.group_key = d.group_key
        GROUP BY d.group_key, d.removed_count, d.had_inflight
      ) sub
      WHERE g.group_key = sub.group_key;

      DELETE FROM ${groupsTbl}
      WHERE job_count <= 0
        AND group_key IN (SELECT group_key FROM old_table WHERE group_key IS NOT NULL);

      RETURN NULL;
    END;
    ${dd} LANGUAGE plpgsql;
  |]

groupsUpdateFunction :: Text -> Text -> Text -> Text -> Text
groupsUpdateFunction :: Text -> Text -> Text -> Text -> Text
groupsUpdateFunction Text
funcName Text
groupsTbl Text
tbl Text
dd =
  [text|
    CREATE OR REPLACE FUNCTION ${funcName}()
    RETURNS TRIGGER AS ${dd}
    BEGIN
      IF NOT EXISTS (
        SELECT 1 FROM new_table WHERE group_key IS NOT NULL LIMIT 1
      ) AND NOT EXISTS (
        SELECT 1 FROM old_table WHERE group_key IS NOT NULL LIMIT 1
      ) THEN
        RETURN NULL;
      END IF;

      -- Step 1: Fast path — extend in_flight_until when not_visible_until increases (claim, retry)
      UPDATE ${groupsTbl} g
      SET in_flight_until = GREATEST(g.in_flight_until, sub.new_ift)
      FROM (
        SELECT n.group_key, MAX(n.not_visible_until) AS new_ift
        FROM new_table n
        JOIN old_table o ON o.id = n.id
        WHERE n.group_key IS NOT NULL
          AND n.not_visible_until > NOW()
          AND NOT n.suspended
          AND (o.not_visible_until IS NULL OR o.not_visible_until <= NOW()
               OR n.not_visible_until > o.not_visible_until)
        GROUP BY n.group_key
      ) sub
      WHERE g.group_key = sub.group_key;

      -- Step 2: Full rescan — recompute in_flight_until when not_visible_until decreases or suspended changes
      UPDATE ${groupsTbl} g
      SET in_flight_until = sub.new_ift
      FROM (
        SELECT t.group_key,
          MAX(t.not_visible_until) FILTER (
            WHERE t.not_visible_until > NOW() AND NOT t.suspended
          ) AS new_ift
        FROM ${tbl} t
        WHERE t.group_key IN (
          SELECT n.group_key FROM new_table n
          JOIN old_table o ON o.id = n.id
          WHERE n.group_key IS NOT NULL
            AND (o.not_visible_until IS DISTINCT FROM n.not_visible_until
                 OR o.suspended IS DISTINCT FROM n.suspended)
            AND (
              n.not_visible_until > NOW() AND NOT n.suspended
              AND (o.not_visible_until IS NULL OR o.not_visible_until <= NOW()
                   OR n.not_visible_until > o.not_visible_until)
            ) IS NOT TRUE
        )
        GROUP BY t.group_key
      ) sub
      WHERE g.group_key = sub.group_key
        AND g.in_flight_until IS DISTINCT FROM sub.new_ift;

      -- Step 3: Handle group_key changes (dedup replace) — remove from old group
      UPDATE ${groupsTbl} g
      SET job_count = g.job_count - sub.cnt,
          min_priority = COALESCE(sub.new_min_priority, g.min_priority),
          min_id = COALESCE(sub.new_min_id, g.min_id),
          in_flight_until = CASE
            WHEN sub.had_inflight THEN NULL
            ELSE g.in_flight_until
          END
      FROM (
        SELECT d.group_key, d.cnt, d.had_inflight,
          MIN(t.priority) AS new_min_priority, MIN(t.id) AS new_min_id
        FROM (
          SELECT o.group_key, COUNT(*) AS cnt,
            bool_or(o.not_visible_until > NOW() AND NOT o.suspended) AS had_inflight
          FROM old_table o
          JOIN new_table n ON o.id = n.id
          WHERE o.group_key IS NOT NULL
            AND o.group_key IS DISTINCT FROM n.group_key
          GROUP BY o.group_key
        ) d
        LEFT JOIN ${tbl} t ON t.group_key = d.group_key
        GROUP BY d.group_key, d.cnt, d.had_inflight
      ) sub
      WHERE g.group_key = sub.group_key;

      DELETE FROM ${groupsTbl}
      WHERE job_count <= 0
        AND group_key IN (
          SELECT o.group_key FROM old_table o
          JOIN new_table n ON o.id = n.id
          WHERE o.group_key IS NOT NULL
            AND o.group_key IS DISTINCT FROM n.group_key
        );

      -- Step 4: Handle group_key changes — add to new group
      INSERT INTO ${groupsTbl} (group_key, min_priority, min_id, job_count)
      SELECT n.group_key, MIN(n.priority), MIN(n.id), COUNT(*)
      FROM new_table n
      JOIN old_table o ON o.id = n.id
      WHERE n.group_key IS NOT NULL
        AND o.group_key IS DISTINCT FROM n.group_key
      GROUP BY n.group_key
      ON CONFLICT (group_key) DO UPDATE SET
        min_priority = LEAST(${groupsTbl}.min_priority, EXCLUDED.min_priority),
        min_id = LEAST(${groupsTbl}.min_id, EXCLUDED.min_id),
        job_count = ${groupsTbl}.job_count + EXCLUDED.job_count;

      RETURN NULL;
    END;
    ${dd} LANGUAGE plpgsql;
  |]

-- | SQL to create 3 statement-level AFTER triggers on the main job table
-- that call the groups maintenance functions.
--
-- Uses @REFERENCING NEW\/OLD TABLE AS@ for efficient batch access to
-- affected rows via transition tables.
createGroupsTriggersSQL :: Text -> Text -> Text
createGroupsTriggersSQL :: Text -> Text -> Text
createGroupsTriggersSQL Text
schemaName Text
tableName =
  let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName
      baseName :: Text
baseName = Text
"maintain_" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_groups"
      funcInsert :: Text
funcInsert = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
baseName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_insert")
      funcDelete :: Text
funcDelete = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
baseName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_delete")
      funcUpdate :: Text
funcUpdate = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text
baseName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_update")
      trigInsert :: Text
trigInsert = Text -> Text
quoteIdentifier (Text
baseName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_insert")
      trigDelete :: Text
trigDelete = Text -> Text
quoteIdentifier (Text
baseName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_delete")
      trigUpdate :: Text
trigUpdate = Text -> Text
quoteIdentifier (Text
baseName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"_update")
   in [Text] -> Text
T.unlines
        [ Text
"DROP TRIGGER IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
trigInsert Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
";"
        , Text
"CREATE TRIGGER " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
trigInsert
        , Text
"AFTER INSERT ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl
        , Text
"REFERENCING NEW TABLE AS new_table"
        , Text
"FOR EACH STATEMENT EXECUTE FUNCTION " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
funcInsert Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"();"
        , Text
""
        , Text
"DROP TRIGGER IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
trigDelete Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
";"
        , Text
"CREATE TRIGGER " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
trigDelete
        , Text
"AFTER DELETE ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl
        , Text
"REFERENCING OLD TABLE AS old_table"
        , Text
"FOR EACH STATEMENT EXECUTE FUNCTION " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
funcDelete Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"();"
        , Text
""
        , Text
"DROP TRIGGER IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
trigUpdate Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
";"
        , Text
"CREATE TRIGGER " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
trigUpdate
        , Text
"AFTER UPDATE ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl
        , Text
"REFERENCING OLD TABLE AS old_table NEW TABLE AS new_table"
        , Text
"FOR EACH STATEMENT EXECUTE FUNCTION " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
funcUpdate Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"();"
        ]

-- | SQL to create the NOTIFY function for a specific table
--
-- This function is triggered after job insertion and sends a notification on a
-- table-specific channel. Each table gets its own function and channel for isolation.
--
-- Note: The channel name is quoted as a string literal (single quotes), not an identifier.
createNotifyFunctionSQL :: Text -> Text -> Text
createNotifyFunctionSQL :: Text -> Text -> Text
createNotifyFunctionSQL Text
schemaName Text
tableName =
  let functionName :: Text
functionName = Text -> Text
notifyFunctionName Text
tableName
      channelName :: Text
channelName = Text -> Text
notificationChannelForTable Text
tableName
      quotedChannel :: Text
quotedChannel = HasCallStack => Text -> Text -> Text -> Text
Text -> Text -> Text -> Text
T.replace Text
"'" Text
"''" Text
channelName -- Escape single quotes for string literal
   in [Text] -> Text
T.unlines
        [ Text
"CREATE OR REPLACE FUNCTION " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
functionName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"()"
        , Text
"RETURNS TRIGGER AS $$"
        , Text
"BEGIN"
        , Text
"  PERFORM pg_notify('" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
quotedChannel Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"', '');"
        , Text
"  RETURN NEW;"
        , Text
"END;"
        , Text
"$$ LANGUAGE plpgsql;"
        ]

-- | SQL to create the NOTIFY trigger for a specific table
--
-- This trigger fires AFTER INSERT on the job queue table and calls the table-specific notify function.
createNotifyTriggerSQL :: Text -> Text -> Text
createNotifyTriggerSQL :: Text -> Text -> Text
createNotifyTriggerSQL Text
schemaName Text
tableName =
  let functionName :: Text
functionName = Text -> Text
notifyFunctionName Text
tableName
      trigName :: Text
trigName = Text -> Text
quoteIdentifier (Text -> Text
notifyTriggerName Text
tableName)
      tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName
   in [Text] -> Text
T.unlines
        [ Text
"DROP TRIGGER IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
trigName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
";"
        , Text
"CREATE TRIGGER " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
trigName
        , Text
"AFTER INSERT ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl
        , Text
"FOR EACH ROW"
        , Text
"EXECUTE FUNCTION " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
functionName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"();"
        ]

-- | SQL to drop the NOTIFY trigger
dropNotifyTriggerSQL :: Text -> Text -> Text
dropNotifyTriggerSQL :: Text -> Text -> Text
dropNotifyTriggerSQL Text
schemaName Text
tableName =
  Text
"DROP TRIGGER IF EXISTS "
    Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier (Text -> Text
notifyTriggerName Text
tableName)
    Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ON "
    Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName
    Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
";"

-- | SQL to drop the NOTIFY function for a specific table
dropNotifyFunctionSQL :: Text -> Text -> Text
dropNotifyFunctionSQL :: Text -> Text -> Text
dropNotifyFunctionSQL Text
schemaName Text
tableName =
  let functionName :: Text
functionName = Text -> Text
notifyFunctionName Text
tableName
   in Text
"DROP FUNCTION IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
functionName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"();"

-- ---------------------------------------------------------------------------
-- Event Streaming Triggers (for admin UI / SSE)
-- ---------------------------------------------------------------------------

-- | SQL to create the event streaming notification function
--
-- This is a shared function (one per schema) that fires on INSERT, UPDATE,
-- DELETE of any job table and INSERT on any DLQ table. It sends a lightweight
-- JSON event via @pg_notify@ on the @arbiter_job_events@ channel with just
-- the event type, table name, and job ID.
--
-- The function uses @TG_TABLE_NAME@ and @TG_OP@ to determine context, so it
-- works for all tables without per-table copies.
createEventStreamingFunctionSQL :: Text -> Text
createEventStreamingFunctionSQL :: Text -> Text
createEventStreamingFunctionSQL Text
schemaName =
  let funcName :: Text
funcName = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
eventStreamingFunctionName
   in [Text] -> Text
T.unlines
        [ Text
"CREATE OR REPLACE FUNCTION " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
funcName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"() RETURNS trigger AS $$"
        , Text
"DECLARE"
        , Text
"  event_type text;"
        , Text
"  job_id bigint;"
        , Text
"BEGIN"
        , Text
"  CASE TG_OP"
        , Text
"    WHEN 'INSERT' THEN"
        , Text
"      event_type := CASE"
        , Text
"        WHEN TG_TABLE_NAME LIKE '%_dlq' THEN 'job_dlq'"
        , Text
"        ELSE 'job_inserted'"
        , Text
"      END;"
        , Text
"      job_id := NEW.id;"
        , Text
"    WHEN 'UPDATE' THEN"
        , Text
"      event_type := 'job_updated';"
        , Text
"      job_id := NEW.id;"
        , Text
"    WHEN 'DELETE' THEN"
        , Text
"      event_type := 'job_deleted';"
        , Text
"      job_id := OLD.id;"
        , Text
"  END CASE;"
        , Text
""
        , Text
"  PERFORM pg_notify('" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
eventStreamingChannel Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"',"
        , Text
"    json_build_object("
        , Text
"      'event', event_type,"
        , Text
"      'table', regexp_replace(TG_TABLE_NAME, '_dlq$', ''),"
        , Text
"      'job_id', job_id"
        , Text
"    )::text);"
        , Text
"  RETURN NULL;"
        , Text
"END;"
        , Text
"$$ LANGUAGE plpgsql;"
        ]

-- | SQL to create event streaming triggers for a table and its DLQ
--
-- Creates a combined INSERT/UPDATE/DELETE trigger on the main table and an
-- INSERT trigger on the DLQ table, both calling the shared @notify_job_event@
-- function. Also drops any legacy per-operation triggers left by older versions
-- of @setupEventTriggers@.
createEventStreamingTriggersSQL :: Text -> Text -> Text
createEventStreamingTriggersSQL :: Text -> Text -> Text
createEventStreamingTriggersSQL Text
schemaName Text
tableName =
  let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName
      dlqTbl :: Text
dlqTbl = Text -> Text -> Text
jobQueueDLQTable Text
schemaName Text
tableName
      funcName :: Text
funcName = Text -> Text
quoteIdentifier Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
eventStreamingFunctionName
      trigName :: Text
trigName = Text -> Text
eventStreamingTriggerName Text
tableName
      dlqTrigName :: Text
dlqTrigName = Text -> Text
eventStreamingDLQTriggerName Text
tableName
   in [Text] -> Text
T.unlines
        [ -- Drop legacy per-operation triggers (from setupEventTriggers)
          Text
"DROP TRIGGER IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
"notify_job_insert" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
";"
        , Text
"DROP TRIGGER IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
"notify_job_update" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
";"
        , Text
"DROP TRIGGER IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
"notify_job_delete" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
";"
        , Text
"DROP TRIGGER IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
"notify_dlq_insert" Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
dlqTbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
";"
        , Text
""
        , -- Create combined triggers (drop first for idempotency)
          Text
"DROP TRIGGER IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
trigName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
";"
        , Text
"CREATE TRIGGER " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
trigName
        , Text
"AFTER INSERT OR UPDATE OR DELETE ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tbl
        , Text
"FOR EACH ROW EXECUTE FUNCTION " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
funcName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"();"
        , Text
""
        , Text
"DROP TRIGGER IF EXISTS " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
dlqTrigName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
" ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
dlqTbl Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
";"
        , Text
"CREATE TRIGGER " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> Text
quoteIdentifier Text
dlqTrigName
        , Text
"AFTER INSERT ON " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
dlqTbl
        , Text
"FOR EACH ROW EXECUTE FUNCTION " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
funcName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"();"
        ]