arbiter-worker-0.1.0.0: Worker framework for arbiter
Safe HaskellNone
LanguageGHC2024

Arbiter.Worker

Description

Entry point for running a worker pool that fetches and executes jobs.

Synopsis

Running Workers

runWorkerPool :: forall m (registry :: JobPayloadRegistry) payload result. (JobResult result, MonadMask m, MonadUnliftIO m, QueueOperation m registry payload) => WorkerConfig m payload result -> m () Source #

Starts a worker pool with a dispatcher and N worker threads.

Multi-Queue Workers

data NamedWorkerPool (m :: Type -> Type) Source #

A worker pool paired with its queue name (derived from the registry).

allWorkers =
  [ namedWorkerPool emailConfig      -- "email_jobs"
  , namedWorkerPool imageConfig      -- "image_jobs"
  ]

main = runWorkerPools (Proxy @MyRegistry) allWorkers (\_ -> pure ())

Constructors

(JobResult result, QueueOperation m registry payload) => NamedWorkerPool 

Fields

namedWorkerPool :: forall (m :: Type -> Type) (registry :: JobPayloadRegistry) payload result. (JobResult result, QueueOperation m registry payload) => WorkerConfig m payload result -> NamedWorkerPool m Source #

Create a named worker pool, deriving the name from the type-level registry.

runWorkerPools :: forall m (registry :: JobPayloadRegistry). (MonadMask m, MonadUnliftIO m, RegistryTables registry) => Proxy registry -> [NamedWorkerPool m] -> (TVar WorkerState -> IO ()) -> m () Source #

Run worker pools with shared shutdown state. Filters to queues listed in ARBITER_ENABLED_QUEUES (all if unset). The setup action receives the shared TVar for installing signal handlers.

runSelectedWorkerPools :: (MonadMask m, MonadUnliftIO m) => TVar WorkerState -> [Text] -> [NamedWorkerPool m] -> m () Source #

Run only the worker pools whose names appear in the enabled list.

getEnabledQueues Source #

Arguments

:: forall (registry :: JobPayloadRegistry). RegistryTables registry 
=> String

Environment variable name

-> Proxy registry

Registry proxy

-> IO [Text] 

Get enabled queues from an environment variable.

If the environment variable is set and non-empty, parses it as a comma-separated list of queue names. Each name is validated against the registry - invalid names cause an error. If not set or empty, returns all queue names from the registry.

Example:

-- With ENABLED_QUEUES="email_jobs,notifications"
queues <- getEnabledQueues ENABLED_QUEUES (Proxy @MyRegistry)
-- Returns: ["email_jobs", "notifications"]

-- With ENABLED_QUEUES unset or empty
queues <- getEnabledQueues ENABLED_QUEUES (Proxy @MyRegistry)
-- Returns: all queues from registry

-- With ENABLED_QUEUES="email_jobs,invalid_queue"
-- Throws error: "Unknown queue names: invalid_queue"

Job Result

class JobResult a where Source #

Handler result types. () is fire-and-forget; any (ToJSON a, FromJSON a) is stored in the results table when the job has a parent and decoded when read by a rollup finalizer.

Methods

encodeJobResult :: a -> Maybe Value Source #

decodeJobResult :: Value -> Either Text a Source #

Instances

Instances details
JobResult () Source # 
Instance details

Defined in Arbiter.Worker

Methods

encodeJobResult :: () -> Maybe Value Source #

decodeJobResult :: Value -> Either Text () Source #

(FromJSON a, ToJSON a) => JobResult a Source # 
Instance details

Defined in Arbiter.Worker

Methods

encodeJobResult :: a -> Maybe Value Source #

decodeJobResult :: Value -> Either Text a Source #

Re-exports

Cron

data CronJob payload Source #

A cron schedule definition.

Use cronJob to construct (eagerly parses the cron expression). For a non-UTC timezone, use cronJobInTimezone (also validates the tz name).

Constructors

CronJob 

Fields

Instances

Instances details
Generic (CronJob payload) Source # 
Instance details

Defined in Arbiter.Worker.Cron

Associated Types

type Rep (CronJob payload) 
Instance details

Defined in Arbiter.Worker.Cron

Methods

from :: CronJob payload -> Rep (CronJob payload) x #

to :: Rep (CronJob payload) x -> CronJob payload #

