| Safe Haskell | None |
|---|---|
| Language | GHC2024 |
Arbiter.Worker.Config
Contents
Description
Configuration types for the arbiter worker pool.
Synopsis
- data WorkerConfig (m :: Type -> Type) payload result = WorkerConfig {
- connStr :: ByteString
- workerCount :: Int
- handlerMode :: HandlerMode m payload result
- pollInterval :: NominalDiffTime
- visibilityTimeout :: NominalDiffTime
- heartbeatInterval :: Int
- maxAttempts :: Int32
- backoffStrategy :: BackoffStrategy
- jitter :: Jitter
- useWorkerTransaction :: Bool
- transactionTimeout :: Maybe NominalDiffTime
- observabilityHooks :: ObservabilityHooks m payload
- workerStateVar :: TVar WorkerState
- livenessConfig :: Maybe LivenessConfig
- gracefulShutdownTimeout :: Maybe NominalDiffTime
- logConfig :: LogConfig
- claimThrottle :: Maybe (IO (Int, NominalDiffTime))
- cronJobs :: [CronJob payload]
- groupReaperInterval :: NominalDiffTime
- defaultWorkerConfig :: forall (n :: Type -> Type) m payload result. (MonadArbiter n, MonadIO m) => ByteString -> Int -> JobHandler n payload result -> m (WorkerConfig n payload result)
- defaultBatchedWorkerConfig :: forall (n :: Type -> Type) m payload result. (MonadArbiter n, MonadIO m) => ByteString -> Int -> Int -> BatchedJobHandler n payload result -> m (WorkerConfig n payload result)
- defaultRollupWorkerConfig :: forall (n :: Type -> Type) m result payload. (MonadArbiter n, MonadIO m, Monoid result) => ByteString -> Int -> (result -> Map Int64 Text -> JobHandler n payload result) -> m (WorkerConfig n payload result)
- singleJobMode :: forall (m :: Type -> Type) payload result. JobHandler m payload result -> HandlerMode m payload result
- mergedRollupHandler :: forall result (m :: Type -> Type) payload. Monoid result => (result -> Map Int64 Text -> JobHandler m payload result) -> HandlerMode m payload result
- mergeChildResults :: Monoid a => Map Int64 (Either Text a) -> a
- data HandlerMode (m :: Type -> Type) payload result
- = SingleJobMode (Map Int64 (Either Text result) -> Map Int64 Text -> JobHandler m payload result)
- | BatchedJobsMode Int (BatchedJobHandler m payload result)
- data LivenessConfig = LivenessConfig {
- livenessPath :: FilePath
- livenessSignal :: MVar ()
- livenessInterval :: Int
- data WorkerState
- pauseWorker :: forall m (n :: Type -> Type) payload result. MonadIO m => WorkerConfig n payload result -> m ()
- resumeWorker :: forall m (n :: Type -> Type) payload result. MonadIO m => WorkerConfig n payload result -> m ()
- shutdownWorker :: forall m (n :: Type -> Type) payload result. MonadIO m => WorkerConfig n payload result -> m ()
- getWorkerState :: forall m (n :: Type -> Type) payload result. MonadIO m => WorkerConfig n payload result -> m WorkerState
Worker Configuration
data WorkerConfig (m :: Type -> Type) payload result Source #
Configuration for a worker pool.
Time units: All time-related fields use NominalDiffTime (seconds).
Sub-second precision is supported (e.g., 0.1 for 100ms).
Constructors
| WorkerConfig | |
Fields
| |
Arguments
| :: forall (n :: Type -> Type) m payload result. (MonadArbiter n, MonadIO m) | |
| => ByteString | Connection string |
| -> Int | Worker count |
| -> JobHandler n payload result | |
| -> m (WorkerConfig n payload result) |
Creates a WorkerConfig with default settings.
defaultBatchedWorkerConfig Source #
Arguments
| :: forall (n :: Type -> Type) m payload result. (MonadArbiter n, MonadIO m) | |
| => ByteString | Connection string |
| -> Int | Worker count |
| -> Int | Batch size (max jobs per group to claim together) |
| -> BatchedJobHandler n payload result | |
| -> m (WorkerConfig n payload result) |
Creates a WorkerConfig for batched job processing.
Like defaultWorkerConfig but for handlers that process multiple jobs at once.
defaultRollupWorkerConfig Source #
Arguments
| :: forall (n :: Type -> Type) m result payload. (MonadArbiter n, MonadIO m, Monoid result) | |
| => ByteString | Connection string |
| -> Int | Worker count |
| -> (result -> Map Int64 Text -> JobHandler n payload result) | |
| -> m (WorkerConfig n payload result) |
Creates a WorkerConfig for rollup-aware job processing.
The handler receives:
- The monoidal merge of all successful child results (DLQ'd children
and decode failures contribute
mempty). - A
of failed children, actionable withMapdlqPrimaryKey errorMessagedeleteDLQJob.
Use SingleJobMode directly for per-child result visibility.
singleJobMode :: forall (m :: Type -> Type) payload result. JobHandler m payload result -> HandlerMode m payload result Source #
Wrap a simple handler that ignores child results.
Use this when your handler doesn't need rollup support:
singleJobMode (\conn job -> pure ())
mergedRollupHandler :: forall result (m :: Type -> Type) payload. Monoid result => (result -> Map Int64 Text -> JobHandler m payload result) -> HandlerMode m payload result Source #
Wrap a rollup handler that receives merged child results and DLQ failures.
Successful child results are combined via their Monoid instance.
Failed children (DLQ'd or with decode failures) contribute mempty to the merge.
The second argument is a map from DLQ primary key to error message,
actionable with deleteDLQJob.
For regular/child jobs the handler receives (.mempty, empty)
data HandlerMode (m :: Type -> Type) payload result Source #
Handler mode determines both the handler type and claiming strategy.
Single vs Batched Mode
- Single: Processes one job at a time. Failures are independent. Handler receives
JobRead payload. - Batched: Claims and processes multiple jobs from the same group together.
All-or-nothing: if any job in batch fails, entire batch rolls back (automatic mode)
or is retried together. Uses minimum maxAttempts across batch.
Handler receives
NonEmpty (JobRead payload).
Constructors
| SingleJobMode (Map Int64 (Either Text result) -> Map Int64 Text -> JobHandler m payload result) | Single job mode: claim 1 job per group, handler processes one at a time. The handler receives:
|
| BatchedJobsMode Int (BatchedJobHandler m payload result) | Batched mode: claim up to N jobs per group, handler receives batch. Rollup interaction: Batched mode has no rollup awareness — child results
are not passed to the handler. Use |
data LivenessConfig Source #
File-based liveness probe configuration.
The probe file is touched when the dispatcher claims jobs or the heartbeat extends visibility, proving the worker is actively functioning. A timeout fallback ensures the probe still updates during idle periods (no jobs to process). If the file goes stale, the worker should be restarted.
Constructors
| LivenessConfig | |
Fields
| |
Worker State
data WorkerState Source #
State for worker pool coordination.
Controls whether workers claim new jobs:
Running: Normal operation, claim jobs continuouslyPaused: Stop claiming new jobs, finish in-flight jobs, wait for resumeShuttingDown: Stop claiming new jobs, finish in-flight jobs, then exit
Constructors
| Running | Normal operation |
| Paused | Paused (stop claiming, finish in-flight, wait for resume) |
| ShuttingDown | Graceful shutdown in progress (drain and exit) |
Instances
| Show WorkerState Source # | |
Defined in Arbiter.Worker.WorkerState Methods showsPrec :: Int -> WorkerState -> ShowS # show :: WorkerState -> String # showList :: [WorkerState] -> ShowS # | |
| Eq WorkerState Source # | |
Defined in Arbiter.Worker.WorkerState | |
pauseWorker :: forall m (n :: Type -> Type) payload result. MonadIO m => WorkerConfig n payload result -> m () Source #
Pause the worker pool
Stops claiming new jobs. In-flight jobs will complete normally. Workers will wait in paused state until resumed or shut down.
resumeWorker :: forall m (n :: Type -> Type) payload result. MonadIO m => WorkerConfig n payload result -> m () Source #
Resume the worker pool from paused state
Workers will start claiming new jobs again.
shutdownWorker :: forall m (n :: Type -> Type) payload result. MonadIO m => WorkerConfig n payload result -> m () Source #
Initiate graceful shutdown of the worker pool
Stops claiming new jobs. In-flight jobs will complete, then the pool exits.
getWorkerState :: forall m (n :: Type -> Type) payload result. MonadIO m => WorkerConfig n payload result -> m WorkerState Source #
Get the current worker state