{-# LANGUAGE OverloadedStrings #-}
module Arbiter.Worker.Config
(
WorkerConfig (..)
, defaultWorkerConfig
, defaultBatchedWorkerConfig
, defaultRollupWorkerConfig
, singleJobMode
, mergedRollupHandler
, mergeChildResults
, HandlerMode (..)
, LivenessConfig (..)
, WorkerState (..)
, pauseWorker
, resumeWorker
, shutdownWorker
, getWorkerState
) where
import Arbiter.Core.Job.Types (ObservabilityHooks, defaultObservabilityHooks)
import Arbiter.Core.MonadArbiter (BatchedJobHandler, JobHandler, MonadArbiter)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.ByteString (ByteString)
import Data.Foldable (fold)
import Data.Int (Int32, Int64)
import Data.Map.Strict (Map)
import Data.Text (Text)
import Data.Time (NominalDiffTime)
import Data.UUID (toString)
import Data.UUID.V4 qualified as UUID
import System.Directory (getTemporaryDirectory)
import UnliftIO.MVar (MVar, newEmptyMVar)
import UnliftIO.STM (TVar, newTVarIO)
import UnliftIO.STM qualified as STM
import Arbiter.Worker.BackoffStrategy (BackoffStrategy, Jitter (..), exponentialBackoff)
import Arbiter.Worker.Cron (CronJob)
import Arbiter.Worker.Logger (LogConfig (..), defaultLogConfig)
import Arbiter.Worker.WorkerState (WorkerState (..))
-- | How a worker hands jobs to user code.
data HandlerMode m payload result
  = -- | Wraps a function that takes two 'Int64'-keyed maps and yields a
    -- 'JobHandler'. 'mergedRollupHandler' names these maps @results@ and
    -- @dlqFailures@; 'singleJobMode' ignores both.
    -- NOTE(review): presumably child-job results and dead-letter failures
    -- keyed by job id — confirm against the worker loop.
    SingleJobMode
      ( Map Int64 (Either Text result)
        -> Map Int64 Text
        -> JobHandler m payload result
      )
  | -- | Wraps a 'BatchedJobHandler' together with an 'Int', which
    -- 'defaultBatchedWorkerConfig' fills in from its @batchSize@ argument.
    BatchedJobsMode Int (BatchedJobHandler m payload result)
-- | Settings for the worker's liveness probe.
--
-- The default configuration fills this in with a uniquely named file under
-- the system temporary directory, a freshly created empty 'MVar', and an
-- interval of @60@.
data LivenessConfig = LivenessConfig
  { livenessPath :: FilePath
  -- ^ Path of the file associated with the probe.
  , livenessSignal :: MVar ()
  -- ^ Signalling variable for the probe.
  --   NOTE(review): who fills and who drains it is not visible here — confirm.
  , livenessInterval :: Int
  -- ^ Probe interval.
  --   NOTE(review): presumably seconds — confirm against the consumer.
  }
-- | Everything a worker needs to run: connection details, the job handler,
-- timing knobs, retry policy and shared runtime state.
--
-- Build one with 'defaultWorkerConfig', 'defaultBatchedWorkerConfig' or
-- 'defaultRollupWorkerConfig', then override individual fields with record
-- update syntax. The defaults quoted below are the values those constructors
-- assign.
data WorkerConfig m payload result = WorkerConfig
  { connStr :: ByteString
  -- ^ Connection string, stored exactly as passed to the constructor.
  , workerCount :: Int
  -- ^ Worker count, stored exactly as passed to the constructor.
  --   NOTE(review): presumably the number of concurrent workers — confirm.
  , handlerMode :: HandlerMode m payload result
  -- ^ Which kind of handler processes jobs (single-job or batched).
  , pollInterval :: NominalDiffTime
  -- ^ Polling interval. Default: 5 seconds.
  , visibilityTimeout :: NominalDiffTime
  -- ^ Visibility timeout. Default: 60 seconds.
  , heartbeatInterval :: Int
  -- ^ Heartbeat interval. Default: @30@.
  --   NOTE(review): presumably seconds — confirm against the consumer.
  , maxAttempts :: Int32
  -- ^ Attempt limit per job. Default: @10@.
  , backoffStrategy :: BackoffStrategy
  -- ^ Retry backoff policy. Default: @exponentialBackoff 2.0 1_048_576@.
  , jitter :: Jitter
  -- ^ Jitter applied to the backoff. Default: 'EqualJitter'.
  , useWorkerTransaction :: Bool
  -- ^ Default: 'True'.
  --   NOTE(review): presumably wraps job handling in a transaction — confirm.
  , transactionTimeout :: Maybe NominalDiffTime
  -- ^ Optional transaction timeout. Default: 'Nothing'.
  , observabilityHooks :: ObservabilityHooks m payload
  -- ^ Observability callbacks. Default: 'defaultObservabilityHooks'.
  , workerStateVar :: TVar WorkerState
  -- ^ Shared lifecycle state; starts as 'Running'. Read and written by
  --   'pauseWorker', 'resumeWorker', 'shutdownWorker' and 'getWorkerState'.
  , livenessConfig :: Maybe LivenessConfig
  -- ^ Optional liveness probe settings. Default: a probe file under the
  --   temporary directory with an interval of @60@.
  , gracefulShutdownTimeout :: Maybe NominalDiffTime
  -- ^ Optional graceful-shutdown timeout. Default: @Just 30@ (seconds).
  , logConfig :: LogConfig
  -- ^ Logging settings. Default: 'defaultLogConfig'.
  , claimThrottle :: Maybe (IO (Int, NominalDiffTime))
  -- ^ Optional action yielding a count and a duration. Default: 'Nothing'.
  --   NOTE(review): presumably "at most N claims per period" — confirm.
  , cronJobs :: [CronJob payload]
  -- ^ Scheduled jobs. Default: none.
  , groupReaperInterval :: NominalDiffTime
  -- ^ Group reaper interval. Default: 300 seconds.
  }
-- | Build a 'WorkerConfig' with default settings for a single-job handler.
--
-- The handler is wrapped with 'singleJobMode'; every other field receives
-- the defaults assigned by the shared default-config builder.
defaultWorkerConfig
  :: (MonadArbiter n, MonadIO m)
  => ByteString
  -- ^ Connection string
  -> Int
  -- ^ Worker count
  -> JobHandler n payload result
  -- ^ Handler for individual jobs
  -> m (WorkerConfig n payload result)
defaultWorkerConfig connStrVal workerCnt handler =
  mkDefaultConfig connStrVal workerCnt (singleJobMode handler)
-- | Build a 'WorkerConfig' with default settings for a batched handler.
--
-- The batch size and handler are packed into 'BatchedJobsMode'; every other
-- field receives the defaults assigned by the shared default-config builder.
defaultBatchedWorkerConfig
  :: (MonadArbiter n, MonadIO m)
  => ByteString
  -- ^ Connection string
  -> Int
  -- ^ Worker count
  -> Int
  -- ^ Batch size
  -> BatchedJobHandler n payload result
  -- ^ Handler for batches of jobs
  -> m (WorkerConfig n payload result)
defaultBatchedWorkerConfig connStrVal workerCnt batchSize handler =
  mkDefaultConfig connStrVal workerCnt (BatchedJobsMode batchSize handler)
-- | Build a 'WorkerConfig' with default settings for a rollup handler.
--
-- The handler is wrapped with 'mergedRollupHandler', so it receives the
-- monoidal merge of the results map rather than the raw map. Every other
-- field receives the defaults assigned by the shared default-config builder.
defaultRollupWorkerConfig
  :: (MonadArbiter n, MonadIO m, Monoid result)
  => ByteString
  -- ^ Connection string
  -> Int
  -- ^ Worker count
  -> (result -> Map Int64 Text -> JobHandler n payload result)
  -- ^ Handler taking the merged result and the second ('Text'-valued) map
  -> m (WorkerConfig n payload result)
defaultRollupWorkerConfig connStrVal workerCnt handler =
  mkDefaultConfig connStrVal workerCnt (mergedRollupHandler handler)
-- | Lift a plain 'JobHandler' into 'SingleJobMode'.
--
-- Both map arguments that 'SingleJobMode' supplies are ignored; the handler
-- is returned unchanged.
singleJobMode :: JobHandler m payload result -> HandlerMode m payload result
singleJobMode handler = SingleJobMode (\_ _ -> handler)
-- | Build a 'SingleJobMode' whose handler sees a single merged result.
--
-- The first map is collapsed with 'mergeChildResults' before being passed to
-- the handler; the second map is passed through untouched.
mergedRollupHandler
  :: (Monoid result)
  => (result -> Map Int64 Text -> JobHandler m payload result)
  -> HandlerMode m payload result
mergedRollupHandler handler =
  SingleJobMode $ \results dlqFailures ->
    handler (mergeChildResults results) dlqFailures
-- | Combine every 'Right' value in the map with '<>', in ascending key order.
--
-- 'Left' entries contribute 'mempty', so they are effectively skipped. An
-- empty map, or a map containing only 'Left's, yields 'mempty'.
mergeChildResults :: (Monoid a) => Map Int64 (Either Text a) -> a
-- The inner 'fold' runs over @Either Text@: it returns the payload of a
-- 'Right' and 'mempty' for a 'Left'. The outer 'foldMap' runs over the map.
mergeChildResults = foldMap fold
-- | Shared builder behind the @default*WorkerConfig@ constructors.
--
-- Allocates the per-config runtime state (an empty liveness 'MVar' and a
-- state 'TVar' starting at 'Running'), picks a unique liveness file name
-- under the system temporary directory, and fills every remaining field
-- with its default value.
mkDefaultConfig
  :: (Applicative n, MonadIO m)
  => ByteString
  -- ^ Connection string
  -> Int
  -- ^ Worker count
  -> HandlerMode n payload result
  -- ^ How jobs are dispatched to user code
  -> m (WorkerConfig n payload result)
mkDefaultConfig connStrVal workerCnt mode = do
  livenessMVar <- newEmptyMVar
  stateVar <- newTVarIO Running
  -- A random UUID keeps liveness files of separate configs from colliding.
  uuid <- liftIO UUID.nextRandom
  tmpDir <- liftIO getTemporaryDirectory
  -- NOTE(review): the path is joined with a literal "/"; System.FilePath's
  -- (</>) would be the portable choice if that dependency is acceptable.
  let livenessFile = tmpDir <> "/arbiter-worker-" <> toString uuid
  pure
    WorkerConfig
      { connStr = connStrVal
      , workerCount = workerCnt
      , handlerMode = mode
      , pollInterval = 5
      , visibilityTimeout = 60
      , heartbeatInterval = 30
      , maxAttempts = 10
      , backoffStrategy = exponentialBackoff 2.0 1_048_576
      , jitter = EqualJitter
      , useWorkerTransaction = True
      , transactionTimeout = Nothing
      , observabilityHooks = defaultObservabilityHooks
      , workerStateVar = stateVar
      , livenessConfig = Just (LivenessConfig livenessFile livenessMVar 60)
      , gracefulShutdownTimeout = Just 30
      , logConfig = defaultLogConfig
      , claimThrottle = Nothing
      , cronJobs = []
      , groupReaperInterval = 300
      }
-- | Set the worker state to 'Paused'.
--
-- A worker that is already 'ShuttingDown' is left untouched, so pausing can
-- never undo a shutdown request. The read and the write happen in a single
-- STM transaction.
pauseWorker :: (MonadIO m) => WorkerConfig n payload result -> m ()
pauseWorker config =
  -- 'STM.atomically' from UnliftIO is already 'MonadIO'-polymorphic, so no
  -- extra 'liftIO' is needed.
  STM.atomically $ do
    current <- STM.readTVar stateVar
    case current of
      ShuttingDown -> pure ()
      _ -> STM.writeTVar stateVar Paused
  where
    stateVar = workerStateVar config
-- | Set the worker state to 'Running'.
--
-- A worker that is already 'ShuttingDown' is left untouched, so resuming can
-- never undo a shutdown request. The read and the write happen in a single
-- STM transaction.
resumeWorker :: (MonadIO m) => WorkerConfig n payload result -> m ()
resumeWorker config =
  -- 'STM.atomically' from UnliftIO is already 'MonadIO'-polymorphic, so no
  -- extra 'liftIO' is needed.
  STM.atomically $ do
    current <- STM.readTVar stateVar
    case current of
      ShuttingDown -> pure ()
      _ -> STM.writeTVar stateVar Running
  where
    stateVar = workerStateVar config
-- | Set the worker state to 'ShuttingDown', whatever it was before.
--
-- 'pauseWorker' and 'resumeWorker' both refuse to overwrite this state, so
-- through this module's API the transition is one-way.
shutdownWorker :: (MonadIO m) => WorkerConfig n payload result -> m ()
shutdownWorker config =
  -- 'STM.atomically' from UnliftIO is already 'MonadIO'-polymorphic, so no
  -- extra 'liftIO' is needed.
  STM.atomically $ STM.writeTVar (workerStateVar config) ShuttingDown
-- | Read the current worker state.
--
-- The value is a snapshot: another thread may change the state immediately
-- after this returns.
getWorkerState :: (MonadIO m) => WorkerConfig n payload result -> m WorkerState
getWorkerState config =
  -- A lone read needs no transaction; 'STM.readTVarIO' returns the same
  -- value as @atomically . readTVar@.
  STM.readTVarIO (workerStateVar config)