arbiter-worker-0.1.0.0: Worker framework for arbiter
Safe HaskellNone
LanguageGHC2024

Arbiter.Worker

Description

Entry point for running a worker pool that fetches and executes jobs.

Synopsis

Running Workers

runWorkerPool :: forall m (registry :: JobPayloadRegistry) payload result. (JobResult result, MonadMask m, MonadUnliftIO m, QueueOperation m registry payload) => WorkerConfig m payload result -> m () Source #

Starts a worker pool with a dispatcher and N worker threads.

Multi-Queue Workers

data NamedWorkerPool (m :: Type -> Type) Source #

A worker pool bundled with its queue name from the type-level registry.

The queue name is derived from the registry at compile time, ensuring it stays in sync with the type-level definition.

Use namedWorkerPool to construct these, then runSelectedWorkerPools to run only the ones matching a runtime configuration.

Example:

allWorkers :: [NamedWorkerPool (SimpleDb MyRegistry IO)]
allWorkers =
  [ namedWorkerPool emailConfig      -- name derived from registry: "email_jobs"
  , namedWorkerPool imageConfig      -- name derived from registry: "image_jobs"
  , namedWorkerPool notifConfig      -- name derived from registry: "notifications"
  ]

main = runWorkerPools (Proxy @MyRegistry) allWorkers (\_ -> pure ())

Constructors

(JobResult result, QueueOperation m registry payload) => NamedWorkerPool 

Fields

namedWorkerPool :: forall (m :: Type -> Type) (registry :: JobPayloadRegistry) payload result. (JobResult result, QueueOperation m registry payload) => WorkerConfig m payload result -> NamedWorkerPool m Source #

Create a named worker pool, deriving the name from the type-level registry.

runWorkerPools :: forall m (registry :: JobPayloadRegistry). (MonadMask m, MonadUnliftIO m, RegistryTables registry) => Proxy registry -> [NamedWorkerPool m] -> (TVar WorkerState -> IO ()) -> m () Source #

Run worker pools with shared state for coordinated shutdown.

Creates the shared worker state, passes it to the setup action (for signal handlers), then runs the pools. The enabled queues are read from the ARBITER_ENABLED_QUEUES environment variable.

runSelectedWorkerPools :: (MonadMask m, MonadUnliftIO m) => TVar WorkerState -> [Text] -> [NamedWorkerPool m] -> m () Source #

Run only the worker pools whose names appear in the enabled list.

getEnabledQueues Source #

Arguments

:: forall (registry :: JobPayloadRegistry). RegistryTables registry 
=> String

Environment variable name

-> Proxy registry

Registry proxy

-> IO [Text] 

Get enabled queues from an environment variable.

If the environment variable is set and non-empty, it is parsed as a comma-separated list of queue names. Each name is validated against the registry; invalid names cause an error. If the variable is unset or empty, all queue names from the registry are returned.

Example:

-- With ENABLED_QUEUES="email_jobs,notifications"
queues <- getEnabledQueues "ENABLED_QUEUES" (Proxy @MyRegistry)
-- Returns: ["email_jobs", "notifications"]

-- With ENABLED_QUEUES unset or empty
queues <- getEnabledQueues "ENABLED_QUEUES" (Proxy @MyRegistry)
-- Returns: all queues from registry

-- With ENABLED_QUEUES="email_jobs,invalid_queue"
-- Throws error: "Unknown queue names: invalid_queue"

Job Result

class JobResult a where Source #

Handler result types. () is fire-and-forget; a result of any type with ToJSON and FromJSON instances is stored in the results table when the job has a parent, and is decoded when read by a rollup finalizer.

Methods

encodeJobResult :: a -> Maybe Value Source #

decodeJobResult :: Value -> Either Text a Source #

Instances

Instances details
JobResult () Source # 
Instance details

Defined in Arbiter.Worker

Methods

encodeJobResult :: () -> Maybe Value Source #

decodeJobResult :: Value -> Either Text () Source #

(FromJSON a, ToJSON a) => JobResult a Source # 
Instance details

Defined in Arbiter.Worker

Methods

encodeJobResult :: a -> Maybe Value Source #

decodeJobResult :: Value -> Either Text a Source #

Re-exports

Cron

data CronJob payload Source #

A cron schedule definition.

