{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingVia #-}
{-# LANGUAGE OverloadedStrings #-}
module Arbiter.Core.Operations
(
insertJob
, insertJobUnsafe
, insertJobsBatch
, insertJobsBatch_
, insertResult
, getResultsByParent
, getDLQChildErrorsByParent
, persistParentState
, claimNextVisibleJobs
, claimNextVisibleJobsBatched
, ackJob
, ackJobsBatch
, ackJobsBulk
, setVisibilityTimeout
, setVisibilityTimeoutBatch
, VisibilityUpdateInfo (..)
, updateJobForRetry
, moveToDLQ
, moveToDLQBatch
, retryFromDLQ
, dlqJobExists
, listDLQJobs
, listDLQJobsByParent
, countDLQJobsByParent
, deleteDLQJob
, deleteDLQJobsBatch
, Tmpl.JobFilter (..)
, listJobsFiltered
, countJobsFiltered
, listDLQFiltered
, countDLQFiltered
, listJobs
, getJobById
, getJobsByGroup
, getInFlightJobs
, cancelJob
, cancelJobsBatch
, promoteJob
, QueueStats (..)
, getQueueStats
, countJobs
, countJobsByGroup
, countInFlightJobs
, countDLQJobs
, getJobsByParent
, countJobsByParent
, countChildrenBatch
, countDLQChildrenBatch
, pauseChildren
, resumeChildren
, cancelJobCascade
, cancelJobTree
, suspendJob
, resumeJob
, refreshGroups
, getParentStateSnapshot
, readChildResultsRaw
, mergeRawChildResults
) where
import Control.Monad (foldM, void, when)
import Data.Aeson (FromJSON, Result (..), ToJSON, Value (Object), fromJSON, toJSON)
import Data.Foldable (for_, toList)
import Data.Functor qualified as Functor
import Data.Int (Int32, Int64)
import Data.List (groupBy, sortOn)
import Data.List.NonEmpty (NonEmpty (..))
import Data.List.NonEmpty qualified as NE
import Data.Map.Strict qualified as Map
import Data.Maybe (catMaybes, mapMaybe)
import Data.Sequence (Seq, (|>))
import Data.Sequence qualified as Seq
import Data.Set qualified as Set
import Data.Text (Text)
import Data.Text qualified as T
import Data.Time (NominalDiffTime, UTCTime)
import GHC.Generics (Generic, Generically (..))
import Arbiter.Core.Codec
( Col (..)
, Params
, RowCodec
, col
, countCodec
, dlqRowCodec
, jobRowCodec
, ncol
, parr
, pnarr
, pnul
, pval
, statsRowCodec
)
import Arbiter.Core.Exceptions (throwParsing)
import Arbiter.Core.Job.DLQ qualified as DLQ
import Arbiter.Core.Job.Schema (jobQueueTable)
import Arbiter.Core.Job.Types
( DedupKey (IgnoreDuplicate, ReplaceDuplicate)
, Job (..)
, JobPayload
, JobRead
, JobWrite
)
import Arbiter.Core.MonadArbiter (MonadArbiter (..))
import Arbiter.Core.SqlTemplates qualified as Tmpl
decodePayload :: (JobPayload payload, MonadArbiter m) => Job Value Int64 Text UTCTime -> m (JobRead payload)
decodePayload :: forall payload (m :: * -> *).
(JobPayload payload, MonadArbiter m) =>
Job Value Int64 Text UTCTime -> m (JobRead payload)
decodePayload Job Value Int64 Text UTCTime
job = case Value -> Result payload
forall a. FromJSON a => Value -> Result a
fromJSON (Job Value Int64 Text UTCTime -> Value
forall payload key q insertedAt.
Job payload key q insertedAt -> payload
payload Job Value Int64 Text UTCTime
job) of
Success payload
p -> JobRead payload -> m (JobRead payload)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (JobRead payload -> m (JobRead payload))
-> JobRead payload -> m (JobRead payload)
forall a b. (a -> b) -> a -> b
$ Job Value Int64 Text UTCTime
job {payload = p}
Error String
e -> Text -> m (JobRead payload)
forall (m :: * -> *) a. MonadIO m => Text -> m a
throwParsing (Text -> m (JobRead payload)) -> Text -> m (JobRead payload)
forall a b. (a -> b) -> a -> b
$ Text
"Failed to decode job payload: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack String
e
boolCodec :: RowCodec Bool
boolCodec :: RowCodec Bool
boolCodec = Text -> Col Bool -> RowCodec Bool
forall a. Text -> Col a -> RowCodec a
col Text
"result" Col Bool
CBool
int64Codec :: RowCodec Int64
int64Codec :: RowCodec Int64
int64Codec = Text -> Col Int64 -> RowCodec Int64
forall a. Text -> Col a -> RowCodec a
col Text
"result" Col Int64
CInt8
resultsRowCodec :: RowCodec (Int64, Value)
resultsRowCodec :: RowCodec (Int64, Value)
resultsRowCodec = (,) (Int64 -> Value -> (Int64, Value))
-> RowCodec Int64 -> Ap NullCol (Value -> (Int64, Value))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Text -> Col Int64 -> RowCodec Int64
forall a. Text -> Col a -> RowCodec a
col Text
"child_id" Col Int64
CInt8 Ap NullCol (Value -> (Int64, Value))
-> Ap NullCol Value -> RowCodec (Int64, Value)
forall a b. Ap NullCol (a -> b) -> Ap NullCol a -> Ap NullCol b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Text -> Col Value -> Ap NullCol Value
forall a. Text -> Col a -> RowCodec a
col Text
"result" Col Value
CJsonb
errorResultsRowCodec :: RowCodec (Int64, Maybe Text)
errorResultsRowCodec :: RowCodec (Int64, Maybe Text)
errorResultsRowCodec = (,) (Int64 -> Maybe Text -> (Int64, Maybe Text))
-> RowCodec Int64 -> Ap NullCol (Maybe Text -> (Int64, Maybe Text))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Text -> Col Int64 -> RowCodec Int64
forall a. Text -> Col a -> RowCodec a
col Text
"job_id" Col Int64
CInt8 Ap NullCol (Maybe Text -> (Int64, Maybe Text))
-> Ap NullCol (Maybe Text) -> RowCodec (Int64, Maybe Text)
forall a b. Ap NullCol (a -> b) -> Ap NullCol a -> Ap NullCol b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Text -> Col Text -> Ap NullCol (Maybe Text)
forall a. Text -> Col a -> RowCodec (Maybe a)
ncol Text
"last_error" Col Text
CText
visibilityUpdateCodec :: RowCodec VisibilityUpdateInfo
visibilityUpdateCodec :: RowCodec VisibilityUpdateInfo
visibilityUpdateCodec =
Int64 -> Bool -> Maybe Int32 -> VisibilityUpdateInfo
VisibilityUpdateInfo
(Int64 -> Bool -> Maybe Int32 -> VisibilityUpdateInfo)
-> RowCodec Int64
-> Ap NullCol (Bool -> Maybe Int32 -> VisibilityUpdateInfo)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Text -> Col Int64 -> RowCodec Int64
forall a. Text -> Col a -> RowCodec a
col Text
"id" Col Int64
CInt8
Ap NullCol (Bool -> Maybe Int32 -> VisibilityUpdateInfo)
-> RowCodec Bool
-> Ap NullCol (Maybe Int32 -> VisibilityUpdateInfo)
forall a b. Ap NullCol (a -> b) -> Ap NullCol a -> Ap NullCol b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Text -> Col Bool -> RowCodec Bool
forall a. Text -> Col a -> RowCodec a
col Text
"was_heartbeated" Col Bool
CBool
Ap NullCol (Maybe Int32 -> VisibilityUpdateInfo)
-> Ap NullCol (Maybe Int32) -> RowCodec VisibilityUpdateInfo
forall a b. Ap NullCol (a -> b) -> Ap NullCol a -> Ap NullCol b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Text -> Col Int32 -> Ap NullCol (Maybe Int32)
forall a. Text -> Col a -> RowCodec (Maybe a)
ncol Text
"current_db_attempts" Col Int32
CInt4
parentCountCodec :: RowCodec (Int64, (Int64, Int64))
parentCountCodec :: RowCodec (Int64, (Int64, Int64))
parentCountCodec =
(\Int64
pid Int64
cnt Int64
paused -> (Int64
pid, (Int64
cnt, Int64
paused)))
(Int64 -> Int64 -> Int64 -> (Int64, (Int64, Int64)))
-> RowCodec Int64
-> Ap NullCol (Int64 -> Int64 -> (Int64, (Int64, Int64)))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Text -> Col Int64 -> RowCodec Int64
forall a. Text -> Col a -> RowCodec a
col Text
"parent_id" Col Int64
CInt8
Ap NullCol (Int64 -> Int64 -> (Int64, (Int64, Int64)))
-> RowCodec Int64 -> Ap NullCol (Int64 -> (Int64, (Int64, Int64)))
forall a b. Ap NullCol (a -> b) -> Ap NullCol a -> Ap NullCol b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Text -> Col Int64 -> RowCodec Int64
forall a. Text -> Col a -> RowCodec a
col Text
"count" Col Int64
CInt8
Ap NullCol (Int64 -> (Int64, (Int64, Int64)))
-> RowCodec Int64 -> RowCodec (Int64, (Int64, Int64))
forall a b. Ap NullCol (a -> b) -> Ap NullCol a -> Ap NullCol b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Text -> Col Int64 -> RowCodec Int64
forall a. Text -> Col a -> RowCodec a
col Text
"count_suspended" Col Int64
CInt8
dlqParentCountCodec :: RowCodec (Int64, Int64)
dlqParentCountCodec :: RowCodec (Int64, Int64)
dlqParentCountCodec = (,) (Int64 -> Int64 -> (Int64, Int64))
-> RowCodec Int64 -> Ap NullCol (Int64 -> (Int64, Int64))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Text -> Col Int64 -> RowCodec Int64
forall a. Text -> Col a -> RowCodec a
col Text
"parent_id" Col Int64
CInt8 Ap NullCol (Int64 -> (Int64, Int64))
-> RowCodec Int64 -> RowCodec (Int64, Int64)
forall a b. Ap NullCol (a -> b) -> Ap NullCol a -> Ap NullCol b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Text -> Col Int64 -> RowCodec Int64
forall a. Text -> Col a -> RowCodec a
col Text
"count" Col Int64
CInt8
childResultsRowCodec :: RowCodec (Text, Maybe Int64, Maybe Value, Maybe Text, Maybe Int64)
childResultsRowCodec :: RowCodec (Text, Maybe Int64, Maybe Value, Maybe Text, Maybe Int64)
childResultsRowCodec =
(,,,,)
(Text
-> Maybe Int64
-> Maybe Value
-> Maybe Text
-> Maybe Int64
-> (Text, Maybe Int64, Maybe Value, Maybe Text, Maybe Int64))
-> Ap NullCol Text
-> Ap
NullCol
(Maybe Int64
-> Maybe Value
-> Maybe Text
-> Maybe Int64
-> (Text, Maybe Int64, Maybe Value, Maybe Text, Maybe Int64))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Text -> Col Text -> Ap NullCol Text
forall a. Text -> Col a -> RowCodec a
col Text
"source" Col Text
CText
Ap
NullCol
(Maybe Int64
-> Maybe Value
-> Maybe Text
-> Maybe Int64
-> (Text, Maybe Int64, Maybe Value, Maybe Text, Maybe Int64))
-> Ap NullCol (Maybe Int64)
-> Ap
NullCol
(Maybe Value
-> Maybe Text
-> Maybe Int64
-> (Text, Maybe Int64, Maybe Value, Maybe Text, Maybe Int64))
forall a b. Ap NullCol (a -> b) -> Ap NullCol a -> Ap NullCol b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Text -> Col Int64 -> Ap NullCol (Maybe Int64)
forall a. Text -> Col a -> RowCodec (Maybe a)
ncol Text
"child_id" Col Int64
CInt8
Ap
NullCol
(Maybe Value
-> Maybe Text
-> Maybe Int64
-> (Text, Maybe Int64, Maybe Value, Maybe Text, Maybe Int64))
-> Ap NullCol (Maybe Value)
-> Ap
NullCol
(Maybe Text
-> Maybe Int64
-> (Text, Maybe Int64, Maybe Value, Maybe Text, Maybe Int64))
forall a b. Ap NullCol (a -> b) -> Ap NullCol a -> Ap NullCol b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Text -> Col Value -> Ap NullCol (Maybe Value)
forall a. Text -> Col a -> RowCodec (Maybe a)
ncol Text
"result" Col Value
CJsonb
Ap
NullCol
(Maybe Text
-> Maybe Int64
-> (Text, Maybe Int64, Maybe Value, Maybe Text, Maybe Int64))
-> Ap NullCol (Maybe Text)
-> Ap
NullCol
(Maybe Int64
-> (Text, Maybe Int64, Maybe Value, Maybe Text, Maybe Int64))
forall a b. Ap NullCol (a -> b) -> Ap NullCol a -> Ap NullCol b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Text -> Col Text -> Ap NullCol (Maybe Text)
forall a. Text -> Col a -> RowCodec (Maybe a)
ncol Text
"error" Col Text
CText
Ap
NullCol
(Maybe Int64
-> (Text, Maybe Int64, Maybe Value, Maybe Text, Maybe Int64))
-> Ap NullCol (Maybe Int64)
-> RowCodec
(Text, Maybe Int64, Maybe Value, Maybe Text, Maybe Int64)
forall a b. Ap NullCol (a -> b) -> Ap NullCol a -> Ap NullCol b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Text -> Col Int64 -> Ap NullCol (Maybe Int64)
forall a. Text -> Col a -> RowCodec (Maybe a)
ncol Text
"dlq_pk" Col Int64
CInt8
nullableInt64Codec :: RowCodec (Maybe Int64)
nullableInt64Codec :: Ap NullCol (Maybe Int64)
nullableInt64Codec = Text -> Col Int64 -> Ap NullCol (Maybe Int64)
forall a. Text -> Col a -> RowCodec (Maybe a)
ncol Text
"parent_id" Col Int64
CInt8
buildWhereClause :: [Tmpl.JobFilter] -> (Text, Params)
buildWhereClause :: [JobFilter] -> (Text, Params)
buildWhereClause [] = (Text
"", [])
buildWhereClause [JobFilter]
filters =
let ([Text]
clauses, [Params]
params) = [(Text, Params)] -> ([Text], [Params])
forall (f :: * -> *) a b. Functor f => f (a, b) -> (f a, f b)
Functor.unzip ([(Text, Params)] -> ([Text], [Params]))
-> [(Text, Params)] -> ([Text], [Params])
forall a b. (a -> b) -> a -> b
$ (JobFilter -> (Text, Params)) -> [JobFilter] -> [(Text, Params)]
forall a b. (a -> b) -> [a] -> [b]
map JobFilter -> (Text, Params)
filterToClause [JobFilter]
filters
in (Text
"WHERE " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text -> [Text] -> Text
T.intercalate Text
" AND " [Text]
clauses, [Params] -> Params
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [Params]
params)
filterToClause :: Tmpl.JobFilter -> (Text, Params)
filterToClause :: JobFilter -> (Text, Params)
filterToClause (Tmpl.FilterGroupKey Text
gk) = (Text
"group_key = ?", [Col Text -> Text -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Text
CText Text
gk])
filterToClause (Tmpl.FilterParentId Int64
pid) = (Text
"parent_id = ?", [Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
pid])
filterToClause (Tmpl.FilterSuspended Bool
b) = (Text
"suspended = ?", [Col Bool -> Bool -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Bool
CBool Bool
b])
filterToClause JobFilter
Tmpl.FilterInFlight =
(Text
"attempts > 0 AND NOT suspended AND not_visible_until IS NOT NULL AND not_visible_until > NOW()", [])
insertJobUnsafe
:: forall m payload
. (JobPayload payload, MonadArbiter m)
=> Text
-> Text
-> JobWrite payload
-> m (Maybe (JobRead payload))
insertJobUnsafe :: forall (m :: * -> *) payload.
(JobPayload payload, MonadArbiter m) =>
Text -> Text -> JobWrite payload -> m (Maybe (JobRead payload))
insertJobUnsafe Text
schemaName Text
tableName JobWrite payload
job = m (Maybe (JobRead payload)) -> m (Maybe (JobRead payload))
forall a. m a -> m a
forall (m :: * -> *) a. MonadArbiter m => m a -> m a
withDbTransaction (m (Maybe (JobRead payload)) -> m (Maybe (JobRead payload)))
-> m (Maybe (JobRead payload)) -> m (Maybe (JobRead payload))
forall a b. (a -> b) -> a -> b
$ do
let (Text
sql, SomeParam
dedupKeyParam, SomeParam
dedupStrategyParam) = case JobWrite payload -> Maybe DedupKey
forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe DedupKey
dedupKey JobWrite payload
job of
Maybe DedupKey
Nothing -> (Text -> Text -> Text
Tmpl.insertJobSQL Text
schemaName Text
tableName, Col Text -> Maybe Text -> SomeParam
forall a. Col a -> Maybe a -> SomeParam
pnul Col Text
CText Maybe Text
forall a. Maybe a
Nothing, Col Text -> Maybe Text -> SomeParam
forall a. Col a -> Maybe a -> SomeParam
pnul Col Text
CText Maybe Text
forall a. Maybe a
Nothing)
Just (IgnoreDuplicate Text
k) -> (Text -> Text -> Text
Tmpl.insertJobSQL Text
schemaName Text
tableName, Col Text -> Maybe Text -> SomeParam
forall a. Col a -> Maybe a -> SomeParam
pnul Col Text
CText (Text -> Maybe Text
forall a. a -> Maybe a
Just Text
k), Col Text -> Maybe Text -> SomeParam
forall a. Col a -> Maybe a -> SomeParam
pnul Col Text
CText (Text -> Maybe Text
forall a. a -> Maybe a
Just Text
"ignore"))
Just (ReplaceDuplicate Text
k) -> (Text -> Text -> Text
Tmpl.insertJobReplaceSQL Text
schemaName Text
tableName, Col Text -> Maybe Text -> SomeParam
forall a. Col a -> Maybe a -> SomeParam
pnul Col Text
CText (Text -> Maybe Text
forall a. a -> Maybe a
Just Text
k), Col Text -> Maybe Text -> SomeParam
forall a. Col a -> Maybe a -> SomeParam
pnul Col Text
CText (Text -> Maybe Text
forall a. a -> Maybe a
Just Text
"replace"))
parentStateVal :: Maybe Value
parentStateVal = if JobWrite payload -> Bool
forall payload key q insertedAt.
Job payload key q insertedAt -> Bool
isRollup JobWrite payload
job then Value -> Maybe Value
forall a. a -> Maybe a
Just (Object -> Value
Object Object
forall a. Monoid a => a
mempty) else Maybe Value
forall a. Maybe a
Nothing
params :: Params
params =
[ Col Value -> Value -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Value
CJsonb (payload -> Value
forall a. ToJSON a => a -> Value
toJSON (payload -> Value) -> payload -> Value
forall a b. (a -> b) -> a -> b
$ JobWrite payload -> payload
forall payload key q insertedAt.
Job payload key q insertedAt -> payload
payload JobWrite payload
job)
, Col Text -> Maybe Text -> SomeParam
forall a. Col a -> Maybe a -> SomeParam
pnul Col Text
CText (JobWrite payload -> Maybe Text
forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe Text
groupKey JobWrite payload
job)
, Col Int32 -> Int32 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int32
CInt4 (JobWrite payload -> Int32
forall payload key q insertedAt.
Job payload key q insertedAt -> Int32
attempts JobWrite payload
job)
, Col Text -> Maybe Text -> SomeParam
forall a. Col a -> Maybe a -> SomeParam
pnul Col Text
CText (JobWrite payload -> Maybe Text
forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe Text
lastError JobWrite payload
job)
, Col Int32 -> Int32 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int32
CInt4 (JobWrite payload -> Int32
forall payload key q insertedAt.
Job payload key q insertedAt -> Int32
priority JobWrite payload
job)
, SomeParam
dedupKeyParam
, SomeParam
dedupStrategyParam
, Col Int32 -> Maybe Int32 -> SomeParam
forall a. Col a -> Maybe a -> SomeParam
pnul Col Int32
CInt4 (JobWrite payload -> Maybe Int32
forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe Int32
maxAttempts JobWrite payload
job)
, Col Int64 -> Maybe Int64 -> SomeParam
forall a. Col a -> Maybe a -> SomeParam
pnul Col Int64
CInt8 (JobWrite payload -> Maybe Int64
forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe Int64
parentId JobWrite payload
job)
, Col Value -> Maybe Value -> SomeParam
forall a. Col a -> Maybe a -> SomeParam
pnul Col Value
CJsonb Maybe Value
parentStateVal
, Col Bool -> Bool -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Bool
CBool (JobWrite payload -> Bool
forall payload key q insertedAt.
Job payload key q insertedAt -> Bool
suspended JobWrite payload
job)
, Col UTCTime -> Maybe UTCTime -> SomeParam
forall a. Col a -> Maybe a -> SomeParam
pnul Col UTCTime
CTimestamptz (JobWrite payload -> Maybe UTCTime
forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe UTCTime
notVisibleUntil JobWrite payload
job)
]
rawJobs <- Text
-> Params
-> RowCodec (Job Value Int64 Text UTCTime)
-> m [Job Value Int64 Text UTCTime]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery Text
sql Params
params (Text -> RowCodec (Job Value Int64 Text UTCTime)
jobRowCodec Text
tableName)
case rawJobs of
[] -> case JobWrite payload -> Maybe DedupKey
forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe DedupKey
dedupKey JobWrite payload
job of
Just (IgnoreDuplicate Text
_) -> Maybe (JobRead payload) -> m (Maybe (JobRead payload))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (JobRead payload)
forall a. Maybe a
Nothing
Just (ReplaceDuplicate Text
_) -> Maybe (JobRead payload) -> m (Maybe (JobRead payload))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (JobRead payload)
forall a. Maybe a
Nothing
Maybe DedupKey
Nothing -> Text -> m (Maybe (JobRead payload))
forall (m :: * -> *) a. MonadIO m => Text -> m a
throwParsing Text
"insertJob: No rows returned from INSERT"
(Job Value Int64 Text UTCTime
raw : [Job Value Int64 Text UTCTime]
_) -> JobRead payload -> Maybe (JobRead payload)
forall a. a -> Maybe a
Just (JobRead payload -> Maybe (JobRead payload))
-> m (JobRead payload) -> m (Maybe (JobRead payload))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Job Value Int64 Text UTCTime -> m (JobRead payload)
forall payload (m :: * -> *).
(JobPayload payload, MonadArbiter m) =>
Job Value Int64 Text UTCTime -> m (JobRead payload)
decodePayload Job Value Int64 Text UTCTime
raw
insertJob
:: forall m payload
. (JobPayload payload, MonadArbiter m)
=> Text
-> Text
-> JobWrite payload
-> m (Maybe (JobRead payload))
insertJob :: forall (m :: * -> *) payload.
(JobPayload payload, MonadArbiter m) =>
Text -> Text -> JobWrite payload -> m (Maybe (JobRead payload))
insertJob Text
schemaName Text
tableName JobWrite payload
job = do
parentOk <- case JobWrite payload -> Maybe Int64
forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe Int64
parentId JobWrite payload
job of
Maybe Int64
Nothing -> Bool -> m Bool
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True
Just Int64
pid -> do
checkRows <- Text -> Params -> RowCodec Bool -> m [Bool]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery (Text -> Text -> Text
Tmpl.parentExistsSQL Text
schemaName Text
tableName) [Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
pid] RowCodec Bool
boolCodec
case checkRows of
[Bool
True] -> Bool -> m Bool
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
True
[Bool]
_ -> Bool -> m Bool
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
False
if not parentOk
then pure Nothing
else insertJobUnsafe schemaName tableName job
insertJobsBatch
:: forall m payload
. (JobPayload payload, MonadArbiter m)
=> Text
-> Text
-> [JobWrite payload]
-> m [JobRead payload]
insertJobsBatch :: forall (m :: * -> *) payload.
(JobPayload payload, MonadArbiter m) =>
Text -> Text -> [JobWrite payload] -> m [JobRead payload]
insertJobsBatch Text
_ Text
_ [] = [JobRead payload] -> m [JobRead payload]
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
insertJobsBatch Text
schemaName Text
tableName [JobWrite payload]
jobs = m [JobRead payload] -> m [JobRead payload]
forall a. m a -> m a
forall (m :: * -> *) a. MonadArbiter m => m a -> m a
withDbTransaction (m [JobRead payload] -> m [JobRead payload])
-> m [JobRead payload] -> m [JobRead payload]
forall a b. (a -> b) -> a -> b
$ do
let cols :: BatchColumns
cols = [JobWrite payload] -> BatchColumns
forall payload.
ToJSON payload =>
[JobWrite payload] -> BatchColumns
buildBatchColumns [JobWrite payload]
jobs
params :: Params
params =
[ Col Value -> [Value] -> SomeParam
forall a. Col a -> [a] -> SomeParam
parr Col Value
CJsonb (Seq Value -> [Value]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq Value -> [Value]) -> Seq Value -> [Value]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq Value
colPayloads BatchColumns
cols)
, Col Text -> [Maybe Text] -> SomeParam
forall a. Col a -> [Maybe a] -> SomeParam
pnarr Col Text
CText (Seq (Maybe Text) -> [Maybe Text]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq (Maybe Text) -> [Maybe Text])
-> Seq (Maybe Text) -> [Maybe Text]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq (Maybe Text)
colGroupKeys BatchColumns
cols)
, Col Int32 -> [Int32] -> SomeParam
forall a. Col a -> [a] -> SomeParam
parr Col Int32
CInt4 (Seq Int32 -> [Int32]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq Int32 -> [Int32]) -> Seq Int32 -> [Int32]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq Int32
colPriorities BatchColumns
cols)
, Col Text -> [Maybe Text] -> SomeParam
forall a. Col a -> [Maybe a] -> SomeParam
pnarr Col Text
CText (Seq (Maybe Text) -> [Maybe Text]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq (Maybe Text) -> [Maybe Text])
-> Seq (Maybe Text) -> [Maybe Text]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq (Maybe Text)
colDedupKeys BatchColumns
cols)
, Col Text -> [Maybe Text] -> SomeParam
forall a. Col a -> [Maybe a] -> SomeParam
pnarr Col Text
CText (Seq (Maybe Text) -> [Maybe Text]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq (Maybe Text) -> [Maybe Text])
-> Seq (Maybe Text) -> [Maybe Text]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq (Maybe Text)
colDedupStrategies BatchColumns
cols)
, Col Int32 -> [Maybe Int32] -> SomeParam
forall a. Col a -> [Maybe a] -> SomeParam
pnarr Col Int32
CInt4 (Seq (Maybe Int32) -> [Maybe Int32]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq (Maybe Int32) -> [Maybe Int32])
-> Seq (Maybe Int32) -> [Maybe Int32]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq (Maybe Int32)
colMaxAttempts BatchColumns
cols)
, Col Int64 -> [Maybe Int64] -> SomeParam
forall a. Col a -> [Maybe a] -> SomeParam
pnarr Col Int64
CInt8 (Seq (Maybe Int64) -> [Maybe Int64]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq (Maybe Int64) -> [Maybe Int64])
-> Seq (Maybe Int64) -> [Maybe Int64]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq (Maybe Int64)
colParentIds BatchColumns
cols)
, Col Value -> [Maybe Value] -> SomeParam
forall a. Col a -> [Maybe a] -> SomeParam
pnarr Col Value
CJsonb (Seq (Maybe Value) -> [Maybe Value]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq (Maybe Value) -> [Maybe Value])
-> Seq (Maybe Value) -> [Maybe Value]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq (Maybe Value)
colParentStates BatchColumns
cols)
, Col Bool -> [Bool] -> SomeParam
forall a. Col a -> [a] -> SomeParam
parr Col Bool
CBool (Seq Bool -> [Bool]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq Bool -> [Bool]) -> Seq Bool -> [Bool]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq Bool
colSuspended BatchColumns
cols)
, Col UTCTime -> [Maybe UTCTime] -> SomeParam
forall a. Col a -> [Maybe a] -> SomeParam
pnarr Col UTCTime
CTimestamptz (Seq (Maybe UTCTime) -> [Maybe UTCTime]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq (Maybe UTCTime) -> [Maybe UTCTime])
-> Seq (Maybe UTCTime) -> [Maybe UTCTime]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq (Maybe UTCTime)
colNotVisibleUntils BatchColumns
cols)
]
rawJobs <- Text
-> Params
-> RowCodec (Job Value Int64 Text UTCTime)
-> m [Job Value Int64 Text UTCTime]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery (Text -> Text -> Text
Tmpl.insertJobsBatchSQL Text
schemaName Text
tableName) Params
params (Text -> RowCodec (Job Value Int64 Text UTCTime)
jobRowCodec Text
tableName)
mapM decodePayload rawJobs
insertJobsBatch_
:: forall m payload
. (JobPayload payload, MonadArbiter m)
=> Text
-> Text
-> [JobWrite payload]
-> m Int64
insertJobsBatch_ :: forall (m :: * -> *) payload.
(JobPayload payload, MonadArbiter m) =>
Text -> Text -> [JobWrite payload] -> m Int64
insertJobsBatch_ Text
_ Text
_ [] = Int64 -> m Int64
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Int64
0
insertJobsBatch_ Text
schemaName Text
tableName [JobWrite payload]
jobs = m Int64 -> m Int64
forall a. m a -> m a
forall (m :: * -> *) a. MonadArbiter m => m a -> m a
withDbTransaction (m Int64 -> m Int64) -> m Int64 -> m Int64
forall a b. (a -> b) -> a -> b
$ do
let cols :: BatchColumns
cols = [JobWrite payload] -> BatchColumns
forall payload.
ToJSON payload =>
[JobWrite payload] -> BatchColumns
buildBatchColumns [JobWrite payload]
jobs
params :: Params
params =
[ Col Value -> [Value] -> SomeParam
forall a. Col a -> [a] -> SomeParam
parr Col Value
CJsonb (Seq Value -> [Value]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq Value -> [Value]) -> Seq Value -> [Value]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq Value
colPayloads BatchColumns
cols)
, Col Text -> [Maybe Text] -> SomeParam
forall a. Col a -> [Maybe a] -> SomeParam
pnarr Col Text
CText (Seq (Maybe Text) -> [Maybe Text]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq (Maybe Text) -> [Maybe Text])
-> Seq (Maybe Text) -> [Maybe Text]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq (Maybe Text)
colGroupKeys BatchColumns
cols)
, Col Int32 -> [Int32] -> SomeParam
forall a. Col a -> [a] -> SomeParam
parr Col Int32
CInt4 (Seq Int32 -> [Int32]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq Int32 -> [Int32]) -> Seq Int32 -> [Int32]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq Int32
colPriorities BatchColumns
cols)
, Col Text -> [Maybe Text] -> SomeParam
forall a. Col a -> [Maybe a] -> SomeParam
pnarr Col Text
CText (Seq (Maybe Text) -> [Maybe Text]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq (Maybe Text) -> [Maybe Text])
-> Seq (Maybe Text) -> [Maybe Text]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq (Maybe Text)
colDedupKeys BatchColumns
cols)
, Col Text -> [Maybe Text] -> SomeParam
forall a. Col a -> [Maybe a] -> SomeParam
pnarr Col Text
CText (Seq (Maybe Text) -> [Maybe Text]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq (Maybe Text) -> [Maybe Text])
-> Seq (Maybe Text) -> [Maybe Text]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq (Maybe Text)
colDedupStrategies BatchColumns
cols)
, Col Int32 -> [Maybe Int32] -> SomeParam
forall a. Col a -> [Maybe a] -> SomeParam
pnarr Col Int32
CInt4 (Seq (Maybe Int32) -> [Maybe Int32]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq (Maybe Int32) -> [Maybe Int32])
-> Seq (Maybe Int32) -> [Maybe Int32]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq (Maybe Int32)
colMaxAttempts BatchColumns
cols)
, Col Int64 -> [Maybe Int64] -> SomeParam
forall a. Col a -> [Maybe a] -> SomeParam
pnarr Col Int64
CInt8 (Seq (Maybe Int64) -> [Maybe Int64]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq (Maybe Int64) -> [Maybe Int64])
-> Seq (Maybe Int64) -> [Maybe Int64]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq (Maybe Int64)
colParentIds BatchColumns
cols)
, Col Value -> [Maybe Value] -> SomeParam
forall a. Col a -> [Maybe a] -> SomeParam
pnarr Col Value
CJsonb (Seq (Maybe Value) -> [Maybe Value]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq (Maybe Value) -> [Maybe Value])
-> Seq (Maybe Value) -> [Maybe Value]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq (Maybe Value)
colParentStates BatchColumns
cols)
, Col Bool -> [Bool] -> SomeParam
forall a. Col a -> [a] -> SomeParam
parr Col Bool
CBool (Seq Bool -> [Bool]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq Bool -> [Bool]) -> Seq Bool -> [Bool]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq Bool
colSuspended BatchColumns
cols)
, Col UTCTime -> [Maybe UTCTime] -> SomeParam
forall a. Col a -> [Maybe a] -> SomeParam
pnarr Col UTCTime
CTimestamptz (Seq (Maybe UTCTime) -> [Maybe UTCTime]
forall a. Seq a -> [a]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList (Seq (Maybe UTCTime) -> [Maybe UTCTime])
-> Seq (Maybe UTCTime) -> [Maybe UTCTime]
forall a b. (a -> b) -> a -> b
$ BatchColumns -> Seq (Maybe UTCTime)
colNotVisibleUntils BatchColumns
cols)
]
Text -> Params -> m Int64
forall (m :: * -> *). MonadArbiter m => Text -> Params -> m Int64
executeStatement (Text -> Text -> Text
Tmpl.insertJobsBatchSQL_ Text
schemaName Text
tableName) Params
params
insertResult
:: (MonadArbiter m)
=> Text
-> Text
-> Int64
-> Int64
-> Value
-> m Int64
insertResult :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> Int64 -> Value -> m Int64
insertResult Text
schemaName Text
tableName Int64
parentJobId Int64
childId Value
result =
Text -> Params -> m Int64
forall (m :: * -> *). MonadArbiter m => Text -> Params -> m Int64
executeStatement
(Text -> Text -> Text
Tmpl.insertResultSQL Text
schemaName Text
tableName)
[ Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
parentJobId
, Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
childId
, Col Value -> Value -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Value
CJsonb Value
result
]
getResultsByParent
:: (MonadArbiter m)
=> Text
-> Text
-> Int64
-> m (Map.Map Int64 Value)
getResultsByParent :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> m (Map Int64 Value)
getResultsByParent Text
schemaName Text
tableName Int64
parentJobId = do
rows <-
Text -> Params -> RowCodec (Int64, Value) -> m [(Int64, Value)]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
(Text -> Text -> Text
Tmpl.getResultsByParentSQL Text
schemaName Text
tableName)
[Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
parentJobId]
RowCodec (Int64, Value)
resultsRowCodec
pure $ Map.fromList rows
getDLQChildErrorsByParent
:: (MonadArbiter m)
=> Text
-> Text
-> Int64
-> m (Map.Map Int64 Text)
getDLQChildErrorsByParent :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> m (Map Int64 Text)
getDLQChildErrorsByParent Text
schemaName Text
tableName Int64
parentJobId = do
rows <-
Text
-> Params
-> RowCodec (Int64, Maybe Text)
-> m [(Int64, Maybe Text)]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
(Text -> Text -> Text
Tmpl.getDLQChildErrorsByParentSQL Text
schemaName Text
tableName)
[Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
parentJobId]
RowCodec (Int64, Maybe Text)
errorResultsRowCodec
pure $ Map.fromList $ mapMaybe (\(Int64
jid, Maybe Text
mErr) -> (Int64
jid,) (Text -> (Int64, Text)) -> Maybe Text -> Maybe (Int64, Text)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe Text
mErr) rows
persistParentState
:: (MonadArbiter m)
=> Text
-> Text
-> Int64
-> Value
-> m Int64
persistParentState :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> Value -> m Int64
persistParentState Text
schemaName Text
tableName Int64
jobId Value
state =
Text -> Params -> m Int64
forall (m :: * -> *). MonadArbiter m => Text -> Params -> m Int64
executeStatement
(Text -> Text -> Text
Tmpl.persistParentStateSQL Text
schemaName Text
tableName)
[Col Value -> Value -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Value
CJsonb Value
state, Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
jobId]
buildBatchColumns :: forall payload. (ToJSON payload) => [JobWrite payload] -> BatchColumns
buildBatchColumns :: forall payload.
ToJSON payload =>
[JobWrite payload] -> BatchColumns
buildBatchColumns = (Map Text Int, Int, BatchColumns) -> BatchColumns
forall {a} {b} {c}. (a, b, c) -> c
extractCols ((Map Text Int, Int, BatchColumns) -> BatchColumns)
-> ([JobWrite payload] -> (Map Text Int, Int, BatchColumns))
-> [JobWrite payload]
-> BatchColumns
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((Map Text Int, Int, BatchColumns)
-> JobWrite payload -> (Map Text Int, Int, BatchColumns))
-> (Map Text Int, Int, BatchColumns)
-> [JobWrite payload]
-> (Map Text Int, Int, BatchColumns)
forall b a. (b -> a -> b) -> b -> [a] -> b
forall (t :: * -> *) b a.
Foldable t =>
(b -> a -> b) -> b -> t a -> b
foldl' (Map Text Int, Int, BatchColumns)
-> JobWrite payload -> (Map Text Int, Int, BatchColumns)
step (Map Text Int
forall k a. Map k a
Map.empty, Int
0, BatchColumns
emptyColumns)
where
extractCols :: (a, b, c) -> c
extractCols (a
_, b
_, c
cols) = c
cols
step :: (Map Text Int, Int, BatchColumns)
-> JobWrite payload -> (Map Text Int, Int, BatchColumns)
step (!Map Text Int
seen, !Int
n, !BatchColumns
cols) JobWrite payload
job = case Maybe DedupKey -> Maybe Text
dedupKeyText (JobWrite payload -> Maybe DedupKey
forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe DedupKey
dedupKey JobWrite payload
job) of
Maybe Text
Nothing -> (Map Text Int
seen, Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1, BatchColumns -> JobWrite payload -> BatchColumns
snocJob BatchColumns
cols JobWrite payload
job)
Just Text
k -> case Text -> Map Text Int -> Maybe Int
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup Text
k Map Text Int
seen of
Maybe Int
Nothing -> (Text -> Int -> Map Text Int -> Map Text Int
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert Text
k Int
n Map Text Int
seen, Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1, BatchColumns -> JobWrite payload -> BatchColumns
snocJob BatchColumns
cols JobWrite payload
job)
Just Int
idx
| JobWrite payload -> Bool
forall payload key q insertedAt.
Job payload key q insertedAt -> Bool
isReplace JobWrite payload
job -> (Map Text Int
seen, Int
n, Int -> BatchColumns -> JobWrite payload -> BatchColumns
updateJob Int
idx BatchColumns
cols JobWrite payload
job)
| Bool
otherwise -> (Map Text Int
seen, Int
n, BatchColumns
cols)
isReplace :: Job payload key q insertedAt -> Bool
isReplace Job payload key q insertedAt
job = case Job payload key q insertedAt -> Maybe DedupKey
forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe DedupKey
dedupKey Job payload key q insertedAt
job of
Just (ReplaceDuplicate Text
_) -> Bool
True
Maybe DedupKey
_ -> Bool
False
snocJob :: BatchColumns -> JobWrite payload -> BatchColumns
snocJob = (forall a. a -> Seq a -> Seq a)
-> BatchColumns -> JobWrite payload -> BatchColumns
withJob ((Seq a -> a -> Seq a) -> a -> Seq a -> Seq a
forall a b c. (a -> b -> c) -> b -> a -> c
flip Seq a -> a -> Seq a
forall a. Seq a -> a -> Seq a
(|>))
updateJob :: Int -> BatchColumns -> JobWrite payload -> BatchColumns
updateJob Int
idx = (forall a. a -> Seq a -> Seq a)
-> BatchColumns -> JobWrite payload -> BatchColumns
withJob (Int -> a -> Seq a -> Seq a
forall a. Int -> a -> Seq a -> Seq a
Seq.update Int
idx)
withJob :: (forall a. a -> Seq a -> Seq a) -> BatchColumns -> JobWrite payload -> BatchColumns
withJob :: (forall a. a -> Seq a -> Seq a)
-> BatchColumns -> JobWrite payload -> BatchColumns
withJob forall a. a -> Seq a -> Seq a
f BatchColumns
cols JobWrite payload
job =
let (Maybe Text
dk, Maybe Text
ds) = Maybe DedupKey -> (Maybe Text, Maybe Text)
dedupParts (JobWrite payload -> Maybe DedupKey
forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe DedupKey
dedupKey JobWrite payload
job)
in BatchColumns
{ colPayloads :: Seq Value
colPayloads = Value -> Seq Value -> Seq Value
forall a. a -> Seq a -> Seq a
f (payload -> Value
forall a. ToJSON a => a -> Value
toJSON (JobWrite payload -> payload
forall payload key q insertedAt.
Job payload key q insertedAt -> payload
payload JobWrite payload
job)) (BatchColumns -> Seq Value
colPayloads BatchColumns
cols)
, colGroupKeys :: Seq (Maybe Text)
colGroupKeys = Maybe Text -> Seq (Maybe Text) -> Seq (Maybe Text)
forall a. a -> Seq a -> Seq a
f (JobWrite payload -> Maybe Text
forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe Text
groupKey JobWrite payload
job) (BatchColumns -> Seq (Maybe Text)
colGroupKeys BatchColumns
cols)
, colPriorities :: Seq Int32
colPriorities = Int32 -> Seq Int32 -> Seq Int32
forall a. a -> Seq a -> Seq a
f (JobWrite payload -> Int32
forall payload key q insertedAt.
Job payload key q insertedAt -> Int32
priority JobWrite payload
job) (BatchColumns -> Seq Int32
colPriorities BatchColumns
cols)
, colDedupKeys :: Seq (Maybe Text)
colDedupKeys = Maybe Text -> Seq (Maybe Text) -> Seq (Maybe Text)
forall a. a -> Seq a -> Seq a
f Maybe Text
dk (BatchColumns -> Seq (Maybe Text)
colDedupKeys BatchColumns
cols)
, colDedupStrategies :: Seq (Maybe Text)
colDedupStrategies = Maybe Text -> Seq (Maybe Text) -> Seq (Maybe Text)
forall a. a -> Seq a -> Seq a
f Maybe Text
ds (BatchColumns -> Seq (Maybe Text)
colDedupStrategies BatchColumns
cols)
, colMaxAttempts :: Seq (Maybe Int32)
colMaxAttempts = Maybe Int32 -> Seq (Maybe Int32) -> Seq (Maybe Int32)
forall a. a -> Seq a -> Seq a
f (JobWrite payload -> Maybe Int32
forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe Int32
maxAttempts JobWrite payload
job) (BatchColumns -> Seq (Maybe Int32)
colMaxAttempts BatchColumns
cols)
, colParentIds :: Seq (Maybe Int64)
colParentIds = Maybe Int64 -> Seq (Maybe Int64) -> Seq (Maybe Int64)
forall a. a -> Seq a -> Seq a
f (JobWrite payload -> Maybe Int64
forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe Int64
parentId JobWrite payload
job) (BatchColumns -> Seq (Maybe Int64)
colParentIds BatchColumns
cols)
, colParentStates :: Seq (Maybe Value)
colParentStates = Maybe Value -> Seq (Maybe Value) -> Seq (Maybe Value)
forall a. a -> Seq a -> Seq a
f (if JobWrite payload -> Bool
forall payload key q insertedAt.
Job payload key q insertedAt -> Bool
isRollup JobWrite payload
job then Value -> Maybe Value
forall a. a -> Maybe a
Just (Object -> Value
Object Object
forall a. Monoid a => a
mempty) else Maybe Value
forall a. Maybe a
Nothing) (BatchColumns -> Seq (Maybe Value)
colParentStates BatchColumns
cols)
, colSuspended :: Seq Bool
colSuspended = Bool -> Seq Bool -> Seq Bool
forall a. a -> Seq a -> Seq a
f (JobWrite payload -> Bool
forall payload key q insertedAt.
Job payload key q insertedAt -> Bool
suspended JobWrite payload
job) (BatchColumns -> Seq Bool
colSuspended BatchColumns
cols)
, colNotVisibleUntils :: Seq (Maybe UTCTime)
colNotVisibleUntils = Maybe UTCTime -> Seq (Maybe UTCTime) -> Seq (Maybe UTCTime)
forall a. a -> Seq a -> Seq a
f (JobWrite payload -> Maybe UTCTime
forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe UTCTime
notVisibleUntil JobWrite payload
job) (BatchColumns -> Seq (Maybe UTCTime)
colNotVisibleUntils BatchColumns
cols)
}
dedupParts :: Maybe DedupKey -> (Maybe Text, Maybe Text)
dedupParts :: Maybe DedupKey -> (Maybe Text, Maybe Text)
dedupParts Maybe DedupKey
Nothing = (Maybe Text
forall a. Maybe a
Nothing, Maybe Text
forall a. Maybe a
Nothing)
dedupParts (Just (IgnoreDuplicate Text
k)) = (Text -> Maybe Text
forall a. a -> Maybe a
Just Text
k, Text -> Maybe Text
forall a. a -> Maybe a
Just Text
"ignore")
dedupParts (Just (ReplaceDuplicate Text
k)) = (Text -> Maybe Text
forall a. a -> Maybe a
Just Text
k, Text -> Maybe Text
forall a. a -> Maybe a
Just Text
"replace")
dedupKeyText :: Maybe DedupKey -> Maybe Text
dedupKeyText :: Maybe DedupKey -> Maybe Text
dedupKeyText Maybe DedupKey
Nothing = Maybe Text
forall a. Maybe a
Nothing
dedupKeyText (Just (IgnoreDuplicate Text
k)) = Text -> Maybe Text
forall a. a -> Maybe a
Just Text
k
dedupKeyText (Just (ReplaceDuplicate Text
k)) = Text -> Maybe Text
forall a. a -> Maybe a
Just Text
k
data BatchColumns = BatchColumns
{ BatchColumns -> Seq Value
colPayloads :: !(Seq Value)
, BatchColumns -> Seq (Maybe Text)
colGroupKeys :: !(Seq (Maybe Text))
, BatchColumns -> Seq Int32
colPriorities :: !(Seq Int32)
, BatchColumns -> Seq (Maybe Text)
colDedupKeys :: !(Seq (Maybe Text))
, BatchColumns -> Seq (Maybe Text)
colDedupStrategies :: !(Seq (Maybe Text))
, BatchColumns -> Seq (Maybe Int32)
colMaxAttempts :: !(Seq (Maybe Int32))
, BatchColumns -> Seq (Maybe Int64)
colParentIds :: !(Seq (Maybe Int64))
, BatchColumns -> Seq (Maybe Value)
colParentStates :: !(Seq (Maybe Value))
, BatchColumns -> Seq Bool
colSuspended :: !(Seq Bool)
, BatchColumns -> Seq (Maybe UTCTime)
colNotVisibleUntils :: !(Seq (Maybe UTCTime))
}
deriving stock ((forall x. BatchColumns -> Rep BatchColumns x)
-> (forall x. Rep BatchColumns x -> BatchColumns)
-> Generic BatchColumns
forall x. Rep BatchColumns x -> BatchColumns
forall x. BatchColumns -> Rep BatchColumns x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. BatchColumns -> Rep BatchColumns x
from :: forall x. BatchColumns -> Rep BatchColumns x
$cto :: forall x. Rep BatchColumns x -> BatchColumns
to :: forall x. Rep BatchColumns x -> BatchColumns
Generic)
deriving (Semigroup BatchColumns
BatchColumns
Semigroup BatchColumns =>
BatchColumns
-> (BatchColumns -> BatchColumns -> BatchColumns)
-> ([BatchColumns] -> BatchColumns)
-> Monoid BatchColumns
[BatchColumns] -> BatchColumns
BatchColumns -> BatchColumns -> BatchColumns
forall a.
Semigroup a =>
a -> (a -> a -> a) -> ([a] -> a) -> Monoid a
$cmempty :: BatchColumns
mempty :: BatchColumns
$cmappend :: BatchColumns -> BatchColumns -> BatchColumns
mappend :: BatchColumns -> BatchColumns -> BatchColumns
$cmconcat :: [BatchColumns] -> BatchColumns
mconcat :: [BatchColumns] -> BatchColumns
Monoid, NonEmpty BatchColumns -> BatchColumns
BatchColumns -> BatchColumns -> BatchColumns
(BatchColumns -> BatchColumns -> BatchColumns)
-> (NonEmpty BatchColumns -> BatchColumns)
-> (forall b. Integral b => b -> BatchColumns -> BatchColumns)
-> Semigroup BatchColumns
forall b. Integral b => b -> BatchColumns -> BatchColumns
forall a.
(a -> a -> a)
-> (NonEmpty a -> a)
-> (forall b. Integral b => b -> a -> a)
-> Semigroup a
$c<> :: BatchColumns -> BatchColumns -> BatchColumns
<> :: BatchColumns -> BatchColumns -> BatchColumns
$csconcat :: NonEmpty BatchColumns -> BatchColumns
sconcat :: NonEmpty BatchColumns -> BatchColumns
$cstimes :: forall b. Integral b => b -> BatchColumns -> BatchColumns
stimes :: forall b. Integral b => b -> BatchColumns -> BatchColumns
Semigroup) via Generically BatchColumns
emptyColumns :: BatchColumns
emptyColumns :: BatchColumns
emptyColumns = BatchColumns
forall a. Monoid a => a
mempty
claimNextVisibleJobs
:: forall m payload
. (JobPayload payload, MonadArbiter m)
=> Text
-> Text
-> Int
-> NominalDiffTime
-> m [JobRead payload]
claimNextVisibleJobs :: forall (m :: * -> *) payload.
(JobPayload payload, MonadArbiter m) =>
Text -> Text -> Int -> NominalDiffTime -> m [JobRead payload]
claimNextVisibleJobs Text
schemaName Text
tableName Int
maxJobs NominalDiffTime
timeout = m [JobRead payload] -> m [JobRead payload]
forall a. m a -> m a
forall (m :: * -> *) a. MonadArbiter m => m a -> m a
withDbTransaction (m [JobRead payload] -> m [JobRead payload])
-> m [JobRead payload] -> m [JobRead payload]
forall a b. (a -> b) -> a -> b
$ do
let claimSql :: Text
claimSql = Text -> Text -> Int -> Int -> Text
Tmpl.claimJobsSQL Text
schemaName Text
tableName Int
maxJobs (NominalDiffTime -> Int
forall b. Integral b => NominalDiffTime -> b
forall a b. (RealFrac a, Integral b) => a -> b
ceiling NominalDiffTime
timeout)
rawJobs <- Text
-> Params
-> RowCodec (Job Value Int64 Text UTCTime)
-> m [Job Value Int64 Text UTCTime]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery Text
claimSql [] (Text -> RowCodec (Job Value Int64 Text UTCTime)
jobRowCodec Text
tableName)
mapM decodePayload rawJobs
claimNextVisibleJobsBatched
:: forall m payload
. (JobPayload payload, MonadArbiter m)
=> Text
-> Text
-> Int
-> Int
-> NominalDiffTime
-> m [NonEmpty (JobRead payload)]
claimNextVisibleJobsBatched :: forall (m :: * -> *) payload.
(JobPayload payload, MonadArbiter m) =>
Text
-> Text
-> Int
-> Int
-> NominalDiffTime
-> m [NonEmpty (JobRead payload)]
claimNextVisibleJobsBatched Text
schemaName Text
tableName Int
batchSize Int
maxBatches NominalDiffTime
timeout
| Int
batchSize Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
1 = [NonEmpty (JobRead payload)] -> m [NonEmpty (JobRead payload)]
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
| Int
maxBatches Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
1 = [NonEmpty (JobRead payload)] -> m [NonEmpty (JobRead payload)]
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
| Bool
otherwise = m [NonEmpty (JobRead payload)] -> m [NonEmpty (JobRead payload)]
forall a. m a -> m a
forall (m :: * -> *) a. MonadArbiter m => m a -> m a
withDbTransaction (m [NonEmpty (JobRead payload)] -> m [NonEmpty (JobRead payload)])
-> m [NonEmpty (JobRead payload)] -> m [NonEmpty (JobRead payload)]
forall a b. (a -> b) -> a -> b
$ do
let claimSql :: Text
claimSql = Text -> Text -> Int -> Int -> Int -> Text
Tmpl.claimJobsBatchedSQL Text
schemaName Text
tableName Int
batchSize Int
maxBatches (NominalDiffTime -> Int
forall b. Integral b => NominalDiffTime -> b
forall a b. (RealFrac a, Integral b) => a -> b
ceiling NominalDiffTime
timeout)
rawJobs <- Text
-> Params
-> RowCodec (Job Value Int64 Text UTCTime)
-> m [Job Value Int64 Text UTCTime]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery Text
claimSql [] (Text -> RowCodec (Job Value Int64 Text UTCTime)
jobRowCodec Text
tableName)
jobs <- mapM decodePayload rawJobs
let sorted = (JobRead payload -> Maybe Text)
-> [JobRead payload] -> [JobRead payload]
forall b a. Ord b => (a -> b) -> [a] -> [a]
sortOn JobRead payload -> Maybe Text
forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe Text
groupKey [JobRead payload]
jobs
groups = (JobRead payload -> JobRead payload -> Bool)
-> [JobRead payload] -> [[JobRead payload]]
forall a. (a -> a -> Bool) -> [a] -> [[a]]
groupBy (\JobRead payload
j1 JobRead payload
j2 -> JobRead payload -> Maybe Text
forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe Text
groupKey JobRead payload
j1 Maybe Text -> Maybe Text -> Bool
forall a. Eq a => a -> a -> Bool
== JobRead payload -> Maybe Text
forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe Text
groupKey JobRead payload
j2) [JobRead payload]
sorted
pure $ concatMap (chunksOfNE batchSize) $ mapMaybe NE.nonEmpty groups
chunksOfNE :: Int -> NonEmpty a -> [NonEmpty a]
chunksOfNE :: forall a. Int -> NonEmpty a -> [NonEmpty a]
chunksOfNE Int
n (a
x :| [a]
xs) = [a] -> [NonEmpty a]
go (a
x a -> [a] -> [a]
forall a. a -> [a] -> [a]
: [a]
xs)
where
go :: [a] -> [NonEmpty a]
go [] = []
go (a
y : [a]
ys) =
let ([a]
chunk, [a]
rest) = Int -> [a] -> ([a], [a])
forall a. Int -> [a] -> ([a], [a])
splitAt (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1) [a]
ys
in (a
y a -> [a] -> NonEmpty a
forall a. a -> [a] -> NonEmpty a
:| [a]
chunk) NonEmpty a -> [NonEmpty a] -> [NonEmpty a]
forall a. a -> [a] -> [a]
: [a] -> [NonEmpty a]
go [a]
rest
ackJob
:: forall m payload
. (MonadArbiter m)
=> Text
-> Text
-> JobRead payload
-> m Int64
ackJob :: forall (m :: * -> *) payload.
MonadArbiter m =>
Text -> Text -> JobRead payload -> m Int64
ackJob Text
schemaName Text
tableName JobRead payload
job = m Int64 -> m Int64
forall a. m a -> m a
forall (m :: * -> *) a. MonadArbiter m => m a -> m a
withDbTransaction (m Int64 -> m Int64) -> m Int64 -> m Int64
forall a b. (a -> b) -> a -> b
$ Text -> Text -> JobRead payload -> m Int64
forall (m :: * -> *) payload.
MonadArbiter m =>
Text -> Text -> JobRead payload -> m Int64
ackJobInner Text
schemaName Text
tableName JobRead payload
job
ackJobInner
:: forall m payload
. (MonadArbiter m)
=> Text -> Text -> JobRead payload -> m Int64
ackJobInner :: forall (m :: * -> *) payload.
MonadArbiter m =>
Text -> Text -> JobRead payload -> m Int64
ackJobInner Text
schemaName Text
tableName JobRead payload
job = do
Maybe Int64 -> (Int64 -> m ()) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ (JobRead payload -> Maybe Int64
forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe Int64
parentId JobRead payload
job) ((Int64 -> m ()) -> m ()) -> (Int64 -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \Int64
pid ->
m [Maybe Text] -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m [Maybe Text] -> m ()) -> m [Maybe Text] -> m ()
forall a b. (a -> b) -> a -> b
$
Text -> Params -> Ap NullCol (Maybe Text) -> m [Maybe Text]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
Text
"SELECT pg_advisory_xact_lock(hashtextextended(?, ?))::text AS result"
[Col Text -> Text -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Text
CText (Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName), Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
pid]
(Text -> Col Text -> Ap NullCol (Maybe Text)
forall a. Text -> Col a -> RowCodec (Maybe a)
ncol Text
"result" Col Text
CText)
let jid :: Int64
jid = JobRead payload -> Int64
forall payload key q insertedAt.
Job payload key q insertedAt -> key
primaryKey JobRead payload
job
jatt :: Int32
jatt = JobRead payload -> Int32
forall payload key q insertedAt.
Job payload key q insertedAt -> Int32
attempts JobRead payload
job
params :: Params
params = [Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
jid, Col Int32 -> Int32 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int32
CInt4 Int32
jatt, Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
jid, Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
jid, Col Int32 -> Int32 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int32
CInt4 Int32
jatt, Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
jid]
rows <- Text -> Params -> RowCodec Int64 -> m [Int64]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery (Text -> Text -> Text
Tmpl.smartAckJobSQL Text
schemaName Text
tableName) Params
params RowCodec Int64
int64Codec
case rows of
[Int64
n] -> Int64 -> m Int64
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Int64
n
[Int64]
_ -> Int64 -> m Int64
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Int64
0
tryResumeParent :: (MonadArbiter m) => Text -> Text -> Int64 -> m ()
tryResumeParent :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> m ()
tryResumeParent Text
schemaName Text
tableName Int64
pid = do
m [Maybe Text] -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m [Maybe Text] -> m ()) -> m [Maybe Text] -> m ()
forall a b. (a -> b) -> a -> b
$
Text -> Params -> Ap NullCol (Maybe Text) -> m [Maybe Text]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
Text
"SELECT pg_advisory_xact_lock(hashtextextended(?, ?))::text AS result"
[Col Text -> Text -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Text
CText (Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName), Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
pid]
(Text -> Col Text -> Ap NullCol (Maybe Text)
forall a. Text -> Col a -> RowCodec (Maybe a)
ncol Text
"result" Col Text
CText)
m Int64 -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Int64 -> m ()) -> m Int64 -> m ()
forall a b. (a -> b) -> a -> b
$
Text -> Params -> m Int64
forall (m :: * -> *). MonadArbiter m => Text -> Params -> m Int64
executeStatement
(Text -> Text -> Text
Tmpl.tryWakeAncestorSQL Text
schemaName Text
tableName)
[Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
pid, Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
pid]
ackJobsBatch
:: forall m payload
. (MonadArbiter m)
=> Text
-> Text
-> [JobRead payload]
-> m Int64
ackJobsBatch :: forall (m :: * -> *) payload.
MonadArbiter m =>
Text -> Text -> [JobRead payload] -> m Int64
ackJobsBatch Text
_ Text
_ [] = Int64 -> m Int64
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Int64
0
ackJobsBatch Text
schemaName Text
tableName [JobRead payload]
jobs =
m Int64 -> m Int64
forall a. m a -> m a
forall (m :: * -> *) a. MonadArbiter m => m a -> m a
withDbTransaction (m Int64 -> m Int64) -> m Int64 -> m Int64
forall a b. (a -> b) -> a -> b
$ [Int64] -> Int64
forall a. Num a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum ([Int64] -> Int64) -> m [Int64] -> m Int64
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (JobRead payload -> m Int64) -> [JobRead payload] -> m [Int64]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (Text -> Text -> JobRead payload -> m Int64
forall (m :: * -> *) payload.
MonadArbiter m =>
Text -> Text -> JobRead payload -> m Int64
ackJobInner Text
schemaName Text
tableName) [JobRead payload]
jobs
ackJobsBulk
:: forall m payload
. (MonadArbiter m)
=> Text
-> Text
-> [JobRead payload]
-> m Int64
ackJobsBulk :: forall (m :: * -> *) payload.
MonadArbiter m =>
Text -> Text -> [JobRead payload] -> m Int64
ackJobsBulk Text
_ Text
_ [] = Int64 -> m Int64
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Int64
0
ackJobsBulk Text
schemaName Text
tableName [JobRead payload]
jobs = do
let ids :: [Int64]
ids = (JobRead payload -> Int64) -> [JobRead payload] -> [Int64]
forall a b. (a -> b) -> [a] -> [b]
map JobRead payload -> Int64
forall payload key q insertedAt.
Job payload key q insertedAt -> key
primaryKey [JobRead payload]
jobs
atts :: [Int32]
atts = (JobRead payload -> Int32) -> [JobRead payload] -> [Int32]
forall a b. (a -> b) -> [a] -> [b]
map JobRead payload -> Int32
forall payload key q insertedAt.
Job payload key q insertedAt -> Int32
attempts [JobRead payload]
jobs
rows <-
Text -> Params -> RowCodec Int64 -> m [Int64]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
(Text -> Text -> Text
Tmpl.ackJobsBulkSQL Text
schemaName Text
tableName)
[ Col Int64 -> [Int64] -> SomeParam
forall a. Col a -> [a] -> SomeParam
parr Col Int64
CInt8 [Int64]
ids
, Col Int32 -> [Int32] -> SomeParam
forall a. Col a -> [a] -> SomeParam
parr Col Int32
CInt4 [Int32]
atts
]
RowCodec Int64
countCodec
case rows of
[Int64
n] -> Int64 -> m Int64
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Int64
n
[Int64]
_ -> Text -> m Int64
forall (m :: * -> *) a. MonadIO m => Text -> m a
throwParsing Text
"ackJobsBulk: unexpected result"
setVisibilityTimeout
:: forall m payload
. (MonadArbiter m)
=> Text
-> Text
-> NominalDiffTime
-> JobRead payload
-> m Int64
setVisibilityTimeout :: forall (m :: * -> *) payload.
MonadArbiter m =>
Text -> Text -> NominalDiffTime -> JobRead payload -> m Int64
setVisibilityTimeout Text
schemaName Text
tableName NominalDiffTime
timeout JobRead payload
job =
Text -> Params -> m Int64
forall (m :: * -> *). MonadArbiter m => Text -> Params -> m Int64
executeStatement
(Text -> Text -> Text
Tmpl.setVisibilityTimeoutSQL Text
schemaName Text
tableName)
[ Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 (NominalDiffTime -> Int64
forall b. Integral b => NominalDiffTime -> b
forall a b. (RealFrac a, Integral b) => a -> b
ceiling NominalDiffTime
timeout)
, Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 (JobRead payload -> Int64
forall payload key q insertedAt.
Job payload key q insertedAt -> key
primaryKey JobRead payload
job)
, Col Int32 -> Int32 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int32
CInt4 (JobRead payload -> Int32
forall payload key q insertedAt.
Job payload key q insertedAt -> Int32
attempts JobRead payload
job)
]
data VisibilityUpdateInfo = VisibilityUpdateInfo
{ VisibilityUpdateInfo -> Int64
vuiJobId :: Int64
, VisibilityUpdateInfo -> Bool
vuiWasUpdated :: Bool
, VisibilityUpdateInfo -> Maybe Int32
vuiCurrentDbAttempts :: Maybe Int32
}
deriving stock (VisibilityUpdateInfo -> VisibilityUpdateInfo -> Bool
(VisibilityUpdateInfo -> VisibilityUpdateInfo -> Bool)
-> (VisibilityUpdateInfo -> VisibilityUpdateInfo -> Bool)
-> Eq VisibilityUpdateInfo
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: VisibilityUpdateInfo -> VisibilityUpdateInfo -> Bool
== :: VisibilityUpdateInfo -> VisibilityUpdateInfo -> Bool
$c/= :: VisibilityUpdateInfo -> VisibilityUpdateInfo -> Bool
/= :: VisibilityUpdateInfo -> VisibilityUpdateInfo -> Bool
Eq, (forall x. VisibilityUpdateInfo -> Rep VisibilityUpdateInfo x)
-> (forall x. Rep VisibilityUpdateInfo x -> VisibilityUpdateInfo)
-> Generic VisibilityUpdateInfo
forall x. Rep VisibilityUpdateInfo x -> VisibilityUpdateInfo
forall x. VisibilityUpdateInfo -> Rep VisibilityUpdateInfo x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. VisibilityUpdateInfo -> Rep VisibilityUpdateInfo x
from :: forall x. VisibilityUpdateInfo -> Rep VisibilityUpdateInfo x
$cto :: forall x. Rep VisibilityUpdateInfo x -> VisibilityUpdateInfo
to :: forall x. Rep VisibilityUpdateInfo x -> VisibilityUpdateInfo
Generic, Int -> VisibilityUpdateInfo -> ShowS
[VisibilityUpdateInfo] -> ShowS
VisibilityUpdateInfo -> String
(Int -> VisibilityUpdateInfo -> ShowS)
-> (VisibilityUpdateInfo -> String)
-> ([VisibilityUpdateInfo] -> ShowS)
-> Show VisibilityUpdateInfo
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> VisibilityUpdateInfo -> ShowS
showsPrec :: Int -> VisibilityUpdateInfo -> ShowS
$cshow :: VisibilityUpdateInfo -> String
show :: VisibilityUpdateInfo -> String
$cshowList :: [VisibilityUpdateInfo] -> ShowS
showList :: [VisibilityUpdateInfo] -> ShowS
Show)
setVisibilityTimeoutBatch
:: forall m payload
. (MonadArbiter m)
=> Text
-> Text
-> NominalDiffTime
-> [JobRead payload]
-> m [VisibilityUpdateInfo]
setVisibilityTimeoutBatch :: forall (m :: * -> *) payload.
MonadArbiter m =>
Text
-> Text
-> NominalDiffTime
-> [JobRead payload]
-> m [VisibilityUpdateInfo]
setVisibilityTimeoutBatch Text
_ Text
_ NominalDiffTime
_ [] = [VisibilityUpdateInfo] -> m [VisibilityUpdateInfo]
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure []
setVisibilityTimeoutBatch Text
schemaName Text
tableName NominalDiffTime
timeout [JobRead payload]
jobs = do
let valuesPlaceholder :: Text
valuesPlaceholder = Text -> [Text] -> Text
T.intercalate Text
"," ([Text] -> Text) -> [Text] -> Text
forall a b. (a -> b) -> a -> b
$ Int -> Text -> [Text]
forall a. Int -> a -> [a]
replicate ([JobRead payload] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [JobRead payload]
jobs) Text
"(?,?)"
jobParams :: Params
jobParams = (JobRead payload -> Params) -> [JobRead payload] -> Params
forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap (\JobRead payload
job -> [Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 (JobRead payload -> Int64
forall payload key q insertedAt.
Job payload key q insertedAt -> key
primaryKey JobRead payload
job), Col Int32 -> Int32 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int32
CInt4 (JobRead payload -> Int32
forall payload key q insertedAt.
Job payload key q insertedAt -> Int32
attempts JobRead payload
job)]) [JobRead payload]
jobs
params :: Params
params = Params
jobParams Params -> Params -> Params
forall a. Semigroup a => a -> a -> a
<> [Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 (NominalDiffTime -> Int64
forall b. Integral b => NominalDiffTime -> b
forall a b. (RealFrac a, Integral b) => a -> b
ceiling NominalDiffTime
timeout)]
Text
-> Params
-> RowCodec VisibilityUpdateInfo
-> m [VisibilityUpdateInfo]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
(Text -> Text -> Text -> Text
Tmpl.setVisibilityTimeoutBatchSQL Text
schemaName Text
tableName Text
valuesPlaceholder)
Params
params
RowCodec VisibilityUpdateInfo
visibilityUpdateCodec
updateJobForRetry
:: forall m payload
. (MonadArbiter m)
=> Text
-> Text
-> NominalDiffTime
-> Text
-> JobRead payload
-> m Int64
updateJobForRetry :: forall (m :: * -> *) payload.
MonadArbiter m =>
Text
-> Text -> NominalDiffTime -> Text -> JobRead payload -> m Int64
updateJobForRetry Text
schemaName Text
tableName NominalDiffTime
backoff Text
errorMsg JobRead payload
job =
Text -> Params -> m Int64
forall (m :: * -> *). MonadArbiter m => Text -> Params -> m Int64
executeStatement
(Text -> Text -> Text
Tmpl.updateJobForRetrySQL Text
schemaName Text
tableName)
[ Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 (NominalDiffTime -> Int64
forall b. Integral b => NominalDiffTime -> b
forall a b. (RealFrac a, Integral b) => a -> b
ceiling NominalDiffTime
backoff)
, Col Text -> Text -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Text
CText Text
errorMsg
, Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 (JobRead payload -> Int64
forall payload key q insertedAt.
Job payload key q insertedAt -> key
primaryKey JobRead payload
job)
, Col Int32 -> Int32 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int32
CInt4 (JobRead payload -> Int32
forall payload key q insertedAt.
Job payload key q insertedAt -> Int32
attempts JobRead payload
job)
]
moveToDLQ
:: forall m payload
. (MonadArbiter m)
=> Text
-> Text
-> Text
-> JobRead payload
-> m Int64
moveToDLQ :: forall (m :: * -> *) payload.
MonadArbiter m =>
Text -> Text -> Text -> JobRead payload -> m Int64
moveToDLQ Text
schemaName Text
tableName Text
errorMsg JobRead payload
job = m Int64 -> m Int64
forall a. m a -> m a
forall (m :: * -> *) a. MonadArbiter m => m a -> m a
withDbTransaction (m Int64 -> m Int64) -> m Int64 -> m Int64
forall a b. (a -> b) -> a -> b
$ do
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (JobRead payload -> Bool
forall payload key q insertedAt.
Job payload key q insertedAt -> Bool
isRollup JobRead payload
job) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
Text -> Text -> Int64 -> m ()
forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> m ()
snapshotDescendantRollups Text
schemaName Text
tableName (JobRead payload -> Int64
forall payload key q insertedAt.
Job payload key q insertedAt -> key
primaryKey JobRead payload
job)
m Int64 -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Int64 -> m ()) -> m Int64 -> m ()
forall a b. (a -> b) -> a -> b
$ Text -> Text -> Int64 -> Text -> m Int64
forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> Text -> m Int64
cascadeChildrenToDLQ Text
schemaName Text
tableName (JobRead payload -> Int64
forall payload key q insertedAt.
Job payload key q insertedAt -> key
primaryKey JobRead payload
job) Text
"Parent moved to DLQ"
countRows <-
Text -> Params -> RowCodec Int64 -> m [Int64]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
(Text -> Text -> Text
Tmpl.moveToDLQSQL Text
schemaName Text
tableName)
[ Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 (JobRead payload -> Int64
forall payload key q insertedAt.
Job payload key q insertedAt -> key
primaryKey JobRead payload
job)
, Col Int32 -> Int32 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int32
CInt4 (JobRead payload -> Int32
forall payload key q insertedAt.
Job payload key q insertedAt -> Int32
attempts JobRead payload
job)
, Col Text -> Text -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Text
CText Text
errorMsg
]
RowCodec Int64
countCodec
let rows = case [Int64]
countRows of
[Int64
n] -> Int64
n
[Int64]
_ -> Int64
0
when (rows > 0) $
for_ (parentId job) $ \Int64
pid ->
Text -> Text -> Int64 -> m ()
forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> m ()
tryResumeParent Text
schemaName Text
tableName Int64
pid
pure rows
cascadeChildrenToDLQ
:: (MonadArbiter m)
=> Text
-> Text
-> Int64
-> Text
-> m Int64
cascadeChildrenToDLQ :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> Text -> m Int64
cascadeChildrenToDLQ Text
schemaName Text
tableName Int64
parentJobId Text
errorMsg = do
countRows <-
Text -> Params -> RowCodec Int64 -> m [Int64]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
(Text -> Text -> Text
Tmpl.cascadeChildrenToDLQSQL Text
schemaName Text
tableName)
[Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
parentJobId, Col Text -> Text -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Text
CText Text
errorMsg]
RowCodec Int64
countCodec
case countRows of
[Int64
n] -> Int64 -> m Int64
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Int64
n
[Int64]
_ -> Text -> m Int64
forall (m :: * -> *) a. MonadIO m => Text -> m a
throwParsing Text
"cascadeChildrenToDLQ: unexpected result"
snapshotDescendantRollups
:: (MonadArbiter m)
=> Text
-> Text
-> Int64
-> m ()
snapshotDescendantRollups :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> m ()
snapshotDescendantRollups Text
schemaName Text
tableName Int64
parentJobId = do
rollupIds <-
Text -> Params -> RowCodec Int64 -> m [Int64]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
(Text -> Text -> Text
Tmpl.descendantRollupIdsSQL Text
schemaName Text
tableName)
[Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
parentJobId]
RowCodec Int64
int64Codec
for_ rollupIds $ \Int64
rid -> do
(results, errors, snap, _) <- Text
-> Text
-> Int64
-> m (Map Int64 Value, Map Int64 Text, Maybe Value, Map Int64 Text)
forall (m :: * -> *).
MonadArbiter m =>
Text
-> Text
-> Int64
-> m (Map Int64 Value, Map Int64 Text, Maybe Value, Map Int64 Text)
readChildResultsRaw Text
schemaName Text
tableName Int64
rid
let merged = Map Int64 Value
-> Map Int64 Text -> Maybe Value -> Map Int64 (Either Text Value)
mergeRawChildResults Map Int64 Value
results Map Int64 Text
errors Maybe Value
snap
when (not $ Map.null merged) $
void $
persistParentState schemaName tableName rid (toJSON merged)
moveToDLQBatch
:: forall m payload
. (MonadArbiter m)
=> Text
-> Text
-> [(JobRead payload, Text)]
-> m Int64
moveToDLQBatch :: forall (m :: * -> *) payload.
MonadArbiter m =>
Text -> Text -> [(JobRead payload, Text)] -> m Int64
moveToDLQBatch Text
_ Text
_ [] = Int64 -> m Int64
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Int64
0
moveToDLQBatch Text
schemaName Text
tableName [(JobRead payload, Text)]
jobsWithErrors = m Int64 -> m Int64
forall a. m a -> m a
forall (m :: * -> *) a. MonadArbiter m => m a -> m a
withDbTransaction (m Int64 -> m Int64) -> m Int64 -> m Int64
forall a b. (a -> b) -> a -> b
$ do
[(JobRead payload, Text)]
-> ((JobRead payload, Text) -> m ()) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ [(JobRead payload, Text)]
jobsWithErrors (((JobRead payload, Text) -> m ()) -> m ())
-> ((JobRead payload, Text) -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \(JobRead payload
job, Text
_) ->
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (JobRead payload -> Bool
forall payload key q insertedAt.
Job payload key q insertedAt -> Bool
isRollup JobRead payload
job) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
Text -> Text -> Int64 -> m ()
forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> m ()
snapshotDescendantRollups Text
schemaName Text
tableName (JobRead payload -> Int64
forall payload key q insertedAt.
Job payload key q insertedAt -> key
primaryKey JobRead payload
job)
m Int64 -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Int64 -> m ()) -> m Int64 -> m ()
forall a b. (a -> b) -> a -> b
$ Text -> Text -> Int64 -> Text -> m Int64
forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> Text -> m Int64
cascadeChildrenToDLQ Text
schemaName Text
tableName (JobRead payload -> Int64
forall payload key q insertedAt.
Job payload key q insertedAt -> key
primaryKey JobRead payload
job) Text
"Parent moved to DLQ"
let ids :: [Int64]
ids = ((JobRead payload, Text) -> Int64)
-> [(JobRead payload, Text)] -> [Int64]
forall a b. (a -> b) -> [a] -> [b]
map (JobRead payload -> Int64
forall payload key q insertedAt.
Job payload key q insertedAt -> key
primaryKey (JobRead payload -> Int64)
-> ((JobRead payload, Text) -> JobRead payload)
-> (JobRead payload, Text)
-> Int64
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (JobRead payload, Text) -> JobRead payload
forall a b. (a, b) -> a
fst) [(JobRead payload, Text)]
jobsWithErrors
atts :: [Int32]
atts = ((JobRead payload, Text) -> Int32)
-> [(JobRead payload, Text)] -> [Int32]
forall a b. (a -> b) -> [a] -> [b]
map (JobRead payload -> Int32
forall payload key q insertedAt.
Job payload key q insertedAt -> Int32
attempts (JobRead payload -> Int32)
-> ((JobRead payload, Text) -> JobRead payload)
-> (JobRead payload, Text)
-> Int32
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (JobRead payload, Text) -> JobRead payload
forall a b. (a, b) -> a
fst) [(JobRead payload, Text)]
jobsWithErrors
msgs :: [Text]
msgs = ((JobRead payload, Text) -> Text)
-> [(JobRead payload, Text)] -> [Text]
forall a b. (a -> b) -> [a] -> [b]
map (JobRead payload, Text) -> Text
forall a b. (a, b) -> b
snd [(JobRead payload, Text)]
jobsWithErrors
countRows <-
Text -> Params -> RowCodec Int64 -> m [Int64]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
(Text -> Text -> Text
Tmpl.moveToDLQBatchSQL Text
schemaName Text
tableName)
[ Col Int64 -> [Int64] -> SomeParam
forall a. Col a -> [a] -> SomeParam
parr Col Int64
CInt8 [Int64]
ids
, Col Int32 -> [Int32] -> SomeParam
forall a. Col a -> [a] -> SomeParam
parr Col Int32
CInt4 [Int32]
atts
, Col Text -> [Text] -> SomeParam
forall a. Col a -> [a] -> SomeParam
parr Col Text
CText [Text]
msgs
]
RowCodec Int64
countCodec
let rows = case [Int64]
countRows of
[Int64
n] -> Int64
n
[Int64]
_ -> Int64
0
when (rows > 0) $ do
let parentIds = Set Int64 -> [Int64]
forall a. Set a -> [a]
Set.toAscList (Set Int64 -> [Int64])
-> ([Int64] -> Set Int64) -> [Int64] -> [Int64]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Int64] -> Set Int64
forall a. Ord a => [a] -> Set a
Set.fromList ([Int64] -> [Int64]) -> [Int64] -> [Int64]
forall a b. (a -> b) -> a -> b
$ ((JobRead payload, Text) -> Maybe Int64)
-> [(JobRead payload, Text)] -> [Int64]
forall a b. (a -> Maybe b) -> [a] -> [b]
mapMaybe (JobRead payload -> Maybe Int64
forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe Int64
parentId (JobRead payload -> Maybe Int64)
-> ((JobRead payload, Text) -> JobRead payload)
-> (JobRead payload, Text)
-> Maybe Int64
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (JobRead payload, Text) -> JobRead payload
forall a b. (a, b) -> a
fst) [(JobRead payload, Text)]
jobsWithErrors
for_ parentIds $ tryResumeParent schemaName tableName
pure rows
retryFromDLQ
:: forall m payload
. (JobPayload payload, MonadArbiter m)
=> Text
-> Text
-> Int64
-> m (Maybe (JobRead payload))
retryFromDLQ :: forall (m :: * -> *) payload.
(JobPayload payload, MonadArbiter m) =>
Text -> Text -> Int64 -> m (Maybe (JobRead payload))
retryFromDLQ Text
schemaName Text
tableName Int64
dlqId = m (Maybe (JobRead payload)) -> m (Maybe (JobRead payload))
forall a. m a -> m a
forall (m :: * -> *) a. MonadArbiter m => m a -> m a
withDbTransaction (m (Maybe (JobRead payload)) -> m (Maybe (JobRead payload)))
-> m (Maybe (JobRead payload)) -> m (Maybe (JobRead payload))
forall a b. (a -> b) -> a -> b
$ do
rawJobs <-
Text
-> Params
-> RowCodec (Job Value Int64 Text UTCTime)
-> m [Job Value Int64 Text UTCTime]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
(Text -> Text -> Text
Tmpl.retryFromDLQSQL Text
schemaName Text
tableName)
[Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
dlqId]
(Text -> RowCodec (Job Value Int64 Text UTCTime)
jobRowCodec Text
tableName)
case rawJobs of
[] -> Maybe (JobRead payload) -> m (Maybe (JobRead payload))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (JobRead payload)
forall a. Maybe a
Nothing
(Job Value Int64 Text UTCTime
raw : [Job Value Int64 Text UTCTime]
_) -> JobRead payload -> Maybe (JobRead payload)
forall a. a -> Maybe a
Just (JobRead payload -> Maybe (JobRead payload))
-> m (JobRead payload) -> m (Maybe (JobRead payload))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Job Value Int64 Text UTCTime -> m (JobRead payload)
forall payload (m :: * -> *).
(JobPayload payload, MonadArbiter m) =>
Job Value Int64 Text UTCTime -> m (JobRead payload)
decodePayload Job Value Int64 Text UTCTime
raw
dlqJobExists
:: (MonadArbiter m)
=> Text
-> Text
-> Int64
-> m Bool
dlqJobExists :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> m Bool
dlqJobExists Text
schemaName Text
tableName Int64
dlqId = do
rows <-
Text -> Params -> RowCodec Bool -> m [Bool]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
(Text -> Text -> Text
Tmpl.dlqJobExistsSQL Text
schemaName Text
tableName)
[Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
dlqId]
RowCodec Bool
boolCodec
case rows of
[Bool
b] -> Bool -> m Bool
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
b
[Bool]
_ -> Bool -> m Bool
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
False
listJobsFiltered
:: forall m payload
. (JobPayload payload, MonadArbiter m)
=> Text
-> Text
-> [Tmpl.JobFilter]
-> Int
-> Int
-> m [JobRead payload]
listJobsFiltered :: forall (m :: * -> *) payload.
(JobPayload payload, MonadArbiter m) =>
Text -> Text -> [JobFilter] -> Int -> Int -> m [JobRead payload]
listJobsFiltered Text
schemaName Text
tableName [JobFilter]
filters Int
limit Int
offset = do
let (Text
whereClause, Params
filterParams) = [JobFilter] -> (Text, Params)
buildWhereClause [JobFilter]
filters
sql :: Text
sql = Text -> Text -> Text -> Text
Tmpl.listJobsFilteredSQL Text
schemaName Text
tableName Text
whereClause
params :: Params
params = Params
filterParams Params -> Params -> Params
forall a. Semigroup a => a -> a -> a
<> [Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 (Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
limit), Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 (Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
offset)]
rawJobs <- Text
-> Params
-> RowCodec (Job Value Int64 Text UTCTime)
-> m [Job Value Int64 Text UTCTime]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery Text
sql Params
params (Text -> RowCodec (Job Value Int64 Text UTCTime)
jobRowCodec Text
tableName)
mapM decodePayload rawJobs
countJobsFiltered
:: (MonadArbiter m)
=> Text
-> Text
-> [Tmpl.JobFilter]
-> m Int64
countJobsFiltered :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> [JobFilter] -> m Int64
countJobsFiltered Text
schemaName Text
tableName [JobFilter]
filters = do
let (Text
whereClause, Params
filterParams) = [JobFilter] -> (Text, Params)
buildWhereClause [JobFilter]
filters
sql :: Text
sql = Text -> Text -> Text -> Text
Tmpl.countJobsFilteredSQL Text
schemaName Text
tableName Text
whereClause
rows <- Text -> Params -> RowCodec Int64 -> m [Int64]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery Text
sql Params
filterParams RowCodec Int64
countCodec
case rows of
[Int64
n] -> Int64 -> m Int64
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Int64
n
[Int64]
_ -> Text -> m Int64
forall (m :: * -> *) a. MonadIO m => Text -> m a
throwParsing Text
"countJobsFiltered: unexpected result"
listDLQFiltered
:: forall m payload
. (JobPayload payload, MonadArbiter m)
=> Text
-> Text
-> [Tmpl.JobFilter]
-> Int
-> Int
-> m [DLQ.DLQJob payload]
listDLQFiltered :: forall (m :: * -> *) payload.
(JobPayload payload, MonadArbiter m) =>
Text -> Text -> [JobFilter] -> Int -> Int -> m [DLQJob payload]
listDLQFiltered Text
schemaName Text
tableName [JobFilter]
filters Int
limit Int
offset = do
let (Text
whereClause, Params
filterParams) = [JobFilter] -> (Text, Params)
buildWhereClause [JobFilter]
filters
sql :: Text
sql = Text -> Text -> Text -> Text
Tmpl.listDLQFilteredSQL Text
schemaName Text
tableName Text
whereClause
params :: Params
params = Params
filterParams Params -> Params -> Params
forall a. Semigroup a => a -> a -> a
<> [Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 (Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
limit), Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 (Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
offset)]
rawRows <- Text
-> Params
-> RowCodec (Int64, UTCTime, Job Value Int64 Text UTCTime)
-> m [(Int64, UTCTime, Job Value Int64 Text UTCTime)]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery Text
sql Params
params (Text -> RowCodec (Int64, UTCTime, Job Value Int64 Text UTCTime)
dlqRowCodec Text
tableName)
mapM decodeDLQRow rawRows
countDLQFiltered
:: (MonadArbiter m)
=> Text
-> Text
-> [Tmpl.JobFilter]
-> m Int64
countDLQFiltered :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> [JobFilter] -> m Int64
countDLQFiltered Text
schemaName Text
tableName [JobFilter]
filters = do
let (Text
whereClause, Params
filterParams) = [JobFilter] -> (Text, Params)
buildWhereClause [JobFilter]
filters
sql :: Text
sql = Text -> Text -> Text -> Text
Tmpl.countDLQFilteredSQL Text
schemaName Text
tableName Text
whereClause
rows <- Text -> Params -> RowCodec Int64 -> m [Int64]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery Text
sql Params
filterParams RowCodec Int64
countCodec
case rows of
[Int64
n] -> Int64 -> m Int64
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Int64
n
[Int64]
_ -> Text -> m Int64
forall (m :: * -> *) a. MonadIO m => Text -> m a
throwParsing Text
"countDLQFiltered: unexpected result"
decodeDLQRow
:: (JobPayload payload, MonadArbiter m)
=> (Int64, UTCTime, Job Value Int64 Text UTCTime)
-> m (DLQ.DLQJob payload)
decodeDLQRow :: forall payload (m :: * -> *).
(JobPayload payload, MonadArbiter m) =>
(Int64, UTCTime, Job Value Int64 Text UTCTime)
-> m (DLQJob payload)
decodeDLQRow (Int64
dlqId, UTCTime
dlqFailedAt, Job Value Int64 Text UTCTime
rawJob) = do
jobSnapshot <- Job Value Int64 Text UTCTime -> m (JobRead payload)
forall payload (m :: * -> *).
(JobPayload payload, MonadArbiter m) =>
Job Value Int64 Text UTCTime -> m (JobRead payload)
decodePayload Job Value Int64 Text UTCTime
rawJob
pure $
DLQ.DLQJob
{ DLQ.dlqPrimaryKey = dlqId
, DLQ.failedAt = dlqFailedAt
, DLQ.jobSnapshot = jobSnapshot
}
listDLQJobs
:: forall m payload
. (JobPayload payload, MonadArbiter m)
=> Text
-> Text
-> Int
-> Int
-> m [DLQ.DLQJob payload]
listDLQJobs :: forall (m :: * -> *) payload.
(JobPayload payload, MonadArbiter m) =>
Text -> Text -> Int -> Int -> m [DLQJob payload]
listDLQJobs Text
schemaName Text
tableName = Text -> Text -> [JobFilter] -> Int -> Int -> m [DLQJob payload]
forall (m :: * -> *) payload.
(JobPayload payload, MonadArbiter m) =>
Text -> Text -> [JobFilter] -> Int -> Int -> m [DLQJob payload]
listDLQFiltered Text
schemaName Text
tableName []
listDLQJobsByParent
:: forall m payload
. (JobPayload payload, MonadArbiter m)
=> Text
-> Text
-> Int64
-> Int
-> Int
-> m [DLQ.DLQJob payload]
listDLQJobsByParent :: forall (m :: * -> *) payload.
(JobPayload payload, MonadArbiter m) =>
Text -> Text -> Int64 -> Int -> Int -> m [DLQJob payload]
listDLQJobsByParent Text
schemaName Text
tableName Int64
parentJobId =
Text -> Text -> [JobFilter] -> Int -> Int -> m [DLQJob payload]
forall (m :: * -> *) payload.
(JobPayload payload, MonadArbiter m) =>
Text -> Text -> [JobFilter] -> Int -> Int -> m [DLQJob payload]
listDLQFiltered Text
schemaName Text
tableName [Int64 -> JobFilter
Tmpl.FilterParentId Int64
parentJobId]
countDLQJobsByParent
:: (MonadArbiter m)
=> Text -> Text -> Int64 -> m Int64
countDLQJobsByParent :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> m Int64
countDLQJobsByParent Text
schemaName Text
tableName Int64
parentJobId =
Text -> Text -> [JobFilter] -> m Int64
forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> [JobFilter] -> m Int64
countDLQFiltered Text
schemaName Text
tableName [Int64 -> JobFilter
Tmpl.FilterParentId Int64
parentJobId]
deleteDLQJob
:: (MonadArbiter m)
=> Text
-> Text
-> Int64
-> m Int64
deleteDLQJob :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> m Int64
deleteDLQJob Text
schemaName Text
tableName Int64
dlqId = m Int64 -> m Int64
forall a. m a -> m a
forall (m :: * -> *) a. MonadArbiter m => m a -> m a
withDbTransaction (m Int64 -> m Int64) -> m Int64 -> m Int64
forall a b. (a -> b) -> a -> b
$ do
rows <-
Text -> Params -> Ap NullCol (Maybe Int64) -> m [Maybe Int64]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
(Text -> Text -> Text
Tmpl.deleteDLQJobSQL Text
schemaName Text
tableName)
[Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
dlqId]
Ap NullCol (Maybe Int64)
nullableInt64Codec
case rows of
[] -> Int64 -> m Int64
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Int64
0
(Just Int64
pid : [Maybe Int64]
_) -> do
Text -> Text -> Int64 -> m ()
forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> m ()
tryResumeParent Text
schemaName Text
tableName Int64
pid
Int64 -> m Int64
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Int64
1
[Maybe Int64]
_ -> Int64 -> m Int64
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Int64
1
deleteDLQJobsBatch
:: (MonadArbiter m)
=> Text
-> Text
-> [Int64]
-> m Int64
deleteDLQJobsBatch :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> [Int64] -> m Int64
deleteDLQJobsBatch Text
_ Text
_ [] = Int64 -> m Int64
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Int64
0
deleteDLQJobsBatch Text
schemaName Text
tableName [Int64]
dlqIds = m Int64 -> m Int64
forall a. m a -> m a
forall (m :: * -> *) a. MonadArbiter m => m a -> m a
withDbTransaction (m Int64 -> m Int64) -> m Int64 -> m Int64
forall a b. (a -> b) -> a -> b
$ do
rows <-
Text -> Params -> Ap NullCol (Maybe Int64) -> m [Maybe Int64]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
(Text -> Text -> Text
Tmpl.deleteDLQJobsBatchSQL Text
schemaName Text
tableName)
[Col Int64 -> [Int64] -> SomeParam
forall a. Col a -> [a] -> SomeParam
parr Col Int64
CInt8 [Int64]
dlqIds]
Ap NullCol (Maybe Int64)
nullableInt64Codec
let parentIds = Set Int64 -> [Int64]
forall a. Set a -> [a]
Set.toAscList (Set Int64 -> [Int64])
-> ([Int64] -> Set Int64) -> [Int64] -> [Int64]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Int64] -> Set Int64
forall a. Ord a => [a] -> Set a
Set.fromList ([Int64] -> [Int64]) -> [Int64] -> [Int64]
forall a b. (a -> b) -> a -> b
$ [Maybe Int64] -> [Int64]
forall a. [Maybe a] -> [a]
catMaybes [Maybe Int64]
rows
for_ parentIds $ tryResumeParent schemaName tableName
pure $ fromIntegral (length rows)
listJobs
:: forall m payload
. (JobPayload payload, MonadArbiter m)
=> Text
-> Text
-> Int
-> Int
-> m [JobRead payload]
listJobs :: forall (m :: * -> *) payload.
(JobPayload payload, MonadArbiter m) =>
Text -> Text -> Int -> Int -> m [JobRead payload]
listJobs Text
schemaName Text
tableName = Text -> Text -> [JobFilter] -> Int -> Int -> m [JobRead payload]
forall (m :: * -> *) payload.
(JobPayload payload, MonadArbiter m) =>
Text -> Text -> [JobFilter] -> Int -> Int -> m [JobRead payload]
listJobsFiltered Text
schemaName Text
tableName []
getJobById
:: forall m payload
. (JobPayload payload, MonadArbiter m)
=> Text
-> Text
-> Int64
-> m (Maybe (JobRead payload))
getJobById :: forall (m :: * -> *) payload.
(JobPayload payload, MonadArbiter m) =>
Text -> Text -> Int64 -> m (Maybe (JobRead payload))
getJobById Text
schemaName Text
tableName Int64
jobId = do
rawJobs <-
Text
-> Params
-> RowCodec (Job Value Int64 Text UTCTime)
-> m [Job Value Int64 Text UTCTime]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
(Text -> Text -> Text
Tmpl.getJobByIdSQL Text
schemaName Text
tableName)
[Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
jobId]
(Text -> RowCodec (Job Value Int64 Text UTCTime)
jobRowCodec Text
tableName)
case rawJobs of
[] -> Maybe (JobRead payload) -> m (Maybe (JobRead payload))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe (JobRead payload)
forall a. Maybe a
Nothing
(Job Value Int64 Text UTCTime
raw : [Job Value Int64 Text UTCTime]
_) -> JobRead payload -> Maybe (JobRead payload)
forall a. a -> Maybe a
Just (JobRead payload -> Maybe (JobRead payload))
-> m (JobRead payload) -> m (Maybe (JobRead payload))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Job Value Int64 Text UTCTime -> m (JobRead payload)
forall payload (m :: * -> *).
(JobPayload payload, MonadArbiter m) =>
Job Value Int64 Text UTCTime -> m (JobRead payload)
decodePayload Job Value Int64 Text UTCTime
raw
getJobsByGroup
:: forall m payload
. (JobPayload payload, MonadArbiter m)
=> Text
-> Text
-> Text
-> Int
-> Int
-> m [JobRead payload]
getJobsByGroup :: forall (m :: * -> *) payload.
(JobPayload payload, MonadArbiter m) =>
Text -> Text -> Text -> Int -> Int -> m [JobRead payload]
getJobsByGroup Text
schemaName Text
tableName Text
gk =
Text -> Text -> [JobFilter] -> Int -> Int -> m [JobRead payload]
forall (m :: * -> *) payload.
(JobPayload payload, MonadArbiter m) =>
Text -> Text -> [JobFilter] -> Int -> Int -> m [JobRead payload]
listJobsFiltered Text
schemaName Text
tableName [Text -> JobFilter
Tmpl.FilterGroupKey Text
gk]
getInFlightJobs
:: forall m payload
. (JobPayload payload, MonadArbiter m)
=> Text
-> Text
-> Int
-> Int
-> m [JobRead payload]
getInFlightJobs :: forall (m :: * -> *) payload.
(JobPayload payload, MonadArbiter m) =>
Text -> Text -> Int -> Int -> m [JobRead payload]
getInFlightJobs Text
schemaName Text
tableName =
Text -> Text -> [JobFilter] -> Int -> Int -> m [JobRead payload]
forall (m :: * -> *) payload.
(JobPayload payload, MonadArbiter m) =>
Text -> Text -> [JobFilter] -> Int -> Int -> m [JobRead payload]
listJobsFiltered Text
schemaName Text
tableName [JobFilter
Tmpl.FilterInFlight]
cancelJob
:: (MonadArbiter m)
=> Text
-> Text
-> Int64
-> m Int64
cancelJob :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> m Int64
cancelJob Text
schemaName Text
tableName Int64
jobId = m Int64 -> m Int64
forall a. m a -> m a
forall (m :: * -> *) a. MonadArbiter m => m a -> m a
withDbTransaction (m Int64 -> m Int64) -> m Int64 -> m Int64
forall a b. (a -> b) -> a -> b
$ Text -> Text -> Int64 -> m Int64
forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> m Int64
cancelJobInner Text
schemaName Text
tableName Int64
jobId
cancelJobInner
:: (MonadArbiter m)
=> Text -> Text -> Int64 -> m Int64
cancelJobInner :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> m Int64
cancelJobInner Text
schemaName Text
tableName Int64
jobId = do
parentRows <-
Text -> Params -> Ap NullCol (Maybe Int64) -> m [Maybe Int64]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
(Text -> Text -> Text
Tmpl.getParentIdSQL Text
schemaName Text
tableName)
[Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
jobId]
Ap NullCol (Maybe Int64)
nullableInt64Codec
let mParentId = case [Maybe Int64]
parentRows of
[Just Int64
pid] -> Int64 -> Maybe Int64
forall a. a -> Maybe a
Just Int64
pid
[Maybe Int64]
_ -> Maybe Int64
forall a. Maybe a
Nothing
for_ mParentId $ \Int64
pid ->
m [Maybe Text] -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m [Maybe Text] -> m ()) -> m [Maybe Text] -> m ()
forall a b. (a -> b) -> a -> b
$
Text -> Params -> Ap NullCol (Maybe Text) -> m [Maybe Text]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
Text
"SELECT pg_advisory_xact_lock(hashtextextended(?, ?))::text AS result"
[Col Text -> Text -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Text
CText (Text
schemaName Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
"." Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> Text
tableName), Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
pid]
(Text -> Col Text -> Ap NullCol (Maybe Text)
forall a. Text -> Col a -> RowCodec (Maybe a)
ncol Text
"result" Col Text
CText)
rows <-
executeQuery
(Tmpl.cancelJobSQL schemaName tableName)
[pval CInt8 jobId, pval CInt8 jobId]
int64Codec
case rows of
[Int64
n] -> Int64 -> m Int64
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Int64
n
[Int64]
_ -> Int64 -> m Int64
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Int64
0
cancelJobsBatch
:: (MonadArbiter m)
=> Text
-> Text
-> [Int64]
-> m Int64
cancelJobsBatch :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> [Int64] -> m Int64
cancelJobsBatch Text
_ Text
_ [] = Int64 -> m Int64
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Int64
0
cancelJobsBatch Text
schemaName Text
tableName [Int64]
jobIds =
m Int64 -> m Int64
forall a. m a -> m a
forall (m :: * -> *) a. MonadArbiter m => m a -> m a
withDbTransaction (m Int64 -> m Int64) -> m Int64 -> m Int64
forall a b. (a -> b) -> a -> b
$ [Int64] -> Int64
forall a. Num a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum ([Int64] -> Int64) -> m [Int64] -> m Int64
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (Int64 -> m Int64) -> [Int64] -> m [Int64]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> [a] -> m [b]
mapM (Text -> Text -> Int64 -> m Int64
forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> m Int64
cancelJobInner Text
schemaName Text
tableName) [Int64]
jobIds
promoteJob
:: (MonadArbiter m)
=> Text
-> Text
-> Int64
-> m Int64
promoteJob :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> m Int64
promoteJob Text
schemaName Text
tableName Int64
jobId =
Text -> Params -> m Int64
forall (m :: * -> *). MonadArbiter m => Text -> Params -> m Int64
executeStatement
(Text -> Text -> Text
Tmpl.promoteJobSQL Text
schemaName Text
tableName)
[Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
jobId]
data QueueStats = QueueStats
{ QueueStats -> Int64
totalJobs :: Int64
, QueueStats -> Int64
visibleJobs :: Int64
, QueueStats -> Int64
invisibleJobs :: Int64
, QueueStats -> Maybe Double
oldestJobAgeSeconds :: Maybe Double
}
deriving stock (QueueStats -> QueueStats -> Bool
(QueueStats -> QueueStats -> Bool)
-> (QueueStats -> QueueStats -> Bool) -> Eq QueueStats
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: QueueStats -> QueueStats -> Bool
== :: QueueStats -> QueueStats -> Bool
$c/= :: QueueStats -> QueueStats -> Bool
/= :: QueueStats -> QueueStats -> Bool
Eq, (forall x. QueueStats -> Rep QueueStats x)
-> (forall x. Rep QueueStats x -> QueueStats) -> Generic QueueStats
forall x. Rep QueueStats x -> QueueStats
forall x. QueueStats -> Rep QueueStats x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. QueueStats -> Rep QueueStats x
from :: forall x. QueueStats -> Rep QueueStats x
$cto :: forall x. Rep QueueStats x -> QueueStats
to :: forall x. Rep QueueStats x -> QueueStats
Generic, Int -> QueueStats -> ShowS
[QueueStats] -> ShowS
QueueStats -> String
(Int -> QueueStats -> ShowS)
-> (QueueStats -> String)
-> ([QueueStats] -> ShowS)
-> Show QueueStats
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> QueueStats -> ShowS
showsPrec :: Int -> QueueStats -> ShowS
$cshow :: QueueStats -> String
show :: QueueStats -> String
$cshowList :: [QueueStats] -> ShowS
showList :: [QueueStats] -> ShowS
Show)
deriving anyclass (Maybe QueueStats
Value -> Parser [QueueStats]
Value -> Parser QueueStats
(Value -> Parser QueueStats)
-> (Value -> Parser [QueueStats])
-> Maybe QueueStats
-> FromJSON QueueStats
forall a.
(Value -> Parser a)
-> (Value -> Parser [a]) -> Maybe a -> FromJSON a
$cparseJSON :: Value -> Parser QueueStats
parseJSON :: Value -> Parser QueueStats
$cparseJSONList :: Value -> Parser [QueueStats]
parseJSONList :: Value -> Parser [QueueStats]
$comittedField :: Maybe QueueStats
omittedField :: Maybe QueueStats
FromJSON, [QueueStats] -> Encoding
[QueueStats] -> Value
QueueStats -> Bool
QueueStats -> Encoding
QueueStats -> Value
(QueueStats -> Value)
-> (QueueStats -> Encoding)
-> ([QueueStats] -> Value)
-> ([QueueStats] -> Encoding)
-> (QueueStats -> Bool)
-> ToJSON QueueStats
forall a.
(a -> Value)
-> (a -> Encoding)
-> ([a] -> Value)
-> ([a] -> Encoding)
-> (a -> Bool)
-> ToJSON a
$ctoJSON :: QueueStats -> Value
toJSON :: QueueStats -> Value
$ctoEncoding :: QueueStats -> Encoding
toEncoding :: QueueStats -> Encoding
$ctoJSONList :: [QueueStats] -> Value
toJSONList :: [QueueStats] -> Value
$ctoEncodingList :: [QueueStats] -> Encoding
toEncodingList :: [QueueStats] -> Encoding
$comitField :: QueueStats -> Bool
omitField :: QueueStats -> Bool
ToJSON)
getQueueStats
:: (MonadArbiter m)
=> Text
-> Text
-> m QueueStats
getQueueStats :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> m QueueStats
getQueueStats Text
schemaName Text
tableName = do
rows <-
Text
-> Params
-> RowCodec (Int64, Int64, Maybe Double)
-> m [(Int64, Int64, Maybe Double)]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
(Text -> Text -> Text
Tmpl.getQueueStatsSQL Text
schemaName Text
tableName)
[]
RowCodec (Int64, Int64, Maybe Double)
statsRowCodec
case rows of
[] -> QueueStats -> m QueueStats
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (QueueStats -> m QueueStats) -> QueueStats -> m QueueStats
forall a b. (a -> b) -> a -> b
$ Int64 -> Int64 -> Int64 -> Maybe Double -> QueueStats
QueueStats Int64
0 Int64
0 Int64
0 Maybe Double
forall a. Maybe a
Nothing
((Int64
total, Int64
visible, Maybe Double
age) : [(Int64, Int64, Maybe Double)]
_) -> QueueStats -> m QueueStats
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (QueueStats -> m QueueStats) -> QueueStats -> m QueueStats
forall a b. (a -> b) -> a -> b
$ Int64 -> Int64 -> Int64 -> Maybe Double -> QueueStats
QueueStats Int64
total Int64
visible (Int64
total Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
- Int64
visible) Maybe Double
age
countJobs
:: (MonadArbiter m)
=> Text -> Text -> m Int64
countJobs :: forall (m :: * -> *). MonadArbiter m => Text -> Text -> m Int64
countJobs Text
schemaName Text
tableName = Text -> Text -> [JobFilter] -> m Int64
forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> [JobFilter] -> m Int64
countJobsFiltered Text
schemaName Text
tableName []
countJobsByGroup
:: (MonadArbiter m)
=> Text -> Text -> Text -> m Int64
countJobsByGroup :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Text -> m Int64
countJobsByGroup Text
schemaName Text
tableName Text
gk =
Text -> Text -> [JobFilter] -> m Int64
forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> [JobFilter] -> m Int64
countJobsFiltered Text
schemaName Text
tableName [Text -> JobFilter
Tmpl.FilterGroupKey Text
gk]
countInFlightJobs
:: (MonadArbiter m)
=> Text -> Text -> m Int64
countInFlightJobs :: forall (m :: * -> *). MonadArbiter m => Text -> Text -> m Int64
countInFlightJobs Text
schemaName Text
tableName =
Text -> Text -> [JobFilter] -> m Int64
forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> [JobFilter] -> m Int64
countJobsFiltered Text
schemaName Text
tableName [JobFilter
Tmpl.FilterInFlight]
countDLQJobs
:: (MonadArbiter m)
=> Text -> Text -> m Int64
countDLQJobs :: forall (m :: * -> *). MonadArbiter m => Text -> Text -> m Int64
countDLQJobs Text
schemaName Text
tableName = Text -> Text -> [JobFilter] -> m Int64
forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> [JobFilter] -> m Int64
countDLQFiltered Text
schemaName Text
tableName []
getJobsByParent
:: forall m payload
. (JobPayload payload, MonadArbiter m)
=> Text
-> Text
-> Int64
-> Int
-> Int
-> m [JobRead payload]
getJobsByParent :: forall (m :: * -> *) payload.
(JobPayload payload, MonadArbiter m) =>
Text -> Text -> Int64 -> Int -> Int -> m [JobRead payload]
getJobsByParent Text
schemaName Text
tableName Int64
pid =
Text -> Text -> [JobFilter] -> Int -> Int -> m [JobRead payload]
forall (m :: * -> *) payload.
(JobPayload payload, MonadArbiter m) =>
Text -> Text -> [JobFilter] -> Int -> Int -> m [JobRead payload]
listJobsFiltered Text
schemaName Text
tableName [Int64 -> JobFilter
Tmpl.FilterParentId Int64
pid]
countJobsByParent
:: (MonadArbiter m)
=> Text -> Text -> Int64 -> m Int64
countJobsByParent :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> m Int64
countJobsByParent Text
schemaName Text
tableName Int64
pid =
Text -> Text -> [JobFilter] -> m Int64
forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> [JobFilter] -> m Int64
countJobsFiltered Text
schemaName Text
tableName [Int64 -> JobFilter
Tmpl.FilterParentId Int64
pid]
countChildrenBatch
:: (MonadArbiter m)
=> Text -> Text -> [Int64] -> m (Map.Map Int64 (Int64, Int64))
countChildrenBatch :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> [Int64] -> m (Map Int64 (Int64, Int64))
countChildrenBatch Text
_ Text
_ [] = Map Int64 (Int64, Int64) -> m (Map Int64 (Int64, Int64))
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Map Int64 (Int64, Int64)
forall k a. Map k a
Map.empty
countChildrenBatch Text
schemaName Text
tableName [Int64]
ids = do
rows <-
Text
-> Params
-> RowCodec (Int64, (Int64, Int64))
-> m [(Int64, (Int64, Int64))]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
(Text -> Text -> Text
Tmpl.countChildrenBatchSQL Text
schemaName Text
tableName)
[Col Int64 -> [Int64] -> SomeParam
forall a. Col a -> [a] -> SomeParam
parr Col Int64
CInt8 [Int64]
ids]
RowCodec (Int64, (Int64, Int64))
parentCountCodec
pure $ Map.fromList rows
countDLQChildrenBatch
:: (MonadArbiter m)
=> Text -> Text -> [Int64] -> m (Map.Map Int64 Int64)
countDLQChildrenBatch :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> [Int64] -> m (Map Int64 Int64)
countDLQChildrenBatch Text
_ Text
_ [] = Map Int64 Int64 -> m (Map Int64 Int64)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Map Int64 Int64
forall k a. Map k a
Map.empty
countDLQChildrenBatch Text
schemaName Text
tableName [Int64]
ids = do
rows <-
Text -> Params -> RowCodec (Int64, Int64) -> m [(Int64, Int64)]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
(Text -> Text -> Text
Tmpl.countDLQChildrenBatchSQL Text
schemaName Text
tableName)
[Col Int64 -> [Int64] -> SomeParam
forall a. Col a -> [a] -> SomeParam
parr Col Int64
CInt8 [Int64]
ids]
RowCodec (Int64, Int64)
dlqParentCountCodec
pure $ Map.fromList rows
pauseChildren
:: (MonadArbiter m)
=> Text
-> Text
-> Int64
-> m Int64
pauseChildren :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> m Int64
pauseChildren Text
schemaName Text
tableName Int64
parentJobId =
Text -> Params -> m Int64
forall (m :: * -> *). MonadArbiter m => Text -> Params -> m Int64
executeStatement
(Text -> Text -> Text
Tmpl.pauseChildrenSQL Text
schemaName Text
tableName)
[Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
parentJobId]
resumeChildren
:: (MonadArbiter m)
=> Text
-> Text
-> Int64
-> m Int64
resumeChildren :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> m Int64
resumeChildren Text
schemaName Text
tableName Int64
parentJobId =
Text -> Params -> m Int64
forall (m :: * -> *). MonadArbiter m => Text -> Params -> m Int64
executeStatement
(Text -> Text -> Text
Tmpl.resumeChildrenSQL Text
schemaName Text
tableName)
[Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
parentJobId]
cancelJobCascade
:: (MonadArbiter m)
=> Text
-> Text
-> Int64
-> m Int64
cancelJobCascade :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> m Int64
cancelJobCascade Text
schemaName Text
tableName Int64
jobId = m Int64 -> m Int64
forall a. m a -> m a
forall (m :: * -> *) a. MonadArbiter m => m a -> m a
withDbTransaction (m Int64 -> m Int64) -> m Int64 -> m Int64
forall a b. (a -> b) -> a -> b
$ do
parentRows <-
Text -> Params -> Ap NullCol (Maybe Int64) -> m [Maybe Int64]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
(Text -> Text -> Text
Tmpl.getParentIdSQL Text
schemaName Text
tableName)
[Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
jobId]
Ap NullCol (Maybe Int64)
nullableInt64Codec
let rootParentId = case [Maybe Int64]
parentRows of
[Just Int64
pid] -> Int64 -> Maybe Int64
forall a. a -> Maybe a
Just Int64
pid
[Maybe Int64]
_ -> Maybe Int64
forall a. Maybe a
Nothing
countRows <-
executeQuery
(Tmpl.cancelJobCascadeSQL schemaName tableName)
[pval CInt8 jobId]
countCodec
let deleted = case [Int64]
countRows of
[Int64
n] -> Int64
n
[Int64]
_ -> Int64
0
when (deleted > 0) $
for_ rootParentId $
tryResumeParent schemaName tableName
pure deleted
cancelJobTree
:: (MonadArbiter m)
=> Text
-> Text
-> Int64
-> m Int64
cancelJobTree :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> m Int64
cancelJobTree Text
schemaName Text
tableName Int64
jobId = do
countRows <-
Text -> Params -> RowCodec Int64 -> m [Int64]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
(Text -> Text -> Text
Tmpl.cancelJobTreeSQL Text
schemaName Text
tableName)
[Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
jobId]
RowCodec Int64
countCodec
case countRows of
[Int64
n] -> Int64 -> m Int64
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Int64
n
[Int64]
_ -> Text -> m Int64
forall (m :: * -> *) a. MonadIO m => Text -> m a
throwParsing Text
"cancelJobTree: unexpected result"
suspendJob
:: (MonadArbiter m)
=> Text
-> Text
-> Int64
-> m Int64
suspendJob :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> m Int64
suspendJob Text
schemaName Text
tableName Int64
jobId =
Text -> Params -> m Int64
forall (m :: * -> *). MonadArbiter m => Text -> Params -> m Int64
executeStatement
(Text -> Text -> Text
Tmpl.suspendJobSQL Text
schemaName Text
tableName)
[Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
jobId]
resumeJob
:: (MonadArbiter m)
=> Text
-> Text
-> Int64
-> m Int64
resumeJob :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> m Int64
resumeJob Text
schemaName Text
tableName Int64
jobId =
Text -> Params -> m Int64
forall (m :: * -> *). MonadArbiter m => Text -> Params -> m Int64
executeStatement
(Text -> Text -> Text
Tmpl.resumeJobSQL Text
schemaName Text
tableName)
[Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
jobId]
refreshGroups
:: (MonadArbiter m)
=> Text
-> Text
-> Int
-> m ()
refreshGroups :: forall (m :: * -> *). MonadArbiter m => Text -> Text -> Int -> m ()
refreshGroups Text
schemaName Text
tableName Int
intervalSecs = do
seqResult <- Text -> Params -> RowCodec Int64 -> m [Int64]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery (Text -> Text -> Text
Tmpl.checkReaperSeqSQL Text
schemaName Text
tableName) [] RowCodec Int64
int64Codec
case seqResult of
[Int64
lastEpoch] -> do
nowResult <- Text -> Params -> RowCodec Int64 -> m [Int64]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery Text
"SELECT extract(epoch FROM now())::bigint AS result" [] RowCodec Int64
int64Codec
case nowResult of
[Int64
nowEpoch]
| Int64
nowEpoch Int64 -> Int64 -> Int64
forall a. Num a => a -> a -> a
- Int64
lastEpoch Int64 -> Int64 -> Bool
forall a. Ord a => a -> a -> Bool
< Int -> Int64
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
intervalSecs -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
[Int64]
_ -> m ()
doRefresh
[Int64]
_ -> m ()
doRefresh
where
doRefresh :: m ()
doRefresh = m () -> m ()
forall a. m a -> m a
forall (m :: * -> *) a. MonadArbiter m => m a -> m a
withDbTransaction (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
let tbl :: Text
tbl = Text -> Text -> Text
jobQueueTable Text
schemaName Text
tableName
acquired <-
Text -> Params -> RowCodec Bool -> m [Bool]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
Text
"SELECT pg_try_advisory_xact_lock(hashtextextended(?, ?)) AS result"
[Col Text -> Text -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Text
CText Text
tbl, Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
2]
RowCodec Bool
boolCodec
case acquired of
[Bool
True] -> do
m [Int64] -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m [Int64] -> m ()) -> m [Int64] -> m ()
forall a b. (a -> b) -> a -> b
$ Text -> Params -> RowCodec Int64 -> m [Int64]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery (Text -> Text -> Text
Tmpl.lockGroupsSQL Text
schemaName Text
tableName) [] RowCodec Int64
int64Codec
m Int64 -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Int64 -> m ()) -> m Int64 -> m ()
forall a b. (a -> b) -> a -> b
$ Text -> Params -> m Int64
forall (m :: * -> *). MonadArbiter m => Text -> Params -> m Int64
executeStatement (Text -> Text -> Text
Tmpl.refreshGroupsSQL Text
schemaName Text
tableName) []
m [Int64] -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m [Int64] -> m ()) -> m [Int64] -> m ()
forall a b. (a -> b) -> a -> b
$ Text -> Params -> RowCodec Int64 -> m [Int64]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery (Text -> Text -> Text
Tmpl.updateReaperSeqSQL Text
schemaName Text
tableName) [] RowCodec Int64
int64Codec
[Bool]
_ -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
readChildResultsRaw
:: (MonadArbiter m)
=> Text
-> Text
-> Int64
-> m (Map.Map Int64 Value, Map.Map Int64 Text, Maybe Value, Map.Map Int64 Text)
readChildResultsRaw :: forall (m :: * -> *).
MonadArbiter m =>
Text
-> Text
-> Int64
-> m (Map Int64 Value, Map Int64 Text, Maybe Value, Map Int64 Text)
readChildResultsRaw Text
schemaName Text
tableName Int64
parentJobId = do
rows <-
Text
-> Params
-> RowCodec
(Text, Maybe Int64, Maybe Value, Maybe Text, Maybe Int64)
-> m [(Text, Maybe Int64, Maybe Value, Maybe Text, Maybe Int64)]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
(Text -> Text -> Text
Tmpl.readChildResultsSQL Text
schemaName Text
tableName)
[Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
parentJobId, Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
parentJobId, Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
parentJobId]
RowCodec (Text, Maybe Int64, Maybe Value, Maybe Text, Maybe Int64)
childResultsRowCodec
foldM parseRow (Map.empty, Map.empty, Nothing, Map.empty) rows
where
parseRow :: (Map k a, Map k a, Maybe a, Map k a)
-> (a, Maybe k, Maybe a, Maybe a, Maybe k)
-> f (Map k a, Map k a, Maybe a, Map k a)
parseRow (!Map k a
results, !Map k a
errors, !Maybe a
snap, !Map k a
dlqFailures) (a, Maybe k, Maybe a, Maybe a, Maybe k)
row = case (a, Maybe k, Maybe a, Maybe a, Maybe k)
row of
(a
"r", Just k
cid, Just a
val, Maybe a
_, Maybe k
_) ->
(Map k a, Map k a, Maybe a, Map k a)
-> f (Map k a, Map k a, Maybe a, Map k a)
forall a. a -> f a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (k -> a -> Map k a -> Map k a
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert k
cid a
val Map k a
results, Map k a
errors, Maybe a
snap, Map k a
dlqFailures)
(a
"e", Just k
jid, Maybe a
_, Just a
err, Just k
dlqPk) ->
(Map k a, Map k a, Maybe a, Map k a)
-> f (Map k a, Map k a, Maybe a, Map k a)
forall a. a -> f a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Map k a
results, k -> a -> Map k a -> Map k a
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert k
jid a
err Map k a
errors, Maybe a
snap, k -> a -> Map k a -> Map k a
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert k
dlqPk a
err Map k a
dlqFailures)
(a
"e", Just k
jid, Maybe a
_, Maybe a
Nothing, Just k
dlqPk) ->
(Map k a, Map k a, Maybe a, Map k a)
-> f (Map k a, Map k a, Maybe a, Map k a)
forall a. a -> f a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Map k a
results, k -> a -> Map k a -> Map k a
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert k
jid a
"" Map k a
errors, Maybe a
snap, k -> a -> Map k a -> Map k a
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert k
dlqPk a
"" Map k a
dlqFailures)
(a
"s", Maybe k
_, Just a
val, Maybe a
_, Maybe k
_) ->
(Map k a, Map k a, Maybe a, Map k a)
-> f (Map k a, Map k a, Maybe a, Map k a)
forall a. a -> f a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Map k a
results, Map k a
errors, a -> Maybe a
forall a. a -> Maybe a
Just a
val, Map k a
dlqFailures)
(a, Maybe k, Maybe a, Maybe a, Maybe k)
_ -> Text -> f (Map k a, Map k a, Maybe a, Map k a)
forall (m :: * -> *) a. MonadIO m => Text -> m a
throwParsing (Text -> f (Map k a, Map k a, Maybe a, Map k a))
-> Text -> f (Map k a, Map k a, Maybe a, Map k a)
forall a b. (a -> b) -> a -> b
$ Text
"readChildResultsRaw: unexpected row: " Text -> Text -> Text
forall a. Semigroup a => a -> a -> a
<> String -> Text
T.pack ((a, Maybe k, Maybe a, Maybe a, Maybe k) -> String
forall a. Show a => a -> String
show (a, Maybe k, Maybe a, Maybe a, Maybe k)
row)
getParentStateSnapshot
:: (MonadArbiter m)
=> Text
-> Text
-> Int64
-> m (Maybe Value)
getParentStateSnapshot :: forall (m :: * -> *).
MonadArbiter m =>
Text -> Text -> Int64 -> m (Maybe Value)
getParentStateSnapshot Text
schemaName Text
tableName Int64
jobId = do
rows <-
Text -> Params -> Ap NullCol (Maybe Value) -> m [Maybe Value]
forall a. Text -> Params -> RowCodec a -> m [a]
forall (m :: * -> *) a.
MonadArbiter m =>
Text -> Params -> RowCodec a -> m [a]
executeQuery
(Text -> Text -> Text
Tmpl.getParentStateSnapshotSQL Text
schemaName Text
tableName)
[Col Int64 -> Int64 -> SomeParam
forall a. Col a -> a -> SomeParam
pval Col Int64
CInt8 Int64
jobId]
(Text -> Col Value -> Ap NullCol (Maybe Value)
forall a. Text -> Col a -> RowCodec (Maybe a)
ncol Text
"parent_state" Col Value
CJsonb)
case rows of
[Maybe Value
val] -> Maybe Value -> m (Maybe Value)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe Value
val
[Maybe Value]
_ -> Maybe Value -> m (Maybe Value)
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe Value
forall a. Maybe a
Nothing
mergeRawChildResults
:: Map.Map Int64 Value
-> Map.Map Int64 Text
-> Maybe Value
-> Map.Map Int64 (Either Text Value)
mergeRawChildResults :: Map Int64 Value
-> Map Int64 Text -> Maybe Value -> Map Int64 (Either Text Value)
mergeRawChildResults Map Int64 Value
results Map Int64 Text
failures Maybe Value
mSnapshot =
(Text -> Either Text Value)
-> Map Int64 Text -> Map Int64 (Either Text Value)
forall a b k. (a -> b) -> Map k a -> Map k b
Map.map Text -> Either Text Value
forall a b. a -> Either a b
Left Map Int64 Text
failures
Map Int64 (Either Text Value)
-> Map Int64 (Either Text Value) -> Map Int64 (Either Text Value)
forall k a. Ord k => Map k a -> Map k a -> Map k a
`Map.union` (Value -> Either Text Value)
-> Map Int64 Value -> Map Int64 (Either Text Value)
forall a b k. (a -> b) -> Map k a -> Map k b
Map.map Value -> Either Text Value
forall a b. b -> Either a b
Right Map Int64 Value
results
Map Int64 (Either Text Value)
-> Map Int64 (Either Text Value) -> Map Int64 (Either Text Value)
forall k a. Ord k => Map k a -> Map k a -> Map k a
`Map.union` Map Int64 (Either Text Value)
base
where
base :: Map Int64 (Either Text Value)
base = case Maybe Value
mSnapshot of
Just Value
val | Success Map Int64 (Either Text Value)
m <- Value -> Result (Map Int64 (Either Text Value))
forall a. FromJSON a => Value -> Result a
fromJSON Value
val -> Map Int64 (Either Text Value)
m
Maybe Value
_ -> Map Int64 (Either Text Value)
forall k a. Map k a
Map.empty