{-# LANGUAGE AllowAmbiguousTypes #-}
{-# OPTIONS_GHC -Wno-redundant-constraints #-}

-- | High-level API for job queue operations.
--
-- Table names are automatically extracted from the payload type using the
-- registry. Compile-time checks ensure payloads are only used with registered tables.
module Arbiter.Core.HighLevel
  ( -- * Constraint Aliases
    QueueOperation
  , JobOperation

    -- * Job Operations
  , insertJob
  , insertJobsBatch
  , insertJobsBatch_
  , claimNextVisibleJobs
  , claimNextVisibleJobsBatched
  , ackJob
  , ackJobsBatch
  , ackJobsBulk
  , updateJobForRetry
  , setVisibilityTimeout
  , setVisibilityTimeoutBatch
  , SetVisibilityResult (..)

    -- * Filtered Query Operations
  , Ops.JobFilter (..)
  , listJobsFiltered
  , countJobsFiltered
  , listDLQFiltered
  , countDLQFiltered

    -- * Dead Letter Queue Operations
  , moveToDLQ
  , moveToDLQBatch
  , listDLQJobs
  , retryFromDLQ
  , dlqJobExists
  , deleteDLQJob
  , deleteDLQJobsBatch

    -- * Admin Operations
  , listJobs
  , getJobById
  , getJobsByGroup
  , getJobsByParent
  , getInFlightJobs
  , cancelJob
  , cancelJobsBatch
  , promoteJob
  , Ops.QueueStats (..)
  , getQueueStats

    -- * Count Operations
  , countJobs
  , countJobsByGroup
  , countJobsByParent
  , countInFlightJobs
  , countDLQJobs
  , countChildrenBatch
  , countDLQChildren
  , countDLQChildrenBatch

    -- * Job Dependency Operations
  , pauseChildren
  , resumeChildren
  , cancelJobCascade

    -- * Suspend/Resume Operations
  , suspendJob
  , resumeJob

    -- * Results Table Operations
  , insertResult
  , getResultsByParent
  , getDLQChildErrorsByParent
  , readChildResultsRaw
  , Ops.mergeRawChildResults
  , persistParentState
  , getParentStateSnapshot

    -- * Groups Table Operations
  , refreshGroups

    -- * Job Tree DSL
  , insertJobTree

    -- * Re-exports
  , getSchema
  ) where

import Data.Aeson (Value)
import Data.Int (Int32, Int64)
import Data.List.NonEmpty (NonEmpty (..))
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (fromMaybe)
import Data.Proxy (Proxy (..))
import Data.Text (Text)
import Data.Text qualified as T
import Data.Time (NominalDiffTime)
import GHC.TypeLits (KnownSymbol, symbolVal)
import UnliftIO (MonadUnliftIO)

import Arbiter.Core.HasArbiterSchema (HasArbiterSchema (..))
import Arbiter.Core.Job.DLQ qualified as DLQ
import Arbiter.Core.Job.Types (Job (..), JobPayload, JobRead, JobWrite)
import Arbiter.Core.JobTree qualified as JT
import Arbiter.Core.MonadArbiter (MonadArbiter)
import Arbiter.Core.Operations qualified as Ops
import Arbiter.Core.QueueRegistry (TableForPayload)

-- | Constraint bundle for operations that address a queue by payload type.
--
-- The 'KnownSymbol' constraint on @'TableForPayload' payload registry@ is what
-- allows the table name to be recovered from the type level at runtime.
type QueueOperation m registry payload =
  ( MonadArbiter m
  , HasArbiterSchema m registry
  , JobPayload payload
  , KnownSymbol (TableForPayload payload registry)
  )

-- | Constraint bundle for operations that act on an already-fetched job.
--
-- No 'KnownSymbol' constraint is needed here: functions using this alias read
-- the table name from the job value itself rather than from the registry.
type JobOperation m registry payload =
  ( MonadArbiter m
  , HasArbiterSchema m registry
  , JobPayload payload
  )

-- | Inserts a job into the queue.
--
-- The target table is resolved at the type level from @payload@ via
-- 'TableForPayload'; the schema comes from 'getSchema'.
--
-- Returns @Nothing@ if a job with the same deduplication key already
-- exists (with @IgnoreDuplicate@ strategy).
insertJob
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => JobWrite payload
  -> m (Maybe (JobRead payload))
insertJob job = do
  schemaName <- getSchema
  -- Reify the type-level table name into a runtime 'Text'.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.insertJob schemaName tableName job

-- | Insert multiple jobs in a single batch operation.
--
-- Supports dedup keys: within the batch, duplicate keys are resolved
-- (last 'ReplaceDuplicate' wins), and against existing rows via
-- @ON CONFLICT@.
--
-- Does not validate @parentId@. Use @insertJobTree@ for parent-child relationships.
--
-- The target table is resolved at the type level from @payload@ via
-- 'TableForPayload'.
insertJobsBatch
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => [JobWrite payload]
  -> m [JobRead payload]
insertJobsBatch jobs = do
  schemaName <- getSchema
  -- Reify the type-level table name into a runtime 'Text'.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.insertJobsBatch schemaName tableName jobs

-- | Variant of 'insertJobsBatch' that returns an 'Int64' instead of the
-- inserted rows.
--
-- NOTE(review): the result is presumably the number of rows inserted —
-- confirm against 'Ops.insertJobsBatch_'.
--
-- The target table is resolved at the type level from @payload@ via
-- 'TableForPayload'.
insertJobsBatch_
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => [JobWrite payload]
  -> m Int64
insertJobsBatch_ jobs = do
  schemaName <- getSchema
  -- Reify the type-level table name into a runtime 'Text'.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.insertJobsBatch_ schemaName tableName jobs

-- | Claims visible jobs from the queue. At most one job per group is claimed
-- to enforce head-of-line blocking.
--
-- The target table is resolved at the type level from @payload@ via
-- 'TableForPayload'.
claimNextVisibleJobs
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int
  -- ^ Maximum number of jobs to claim.
  -> NominalDiffTime
  -- ^ How long the claimed jobs should remain invisible (in seconds).
  -> m [JobRead payload]
claimNextVisibleJobs limit timeout = do
  schemaName <- getSchema
  -- Reify the type-level table name into a runtime 'Text'.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.claimNextVisibleJobs schemaName tableName limit timeout

-- | Claims multiple jobs per group. Unlike 'claimNextVisibleJobs', this can
-- claim up to @batchSize@ jobs from each group while still respecting
-- head-of-line blocking between batches.
--
-- Each element of the result is a non-empty batch of claimed jobs. The target
-- table is resolved at the type level from @payload@ via 'TableForPayload'.
claimNextVisibleJobsBatched
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int
  -- ^ Batch size: maximum number of jobs to claim per group.
  -> Int
  -- ^ Max groups: maximum number of groups/batches to claim.
  -> NominalDiffTime
  -- ^ How long the claimed jobs should remain invisible (in seconds).
  -> m [NonEmpty (JobRead payload)]
claimNextVisibleJobsBatched batchSize maxGroups timeout = do
  schemaName <- getSchema
  -- Reify the type-level table name into a runtime 'Text'.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.claimNextVisibleJobsBatched schemaName tableName batchSize maxGroups timeout

-- | Acknowledges a job as complete, permanently deleting it from the queue.
--
-- The table name is taken from the job's own 'queueName' field.
--
-- Returns 1 on success (job deleted or parent suspended), 0 if already gone.
ackJob
  :: forall m registry payload
   . (JobOperation m registry payload)
  => JobRead payload
  -> m Int64
ackJob job = do
  schemaName <- getSchema
  let tableName = queueName job
  Ops.ackJob schemaName tableName job

-- | Acknowledges multiple jobs as complete. All jobs must be from the same
-- queue. Returns the total number of rows deleted.
--
-- An empty list short-circuits to @0@ without touching the database. The
-- table name is taken from the first job only; this function does not check
-- that the remaining jobs share it.
ackJobsBatch
  :: forall m registry payload
   . (JobOperation m registry payload)
  => [JobRead payload]
  -> m Int64
ackJobsBatch [] = pure 0
ackJobsBatch jobs@(firstJob : _) = do
  schemaName <- getSchema
  let tableName = queueName firstJob
  Ops.ackJobsBatch schemaName tableName jobs

-- | Bulk ack for standalone jobs (no parent, no tree logic).
--
-- A single DELETE with unnest — one round trip for N jobs.
-- Only valid for jobs claimed in 'BatchedJobsMode'.
--
-- An empty list short-circuits to @0@ without touching the database. The
-- table name is taken from the first job only; this function does not check
-- that the remaining jobs share it.
--
-- Returns the number of rows deleted.
ackJobsBulk
  :: forall m registry payload
   . (JobOperation m registry payload)
  => [JobRead payload]
  -> m Int64
ackJobsBulk [] = pure 0
ackJobsBulk jobs@(firstJob : _) = do
  schemaName <- getSchema
  let tableName = queueName firstJob
  Ops.ackJobsBulk schemaName tableName jobs

-- | Marks a failed job for retry at a later time.
--
-- The table name is taken from the job's own 'queueName' field.
--
-- Returns the number of rows updated (0 if job was already claimed by another worker).
updateJobForRetry
  :: forall m registry payload
   . (JobOperation m registry payload)
  => NominalDiffTime
  -- ^ The delay before this job becomes visible again for retry.
  -> Text
  -- ^ An error message to store with the job.
  -> JobRead payload
  -> m Int64
updateJobForRetry delay errorMsg job = do
  schemaName <- getSchema
  let tableName = queueName job
  Ops.updateJobForRetry schemaName tableName delay errorMsg job

-- | Manually extends a job's visibility timeout, useful for long-running jobs.
--
-- The table name is taken from the job's own 'queueName' field.
--
-- Returns the number of rows updated (0 if job was already reclaimed by another worker).
setVisibilityTimeout
  :: forall m registry payload
   . (JobOperation m registry payload)
  => NominalDiffTime
  -- ^ The new visibility timeout (in seconds) from the current time.
  -> JobRead payload
  -> m Int64
setVisibilityTimeout timeout job = do
  schemaName <- getSchema
  let tableName = queueName job
  Ops.setVisibilityTimeout schemaName tableName timeout job

-- | Result of setting visibility timeout for a single job in a batch.
data SetVisibilityResult
  = -- | Visibility timeout was successfully extended. Contains job ID.
    VisibilityExtended Int64
  | -- | Job no longer exists (was deleted/acked). Contains job ID.
    JobGone Int64
  | -- | Job was reclaimed by another worker (attempts count changed).
    -- Contains: job ID, expected attempts, actual attempts.
    JobReclaimed Int64 Int32 Int32
  deriving stock (Eq, Show)

-- | Extends visibility timeout for multiple jobs. All jobs must be from
-- the same queue.
--
-- An empty list short-circuits to @[]@ without touching the database. The
-- table name is taken from the first job only. One 'SetVisibilityResult' is
-- produced per 'Ops.VisibilityUpdateInfo' returned by the underlying
-- operation.
setVisibilityTimeoutBatch
  :: forall m registry payload
   . (JobOperation m registry payload)
  => NominalDiffTime
  -- ^ The new visibility timeout (in seconds) from the current time.
  -> [JobRead payload]
  -- ^ Jobs to heartbeat (all must be from the same queue)
  -> m [SetVisibilityResult]
setVisibilityTimeoutBatch _ [] = pure []
setVisibilityTimeoutBatch timeout jobs@(firstJob : _) = do
  schemaName <- getSchema
  let tableName = queueName firstJob
  infos <- Ops.setVisibilityTimeoutBatch schemaName tableName timeout jobs
  -- Index the submitted jobs by primary key so the attempts value the caller
  -- held can be reported alongside the actual one.
  let jobMap = Map.fromList [(primaryKey j, j) | j <- jobs]
      toResult info = case info of
        Ops.VisibilityUpdateInfo jobId True _ -> VisibilityExtended jobId
        Ops.VisibilityUpdateInfo jobId False Nothing -> JobGone jobId
        Ops.VisibilityUpdateInfo jobId False (Just actual) ->
          -- Falls back to 0 when the returned ID is not among the submitted jobs.
          let jobAttempts = maybe 0 attempts (Map.lookup jobId jobMap)
           in JobReclaimed jobId jobAttempts actual
  pure $ map toResult infos

-- | Moves a job from the main queue to the dead-letter queue (DLQ).
--
-- The table name is taken from the job's own 'queueName' field.
--
-- Returns the number of rows deleted from main queue (0 if job was already
-- claimed by another worker).
moveToDLQ
  :: forall m registry payload
   . (JobOperation m registry payload)
  => Text
  -- ^ Error message (the final error that caused the DLQ move)
  -> JobRead payload
  -> m Int64
moveToDLQ errorMsg job = do
  schemaName <- getSchema
  let tableName = queueName job
  Ops.moveToDLQ schemaName tableName errorMsg job

-- | Lists jobs in the dead-letter queue with pagination.
--
-- The queue is identified at the type level from @payload@ via
-- 'TableForPayload'.
listDLQJobs
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int
  -- ^ The maximum number of jobs to return.
  -> Int
  -- ^ The number of jobs to skip (for pagination).
  -> m [DLQ.DLQJob payload]
listDLQJobs limit offset = do
  schemaName <- getSchema
  -- Reify the type-level table name into a runtime 'Text'.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.listDLQJobs schemaName tableName limit offset

-- | Moves a job from the dead-letter queue back into the main queue to be retried.
--
-- The queue is identified at the type level from @payload@ via
-- 'TableForPayload'.
--
-- Returns @Nothing@ if the DLQ job no longer exists.
retryFromDLQ
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int64
  -- ^ DLQ job ID
  -> m (Maybe (JobRead payload))
retryFromDLQ dlqId = do
  schemaName <- getSchema
  -- Reify the type-level table name into a runtime 'Text'.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.retryFromDLQ schemaName tableName dlqId

-- | Check whether a DLQ job exists by ID.
--
-- The queue is identified at the type level from @payload@ via
-- 'TableForPayload'.
dlqJobExists
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int64
  -- ^ DLQ job ID
  -> m Bool
dlqJobExists dlqId = do
  schemaName <- getSchema
  -- Reify the type-level table name into a runtime 'Text'.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.dlqJobExists schemaName tableName dlqId

-- | Permanently deletes a job from the dead-letter queue.
--
-- The queue is identified at the type level from @payload@ via
-- 'TableForPayload'.
--
-- Returns the number of rows deleted (0 if the DLQ job no longer exists).
deleteDLQJob
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int64
  -- ^ DLQ job ID
  -> m Int64
deleteDLQJob dlqId = do
  schemaName <- getSchema
  -- Reify the type-level table name into a runtime 'Text'.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.deleteDLQJob schemaName tableName dlqId

-- | Moves multiple jobs to the dead-letter queue.
--
-- Each job is moved with its own error message. Jobs that have already been
-- claimed by another worker (attempts mismatch) are silently skipped.
--
-- An empty list short-circuits to @0@ without touching the database. The
-- table name is taken from the first job only; this function does not check
-- that the remaining jobs share it.
--
-- Returns the total number of jobs moved to DLQ.
moveToDLQBatch
  :: forall m registry payload
   . (JobOperation m registry payload)
  => [(JobRead payload, Text)]
  -- ^ List of (job, error message) pairs. All jobs must be from the same queue.
  -> m Int64
moveToDLQBatch [] = pure 0
moveToDLQBatch jobsWithErrors@((firstJob, _) : _) = do
  schemaName <- getSchema
  let tableName = queueName firstJob
  Ops.moveToDLQBatch schemaName tableName jobsWithErrors

-- | Permanently deletes multiple jobs from the dead-letter queue.
--
-- The queue is identified at the type level from @payload@ via
-- 'TableForPayload'.
--
-- Returns the total number of DLQ jobs deleted.
deleteDLQJobsBatch
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => [Int64]
  -- ^ DLQ job IDs
  -> m Int64
deleteDLQJobsBatch dlqIds = do
  schemaName <- getSchema
  -- Reify the type-level table name into a runtime 'Text'.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.deleteDLQJobsBatch schemaName tableName dlqIds

-- ---------------------------------------------------------------------------
-- Filtered Query Operations
-- ---------------------------------------------------------------------------

-- | Lists jobs with composable filters.
--
-- The target table is resolved at the type level from @payload@ via
-- 'TableForPayload'.
--
-- Returns jobs ordered by ID (descending, newest first).
listJobsFiltered
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => [Ops.JobFilter]
  -- ^ Composable filters
  -> Int
  -- ^ Maximum number of jobs to return.
  -> Int
  -- ^ Number of jobs to skip (for pagination).
  -> m [JobRead payload]
listJobsFiltered filters limit offset = do
  schemaName <- getSchema
  -- Reify the type-level table name into a runtime 'Text'.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.listJobsFiltered schemaName tableName filters limit offset

-- | Counts jobs with composable filters.
--
-- The target table is resolved at the type level from @payload@ via
-- 'TableForPayload'.
countJobsFiltered
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => [Ops.JobFilter]
  -- ^ Composable filters
  -> m Int64
countJobsFiltered filters = do
  schemaName <- getSchema
  -- Reify the type-level table name into a runtime 'Text'.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.countJobsFiltered schemaName tableName filters

-- | Lists DLQ jobs with composable filters.
--
-- The queue is identified at the type level from @payload@ via
-- 'TableForPayload'.
--
-- Returns jobs ordered by failed_at (most recent first).
listDLQFiltered
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => [Ops.JobFilter]
  -- ^ Composable filters
  -> Int
  -- ^ Maximum number of jobs to return.
  -> Int
  -- ^ Number of jobs to skip (for pagination).
  -> m [DLQ.DLQJob payload]
listDLQFiltered filters limit offset = do
  schemaName <- getSchema
  -- Reify the type-level table name into a runtime 'Text'.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.listDLQFiltered schemaName tableName filters limit offset

-- | Counts DLQ jobs with composable filters.
--
-- The queue is identified at the type level from @payload@ via
-- 'TableForPayload'.
countDLQFiltered
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => [Ops.JobFilter]
  -- ^ Composable filters
  -> m Int64
countDLQFiltered filters = do
  schemaName <- getSchema
  -- Reify the type-level table name into a runtime 'Text'.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.countDLQFiltered schemaName tableName filters

-- ---------------------------------------------------------------------------
-- Admin Operations
-- ---------------------------------------------------------------------------

-- | Lists jobs in the queue with pagination.
--
-- The target table is resolved at the type level from @payload@ via
-- 'TableForPayload'.
--
-- Returns jobs ordered by ID (descending, newest first).
listJobs
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int
  -- ^ Maximum number of jobs to return.
  -> Int
  -- ^ Number of jobs to skip (for pagination).
  -> m [JobRead payload]
listJobs limit offset = do
  schemaName <- getSchema
  -- Reify the type-level table name into a runtime 'Text'.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.listJobs schemaName tableName limit offset

-- | Gets a single job by its ID.
--
-- The target table is resolved at the type level from @payload@ via
-- 'TableForPayload'.
--
-- Returns @Nothing@ if the job doesn't exist.
getJobById
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int64
  -- ^ Job ID
  -> m (Maybe (JobRead payload))
getJobById jobId = do
  schemaName <- getSchema
  -- Reify the type-level table name into a runtime 'Text'.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.getJobById schemaName tableName jobId

-- | Gets all jobs for a specific group key with pagination.
--
-- Useful for debugging or admin UI to see all jobs for a specific entity.
--
-- The target table is resolved at the type level from @payload@ via
-- 'TableForPayload'.
getJobsByGroup
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Text
  -- ^ Group key to filter by
  -> Int
  -- ^ Maximum number of jobs to return.
  -> Int
  -- ^ Number of jobs to skip (for pagination).
  -> m [JobRead payload]
getJobsByGroup groupKey limit offset = do
  schemaName <- getSchema
  -- Reify the type-level table name into a runtime 'Text'.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.getJobsByGroup schemaName tableName groupKey limit offset

-- | Gets all jobs for a specific parent ID with pagination.
--
-- The target table is resolved at the type level from @payload@ via
-- 'TableForPayload'.
getJobsByParent
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int64
  -- ^ Parent ID to filter by
  -> Int
  -- ^ Maximum number of jobs to return.
  -> Int
  -- ^ Number of jobs to skip (for pagination).
  -> m [JobRead payload]
getJobsByParent pid limit offset = do
  schemaName <- getSchema
  -- Reify the type-level table name into a runtime 'Text'.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.getJobsByParent schemaName tableName pid limit offset

-- | Gets all in-flight jobs (currently being processed by workers).
--
-- A job is considered in-flight if it has been claimed (attempts > 0) and
-- its visibility timeout hasn't expired yet.
--
-- Useful for monitoring active work and detecting stuck jobs.
getInFlightJobs
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int
  -- ^ Maximum number of jobs to return.
  -> Int
  -- ^ Number of jobs to skip (for pagination).
  -> m [JobRead payload]
getInFlightJobs limit offset = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.getInFlightJobs schemaName tableName limit offset

-- | Cancels (deletes) a job by ID.
--
-- Returns 0 if the job has children — use 'cancelJobCascade' to delete
-- a parent and all its descendants.
cancelJob
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int64
  -- ^ Job ID
  -> m Int64
cancelJob jobId = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.cancelJob schemaName tableName jobId

-- | Cancels (deletes) multiple jobs by ID.
--
-- Returns the total number of jobs deleted.
cancelJobsBatch
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => [Int64]
  -- ^ Job IDs
  -> m Int64
cancelJobsBatch jobIds = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.cancelJobsBatch schemaName tableName jobIds

-- | Promote a delayed or retrying job to be immediately visible.
--
-- Refuses in-flight jobs (attempts > 0 with no last_error).
-- Returns 1 on success, 0 if not found, already visible, or in-flight.
promoteJob
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int64
  -- ^ Job ID
  -> m Int64
promoteJob jobId = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.promoteJob schemaName tableName jobId

-- | Gets statistics about the job queue.
getQueueStats
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => m Ops.QueueStats
getQueueStats = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.getQueueStats schemaName tableName

-- ---------------------------------------------------------------------------
-- Count Operations
-- ---------------------------------------------------------------------------

-- | Counts all jobs in the queue.
countJobs
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => m Int64
countJobs = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.countJobs schemaName tableName

-- | Counts jobs matching a group key.
countJobsByGroup
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Text
  -- ^ Group key to count
  -> m Int64
countJobsByGroup groupKey = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.countJobsByGroup schemaName tableName groupKey

-- | Counts jobs matching a parent ID.
countJobsByParent
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int64
  -- ^ Parent ID to count children of
  -> m Int64
countJobsByParent pid = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.countJobsByParent schemaName tableName pid

-- | Counts in-flight jobs (currently being processed by workers).
countInFlightJobs
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => m Int64
countInFlightJobs = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.countInFlightJobs schemaName tableName

-- | Counts jobs in the dead-letter queue.
countDLQJobs
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => m Int64
countDLQJobs = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.countDLQJobs schemaName tableName

-- | Counts children for a batch of potential parent IDs.
--
-- Returns a Map from parent_id to @(total, paused)@ counts (only non-zero entries).
countChildrenBatch
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => [Int64]
  -- ^ Potential parent job IDs
  -> m (Map Int64 (Int64, Int64))
countChildrenBatch ids = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.countChildrenBatch schemaName tableName ids

-- | Count how many children of a parent are in the DLQ.
-- Useful inside finalizer handlers to detect failed children.
--
-- Delegates to 'countDLQChildrenBatch' with a single-element list; a parent
-- that is absent from the batch result is reported as 0.
countDLQChildren
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int64
  -- ^ Parent job ID
  -> m Int64
countDLQChildren parentJobId = do
  -- Type applications are required: none of the type variables can be
  -- inferred from the argument or result types alone.
  dlqCounts <- countDLQChildrenBatch @m @registry @payload [parentJobId]
  pure $ fromMaybe 0 (Map.lookup parentJobId dlqCounts)

-- | Counts children in the DLQ for a batch of potential parent IDs.
--
-- Returns a Map from parent_id to DLQ child count (only non-zero entries).
countDLQChildrenBatch
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => [Int64]
  -- ^ Potential parent job IDs
  -> m (Map Int64 Int64)
countDLQChildrenBatch ids = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.countDLQChildrenBatch schemaName tableName ids

-- ---------------------------------------------------------------------------
-- Job Dependency Operations
-- ---------------------------------------------------------------------------

-- | Pause all visible children of a parent job, making them unclaimable.
--
-- Only affects children that are currently claimable. In-flight children
-- are left alone so their visibility timeout can expire normally if the
-- worker crashes.
--
-- Returns the number of children paused.
pauseChildren
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int64
  -- ^ Parent job ID
  -> m Int64
pauseChildren parentJobId = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.pauseChildren schemaName tableName parentJobId

-- | Resume all suspended children of a parent job.
--
-- Returns the number of children resumed.
resumeChildren
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int64
  -- ^ Parent job ID
  -> m Int64
resumeChildren parentJobId = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.resumeChildren schemaName tableName parentJobId

-- | Cancel a job and all its descendants recursively.
--
-- Returns the total number of jobs deleted (parent + all descendants).
cancelJobCascade
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int64
  -- ^ Root job ID
  -> m Int64
cancelJobCascade jobId = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.cancelJobCascade schemaName tableName jobId

-- ---------------------------------------------------------------------------
-- Suspend/Resume Operations
-- ---------------------------------------------------------------------------

-- | Suspend a job, making it unclaimable.
--
-- Only suspends non-in-flight jobs (not currently being processed by workers).
-- Returns the number of rows updated (0 if job doesn't exist, is in-flight,
-- or already suspended).
suspendJob
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int64
  -- ^ Job ID
  -> m Int64
suspendJob jobId = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.suspendJob schemaName tableName jobId

-- | Resume a suspended job, making it claimable again.
--
-- Returns the number of rows updated (0 if job doesn't exist or isn't suspended).
resumeJob
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int64
  -- ^ Job ID
  -> m Int64
resumeJob jobId = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.resumeJob schemaName tableName jobId

-- ---------------------------------------------------------------------------
-- Results Table Operations
-- ---------------------------------------------------------------------------

-- | Insert a child's result into the results table.
--
-- Each child gets its own row keyed by @(parent_id, child_id)@.
-- The FK @ON DELETE CASCADE@ ensures cleanup when the parent is acked.
--
-- Returns the number of rows inserted (1 on success).
insertResult
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int64
  -- ^ Parent job ID
  -> Int64
  -- ^ Child job ID
  -> Value
  -- ^ Encoded result value
  -> m Int64
insertResult parentJobId childId result = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.insertResult schemaName tableName parentJobId childId result

-- | Get all child results for a parent from the results table.
getResultsByParent
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int64
  -- ^ Parent job ID
  -> m (Map Int64 Value)
getResultsByParent parentJobId = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.getResultsByParent schemaName tableName parentJobId

-- | Get DLQ child errors for a parent.
--
-- Returns a 'Map' from child job ID to the last error message.
getDLQChildErrorsByParent
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int64
  -- ^ Parent job ID
  -> m (Map Int64 Text)
getDLQChildErrorsByParent parentJobId = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.getDLQChildErrorsByParent schemaName tableName parentJobId

-- | Read child results, DLQ errors, parent_state snapshot, and DLQ failures
-- for a rollup finalizer in a single query.
readChildResultsRaw
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int64
  -- ^ Parent job ID
  -> m (Map Int64 Value, Map Int64 Text, Maybe Value, Map Int64 Text)
readChildResultsRaw parentJobId = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.readChildResultsRaw schemaName tableName parentJobId

-- | Snapshot results into @parent_state@ before DLQ move.
persistParentState
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int64
  -- ^ Job ID
  -> Value
  -- ^ State value to persist
  -> m Int64
persistParentState jobId state = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.persistParentState schemaName tableName jobId state

-- | Read raw @parent_state@ snapshot from the DB.
getParentStateSnapshot
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int64
  -- ^ Job ID
  -> m (Maybe Value)
getParentStateSnapshot jobId = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.getParentStateSnapshot schemaName tableName jobId

-- ---------------------------------------------------------------------------
-- Groups Table Operations
-- ---------------------------------------------------------------------------

-- | Full recompute of the groups table from the main queue.
--
-- Corrects any drift in job_count, min_priority, min_id, and in_flight_until.
refreshGroups
  :: forall m registry payload
   . (QueueOperation m registry payload)
  => Int
  -- ^ Minimum interval between runs (seconds)
  -> m ()
refreshGroups intervalSecs = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  Ops.refreshGroups schemaName tableName intervalSecs

-- ---------------------------------------------------------------------------
-- Job Tree DSL
-- ---------------------------------------------------------------------------

-- | Insert a 'JobTree' atomically in a single transaction.
--
-- HighLevel wrapper that resolves schema/table from the registry.
-- See 'Arbiter.Core.JobTree.insertJobTree' for details.
insertJobTree
  :: forall m registry payload
   . (MonadUnliftIO m, QueueOperation m registry payload)
  => JT.JobTree payload
  -- ^ Job tree to insert
  -> m (Either Text (NonEmpty (JobRead payload)))
insertJobTree tree = do
  schemaName <- getSchema
  -- The table name is the type-level symbol registered for this payload.
  let tableName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
  JT.insertJobTree schemaName tableName tree