{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE UndecidableInstances #-}
module Arbiter.Worker
(
runWorkerPool
, NamedWorkerPool (..)
, namedWorkerPool
, runWorkerPools
, runSelectedWorkerPools
, getEnabledQueues
, JobResult (..)
, module Arbiter.Worker.Config
, module Arbiter.Worker.BackoffStrategy
, module Arbiter.Worker.Logger
, module Arbiter.Worker.WorkerState
, CronJob (..)
, OverlapPolicy (..)
, BackfillPolicy (..)
, cronJob
, initCronSchedules
, overlapPolicyToText
, overlapPolicyFromText
) where
import Arbiter.Core.Exceptions
( BranchCancelException (..)
, JobException (..)
, JobNotFoundException (..)
, JobPermanentException (..)
, JobRetryableException (..)
, JobStolenException
, ParsingException (..)
, TreeCancelException (..)
, throwJobNotFound
)
import Arbiter.Core.HasArbiterSchema (HasArbiterSchema (..))
import Arbiter.Core.HighLevel (JobOperation, QueueOperation)
import Arbiter.Core.HighLevel qualified as Arb
import Arbiter.Core.Job.Types qualified as Job
import Arbiter.Core.MonadArbiter (MonadArbiter (..))
import Arbiter.Core.Operations qualified as Ops
import Arbiter.Core.QueueRegistry (RegistryTables (..), TableForPayload)
import Control.Exception (SomeException, fromException)
import Control.Monad (forever, replicateM, unless, void, when)
import Control.Monad.Catch (MonadMask)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Cont (ContT (..), evalContT)
import Data.Aeson (FromJSON, ToJSON, Value, toJSON)
import Data.Aeson qualified as Aeson
import Data.Foldable (toList)
import Data.Int (Int32, Int64)
import Data.List.NonEmpty (NonEmpty (..))
import Data.Map.Strict qualified as Map
import Data.Maybe (fromMaybe)
import Data.Proxy (Proxy (..))
import Data.Text (Text)
import Data.Text qualified as T
import Data.Time (NominalDiffTime, UTCTime, getCurrentTime)
import Data.Traversable (for)
import GHC.TypeLits (symbolVal)
import System.Directory (removeFile)
import System.Environment (lookupEnv)
import UnliftIO
( MonadUnliftIO
, atomically
, checkSTM
, finally
, isEmptyTBQueue
, lengthTBQueue
, mask
, modifyTVar'
, newTBQueueIO
, newTVarIO
, readTBQueue
, readTVar
, throwIO
, tryAny
, waitAnyCatch
, withAsync
, writeTVar
)
import UnliftIO.Async (race)
import UnliftIO.Async qualified as Async
import UnliftIO.Concurrent (threadDelay)
import UnliftIO.Concurrent qualified as Conc
import UnliftIO.MVar qualified as MVar
import UnliftIO.STM (TBQueue, TVar)
import Arbiter.Worker.BackoffStrategy
import Arbiter.Worker.Config
import Arbiter.Worker.Cron
( BackfillPolicy (..)
, CronJob (..)
, OverlapPolicy (..)
, cronJob
, initCronSchedules
, overlapPolicyFromText
, overlapPolicyToText
, runCronScheduler
)
import Arbiter.Worker.Dispatcher
import Arbiter.Worker.Heartbeat (withJobsHeartbeat)
import Arbiter.Worker.Logger
import Arbiter.Worker.Logger.Internal (runHook, tryLog, withJobContext)
import Arbiter.Worker.Retry (retryOnException)
import Arbiter.Worker.WorkerState
-- | Conversion between a handler's result type and its JSON representation.
--
-- 'encodeJobResult' may return 'Nothing' to signal that the result has no
-- JSON form; 'decodeJobResult' reports a decoding failure as a 'Text'
-- message in 'Left'.
class JobResult a where
  -- | Turn a handler result into an optional JSON 'Value'.
  encodeJobResult :: a -> Maybe Value

  -- | Rebuild a result from a JSON 'Value', or explain why that failed.
  decodeJobResult :: Value -> Either Text a
-- | The unit result carries no data: it is never encoded ('Nothing'), and
-- decoding succeeds for any input 'Value' without inspecting it.
instance JobResult () where
  encodeJobResult _ = Nothing
  decodeJobResult _ = Right ()
-- | Fallback instance for any type with both 'FromJSON' and 'ToJSON'
-- instances: results are always encoded (via 'toJSON'), and decoding goes
-- through 'Aeson.fromJSON', with the parser's error message converted to
-- 'Text'.
--
-- Marked @OVERLAPPABLE@ so that more specific instances (such as the one for
-- @()@) are preferred over this one.
instance {-# OVERLAPPABLE #-} (FromJSON a, ToJSON a) => JobResult a where
  encodeJobResult = Just . toJSON
  decodeJobResult v = case Aeson.fromJSON v of
    Aeson.Success a -> Right a
    Aeson.Error err -> Left (T.pack err)
-- | A worker pool configuration paired with a name.
--
-- The @registry@, @payload@ and @result@ types are existentially quantified,
-- so pools with different payload and result types can be kept in one list.
-- The constructor captures the 'JobResult' and 'QueueOperation' evidence
-- needed to run the pool later.
data NamedWorkerPool m
  = forall registry payload result.
    (JobResult result, QueueOperation m registry payload) =>
    NamedWorkerPool
    { workerPoolName :: Text
    -- ^ Name used to select this pool (compared against the enabled queue
    -- names in 'runSelectedWorkerPools').
    , workerPoolConfig :: WorkerConfig m payload result
    -- ^ Configuration handed to 'runWorkerPool'.
    }
-- | Wrap a 'WorkerConfig' in a 'NamedWorkerPool'.
--
-- The pool name is the type-level symbol @TableForPayload payload registry@
-- rendered as 'Text' via 'symbolVal'.
namedWorkerPool
  :: forall m registry payload result
   . (JobResult result, QueueOperation m registry payload)
  => WorkerConfig m payload result
  -> NamedWorkerPool m
namedWorkerPool cfg =
  NamedWorkerPool
    { workerPoolName = T.pack $ symbolVal (Proxy @(TableForPayload payload registry))
    , workerPoolConfig = cfg
    }
-- | Run several worker pools that share one 'WorkerState'.
--
-- Steps, in order:
--
-- 1. Create a fresh shared state with 'newWorkerState'.
-- 2. Hand that state to the caller-supplied @setup@ action.
-- 3. Resolve the enabled queue names from the @ARBITER_ENABLED_QUEUES@
--    environment variable (see 'getEnabledQueues'; this step throws if the
--    variable names an unknown queue).
-- 4. Delegate to 'runSelectedWorkerPools'.
runWorkerPools
  :: forall m registry
   . (MonadMask m, MonadUnliftIO m, RegistryTables registry)
  => Proxy registry
  -> [NamedWorkerPool m]
  -> (TVar WorkerState -> IO ())
  -> m ()
runWorkerPools registry pools setup = do
  sharedState <- liftIO newWorkerState
  liftIO $ setup sharedState
  enabled <- liftIO $ getEnabledQueues "ARBITER_ENABLED_QUEUES" registry
  runSelectedWorkerPools sharedState enabled pools
-- | Run every pool whose name appears in the list of enabled queue names.
--
-- Each selected pool has its configuration adjusted before it starts:
-- 'workerStateVar' is replaced by the shared state, and its 'logConfig' is
-- extended through 'withPoolContext' with the pool's name. The pools are
-- started with 'Async.withAsync' and this function then waits, in list
-- order, for each of them to finish ('Async.waitCatch', so a pool ending
-- with an exception does not abort the wait for the others).
--
-- If no pool matches, the function returns immediately.
runSelectedWorkerPools
  :: (MonadMask m, MonadUnliftIO m)
  => TVar WorkerState
  -> [Text]
  -> [NamedWorkerPool m]
  -> m ()
runSelectedWorkerPools sharedState enabled pools =
  case filter (\(NamedWorkerPool name _) -> name `elem` enabled) pools of
    [] -> pure ()
    selected -> evalContT $ do
      asyncs <- for selected $ \(NamedWorkerPool name cfg) ->
        let cfg' =
              cfg
                { workerStateVar = sharedState
                , logConfig = withPoolContext name (logConfig cfg)
                }
         in ContT $ \k -> Async.withAsync (runWorkerPool cfg') k
      lift $ mapM_ Async.waitCatch asyncs
-- | Prepend a @"pool"@ key carrying the given pool name to whatever
-- 'additionalContext' the log configuration already produces.
withPoolContext :: Text -> LogConfig -> LogConfig
withPoolContext poolName lc =
  lc {additionalContext = (("pool" .= poolName) :) <$> additionalContext lc}
-- | Determine which queues are enabled, based on an environment variable.
--
-- * Variable unset, or set to only whitespace: every queue known to the
--   registry ('registryTableNames') is enabled.
-- * Otherwise the value is split on commas and each entry is stripped of
--   surrounding whitespace. If every entry is a known queue name, the
--   entries are returned as given.
--
-- Throws a 'userError' (@"Unknown queue names: ..."@) listing any entries
-- that are not known queue names. Note that an empty entry (for example
-- from a trailing comma) is treated like any other name and is therefore
-- reported as unknown unless the registry contains an empty name.
getEnabledQueues
  :: (RegistryTables registry)
  => String
  -> Proxy registry
  -> IO [Text]
getEnabledQueues envVar registry = do
  let allQueues = registryTableNames registry
  mVal <- lookupEnv envVar
  case mVal of
    Nothing -> pure allQueues
    Just val -> do
      let tval = T.pack val
      if T.null (T.strip tval)
        then pure allQueues
        else do
          let requested = map T.strip $ T.splitOn "," tval
              invalid = filter (`notElem` allQueues) requested
          if null invalid
            then pure requested
            else
              throwIO . userError $
                "Unknown queue names: " <> T.unpack (T.intercalate ", " invalid)
-- | Run a single worker pool until one of its threads stops, then drain.
--
-- Start-up: a bounded work queue with capacity 'workerCount' is created,
-- together with a busy-worker counter and a \"worker finished\" flag. The
-- following threads are then started, each scoped with 'withAsync':
--
-- * a liveness refresher ('refreshLiveness'), only when 'livenessConfig' is
--   set; when it ends, removal of the liveness file is attempted and any
--   failure of that removal is ignored;
-- * the dispatcher ('runDispatcher');
-- * 'workerCount' workers running 'workerLoop' under 'retryOnException';
-- * a cron scheduler, only when 'cronJobs' is non-empty;
-- * the group reaper.
--
-- Shutdown: as soon as any one of those threads terminates
-- ('waitAnyCatch'), an exception (if any) is logged, 'shutdownWorker' is
-- called, and the pool waits until the work queue is empty and no worker is
-- busy. Without a 'gracefulShutdownTimeout' that wait is unbounded and a
-- progress message is logged every 10 seconds; with a timeout the wait is
-- raced against it and a warning is logged if the timeout wins.
runWorkerPool
  :: forall m registry payload result
   . ( JobResult result
     , MonadMask m
     , MonadUnliftIO m
     , QueueOperation m registry payload
     )
  => WorkerConfig m payload result
  -> m ()
runWorkerPool config = do
  let workerCap = workerCount config
      mLiveness = livenessConfig config
      mLivenessSignal = fmap livenessSignal mLiveness

  workQueue <- liftIO $ newTBQueueIO (fromIntegral workerCap)
  busyWorkerCount <- liftIO $ newTVarIO 0
  workerFinishedVar <- liftIO $ newTVarIO False

  evalContT $ do
    -- Zero or one liveness thread, kept as a list so it can be appended to
    -- the set of threads being watched below.
    liveness <-
      case mLiveness of
        Just lc ->
          fmap pure . ContT . withAsync $
            refreshLiveness (logConfig config) (livenessPath lc) (livenessSignal lc) (livenessInterval lc)
              `finally` void (tryAny . liftIO $ removeFile (livenessPath lc))
        Nothing -> pure []

    dispatcher <-
      ContT . withAsync $
        runDispatcher config workerCap workQueue busyWorkerCount mLivenessSignal workerFinishedVar

    workers <-
      replicateM workerCap . ContT . withAsync $
        retryOnException (workerStateVar config) (logConfig config) "Worker" $
          workerLoop config workQueue busyWorkerCount workerFinishedVar

    -- Zero or one cron thread, same list trick as for liveness.
    cron <-
      case cronJobs config of
        [] -> pure []
        jobs -> do
          sch <- lift getSchema
          fmap pure . ContT . withAsync $
            retryOnException (workerStateVar config) (logConfig config) "Cron scheduler" $
              runCronScheduler (workerStateVar config) (logConfig config) sch jobs

    reaper <-
      ContT . withAsync $
        retryOnException (workerStateVar config) (logConfig config) "Group reaper" $
          groupReaperLoop @m @registry @payload (groupReaperInterval config)

    -- Block until the first thread ends, for whatever reason.
    (_, res) <- waitAnyCatch (dispatcher : reaper : cron ++ liveness ++ workers)
    case res of
      Left e ->
        lift $ tryLog (logConfig config) Error $ "Thread pool exception: " <> T.pack (show e)
      Right _ ->
        pure ()

    shutdownWorker config
    lift $ tryLog (logConfig config) Info "Starting graceful shutdown. Draining in-flight jobs..."

    -- Retries (STM) until the queue is empty and the busy counter is zero.
    let waitForDrain = liftIO . atomically $ do
          qEmpty <- isEmptyTBQueue workQueue
          checkSTM qEmpty
          busy <- readTVar busyWorkerCount
          checkSTM (busy == 0)

    drainResult <- case gracefulShutdownTimeout config of
      Nothing -> do
        -- No timeout: wait indefinitely, reporting progress on every
        -- 10-second tick that elapses before the drain completes.
        let drainLoop = do
              drainOrTick <- lift $ race (liftIO $ threadDelay 10_000_000) waitForDrain
              case drainOrTick of
                Right () -> pure ()
                Left () -> do
                  (busy, qLen) <-
                    liftIO . atomically $
                      (,)
                        <$> readTVar busyWorkerCount
                        <*> (fromIntegral <$> lengthTBQueue workQueue)
                  lift $
                    tryLog (logConfig config) Info $
                      "Graceful shutdown: waiting for "
                        <> T.pack (show (busy :: Int))
                        <> " busy worker(s), "
                        <> T.pack (show (qLen :: Int))
                        <> " job(s) in queue..."
                  drainLoop
        drainLoop
        pure (Right ())
      Just timeoutSecs -> do
        -- 'Left' means the timeout elapsed first, 'Right' means drained.
        let timeoutMicros = ceiling (timeoutSecs * 1_000_000)
        lift $ race (liftIO $ threadDelay timeoutMicros) waitForDrain

    case drainResult of
      Right () ->
        lift $ tryLog (logConfig config) Info "All workers are now idle. Graceful shutdown complete."
      Left () ->
        lift $ tryLog (logConfig config) Warning "Graceful shutdown timed out. Some jobs may still be in-flight."
-- | Keep a liveness probe file fresh for as long as signals keep arriving.
--
-- The loop never returns on its own. It first blocks until the signal
-- 'MVar.MVar' is filled, then writes the probe. After that, each iteration
-- waits for /both/ a new signal and a delay of @n@ seconds (the two waits
-- run concurrently) before writing the probe again, so the file is written
-- at most once per @n@ seconds and only when a signal was received.
--
-- \"Writing the probe\" means writing an empty string to the given path. A
-- failed write is logged at 'Error' level and otherwise ignored.
refreshLiveness :: (MonadUnliftIO m) => LogConfig -> FilePath -> MVar.MVar () -> Int -> m ()
refreshLiveness logCfg healthcheckPath livenessMVar n = do
  MVar.takeMVar livenessMVar
  writeProbe
  forever $ do
    Async.concurrently_
      (MVar.takeMVar livenessMVar)
      (Conc.threadDelay $ n * 1_000_000)
    writeProbe
  where
    writeProbe = do
      result <- tryAny $ liftIO $ writeFile healthcheckPath ""
      case result of
        Left e ->
          tryLog logCfg Error $ "Liveness probe write failed: " <> T.pack (show e)
        Right () -> pure ()
-- | Body of a single worker thread; loops forever.
--
-- Each iteration:
--
-- 1. In one STM transaction, takes the next batch from the work queue and
--    increments the busy counter.
-- 2. Runs the @onJobClaimed@ hook for every job in the batch (through
--    'runHook'), all with the same timestamp.
-- 3. Runs 'processJobsWithRetry' on the batch.
-- 4. If that raised a synchronous exception, logs it and pauses for two
--    seconds.
-- 5. Finally (whatever happened in steps 2-4), decrements the busy counter
--    and sets the \"worker finished\" flag to 'True'.
--
-- The whole iteration runs under 'mask'; only 'processJobsWithRetry' is
-- executed with the restored masking state. Because the pause in step 4
-- happens before the cleanup in step 5, the worker still counts as busy
-- while it is pausing.
workerLoop
  :: forall m registry payload result
   . ( JobOperation m registry payload
     , JobResult result
     , MonadMask m
     , MonadUnliftIO m
     )
  => WorkerConfig m payload result
  -> TBQueue (NonEmpty (Job.JobRead payload))
  -> TVar Int
  -> TVar Bool
  -> m ()
workerLoop config workQueue busyCount workerFinishedVar = forever $ mask $ \unmask -> do
  -- Dequeue and mark busy atomically, so a batch is never taken without
  -- the counter reflecting it.
  jobBatch <- atomically $ do
    batch <- readTBQueue workQueue
    modifyTVar' busyCount (+ 1)
    pure batch
  flip
    finally
    ( atomically $ do
        modifyTVar' busyCount (subtract 1)
        writeTVar workerFinishedVar True
    )
    $ withJobContext jobBatch
    $ do
      currentTime <- liftIO getCurrentTime
      mapM_
        ( \job ->
            runHook (logConfig config) "onJobClaimed" $
              Job.onJobClaimed (observabilityHooks config) job currentTime
        )
        jobBatch
      result <- tryAny $ unmask $ processJobsWithRetry config jobBatch
      case result of
        Right () -> pure ()
        Left e -> do
          tryLog (logConfig config) Error $ "Worker exception: " <> T.pack (show e)
          threadDelay 2_000_000
-- | Load and decode the child results recorded for a job.
--
-- The raw data comes from 'Ops.readChildResultsRaw', looked up by the given
-- schema name and the job's queue name and primary key. Its first three
-- components are combined with 'Ops.mergeRawChildResults'; every entry that
-- is a 'Right' JSON value is then decoded with 'decodeJobResult', so a
-- decoding failure turns that entry into a 'Left' carrying the decoder's
-- message, while entries that were already 'Left' are passed through.
--
-- Returns the decoded map together with the fourth component of the raw
-- read (bound here as @dlqFailures@), which is returned unchanged.
readChildResults
  :: (JobResult a, MonadArbiter m)
  => Text
  -> Job.JobRead payload
  -> m (Map.Map Int64 (Either Text a), Map.Map Int64 T.Text)
readChildResults schemaName job = do
  (results, failures, mSnapshot, dlqFailures) <-
    Ops.readChildResultsRaw schemaName (Job.queueName job) (Job.primaryKey job)
  let raw = Ops.mergeRawChildResults results failures mSnapshot
      merged = Map.map (>>= decodeJobResult) raw
  pure (merged, dlqFailures)
processJobsWithRetry
:: forall m registry payload result
. ( JobOperation m registry payload
, JobResult result
, MonadUnliftIO m
)
=> WorkerConfig m payload result
-> NonEmpty (Job.JobRead payload)
-> m ()
-- | Run the configured handler over a batch of claimed jobs and settle the
-- outcome (ack, retry, dead-letter or cancel).
--
-- The handler runs inside 'withJobsHeartbeat' and any exception it raises is
-- captured with 'tryAny' rather than propagated.
--
-- * When 'useWorkerTransaction' is enabled, the handler, the optional result
--   insert for the parent job and the ack all run inside a single
--   'withDbTransaction'. If the ack touches fewer rows than expected,
--   'throwJobNotFound' is raised from inside that transaction.
-- * Otherwise the handler is simply run and its result discarded; no ack is
--   issued from here.
--
-- Afterwards:
--
-- * on success, @onJobSuccess@ hooks fire for every job, but only when
--   'useWorkerTransaction' is enabled;
-- * on an exception recognised by 'isJobGoneException', an info line is
--   logged and nothing else happens;
-- * on any other exception, 'handleJobFailure' is applied to every job of the
--   batch inside one 'withDbTransaction'.
processJobsWithRetry
  :: forall m registry payload result
   . ( JobOperation m registry payload
     , JobResult result
     , MonadUnliftIO m
     )
  => WorkerConfig m payload result
  -> NonEmpty (Job.JobRead payload)
  -> m ()
processJobsWithRetry config jobs = do
  let hooks = observabilityHooks config
      -- The smallest attempt limit in the batch is used for all of its jobs;
      -- a job without its own limit falls back to the worker-level setting.
      maxAtts =
        minimum $
          map (\job -> fromMaybe (maxAttempts config) (Job.maxAttempts job)) (toList jobs)
  startTime <- liftIO getCurrentTime
  schemaName <- Arb.getSchema
  result <-
    tryAny
      $ withJobsHeartbeat
        hooks
        (heartbeatInterval config)
        (visibilityTimeout config)
        startTime
        jobs
        (logConfig config)
        (fmap livenessSignal (livenessConfig config))
      $ do
        if useWorkerTransaction config
          then withDbTransaction $ do
            handlerResult <- runHandler config schemaName jobs
            case handlerMode config of
              SingleJobMode _ -> do
                -- Single-job mode only ever looks at the head of the batch.
                let (job :| _) = jobs
                -- Store the encoded result only for a job that has a parent
                -- and whose result encodes to a value.
                case (Job.parentId job, encodeJobResult handlerResult) of
                  (Just pid, Just val) ->
                    void $ Ops.insertResult schemaName (Job.queueName job) pid (Job.primaryKey job) val
                  _ -> pure ()
                rowsAffected <- Arb.ackJob job
                when (rowsAffected == 0) $
                  throwJobNotFound $
                    "Job "
                      <> T.pack (show (Job.primaryKey job))
                      <> " was reclaimed during processing - rolling back handler transaction"
              BatchedJobsMode _ _ -> do
                rowsAffected <- Arb.ackJobsBulk (toList jobs)
                when (rowsAffected /= fromIntegral (length jobs)) $
                  throwJobNotFound $
                    "Expected to ack "
                      <> T.pack (show (length jobs))
                      <> " jobs but only "
                      <> T.pack (show rowsAffected)
                      <> " were deleted - rolling back handler transaction"
          else do
            void $ runHandler config schemaName jobs
  endTime <- liftIO getCurrentTime
  case result of
    Right () ->
      when (useWorkerTransaction config) $
        mapM_
          ( \job ->
              runHook (logConfig config) "onJobSuccess" $
                Job.onJobSuccess hooks job startTime endTime
          )
          jobs
    Left e
      | isJobGoneException e ->
          tryLog (logConfig config) Info $
            "Job(s) no longer available, skipping retry: " <> T.pack (show e)
      | otherwise ->
          withDbTransaction $
            mapM_ (handleJobFailure config hooks e maxAtts startTime endTime) jobs
-- | Dispatch a batch of jobs to the handler selected by 'handlerMode'.
--
-- * 'SingleJobMode': only the first job of the batch is handled. When that
--   job is a rollup ('Job.isRollup'), its child results and DLQ failures are
--   loaded with 'readChildResults' and passed to the handler; otherwise the
--   handler receives two empty maps.
-- * 'BatchedJobsMode': the whole non-empty batch is handed to the handler.
--
-- In both modes the handler is executed through 'runHandlerWithConnection'.
runHandler
  :: forall m registry payload result
   . ( JobOperation m registry payload
     , JobResult result
     )
  => WorkerConfig m payload result
  -> Text
  -> NonEmpty (Job.JobRead payload)
  -> m result
runHandler config schemaName jobs = case handlerMode config of
  SingleJobMode handler -> do
    let (job :| _) = jobs
    (childResults, dlqFailures) <-
      if Job.isRollup job
        then readChildResults schemaName job
        else pure (Map.empty, Map.empty)
    runHandlerWithConnection (handler childResults dlqFailures) job
  BatchedJobsMode _ handler ->
    runHandlerWithConnection handler jobs
-- | 'True' when the exception is a 'JobNotFoundException' or a
-- 'JobStolenException'; 'False' for every other exception.
isJobGoneException :: SomeException -> Bool
isJobGoneException e =
  case (fromException e :: Maybe JobNotFoundException, fromException e :: Maybe JobStolenException) of
    (Just _, _) -> True
    (_, Just _) -> True
    _ -> False
-- | How a failed job should be treated, as decided by 'classifyException'
-- and acted upon by 'handleJobFailure'.
--
-- * 'RetryFailure' – rescheduled with backoff, unless the attempt limit has
--   been reached.
-- * 'PermanentFailure' – moved to the DLQ regardless of attempts.
-- * 'TreeCancelFailure' – handled with 'Ops.cancelJobTree'.
-- * 'BranchCancelFailure' – handled with 'Ops.cancelJobCascade'.
data FailureKind = RetryFailure | PermanentFailure | TreeCancelFailure | BranchCancelFailure
  deriving stock (Eq)
-- | Turn an exception into an error message and a 'FailureKind'.
--
-- The four matched 'JobException' constructors map to the failure kind of
-- the same name and carry their own message. A 'ParsingException' is treated
-- as a permanent failure. Anything else is rendered with 'show' and treated
-- as retryable.
classifyException :: SomeException -> (T.Text, FailureKind)
classifyException e
  | Just (Retryable (JobRetryableException msg)) <- fromException e = (msg, RetryFailure)
  | Just (Permanent (JobPermanentException msg)) <- fromException e = (msg, PermanentFailure)
  | Just (TreeCancel (TreeCancelException msg)) <- fromException e = (msg, TreeCancelFailure)
  | Just (BranchCancel (BranchCancelException msg)) <- fromException e = (msg, BranchCancelFailure)
  | Just (ParsingException msg) <- fromException e = (msg, PermanentFailure)
  | otherwise = (T.pack $ show e, RetryFailure)
-- | Settle a single failed job according to the kind of exception raised.
--
-- The exception is classified with 'classifyException', then:
--
-- * 'TreeCancelFailure' – 'Ops.cancelJobTree' is called for the job.
-- * 'BranchCancelFailure' – 'Ops.cancelJobCascade' is called on the job's
--   parent, or on the job itself when it has no parent.
-- * 'PermanentFailure', or attempts at or above the supplied limit – for a
--   rollup job the merged child results are persisted first (when
--   non-empty); the job is then moved to the DLQ.
-- * otherwise – the job is rescheduled after a jittered backoff.
--
-- Observability hooks fire only when the corresponding database operation
-- reported at least one affected row; for the DLQ and retry paths a warning
-- is logged when it reported none.
handleJobFailure
  :: forall m registry payload result
   . ( JobOperation m registry payload
     , MonadUnliftIO m
     )
  => WorkerConfig m payload result
  -> Job.ObservabilityHooks m payload
  -> SomeException
  -> Int32
  -> UTCTime
  -> UTCTime
  -> Job.JobRead payload
  -> m ()
handleJobFailure config hooks e maxAtts startTime endTime job = do
  let (errorMsg, failureKind) = classifyException e
      cfg = logConfig config
      -- Shared by every branch below: report the failure through the hook.
      notifyFailure =
        runHook cfg "onJobFailure" $
          Job.onJobFailure hooks job errorMsg startTime endTime
  schemaName <- getSchema
  case failureKind of
    TreeCancelFailure -> do
      deleted <- Ops.cancelJobTree schemaName (Job.queueName job) (Job.primaryKey job)
      when (deleted > 0) notifyFailure
    BranchCancelFailure -> do
      -- Cancel from the parent when there is one, otherwise from this job.
      let target = fromMaybe (Job.primaryKey job) (Job.parentId job)
      deleted <- Ops.cancelJobCascade schemaName (Job.queueName job) target
      when (deleted > 0) notifyFailure
    _
      | failureKind == PermanentFailure || Job.attempts job >= maxAtts -> do
          -- Persist the merged child results of a rollup job before it is
          -- moved to the DLQ.
          when (Job.isRollup job) $ do
            (results, failures, mSnapshot, _dlqFailures) <-
              Ops.readChildResultsRaw schemaName (Job.queueName job) (Job.primaryKey job)
            let merged = Ops.mergeRawChildResults results failures mSnapshot
            unless (Map.null merged) $
              void $
                Ops.persistParentState schemaName (Job.queueName job) (Job.primaryKey job) (toJSON merged)
          rowsAffected <- Arb.moveToDLQ errorMsg job
          if rowsAffected == 0
            then
              tryLog cfg Warning $
                "Job " <> T.pack (show (Job.primaryKey job)) <> " not available for moving to DLQ"
            else do
              notifyFailure
              runHook cfg "onJobFailedAndMovedToDLQ" $ Job.onJobFailedAndMovedToDLQ hooks errorMsg job
      | otherwise -> do
          let baseDelay = calculateBackoff (backoffStrategy config) (Job.attempts job)
          backoffSecs <- liftIO $ applyJitter (jitter config) baseDelay
          rowsAffected <- Arb.updateJobForRetry backoffSecs errorMsg job
          if rowsAffected == 0
            then
              tryLog cfg Warning $
                "Job " <> T.pack (show (Job.primaryKey job)) <> " not available for retry"
            else do
              notifyFailure
              runHook cfg "onJobRetry" $ Job.onJobRetry hooks job backoffSecs
-- | Periodically refresh the groups of the queue for @payload@.
--
-- Loops forever: calls 'Arb.refreshGroups' with the interval rounded up to
-- whole seconds, then sleeps for that many seconds. The type arguments are
-- passed explicitly because nothing in the value arguments determines them.
--
-- NOTE(review): a non-positive interval rounds to a non-positive number of
-- seconds, so the delay between iterations would presumably be negligible —
-- confirm callers always pass a positive interval.
groupReaperLoop
  :: forall m registry payload
   . ( MonadUnliftIO m
     , QueueOperation m registry payload
     )
  => NominalDiffTime
  -> m ()
groupReaperLoop interval = do
  let intervalSecs = ceiling interval
  forever $ do
    Arb.refreshGroups @m @registry @payload intervalSecs
    -- The delay is expressed in microseconds.
    liftIO $ threadDelay (intervalSecs * 1_000_000)