| Safe Haskell | None |
|---|---|
| Language | GHC2024 |
Arbiter.Worker.Config
Description
Configuration types for the arbiter worker pool.
Synopsis
- data WorkerConfig (m :: Type -> Type) payload result = WorkerConfig {
- connStr :: ByteString
- workerCount :: Int
- handlerMode :: HandlerMode m payload result
- pollInterval :: NominalDiffTime
- visibilityTimeout :: NominalDiffTime
- heartbeatInterval :: NominalDiffTime
- maxAttempts :: Int32
- backoffStrategy :: BackoffStrategy
- jitter :: Jitter
- useWorkerTransaction :: Bool
- observabilityHooks :: ObservabilityHooks m payload
- workerStateVar :: TVar WorkerState
- livenessConfig :: Maybe LivenessConfig
- gracefulShutdownTimeout :: Maybe NominalDiffTime
- logConfig :: LogConfig
- cronJobs :: [CronJob payload]
- groupReaperInterval :: NominalDiffTime
- defaultWorkerConfig :: forall (n :: Type -> Type) m payload result. (MonadArbiter n, MonadIO m) => ByteString -> Int -> JobHandler n payload result -> m (WorkerConfig n payload result)
- defaultBatchedWorkerConfig :: forall (n :: Type -> Type) m payload result. (MonadArbiter n, MonadIO m) => ByteString -> Int -> Int -> BatchedJobHandler n payload result -> m (WorkerConfig n payload result)
- defaultRollupWorkerConfig :: forall (n :: Type -> Type) m result payload. (MonadArbiter n, MonadIO m, Monoid result) => ByteString -> Int -> (result -> Map Int64 Text -> JobHandler n payload result) -> m (WorkerConfig n payload result)
- singleJobMode :: forall (m :: Type -> Type) payload result. JobHandler m payload result -> HandlerMode m payload result
- mergedRollupHandler :: forall result (m :: Type -> Type) payload. Monoid result => (result -> Map Int64 Text -> JobHandler m payload result) -> HandlerMode m payload result
- mergeChildResults :: Monoid a => Map Int64 (Either Text a) -> a
- data HandlerMode (m :: Type -> Type) payload result
- = SingleJobMode (Map Int64 (Either Text result) -> Map Int64 Text -> JobHandler m payload result)
- | BatchedJobsMode Int (BatchedJobHandler m payload result)
- data LivenessConfig = LivenessConfig {
- livenessPath :: FilePath
- livenessSignal :: MVar ()
- livenessInterval :: Int
- data WorkerState
- pauseWorker :: forall m (n :: Type -> Type) payload result. MonadIO m => WorkerConfig n payload result -> m ()
- resumeWorker :: forall m (n :: Type -> Type) payload result. MonadIO m => WorkerConfig n payload result -> m ()
- shutdownWorker :: forall m (n :: Type -> Type) payload result. MonadIO m => WorkerConfig n payload result -> m ()
- getWorkerState :: forall m (n :: Type -> Type) payload result. MonadIO m => WorkerConfig n payload result -> m WorkerState
Worker Configuration
data WorkerConfig (m :: Type -> Type) payload result
Configuration for a worker pool.
Constructors
| WorkerConfig | |
Fields
| |
defaultWorkerConfig
Arguments
| :: forall (n :: Type -> Type) m payload result. (MonadArbiter n, MonadIO m) | |
| => ByteString | Connection string |
| -> Int | Worker count |
| -> JobHandler n payload result | |
| -> m (WorkerConfig n payload result) |
Create a WorkerConfig with default settings.
defaultBatchedWorkerConfig
Arguments
| :: forall (n :: Type -> Type) m payload result. (MonadArbiter n, MonadIO m) | |
| => ByteString | Connection string |
| -> Int | Worker count |
| -> Int | Batch size (max jobs per group to claim together) |
| -> BatchedJobHandler n payload result | |
| -> m (WorkerConfig n payload result) |
Create a WorkerConfig for batched job processing.
Like defaultWorkerConfig but for handlers that process multiple jobs at once.
defaultRollupWorkerConfig
Arguments
| :: forall (n :: Type -> Type) m result payload. (MonadArbiter n, MonadIO m, Monoid result) | |
| => ByteString | Connection string |
| -> Int | Worker count |
| -> (result -> Map Int64 Text -> JobHandler n payload result) | |
| -> m (WorkerConfig n payload result) |
Create a WorkerConfig for rollup finalizers. See mergedRollupHandler.
singleJobMode :: forall (m :: Type -> Type) payload result. JobHandler m payload result -> HandlerMode m payload result
Handler that ignores child results. Use for regular jobs and leaf children.
mergedRollupHandler :: forall result (m :: Type -> Type) payload. Monoid result => (result -> Map Int64 Text -> JobHandler m payload result) -> HandlerMode m payload result
Handler for rollup finalizers. Child results are merged via Monoid.
The handler receives two arguments before the job:
- The mappend-fold of all child results. Children whose results couldn't be decoded into the expected type contribute mempty (the raw JSON is still in the results table). Use SingleJobMode directly to inspect per-child decode failures via the Left entries.
- A map of DLQ failures (dlqPrimaryKey -> errorMessage) for children that failed and were moved to the DLQ.
For non-rollup jobs both arguments are empty.
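The merge rule described above can be sketched in a few lines. This is an illustration only, not the library's implementation: mergeSketch, decodeFailures, and exampleChildren are hypothetical names, and the sketch assumes that a Left entry marks a child whose result failed to decode.

```haskell
{-# LANGUAGE OverloadedStrings #-}

import Data.Int (Int64)
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import Data.Text (Text)

-- | Hypothetical stand-in for the merge step: fold all child results with
-- the Monoid instance, treating decode failures (Left) as mempty.
mergeSketch :: Monoid a => Map Int64 (Either Text a) -> a
mergeSketch = foldMap (either (const mempty) id)

-- | Per-child decode failures, recoverable only from the unmerged map.
decodeFailures :: Map Int64 (Either Text a) -> Map Int64 Text
decodeFailures = Map.mapMaybe (either Just (const Nothing))

-- | Example input: children 1 and 3 decoded, child 2 did not.
exampleChildren :: Map Int64 (Either Text [Int])
exampleChildren =
  Map.fromList
    [ (1, Right [10])
    , (2, Left "could not decode result")
    , (3, Right [20, 30])
    ]
```

With this input, the merged value contains only the results of children 1 and 3, while decodeFailures still reports child 2. That is the trade-off between the merged handler and inspecting the Left entries directly.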
data HandlerMode (m :: Type -> Type) payload result
Handler type and claiming strategy.
Single vs Batched Mode
- Single: Processes one job at a time. Failures are independent. Handler receives JobRead payload.
- Batched: Claims and processes multiple jobs from the same group together. All-or-nothing: if any job in the batch fails, the entire batch rolls back (automatic mode) or is retried together. Uses the minimum maxAttempts across the batch. Handler receives NonEmpty (JobRead payload).
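The two batched-mode rules (all-or-nothing outcome, minimum maxAttempts across the batch) can be illustrated with a small sketch. JobSketch, batchMaxAttempts, and batchOutcome are hypothetical names introduced here; the library's JobRead type and its actual retry logic are not reproduced.

```haskell
import Data.Int (Int32)
import Data.List.NonEmpty (NonEmpty (..))

-- | Hypothetical, simplified job record standing in for JobRead payload.
data JobSketch = JobSketch
  { jobId :: Int
  , jobMaxAttempts :: Int32
  }

-- | Effective attempt limit for a batch: the smallest limit of any member.
-- Total, because a NonEmpty batch always has at least one job.
batchMaxAttempts :: NonEmpty JobSketch -> Int32
batchMaxAttempts = minimum . fmap jobMaxAttempts

-- | All-or-nothing outcome: the batch succeeds only if every job succeeds;
-- otherwise the first failure is reported for the whole batch.
batchOutcome :: NonEmpty (Either String ()) -> Either String ()
batchOutcome = sequence_
```

For example, a batch whose jobs allow 5, 3, and 7 attempts would be treated as allowing 3, and a single failing job makes the whole batch fail.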
Constructors
| SingleJobMode (Map Int64 (Either Text result) -> Map Int64 Text -> JobHandler m payload result) | Claim 1 job per group. The handler receives:
|
| BatchedJobsMode Int (BatchedJobHandler m payload result) | Batched mode: claim up to N jobs per group, handler receives batch. Rollup interaction: Batched mode has no rollup awareness - child results
are not passed to the handler. Use |
data LivenessConfig
File-based liveness probe configuration.
The probe file is touched when the dispatcher claims jobs or the heartbeat extends visibility, proving the worker is actively functioning. A timeout fallback ensures the probe still updates during idle periods (no jobs to process). If the file goes stale, the worker should be restarted.
Constructors
| LivenessConfig | |
Fields
| |
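A minimal sketch of the probe behaviour described above: touch the file when activity is signalled, or when a fallback timeout elapses during idle periods. probeStep, signalActivity, and demo are hypothetical names, and the sketch assumes the fallback interval is given in microseconds; the unit of livenessInterval is not stated on this page.

```haskell
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar, tryPutMVar)
import Control.Monad (void)
import System.Timeout (timeout)

-- | One iteration of a hypothetical probe loop. Waits for an activity
-- signal, gives up after the fallback interval (assumed microseconds),
-- then touches the probe file either way.
probeStep :: FilePath -> MVar () -> Int -> IO ()
probeStep path signal fallbackMicros = do
  void (timeout fallbackMicros (takeMVar signal))
  -- Rewriting the file updates its modification time.
  writeFile path ""

-- | What a dispatcher or heartbeat might call after doing work.
-- Non-blocking: if a signal is already pending, this is a no-op.
signalActivity :: MVar () -> IO ()
signalActivity signal = void (tryPutMVar signal ())

-- | Signal once, then run a single probe iteration.
demo :: FilePath -> IO ()
demo path = do
  signal <- newEmptyMVar
  signalActivity signal
  probeStep path signal 1000000
```

An external supervisor would then compare the file's modification time against its own staleness threshold and restart the worker if the file has not been touched recently.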
Worker State
data WorkerState
State for worker pool coordination.
Controls whether workers claim new jobs:
- Running: Normal operation, claim jobs continuously
- Paused: Stop claiming new jobs, finish in-flight jobs, wait for resume
- ShuttingDown: Stop claiming new jobs, finish in-flight jobs, then exit
Constructors
| Running | Normal operation |
| Paused | Paused (stop claiming, finish in-flight, wait for resume) |
| ShuttingDown | Graceful shutdown in progress (drain and exit) |
Instances
| Show WorkerState | Defined in Arbiter.Worker.WorkerState. Methods: showsPrec :: Int -> WorkerState -> ShowS; show :: WorkerState -> String; showList :: [WorkerState] -> ShowS |
| Eq WorkerState | Defined in Arbiter.Worker.WorkerState |
pauseWorker :: forall m (n :: Type -> Type) payload result. MonadIO m => WorkerConfig n payload result -> m ()
Pause the worker pool.
Stops claiming new jobs. In-flight jobs will complete normally. Workers will wait in paused state until resumed or shut down.
resumeWorker :: forall m (n :: Type -> Type) payload result. MonadIO m => WorkerConfig n payload result -> m ()
Resume the worker pool from the paused state.
Workers will start claiming new jobs again.
shutdownWorker :: forall m (n :: Type -> Type) payload result. MonadIO m => WorkerConfig n payload result -> m ()
Initiate graceful shutdown of the worker pool.
Stops claiming new jobs. In-flight jobs will complete, then the pool exits.
getWorkerState :: forall m (n :: Type -> Type) payload result. MonadIO m => WorkerConfig n payload result -> m WorkerState
Get the current worker state.
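Since the configuration carries a TVar WorkerState, the pause, resume, and shutdown semantics can be sketched with STM. This is an assumption-laden illustration, not the library's code: WorkerStateSketch is a local stand-in type, and awaitClaimPermission, pauseSketch, resumeSketch, shutdownSketch, and demoTransitions are hypothetical names. The guards on the transitions (for instance, resume having no effect after shutdown) are a design choice of this sketch.

```haskell
import Control.Concurrent.STM
  (STM, TVar, atomically, modifyTVar', newTVarIO, readTVar, readTVarIO, retry, writeTVar)

-- | Local stand-in for the library's WorkerState.
data WorkerStateSketch = Running | Paused | ShuttingDown
  deriving (Eq, Show)

-- | Gate a worker could run before each claim. Blocks while paused;
-- returns True when claiming is allowed and False on shutdown.
awaitClaimPermission :: TVar WorkerStateSketch -> STM Bool
awaitClaimPermission var = do
  st <- readTVar var
  case st of
    Running -> pure True
    Paused -> retry -- re-runs when the TVar changes
    ShuttingDown -> pure False

pauseSketch, resumeSketch, shutdownSketch :: TVar WorkerStateSketch -> IO ()
pauseSketch var = atomically (modifyTVar' var (\s -> if s == Running then Paused else s))
resumeSketch var = atomically (modifyTVar' var (\s -> if s == Paused then Running else s))
shutdownSketch var = atomically (writeTVar var ShuttingDown)

-- | Walk through the transitions and report the final state and gate result.
demoTransitions :: IO (WorkerStateSketch, Bool)
demoTransitions = do
  var <- newTVarIO Running
  pauseSketch var
  resumeSketch var
  shutdownSketch var
  resumeSketch var -- no effect: shutdown is final in this sketch
  final <- readTVarIO var
  mayClaim <- atomically (awaitClaimPermission var)
  pure (final, mayClaim)
```

Using retry in the paused branch means an idle worker consumes no CPU while waiting; it is woken only when another thread writes to the state variable.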