type Rep (CronJob payload) Source # 
Instance details

Defined in Arbiter.Worker.Cron

data OverlapPolicy Source #

How overlapping cron ticks are deduplicated.

Constructors

SkipOverlap

At most one pending or running job per schedule.

AllowOverlap

One job per tick. Concurrent execution of prior ticks is allowed.

Instances

Instances details
Generic OverlapPolicy Source # 
Instance details

Defined in Arbiter.Worker.Cron

Associated Types

type Rep OverlapPolicy 
Instance details

Defined in Arbiter.Worker.Cron

type Rep OverlapPolicy = D1 ('MetaData "OverlapPolicy" "Arbiter.Worker.Cron" "arbiter-worker-0.1.0.0-inplace" 'False) (C1 ('MetaCons "SkipOverlap" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "AllowOverlap" 'PrefixI 'False) (U1 :: Type -> Type))
Show OverlapPolicy Source # 
Instance details

Defined in Arbiter.Worker.Cron

Eq OverlapPolicy Source # 
Instance details

Defined in Arbiter.Worker.Cron

type Rep OverlapPolicy Source # 
Instance details

Defined in Arbiter.Worker.Cron

type Rep OverlapPolicy = D1 ('MetaData "OverlapPolicy" "Arbiter.Worker.Cron" "arbiter-worker-0.1.0.0-inplace" 'False) (C1 ('MetaCons "SkipOverlap" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "AllowOverlap" 'PrefixI 'False) (U1 :: Type -> Type))

data BackfillPolicy Source #

Bound on replay of missed ticks. Applies at startup and mid-flight.

Constructors

NoBackfill

Drop missed minutes silently. Default.

Backfill NominalDiffTime

Replay missed minutes up to the given duration.

Instances

Instances details
Generic BackfillPolicy Source # 
Instance details

Defined in Arbiter.Worker.Cron

Associated Types

type Rep BackfillPolicy 
Instance details

Defined in Arbiter.Worker.Cron

type Rep BackfillPolicy = D1 ('MetaData "BackfillPolicy" "Arbiter.Worker.Cron" "arbiter-worker-0.1.0.0-inplace" 'False) (C1 ('MetaCons "NoBackfill" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "Backfill" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 NominalDiffTime)))
Show BackfillPolicy Source # 
Instance details

Defined in Arbiter.Worker.Cron

Eq BackfillPolicy Source # 
Instance details

Defined in Arbiter.Worker.Cron

type Rep BackfillPolicy Source # 
Instance details

Defined in Arbiter.Worker.Cron

type Rep BackfillPolicy = D1 ('MetaData "BackfillPolicy" "Arbiter.Worker.Cron" "arbiter-worker-0.1.0.0-inplace" 'False) (C1 ('MetaCons "NoBackfill" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "Backfill" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 NominalDiffTime)))

cronJob Source #

Arguments

:: Text

Schedule name (used in dedup keys and logging)

-> Text

Cron expression (5-field: minute hour day-of-month month day-of-week)

-> OverlapPolicy 
-> (TickKind -> UTCTime -> JobWrite payload)

Job builder. Receives the tick kind and the tick time.

-> Either String (CronJob payload) 

Smart constructor for CronJob. Parses the cron expression eagerly.

Returns Left with an error message if the cron expression is invalid.

cronJob "nightly-report" "0 3 * * *" SkipOverlap
  (\_kind _tick -> defaultJob (GenerateReport "nightly"))

The builder receives a TickKind (Live for the current minute, Replay for any catch-up tick) and the tick time.

To enable backfill, set it via record update:

let Right cj = cronJob "nightly-report" "0 3 * * *" AllowOverlap
      (\kind tick -> (defaultJob (GenerateReport tick))
         { priority = case kind of Replay -> 10; Live -> 0 })
in cj { backfill = Backfill 86400 }  -- replay up to 24 hours

Note: cron expressions are evaluated in UTC. "0 3 * * *" fires at 03:00 UTC regardless of the server's local timezone.

initCronSchedules :: MonadArbiter m => Text -> [CronJob payload] -> LogConfig -> m () Source #

Upsert default expression and overlap for each CronJob into the cron_schedules table. Preserves any user overrides and enabled state.

overlapPolicyToText :: OverlapPolicy -> Text Source #

Convert an OverlapPolicy to its text representation.