Use cronJob to construct one — it parses the cron expression eagerly, so invalid expressions are caught at construction time.

Constructors

CronJob 

Fields

Instances

Instances details
Generic (CronJob payload) Source # 
Instance details

Defined in Arbiter.Worker.Cron

Associated Types

type Rep (CronJob payload) 
Instance details

Defined in Arbiter.Worker.Cron

type Rep (CronJob payload) = D1 ('MetaData "CronJob" "Arbiter.Worker.Cron" "arbiter-worker-0.1.0.0-inplace" 'False) (C1 ('MetaCons "CronJob" 'PrefixI 'True) ((S1 ('MetaSel ('Just "name") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text) :*: S1 ('MetaSel ('Just "cronExpression") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)) :*: (S1 ('MetaSel ('Just "schedule") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 CronSchedule) :*: (S1 ('MetaSel ('Just "overlap") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 OverlapPolicy) :*: S1 ('MetaSel ('Just "builder") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (UTCTime -> JobWrite payload))))))

Methods

from :: CronJob payload -> Rep (CronJob payload) x #

to :: Rep (CronJob payload) x -> CronJob payload #

type Rep (CronJob payload) Source # 
Instance details

Defined in Arbiter.Worker.Cron

type Rep (CronJob payload) = D1 ('MetaData "CronJob" "Arbiter.Worker.Cron" "arbiter-worker-0.1.0.0-inplace" 'False) (C1 ('MetaCons "CronJob" 'PrefixI 'True) ((S1 ('MetaSel ('Just "name") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text) :*: S1 ('MetaSel ('Just "cronExpression") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)) :*: (S1 ('MetaSel ('Just "schedule") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 CronSchedule) :*: (S1 ('MetaSel ('Just "overlap") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 OverlapPolicy) :*: S1 ('MetaSel ('Just "builder") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (UTCTime -> JobWrite payload))))))

data OverlapPolicy Source #

Determines how overlapping cron ticks are deduplicated.

Constructors

SkipOverlap

At most one pending or running job per schedule name.

AllowOverlap

One job per tick. Allows concurrent execution of prior ticks.

Instances

Instances details
Generic OverlapPolicy Source # 
Instance details

Defined in Arbiter.Worker.Cron

Associated Types

type Rep OverlapPolicy 
Instance details

Defined in Arbiter.Worker.Cron

type Rep OverlapPolicy = D1 ('MetaData "OverlapPolicy" "Arbiter.Worker.Cron" "arbiter-worker-0.1.0.0-inplace" 'False) (C1 ('MetaCons "SkipOverlap" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "AllowOverlap" 'PrefixI 'False) (U1 :: Type -> Type))
Show OverlapPolicy Source # 
Instance details

Defined in Arbiter.Worker.Cron

Eq OverlapPolicy Source # 
Instance details

Defined in Arbiter.Worker.Cron

type Rep OverlapPolicy Source # 
Instance details

Defined in Arbiter.Worker.Cron

type Rep OverlapPolicy = D1 ('MetaData "OverlapPolicy" "Arbiter.Worker.Cron" "arbiter-worker-0.1.0.0-inplace" 'False) (C1 ('MetaCons "SkipOverlap" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "AllowOverlap" 'PrefixI 'False) (U1 :: Type -> Type))

cronJob Source #

Arguments

:: Text

Schedule name (used in dedup keys and logging)

-> Text

Cron expression (5-field: minute hour day-of-month month day-of-week)

-> OverlapPolicy 
-> (UTCTime -> JobWrite payload) 
-> Either String (CronJob payload) 

Smart constructor for CronJob. Parses the cron expression eagerly.

Returns Left with an error message if the cron expression is invalid.

Note: cron expressions are evaluated in UTC. "0 3 * * *" fires at 03:00 UTC regardless of the server's local timezone.

Example:

cronJob "nightly-report" "0 3 * * *" SkipOverlap
  (\_ -> defaultJob (GenerateReport "nightly"))

initCronSchedules :: Connection -> Text -> [CronJob payload] -> LogConfig -> IO () Source #

Initialize the cron_schedules table and upsert defaults for all cron jobs.

Called once at scheduler startup. Upserts default_expression and default_overlap for each CronJob, preserving any user overrides and enabled state.

overlapPolicyToText :: OverlapPolicy -> Text Source #

Convert an OverlapPolicy to its text representation.