{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingVia #-}
{-# LANGUAGE OverloadedStrings #-}

module Arbiter.Core.Operations
  ( -- * Job Insertion
    insertJob
  , insertJobUnsafe
  , insertJobsBatch
  , insertJobsBatch_
  , insertResult
  , getResultsByParent
  , getDLQChildErrorsByParent
  , persistParentState
  , claimNextVisibleJobs
  , claimNextVisibleJobsBatched
  , ackJob
  , ackJobsBatch
  , ackJobsBulk
  , setVisibilityTimeout
  , setVisibilityTimeoutBatch
  , VisibilityUpdateInfo (..)
  , updateJobForRetry
  , moveToDLQ
  , moveToDLQBatch
  , retryFromDLQ
  , dlqJobExists
  , listDLQJobs
  , listDLQJobsByParent
  , countDLQJobsByParent
  , deleteDLQJob
  , deleteDLQJobsBatch

    -- * Filtered Query Operations
  , Tmpl.JobFilter (..)
  , listJobsFiltered
  , countJobsFiltered
  , listDLQFiltered
  , countDLQFiltered

    -- * Admin Operations
  , listJobs
  , getJobById
  , getJobsByGroup
  , getInFlightJobs
  , cancelJob
  , cancelJobsBatch
  , promoteJob
  , QueueStats (..)
  , getQueueStats

    -- * Count Operations
  , countJobs
  , countJobsByGroup
  , countInFlightJobs
  , countDLQJobs

    -- * Parent-Child Operations
  , getJobsByParent
  , countJobsByParent
  , countChildrenBatch
  , countDLQChildrenBatch

    -- * Job Dependency Operations
  , pauseChildren
  , resumeChildren
  , cancelJobCascade
  , cancelJobTree

    -- * Suspend/Resume Operations
  , suspendJob
  , resumeJob

    -- * Groups Table Operations
  , refreshGroups

    -- * Internal Operations
  , getParentStateSnapshot
  , readChildResultsRaw
  , mergeRawChildResults
  ) where

import Control.Monad (foldM, void, when)
import Data.Aeson (FromJSON, Result (..), ToJSON, Value (Object), fromJSON, toJSON)
import Data.Foldable (for_, toList)
import Data.Functor qualified as Functor
import Data.Int (Int32, Int64)
import Data.List (groupBy, sortOn)
import Data.List.NonEmpty (NonEmpty (..))
import Data.List.NonEmpty qualified as NE
import Data.Map.Strict qualified as Map
import Data.Maybe (catMaybes, mapMaybe)
import Data.Sequence (Seq, (|>))
import Data.Sequence qualified as Seq
import Data.Set qualified as Set
import Data.Text (Text)
import Data.Text qualified as T
import Data.Time (NominalDiffTime, UTCTime)
import GHC.Generics (Generic, Generically (..))

import Arbiter.Core.Codec
  ( Col (..)
  , Params
  , RowCodec
  , col
  , countCodec
  , dlqRowCodec
  , jobRowCodec
  , ncol
  , parr
  , pnarr
  , pnul
  , pval
  , statsRowCodec
  )
import Arbiter.Core.Exceptions (throwParsing)
import Arbiter.Core.Job.DLQ qualified as DLQ
import Arbiter.Core.Job.Schema (jobQueueTable)
import Arbiter.Core.Job.Types
  ( DedupKey (IgnoreDuplicate, ReplaceDuplicate)
  , Job (..)
  , JobPayload
  , JobRead
  , JobWrite
  )
import Arbiter.Core.MonadArbiter (MonadArbiter (..))
import Arbiter.Core.SqlTemplates qualified as Tmpl

-- | Decode the raw JSON payload of a job row into the caller's payload type.
--
-- The job is returned unchanged apart from its @payload@ field, which is
-- replaced by the decoded value. If 'fromJSON' reports an error, the failure
-- is raised via 'throwParsing' with the decoder's message appended.
decodePayload :: (JobPayload payload, MonadArbiter m) => Job Value Int64 Text UTCTime -> m (JobRead payload)
decodePayload job = case fromJSON (payload job) of
  Success p -> pure $ job {payload = p}
  Error e -> throwParsing $ "Failed to decode job payload: " <> T.pack e

-- | Row codec for a single boolean column named @result@.
boolCodec :: RowCodec Bool
boolCodec = col "result" CBool

-- | Row codec for a single 64-bit integer column named @result@.
int64Codec :: RowCodec Int64
int64Codec = col "result" CInt8

-- | Row codec pairing the @child_id@ column with the JSONB @result@ column.
resultsRowCodec :: RowCodec (Int64, Value)
resultsRowCodec =
  (,)
    <$> col "child_id" CInt8
    <*> col "result" CJsonb

-- | Row codec pairing the @job_id@ column with the nullable @last_error@
-- text column.
errorResultsRowCodec :: RowCodec (Int64, Maybe Text)
errorResultsRowCodec =
  (,)
    <$> col "job_id" CInt8
    <*> ncol "last_error" CText

-- | Row codec that builds a 'VisibilityUpdateInfo' from the columns @id@,
-- @was_heartbeated@ and the nullable @current_db_attempts@, in that
-- constructor-argument order.
visibilityUpdateCodec :: RowCodec VisibilityUpdateInfo
visibilityUpdateCodec =
  VisibilityUpdateInfo
    <$> col "id" CInt8
    <*> col "was_heartbeated" CBool
    <*> ncol "current_db_attempts" CInt4

-- | Row codec for per-parent child counts.
--
-- Reads @parent_id@, @count@ and @count_suspended@ and nests them as
-- @(parentId, (count, countSuspended))@.
parentCountCodec :: RowCodec (Int64, (Int64, Int64))
parentCountCodec =
  (\pid cnt paused -> (pid, (cnt, paused)))
    <$> col "parent_id" CInt8
    <*> col "count" CInt8
    <*> col "count_suspended" CInt8

-- | Row codec pairing the @parent_id@ column with the @count@ column.
dlqParentCountCodec :: RowCodec (Int64, Int64)
dlqParentCountCodec =
  (,)
    <$> col "parent_id" CInt8
    <*> col "count" CInt8

-- | Row codec for raw child-result rows.
--
-- Tuple components, in order: @source@ (required text), then the nullable
-- columns @child_id@, @result@ (JSONB), @error@ and @dlq_pk@.
childResultsRowCodec :: RowCodec (Text, Maybe Int64, Maybe Value, Maybe Text, Maybe Int64)
childResultsRowCodec =
  (,,,,)
    <$> col "source" CText
    <*> ncol "child_id" CInt8
    <*> ncol "result" CJsonb
    <*> ncol "error" CText
    <*> ncol "dlq_pk" CInt8

-- | Row codec for a single nullable 64-bit integer column named @parent_id@.
nullableInt64Codec :: RowCodec (Maybe Int64)
nullableInt64Codec = ncol "parent_id" CInt8

-- | Render a list of filters as a SQL @WHERE@ clause plus its parameters.
--
-- An empty filter list yields an empty clause and no parameters. Otherwise
-- each filter is rendered by 'filterToClause', the fragments are joined with
-- @" AND "@ behind a @"WHERE "@ prefix, and the per-filter parameter lists
-- are concatenated in filter order.
buildWhereClause :: [Tmpl.JobFilter] -> (Text, Params)
buildWhereClause [] = ("", [])
buildWhereClause filters =
  let (clauses, params) = Functor.unzip $ map filterToClause filters
   in ("WHERE " <> T.intercalate " AND " clauses, concat params)

-- | Translate one filter into a SQL predicate fragment and its parameters.
--
-- Value-carrying filters produce a single @?@ placeholder with the matching
-- parameter; 'Tmpl.FilterInFlight' is a fixed predicate with no parameters.
filterToClause :: Tmpl.JobFilter -> (Text, Params)
filterToClause (Tmpl.FilterGroupKey gk) = ("group_key = ?", [pval CText gk])
filterToClause (Tmpl.FilterParentId pid) = ("parent_id = ?", [pval CInt8 pid])
filterToClause (Tmpl.FilterSuspended b) = ("suspended = ?", [pval CBool b])
filterToClause Tmpl.FilterInFlight =
  ("attempts > 0 AND NOT suspended AND not_visible_until IS NOT NULL AND not_visible_until > NOW()", [])

-- | Insert a job without validating that the parent exists.
--
-- This is an internal fast path for callers that already guarantee the parent
-- is present (e.g. 'insertJobTree'). External callers
-- should use 'insertJob' which validates the parent first.
--
-- The insert runs inside 'withDbTransaction'. When the statement returns no
-- row, the result is @Nothing@ if the job carried a dedup key, and a parsing
-- error is thrown otherwise. When at least one row is returned, the first one
-- is decoded with 'decodePayload'.
insertJobUnsafe
  :: forall m payload
   . (JobPayload payload, MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name
  -> Text
  -- ^ Table name
  -> JobWrite payload
  -> m (Maybe (JobRead payload))
insertJobUnsafe schemaName tableName job = withDbTransaction $ do
  let -- Pick the SQL template and the two dedup parameters together so they
      -- cannot drift apart: only 'ReplaceDuplicate' uses the replace template.
      (sql, dedupKeyParam, dedupStrategyParam) = case dedupKey job of
        Nothing ->
          ( Tmpl.insertJobSQL schemaName tableName
          , pnul CText Nothing
          , pnul CText Nothing
          )
        Just (IgnoreDuplicate k) ->
          ( Tmpl.insertJobSQL schemaName tableName
          , pnul CText (Just k)
          , pnul CText (Just "ignore")
          )
        Just (ReplaceDuplicate k) ->
          ( Tmpl.insertJobReplaceSQL schemaName tableName
          , pnul CText (Just k)
          , pnul CText (Just "replace")
          )

      -- Rollup jobs start with an empty JSON object as parent state;
      -- all other jobs store NULL.
      parentStateVal = if isRollup job then Just (Object mempty) else Nothing

      -- NOTE(review): positional parameters; the order presumably has to match
      -- the placeholders in the Tmpl insert templates -- confirm before
      -- reordering.
      params =
        [ pval CJsonb (toJSON $ payload job)
        , pnul CText (groupKey job)
        , pval CInt4 (attempts job)
        , pnul CText (lastError job)
        , pval CInt4 (priority job)
        , dedupKeyParam
        , dedupStrategyParam
        , pnul CInt4 (maxAttempts job)
        , pnul CInt8 (parentId job)
        , pnul CJsonb parentStateVal
        , pval CBool (suspended job)
        , pnul CTimestamptz (notVisibleUntil job)
        ]

  rawJobs <- executeQuery sql params (jobRowCodec tableName)
  case rawJobs of
    -- No row came back: with a dedup key this is reported as @Nothing@;
    -- without one it is treated as an error.
    [] -> case dedupKey job of
      Just (IgnoreDuplicate _) -> pure Nothing
      Just (ReplaceDuplicate _) -> pure Nothing
      Nothing -> throwParsing "insertJob: No rows returned from INSERT"
    (raw : _) -> Just <$> decodePayload raw

-- | Inserts a job into the queue.
--
-- Returns the inserted job with database-generated fields populated.
--
-- __Ordering and concurrency__
--
-- Jobs are claimed in ID order (lowest ID first within a priority level).
-- Concurrent inserts to the same group are serialized at the trigger level:
-- the AFTER INSERT trigger's @ON CONFLICT DO UPDATE@ on the groups table
-- takes a row-level lock, preventing out-of-order commits within a group.
--
-- __Deduplication__
--
-- * @Nothing@: Always insert (dedup_key is NULL)
-- * @Just (IgnoreDuplicate k)@: Skip if dedup_key exists, return Nothing
-- * @Just (ReplaceDuplicate k)@: Replace existing job unless actively in-flight on its
--   first attempt. Returns Nothing only when @attempts > 0@,
--   @not_visible_until > NOW()@, and @last_error IS NULL@ (i.e., the job is
--   being processed for the first time). Jobs that have previously failed
--   (@last_error IS NOT NULL@) can always be replaced, even if currently
--   in-flight on a retry attempt — this is by design, so that a fresh
--   replacement takes priority over a failing job.
--
-- @parentId@ is validated: if set to a non-existent job ID, returns @Nothing@.
-- For building parent-child trees, prefer @insertJobTree@ which handles
-- @parentId@, @isRollup@, and @suspended@ atomically.
insertJob
  :: forall m payload
   . (JobPayload payload, MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name
  -> Text
  -- ^ Table name
  -> JobWrite payload
  -> m (Maybe (JobRead payload))
insertJob schemaName tableName job = do
  parentOk <- case parentId job of
    -- No parent referenced: nothing to validate.
    Nothing -> pure True
    Just pid -> do
      checkRows <- executeQuery (Tmpl.parentExistsSQL schemaName tableName) [pval CInt8 pid] boolCodec
      -- Only a single @True@ row counts as "parent exists"; an empty result,
      -- a @False@ row, or any other shape is treated as missing.
      case checkRows of
        [True] -> pure True
        _ -> pure False
  if not parentOk
    then pure Nothing
    else insertJobUnsafe schemaName tableName job

-- | Insert multiple jobs in a single batch operation.
--
-- Supports dedup keys: 'IgnoreDuplicate' jobs are silently skipped on
-- conflict, 'ReplaceDuplicate' jobs update the existing row (unless
-- actively in-flight). Only actually inserted or replaced jobs are returned.
--
-- If multiple jobs in the batch share the same dedup key, only the last
-- occurrence is kept (last writer wins), consistent with sequential
-- 'insertJob' calls.
--
-- Does not validate @parentId@ — callers must ensure referenced parents
-- exist. For parent-child trees, use @insertJobTree@ instead.
--
-- An empty input list returns @[]@ without touching the database; otherwise
-- the statement runs inside 'withDbTransaction' and every returned row is
-- decoded with 'decodePayload'.
insertJobsBatch
  :: forall m payload
   . (JobPayload payload, MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name (e.g., "arbiter")
  -> Text
  -- ^ Table name (e.g., "email_jobs")
  -> [JobWrite payload]
  -- ^ Jobs to insert
  -> m [JobRead payload]
insertJobsBatch _ _ [] = pure []
insertJobsBatch schemaName tableName jobs = withDbTransaction $ do
  let cols = buildBatchColumns jobs
      -- One array parameter per column. NOTE(review): the order presumably
      -- has to match the placeholders in 'Tmpl.insertJobsBatchSQL' -- confirm
      -- before reordering.
      params =
        [ parr CJsonb (toList $ colPayloads cols)
        , pnarr CText (toList $ colGroupKeys cols)
        , parr CInt4 (toList $ colPriorities cols)
        , pnarr CText (toList $ colDedupKeys cols)
        , pnarr CText (toList $ colDedupStrategies cols)
        , pnarr CInt4 (toList $ colMaxAttempts cols)
        , pnarr CInt8 (toList $ colParentIds cols)
        , pnarr CJsonb (toList $ colParentStates cols)
        , parr CBool (toList $ colSuspended cols)
        , pnarr CTimestamptz (toList $ colNotVisibleUntils cols)
        ]

  rawJobs <- executeQuery (Tmpl.insertJobsBatchSQL schemaName tableName) params (jobRowCodec tableName)
  mapM decodePayload rawJobs

insertJobsBatch_
  :: forall m payload
   . (JobPayload payload, MonadArbiter m)
  => Text
  -> Text
  -> [JobWrite payload]
  -> m Int64
insertJobsBatch_ :: forall (m :: * -> *) payload.
(JobPayload payload, MonadArbiter m) =>
Text -> Text -> [JobWrite payload] -> m Int64
insertJobsBatch_ Text
_ Text
_ [] = Int64 -> m Int64
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Int64
0
insertJobsBatch_ Text
schemaName Text
tableName [JobWrite payload]
jobs = m Int64 -> m Int64
forall a. m a -> m a
forall (m :: * -> *) a. MonadArbiter m => m a -> m a
withDbTransaction (m Int64 -> m Int64) -> m Int64 -> m Int64
forall a b. (a -> b) -> a -> b
$ do
  let cols :: BatchColumns
cols = [JobWrite payload] -> BatchColumns
forall payload.
ToJSON payload =>
[JobWrite payload] -> BatchColumns
buildBatchColumns [JobWrite payload]
jobs
      params :: Params
params =
        [ Col Value -> [Value] -> SomeParam
forall a. Col a -> [a] -> SomeParam
parr Col Value
CJsonb (Seq Value -> [Value]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq Value -> [Value]) -> Seq Value -> [Value]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq Value
colPayloads BatchColumns
cols)
        , Col Text -> [Maybe Text] -> SomeParam
forall a. Col a -> [Maybe a] -> SomeParam
pnarr Col Text
CText (Seq (Maybe Text) -> [Maybe Text]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq (Maybe Text) -> [Maybe Text])
-> Seq (Maybe Text) -> [Maybe Text]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq (Maybe Text)
colGroupKeys BatchColumns
cols)
        , Col Int32 -> [Int32] -> SomeParam
forall a. Col a -> [a] -> SomeParam
parr Col Int32
CInt4 (Seq Int32 -> [Int32]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq Int32 -> [Int32]) -> Seq Int32 -> [Int32]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq Int32
colPriorities BatchColumns
cols)
        , Col Text -> [Maybe Text] -> SomeParam
forall a. Col a -> [Maybe a] -> SomeParam
pnarr Col Text
CText (Seq (Maybe Text) -> [Maybe Text]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq (Maybe Text) -> [Maybe Text])
-> Seq (Maybe Text) -> [Maybe Text]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq (Maybe Text)
colDedupKeys BatchColumns
cols)
        , Col Text -> [Maybe Text] -> SomeParam
forall a. Col a -> [Maybe a] -> SomeParam
pnarr Col Text
CText (Seq (Maybe Text) -> [Maybe Text]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq (Maybe Text) -> [Maybe Text])
-> Seq (Maybe Text) -> [Maybe Text]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq (Maybe Text)
colDedupStrategies BatchColumns
cols)
        , Col Int32 -> [Maybe Int32] -> SomeParam
forall a. Col a -> [Maybe a] -> SomeParam
pnarr Col Int32
CInt4 (Seq (Maybe Int32) -> [Maybe Int32]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq (Maybe Int32) -> [Maybe Int32])
-> Seq (Maybe Int32) -> [Maybe Int32]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq (Maybe Int32)
colMaxAttempts BatchColumns
cols)
        , Col Int64 -> [Maybe Int64] -> SomeParam
