| Safe Haskell | None |
|---|---|
| Language | GHC2024 |
Arbiter.Worker
Description
Entry point for running a worker pool that fetches and executes jobs.
Synopsis
- runWorkerPool :: forall m (registry :: JobPayloadRegistry) payload result. (JobResult result, MonadMask m, MonadUnliftIO m, QueueOperation m registry payload) => WorkerConfig m payload result -> m ()
- data NamedWorkerPool (m :: Type -> Type) = (JobResult result, QueueOperation m registry payload) => NamedWorkerPool {
- workerPoolName :: Text
- workerPoolConfig :: WorkerConfig m payload result
- namedWorkerPool :: forall (m :: Type -> Type) (registry :: JobPayloadRegistry) payload result. (JobResult result, QueueOperation m registry payload) => WorkerConfig m payload result -> NamedWorkerPool m
- runWorkerPools :: forall m (registry :: JobPayloadRegistry). (MonadMask m, MonadUnliftIO m, RegistryTables registry) => Proxy registry -> [NamedWorkerPool m] -> (TVar WorkerState -> IO ()) -> m ()
- runSelectedWorkerPools :: (MonadMask m, MonadUnliftIO m) => TVar WorkerState -> [Text] -> [NamedWorkerPool m] -> m ()
- getEnabledQueues :: forall (registry :: JobPayloadRegistry). RegistryTables registry => String -> Proxy registry -> IO [Text]
- class JobResult a where
- encodeJobResult :: a -> Maybe Value
- decodeJobResult :: Value -> Either Text a
- module Arbiter.Worker.Config
- module Arbiter.Worker.BackoffStrategy
- module Arbiter.Worker.Logger
- module Arbiter.Worker.WorkerState
- data CronJob payload = CronJob {
- name :: Text
- cronExpression :: Text
- overlap :: OverlapPolicy
- backfill :: BackfillPolicy
- timezone :: Maybe Text
- builder :: TickKind -> UTCTime -> JobWrite payload
- data OverlapPolicy
- data BackfillPolicy
- cronJob :: Text -> Text -> OverlapPolicy -> (TickKind -> UTCTime -> JobWrite payload) -> Either String (CronJob payload)
- initCronSchedules :: MonadArbiter m => Text -> [CronJob payload] -> LogConfig -> m ()
- overlapPolicyToText :: OverlapPolicy -> Text
- overlapPolicyFromText :: Text -> Maybe OverlapPolicy
Running Workers
runWorkerPool :: forall m (registry :: JobPayloadRegistry) payload result. (JobResult result, MonadMask m, MonadUnliftIO m, QueueOperation m registry payload) => WorkerConfig m payload result -> m () Source #
Starts a worker pool with a dispatcher and N worker threads.
Multi-Queue Workers
data NamedWorkerPool (m :: Type -> Type) Source #
A worker pool paired with its queue name (derived from the registry).
allWorkers = [ namedWorkerPool emailConfig -- "email_jobs" , namedWorkerPool imageConfig -- "image_jobs" ] main = runWorkerPools (Proxy @MyRegistry) allWorkers (\_ -> pure ())
Constructors
| (JobResult result, QueueOperation m registry payload) => NamedWorkerPool | |
Fields
| |
namedWorkerPool :: forall (m :: Type -> Type) (registry :: JobPayloadRegistry) payload result. (JobResult result, QueueOperation m registry payload) => WorkerConfig m payload result -> NamedWorkerPool m Source #
Create a named worker pool, deriving the name from the type-level registry.
runWorkerPools :: forall m (registry :: JobPayloadRegistry). (MonadMask m, MonadUnliftIO m, RegistryTables registry) => Proxy registry -> [NamedWorkerPool m] -> (TVar WorkerState -> IO ()) -> m () Source #
Run worker pools with shared shutdown state. Filters to queues listed
in ARBITER_ENABLED_QUEUES (all if unset). The setup action receives the
shared TVar for installing signal handlers.
runSelectedWorkerPools :: (MonadMask m, MonadUnliftIO m) => TVar WorkerState -> [Text] -> [NamedWorkerPool m] -> m () Source #
Run only the worker pools whose names appear in the enabled list.
Arguments
| :: forall (registry :: JobPayloadRegistry). RegistryTables registry | |
| => String | Environment variable name |
| -> Proxy registry | Registry proxy |
| -> IO [Text] |
Get enabled queues from an environment variable.
If the environment variable is set and non-empty, parses it as a comma-separated list of queue names. Each name is validated against the registry - invalid names cause an error. If not set or empty, returns all queue names from the registry.
Example:
-- With ENABLED_QUEUES="email_jobs,notifications" queues <- getEnabledQueues ENABLED_QUEUES (Proxy @MyRegistry) -- Returns: ["email_jobs", "notifications"] -- With ENABLED_QUEUES unset or empty queues <- getEnabledQueues ENABLED_QUEUES (Proxy @MyRegistry) -- Returns: all queues from registry -- With ENABLED_QUEUES="email_jobs,invalid_queue" -- Throws error: "Unknown queue names: invalid_queue"
Job Result
class JobResult a where Source #
Handler result types. () is fire-and-forget; any (ToJSON a, FromJSON a)
is stored in the results table when the job has a parent and decoded when
read by a rollup finalizer.
Methods
encodeJobResult :: a -> Maybe Value Source #
decodeJobResult :: Value -> Either Text a Source #
Instances
| JobResult () Source # | |
Defined in Arbiter.Worker Methods encodeJobResult :: () -> Maybe Value Source # decodeJobResult :: Value -> Either Text () Source # | |
| (FromJSON a, ToJSON a) => JobResult a Source # | |
Defined in Arbiter.Worker Methods encodeJobResult :: a -> Maybe Value Source # decodeJobResult :: Value -> Either Text a Source # | |
Re-exports
module Arbiter.Worker.Config
module Arbiter.Worker.Logger
module Arbiter.Worker.WorkerState
Cron
A cron schedule definition.
Use cronJob to construct (eagerly parses the cron expression). For a
non-UTC timezone, use cronJobInTimezone (also validates the tz name).
Constructors
| CronJob | |
Fields
| |
Instances
data OverlapPolicy Source #
How overlapping cron ticks are deduplicated.
Constructors
| SkipOverlap | At most one pending or running job per schedule. |
| AllowOverlap | One job per tick. Concurrent execution of prior ticks is allowed. |
Instances
| Generic OverlapPolicy Source # | |||||
Defined in Arbiter.Worker.Cron Associated Types
| |||||
| Show OverlapPolicy Source # | |||||
Defined in Arbiter.Worker.Cron Methods showsPrec :: Int -> OverlapPolicy -> ShowS # show :: OverlapPolicy -> String # showList :: [OverlapPolicy] -> ShowS # | |||||
| Eq OverlapPolicy Source # | |||||
Defined in Arbiter.Worker.Cron Methods (==) :: OverlapPolicy -> OverlapPolicy -> Bool # (/=) :: OverlapPolicy -> OverlapPolicy -> Bool # | |||||
| type Rep OverlapPolicy Source # | |||||
data BackfillPolicy Source #
Bound on replay of missed ticks. Applies at startup and mid-flight.
Constructors
| NoBackfill | Drop missed minutes silently. Default. |
| Backfill NominalDiffTime | Replay missed minutes up to the given duration. |
Instances
| Generic BackfillPolicy Source # | |||||
Defined in Arbiter.Worker.Cron Associated Types
Methods from :: BackfillPolicy -> Rep BackfillPolicy x # to :: Rep BackfillPolicy x -> BackfillPolicy # | |||||
| Show BackfillPolicy Source # | |||||
Defined in Arbiter.Worker.Cron Methods showsPrec :: Int -> BackfillPolicy -> ShowS # show :: BackfillPolicy -> String # showList :: [BackfillPolicy] -> ShowS # | |||||
| Eq BackfillPolicy Source # | |||||
Defined in Arbiter.Worker.Cron Methods (==) :: BackfillPolicy -> BackfillPolicy -> Bool # (/=) :: BackfillPolicy -> BackfillPolicy -> Bool # | |||||
| type Rep BackfillPolicy Source # | |||||
Defined in Arbiter.Worker.Cron type Rep BackfillPolicy = D1 ('MetaData "BackfillPolicy" "Arbiter.Worker.Cron" "arbiter-worker-0.1.0.0-inplace" 'False) (C1 ('MetaCons "NoBackfill" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "Backfill" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 NominalDiffTime))) | |||||
Arguments
| :: Text | Schedule name (used in dedup keys and logging) |
| -> Text | Cron expression (5-field: minute hour day-of-month month day-of-week) |
| -> OverlapPolicy | |
| -> (TickKind -> UTCTime -> JobWrite payload) | Job builder. Receives the tick kind and the tick time. |
| -> Either String (CronJob payload) |
Smart constructor for CronJob. Parses the cron expression eagerly.
Returns Left with an error message if the cron expression is invalid.
cronJob "nightly-report" "0 3 * * *" SkipOverlap (\_kind _tick -> defaultJob (GenerateReport "nightly"))
The builder receives a TickKind (Live for the current minute, Replay
for any catch-up tick) and the tick time.
To enable backfill, set it via record update:
let Right cj = cronJob "nightly-report" "0 3 * * *" AllowOverlap
(\kind tick -> (defaultJob (GenerateReport tick))
{ priority = case kind of Replay -> 10; Live -> 0 })
in cj { backfill = Backfill 86400 } -- replay up to 24 hours
Note: cron expressions are evaluated in UTC. "0 3 * * *" fires at
03:00 UTC regardless of the server's local timezone.
initCronSchedules :: MonadArbiter m => Text -> [CronJob payload] -> LogConfig -> m () Source #
Upsert default expression and overlap for each CronJob into the
cron_schedules table. Preserves any user overrides and enabled state.
overlapPolicyToText :: OverlapPolicy -> Text Source #
Convert an OverlapPolicy to its text representation.
overlapPolicyFromText :: Text -> Maybe OverlapPolicy Source #
Parse an OverlapPolicy from text.