| Safe Haskell | None |
|---|---|
| Language | GHC2024 |
Arbiter.Worker
Description
Entry point for running a worker pool that fetches and executes jobs.
Synopsis
- runWorkerPool :: forall m (registry :: JobPayloadRegistry) payload result. (JobResult result, MonadMask m, MonadUnliftIO m, QueueOperation m registry payload) => WorkerConfig m payload result -> m ()
- data NamedWorkerPool (m :: Type -> Type) = (JobResult result, QueueOperation m registry payload) => NamedWorkerPool {
- workerPoolName :: Text
- workerPoolConfig :: WorkerConfig m payload result
- namedWorkerPool :: forall (m :: Type -> Type) (registry :: JobPayloadRegistry) payload result. (JobResult result, QueueOperation m registry payload) => WorkerConfig m payload result -> NamedWorkerPool m
- runWorkerPools :: forall m (registry :: JobPayloadRegistry). (MonadMask m, MonadUnliftIO m, RegistryTables registry) => Proxy registry -> [NamedWorkerPool m] -> (TVar WorkerState -> IO ()) -> m ()
- runSelectedWorkerPools :: (MonadMask m, MonadUnliftIO m) => TVar WorkerState -> [Text] -> [NamedWorkerPool m] -> m ()
- getEnabledQueues :: forall (registry :: JobPayloadRegistry). RegistryTables registry => String -> Proxy registry -> IO [Text]
- class JobResult a where
- encodeJobResult :: a -> Maybe Value
- decodeJobResult :: Value -> Either Text a
- module Arbiter.Worker.Config
- module Arbiter.Worker.BackoffStrategy
- module Arbiter.Worker.Logger
- module Arbiter.Worker.WorkerState
- data CronJob payload = CronJob {
- name :: Text
- cronExpression :: Text
- schedule :: CronSchedule
- overlap :: OverlapPolicy
- builder :: UTCTime -> JobWrite payload
- data OverlapPolicy
- cronJob :: Text -> Text -> OverlapPolicy -> (UTCTime -> JobWrite payload) -> Either String (CronJob payload)
- initCronSchedules :: Connection -> Text -> [CronJob payload] -> LogConfig -> IO ()
- overlapPolicyToText :: OverlapPolicy -> Text
- overlapPolicyFromText :: Text -> Maybe OverlapPolicy
Running Workers
runWorkerPool :: forall m (registry :: JobPayloadRegistry) payload result. (JobResult result, MonadMask m, MonadUnliftIO m, QueueOperation m registry payload) => WorkerConfig m payload result -> m () Source #
Starts a worker pool with a dispatcher and N worker threads.
Multi-Queue Workers
data NamedWorkerPool (m :: Type -> Type) Source #
A worker pool bundled with its queue name from the type-level registry.
The queue name is derived from the registry at compile time, ensuring it stays in sync with the type-level definition.
Use namedWorkerPool to construct these, then runSelectedWorkerPools
to run only the ones matching a runtime configuration.
Example:
allWorkers :: [NamedWorkerPool (SimpleDb MyRegistry IO)] allWorkers = [ namedWorkerPool emailConfig -- name derived from registry: "email_jobs" , namedWorkerPool imageConfig -- name derived from registry: "image_jobs" , namedWorkerPool notifConfig -- name derived from registry: "notifications" ] main = runWorkerPools (Proxy @MyRegistry) allWorkers (\_ -> pure ())
Constructors
| (JobResult result, QueueOperation m registry payload) => NamedWorkerPool | |
Fields
| |
namedWorkerPool :: forall (m :: Type -> Type) (registry :: JobPayloadRegistry) payload result. (JobResult result, QueueOperation m registry payload) => WorkerConfig m payload result -> NamedWorkerPool m Source #
Create a named worker pool, deriving the name from the type-level registry.
runWorkerPools :: forall m (registry :: JobPayloadRegistry). (MonadMask m, MonadUnliftIO m, RegistryTables registry) => Proxy registry -> [NamedWorkerPool m] -> (TVar WorkerState -> IO ()) -> m () Source #
Run worker pools with shared state for coordinated shutdown.
Creates shared state, passes it to setup action (for signal handlers), then runs pools. Reads enabled queues from ARBITER_ENABLED_QUEUES.
runSelectedWorkerPools :: (MonadMask m, MonadUnliftIO m) => TVar WorkerState -> [Text] -> [NamedWorkerPool m] -> m () Source #
Run only the worker pools whose names appear in the enabled list.
Arguments
| :: forall (registry :: JobPayloadRegistry). RegistryTables registry | |
| => String | Environment variable name |
| -> Proxy registry | Registry proxy |
| -> IO [Text] |
Get enabled queues from an environment variable.
If the environment variable is set and non-empty, parses it as a comma-separated list of queue names. Each name is validated against the registry - invalid names cause an error. If not set or empty, returns all queue names from the registry.
Example:
-- With ENABLED_QUEUES="email_jobs,notifications" queues <- getEnabledQueues ENABLED_QUEUES (Proxy @MyRegistry) -- Returns: ["email_jobs", "notifications"] -- With ENABLED_QUEUES unset or empty queues <- getEnabledQueues ENABLED_QUEUES (Proxy @MyRegistry) -- Returns: all queues from registry -- With ENABLED_QUEUES="email_jobs,invalid_queue" -- Throws error: "Unknown queue names: invalid_queue"
Job Result
class JobResult a where Source #
Handler result types. () is fire-and-forget; any (ToJSON a, FromJSON a)
is stored in the results table when the job has a parent and decoded when
read by a rollup finalizer.
Methods
encodeJobResult :: a -> Maybe Value Source #
decodeJobResult :: Value -> Either Text a Source #
Instances
| JobResult () Source # | |
Defined in Arbiter.Worker Methods encodeJobResult :: () -> Maybe Value Source # decodeJobResult :: Value -> Either Text () Source # | |
| (FromJSON a, ToJSON a) => JobResult a Source # | |
Defined in Arbiter.Worker Methods encodeJobResult :: a -> Maybe Value Source # decodeJobResult :: Value -> Either Text a Source # | |
Re-exports
module Arbiter.Worker.Config
module Arbiter.Worker.Logger
module Arbiter.Worker.WorkerState
Cron
A cron schedule definition.
Use cronJob to construct — it parses the cron expression eagerly,
so invalid expressions are caught at construction time.
Constructors
| CronJob | |
Fields
| |
Instances
data OverlapPolicy Source #
Determines how overlapping cron ticks are deduplicated.
Constructors
| SkipOverlap | At most one pending or running job per schedule name. |
| AllowOverlap | One job per tick. Allows concurrent execution of prior ticks. |
Instances
| Generic OverlapPolicy Source # | |||||
Defined in Arbiter.Worker.Cron Associated Types
| |||||
| Show OverlapPolicy Source # | |||||
Defined in Arbiter.Worker.Cron Methods showsPrec :: Int -> OverlapPolicy -> ShowS # show :: OverlapPolicy -> String # showList :: [OverlapPolicy] -> ShowS # | |||||
| Eq OverlapPolicy Source # | |||||
Defined in Arbiter.Worker.Cron Methods (==) :: OverlapPolicy -> OverlapPolicy -> Bool # (/=) :: OverlapPolicy -> OverlapPolicy -> Bool # | |||||
| type Rep OverlapPolicy Source # | |||||
Arguments
| :: Text | Schedule name (used in dedup keys and logging) |
| -> Text | Cron expression (5-field: minute hour day-of-month month day-of-week) |
| -> OverlapPolicy | |
| -> (UTCTime -> JobWrite payload) | |
| -> Either String (CronJob payload) |
Smart constructor for CronJob. Parses the cron expression eagerly.
Returns Left with an error message if the cron expression is invalid.
Note: cron expressions are evaluated in UTC. "0 3 * * *" fires at
03:00 UTC regardless of the server's local timezone.
Example:
cronJob "nightly-report" "0 3 * * *" SkipOverlap (\_ -> defaultJob (GenerateReport "nightly"))
initCronSchedules :: Connection -> Text -> [CronJob payload] -> LogConfig -> IO () Source #
Initialize the cron_schedules table and upsert defaults for all cron jobs.
Called once at scheduler startup. Upserts default_expression and
default_overlap for each CronJob, preserving any user overrides and
enabled state.
overlapPolicyToText :: OverlapPolicy -> Text Source #
Convert an OverlapPolicy to its text representation.
overlapPolicyFromText :: Text -> Maybe OverlapPolicy Source #
Parse an OverlapPolicy from text.