arbiter-worker-0.1.0.0: Worker framework for arbiter
Safe Haskell: None
Language: GHC2024

Arbiter.Worker.Cron

Description

Cron scheduler for the Arbiter worker pool.

When cronJobs is non-empty in WorkerConfig, runWorkerPool spawns a scheduler thread that inserts jobs according to 5-field cron expressions. Dedup keys prevent the same tick from being inserted more than once when multiple worker instances run the scheduler.

Expressions default to UTC. Pass an IANA tz name (e.g. "America/New_York") to cronJobInTimezone to evaluate in local time instead.

The scheduler consults the cron_schedules table each tick for runtime overrides (expression, overlap, timezone, enabled).

Types

data CronJob payload Source #

A cron schedule definition.

Use cronJob to construct (eagerly parses the cron expression). For a non-UTC timezone, use cronJobInTimezone (also validates the tz name).

Constructors

CronJob 

Fields

Instances

Generic (CronJob payload) Source # 
Instance details

Defined in Arbiter.Worker.Cron

Associated Types

type Rep (CronJob payload) 
Instance details

Defined in Arbiter.Worker.Cron

Methods

from :: CronJob payload -> Rep (CronJob payload) x #

to :: Rep (CronJob payload) x -> CronJob payload #

data OverlapPolicy Source #

How overlapping cron ticks are deduplicated.

Constructors

SkipOverlap

At most one pending or running job per schedule.

AllowOverlap

One job per tick. Concurrent execution of prior ticks is allowed.

Instances

Generic OverlapPolicy Source # 
Instance details

Defined in Arbiter.Worker.Cron

Associated Types

type Rep OverlapPolicy 
Instance details

Defined in Arbiter.Worker.Cron

type Rep OverlapPolicy = D1 ('MetaData "OverlapPolicy" "Arbiter.Worker.Cron" "arbiter-worker-0.1.0.0-inplace" 'False) (C1 ('MetaCons "SkipOverlap" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "AllowOverlap" 'PrefixI 'False) (U1 :: Type -> Type))
Show OverlapPolicy Source # 
Instance details

Defined in Arbiter.Worker.Cron

Eq OverlapPolicy Source # 
Instance details

Defined in Arbiter.Worker.Cron

data BackfillPolicy Source #

Bound on replay of missed ticks. Applies at startup and mid-flight.

Constructors

NoBackfill

Drop missed minutes silently. Default.

Backfill NominalDiffTime

Replay missed minutes up to the given duration.

Instances

Generic BackfillPolicy Source # 
Instance details

Defined in Arbiter.Worker.Cron

Associated Types

type Rep BackfillPolicy 
Instance details

Defined in Arbiter.Worker.Cron

type Rep BackfillPolicy = D1 ('MetaData "BackfillPolicy" "Arbiter.Worker.Cron" "arbiter-worker-0.1.0.0-inplace" 'False) (C1 ('MetaCons "NoBackfill" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "Backfill" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 NominalDiffTime)))
Show BackfillPolicy Source # 
Instance details

Defined in Arbiter.Worker.Cron

Eq BackfillPolicy Source # 
Instance details

Defined in Arbiter.Worker.Cron

data TickKind Source #

Whether a tick is for the current minute or a replay of a past minute.

Constructors

Live 
Replay 

Instances

Generic TickKind Source # 
Instance details

Defined in Arbiter.Worker.Cron

Associated Types

type Rep TickKind 
Instance details

Defined in Arbiter.Worker.Cron

type Rep TickKind = D1 ('MetaData "TickKind" "Arbiter.Worker.Cron" "arbiter-worker-0.1.0.0-inplace" 'False) (C1 ('MetaCons "Live" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "Replay" 'PrefixI 'False) (U1 :: Type -> Type))

Methods

from :: TickKind -> Rep TickKind x #

to :: Rep TickKind x -> TickKind #

Show TickKind Source # 
Instance details

Defined in Arbiter.Worker.Cron

Eq TickKind Source # 
Instance details

Defined in Arbiter.Worker.Cron

Smart Constructors

cronJob Source #

Arguments

:: Text

Schedule name (used in dedup keys and logging)

-> Text

Cron expression (5-field: minute hour day-of-month month day-of-week)

-> OverlapPolicy 
-> (TickKind -> UTCTime -> JobWrite payload)

Job builder. Receives the tick kind and the tick time.

-> Either String (CronJob payload) 

Smart constructor for CronJob. Parses the cron expression eagerly.

Returns Left with an error message if the cron expression is invalid.

cronJob "nightly-report" "0 3 * * *" SkipOverlap
  (\_kind _tick -> defaultJob (GenerateReport "nightly"))

The builder receives a TickKind (Live for the current minute, Replay for any catch-up tick) and the tick time.

To enable backfill, set it via record update:

let Right cj = cronJob "nightly-report" "0 3 * * *" AllowOverlap
      (\kind tick -> (defaultJob (GenerateReport tick))
         { priority = case kind of Replay -> 10; Live -> 0 })
in cj { backfill = Backfill 86400 }  -- replay up to 24 hours

Note: cron expressions are evaluated in UTC. "0 3 * * *" fires at 03:00 UTC regardless of the server's local timezone.
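
To illustrate the 5-field layout (minute hour day-of-month month day-of-week) and the Left-on-invalid convention, here is a minimal standalone sketch. It only splits an expression into its five fields and does not validate their contents. CronFields and splitCronFields are hypothetical names for this illustration and are not part of this module.

```haskell
-- Hypothetical record naming the five positions of a cron expression.
data CronFields = CronFields
  { minuteField     :: String
  , hourField       :: String
  , dayOfMonthField :: String
  , monthField      :: String
  , dayOfWeekField  :: String
  }
  deriving (Eq, Show)

-- Split on whitespace and require exactly five fields.
-- Field contents (ranges, steps, names) are NOT checked here.
splitCronFields :: String -> Either String CronFields
splitCronFields expr =
  case words expr of
    [mi, h, dom, mo, dow] -> Right (CronFields mi h dom mo dow)
    fields -> Left ("expected 5 fields, got " ++ show (length fields))

main :: IO ()
main = do
  print (splitCronFields "0 3 * * *") -- five fields: accepted
  print (splitCronFields "0 3 * *")   -- four fields: rejected
```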

cronJobInTimezone Source #

Arguments

:: Text

Schedule name

-> Text

IANA tz name (e.g. "America/New_York")

-> Text

Cron expression (5-field)

-> OverlapPolicy 
-> (TickKind -> UTCTime -> JobWrite payload) 
-> Either String (CronJob payload) 

Like cronJob but evaluated in a specific timezone. The tz name is validated eagerly against the bundled tzdata database.

Helpers

overlapPolicyToText :: OverlapPolicy -> Text Source #

Convert an OverlapPolicy to its text representation.

resolveTZ :: Text -> Maybe TZ Source #

Look up an IANA tz name in the bundled tzdata database.

matchesInTimezone :: Maybe Text -> CronSchedule -> UTCTime -> Bool Source #

Match a cron schedule against a UTC tick, evaluated in the given timezone. Nothing means UTC. An unknown tz name returns False.

formatMinuteInTimezone :: Maybe Text -> UTCTime -> Text Source #

Format a UTC tick as YYYY-MM-DDTHH:MM in the given timezone. DST fall-back maps two UTC instants to the same local minute, so dedup keys built from this collapse to a single fire.
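
The fall-back collapse can be reproduced with the time package alone. The sketch below uses fixed UTC offsets for US Eastern time on 2024-11-03 instead of a tzdata lookup, so it is only an illustration of the effect, not of how this module resolves timezones. formatLocalMinute is a hypothetical helper.

```haskell
import Data.Time.Calendar (fromGregorian)
import Data.Time.Clock (UTCTime (..))
import Data.Time.Format (defaultTimeLocale, formatTime)
import Data.Time.LocalTime (TimeZone, hoursToTimeZone, utcToLocalTime)

-- Format a UTC instant as a local minute under a fixed offset.
formatLocalMinute :: TimeZone -> UTCTime -> String
formatLocalMinute tz =
  formatTime defaultTimeLocale "%Y-%m-%dT%H:%M" . utcToLocalTime tz

-- Fixed offsets standing in for America/New_York around the transition.
edt, est :: TimeZone
edt = hoursToTimeZone (-4) -- daylight time, in effect before 06:00 UTC
est = hoursToTimeZone (-5) -- standard time, in effect from 06:00 UTC

-- Two distinct UTC instants, one hour apart, on the fall-back day.
beforeFallBack, afterFallBack :: UTCTime
beforeFallBack = UTCTime (fromGregorian 2024 11 3) (5 * 3600 + 30 * 60)
afterFallBack  = UTCTime (fromGregorian 2024 11 3) (6 * 3600 + 30 * 60)

main :: IO ()
main = do
  -- Both lines show the same local minute, so a key built from the
  -- formatted local minute would be identical for the two instants.
  putStrLn (formatLocalMinute edt beforeFallBack)
  putStrLn (formatLocalMinute est afterFallBack)
```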

Internal

runCronScheduler :: forall m (registry :: JobPayloadRegistry) payload. (MonadUnliftIO m, QueueOperation m registry payload) => TVar WorkerState -> LogConfig -> Text -> [CronJob payload] -> m () Source #

Scheduler entry point. Exits cleanly when the worker state becomes ShuttingDown so graceful shutdown stops creating new jobs.

initCronSchedules :: MonadArbiter m => Text -> [CronJob payload] -> LogConfig -> m () Source #

Upsert default expression and overlap for each CronJob into the cron_schedules table. Preserves any user overrides and enabled state.

processCronCatchUp Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. (MonadUnliftIO m, QueueOperation m registry payload) 
=> LogConfig 
-> Text 
-> [CronJob payload] 
-> UTCTime

Current wall-clock time

-> m () 

Scheduler catch-up step. Each schedule is processed in its own transaction. Backfill schedules hold a per-(schema, name) advisory lock so concurrent pools can't interleave gate updates across a backfill window.

enumerateCatchUpTicks :: BackfillPolicy -> Maybe UTCTime -> UTCTime -> [UTCTime] Source #

Minutes to evaluate for a processCronCatchUp call. Returns [] when the watermark is already at or past currentTick (preventing re-fires after job processing).
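
One plausible reading of this behaviour is sketched below. It is an assumption-laden illustration, not the module's implementation: the treatment of a missing watermark (only the current tick) and the way the backfill window bounds the replay are guesses, and BackfillPolicy is redeclared locally so the example stands alone.

```haskell
import Data.Time.Calendar (fromGregorian)
import Data.Time.Clock (NominalDiffTime, UTCTime (..), addUTCTime)

-- Local mirror of the documented type, so the sketch is self-contained.
data BackfillPolicy = NoBackfill | Backfill NominalDiffTime

-- Assumed semantics:
--   * watermark at or past the current tick  -> nothing to do
--   * no watermark, or NoBackfill            -> only the current tick
--   * Backfill w                             -> every minute after the
--     watermark, but no further back than w before the current tick
catchUpTicksSketch :: BackfillPolicy -> Maybe UTCTime -> UTCTime -> [UTCTime]
catchUpTicksSketch policy watermark currentTick =
  case watermark of
    Just w | w >= currentTick -> []
    Nothing -> [currentTick]
    Just w ->
      case policy of
        NoBackfill -> [currentTick]
        Backfill window ->
          let earliest = addUTCTime (negate window) currentTick
              start = max (addUTCTime 60 w) earliest
           in takeWhile (<= currentTick) (iterate (addUTCTime 60) start)

-- Example tick at 12:00 UTC and a watermark ten minutes earlier.
tickNoon, tenMinutesAgo :: UTCTime
tickNoon = UTCTime (fromGregorian 2024 1 1) (12 * 3600)
tenMinutesAgo = addUTCTime (-600) tickNoon

main :: IO ()
main = do
  print (catchUpTicksSketch NoBackfill (Just tickNoon) tickNoon)
  print (catchUpTicksSketch NoBackfill (Just tenMinutesAgo) tickNoon)
  print (catchUpTicksSketch (Backfill 180) (Just tenMinutesAgo) tickNoon)
```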

truncateToMinute :: UTCTime -> UTCTime Source #

Truncate a UTCTime to the start of the minute it falls in (zero out seconds).
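
A minimal sketch of minute truncation using only the time package follows. truncateToMinuteSketch is a hypothetical stand-in and may differ from the module's own definition.

```haskell
import Data.Time.Calendar (fromGregorian)
import Data.Time.Clock (UTCTime (..), secondsToDiffTime)

-- Drop the seconds and sub-second part of a UTCTime.
truncateToMinuteSketch :: UTCTime -> UTCTime
truncateToMinuteSketch (UTCTime day dayTime) =
  UTCTime day (secondsToDiffTime (wholeMinutes * 60))
  where
    -- Seconds since midnight, floored to a whole number of minutes.
    wholeMinutes :: Integer
    wholeMinutes = floor (dayTime / 60)

main :: IO ()
main = do
  -- 3723.5 seconds after midnight is 01:02:03.5
  let t = UTCTime (fromGregorian 2024 1 1) 3723.5
  print (truncateToMinuteSketch t)
```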

formatMinute :: UTCTime -> Text Source #

Format a UTCTime as YYYY-MM-DDTHH:MM for dedup key buckets.
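
The YYYY-MM-DDTHH:MM shape corresponds to a standard formatTime pattern. The sketch below returns a String rather than Text to stay within the time package; formatMinuteSketch is a hypothetical name.

```haskell
import Data.Time.Calendar (fromGregorian)
import Data.Time.Clock (UTCTime (..))
import Data.Time.Format (defaultTimeLocale, formatTime)

-- Render a UTC instant at minute resolution; seconds are simply omitted.
formatMinuteSketch :: UTCTime -> String
formatMinuteSketch = formatTime defaultTimeLocale "%Y-%m-%dT%H:%M"

main :: IO ()
main =
  -- 11045 seconds after midnight is 03:04:05
  putStrLn (formatMinuteSketch (UTCTime (fromGregorian 2024 3 9) 11045))
```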

makeDedupKey :: CronJob payload -> UTCTime -> Text Source #

Dedup key for a cron job, from its code-defined overlap and timezone.
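
How an overlap policy could shape a dedup key is sketched below. The key format ("cron:" prefix, ":" separator) is invented for illustration and is an assumption, as is the omission of timezone handling; OverlapPolicy is redeclared locally so the example stands alone.

```haskell
import Data.Time.Calendar (fromGregorian)
import Data.Time.Clock (UTCTime (..), addUTCTime)
import Data.Time.Format (defaultTimeLocale, formatTime)

-- Local mirror of the documented type.
data OverlapPolicy = SkipOverlap | AllowOverlap

-- Hypothetical key layout:
--   SkipOverlap  -> one key per schedule, so a second insert collides
--                   while a job is still pending or running
--   AllowOverlap -> one key per schedule and minute, so each tick
--                   gets its own job
dedupKeySketch :: OverlapPolicy -> String -> UTCTime -> String
dedupKeySketch SkipOverlap name _ = "cron:" ++ name
dedupKeySketch AllowOverlap name tick =
  "cron:" ++ name ++ ":" ++ formatTime defaultTimeLocale "%Y-%m-%dT%H:%M" tick

-- Two consecutive ticks of the same schedule.
tickA, tickB :: UTCTime
tickA = UTCTime (fromGregorian 2024 1 1) (3 * 3600)
tickB = addUTCTime 60 tickA

main :: IO ()
main = do
  putStrLn (dedupKeySketch SkipOverlap "nightly-report" tickA)
  putStrLn (dedupKeySketch AllowOverlap "nightly-report" tickA)
  putStrLn (dedupKeySketch AllowOverlap "nightly-report" tickB)
```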

computeDelayMicros :: UTCTime -> Int Source #

Compute the delay in microseconds until the next minute boundary, clamped to [0, 120_000_000].
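
The calculation can be approximated as follows. This is a sketch under assumptions: it treats an instant exactly on a minute boundary as a full 60 seconds away from the next one, which may not match the module's behaviour, and computeDelayMicrosSketch is a hypothetical name.

```haskell
import Data.Time.Calendar (fromGregorian)
import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds)

picosPerMinute, picosPerMicro :: Integer
picosPerMinute = 60 * 10 ^ (12 :: Int)
picosPerMicro = 10 ^ (6 :: Int)

-- Microseconds from the given instant to the next minute boundary,
-- clamped to the documented range [0, 120_000_000].
computeDelayMicrosSketch :: UTCTime -> Int
computeDelayMicrosSketch (UTCTime _ dayTime) =
  max 0 (min 120000000 micros)
  where
    elapsedPicos = diffTimeToPicoseconds dayTime `mod` picosPerMinute
    remainingPicos = picosPerMinute - elapsedPicos
    micros = fromInteger (remainingPicos `div` picosPerMicro)

main :: IO ()
main =
  -- 30.5 seconds into a minute leaves 29.5 seconds to wait.
  print (computeDelayMicrosSketch (UTCTime (fromGregorian 2024 1 1) 30.5))
```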

enumMinutes :: UTCTime -> UTCTime -> [UTCTime] Source #

Enumerate all minute-boundary times from start through end (inclusive). Both start and end should be truncated to minute boundaries.
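
An inclusive minute enumeration can be sketched with iterate and takeWhile. enumMinutesSketch is a hypothetical stand-in; like the documented function, it assumes both arguments are already on minute boundaries.

```haskell
import Data.Time.Calendar (fromGregorian)
import Data.Time.Clock (UTCTime (..), addUTCTime)

-- Step forward one minute at a time, keeping every value up to and
-- including the end. Yields [] when start is after end.
enumMinutesSketch :: UTCTime -> UTCTime -> [UTCTime]
enumMinutesSketch start end =
  takeWhile (<= end) (iterate (addUTCTime 60) start)

midnight, threePast :: UTCTime
midnight = UTCTime (fromGregorian 2024 1 1) 0
threePast = addUTCTime 180 midnight

main :: IO ()
main = mapM_ print (enumMinutesSketch midnight threePast)
```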