{-# LANGUAGE AllowAmbiguousTypes #-}
{-# OPTIONS_GHC -Wno-redundant-constraints #-}
module Arbiter.Core.HighLevel
(
QueueOperation
, JobOperation
, insertJob
, insertJobsBatch
, insertJobsBatch_
, claimNextVisibleJobs
, claimNextVisibleJobsBatched
, ackJob
, ackJobsBatch
, ackJobsBulk
, updateJobForRetry
, setVisibilityTimeout
, setVisibilityTimeoutBatch
, SetVisibilityResult (..)
, Ops.JobFilter (..)
, listJobsFiltered
, countJobsFiltered
, listDLQFiltered
, countDLQFiltered
, moveToDLQ
, moveToDLQBatch
, listDLQJobs
, retryFromDLQ
, dlqJobExists
, deleteDLQJob
, deleteDLQJobsBatch
, listJobs
, getJobById
, getJobsByGroup
, getJobsByParent
, getInFlightJobs
, cancelJob
, cancelJobsBatch
, promoteJob
, Ops.QueueStats (..)
, getQueueStats
, countJobs
, countJobsByGroup
, countJobsByParent
, countInFlightJobs
, countDLQJobs
, countChildrenBatch
, countDLQChildren
, countDLQChildrenBatch
, pauseChildren
, resumeChildren
, cancelJobCascade
, suspendJob
, resumeJob
, insertResult
, getResultsByParent
, getDLQChildErrorsByParent
, readChildResultsRaw
, Ops.mergeRawChildResults
, persistParentState
, getParentStateSnapshot
, refreshGroups
, insertJobTree
, getSchema
) where
import Data.Aeson (Value)
import Data.Int (Int32, Int64)
import Data.List.NonEmpty (NonEmpty (..))
import Data.Map.Strict (Map)
import Data.Map.Strict qualified as Map
import Data.Maybe (fromMaybe)
import Data.Proxy (Proxy (..))
import Data.Text (Text)
import Data.Text qualified as T
import Data.Time (NominalDiffTime)
import GHC.TypeLits (KnownSymbol, symbolVal)
import UnliftIO (MonadUnliftIO)
import Arbiter.Core.HasArbiterSchema (HasArbiterSchema (..))
import Arbiter.Core.Job.DLQ qualified as DLQ
import Arbiter.Core.Job.Types (Job (..), JobPayload, JobRead, JobWrite)
import Arbiter.Core.JobTree qualified as JT
import Arbiter.Core.MonadArbiter (MonadArbiter)
import Arbiter.Core.Operations qualified as Ops
import Arbiter.Core.QueueRegistry (TableForPayload)
type QueueOperation m registry payload =
( HasArbiterSchema m registry
, JobPayload payload
, KnownSymbol (TableForPayload payload registry)
, MonadArbiter m
)
type JobOperation m registry payload =
( HasArbiterSchema m registry
, JobPayload payload
, MonadArbiter m
)
insertJob
:: forall m registry payload
. (QueueOperation m registry payload)
=> JobWrite payload
-> m (Maybe (JobRead payload))
insertJob :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
JobWrite payload -> m (Maybe (JobRead payload))
insertJob JobWrite payload
job = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.insertJob schemaName tableName job
insertJobsBatch
:: forall m registry payload
. (QueueOperation m registry payload)
=> [JobWrite payload]
-> m [JobRead payload]
insertJobsBatch :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
[JobWrite payload] -> m [JobRead payload]
insertJobsBatch [JobWrite payload]
jobs = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.insertJobsBatch schemaName tableName jobs
insertJobsBatch_
:: forall m registry payload
. (QueueOperation m registry payload)
=> [JobWrite payload]
-> m Int64
insertJobsBatch_ :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
[JobWrite payload] -> m Int64
insertJobsBatch_ [JobWrite payload]
jobs = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.insertJobsBatch_ schemaName tableName jobs
claimNextVisibleJobs
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int
-> NominalDiffTime
-> m [JobRead payload]
claimNextVisibleJobs :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int -> NominalDiffTime -> m [JobRead payload]
claimNextVisibleJobs Int
limit NominalDiffTime
timeout = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.claimNextVisibleJobs schemaName tableName limit timeout
claimNextVisibleJobsBatched
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int
-> Int
-> NominalDiffTime
-> m [NonEmpty (JobRead payload)]
claimNextVisibleJobsBatched :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int -> Int -> NominalDiffTime -> m [NonEmpty (JobRead payload)]
claimNextVisibleJobsBatched Int
batchSize Int
maxGroups NominalDiffTime
timeout = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.claimNextVisibleJobsBatched schemaName tableName batchSize maxGroups timeout
ackJob
:: forall m registry payload
. (JobOperation m registry payload)
=> JobRead payload
-> m Int64
ackJob :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
JobOperation m registry payload =>
JobRead payload -> m Int64
ackJob JobRead payload
job = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = JobRead payload -> Text
forall payload key q insertedAt. Job payload key q insertedAt -> q
queueName JobRead payload
job
Ops.ackJob schemaName tableName job
ackJobsBatch
:: forall m registry payload
. (JobOperation m registry payload)
=> [JobRead payload]
-> m Int64
ackJobsBatch :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
JobOperation m registry payload =>
[JobRead payload] -> m Int64
ackJobsBatch [] = Int64 -> m Int64
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Int64
0
ackJobsBatch jobs :: [JobRead payload]
jobs@(JobRead payload
firstJob : [JobRead payload]
_) = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = JobRead payload -> Text
forall payload key q insertedAt. Job payload key q insertedAt -> q
queueName JobRead payload
firstJob
Ops.ackJobsBatch schemaName tableName jobs
ackJobsBulk
:: forall m registry payload
. (JobOperation m registry payload)
=> [JobRead payload]
-> m Int64
ackJobsBulk :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
JobOperation m registry payload =>
[JobRead payload] -> m Int64
ackJobsBulk [] = Int64 -> m Int64
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Int64
0
ackJobsBulk jobs :: [JobRead payload]
jobs@(JobRead payload
firstJob : [JobRead payload]
_) = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = JobRead payload -> Text
forall payload key q insertedAt. Job payload key q insertedAt -> q
queueName JobRead payload
firstJob
Ops.ackJobsBulk schemaName tableName jobs
updateJobForRetry
:: forall m registry payload
. (JobOperation m registry payload)
=> NominalDiffTime
-> Text
-> JobRead payload
-> m Int64
updateJobForRetry :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
JobOperation m registry payload =>
NominalDiffTime -> Text -> JobRead payload -> m Int64
updateJobForRetry NominalDiffTime
delay Text
errorMsg JobRead payload
job = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = JobRead payload -> Text
forall payload key q insertedAt. Job payload key q insertedAt -> q
queueName JobRead payload
job
Ops.updateJobForRetry schemaName tableName delay errorMsg job
setVisibilityTimeout
:: forall m registry payload
. (JobOperation m registry payload)
=> NominalDiffTime
-> JobRead payload
-> m Int64
setVisibilityTimeout :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
JobOperation m registry payload =>
NominalDiffTime -> JobRead payload -> m Int64
setVisibilityTimeout NominalDiffTime
timeout JobRead payload
job = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = JobRead payload -> Text
forall payload key q insertedAt. Job payload key q insertedAt -> q
queueName JobRead payload
job
Ops.setVisibilityTimeout schemaName tableName timeout job
data SetVisibilityResult
=
VisibilityExtended Int64
|
JobGone Int64
|
JobReclaimed Int64 Int32 Int32
deriving stock (SetVisibilityResult -> SetVisibilityResult -> Bool
(SetVisibilityResult -> SetVisibilityResult -> Bool)
-> (SetVisibilityResult -> SetVisibilityResult -> Bool)
-> Eq SetVisibilityResult
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: SetVisibilityResult -> SetVisibilityResult -> Bool
== :: SetVisibilityResult -> SetVisibilityResult -> Bool
$c/= :: SetVisibilityResult -> SetVisibilityResult -> Bool
/= :: SetVisibilityResult -> SetVisibilityResult -> Bool
Eq, Int -> SetVisibilityResult -> ShowS
[SetVisibilityResult] -> ShowS
SetVisibilityResult -> String
(Int -> SetVisibilityResult -> ShowS)
-> (SetVisibilityResult -> String)
-> ([SetVisibilityResult] -> ShowS)
-> Show SetVisibilityResult
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> SetVisibilityResult -> ShowS
showsPrec :: Int -> SetVisibilityResult -> ShowS
$cshow :: SetVisibilityResult -> String
show :: SetVisibilityResult -> String
$cshowList :: [SetVisibilityResult] -> ShowS
showList :: [SetVisibilityResult] -> ShowS
Show)
setVisibilityTimeoutBatch
:: forall m registry payload
. (JobOperation m registry payload)
=> NominalDiffTime
-> [JobRead payload]
-> m [SetVisibilityResult]
setVisibilityTimeoutBatch :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
JobOperation m registry payload =>
NominalDiffTime -> [JobRead payload] -> m [SetVisibilityResult]
setVisibilityTimeoutBatch NominalDiffTime
_ [] = [SetVisibilityResult] -> m [SetVisibilityResult]
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
setVisibilityTimeoutBatch NominalDiffTime
timeout jobs :: [JobRead payload]
jobs@(JobRead payload
firstJob : [JobRead payload]
_) = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = JobRead payload -> Text
forall payload key q insertedAt. Job payload key q insertedAt -> q
queueName JobRead payload
firstJob
infos <- Ops.setVisibilityTimeoutBatch schemaName tableName timeout jobs
let jobMap = [(Int64, JobRead payload)] -> Map Int64 (JobRead payload)
forall k a. Ord k => [(k, a)] -> Map k a
Map.fromList [(JobRead payload -> Int64
forall payload key q insertedAt.
Job payload key q insertedAt -> key
primaryKey JobRead payload
j, JobRead payload
j) | JobRead payload
j <- [JobRead payload]
jobs]
toResult VisibilityUpdateInfo
info = case VisibilityUpdateInfo
info of
Ops.VisibilityUpdateInfo Int64
jobId Bool
True Maybe Int32
_ -> Int64 -> SetVisibilityResult
VisibilityExtended Int64
jobId
Ops.VisibilityUpdateInfo Int64
jobId Bool
False Maybe Int32
Nothing -> Int64 -> SetVisibilityResult
JobGone Int64
jobId
Ops.VisibilityUpdateInfo Int64
jobId Bool
False (Just Int32
actual) ->
let jobAttempts :: Int32
jobAttempts = Int32
-> (JobRead payload -> Int32) -> Maybe (JobRead payload) -> Int32
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Int32
0 JobRead payload -> Int32
forall payload key q insertedAt.
Job payload key q insertedAt -> Int32
attempts (Int64 -> Map Int64 (JobRead payload) -> Maybe (JobRead payload)
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Int64
jobId Map Int64 (JobRead payload)
jobMap)
in Int64 -> Int32 -> Int32 -> SetVisibilityResult
JobReclaimed Int64
jobId Int32
jobAttempts Int32
actual
pure $ map toResult infos
moveToDLQ
:: forall m registry payload
. (JobOperation m registry payload)
=> Text
-> JobRead payload
-> m Int64
moveToDLQ :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
JobOperation m registry payload =>
Text -> JobRead payload -> m Int64
moveToDLQ Text
errorMsg JobRead payload
job = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = JobRead payload -> Text
forall payload key q insertedAt. Job payload key q insertedAt -> q
queueName JobRead payload
job
Ops.moveToDLQ schemaName tableName errorMsg job
listDLQJobs
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int
-> Int
-> m [DLQ.DLQJob payload]
listDLQJobs :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int -> Int -> m [DLQJob payload]
listDLQJobs Int
limit Int
offset = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.listDLQJobs schemaName tableName limit offset
retryFromDLQ
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int64
-> m (Maybe (JobRead payload))
retryFromDLQ :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int64 -> m (Maybe (JobRead payload))
retryFromDLQ Int64
dlqId = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.retryFromDLQ schemaName tableName dlqId
dlqJobExists
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int64
-> m Bool
dlqJobExists :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int64 -> m Bool
dlqJobExists Int64
dlqId = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.dlqJobExists schemaName tableName dlqId
deleteDLQJob
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int64
-> m Int64
deleteDLQJob :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int64 -> m Int64
deleteDLQJob Int64
dlqId = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.deleteDLQJob schemaName tableName dlqId
moveToDLQBatch
:: forall m registry payload
. (JobOperation m registry payload)
=> [(JobRead payload, Text)]
-> m Int64
moveToDLQBatch :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
JobOperation m registry payload =>
[(JobRead payload, Text)] -> m Int64
moveToDLQBatch [] = Int64 -> m Int64
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Int64
0
moveToDLQBatch jobsWithErrors :: [(JobRead payload, Text)]
jobsWithErrors@((JobRead payload
firstJob, Text
_) : [(JobRead payload, Text)]
_) = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = JobRead payload -> Text
forall payload key q insertedAt. Job payload key q insertedAt -> q
queueName JobRead payload
firstJob
Ops.moveToDLQBatch schemaName tableName jobsWithErrors
deleteDLQJobsBatch
:: forall m registry payload
. (QueueOperation m registry payload)
=> [Int64]
-> m Int64
deleteDLQJobsBatch :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
[Int64] -> m Int64
deleteDLQJobsBatch [Int64]
dlqIds = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.deleteDLQJobsBatch schemaName tableName dlqIds
listJobsFiltered
:: forall m registry payload
. (QueueOperation m registry payload)
=> [Ops.JobFilter]
-> Int
-> Int
-> m [JobRead payload]
listJobsFiltered :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
[JobFilter] -> Int -> Int -> m [JobRead payload]
listJobsFiltered [JobFilter]
filters Int
limit Int
offset = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.listJobsFiltered schemaName tableName filters limit offset
countJobsFiltered
:: forall m registry payload
. (QueueOperation m registry payload)
=> [Ops.JobFilter]
-> m Int64
countJobsFiltered :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
[JobFilter] -> m Int64
countJobsFiltered [JobFilter]
filters = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.countJobsFiltered schemaName tableName filters
listDLQFiltered
:: forall m registry payload
. (QueueOperation m registry payload)
=> [Ops.JobFilter]
-> Int
-> Int
-> m [DLQ.DLQJob payload]
listDLQFiltered :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
[JobFilter] -> Int -> Int -> m [DLQJob payload]
listDLQFiltered [JobFilter]
filters Int
limit Int
offset = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.listDLQFiltered schemaName tableName filters limit offset
countDLQFiltered
:: forall m registry payload
. (QueueOperation m registry payload)
=> [Ops.JobFilter]
-> m Int64
countDLQFiltered :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
[JobFilter] -> m Int64
countDLQFiltered [JobFilter]
filters = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.countDLQFiltered schemaName tableName filters
listJobs
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int
-> Int
-> m [JobRead payload]
listJobs :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int -> Int -> m [JobRead payload]
listJobs Int
limit Int
offset = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.listJobs schemaName tableName limit offset
getJobById
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int64
-> m (Maybe (JobRead payload))
getJobById :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int64 -> m (Maybe (JobRead payload))
getJobById Int64
jobId = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.getJobById schemaName tableName jobId
getJobsByGroup
:: forall m registry payload
. (QueueOperation m registry payload)
=> Text
-> Int
-> Int
-> m [JobRead payload]
getJobsByGroup :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Text -> Int -> Int -> m [JobRead payload]
getJobsByGroup Text
groupKey Int
limit Int
offset = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.getJobsByGroup schemaName tableName groupKey limit offset
getJobsByParent
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int64
-> Int
-> Int
-> m [JobRead payload]
getJobsByParent :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int64 -> Int -> Int -> m [JobRead payload]
getJobsByParent Int64
pid Int
limit Int
offset = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.getJobsByParent schemaName tableName pid limit offset
getInFlightJobs
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int
-> Int
-> m [JobRead payload]
getInFlightJobs :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int -> Int -> m [JobRead payload]
getInFlightJobs Int
limit Int
offset = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.getInFlightJobs schemaName tableName limit offset
cancelJob
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int64
-> m Int64
cancelJob :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int64 -> m Int64
cancelJob Int64
jobId = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.cancelJob schemaName tableName jobId
cancelJobsBatch
:: forall m registry payload
. (QueueOperation m registry payload)
=> [Int64]
-> m Int64
cancelJobsBatch :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
[Int64] -> m Int64
cancelJobsBatch [Int64]
jobIds = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.cancelJobsBatch schemaName tableName jobIds
promoteJob
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int64
-> m Int64
promoteJob :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int64 -> m Int64
promoteJob Int64
jobId = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.promoteJob schemaName tableName jobId
getQueueStats
:: forall m registry payload
. (QueueOperation m registry payload)
=> m Ops.QueueStats
getQueueStats :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
m QueueStats
getQueueStats = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.getQueueStats schemaName tableName
countJobs
:: forall m registry payload
. (QueueOperation m registry payload)
=> m Int64
countJobs :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
m Int64
countJobs = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.countJobs schemaName tableName
countJobsByGroup
:: forall m registry payload
. (QueueOperation m registry payload)
=> Text
-> m Int64
countJobsByGroup :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Text -> m Int64
countJobsByGroup Text
groupKey = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.countJobsByGroup schemaName tableName groupKey
countJobsByParent
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int64
-> m Int64
countJobsByParent :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int64 -> m Int64
countJobsByParent Int64
pid = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.countJobsByParent schemaName tableName pid
countInFlightJobs
:: forall m registry payload
. (QueueOperation m registry payload)
=> m Int64
countInFlightJobs :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
m Int64
countInFlightJobs = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.countInFlightJobs schemaName tableName
countDLQJobs
:: forall m registry payload
. (QueueOperation m registry payload)
=> m Int64
countDLQJobs :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
m Int64
countDLQJobs = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.countDLQJobs schemaName tableName
countChildrenBatch
:: forall m registry payload
. (QueueOperation m registry payload)
=> [Int64]
-> m (Map Int64 (Int64, Int64))
countChildrenBatch :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
[Int64] -> m (Map Int64 (Int64, Int64))
countChildrenBatch [Int64]
ids = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.countChildrenBatch schemaName tableName ids
countDLQChildren
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int64
-> m Int64
countDLQChildren :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int64 -> m Int64
countDLQChildren Int64
parentJobId = do
m <- forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
[Int64] -> m (Map Int64 Int64)
countDLQChildrenBatch @m @registry @payload [Int64
parentJobId]
pure $ fromMaybe 0 (Map.lookup parentJobId m)
countDLQChildrenBatch
:: forall m registry payload
. (QueueOperation m registry payload)
=> [Int64]
-> m (Map Int64 Int64)
countDLQChildrenBatch :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
[Int64] -> m (Map Int64 Int64)
countDLQChildrenBatch [Int64]
ids = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.countDLQChildrenBatch schemaName tableName ids
pauseChildren
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int64
-> m Int64
pauseChildren :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int64 -> m Int64
pauseChildren Int64
parentJobId = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.pauseChildren schemaName tableName parentJobId
resumeChildren
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int64
-> m Int64
resumeChildren :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int64 -> m Int64
resumeChildren Int64
parentJobId = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.resumeChildren schemaName tableName parentJobId
cancelJobCascade
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int64
-> m Int64
cancelJobCascade :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int64 -> m Int64
cancelJobCascade Int64
jobId = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.cancelJobCascade schemaName tableName jobId
suspendJob
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int64
-> m Int64
suspendJob :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int64 -> m Int64
suspendJob Int64
jobId = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.suspendJob schemaName tableName jobId
resumeJob
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int64
-> m Int64
resumeJob :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int64 -> m Int64
resumeJob Int64
jobId = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.resumeJob schemaName tableName jobId
insertResult
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int64
-> Int64
-> Value
-> m Int64
insertResult :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int64 -> Int64 -> Value -> m Int64
insertResult Int64
parentJobId Int64
childId Value
result = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.insertResult schemaName tableName parentJobId childId result
getResultsByParent
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int64
-> m (Map Int64 Value)
getResultsByParent :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int64 -> m (Map Int64 Value)
getResultsByParent Int64
parentJobId = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.getResultsByParent schemaName tableName parentJobId
getDLQChildErrorsByParent
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int64
-> m (Map Int64 Text)
getDLQChildErrorsByParent :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int64 -> m (Map Int64 Text)
getDLQChildErrorsByParent Int64
parentJobId = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.getDLQChildErrorsByParent schemaName tableName parentJobId
readChildResultsRaw
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int64
-> m (Map Int64 Value, Map Int64 Text, Maybe Value, Map Int64 Text)
readChildResultsRaw :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int64
-> m (Map Int64 Value, Map Int64 Text, Maybe Value, Map Int64 Text)
readChildResultsRaw Int64
parentJobId = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.readChildResultsRaw schemaName tableName parentJobId
persistParentState
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int64
-> Value
-> m Int64
persistParentState :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int64 -> Value -> m Int64
persistParentState Int64
jobId Value
state = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.persistParentState schemaName tableName jobId state
getParentStateSnapshot
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int64
-> m (Maybe Value)
getParentStateSnapshot :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int64 -> m (Maybe Value)
getParentStateSnapshot Int64
jobId = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.getParentStateSnapshot schemaName tableName jobId
refreshGroups
:: forall m registry payload
. (QueueOperation m registry payload)
=> Int
-> m ()
refreshGroups :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int -> m ()
refreshGroups Int
intervalSecs = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
Ops.refreshGroups schemaName tableName intervalSecs
insertJobTree
:: forall m registry payload
. (MonadUnliftIO m, QueueOperation m registry payload)
=> JT.JobTree payload
-> m (Either Text (NonEmpty (JobRead payload)))
insertJobTree :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
(MonadUnliftIO m, QueueOperation m registry payload) =>
JobTree payload -> m (Either Text (NonEmpty (JobRead payload)))
insertJobTree JobTree payload
tree = do
schemaName <- m Text
forall (m :: * -> *) (registry :: JobPayloadRegistry).
HasArbiterSchema m registry =>
m Text
getSchema
let tableName = String -> Text
T.pack (String -> Text) -> String -> Text
forall a b. (a -> b) -> a -> b
$ Proxy (TableForPayload payload registry) -> String
forall (n :: Symbol) (proxy :: Symbol -> *).
KnownSymbol n =>
proxy n -> String
symbolVal (forall {k} (t :: k). Proxy t
forall (t :: Symbol). Proxy t
Proxy @(TableForPayload payload registry))
JT.insertJobTree schemaName tableName tree