| Safe Haskell | None |
|---|---|
| Language | GHC2024 |
Arbiter.Worker.Cron
Description
Cron scheduler for the Arbiter worker pool.
When cronJobs is non-empty in WorkerConfig, runWorkerPool spawns a
scheduler thread that inserts jobs on 5-field cron expressions.
Dedup keys prevent duplicate insertion across multiple worker instances.
Expressions default to UTC. Pass an IANA tz name (e.g. "America/New_York")
to cronJobInTimezone to evaluate in local time instead.
The scheduler consults the cron_schedules table each tick for runtime
overrides (expression, overlap, timezone, enabled).
Synopsis
- data CronJob payload = CronJob {
- name :: Text
- cronExpression :: Text
- overlap :: OverlapPolicy
- backfill :: BackfillPolicy
- timezone :: Maybe Text
- builder :: TickKind -> UTCTime -> JobWrite payload
- data OverlapPolicy
- data BackfillPolicy
- data TickKind
- cronJob :: Text -> Text -> OverlapPolicy -> (TickKind -> UTCTime -> JobWrite payload) -> Either String (CronJob payload)
- cronJobInTimezone :: Text -> Text -> Text -> OverlapPolicy -> (TickKind -> UTCTime -> JobWrite payload) -> Either String (CronJob payload)
- overlapPolicyToText :: OverlapPolicy -> Text
- overlapPolicyFromText :: Text -> Maybe OverlapPolicy
- resolveTZ :: Text -> Maybe TZ
- matchesInTimezone :: Maybe Text -> CronSchedule -> UTCTime -> Bool
- formatMinuteInTimezone :: Maybe Text -> UTCTime -> Text
- runCronScheduler :: forall m (registry :: JobPayloadRegistry) payload. (MonadUnliftIO m, QueueOperation m registry payload) => TVar WorkerState -> LogConfig -> Text -> [CronJob payload] -> m ()
- initCronSchedules :: MonadArbiter m => Text -> [CronJob payload] -> LogConfig -> m ()
- processCronCatchUp :: forall m (registry :: JobPayloadRegistry) payload. (MonadUnliftIO m, QueueOperation m registry payload) => LogConfig -> Text -> [CronJob payload] -> UTCTime -> m ()
- enumerateCatchUpTicks :: BackfillPolicy -> Maybe UTCTime -> UTCTime -> [UTCTime]
- truncateToMinute :: UTCTime -> UTCTime
- formatMinute :: UTCTime -> Text
- makeDedupKey :: CronJob payload -> UTCTime -> Text
- computeDelayMicros :: UTCTime -> Int
- enumMinutes :: UTCTime -> UTCTime -> [UTCTime]
Types
A cron schedule definition.
Use cronJob to construct (eagerly parses the cron expression). For a
non-UTC timezone, use cronJobInTimezone (also validates the tz name).
Constructors
| CronJob | |
Fields
| |
Instances
data OverlapPolicy Source #
How overlapping cron ticks are deduplicated.
Constructors
| SkipOverlap | At most one pending or running job per schedule. |
| AllowOverlap | One job per tick. Concurrent execution of prior ticks is allowed. |
Instances
| Generic OverlapPolicy Source # | |||||
Defined in Arbiter.Worker.Cron Associated Types
| |||||
| Show OverlapPolicy Source # | |||||
Defined in Arbiter.Worker.Cron Methods showsPrec :: Int -> OverlapPolicy -> ShowS # show :: OverlapPolicy -> String # showList :: [OverlapPolicy] -> ShowS # | |||||
| Eq OverlapPolicy Source # | |||||
Defined in Arbiter.Worker.Cron Methods (==) :: OverlapPolicy -> OverlapPolicy -> Bool # (/=) :: OverlapPolicy -> OverlapPolicy -> Bool # | |||||
| type Rep OverlapPolicy Source # | |||||
data BackfillPolicy Source #
Bound on replay of missed ticks. Applies at startup and mid-flight.
Constructors
| NoBackfill | Drop missed minutes silently. Default. |
| Backfill NominalDiffTime | Replay missed minutes up to the given duration. |
Instances
| Generic BackfillPolicy Source # | |||||
Defined in Arbiter.Worker.Cron Associated Types
Methods from :: BackfillPolicy -> Rep BackfillPolicy x # to :: Rep BackfillPolicy x -> BackfillPolicy # | |||||
| Show BackfillPolicy Source # | |||||
Defined in Arbiter.Worker.Cron Methods showsPrec :: Int -> BackfillPolicy -> ShowS # show :: BackfillPolicy -> String # showList :: [BackfillPolicy] -> ShowS # | |||||
| Eq BackfillPolicy Source # | |||||
Defined in Arbiter.Worker.Cron Methods (==) :: BackfillPolicy -> BackfillPolicy -> Bool # (/=) :: BackfillPolicy -> BackfillPolicy -> Bool # | |||||
| type Rep BackfillPolicy Source # | |||||
Defined in Arbiter.Worker.Cron type Rep BackfillPolicy = D1 ('MetaData "BackfillPolicy" "Arbiter.Worker.Cron" "arbiter-worker-0.1.0.0-inplace" 'False) (C1 ('MetaCons "NoBackfill" 'PrefixI 'False) (U1 :: Type -> Type) :+: C1 ('MetaCons "Backfill" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 NominalDiffTime))) | |||||
Whether a tick is for the current minute or a replay of a past minute.
Smart Constructors
Arguments
| :: Text | Schedule name (used in dedup keys and logging) |
| -> Text | Cron expression (5-field: minute hour day-of-month month day-of-week) |
| -> OverlapPolicy | |
| -> (TickKind -> UTCTime -> JobWrite payload) | Job builder. Receives the tick kind and the tick time. |
| -> Either String (CronJob payload) |
Smart constructor for CronJob. Parses the cron expression eagerly.
Returns Left with an error message if the cron expression is invalid.
cronJob "nightly-report" "0 3 * * *" SkipOverlap (\_kind _tick -> defaultJob (GenerateReport "nightly"))
The builder receives a TickKind (Live for the current minute, Replay
for any catch-up tick) and the tick time.
To enable backfill, set it via record update:
let Right cj = cronJob "nightly-report" "0 3 * * *" AllowOverlap
(\kind tick -> (defaultJob (GenerateReport tick))
{ priority = case kind of Replay -> 10; Live -> 0 })
in cj { backfill = Backfill 86400 } -- replay up to 24 hours
Note: cron expressions are evaluated in UTC. "0 3 * * *" fires at
03:00 UTC regardless of the server's local timezone.
Arguments
| :: Text | Schedule name |
| -> Text | IANA tz name (e.g. |
| -> Text | Cron expression (5-field) |
| -> OverlapPolicy | |
| -> (TickKind -> UTCTime -> JobWrite payload) | |
| -> Either String (CronJob payload) |
Like cronJob but evaluated in a specific timezone. The tz name is
validated eagerly against the bundled tzdata database.
Helpers
overlapPolicyToText :: OverlapPolicy -> Text Source #
Convert an OverlapPolicy to its text representation.
overlapPolicyFromText :: Text -> Maybe OverlapPolicy Source #
Parse an OverlapPolicy from text.
formatMinuteInTimezone :: Maybe Text -> UTCTime -> Text Source #
Format a UTC tick as YYYY-MM-DDTHH:MM in the given timezone.
DST fall-back maps two UTC instants to the same local minute, so dedup
keys built from this collapse to a single fire.
Internal
runCronScheduler :: forall m (registry :: JobPayloadRegistry) payload. (MonadUnliftIO m, QueueOperation m registry payload) => TVar WorkerState -> LogConfig -> Text -> [CronJob payload] -> m () Source #
Scheduler entry point. Exits cleanly when the worker state becomes
ShuttingDown so graceful shutdown stops creating new jobs.
initCronSchedules :: MonadArbiter m => Text -> [CronJob payload] -> LogConfig -> m () Source #
Upsert default expression and overlap for each CronJob into the
cron_schedules table. Preserves any user overrides and enabled state.
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. (MonadUnliftIO m, QueueOperation m registry payload) | |
| => LogConfig | |
| -> Text | |
| -> [CronJob payload] | |
| -> UTCTime | Current wall-clock time |
| -> m () |
Scheduler catch-up step. Each cron is processed in its own transaction. Backfill schedules hold a per-(schema, name) advisory lock so concurrent pools can't interleave gate updates across a backfill window.
enumerateCatchUpTicks :: BackfillPolicy -> Maybe UTCTime -> UTCTime -> [UTCTime] Source #
Minutes to evaluate for a processCronCatchUp call. Returns [] when
the watermark is already at or past currentTick (preventing re-fires
after job processing).
truncateToMinute :: UTCTime -> UTCTime Source #
Truncate a UTCTime to the current minute (zero out seconds).
makeDedupKey :: CronJob payload -> UTCTime -> Text Source #
Dedup key for a cron job, from its code-defined overlap and timezone.
computeDelayMicros :: UTCTime -> Int Source #
Compute the delay in microseconds until the next minute boundary,
clamped to [0, 120_000_000].