{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE UndecidableInstances #-}
module Arbiter.Worker
(
runWorkerPool
, NamedWorkerPool (..)
, namedWorkerPool
, runWorkerPools
, runSelectedWorkerPools
, getEnabledQueues
, JobResult (..)
, module Arbiter.Worker.Config
, module Arbiter.Worker.BackoffStrategy
, module Arbiter.Worker.Logger
, module Arbiter.Worker.WorkerState
, CronJob (..)
, OverlapPolicy (..)
, cronJob
, initCronSchedules
, overlapPolicyToText
, overlapPolicyFromText
) where
import Arbiter.Core.Codec (Col (..), pval)
import Arbiter.Core.Exceptions
( BranchCancelException (..)
, JobException (..)
, JobPermanentException (..)
, JobRetryableException (..)
, ParsingException (..)
, TreeCancelException (..)
, throwJobNotFound
)
import Arbiter.Core.HasArbiterSchema (HasArbiterSchema (..))
import Arbiter.Core.HighLevel (JobOperation, QueueOperation)
import Arbiter.Core.HighLevel qualified as Arb
import Arbiter.Core.Job.Types qualified as Job
import Arbiter.Core.MonadArbiter (MonadArbiter (..))
import Arbiter.Core.Operations qualified as Ops
import Arbiter.Core.QueueRegistry (RegistryTables (..), TableForPayload)
import Control.Exception (SomeException, fromException)
import Control.Monad (forever, replicateM, void, when)
import Control.Monad.Catch (MonadMask)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Cont (ContT (..), evalContT)
import Data.Aeson (FromJSON, ToJSON, Value, toJSON)
import Data.Aeson qualified as Aeson
import Data.Foldable (toList)
import Data.Int (Int32, Int64)
import Data.List.NonEmpty (NonEmpty (..))
import Data.Map.Strict qualified as Map
import Data.Maybe (fromMaybe)
import Data.Proxy (Proxy (..))
import Data.Text (Text)
import Data.Text qualified as T
import Data.Time (NominalDiffTime, UTCTime, getCurrentTime)
import Data.Traversable (for)
import Database.PostgreSQL.Simple (close, connectPostgreSQL)
import GHC.TypeLits (symbolVal)
import System.Directory (removeFile)
import System.Environment (lookupEnv)
import UnliftIO
( MonadUnliftIO
, atomically
, bracket
, checkSTM
, finally
, isAsyncException
, isEmptyTBQueue
, lengthTBQueue
, mask
, modifyTVar'
, newTBQueueIO
, newTVarIO
, readTBQueue
, readTVar
, throwIO
, tryAny
, trySyncOrAsync
, waitAnyCatch
, withAsync
, writeTVar
)
import UnliftIO.Async (race)
import UnliftIO.Async qualified as Async
import UnliftIO.Concurrent (threadDelay)
import UnliftIO.Concurrent qualified as Conc
import UnliftIO.MVar qualified as MVar
import UnliftIO.STM (TBQueue, TVar)
import Arbiter.Worker.BackoffStrategy
import Arbiter.Worker.Config
import Arbiter.Worker.Cron
( CronJob (..)
, OverlapPolicy (..)
, cronJob
, initCronSchedules
, overlapPolicyFromText
, overlapPolicyToText
, runCronScheduler
)
import Arbiter.Worker.Dispatcher
import Arbiter.Worker.Heartbeat (withJobsHeartbeat)
import Arbiter.Worker.Logger
import Arbiter.Worker.Logger.Internal (logMessage, runHook, withJobContext)
import Arbiter.Worker.Retry (retryOnException)
import Arbiter.Worker.WorkerState
-- | Conversion between a job result type and an optional JSON 'Value'.
class JobResult a where
  -- | Encode a result. 'Nothing' means there is no JSON value for it.
  encodeJobResult :: a -> Maybe Value

  -- | Decode a JSON value back into a result, or return an error message.
  decodeJobResult :: Value -> Either Text a
-- | The unit result carries no information: it encodes to 'Nothing' and
-- decoding succeeds with @()@ for any input value.
instance JobResult () where
  encodeJobResult = const Nothing
  decodeJobResult = const (Right ())
-- | Fallback instance for any type with JSON instances: results are encoded
-- with 'toJSON' and decoded with 'Aeson.fromJSON'. Marked @OVERLAPPABLE@ so
-- that more specific instances (such as the one for @()@) take precedence.
instance {-# OVERLAPPABLE #-} (FromJSON a, ToJSON a) => JobResult a where
  encodeJobResult payload = Just (toJSON payload)

  decodeJobResult raw =
    case Aeson.fromJSON raw of
      Aeson.Error reason -> Left (T.pack reason)
      Aeson.Success decoded -> Right decoded
-- | A worker configuration paired with a name.
--
-- The registry, payload and result types are existentially quantified, so
-- pools for different payload types can be kept in one list. Only the monad
-- @m@ remains visible.
data NamedWorkerPool m
  = forall registry payload result.
    (JobResult result, QueueOperation m registry payload) =>
    NamedWorkerPool
    { workerPoolName :: Text
    -- ^ Name that 'runSelectedWorkerPools' matches against the enabled queues.
    , workerPoolConfig :: WorkerConfig m payload result
    -- ^ Configuration handed to 'runWorkerPool' when the pool is started.
    }
-- | Wrap a worker configuration in a 'NamedWorkerPool'.
--
-- The pool name is the type-level symbol @TableForPayload payload registry@
-- rendered as 'Text'.
namedWorkerPool
  :: forall m registry payload result
   . (JobResult result, QueueOperation m registry payload)
  => WorkerConfig m payload result
  -> NamedWorkerPool m
namedWorkerPool cfg = NamedWorkerPool tableName cfg
  where
    tableName = T.pack (symbolVal (Proxy @(TableForPayload payload registry)))
-- | Start every enabled pool from the given list with one shared worker state.
--
-- Steps, in order: create a fresh worker state, pass it to the @setup@
-- callback, read the enabled queues from the @ARBITER_ENABLED_QUEUES@
-- environment variable (see 'getEnabledQueues'), then delegate to
-- 'runSelectedWorkerPools'.
runWorkerPools
  :: forall m registry
   . (MonadMask m, MonadUnliftIO m, RegistryTables registry)
  => Proxy registry
  -> [NamedWorkerPool m]
  -> (TVar WorkerState -> IO ())
  -> m ()
runWorkerPools registry pools setup = do
  (stateVar, enabledQueues) <- liftIO $ do
    var <- newWorkerState
    setup var
    queues <- getEnabledQueues "ARBITER_ENABLED_QUEUES" registry
    pure (var, queues)
  runSelectedWorkerPools stateVar enabledQueues pools
-- | Run the pools whose name appears in the list of enabled queue names.
--
-- Each selected pool has its 'workerStateVar' replaced by the shared state and
-- is started with 'runWorkerPool' inside a nested 'Async.withAsync' scope.
-- The call then waits for every pool to finish; individual outcomes
-- (including exceptions) are discarded by 'Async.waitCatch'. If no pool is
-- selected the function returns immediately.
runSelectedWorkerPools
  :: (MonadMask m, MonadUnliftIO m)
  => TVar WorkerState
  -> [Text]
  -> [NamedWorkerPool m]
  -> m ()
runSelectedWorkerPools sharedState enabled pools =
  if null selected
    then pure ()
    else evalContT $ do
      running <-
        traverse
          ( \(NamedWorkerPool _ cfg) ->
              ContT (Async.withAsync (runWorkerPool (cfg {workerStateVar = sharedState})))
          )
          selected
      lift (mapM_ Async.waitCatch running)
  where
    selected = filter (\(NamedWorkerPool name _) -> name `elem` enabled) pools
-- | Determine which queues are enabled, based on an environment variable.
--
-- The variable named by the first argument holds a comma-separated list of
-- queue names. Each name is stripped of surrounding whitespace.
--
-- * Variable unset, blank, or containing no names at all (for example @","@):
--   every table name of the registry is returned.
-- * Empty segments, as produced by a trailing or doubled comma
--   (@"emails,"@, @"a,,b"@), are ignored rather than being reported as an
--   unknown queue with an empty name.
-- * Otherwise the requested names are returned in the order given.
--
-- Throws an 'IOError' (via 'userError') listing the offending names when any
-- requested name is not a table of the registry.
getEnabledQueues
  :: (RegistryTables registry)
  => String
  -> Proxy registry
  -> IO [Text]
getEnabledQueues envVar registry = do
  mVal <- lookupEnv envVar
  case maybe [] parseQueueNames mVal of
    -- Nothing requested: fall back to every queue in the registry.
    [] -> pure allQueues
    requested
      | null invalid -> pure requested
      | otherwise ->
          throwIO . userError $
            "Unknown queue names: " <> T.unpack (T.intercalate ", " invalid)
      where
        invalid = filter (`notElem` allQueues) requested
  where
    allQueues = registryTableNames registry

    -- Split on commas, trim each segment and drop the empty ones
    -- ('T.splitOn' yields @""@ for trailing or adjacent separators).
    parseQueueNames raw =
      filter (not . T.null) . map T.strip . T.splitOn "," $ T.pack raw
-- | Run one worker pool until one of its threads exits, then shut down
-- gracefully.
--
-- Threads are started in this order, each inside a nested 'withAsync' scope
-- so that they live until this function returns:
--
-- 1. the liveness probe refresher (only if 'livenessConfig' is set); its probe
--    file is removed on a best-effort basis when the thread ends,
-- 2. the dispatcher,
-- 3. 'workerCount' worker loops reading from a bounded queue of that capacity,
-- 4. the cron scheduler (only if 'cronJobs' is non-empty), which opens its own
--    PostgreSQL connection and runs under 'retryOnException',
-- 5. the group reaper, also under 'retryOnException'.
--
-- Once any thread finishes, an exception (if there was one) is logged,
-- 'shutdownWorker' is called and the pool drains: it waits until the work
-- queue is empty and no worker is busy. With 'gracefulShutdownTimeout' set the
-- wait is bounded and a warning is logged on timeout; without it the wait is
-- unbounded and progress is logged every 10 seconds. Failures of the logger
-- itself are swallowed throughout.
runWorkerPool
  :: forall m registry payload result
   . ( JobResult result
     , MonadMask m
     , MonadUnliftIO m
     , QueueOperation m registry payload
     )
  => WorkerConfig m payload result
  -> m ()
runWorkerPool config = do
  workQueue <- liftIO . newTBQueueIO $ fromIntegral workerCap
  busyWorkerCount <- liftIO $ newTVarIO 0
  workerFinishedVar <- liftIO $ newTVarIO False
  let
    -- Blocks until the queue is empty and the busy counter is zero.
    awaitIdle = liftIO . atomically $ do
      isEmptyTBQueue workQueue >>= checkSTM
      readTVar busyWorkerCount >>= checkSTM . (== 0)

    -- Unbounded drain: report progress each time 10 seconds pass without
    -- the pool becoming idle, then keep waiting.
    drainWithProgress = do
      outcome <- race (liftIO $ threadDelay 10_000_000) awaitIdle
      case outcome of
        Right () -> pure ()
        Left () -> do
          (busy, qLen) <-
            liftIO . atomically $
              (,)
                <$> readTVar busyWorkerCount
                <*> (fromIntegral <$> lengthTBQueue workQueue)
          logSafely Info . T.pack $
            "Graceful shutdown: waiting for "
              <> show (busy :: Int)
              <> " busy worker(s), "
              <> show (qLen :: Int)
              <> " job(s) in queue..."
          drainWithProgress
  evalContT $ do
    liveness <- case mLiveness of
      Nothing -> pure []
      Just lc -> do
        let probe =
              refreshLiveness
                poolLog
                (livenessPath lc)
                (livenessSignal lc)
                (livenessInterval lc)
            removeProbe = void . tryAny . liftIO $ removeFile (livenessPath lc)
        refresher <- spawn (probe `finally` removeProbe)
        pure [refresher]
    dispatcher <-
      spawn $
        runDispatcher config workerCap workQueue busyWorkerCount mLivenessSignal workerFinishedVar
    workers <-
      replicateM workerCap . spawn $
        workerLoop config workQueue busyWorkerCount workerFinishedVar
    cron <- case cronJobs config of
      [] -> pure []
      jobs -> do
        sch <- lift getSchema
        scheduler <-
          spawn $
            retryOnException (workerStateVar config) poolLog "Cron scheduler" $
              bracket
                (liftIO $ connectPostgreSQL (connStr config))
                (liftIO . close)
                (\conn -> runCronScheduler conn poolLog sch jobs)
        pure [scheduler]
    reaper <-
      spawn $
        retryOnException (workerStateVar config) poolLog "Group reaper" $
          groupReaperLoop config (groupReaperInterval config)
    -- Any single thread ending (normally or not) triggers the shutdown.
    (_, firstExit) <- waitAnyCatch (dispatcher : cron ++ [reaper] ++ liveness ++ workers)
    case firstExit of
      Left e -> lift . logSafely Error . T.pack $ "Thread pool exception: " <> show e
      Right _ -> pure ()
    shutdownWorker config
    lift $ logSafely Info (T.pack "Starting graceful shutdown. Draining in-flight jobs...")
    drained <- lift $ case gracefulShutdownTimeout config of
      Nothing -> Right () <$ drainWithProgress
      Just timeoutSecs ->
        race (liftIO . threadDelay $ ceiling (timeoutSecs * 1_000_000)) awaitIdle
    lift $ case drained of
      Right () ->
        logSafely Info (T.pack "All workers are now idle. Graceful shutdown complete.")
      Left () ->
        logSafely Warning (T.pack "Graceful shutdown timed out. Some jobs may still be in-flight.")
  where
    workerCap = workerCount config
    poolLog = logConfig config
    mLiveness = livenessConfig config
    mLivenessSignal = livenessSignal <$> mLiveness

    -- Start an action in a 'withAsync' scope that spans the rest of the
    -- enclosing 'ContT' block.
    spawn action = ContT (withAsync action)

    -- Log a message, ignoring any synchronous failure of the logger.
    logSafely level message =
      void . tryAny . liftIO $ logMessage poolLog level message
-- | Keep a liveness probe file fresh, forever.
--
-- Each round writes an empty file at the given path and logs the error if
-- that fails (a failing logger is ignored). The next round starts only after
-- both of the following have happened: the signal 'MVar.MVar' has been taken,
-- and @n@ seconds have elapsed.
refreshLiveness :: (MonadUnliftIO m) => LogConfig -> FilePath -> MVar.MVar () -> Int -> m ()
refreshLiveness logCfg healthcheckPath livenessMVar n =
  forever (touchProbe >> awaitNextRound)
  where
    touchProbe =
      tryAny (liftIO (writeFile healthcheckPath "")) >>= either reportFailure pure

    reportFailure err =
      void . tryAny . liftIO . logMessage logCfg Error $
        "Liveness probe write failed: " <> T.pack (show err)

    -- Waits for the signal and the delay concurrently; both must complete.
    awaitNextRound =
      Async.concurrently_
        (MVar.takeMVar livenessMVar)
        (Conc.threadDelay (n * 1_000_000))
-- | Body of a single worker thread; loops forever.
--
-- Every iteration runs with asynchronous exceptions masked and does the
-- following:
--
-- 1. In one STM transaction, takes a batch from the work queue and increments
--    the busy counter.
-- 2. Inside 'withJobContext', invokes the @onJobClaimed@ hook for each job of
--    the batch with a shared timestamp, then runs 'processJobsWithRetry' with
--    the mask restored, capturing synchronous and asynchronous exceptions
--    alike and handing the outcome to 'handleWorkerException'.
-- 3. Whatever happened, decrements the busy counter and sets the
--    worker-finished flag to 'True'.
workerLoop
  :: forall m registry payload result
   . ( JobOperation m registry payload
     , JobResult result
     , MonadMask m
     , MonadUnliftIO m
     )
  => WorkerConfig m payload result
  -> TBQueue (NonEmpty (Job.JobRead payload))
  -> TVar Int
  -> TVar Bool
  -> m ()
workerLoop config workQueue busyCount workerFinishedVar =
  forever $ mask $ \restore -> do
    -- Claiming the batch and marking the worker busy happen atomically.
    jobBatch <- atomically (readTBQueue workQueue <* modifyTVar' busyCount (+ 1))
    let releaseSlot =
          atomically $
            modifyTVar' busyCount (subtract 1) >> writeTVar workerFinishedVar True
        notifyClaimed claimedAt job =
          runHook (logConfig config) "onJobClaimed" $
            Job.onJobClaimed (observabilityHooks config) job claimedAt
    withJobContext
      jobBatch
      ( do
          claimedAt <- liftIO getCurrentTime
          mapM_ (notifyClaimed claimedAt) jobBatch
          outcome <- trySyncOrAsync (restore (processJobsWithRetry config jobBatch))
          liftIO (handleWorkerException (logConfig config) outcome)
      )
      `finally` releaseSlot
-- | React to the outcome of processing one batch.
--
-- A successful outcome does nothing. A failure is logged at error level
-- (a failing logger is ignored). An asynchronous exception is then re-thrown;
-- any other exception is followed by a two-second pause before returning.
handleWorkerException :: LogConfig -> Either SomeException () -> IO ()
handleWorkerException _ (Right ()) = pure ()
handleWorkerException cfg (Left failure) = do
  void (tryAny (logMessage cfg Error (T.pack ("Worker exception: " <> show failure))))
  if isAsyncException failure
    then throwIO failure
    else threadDelay 2_000_000
-- | Load the child results recorded for a job and decode them.
--
-- Returns a pair: the merged per-child outcomes (each 'Right' value passed
-- through 'decodeJobResult', each 'Left' kept as-is) and the DLQ failure map
-- exactly as reported by 'Ops.readChildResultsRaw'.
readChildResults
  :: (JobResult a, MonadArbiter m)
  => Text
  -> Job.JobRead payload
  -> m (Map.Map Int64 (Either Text a), Map.Map Int64 T.Text)
readChildResults schemaName job = do
  (okRows, failedRows, snapshot, dlqRows) <-
    Ops.readChildResultsRaw schemaName (Job.queueName job) (Job.primaryKey job)
  let combined = Ops.mergeRawChildResults okRows failedRows snapshot
  -- A child that already failed stays 'Left'; a decode error becomes 'Left'.
  pure (Map.map (decodeJobResult =<<) combined, dlqRows)
-- | Run the configured handler over a claimed batch of jobs and settle the
-- outcome.
--
-- The handler runs under 'withJobsHeartbeat'. When 'useWorkerTransaction' is
-- set, the handler, the optional child-result insert and the ack all happen
-- inside one 'withDbTransaction'; an ack that affects fewer rows than expected
-- raises via 'throwJobNotFound'. Otherwise the handler is simply run and its
-- result discarded; no ack is issued from here.
--
-- Afterwards:
--
-- * on success, @onJobSuccess@ hooks fire (transactional mode only);
-- * if the failure satisfies 'isJobGoneException', it is logged at 'Info' and
--   nothing else happens;
-- * any other synchronous failure is passed, per job, to 'handleJobFailure'
--   inside a fresh transaction.
processJobsWithRetry
  :: forall m registry payload result
   . ( JobOperation m registry payload
     , JobResult result
     , MonadUnliftIO m
     )
  => WorkerConfig m payload result
  -> NonEmpty (Job.JobRead payload)
  -> m ()
processJobsWithRetry config jobs = do
  startTime <- liftIO getCurrentTime
  outcome <-
    tryAny $
      withJobsHeartbeat
        hooks
        (heartbeatInterval config)
        (visibilityTimeout config)
        startTime
        jobs
        (logConfig config)
        (fmap livenessSignal (livenessConfig config))
        handlerAction
  endTime <- liftIO getCurrentTime
  let announceSuccess job =
        runHook (logConfig config) "onJobSuccess" $
          Job.onJobSuccess hooks job startTime endTime
  case outcome of
    Right () ->
      when (useWorkerTransaction config) (mapM_ announceSuccess jobs)
    Left err
      | isJobGoneException err ->
          -- Best-effort log line; a logger failure is ignored.
          void . tryAny . liftIO $
            logMessage (logConfig config) Info $
              "Job(s) no longer available, skipping retry: " <> T.pack (show err)
      | otherwise ->
          withDbTransaction $
            mapM_ (handleJobFailure config hooks err maxAtts startTime endTime) jobs
  where
    hooks :: Job.ObservabilityHooks m payload
    hooks = observabilityHooks config

    -- The smallest effective attempt limit across the batch; a job without its
    -- own limit falls back to the worker-level 'maxAttempts'.
    maxAtts :: Int32
    maxAtts = minimum (fmap attemptLimit jobs)

    attemptLimit :: Job.JobRead payload -> Int32
    attemptLimit job = fromMaybe (maxAttempts config) (Job.maxAttempts job)

    handlerAction :: m ()
    handlerAction
      | useWorkerTransaction config = withDbTransaction transactionalRun
      | otherwise = standaloneRun

    -- Child results are only looked up for rollup jobs; everything else gets
    -- two empty maps.
    rollupInputs
      :: Text
      -> Job.JobRead payload
      -> m (Map.Map Int64 (Either Text result), Map.Map Int64 Text)
    rollupInputs schemaName job
      | Job.isRollup job = readChildResults schemaName job
      | otherwise = pure (Map.empty, Map.empty)

    -- Applies the configured transaction timeout as a statement timeout in
    -- milliseconds, rounded up.
    applyStatementTimeout :: NominalDiffTime -> m ()
    applyStatementTimeout timeout =
      void $
        executeStatement
          "SET LOCAL statement_timeout = ?"
          [pval CInt8 (ceiling (timeout * 1000) :: Int64)]

    -- Handler + result insert + ack, all inside the caller's transaction.
    transactionalRun :: m ()
    transactionalRun = do
      schemaName <- Arb.getSchema
      mapM_ applyStatementTimeout (transactionTimeout config)
      case handlerMode config of
        SingleJobMode handler -> do
          let (job :| _) = jobs
          (childResults, dlqFailures) <- rollupInputs schemaName job
          handlerResult <-
            (runHandlerWithConnection (handler childResults dlqFailures) job :: m result)
          -- Only a child job with an encodable result reports to its parent.
          case (Job.parentId job, encodeJobResult handlerResult) of
            (Just pid, Just val) ->
              void $
                Ops.insertResult schemaName (Job.queueName job) pid (Job.primaryKey job) val
            _ -> pure ()
          rowsAffected <- Arb.ackJob job
          when (rowsAffected == 0) $
            throwJobNotFound $
              "Job "
                <> T.pack (show (Job.primaryKey job))
                <> " was reclaimed during processing - rolling back handler transaction"
        BatchedJobsMode _ handler -> do
          void (runHandlerWithConnection handler jobs :: m result)
          rowsAffected <- Arb.ackJobsBulk (toList jobs)
          when (rowsAffected /= fromIntegral (length jobs)) $
            throwJobNotFound $
              "Expected to ack "
                <> T.pack (show (length jobs))
                <> " jobs but only "
                <> T.pack (show rowsAffected)
                <> " were deleted - rolling back handler transaction"

    -- Handler only: the result is dropped and nothing is acked here.
    standaloneRun :: m ()
    standaloneRun = do
      schemaName <- Arb.getSchema
      case handlerMode config of
        SingleJobMode handler -> do
          let (job :| _) = jobs
          (childResults, dlqFailures) <- rollupInputs schemaName job
          void (runHandlerWithConnection (handler childResults dlqFailures) job :: m result)
        BatchedJobsMode _ handler ->
          void (runHandlerWithConnection handler jobs :: m result)
-- | 'True' exactly when the exception is a 'JobException' built with
-- 'JobNotFound' or 'JobStolen'; 'False' for every other exception.
isJobGoneException :: SomeException -> Bool
isJobGoneException err = maybe False jobIsGone (fromException err)
  where
    jobIsGone :: JobException -> Bool
    jobIsGone (JobNotFound _) = True
    jobIsGone (JobStolen _) = True
    jobIsGone _ = False
-- | How a failed job should be treated, as decided by 'classifyException'.
data FailureKind
  = -- | Eligible for another attempt.
    RetryFailure
  | -- | Not to be retried.
    PermanentFailure
  | -- | Raised from a 'TreeCancelException'.
    TreeCancelFailure
  | -- | Raised from a 'BranchCancelException'.
    BranchCancelFailure
  deriving stock (Eq)
-- | Turn an exception into an error message plus a 'FailureKind'.
--
-- The recognised 'JobException' constructors contribute their own message;
-- a 'ParseFailure' is treated as permanent. Anything else (including other
-- 'JobException' constructors and foreign exception types) is rendered with
-- 'show' and classified as retryable.
classifyException :: SomeException -> (T.Text, FailureKind)
classifyException err =
  case fromException err of
    Just jobErr -> fromJobException jobErr
    Nothing -> unclassified
  where
    unclassified :: (T.Text, FailureKind)
    unclassified = (T.pack (show err), RetryFailure)

    fromJobException :: JobException -> (T.Text, FailureKind)
    fromJobException (Retryable (JobRetryableException msg)) = (msg, RetryFailure)
    fromJobException (Permanent (JobPermanentException msg)) = (msg, PermanentFailure)
    fromJobException (TreeCancel (TreeCancelException msg)) = (msg, TreeCancelFailure)
    fromJobException (BranchCancel (BranchCancelException msg)) = (msg, BranchCancelFailure)
    fromJobException (ParseFailure (ParsingException msg)) = (msg, PermanentFailure)
    fromJobException _ = unclassified
-- | Settle a single failed job according to how its exception classifies.
--
-- * Tree cancel: cancel the job's tree via 'Ops.cancelJobTree'.
-- * Branch cancel: cascade-cancel from the parent if there is one, otherwise
--   from the job itself.
-- * Permanent failure, or attempts already at the supplied limit: for rollup
--   jobs, persist any merged child results first, then move the job to the DLQ.
-- * Otherwise: reschedule with the configured backoff plus jitter.
--
-- Observability hooks fire only when the corresponding database operation
-- reports at least one affected row; when the DLQ move or the retry update
-- affects none, a warning is logged instead.
handleJobFailure
  :: forall m registry payload result
   . ( JobOperation m registry payload
     , MonadUnliftIO m
     )
  => WorkerConfig m payload result
  -> Job.ObservabilityHooks m payload
  -> SomeException
  -> Int32
  -> UTCTime
  -> UTCTime
  -> Job.JobRead payload
  -> m ()
handleJobFailure config hooks e maxAtts startTime endTime job =
  case failureKind of
    TreeCancelFailure -> cancelWholeTree
    BranchCancelFailure -> cancelBranch
    PermanentFailure -> sendToDLQ
    RetryFailure
      | Job.attempts job >= maxAtts -> sendToDLQ
      | otherwise -> scheduleRetry
  where
    (errorMsg, failureKind) = classifyException e

    cfg :: LogConfig
    cfg = logConfig config

    queue :: Text
    queue = Job.queueName job

    jobId :: Int64
    jobId = Job.primaryKey job

    reportFailure :: m ()
    reportFailure =
      runHook cfg "onJobFailure" $
        Job.onJobFailure hooks job errorMsg startTime endTime

    -- Best-effort warning; a failure of the logger itself is ignored.
    warnUnavailable :: Text -> m ()
    warnUnavailable reason =
      void . tryAny . liftIO $
        logMessage cfg Warning $
          "Job " <> T.pack (show jobId) <> reason

    cancelWholeTree :: m ()
    cancelWholeTree = do
      schemaName <- getSchema
      deleted <- Ops.cancelJobTree schemaName queue jobId
      when (deleted > 0) reportFailure

    cancelBranch :: m ()
    cancelBranch = do
      schemaName <- getSchema
      -- Cancel from the parent when there is one, else from this job.
      let target = fromMaybe jobId (Job.parentId job)
      deleted <- Ops.cancelJobCascade schemaName queue target
      when (deleted > 0) reportFailure

    -- Stores the merged child results on the job when there are any.
    preserveChildResults :: m ()
    preserveChildResults = do
      schemaName <- getSchema
      (results, failures, mSnapshot, _dlqFailures) <-
        Ops.readChildResultsRaw schemaName queue jobId
      let merged = Ops.mergeRawChildResults results failures mSnapshot
      when (not (Map.null merged)) $
        void $
          Ops.persistParentState schemaName queue jobId (toJSON merged)

    sendToDLQ :: m ()
    sendToDLQ = do
      when (Job.isRollup job) preserveChildResults
      rowsAffected <- Arb.moveToDLQ errorMsg job
      if rowsAffected == 0
        then warnUnavailable " not available for moving to DLQ"
        else do
          reportFailure
          runHook cfg "onJobFailedAndMovedToDLQ" $
            Job.onJobFailedAndMovedToDLQ hooks errorMsg job

    scheduleRetry :: m ()
    scheduleRetry = do
      backoffSecs <-
        liftIO $
          applyJitter
            (jitter config)
            (calculateBackoff (backoffStrategy config) (Job.attempts job))
      rowsAffected <- Arb.updateJobForRetry backoffSecs errorMsg job
      if rowsAffected == 0
        then warnUnavailable " not available for retry"
        else do
          reportFailure
          runHook cfg "onJobRetry" $ Job.onJobRetry hooks job backoffSecs
-- | Refresh groups once immediately, then forever: sleep for the interval and
-- refresh again.
--
-- The interval is rounded up to whole seconds; that value is used both for the
-- sleep and as the argument to 'Arb.refreshGroups'. The worker config argument
-- is not inspected; it only pins the type parameters.
groupReaperLoop
  :: forall m registry payload result
   . ( MonadUnliftIO m
     , QueueOperation m registry payload
     )
  => WorkerConfig m payload result
  -> NominalDiffTime
  -> m ()
groupReaperLoop _config interval = refresh >> forever (pause >> refresh)
  where
    intervalSecs :: Int
    intervalSecs = ceiling interval

    refresh :: m ()
    refresh = Arb.refreshGroups @m @registry @payload intervalSecs

    pause :: m ()
    pause = liftIO (threadDelay (intervalSecs * 1_000_000))