forall a. Col a -> [Maybe a] -> SomeParam
pnarr Col Int64
CInt8 (Seq (Maybe Int64) -> [Maybe Int64]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq (Maybe Int64) -> [Maybe Int64])
-> Seq (Maybe Int64) -> [Maybe Int64]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq (Maybe Int64)
colParentIds BatchColumns
cols)
        , Col Value -> [Maybe Value] -> SomeParam
forall a. Col a -> [Maybe a] -> SomeParam
pnarr Col Value
CJsonb (Seq (Maybe Value) -> [Maybe Value]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq (Maybe Value) -> [Maybe Value])
-> Seq (Maybe Value) -> [Maybe Value]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq (Maybe Value)
colParentStates BatchColumns
cols)
        , Col Bool -> [Bool] -> SomeParam
forall a. Col a -> [a] -> SomeParam
parr Col Bool
CBool (Seq Bool -> [Bool]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq Bool -> [Bool]) -> Seq Bool -> [Bool]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq Bool
colSuspended BatchColumns
cols)
        , Col UTCTime -> [Maybe UTCTime] -> SomeParam
forall a. Col a -> [Maybe a] -> SomeParam
pnarr Col UTCTime
CTimestamptz (Seq (Maybe UTCTime) -> [Maybe UTCTime]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq (Maybe UTCTime) -> [Maybe UTCTime])
-> Seq (Maybe UTCTime) -> [Maybe UTCTime]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq (Maybe UTCTime)
colNotVisibleUntils BatchColumns
cols)
        ]
  Text -> Params -> m Int64
forall (m :: * -> *). MonadArbiter m => Text -> Params -> m Int64
executeStatement (Text -> Text -> Text
Tmpl.insertJobsBatchSQL_ Text
schemaName Text
tableName) Params
params

