arbiter-worker-0.1.0.0: Worker framework for arbiter
Safe Haskell: None
Language: GHC2024

Arbiter.Worker.Config

Description

Configuration types for the arbiter worker pool.

Worker Configuration

data WorkerConfig (m :: Type -> Type) payload result Source #

Configuration for a worker pool.

Time units: Time-related fields use NominalDiffTime (seconds), with the exception of heartbeatInterval, which is a whole number of seconds (Int). Sub-second precision is supported wherever NominalDiffTime is used (e.g., 0.1 for 100ms).

Constructors

WorkerConfig 

Fields

  • connStr :: ByteString

    PostgreSQL connection string (used for LISTEN/NOTIFY).

  • workerCount :: Int

    Number of concurrent worker threads.

  • handlerMode :: HandlerMode m payload result

    Job handler and claiming strategy (single or batched). Default: SingleJobMode.

  • pollInterval :: NominalDiffTime

    Polling interval in seconds (fallback when no NOTIFY received). Also serves as the liveness heartbeat — the dispatcher signals the liveness probe after each poll cycle. Default: 5 seconds.

  • visibilityTimeout :: NominalDiffTime

    Duration in seconds a claimed job stays invisible to other workers. Must be greater than heartbeatInterval. Sub-second values are rounded up to the nearest second. Default: 60.

  • heartbeatInterval :: Int

    Interval in seconds for extending visibility timeout during processing. Must be less than visibilityTimeout to prevent job reclaim. Default: 30.

  • maxAttempts :: Int32

    Maximum attempts before a job is moved to the DLQ (used when the job's own maxAttempts is Nothing). Default: 10.

  • backoffStrategy :: BackoffStrategy

    Retry backoff strategy. Default: exponential with base 2, max 1048576 seconds.

  • jitter :: Jitter

    Jitter strategy for retry delays. Default: EqualJitter.

  • useWorkerTransaction :: Bool

    Transaction Mode:

    • True (Automatic): The handler runs inside a transaction with automatic acking and rollback. The heartbeat prevents job reclaim during processing, but the handler cannot control commit timing. If the ack fails, all database work is rolled back; note that BatchedJobsMode widens the blast radius here, because the entire batch is acked atomically. Default: True.
    • False (Manual): Full transaction control. Handler must explicitly ack jobs. Child results are not automatically stored in the results table — the handler must call insertResult to make results visible to the rollup finalizer.
  • transactionTimeout :: Maybe NominalDiffTime

    Transaction statement timeout in seconds. Only applies when useWorkerTransaction is True. Default: Nothing (no timeout).

  • observabilityHooks :: ObservabilityHooks m payload

    Callbacks for metrics or tracing. Default: no-op hooks.

  • workerStateVar :: TVar WorkerState

    Worker state (Running, Paused, ShuttingDown). Managed internally.

  • livenessConfig :: Maybe LivenessConfig

    File-based liveness probe configuration. Default: writes to /tmp/arbiter-worker-<uuid> every 60 seconds.

  • gracefulShutdownTimeout :: Maybe NominalDiffTime

    Maximum time in seconds to wait for in-flight jobs during graceful shutdown. If Nothing, waits indefinitely. If Just n, force-exits after n seconds. Default: Just 30.

  • logConfig :: LogConfig

    Logging configuration. Arbiter outputs structured JSON logs with job context automatically included. Use this to control log level, destination, and inject additional context (e.g., trace IDs). Default: Info level to stdout.

  • claimThrottle :: Maybe (IO (Int, NominalDiffTime))

    Optional claim rate limiter. The IO action returns (maxClaims, window): at most maxClaims jobs will be claimed per window duration. The action is called each claim cycle, so limits can be adjusted dynamically. Default: Nothing (no throttling).

  • cronJobs :: [CronJob payload]

    Cron schedules. When non-empty, the worker pool spawns a scheduler thread that opens a dedicated connection from connStr and inserts jobs based on cron expressions. The cron_schedules table is consulted for runtime overrides (expression, overlap, enabled). Default: [].

  • groupReaperInterval :: NominalDiffTime

    How often to recompute the groups table, correcting drift in job_count, min_priority, min_id, and in_flight_until. Default: 300 (5 minutes).
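As an illustration of the backoffStrategy and jitter defaults above (not the library's internal code), exponential backoff with equal jitter is conventionally computed as delay = min(cap, base^attempt), with the actual sleep drawn uniformly from [delay/2, delay]. A self-contained sketch, where `u` stands in for a uniform random sample in [0, 1] supplied by the caller:

```haskell
-- Illustrative only, mirroring the documented defaults (base 2,
-- capped at 1048576 seconds). Not the library's implementation.
backoffDelay :: Double -> Double -> Int -> Double
backoffDelay base cap attempt = min cap (base ^^ attempt)

-- Equal jitter keeps half the delay and randomizes the other half,
-- so the result always lies in [d/2, d].
equalJitter :: Double -> Double -> Double
equalJitter u d = d / 2 + u * (d / 2)
```

With these defaults, attempt 3 waits around 8 seconds and the cap is reached after about 20 attempts.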

defaultWorkerConfig Source #

Arguments

:: forall (n :: Type -> Type) m payload result. (MonadArbiter n, MonadIO m) 
=> ByteString

Connection string

-> Int

Worker count

-> JobHandler n payload result 
-> m (WorkerConfig n payload result) 

Creates a WorkerConfig with default settings.
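For instance, a minimal setup might look like the following sketch. The connection string is illustrative and `runWorkerPool` is a hypothetical entry point (not documented in this module); defaults are then tuned via ordinary record update:

```haskell
-- Sketch: create a config with defaults, then override selected fields.
-- `runWorkerPool` and the no-op handler body are hypothetical.
main :: IO ()
main = do
  cfg <- defaultWorkerConfig "host=localhost dbname=jobs" 8
           (\_conn _job -> pure ())
  runWorkerPool cfg
    { pollInterval            = 1      -- fall back to polling every second
    , visibilityTimeout       = 120    -- claimed jobs hidden for 2 minutes
    , heartbeatInterval       = 45     -- must stay below visibilityTimeout
    , gracefulShutdownTimeout = Just 10
    }
```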

defaultBatchedWorkerConfig Source #

Arguments

:: forall (n :: Type -> Type) m payload result. (MonadArbiter n, MonadIO m) 
=> ByteString

Connection string

-> Int

Worker count

-> Int

Batch size (max jobs per group to claim together)

-> BatchedJobHandler n payload result 
-> m (WorkerConfig n payload result) 

Creates a WorkerConfig for batched job processing.

Like defaultWorkerConfig, but for handlers that process multiple jobs at once.
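A batched counterpart, as a sketch; `runWorkerPool` and `processJob` are hypothetical names, and the handler is assumed to receive the connection and the whole batch:

```haskell
-- Sketch: claim up to 50 jobs per group and process them together.
-- Remember the all-or-nothing semantics: one failure retries the batch.
main :: IO ()
main = do
  cfg <- defaultBatchedWorkerConfig "host=localhost dbname=jobs" 4 50
           (\conn jobs -> mapM_ (processJob conn) jobs)
  runWorkerPool cfg
```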

defaultRollupWorkerConfig Source #

Arguments

:: forall (n :: Type -> Type) m result payload. (MonadArbiter n, MonadIO m, Monoid result) 
=> ByteString

Connection string

-> Int

Worker count

-> (result -> Map Int64 Text -> JobHandler n payload result) 
-> m (WorkerConfig n payload result) 

Creates a WorkerConfig for rollup-aware job processing.

The handler receives:

  1. The monoidal merge of all successful child results (DLQ'd children and decode failures contribute mempty).
  2. A Map dlqPrimaryKey errorMessage of failed children, actionable with deleteDLQJob.

Use SingleJobMode directly for per-child result visibility.
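The shape can be sketched as follows; `runWorkerPool` and `finalize` are hypothetical, and the exact argument shape of deleteDLQJob is assumed:

```haskell
-- Sketch: a rollup finalizer. The outer function closes over the merged
-- child result and the DLQ failure map, returning an ordinary handler.
main :: IO ()
main = do
  cfg <- defaultRollupWorkerConfig "host=localhost dbname=jobs" 2 $
           \merged dlqFailures ->
             \conn job -> do
               -- acknowledge failed children (call shape assumed)
               mapM_ (deleteDLQJob conn) (Map.keys dlqFailures)
               finalize conn job merged
  runWorkerPool cfg
```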

singleJobMode :: forall (m :: Type -> Type) payload result. JobHandler m payload result -> HandlerMode m payload result Source #

Wrap a simple handler that ignores child results.

Use this when your handler doesn't need rollup support:

singleJobMode (\conn job -> pure ())

mergedRollupHandler :: forall result (m :: Type -> Type) payload. Monoid result => (result -> Map Int64 Text -> JobHandler m payload result) -> HandlerMode m payload result Source #

Wrap a rollup handler that receives merged child results and DLQ failures.

Successful child results are combined via their Monoid instance. Failed children (DLQ'd or with decode failures) contribute mempty to the merge. The second argument is a map from DLQ primary key to error message, actionable with deleteDLQJob. For regular (non-rollup) jobs the handler receives mempty and an empty map.

mergeChildResults :: Monoid a => Map Int64 (Either Text a) -> a Source #

Merge child results from a rollup finalizer.

Folds all successful child results via their Monoid instance. Left entries (DLQ failures and decode failures) map to mempty. Empty map for non-rollup jobs.
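The documented semantics amount to folding the Right values with their Monoid and dropping Left entries; a hypothetical re-implementation (named with a prime to avoid any claim of being the library source):

```haskell
import Data.Int (Int64)
import Data.Map (Map)
import qualified Data.Map as Map
import Data.Text (Text)
import qualified Data.Text as T

-- Illustrative re-implementation of the documented semantics: Right
-- results are combined in ascending key order, Left entries (DLQ'd or
-- decode-failed children) contribute mempty, and an empty map (the
-- non-rollup case) yields mempty.
mergeChildResults' :: Monoid a => Map Int64 (Either Text a) -> a
mergeChildResults' = foldMap (either (const mempty) id)
```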

data HandlerMode (m :: Type -> Type) payload result Source #

Handler mode determines both the handler type and claiming strategy.

Single vs Batched Mode

  • Single: Processes one job at a time. Failures are independent. Handler receives JobRead payload.
  • Batched: Claims and processes multiple jobs from the same group together. All-or-nothing: if any job in the batch fails, the entire batch rolls back (automatic mode) or is retried together. Uses the minimum maxAttempts across the batch. Handler receives NonEmpty (JobRead payload).

Constructors

SingleJobMode (Map Int64 (Either Text result) -> Map Int64 Text -> JobHandler m payload result)

Single job mode: claim 1 job per group, handler processes one at a time.

The handler receives:

  1. Map childJobId (Either errorMsg result) — child results for rollup finalizers, empty for regular/child jobs. Left entries represent DLQ'd children or result decode failures.
  2. Map dlqPrimaryKey errorMsg — DLQ'd child failures, keyed by DLQ primary key (actionable with deleteDLQJob). Empty for non-rollup jobs.
BatchedJobsMode Int (BatchedJobHandler m payload result)

Batched mode: claim up to N jobs per group, handler receives batch.

Rollup interaction: Batched mode has no rollup awareness — child results are not passed to the handler. Use SingleJobMode for rollup finalizers.
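As a sketch, the two strategies side by side; `MyPayload`, the handler bodies, and the assumption that the batched handler shares the (connection, jobs) shape of the single-job handler are all hypothetical:

```haskell
-- Sketch: building each mode via the smart constructor / data constructor.
singleMode, batchedMode :: HandlerMode IO MyPayload ()
singleMode  = singleJobMode (\_conn _job -> pure ())
batchedMode = BatchedJobsMode 25 (\_conn _jobs -> pure ())
```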

data LivenessConfig Source #

File-based liveness probe configuration.

The probe file is touched when the dispatcher claims jobs or the heartbeat extends visibility, proving the worker is actively functioning. A timeout fallback ensures the probe still updates during idle periods (no jobs to process). If the file goes stale, the worker should be restarted.

Constructors

LivenessConfig 

Worker State

data WorkerState Source #

State for worker pool coordination.

Controls whether workers claim new jobs:

  • Running: Normal operation, claim jobs continuously
  • Paused: Stop claiming new jobs, finish in-flight jobs, wait for resume
  • ShuttingDown: Stop claiming new jobs, finish in-flight jobs, then exit

Constructors

Running

Normal operation

Paused

Paused (stop claiming, finish in-flight, wait for resume)

ShuttingDown

Graceful shutdown in progress (drain and exit)

Instances

Show WorkerState Source #

Defined in Arbiter.Worker.WorkerState

Eq WorkerState Source #

Defined in Arbiter.Worker.WorkerState

pauseWorker :: forall m (n :: Type -> Type) payload result. MonadIO m => WorkerConfig n payload result -> m () Source #

Pause the worker pool

Stops claiming new jobs. In-flight jobs will complete normally. Workers will wait in paused state until resumed or shut down.

resumeWorker :: forall m (n :: Type -> Type) payload result. MonadIO m => WorkerConfig n payload result -> m () Source #

Resume the worker pool from paused state

Workers will start claiming new jobs again.

shutdownWorker :: forall m (n :: Type -> Type) payload result. MonadIO m => WorkerConfig n payload result -> m () Source #

Initiate graceful shutdown of the worker pool

Stops claiming new jobs. In-flight jobs will complete, then the pool exits.

getWorkerState :: forall m (n :: Type -> Type) payload result. MonadIO m => WorkerConfig n payload result -> m WorkerState Source #

Get the current worker state
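These controls compose naturally with OS signal handling for container lifecycles; a sketch, where `mkConfig` and `runWorkerPool` are hypothetical and installHandler comes from the unix package:

```haskell
import System.Posix.Signals (Handler (Catch), installHandler, sigTERM)

-- Sketch: drain on SIGTERM. shutdownWorker flips the shared WorkerState
-- to ShuttingDown, and gracefulShutdownTimeout bounds the drain.
main :: IO ()
main = do
  cfg <- mkConfig
  _ <- installHandler sigTERM (Catch (shutdownWorker cfg)) Nothing
  runWorkerPool cfg   -- returns once in-flight jobs have drained
```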