-- | Insert a child's result into the results table.
--
-- Each child gets its own row keyed by @(parent_id, child_id)@.
-- The FK @ON DELETE CASCADE@ ensures cleanup when the parent is acked.
--
-- Returns the number of rows inserted (1 on success).
insertResult
  :: (MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name
  -> Text
  -- ^ Table name
  -> Int64
  -- ^ Parent job ID
  -> Int64
  -- ^ Child job ID
  -> Value
  -- ^ Encoded result value
  -> m Int64
insertResult schemaName tableName parentJobId childId result =
  executeStatement (Tmpl.insertResultSQL schemaName tableName) params
  where
    -- Positional parameters, in the order the statement is given them:
    -- parent ID, child ID, then the JSONB result.
    params =
      [ pval CInt8 parentJobId
      , pval CInt8 childId
      , pval CJsonb result
      ]

-- | Get all child results for a parent from the results table.
--
-- Returns a 'Map' from child ID to the result 'Value'. The rows are
-- collected with 'Map.fromList', so if the query were ever to yield the
-- same child ID twice, the later row would win.
getResultsByParent
  :: (MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name
  -> Text
  -- ^ Table name
  -> Int64
  -- ^ Parent job ID
  -> m (Map.Map Int64 Value)
getResultsByParent schemaName tableName parentJobId =
  Map.fromList
    <$> executeQuery
      (Tmpl.getResultsByParentSQL schemaName tableName)
      [pval CInt8 parentJobId]
      resultsRowCodec

-- | Get DLQ child errors for a parent.
--
-- Returns a 'Map' from child job ID to the last error message. Rows whose
-- error column is @NULL@ (decoded as 'Nothing') are dropped before the map
-- is built.
getDLQChildErrorsByParent
  :: (MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name
  -> Text
  -- ^ Table name
  -> Int64
  -- ^ Parent job ID
  -> m (Map.Map Int64 Text)
getDLQChildErrorsByParent schemaName tableName parentJobId = do
  rows <-
    executeQuery
      (Tmpl.getDLQChildErrorsByParentSQL schemaName tableName)
      [pval CInt8 parentJobId]
      errorResultsRowCodec
  -- The failing pattern match on 'Nothing' filters out children without an
  -- error message.
  pure $ Map.fromList [(jid, err) | (jid, Just err) <- rows]

-- | Snapshot results into @parent_state@ before DLQ move.
--
-- Returns the row count reported by 'executeStatement'.
persistParentState
  :: (MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name
  -> Text
  -- ^ Table name
  -> Int64
  -- ^ Job ID
  -> Value
  -- ^ The pre-populated parent state to persist
  -> m Int64
persistParentState schemaName tableName jobId state =
  executeStatement (Tmpl.persistParentStateSQL schemaName tableName) params
  where
    -- Note the order: the state value is bound first, the job ID second
    -- (the reverse of this function's argument order).
    params =
      [ pval CJsonb state
      , pval CInt8 jobId
      ]

-- | Build batch columns with within-batch dedup, preserving input order.
--
-- A single fold over the input list. Non-keyed jobs are appended; keyed
-- jobs occupy the slot of their first occurrence, with O(log n) positional
-- updates via 'Seq' when a later 'ReplaceDuplicate' overwrites an earlier
-- entry for the same key.
--
-- Dedup semantics (matching sequential 'insertJob' behaviour):
--
--   * All 'IgnoreDuplicate' for a key -> first occurrence wins
--   * All 'ReplaceDuplicate' for a key -> last occurrence wins
--   * Mixed strategies for the same key -> 'ReplaceDuplicate' takes precedence
buildBatchColumns :: forall payload. (ToJSON payload) => [JobWrite payload] -> BatchColumns
buildBatchColumns = extractCols . foldl' step (Map.empty, 0, emptyColumns)
  where
    -- The fold state is a triple:
    --   1. dedup key text -> slot index of that key's first occurrence,
    --   2. the next free slot index (i.e. number of rows emitted so far),
    --   3. the columns accumulated so far.
    extractCols (_, _, cols) = cols

    step (!seen, !n, !cols) job =
      case dedupKeyText (dedupKey job) of
        -- No dedup key: always append a fresh row.
        Nothing -> (seen, n + 1, snocJob cols job)
        Just k ->
          case Map.lookup k seen of
            -- First time this key is seen: append and remember its slot.
            Nothing -> (Map.insert k n seen, n + 1, snocJob cols job)
            Just idx
              -- A later 'ReplaceDuplicate' overwrites the slot in place, so
              -- the row keeps the position of the key's first occurrence.
              | isReplace job -> (seen, n, updateJob idx cols job)
              -- A later 'IgnoreDuplicate' is dropped.
              | otherwise -> (seen, n, cols)

    isReplace job =
      case dedupKey job of
        Just (ReplaceDuplicate _) -> True
        _ -> False

    -- Append the job's fields to the end of every column.
    snocJob = withJob (flip (|>))

    -- Overwrite the job's fields at position @idx@ in every column.
    updateJob idx = withJob (Seq.update idx)

    -- Apply the same element-placing function to every column, so all
    -- columns stay the same length and row-aligned.
    withJob :: (forall a. a -> Seq a -> Seq a) -> BatchColumns -> JobWrite payload -> BatchColumns
    withJob f cols job =
      let (dk, ds) = dedupParts (dedupKey job)
          -- Rollup jobs start with an empty JSON object as parent state;
          -- all other jobs carry no parent state.
          parentState
            | isRollup job = Just (Object mempty)
            | otherwise = Nothing
       in BatchColumns
            { colPayloads = f (toJSON (payload job)) (colPayloads cols)
            , colGroupKeys = f (groupKey job) (colGroupKeys cols)
            , colPriorities = f (priority job) (colPriorities cols)
            , colDedupKeys = f dk (colDedupKeys cols)
            , colDedupStrategies = f ds (colDedupStrategies cols)
            , colMaxAttempts = f (maxAttempts job) (colMaxAttempts cols)
            , colParentIds = f (parentId job) (colParentIds cols)
            , colParentStates = f parentState (colParentStates cols)
            , colSuspended = f (suspended job) (colSuspended cols)
            , colNotVisibleUntils = f (notVisibleUntil job) (colNotVisibleUntils cols)
            }

-- | Split an optional dedup key into its two column values: the key text
-- and the strategy tag (@"ignore"@ or @"replace"@). A job without a dedup
-- key yields 'Nothing' for both.
dedupParts :: Maybe DedupKey -> (Maybe Text, Maybe Text)
dedupParts mKey =
  case mKey of
    Nothing -> (Nothing, Nothing)
    Just (IgnoreDuplicate k) -> (Just k, Just "ignore")
    Just (ReplaceDuplicate k) -> (Just k, Just "replace")

-- | Extract the key text from an optional dedup key, ignoring which
-- strategy ('IgnoreDuplicate' or 'ReplaceDuplicate') it carries.
dedupKeyText :: Maybe DedupKey -> Maybe Text
dedupKeyText mKey =
  case mKey of
    Nothing -> Nothing
    Just (IgnoreDuplicate k) -> Just k
    Just (ReplaceDuplicate k) -> Just k

-- | Column-oriented buffer for a batch insert: one 'Seq' per column.
--
-- 'buildBatchColumns' writes every field of a job at the same position in
-- each sequence, so index @i@ across all columns describes one row.
--
-- The 'Semigroup' and 'Monoid' instances are derived via 'Generically',
-- i.e. they combine the records field by field.
data BatchColumns = BatchColumns
  { colPayloads :: !(Seq Value)
  -- ^ JSON-encoded job payloads
  , colGroupKeys :: !(Seq (Maybe Text))
  -- ^ Optional group key per job
  , colPriorities :: !(Seq Int32)
  -- ^ Job priorities
  , colDedupKeys :: !(Seq (Maybe Text))
  -- ^ Dedup key text, if the job has a dedup key
  , colDedupStrategies :: !(Seq (Maybe Text))
  -- ^ Dedup strategy tag (@"ignore"@ or @"replace"@), if the job has a dedup key
  , colMaxAttempts :: !(Seq (Maybe Int32))
  -- ^ Optional per-job maximum attempts
  , colParentIds :: !(Seq (Maybe Int64))
  -- ^ Optional parent job ID
  , colParentStates :: !(Seq (Maybe Value))
  -- ^ Initial parent state (an empty JSON object for rollup jobs, otherwise 'Nothing')
  , colSuspended :: !(Seq Bool)
  -- ^ Whether the job is inserted suspended
  , colNotVisibleUntils :: !(Seq (Maybe UTCTime))
  -- ^ Optional time before which the job is not visible
  }
  deriving stock (Generic)
  deriving (Monoid, Semigroup) via Generically BatchColumns

-- | A 'BatchColumns' with every column empty (the 'Monoid' identity).
emptyColumns :: BatchColumns
emptyColumns = mempty

-- | Claim up to @maxJobs@ visible jobs, respecting head-of-line blocking
-- (one job per group). Uses a single-CTE claim with the groups table.
--
-- A non-positive @maxJobs@ returns an empty list without touching the
-- database, mirroring the guards in 'claimNextVisibleJobsBatched'.
--
-- The visibility timeout is rounded up to whole seconds before it is
-- passed to the SQL template.
claimNextVisibleJobs
  :: forall m payload
   . (JobPayload payload, MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name (e.g., "arbiter")
  -> Text
  -- ^ Table name (e.g., "email_jobs")
  -> Int
  -- ^ Maximum number of jobs to claim
  -> NominalDiffTime
  -- ^ Visibility timeout in seconds
  -> m [JobRead payload]
claimNextVisibleJobs schemaName tableName maxJobs timeout
  | maxJobs < 1 = pure []
  | otherwise = withDbTransaction $ do
      let claimSql = Tmpl.claimJobsSQL schemaName tableName maxJobs (ceiling timeout)
      -- The claim statement takes no bound parameters; the limit and
      -- timeout are rendered into the SQL text by the template.
      rawJobs <- executeQuery claimSql [] (jobRowCodec tableName)
      mapM decodePayload rawJobs

-- | Batched variant of 'claimNextVisibleJobs' — claims up to @batchSize@ jobs
-- per group, across up to @maxBatches@ groups.
--
-- Returns an empty list without touching the database when either
-- @batchSize@ or @maxBatches@ is less than 1.
--
-- Claimed jobs are sorted and grouped by 'groupKey' (jobs with no group key
-- end up together in one group), and each group is then split into chunks
-- of at most @batchSize@ jobs.
claimNextVisibleJobsBatched
  :: forall m payload
   . (JobPayload payload, MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name (e.g., "arbiter")
  -> Text
  -- ^ Table name (e.g., "email_jobs")
  -> Int
  -- ^ Batch size per group (how many jobs to claim from each group)
  -> Int
  -- ^ Maximum number of groups/batches to claim
  -> NominalDiffTime
  -- ^ Visibility timeout in seconds
  -> m [NonEmpty (JobRead payload)]
claimNextVisibleJobsBatched schemaName tableName batchSize maxBatches timeout
  | batchSize < 1 || maxBatches < 1 = pure []
  | otherwise = withDbTransaction $ do
      -- The timeout is rounded up to whole seconds for the SQL template.
      let claimSql =
            Tmpl.claimJobsBatchedSQL schemaName tableName batchSize maxBatches (ceiling timeout)
      rawJobs <- executeQuery claimSql [] (jobRowCodec tableName)
      jobs <- mapM decodePayload rawJobs
      let sameGroup j1 j2 = groupKey j1 == groupKey j2
          -- 'groupBy' only merges adjacent elements, hence the sort first.
          groups = groupBy sameGroup (sortOn groupKey jobs)
      pure $ concatMap (chunksOfNE batchSize) $ mapMaybe NE.nonEmpty groups

-- | Split a NonEmpty list into chunks of at most @n@ elements.
--
-- Order is preserved and every chunk except possibly the last has exactly
-- @n@ elements. Each chunk always takes its head element unconditionally,
-- so for @n < 1@ the result is a list of singletons rather than an error
-- or an infinite loop.
chunksOfNE :: Int -> NonEmpty a -> [NonEmpty a]
chunksOfNE n (x :| xs) = go (x : xs)
  where
    go [] = []
    go (y : ys) = (y :| chunk) : go rest
      where
        -- The head @y@ is already taken, so only @n - 1@ more are needed.
        (chunk, rest) = splitAt (n - 1) ys

-- | Acknowledge a job as completed (smart ack).
--
-- Deletes standalone jobs; suspends parents waiting for children; wakes
-- parents when the last sibling completes. Uses an advisory lock for
-- child jobs to serialize with concurrent sibling acks.
--
-- Runs 'ackJobInner' inside its own 'withDbTransaction'.
--
-- Returns 1 on success, 0 if the job was already gone.
ackJob
  :: forall m payload
   . (MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name (e.g., "arbiter")
  -> Text
  -- ^ Table name (e.g., "email_jobs")
  -> JobRead payload
  -> m Int64
ackJob schemaName tableName job =
  withDbTransaction (ackJobInner schemaName tableName job)

-- | Inner ack logic (must be called within an existing transaction).
--
-- If the job has a parent, first takes a transaction-scoped advisory lock
-- whose key is the hash of @schema.table@ seeded with the parent ID. Then
-- runs the smart-ack statement.
--
-- Returns the single value produced by the smart-ack query, or 0 when the
-- query yields anything other than exactly one row.
ackJobInner
  :: forall m payload
   . (MonadArbiter m)
  => Text -> Text -> JobRead payload -> m Int64
ackJobInner schemaName tableName job = do
  -- The lock result itself is irrelevant; it is cast to text only so the
  -- row can be decoded, and then discarded.
  for_ (parentId job) $ \pid ->
    void $
      executeQuery
        "SELECT pg_advisory_xact_lock(hashtextextended(?, ?))::text AS result"
        [pval CText (schemaName <> "." <> tableName), pval CInt8 pid]
        (ncol "result" CText)
  let jid = primaryKey job
      jatt = attempts job
      -- The smart-ack SQL takes six positional parameters:
      -- id, attempts, id, id, attempts, id.
      params =
        [ pval CInt8 jid
        , pval CInt4 jatt
        , pval CInt8 jid
        , pval CInt8 jid
        , pval CInt4 jatt
        , pval CInt8 jid
        ]
  rows <- executeQuery (Tmpl.smartAckJobSQL schemaName tableName) params int64Codec
  pure $ case rows of
    [n] -> n
    _ -> 0

-- | Wake a suspended parent if all children are done.
--
-- Takes the same advisory lock as 'ackJobInner' (hash of @schema.table@
-- seeded with the parent ID) and then runs the wake-ancestor statement,
-- discarding its row count.
--
-- NOTE(review): the lock is transaction-scoped, so this presumably needs to
-- be called inside an existing transaction — confirm at call sites.
tryResumeParent :: (MonadArbiter m) => Text -> Text -> Int64 -> m ()
tryResumeParent schemaName tableName pid = do
  -- The lock result is cast to text only so the row can be decoded.
  void $
    executeQuery
      "SELECT pg_advisory_xact_lock(hashtextextended(?, ?))::text AS result"
      [pval CText (schemaName <> "." <> tableName), pval CInt8 pid]
      (ncol "result" CText)
  -- The wake statement takes the parent ID twice.
  void $
    executeStatement
      (Tmpl.tryWakeAncestorSQL schemaName tableName)
      [pval CInt8 pid, pval CInt8 pid]

-- | Acknowledge multiple jobs as completed.
--
-- Each job is acknowledged individually through 'ackJobInner', so every job
-- gets the same per-job (smart-ack) treatment. All acknowledgements run inside
-- a single 'withDbTransaction'.
--
-- An empty list returns 0 without opening a transaction.
-- Returns the total number of rows affected (the sum of the per-job counts).
ackJobsBatch
  :: forall m payload
   . (MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name (e.g., "arbiter")
  -> Text
  -- ^ Table name (e.g., "email_jobs")
  -> [JobRead payload]
  -> m Int64
ackJobsBatch schemaName tableName jobs
  | null jobs = pure 0
  | otherwise = withDbTransaction (foldM ackAndAdd 0 jobs)
  where
    -- Ack one job and add its row count to the running total. The total is
    -- forced at each step so no chain of thunks builds up.
    ackAndAdd total job = do
      acked <- ackJobInner schemaName tableName job
      pure $! total + acked

-- | Bulk ack for standalone jobs (no parent, no tree logic).
--
-- Sends the job IDs and their attempt counts as two parallel array
-- parameters to 'Tmpl.ackJobsBulkSQL' — one round trip for N jobs.
-- Only valid for jobs claimed in 'BatchedJobsMode', which guarantees
-- @parent_id IS NULL AND parent_state IS NULL@.
--
-- An empty list returns 0 without touching the database.
-- Returns the number of rows deleted. Raises a parsing error (via
-- 'throwParsing') if the query does not return exactly one count row.
ackJobsBulk
  :: forall m payload
   . (MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name (e.g., "arbiter")
  -> Text
  -- ^ Table name (e.g., "email_jobs")
  -> [JobRead payload]
  -> m Int64
ackJobsBulk schemaName tableName jobs
  | null jobs = pure 0
  | otherwise = do
      counted <- executeQuery sql [parr CInt8 jobIds, parr CInt4 jobAttempts] countCodec
      case counted of
        [deleted] -> pure deleted
        _ -> throwParsing "ackJobsBulk: unexpected result"
  where
    sql = Tmpl.ackJobsBulkSQL schemaName tableName
    -- Parallel arrays: element i of each list belongs to the same job.
    jobIds = [primaryKey j | j <- jobs]
    jobAttempts = [attempts j | j <- jobs]

-- | Set the visibility timeout for a single job.
--
-- Executes 'Tmpl.setVisibilityTimeoutSQL' with, in order: the timeout rounded
-- up to whole seconds, the job's primary key, and the job's attempt count.
setVisibilityTimeout
  :: forall m payload
   . (MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name (e.g., "arbiter")
  -> Text
  -- ^ Table name (e.g., "email_jobs")
  -> NominalDiffTime
  -- ^ Timeout in seconds
  -> JobRead payload
  -> m Int64
  -- ^ Returns the number of rows updated (0 if job was reclaimed by another worker)
setVisibilityTimeout schemaName tableName timeout job =
  executeStatement sql params
  where
    sql = Tmpl.setVisibilityTimeoutSQL schemaName tableName

    -- Fractional seconds are rounded up, never down.
    timeoutSeconds :: Int64
    timeoutSeconds = ceiling timeout

    params =
      [ pval CInt8 timeoutSeconds
      , pval CInt8 (primaryKey job)
      , pval CInt4 (attempts job)
      ]

-- | Per-job outcome of a visibility-timeout update.
--
-- One record describes what happened to one targeted job.
data VisibilityUpdateInfo = VisibilityUpdateInfo
  { vuiJobId :: Int64
  -- ^ ID of the job that was targeted.
  , vuiWasUpdated :: Bool
  -- ^ 'True' when the job's visibility timeout was successfully extended.
  , vuiCurrentDbAttempts :: Maybe Int32
  -- ^ Attempt count currently stored in the database for this job.
  -- Lets callers tell a stolen job (attempts changed) apart from an
  -- acked job (row is missing, so this is 'Nothing').
  }
  deriving stock (Eq, Show, Generic)

-- | Batch variant of 'setVisibilityTimeout'. Returns per-job status
-- (success, acked, or stolen).
--
-- Builds one @(?,?)@ placeholder pair per job (comma-separated) and passes
-- that text to 'Tmpl.setVisibilityTimeoutBatchSQL'. Parameters are bound as
-- @(id, attempts)@ for every job in input order, followed by the timeout
-- rounded up to whole seconds as the final parameter.
--
-- An empty list returns @[]@ without touching the database.
setVisibilityTimeoutBatch
  :: forall m payload
   . (MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name (e.g., "arbiter")
  -> Text
  -- ^ Table name (e.g., "email_jobs")
  -> NominalDiffTime
  -- ^ Timeout in seconds
  -> [JobRead payload]
  -> m [VisibilityUpdateInfo]
  -- ^ Returns a list of status records, one for each job that was targeted.
setVisibilityTimeoutBatch _ _ _ [] = pure []
setVisibilityTimeoutBatch schemaName tableName timeout jobs =
  executeQuery sql (perJobParams <> [timeoutParam]) visibilityUpdateCodec
  where
    sql = Tmpl.setVisibilityTimeoutBatchSQL schemaName tableName placeholders

    -- One "(?,?)" per job, joined with commas.
    placeholders = T.intercalate "," ("(?,?)" <$ jobs)

    -- Flattened (id, attempts) pairs, in the same order as the placeholders.
    perJobParams =
      [ param
      | job <- jobs
      , param <- [pval CInt8 (primaryKey job), pval CInt4 (attempts job)]
      ]

    -- The timeout goes last, after all per-job pairs.
    timeoutParam = pval CInt8 (ceiling timeout)

-- | Update a job for retry with backoff and error tracking.
--
-- Executes 'Tmpl.updateJobForRetrySQL' with, in order: the backoff rounded up
-- to whole seconds, the error message, the job's primary key, and the job's
-- attempt count.
--
-- Returns the number of rows updated (0 if job was already claimed by another worker).
updateJobForRetry
  :: forall m payload
   . (MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name (e.g., "arbiter")
  -> Text
  -- ^ Table name (e.g., "email_jobs")
  -> NominalDiffTime
  -- ^ Backoff timeout in seconds
  -> Text
  -- ^ Error message
  -> JobRead payload
  -> m Int64
updateJobForRetry schemaName tableName backoff errorMsg job =
  executeStatement sql params
  where
    sql = Tmpl.updateJobForRetrySQL schemaName tableName

    -- Fractional seconds are rounded up, never down.
    backoffSeconds :: Int64
    backoffSeconds = ceiling backoff

    params =
      [ pval CInt8 backoffSeconds
      , pval CText errorMsg
      , pval CInt8 (primaryKey job)
      , pval CInt4 (attempts job)
      ]

-- | Move a job to the DLQ. Cascades descendants for rollup parents.
-- Wakes the parent if this was a child job.
--
-- Everything runs inside one 'withDbTransaction':
--
-- 1. If the job is a rollup ('isRollup'), descendant rollup results are
--    snapshotted and all descendants are cascaded to the DLQ with the message
--    @"Parent moved to DLQ"@. Note this happens before the job's own move.
-- 2. The job itself is moved via 'Tmpl.moveToDLQSQL' (id, attempts, error).
-- 3. If at least one row moved and the job has a parent, 'tryResumeParent'
--    is called for that parent.
--
-- Returns 0 if the job was already claimed by another worker. A result set
-- that is not exactly one count row is also reported as 0.
moveToDLQ
  :: forall m payload
   . (MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name (e.g., "arbiter")
  -> Text
  -- ^ Table name (e.g., "email_jobs")
  -> Text
  -- ^ Error message (the final error that caused the DLQ move)
  -> JobRead payload
  -> m Int64
moveToDLQ schemaName tableName errorMsg job = withDbTransaction $ do
  when (isRollup job) cascadeDescendants
  countRows <- executeQuery (Tmpl.moveToDLQSQL schemaName tableName) moveParams countCodec
  let moved = case countRows of
        [n] -> n
        _ -> 0
  when (moved > 0) $
    for_ (parentId job) (tryResumeParent schemaName tableName)
  pure moved
  where
    jobId = primaryKey job

    -- Snapshot first so accumulated child results survive the cascade.
    cascadeDescendants = do
      snapshotDescendantRollups schemaName tableName jobId
      void $ cascadeChildrenToDLQ schemaName tableName jobId "Parent moved to DLQ"

    moveParams =
      [ pval CInt8 jobId
      , pval CInt4 (attempts job)
      , pval CText errorMsg
      ]

-- | Cascade all descendants of a rollup parent to the DLQ.
--
-- Executes 'Tmpl.cascadeChildrenToDLQSQL' with the parent job ID and the
-- error message to record on the cascaded rows. The descendant lookup itself
-- is done by that SQL template.
--
-- Returns the number of children moved to DLQ. Raises a parsing error (via
-- 'throwParsing') if the query does not return exactly one count row.
cascadeChildrenToDLQ
  :: (MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name
  -> Text
  -- ^ Table name
  -> Int64
  -- ^ Parent job ID
  -> Text
  -- ^ Error message for cascaded children
  -> m Int64
cascadeChildrenToDLQ schemaName tableName parentJobId errorMsg = do
  counted <- executeQuery sql params countCodec
  case counted of
    [moved] -> pure moved
    _ -> throwParsing "cascadeChildrenToDLQ: unexpected result"
  where
    sql = Tmpl.cascadeChildrenToDLQSQL schemaName tableName
    params = [pval CInt8 parentJobId, pval CText errorMsg]

-- | Snapshot child results for descendant rollup finalizers before cascade-DLQ.
-- Persists accumulated results into @parent_state@ so they survive deletion.
--
-- Looks up the descendant rollup IDs under the given parent with
-- 'Tmpl.descendantRollupIdsSQL'. For each one it reads the raw child results,
-- merges them with 'mergeRawChildResults', and — only when the merged map is
-- non-empty — stores the JSON-encoded map via 'persistParentState'.
snapshotDescendantRollups
  :: (MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name
  -> Text
  -- ^ Table name
  -> Int64
  -- ^ Parent job ID (root of the subtree being cascaded)
  -> m ()
snapshotDescendantRollups schemaName tableName parentJobId = do
  rollupIds <-
    executeQuery
      (Tmpl.descendantRollupIdsSQL schemaName tableName)
      [pval CInt8 parentJobId]
      int64Codec
  for_ rollupIds snapshotOne
  where
    -- Persist the merged child results of a single rollup job.
    snapshotOne rid = do
      -- The fourth component of the raw read is not needed for the merge.
      (results, errors, snap, _) <- readChildResultsRaw schemaName tableName rid
      let merged = mergeRawChildResults results errors snap
      if Map.null merged
        then pure ()
        else void (persistParentState schemaName tableName rid (toJSON merged))

-- | Moves multiple jobs from the main queue to the dead-letter queue.
--
-- Each job is moved with its own error message. Jobs that have already been
-- claimed by another worker (attempts mismatch) are silently skipped.
--
-- Everything runs inside one 'withDbTransaction':
--
-- 1. For every rollup job in the batch (in input order), descendant rollup
--    results are snapshotted and descendants are cascaded to the DLQ with the
--    message @"Parent moved to DLQ"@.
-- 2. All jobs are moved with a single 'Tmpl.moveToDLQBatchSQL' call that
--    takes IDs, attempt counts, and error messages as parallel arrays.
-- 3. If at least one row moved, 'tryResumeParent' is called once for each
--    distinct parent ID found in the batch, in ascending ID order.
--
-- An empty list returns 0 without opening a transaction.
-- Returns the total number of jobs moved to DLQ. A result set that is not
-- exactly one count row is reported as 0.
moveToDLQBatch
  :: forall m payload
   . (MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name (e.g., "arbiter")
  -> Text
  -- ^ Table name (e.g., "email_jobs")
  -> [(JobRead payload, Text)]
  -- ^ List of (job, error message) pairs
  -> m Int64
moveToDLQBatch _ _ [] = pure 0
moveToDLQBatch schemaName tableName jobsWithErrors = withDbTransaction $ do
  for_ (filter isRollup batchJobs) $ \rollup -> do
    let rollupId = primaryKey rollup
    -- Snapshot first so accumulated child results survive the cascade.
    snapshotDescendantRollups schemaName tableName rollupId
    void $ cascadeChildrenToDLQ schemaName tableName rollupId "Parent moved to DLQ"
  countRows <-
    executeQuery
      (Tmpl.moveToDLQBatchSQL schemaName tableName)
      [ parr CInt8 (map primaryKey batchJobs)
      , parr CInt4 (map attempts batchJobs)
      , parr CText msgs
      ]
      countCodec
  let moved = case countRows of
        [n] -> n
        _ -> 0
  when (moved > 0) $
    for_ distinctParentIds (tryResumeParent schemaName tableName)
  pure moved
  where
    -- Split the pairs; element i of each list belongs to the same job.
    (batchJobs, msgs) = unzip jobsWithErrors

    -- Going through a Set removes duplicates and yields ascending order.
    distinctParentIds = Set.toAscList (Set.fromList (mapMaybe parentId batchJobs))

-- * Dead Letter Queue Operations

-- | Retry a job from the DLQ (re-inserts with attempts reset to 0).
-- The dedup_key is NOT restored — retried jobs won't conflict with new dedup inserts.
--
-- Runs 'Tmpl.retryFromDLQSQL' for the given DLQ ID inside 'withDbTransaction'.
-- Returns 'Nothing' when the query yields no row; otherwise the first returned
-- row is decoded with 'decodePayload' and returned in 'Just'.
retryFromDLQ
  :: forall m payload
   . (JobPayload payload, MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name (e.g., "arbiter")
  -> Text
  -- ^ Table name (e.g., "email_jobs")
  -> Int64
  -- ^ DLQ job ID
  -> m (Maybe (JobRead payload))
retryFromDLQ schemaName tableName dlqId = withDbTransaction $ do
  requeued <- executeQuery sql [pval CInt8 dlqId] (jobRowCodec tableName)
  case requeued of
    firstRow : _ -> fmap Just (decodePayload firstRow)
    [] -> pure Nothing
  where
    sql = Tmpl.retryFromDLQSQL schemaName tableName

-- | Check whether a DLQ job exists by ID.
--
-- Runs 'Tmpl.dlqJobExistsSQL' and returns the single boolean it yields.
-- Any other result shape (no rows, or more than one) is treated as 'False'.
dlqJobExists
  :: (MonadArbiter m)
  => Text
  -> Text
  -> Int64
  -> m Bool
dlqJobExists schemaName tableName dlqId = do
  answer <- executeQuery sql [pval CInt8 dlqId] boolCodec
  pure $ case answer of
    [found] -> found
    _ -> False
  where
    sql = Tmpl.dlqJobExistsSQL schemaName tableName

-- ---------------------------------------------------------------------------
-- Filtered Query Operations
-- ---------------------------------------------------------------------------

-- | List jobs with composable filters.
--
-- The filters are turned into a WHERE clause and its parameters by
-- 'buildWhereClause'; limit and offset are appended as the last two
-- parameters, in that order. Every returned row is decoded with
-- 'decodePayload'.
--
-- Returns jobs ordered by ID (descending, newest first).
listJobsFiltered
  :: forall m payload
   . (JobPayload payload, MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name
  -> Text
  -- ^ Table name
  -> [Tmpl.JobFilter]
  -- ^ Composable filters
  -> Int
  -- ^ Limit
  -> Int
  -- ^ Offset
  -> m [JobRead payload]
listJobsFiltered schemaName tableName filters limit offset =
  executeQuery sql (filterParams <> paging) (jobRowCodec tableName)
    >>= traverse decodePayload
  where
    (whereClause, filterParams) = buildWhereClause filters
    sql = Tmpl.listJobsFilteredSQL schemaName tableName whereClause

    -- Paging parameters come after all filter parameters.
    paging =
      [ pval CInt8 (fromIntegral limit)
      , pval CInt8 (fromIntegral offset)
      ]

-- | Count jobs with composable filters.
--
-- The filters are turned into a WHERE clause and its parameters by
-- 'buildWhereClause' and passed to 'Tmpl.countJobsFilteredSQL'.
--
-- Raises a parsing error (via 'throwParsing') if the query does not return
-- exactly one count row.
countJobsFiltered
  :: (MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name
  -> Text
  -- ^ Table name
  -> [Tmpl.JobFilter]
  -- ^ Composable filters
  -> m Int64
countJobsFiltered schemaName tableName filters = do
  counted <- executeQuery sql filterParams countCodec
  case counted of
    [total] -> pure total
    _ -> throwParsing "countJobsFiltered: unexpected result"
  where
    (whereClause, filterParams) = buildWhereClause filters
    sql = Tmpl.countJobsFilteredSQL schemaName tableName whereClause

-- | List DLQ jobs with composable filters.
--
-- The filters are turned into a WHERE clause and its parameters by
-- 'buildWhereClause'; limit and offset are appended as the last two
-- parameters, in that order. Every returned row is decoded with
-- 'decodeDLQRow'.
--
-- Returns jobs ordered by failed_at (most recent first).
listDLQFiltered
  :: forall m payload
   . (JobPayload payload, MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name
  -> Text
  -- ^ Table name
  -> [Tmpl.JobFilter]
  -- ^ Composable filters
  -> Int
  -- ^ Limit
  -> Int
  -- ^ Offset
  -> m [DLQ.DLQJob payload]
listDLQFiltered schemaName tableName filters limit offset =
  executeQuery sql (filterParams <> paging) (dlqRowCodec tableName)
    >>= traverse decodeDLQRow
  where
    (whereClause, filterParams) = buildWhereClause filters
    sql = Tmpl.listDLQFilteredSQL schemaName tableName whereClause

    -- Paging parameters come after all filter parameters.
    paging =
      [ pval CInt8 (fromIntegral limit)
      , pval CInt8 (fromIntegral offset)
      ]

-- | Count DLQ jobs with composable filters.
--
-- The filters are turned into a WHERE clause and its parameters by
-- 'buildWhereClause' and passed to 'Tmpl.countDLQFilteredSQL'.
--
-- Raises a parsing error (via 'throwParsing') if the query does not return
-- exactly one count row.
countDLQFiltered
  :: (MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name
  -> Text
  -- ^ Table name
  -> [Tmpl.JobFilter]
  -- ^ Composable filters
  -> m Int64
countDLQFiltered schemaName tableName filters = do
  counted <- executeQuery sql filterParams countCodec
  case counted of
    [total] -> pure total
    _ -> throwParsing "countDLQFiltered: unexpected result"
  where
    (whereClause, filterParams) = buildWhereClause filters
    sql = Tmpl.countDLQFilteredSQL schemaName tableName whereClause

-- | Turn a raw DLQ row (DLQ primary key, failure timestamp, undecoded job)
-- into a typed 'DLQ.DLQJob' by decoding the job payload with 'decodePayload'.
decodeDLQRow
  :: (JobPayload payload, MonadArbiter m)
  => (Int64, UTCTime, Job Value Int64 Text UTCTime)
  -> m (DLQ.DLQJob payload)
decodeDLQRow (dlqId, dlqFailedAt, rawJob) = do
  jobSnapshot <- decodePayload rawJob
  pure $
    DLQ.DLQJob
      { DLQ.dlqPrimaryKey = dlqId
      , DLQ.failedAt = dlqFailedAt
      , DLQ.jobSnapshot = jobSnapshot
      }

-- | List jobs in the dead letter queue
--
-- Returns jobs ordered by failed_at (most recent first).
--
-- Equivalent to 'listDLQFiltered' with no filters; the limit and offset
-- arguments are passed straight through.
listDLQJobs
  :: forall m payload
   . (JobPayload payload, MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name (e.g., "arbiter")
  -> Text
  -- ^ Table name (e.g., "email_jobs")
  -> Int
  -- ^ Limit
  -> Int
  -- ^ Offset
  -> m [DLQ.DLQJob payload]
listDLQJobs schemaName tableName = listDLQFiltered schemaName tableName []

-- | List DLQ jobs filtered by parent_id.
--
-- Returns jobs ordered by failed_at (most recent first).
--
-- Implemented as 'listDLQFiltered' with a single 'Tmpl.FilterParentId' filter.
listDLQJobsByParent
  :: forall m payload
   . (JobPayload payload, MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name
  -> Text
  -- ^ Table name
  -> Int64
  -- ^ Parent job ID
  -> Int
  -- ^ Limit
  -> Int
  -- ^ Offset
  -> m [DLQ.DLQJob payload]
listDLQJobsByParent schemaName tableName parentJobId =
  listDLQFiltered schemaName tableName [Tmpl.FilterParentId parentJobId]

-- | Count DLQ jobs matching a parent_id.
--
-- Implemented as 'countDLQFiltered' with a single 'Tmpl.FilterParentId' filter.
countDLQJobsByParent
  :: (MonadArbiter m)
  => Text -> Text -> Int64 -> m Int64
countDLQJobsByParent schemaName tableName parentJobId =
  countDLQFiltered schemaName tableName [Tmpl.FilterParentId parentJobId]

-- | Delete a job from the dead letter queue.
--
-- This permanently removes the job from the DLQ without retrying it.
-- If the deleted job was a child, tries to resume the parent when no
-- siblings remain in the main queue.
--
-- Returns 0 when the delete query yields no rows and 1 otherwise. The
-- delete and the parent resume run inside one database transaction.
deleteDLQJob
  :: (MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name (e.g., "arbiter")
  -> Text
  -- ^ Table name (e.g., "email_jobs")
  -> Int64
  -- ^ DLQ job ID
  -> m Int64
deleteDLQJob schemaName tableName dlqId = withDbTransaction $ do
  -- Each returned row is decoded as a nullable Int64, which is treated
  -- below as the deleted job's parent id.
  rows <-
    executeQuery
      (Tmpl.deleteDLQJobSQL schemaName tableName)
      [pval CInt8 dlqId]
      nullableInt64Codec
  case rows of
    [] -> pure 0
    (Just pid : _) -> do
      tryResumeParent schemaName tableName pid
      pure 1
    -- A row came back but carried no parent id: nothing to resume.
    _ -> pure 1

-- | Delete multiple jobs from the dead letter queue.
--
-- If any deleted jobs were children, tries to resume their parents when
-- no siblings remain. Parent IDs are deduplicated and sorted to prevent
-- deadlocks between concurrent batch deletes.
--
-- Returns the total number of DLQ jobs deleted. An empty ID list returns
-- 0 without running a query or opening a transaction.
deleteDLQJobsBatch
  :: (MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name (e.g., "arbiter")
  -> Text
  -- ^ Table name (e.g., "email_jobs")
  -> [Int64]
  -- ^ DLQ job IDs
  -> m Int64
deleteDLQJobsBatch _ _ [] = pure 0
deleteDLQJobsBatch schemaName tableName dlqIds = withDbTransaction $ do
  rows <-
    executeQuery
      (Tmpl.deleteDLQJobsBatchSQL schemaName tableName)
      [parr CInt8 dlqIds]
      nullableInt64Codec
  -- Going through a Set both deduplicates the parent ids and yields them
  -- in ascending order.
  let parentIds = Set.toAscList . Set.fromList $ catMaybes rows
  for_ parentIds $ tryResumeParent schemaName tableName
  -- One row per deleted DLQ job.
  pure $ fromIntegral (length rows)

-- * Admin Operations

-- | List jobs in the queue with pagination.
--
-- Returns jobs ordered by ID (descending).
--
-- Equivalent to 'listJobsFiltered' with no filters; the limit and offset
-- arguments are passed straight through.
listJobs
  :: forall m payload
   . (JobPayload payload, MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name (e.g., "arbiter")
  -> Text
  -- ^ Table name (e.g., "email_jobs")
  -> Int
  -- ^ Limit
  -> Int
  -- ^ Offset
  -> m [JobRead payload]
listJobs schemaName tableName = listJobsFiltered schemaName tableName []

-- | Get a single job by its ID
--
-- Returns 'Nothing' when the query yields no rows; otherwise decodes the
-- payload of the first row and ignores any further rows.
getJobById
  :: forall m payload
   . (JobPayload payload, MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name (e.g., "arbiter")
  -> Text
  -- ^ Table name (e.g., "email_jobs")
  -> Int64
  -- ^ Job ID
  -> m (Maybe (JobRead payload))
getJobById schemaName tableName jobId = do
  rawJobs <-
    executeQuery
      (Tmpl.getJobByIdSQL schemaName tableName)
      [pval CInt8 jobId]
      (jobRowCodec tableName)
  case rawJobs of
    [] -> pure Nothing
    (raw : _) -> Just <$> decodePayload raw

-- | Get all jobs for a specific group key
--
-- Useful for debugging or admin UI to see all jobs for a specific entity.
--
-- Implemented as 'listJobsFiltered' with a single 'Tmpl.FilterGroupKey' filter.
getJobsByGroup
  :: forall m payload
   . (JobPayload payload, MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name (e.g., "arbiter")
  -> Text
  -- ^ Table name (e.g., "email_jobs")
  -> Text
  -- ^ Group key to filter by
  -> Int
  -- ^ Limit
  -> Int
  -- ^ Offset
  -> m [JobRead payload]
getJobsByGroup schemaName tableName gk =
  listJobsFiltered schemaName tableName [Tmpl.FilterGroupKey gk]

-- | Get all in-flight jobs (currently being processed by workers)
--
-- A job is considered in-flight if it has been claimed (attempts > 0) and
-- its visibility timeout hasn't expired yet.
--
-- Useful for monitoring active work and detecting stuck jobs.
--
-- Implemented as 'listJobsFiltered' with the 'Tmpl.FilterInFlight' filter.
getInFlightJobs
  :: forall m payload
   . (JobPayload payload, MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name (e.g., "arbiter")
  -> Text
  -- ^ Table name (e.g., "email_jobs")
  -> Int
  -- ^ Limit
  -> Int
  -- ^ Offset
  -> m [JobRead payload]
getInFlightJobs schemaName tableName =
  listJobsFiltered schemaName tableName [Tmpl.FilterInFlight]

-- | Cancels (deletes) a job by ID.
--
-- Returns 0 if the job has children — use 'cancelJobCascade' to delete
-- a parent and all its descendants.
--
-- If the deleted job was a child and no siblings remain, the parent is
-- resumed for its completion round.
--
-- Runs 'cancelJobInner' inside its own database transaction.
cancelJob
  :: (MonadArbiter m)
  => Text
  -- ^ Schema name
  -> Text
  -- ^ Table name
  -> Int64
  -- ^ Job ID
  -> m Int64
cancelJob schemaName tableName jobId =
  withDbTransaction $ cancelJobInner schemaName tableName jobId

-- | Inner cancel logic (must be called within an existing transaction).
--
-- Looks up the job's parent id; when there is one, takes a
-- transaction-scoped advisory lock derived from the qualified table name
-- and the parent id before running the cancel statement.
--
-- Returns the single value produced by the cancel statement, or 0 when the
-- statement does not yield exactly one row.
cancelJobInner
  :: (MonadArbiter m)
  => Text -> Text -> Int64 -> m Int64
cancelJobInner schemaName tableName jobId = do
  parentRows <-
    executeQuery
      (Tmpl.getParentIdSQL schemaName tableName)
      [pval CInt8 jobId]
      nullableInt64Codec
  -- Only a single row carrying a non-NULL value counts as "has a parent".
  let mParentId = case parentRows of
        [Just pid] -> Just pid
        _ -> Nothing
  -- pg_advisory_xact_lock is released automatically at transaction end,
  -- hence the requirement to run inside a transaction. Presumably this
  -- serialises concurrent cancels of siblings under the same parent —
  -- confirm against 'Tmpl.cancelJobSQL'.
  for_ mParentId $ \pid ->
    void $
      executeQuery
        "SELECT pg_advisory_xact_lock(hashtextextended(?, ?))::text AS result"
        [pval CText (schemaName <> "." <> tableName), pval CInt8 pid]
        (ncol "result" CText)
  -- The cancel template takes the job id in two placeholders.
  rows <-
    executeQuery
      (Tmpl.cancelJobSQL schemaName tableName)
      [pval CInt8 jobId, pval CInt8 jobId]
      int64Codec
  case rows of
    [n] -> pure n
    _ -> pure 0

-- | Cancels (deletes) multiple jobs by ID.
--
-- Each job gets full wake-parent logic (same as 'cancelJob').
-- Wrapped in a transaction so that cancelling multiple children of the
-- same parent sees a consistent view — the last cancel's CTE correctly
-- detects no remaining siblings and resumes the parent.
-- Returns the total number of jobs deleted.
--
-- An empty ID list returns 0 without opening a transaction.
cancelJobsBatch
  :: (MonadArbiter m)
  => Text
  -- ^ Schema name
  -> Text
  -- ^ Table name
  -> [Int64]
  -- ^ Job IDs
  -> m Int64
-- NOTE(review): jobs are cancelled in the caller-supplied order, so the
-- per-parent advisory locks taken by 'cancelJobInner' are acquired in that
-- order too. The IDs are neither sorted nor deduplicated here — confirm
-- that concurrent batches cannot deadlock on each other.
cancelJobsBatch _ _ [] = pure 0
cancelJobsBatch schemaName tableName jobIds =
  withDbTransaction $
    sum <$> mapM (cancelJobInner schemaName tableName) jobIds

-- | Promote a delayed or retrying job to be immediately visible.
--
-- Refuses in-flight jobs (attempts > 0 with no last_error).
-- Returns 1 on success, 0 if not found, already visible, or in-flight.
--
-- A single statement built by 'Tmpl.promoteJobSQL'; the result is whatever
-- 'executeStatement' reports for it.
promoteJob
  :: (MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name (e.g., "arbiter")
  -> Text
  -- ^ Table name (e.g., "email_jobs")
  -> Int64
  -- ^ Job ID
  -> m Int64
  -- ^ Number of rows updated
promoteJob schemaName tableName jobId =
  executeStatement
    (Tmpl.promoteJobSQL schemaName tableName)
    [pval CInt8 jobId]

-- | Statistics about the job queue
--
-- JSON instances are derived generically, so the field names below are
-- also the JSON object keys.
data QueueStats = QueueStats
  { totalJobs :: Int64
  -- ^ Total number of jobs in the queue
  , visibleJobs :: Int64
  -- ^ Number of jobs that are visible (can be claimed)
  , invisibleJobs :: Int64
  -- ^ Number of jobs that are invisible (claimed or delayed)
  , oldestJobAgeSeconds :: Maybe Double
  -- ^ Age in seconds of the oldest job (Nothing if queue is empty)
  }
  deriving stock (Eq, Generic, Show)
  deriving anyclass (FromJSON, ToJSON)

-- | Get statistics about the job queue
--
-- The query supplies the total count, the visible count and the oldest job
-- age; the invisible count is derived as @total - visible@. When the query
-- yields no rows, all counts are 0 and the age is 'Nothing'. Only the first
-- row is used.
getQueueStats
  :: (MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name (e.g., "arbiter")
  -> Text
  -- ^ Table name (e.g., "email_jobs")
  -> m QueueStats
getQueueStats schemaName tableName = do
  rows <-
    executeQuery
      (Tmpl.getQueueStatsSQL schemaName tableName)
      []
      statsRowCodec
  case rows of
    [] -> pure $ QueueStats 0 0 0 Nothing
    ((total, visible, age) : _) ->
      pure $ QueueStats total visible (total - visible) age

-- ---------------------------------------------------------------------------
-- Count Operations
-- ---------------------------------------------------------------------------

-- | Count all jobs in a table
--
-- Equivalent to 'countJobsFiltered' with no filters.
countJobs
  :: (MonadArbiter m)
  => Text -> Text -> m Int64
countJobs schemaName tableName = countJobsFiltered schemaName tableName []

-- | Count jobs matching a group key
--
-- Implemented as 'countJobsFiltered' with a single 'Tmpl.FilterGroupKey' filter.
countJobsByGroup
  :: (MonadArbiter m)
  => Text -> Text -> Text -> m Int64
countJobsByGroup schemaName tableName gk =
  countJobsFiltered schemaName tableName [Tmpl.FilterGroupKey gk]

-- | Count in-flight jobs
--
-- Implemented as 'countJobsFiltered' with the 'Tmpl.FilterInFlight' filter.
countInFlightJobs
  :: (MonadArbiter m)
  => Text -> Text -> m Int64
countInFlightJobs schemaName tableName =
  countJobsFiltered schemaName tableName [Tmpl.FilterInFlight]

-- | Count DLQ jobs
--
-- Equivalent to 'countDLQFiltered' with no filters.
countDLQJobs
  :: (MonadArbiter m)
  => Text -> Text -> m Int64
countDLQJobs schemaName tableName = countDLQFiltered schemaName tableName []

-- ---------------------------------------------------------------------------
-- Parent-Child Operations
-- ---------------------------------------------------------------------------

-- | List jobs filtered by parent_id with pagination.
--
-- Implemented as 'listJobsFiltered' with a single 'Tmpl.FilterParentId' filter.
getJobsByParent
  :: forall m payload
   . (JobPayload payload, MonadArbiter m)
  => Text
  -- ^ Schema name
  -> Text
  -- ^ Table name
  -> Int64
  -- ^ Parent ID
  -> Int
  -- ^ Limit
  -> Int
  -- ^ Offset
  -> m [JobRead payload]
getJobsByParent schemaName tableName pid =
  listJobsFiltered schemaName tableName [Tmpl.FilterParentId pid]

-- | Count jobs matching a parent_id.
--
-- Implemented as 'countJobsFiltered' with a single 'Tmpl.FilterParentId' filter.
countJobsByParent
  :: (MonadArbiter m)
  => Text -> Text -> Int64 -> m Int64
countJobsByParent schemaName tableName pid =
  countJobsFiltered schemaName tableName [Tmpl.FilterParentId pid]

-- | Count children for a batch of potential parent IDs.
--
-- Returns a Map from parent_id to @(total, paused)@ counts (only non-zero entries).
--
-- An empty ID list returns an empty map without running a query.
countChildrenBatch
  :: (MonadArbiter m)
  => Text -> Text -> [Int64] -> m (Map.Map Int64 (Int64, Int64))
countChildrenBatch _ _ [] = pure Map.empty
countChildrenBatch schemaName tableName ids = do
  rows <-
    executeQuery
      (Tmpl.countChildrenBatchSQL schemaName tableName)
      [parr CInt8 ids]
      parentCountCodec
  pure $ Map.fromList rows

-- | Count children in the DLQ for a batch of potential parent IDs.
--
-- Returns a Map from parent_id to DLQ child count (only non-zero entries).
--
-- An empty ID list returns an empty map without running a query.
countDLQChildrenBatch
  :: (MonadArbiter m)
  => Text -> Text -> [Int64] -> m (Map.Map Int64 Int64)
countDLQChildrenBatch _ _ [] = pure Map.empty
countDLQChildrenBatch schemaName tableName ids = do
  rows <-
    executeQuery
      (Tmpl.countDLQChildrenBatchSQL schemaName tableName)
      [parr CInt8 ids]
      dlqParentCountCodec
  pure $ Map.fromList rows

-- ---------------------------------------------------------------------------
-- Job Dependency Operations
-- ---------------------------------------------------------------------------

-- | Pause all visible children of a parent job.
--
-- Sets suspended = TRUE for claimable children, making them
-- unclaimable. In-flight children (currently being processed by workers)
-- are left alone so their visibility timeout can expire normally if the
-- worker crashes.
-- Returns the number of children paused.
--
-- A single statement built by 'Tmpl.pauseChildrenSQL'.
pauseChildren
  :: (MonadArbiter m)
  => Text
  -- ^ Schema name
  -> Text
  -- ^ Table name
  -> Int64
  -- ^ Parent job ID
  -> m Int64
pauseChildren schemaName tableName parentJobId =
  executeStatement
    (Tmpl.pauseChildrenSQL schemaName tableName)
    [pval CInt8 parentJobId]

-- | Resume all suspended children of a parent job.
--
-- Only affects children whose suspended = TRUE.
-- Returns the number of children resumed.
--
-- A single statement built by 'Tmpl.resumeChildrenSQL'.
resumeChildren
  :: (MonadArbiter m)
  => Text
  -- ^ Schema name
  -> Text
  -- ^ Table name
  -> Int64
  -- ^ Parent job ID
  -> m Int64
resumeChildren schemaName tableName parentJobId =
  executeStatement
    (Tmpl.resumeChildrenSQL schemaName tableName)
    [pval CInt8 parentJobId]

-- | Cancel a job and all its descendants recursively.
--
-- Uses a recursive CTE to find all descendants and deletes them all.
-- If the root job itself is a child, resumes its parent for a completion round.
-- Returns the total number of jobs deleted (parent + all descendants).
--
-- Runs in a single database transaction. Returns 0 when the delete query
-- does not yield exactly one row.
cancelJobCascade
  :: (MonadArbiter m)
  => Text
  -- ^ Schema name
  -> Text
  -- ^ Table name
  -> Int64
  -- ^ Root job ID
  -> m Int64
cancelJobCascade schemaName tableName jobId = withDbTransaction $ do
  -- Read the root's parent before deleting: afterwards the row is gone.
  parentRows <-
    executeQuery
      (Tmpl.getParentIdSQL schemaName tableName)
      [pval CInt8 jobId]
      nullableInt64Codec
  let rootParentId = case parentRows of
        [Just pid] -> Just pid
        _ -> Nothing

  countRows <-
    executeQuery
      (Tmpl.cancelJobCascadeSQL schemaName tableName)
      [pval CInt8 jobId]
      countCodec
  let deleted = case countRows of
        [n] -> n
        _ -> 0

  -- Only poke the parent when something was actually removed.
  when (deleted > 0) $
    for_ rootParentId $
      tryResumeParent schemaName tableName

  pure deleted

-- | Cancel an entire job tree by walking up from any node to the root,
-- then cascade-deleting everything from the root down.
--
-- Unlike 'cancelJobCascade', this does NOT call 'tryResumeParent' — the root
-- by definition has no parent. Returns the total number of jobs deleted.
--
-- Fails via 'throwParsing' unless the query yields exactly one row.
cancelJobTree
  :: (MonadArbiter m)
  => Text
  -- ^ Schema name
  -> Text
  -- ^ Table name
  -> Int64
  -- ^ Any job ID within the tree
  -> m Int64
cancelJobTree schemaName tableName jobId = do
  countRows <-
    executeQuery
      (Tmpl.cancelJobTreeSQL schemaName tableName)
      [pval CInt8 jobId]
      countCodec
  case countRows of
    [n] -> pure n
    _ -> throwParsing "cancelJobTree: unexpected result"

-- ---------------------------------------------------------------------------
-- Suspend/Resume Operations
-- ---------------------------------------------------------------------------

-- | Suspend a job, making it unclaimable.
--
-- Only suspends non-in-flight jobs (not currently being processed by workers).
-- Returns the number of rows updated (0 if job doesn't exist, is in-flight,
-- or already suspended).
--
-- Thin wrapper: runs 'Tmpl.suspendJobSQL' with the job ID as its only
-- parameter and returns the affected-row count from 'executeStatement'.
suspendJob
  :: (MonadArbiter m)
  => Text
  -- ^ Schema name
  -> Text
  -- ^ Table name
  -> Int64
  -- ^ Job ID
  -> m Int64
suspendJob schemaName tableName jobId =
  executeStatement
    (Tmpl.suspendJobSQL schemaName tableName)
    [pval CInt8 jobId]

-- | Resume a suspended job, making it claimable again.
--
-- Returns the number of rows updated (0 if job doesn't exist or isn't suspended).
--
-- Thin wrapper: runs 'Tmpl.resumeJobSQL' with the job ID as its only
-- parameter and returns the affected-row count from 'executeStatement'.
resumeJob
  :: (MonadArbiter m)
  => Text
  -- ^ Schema name
  -> Text
  -- ^ Table name
  -> Int64
  -- ^ Job ID
  -> m Int64
resumeJob schemaName tableName jobId =
  executeStatement
    (Tmpl.resumeJobSQL schemaName tableName)
    [pval CInt8 jobId]

-- | Full recompute of the groups table from the main queue.
--
-- Corrects any drift in job_count, min_priority, min_id, and in_flight_until.
-- Checks the reaper sequence to skip if a recent run occurred within the
-- given interval. Uses an advisory lock to serialize concurrent attempts,
-- then locks all groups rows to prevent trigger interleaving.
--
-- The recency check runs outside the refresh transaction. If either the
-- reaper-sequence query or the @now()@ query does not return exactly one
-- row, the refresh is attempted anyway.
refreshGroups
  :: (MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name
  -> Text
  -- ^ Table name
  -> Int
  -- ^ Minimum interval between runs (seconds)
  -> m ()
refreshGroups schemaName tableName intervalSecs = do
  seqResult <- executeQuery (Tmpl.checkReaperSeqSQL schemaName tableName) [] int64Codec
  case seqResult of
    [lastEpoch] -> do
      -- Compare against the database clock rather than the local one.
      nowResult <- executeQuery "SELECT extract(epoch FROM now())::bigint AS result" [] int64Codec
      case nowResult of
        [nowEpoch]
          -- A run happened within the interval: nothing to do.
          | nowEpoch - lastEpoch < fromIntegral intervalSecs -> pure ()
        _ -> doRefresh
    _ -> doRefresh
  where
    doRefresh = withDbTransaction $ do
      let tbl = jobQueueTable schemaName tableName
      -- Non-blocking try-lock keyed on the table identifier; if it is not
      -- granted, another refresh is assumed to be in progress and this call
      -- becomes a no-op.
      acquired <-
        executeQuery
          "SELECT pg_try_advisory_xact_lock(hashtextextended(?, ?)) AS result"
          [pval CText tbl, pval CInt8 2]
          boolCodec
      case acquired of
        [True] -> do
          -- Order matters: lock the groups rows, recompute, then record the run.
          void $ executeQuery (Tmpl.lockGroupsSQL schemaName tableName) [] int64Codec
          void $ executeStatement (Tmpl.refreshGroupsSQL schemaName tableName) []
          void $ executeQuery (Tmpl.updateReaperSeqSQL schemaName tableName) [] int64Codec
        _ -> pure ()

-- | Read child results, DLQ errors, parent_state snapshot, and DLQ failures
-- for a rollup finalizer in a single query.
--
-- The result tuple is, in order:
--
-- 1. values from @"r"@ rows, keyed by the row's first ID column;
-- 2. error texts from @"e"@ rows, keyed by the row's first ID column;
-- 3. the value of the last @"s"@ row seen, if any;
-- 4. the same error texts from @"e"@ rows, keyed by the row's last ID column.
--
-- An @"e"@ row with a NULL error text is recorded as the empty string.
-- Any row that matches none of the expected shapes is raised via
-- 'throwParsing'.
readChildResultsRaw
  :: (MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name
  -> Text
  -- ^ Table name
  -> Int64
  -- ^ Parent job ID
  -> m (Map.Map Int64 Value, Map.Map Int64 Text, Maybe Value, Map.Map Int64 Text)
readChildResultsRaw schemaName tableName parentJobId = do
  rows <-
    executeQuery
      (Tmpl.readChildResultsSQL schemaName tableName)
      -- The statement takes the parent ID three times.
      [pval CInt8 parentJobId, pval CInt8 parentJobId, pval CInt8 parentJobId]
      childResultsRowCodec
  foldM parseRow (Map.empty, Map.empty, Nothing, Map.empty) rows
  where
    -- Accumulator components are forced on every step so the fold does not
    -- pile up thunks across rows.
    parseRow (!results, !errors, !snap, !dlqFailures) row = case row of
      ("r", Just cid, Just val, _, _) ->
        pure (Map.insert cid val results, errors, snap, dlqFailures)
      ("e", Just jid, _, Just err, Just dlqPk) ->
        pure (results, Map.insert jid err errors, snap, Map.insert dlqPk err dlqFailures)
      ("e", Just jid, _, Nothing, Just dlqPk) ->
        pure (results, Map.insert jid "" errors, snap, Map.insert dlqPk "" dlqFailures)
      ("s", _, Just val, _, _) ->
        pure (results, errors, Just val, dlqFailures)
      _ -> throwParsing $ "readChildResultsRaw: unexpected row: " <> T.pack (show row)

-- | Read the raw @parent_state@ snapshot from the DB.
--
-- Internal operation used by the worker for DLQ-retried finalizers
-- that have a persisted snapshot.
--
-- Returns 'Nothing' when the column is NULL, and also when the query does
-- not return exactly one row.
getParentStateSnapshot
  :: (MonadArbiter m)
  => Text
  -- ^ PostgreSQL schema name
  -> Text
  -- ^ Table name
  -> Int64
  -- ^ Job ID
  -> m (Maybe Value)
getParentStateSnapshot schemaName tableName jobId = do
  rows <-
    executeQuery
      (Tmpl.getParentStateSnapshotSQL schemaName tableName)
      [pval CInt8 jobId]
      (ncol "parent_state" CJsonb)
  case rows of
    [val] -> pure val
    _ -> pure Nothing

-- | Merge raw child results from three sources.
--
-- Precedence (left-biased union): DLQ errors > results > snapshot.
--
-- Failures are wrapped in 'Left' and results in 'Right'. The snapshot, when
-- present, is decoded as a map of the final result type; a missing snapshot
-- or one that fails to decode contributes nothing.
mergeRawChildResults
  :: Map.Map Int64 Value
  -> Map.Map Int64 Text
  -> Maybe Value
  -> Map.Map Int64 (Either Text Value)
mergeRawChildResults results failures mSnapshot =
  Map.map Left failures
    `Map.union` Map.map Right results
    `Map.union` base
  where
    -- Lowest-precedence layer: entries recovered from the persisted snapshot.
    base = case mSnapshot of
      Just val | Success m <- fromJSON val -> m
      _ -> Map.empty