{-# LANGUAGE OverloadedStrings #-}

-- | Cron scheduler for the Arbiter worker pool.
--
-- When 'cronJobs' is non-empty in 'WorkerConfig', 'runWorkerPool' spawns a
-- scheduler thread that inserts jobs on 5-field cron expressions.
-- Dedup keys prevent duplicate insertion across multiple worker instances.
--
-- Expressions default to UTC. Pass an IANA tz name (e.g. @\"America\/New_York\"@)
-- to 'cronJobInTimezone' to evaluate in local time instead.
--
-- The scheduler consults the @cron_schedules@ table each tick for runtime
-- overrides (expression, overlap, timezone, enabled).
module Arbiter.Worker.Cron
  ( -- * Types
    CronJob (..)
  , OverlapPolicy (..)
  , BackfillPolicy (..)
  , TickKind (..)

    -- * Smart Constructors
  , cronJob
  , cronJobInTimezone

    -- * Helpers
  , overlapPolicyToText
  , overlapPolicyFromText
  , resolveTZ
  , matchesInTimezone
  , formatMinuteInTimezone

    -- * Internal
  , runCronScheduler
  , initCronSchedules
  , processCronCatchUp
  , enumerateCatchUpTicks
  , truncateToMinute
  , formatMinute
  , makeDedupKey
  , computeDelayMicros
  , enumMinutes
  ) where

import Arbiter.Core.CronSchedule qualified as CS
import Arbiter.Core.HighLevel (QueueOperation)
import Arbiter.Core.HighLevel qualified as HL
import Arbiter.Core.Job.Types (DedupKey (IgnoreDuplicate), JobWrite, dedupKey)
import Arbiter.Core.MonadArbiter (MonadArbiter, withDbTransaction)
import Arbiter.Core.Operations qualified as Ops
import Control.Concurrent.STM (retry)
import Control.Monad (forM_, unless, void, when)
import Control.Monad.IO.Class (MonadIO)
import Data.List (unfoldr)
import Data.Maybe (fromMaybe, isNothing)
import Data.Text (Text)
import Data.Text qualified as T
import Data.Text.Encoding (encodeUtf8)
import Data.Time
  ( NominalDiffTime
  , UTCTime (..)
  , addUTCTime
  , defaultTimeLocale
  , diffUTCTime
  , formatTime
  , getCurrentTime
  , localTimeToUTC
  , secondsToDiffTime
  , utc
  )
import Data.Time.Zones (TZ, utcToLocalTimeTZ)
import Data.Time.Zones.All (fromTZName, tzByLabel)
import GHC.Generics (Generic)
import System.Cron (CronSchedule, parseCronSchedule, scheduleMatches)
import UnliftIO (MonadUnliftIO, TVar, atomically, liftIO, readTVar, readTVarIO, registerDelay, tryAny)

import Arbiter.Worker.Logger (LogConfig, LogLevel (..))
import Arbiter.Worker.Logger.Internal (tryLog)
import Arbiter.Worker.WorkerState (WorkerState (..))

-- | How overlapping cron ticks are deduplicated.
--
-- The textual form used for storage is produced by 'overlapPolicyToText' and
-- parsed back by 'overlapPolicyFromText'.
data OverlapPolicy
  = -- | At most one pending or running job per schedule.
    SkipOverlap
  | -- | One job per tick. Concurrent execution of prior ticks is allowed.
    AllowOverlap
  deriving stock (Eq, Generic, Show)

-- | Bound on replay of missed ticks. Applies at startup and mid-flight.
data BackfillPolicy
  = -- | Drop missed minutes silently. Default.
    NoBackfill
  | -- | Replay missed minutes up to the given duration.
    Backfill NominalDiffTime
  deriving stock (Eq, Generic, Show)

-- | Whether a tick is for the current minute or a replay of a past minute.
data TickKind = Live | Replay
  deriving stock (Eq, Generic, Show)

-- | Convert an 'OverlapPolicy' to its text representation.
--
-- The result is the constructor name; 'overlapPolicyFromText' is the inverse.
overlapPolicyToText :: OverlapPolicy -> Text
overlapPolicyToText SkipOverlap = "SkipOverlap"
overlapPolicyToText AllowOverlap = "AllowOverlap"

-- | Parse an 'OverlapPolicy' from text.
--
-- Accepts exactly the strings produced by 'overlapPolicyToText'
-- (case-sensitive); any other input yields 'Nothing'.
overlapPolicyFromText :: Text -> Maybe OverlapPolicy
overlapPolicyFromText "SkipOverlap" = Just SkipOverlap
overlapPolicyFromText "AllowOverlap" = Just AllowOverlap
overlapPolicyFromText _ = Nothing

-- | A cron schedule definition.
--
-- Use 'cronJob' to construct (eagerly parses the cron expression). For a
-- non-UTC timezone, use 'cronJobInTimezone' (also validates the tz name).
data CronJob payload = CronJob
  { name :: Text
  -- ^ Human-readable name for logging and dedup keys
  , cronExpression :: Text
  -- ^ Original cron expression text (for DB storage)
  , overlap :: OverlapPolicy
  -- ^ How to handle overlapping ticks
  , backfill :: BackfillPolicy
  -- ^ How to replay missed ticks, at startup and mid-flight. Default: 'NoBackfill'.
  , timezone :: Maybe Text
  -- ^ IANA tz name (e.g. @\"America\/New_York\"@). 'Nothing' means UTC.
  -- The @override_timezone@ DB column wins if set.
  , builder :: TickKind -> UTCTime -> JobWrite payload
  -- ^ Build a job for the given tick time. 'Replay' is passed for any tick
  -- whose minute is not the current scheduler minute (startup or mid-flight
  -- catch-up). 'Live' is passed for the current minute boundary.
  }
  deriving stock (Generic)

-- | Smart constructor for 'CronJob'. Parses the cron expression eagerly.
--
-- Returns @Left@ with an error message if the cron expression is invalid.
--
-- @
-- cronJob "nightly-report" "0 3 * * *" SkipOverlap
--   (\\_kind _tick -> defaultJob (GenerateReport "nightly"))
-- @
--
-- The builder receives a 'TickKind' ('Live' for the current minute, 'Replay'
-- for any catch-up tick) and the tick time.
--
-- To enable backfill, set it via record update:
--
-- @
-- let Right cj = cronJob "nightly-report" "0 3 * * *" AllowOverlap
--       (\\kind tick -> (defaultJob (GenerateReport tick))
--          { priority = case kind of Replay -> 10; Live -> 0 })
-- in cj { backfill = Backfill 86400 }  -- replay up to 24 hours
-- @
--
-- Note: cron expressions are evaluated in __UTC__. @\"0 3 * * *\"@ fires at
-- 03:00 UTC regardless of the server's local timezone.
cronJob
  :: Text
  -- ^ Schedule name (used in dedup keys and logging)
  -> Text
  -- ^ Cron expression (5-field: minute hour day-of-month month day-of-week)
  -> OverlapPolicy
  -> (TickKind -> UTCTime -> JobWrite payload)
  -- ^ Job builder. Receives the tick kind and the tick time.
  -> Either String (CronJob payload)
cronJob cronName expr ov mk =
  -- The parsed schedule is only used for validation here; the record keeps
  -- the original expression text.
  case parseCronSchedule expr of
    Left err -> Left err
    Right _ ->
      Right
        CronJob
          { name = cronName
          , cronExpression = expr
          , overlap = ov
          , backfill = NoBackfill
          , timezone = Nothing
          , builder = mk
          }

-- | Like 'cronJob' but evaluated in a specific timezone. The tz name is
-- validated eagerly against the bundled @tzdata@ database.
--
-- Returns @Left@ if the tz name is unknown (checked first) or if the cron
-- expression fails to parse.
cronJobInTimezone
  :: Text
  -- ^ Schedule name
  -> Text
  -- ^ IANA tz name (e.g. @\"America\/New_York\"@)
  -> Text
  -- ^ Cron expression (5-field)
  -> OverlapPolicy
  -> (TickKind -> UTCTime -> JobWrite payload)
  -> Either String (CronJob payload)
cronJobInTimezone cronName tzName expr ov mk =
  case resolveTZ tzName of
    Nothing -> Left $ "Unknown timezone: " <> T.unpack tzName
    -- Only the name is stored; the resolved 'TZ' is discarded after validation.
    Just _ -> fmap (\cj -> cj {timezone = Just tzName}) (cronJob cronName expr ov mk)

-- | Look up an IANA tz name in the bundled @tzdata@ database.
--
-- Returns 'Nothing' when the name is not a known label.
resolveTZ :: Text -> Maybe TZ
resolveTZ name = fmap tzByLabel (fromTZName (encodeUtf8 name))

-- | Match a cron schedule against a UTC tick, evaluated in @tz@.
-- 'Nothing' means UTC. An unknown tz name returns 'False'.
matchesInTimezone :: Maybe Text -> CronSchedule -> UTCTime -> Bool
matchesInTimezone Nothing sched t = scheduleMatches sched t
matchesInTimezone (Just tzName) sched t =
  case resolveTZ tzName of
    Nothing -> False
    Just tz ->
      -- 'scheduleMatches' takes a 'UTCTime', so the local wall-clock time is
      -- re-labelled as UTC to make the cron fields compare against local time.
      let local = utcToLocalTimeTZ tz t
          asUtc = localTimeToUTC utc local
       in scheduleMatches sched asUtc

-- | Format a UTC tick as @YYYY-MM-DDTHH:MM@ in the given timezone.
-- DST fall-back maps two UTC instants to the same local minute, so dedup
-- keys built from this collapse to a single fire.
--
-- 'Nothing' and unknown tz names both fall back to 'formatMinute'.
formatMinuteInTimezone :: Maybe Text -> UTCTime -> Text
formatMinuteInTimezone Nothing t = formatMinute t
formatMinuteInTimezone (Just tzName) t =
  case resolveTZ tzName of
    Nothing -> formatMinute t
    Just tz ->
      let local = utcToLocalTimeTZ tz t
       in T.pack (formatTime defaultTimeLocale "%Y-%m-%dT%H:%M" local)

-- | Upsert default expression and overlap for each 'CronJob' into the
-- @cron_schedules@ table. Preserves any user overrides and enabled state.
--
-- The job's 'timezone' is passed to the upsert as well. Logs one @Info@ line
-- with the number of schedules once all upserts have been issued.
initCronSchedules
  :: (MonadArbiter m)
  => Text -> [CronJob payload] -> LogConfig -> m ()
initCronSchedules schemaName jobs logCfg = do
  forM_ jobs $ \cj ->
    Ops.upsertCronDefault
      schemaName
      (name cj)
      (cronExpression cj)
      (overlapPolicyToText (overlap cj))
      (timezone cj)
  logCron logCfg Info $
    "Cron schedules initialized: " <> T.pack (show (length jobs)) <> " schedule(s) upserted"

-- | Scheduler entry point. Exits cleanly when the worker state becomes
-- 'ShuttingDown' so graceful shutdown stops creating new jobs.
--
-- Startup sequence: upsert schedule defaults, run one catch-up pass for the
-- startup time (skipped if already shutting down), then enter the per-minute
-- loop.
runCronScheduler
  :: (MonadUnliftIO m, QueueOperation m registry payload)
  => TVar WorkerState
  -> LogConfig
  -> Text
  -> [CronJob payload]
  -> m ()
runCronScheduler stateVar logCfg schemaName jobs = do
  initCronSchedules schemaName jobs logCfg
  startupNow <- liftIO getCurrentTime
  shuttingDown <- isShuttingDown stateVar
  unless shuttingDown $
    processCronCatchUp logCfg schemaName jobs startupNow
  logCron logCfg Info $ "Cron scheduler started with " <> T.pack (show (length jobs)) <> " schedule(s)"
  loop
  where
    -- Wait for the next minute boundary (or shutdown); stop when the wait
    -- reports shutdown, otherwise process the current time and repeat.
    loop = do
      stop <- waitUntilNextMinuteOrShutdown stateVar
      unless stop $ do
        now <- liftIO getCurrentTime
        processCronCatchUp logCfg schemaName jobs now
        loop

-- | Scheduler catch-up step. Each cron is processed in its own transaction.
-- Backfill schedules hold a per-(schema, name) advisory lock so concurrent
-- pools can't interleave gate updates across a backfill window.
--
-- An exception raised while processing one schedule is caught and logged at
-- @Error@ level; the remaining schedules are still processed.
processCronCatchUp
  :: (MonadUnliftIO m, QueueOperation m registry payload)
  => LogConfig
  -> Text
  -> [CronJob payload]
  -> UTCTime
  -- ^ Current wall-clock time
  -> m ()
processCronCatchUp logCfg schemaName jobs now = do
  let currentTick = truncateToMinute now
  forM_ jobs (processOneCron currentTick)
  where
    -- Run one schedule inside its own DB transaction and report the outcome.
    processOneCron currentTick cj = do
      outcome <- tryAny . withDbTransaction $ do
        -- Only backfill schedules contend for the leader lock; without
        -- backfill every pool proceeds.
        haveLeader <- case backfill cj of
          NoBackfill -> pure True
          Backfill _ -> Ops.tryAcquireCronLeader schemaName (name cj)
        if not haveLeader
          then pure NotLeader
          else do
            mRow <- Ops.getCronScheduleByName schemaName (name cj)
            processOne mRow currentTick cj
            void $ Ops.touchCronChecked schemaName currentTick [name cj]
            pure Ran
      case outcome of
        Left e ->
          logCron logCfg Error $ "Cron '" <> name cj <> "' tick aborted: " <> T.pack (show e)
        Right NotLeader ->
          logCron logCfg Debug $ "Cron '" <> name cj <> "' skipped, another pool holds the lock"
        Right Ran -> pure ()
    -- Resolve the effective schedule from the job and its DB row, then insert
    -- a job for every tick that should fire.
    processOne mRow currentTick cj = case resolveAndParse cj mRow of
      Disabled -> pure ()
      ParseError expr err ->
        logCron logCfg Error $
          "Cron schedule '"
            <> name cj
            <> "' has invalid effective expression '"
            <> expr
            <> "': "
            <> T.pack err
      InvalidTimezone tzName ->
        logCron logCfg Error $
          "Cron schedule '"
            <> name cj
            <> "' has unknown timezone '"
            <> tzName
            <> "'"
      Effective effectiveOv sched effectiveTz -> do
        let ticksInWindow = enumerateCatchUpTicks (backfill cj) (mRow >>= CS.lastCheckedAt) currentTick
            ticksToFire = pickTicksToFire sched effectiveTz effectiveOv ticksInWindow
            -- Anything other than the current minute counts as a replay.
            replayCount = length (filter (/= currentTick) ticksToFire)
        when (replayCount > 0) $
          logCron logCfg Info $
            "Replaying " <> T.pack (show replayCount) <> " missed tick(s) for '" <> name cj <> "'"
        forM_ ticksToFire $ \t ->
          void $ tryInsertCronJob logCfg schemaName cj effectiveOv effectiveTz (tickKindFor currentTick t) t

-- | Outcome of one scheduler tick.
--
-- NOTE(review): no use site is visible in this part of the file. Presumably
-- 'NotLeader' means this instance did not do the tick's work and 'Ran' means
-- it did — confirm against the scheduler loop.
data TickOutcome = NotLeader | Ran

-- | Classify tick @t@ relative to the tick currently being processed:
-- 'Live' when @t@ equals @currentTick@, 'Replay' for every other tick.
tickKindFor :: UTCTime -> UTCTime -> TickKind
tickKindFor currentTick t
  | t == currentTick = Live
  | otherwise = Replay

-- | Choose which candidate ticks should fire.
--
-- Candidates are first narrowed to those accepted by 'matchesInTimezone' for
-- the given schedule and timezone ('Nothing' = UTC). The overlap policy then
-- trims the matches:
--
-- * 'SkipOverlap' keeps only the first match in list order (the oldest one
--   when the candidates are in ascending order), so a catch-up handler starts
--   from the earliest unprocessed checkpoint.
-- * 'AllowOverlap' keeps every match.
pickTicksToFire :: CronSchedule -> Maybe Text -> OverlapPolicy -> [UTCTime] -> [UTCTime]
pickTicksToFire sched tz ov = trim . filter (matchesInTimezone tz sched)
  where
    trim = case ov of
      SkipOverlap -> take 1
      AllowOverlap -> id

-- | Minutes to evaluate for a 'processCronCatchUp' call.
--
-- * No watermark yet: just @currentTick@.
-- * Watermark (truncated to the minute) at or past @currentTick@: @[]@, which
--   prevents re-fires after job processing.
-- * 'NoBackfill': just @currentTick@.
-- * @'Backfill' window@: every minute from the later of "one minute after the
--   watermark" and "@currentTick - window@ truncated to the minute", through
--   @currentTick@ inclusive.
enumerateCatchUpTicks :: BackfillPolicy -> Maybe UTCTime -> UTCTime -> [UTCTime]
enumerateCatchUpTicks policy mLastChecked currentTick =
  case truncateToMinute <$> mLastChecked of
    Nothing -> [currentTick]
    Just lastMinute
      | lastMinute >= currentTick -> []
      | otherwise -> ticksSince lastMinute
  where
    ticksSince lastMinute = case policy of
      NoBackfill -> [currentTick]
      Backfill window ->
        let -- Truncated so a window that is not a whole number of minutes
            -- cannot make the enumeration stride past currentTick.
            earliestAllowed = truncateToMinute (addUTCTime (negate window) currentTick)
            firstMinute = max (addUTCTime 60 lastMinute) earliestAllowed
         in enumMinutes firstMinute currentTick

-- | Result of resolving a schedule's effective config (code defaults +
-- runtime DB overrides), as produced by 'resolveAndParse'.
--
-- * 'Disabled': the override row has the schedule switched off.
-- * 'ParseError': the effective expression (first field) failed to parse;
--   the second field is the parser's error message.
-- * 'InvalidTimezone': the effective timezone name was rejected by 'resolveTZ'.
-- * 'Effective': usable configuration — overlap policy, parsed schedule and
--   optional timezone name.
data Resolved
  = Disabled
  | ParseError Text String
  | InvalidTimezone Text
  | Effective OverlapPolicy CronSchedule (Maybe Text)

-- | Merge a job's code-defined settings with an optional runtime override row
-- and validate the result.
--
-- Without a row the job's own expression, overlap policy and timezone are
-- used and the schedule counts as enabled. With a row, the row's effective
-- expression and timezone are used, its overlap text is parsed (falling back
-- to the job's policy when it is not recognised), and its enabled flag
-- decides whether the schedule runs at all.
--
-- Validation order: enabled flag, then expression parse, then timezone
-- lookup via 'resolveTZ'.
resolveAndParse :: CronJob payload -> Maybe CS.CronScheduleRow -> Resolved
resolveAndParse cj mRow
  | not isEnabled = Disabled
  | otherwise = either (ParseError expr) checkTimezone (parseCronSchedule expr)
  where
    (expr, ov, tz, isEnabled) = maybe codeDefaults fromRow mRow

    codeDefaults = (cronExpression cj, overlap cj, timezone cj, True)

    fromRow row@CS.CronScheduleRow {CS.enabled = rowEnabled} =
      ( CS.effectiveExpression row
      , fromMaybe (overlap cj) (overlapPolicyFromText (CS.effectiveOverlap row))
      , CS.effectiveTimezone row
      , rowEnabled
      )

    checkTimezone sched = case tz of
      Just tzName | isNothing (resolveTZ tzName) -> InvalidTimezone tzName
      _ -> Effective ov sched tz

-- | Attempt to process a single cron tick for one job.
--
-- Inside one database transaction this first tries the per-tick fire gate
-- (another pool may already have fired this minute). Only when the gate is
-- won does it insert the job built by the schedule's 'builder', tagged with
-- an 'IgnoreDuplicate' dedup key. The checked watermark is then advanced to
-- @tick@ whether or not the gate was won.
--
-- Because both steps share the transaction, a failure in either rolls back
-- the other, so a successful fire is always paired with a watermark advance.
--
-- Returns 'False' when the transaction raised an exception caught by
-- 'tryAny' (logged at 'Error'), 'True' otherwise (logged at 'Debug').
tryInsertCronJob
  :: (MonadUnliftIO m, QueueOperation m registry payload)
  => LogConfig -> Text -> CronJob payload -> OverlapPolicy -> Maybe Text -> TickKind -> UTCTime -> m Bool
tryInsertCronJob logCfg schemaName cj effectiveOv effectiveTz kind tick = do
  let jobName = name cj
      tickDedupKey = makeDedupKeyFromParts jobName effectiveOv effectiveTz tick
      -- Lazy: only forced if the gate is won and the insert actually runs.
      tickJob = (builder cj kind tick) {dedupKey = Just (IgnoreDuplicate tickDedupKey)}
      describe suffix = "Cron schedule '" <> jobName <> suffix
  outcome <- tryAny . withDbTransaction $ do
    gateWon <- Ops.tryFireCronGate schemaName jobName tick
    when gateWon . void $ HL.insertJob tickJob
    void $ Ops.touchCronChecked schemaName tick [jobName]
  case outcome of
    Right () ->
      True <$ logCron logCfg Debug (describe "' processed at " <> formatMinute tick)
    Left err ->
      False <$ logCron logCfg Error (describe "' failed to insert: " <> T.pack (show err))

-- | Emit a cron log line at the given level from any 'MonadIO' context.
--
-- Delegates to 'tryLog' run in 'IO'; per the original note on this helper,
-- logger failures are swallowed rather than propagated.
logCron :: (MonadIO m) => LogConfig -> LogLevel -> Text -> m ()
logCron logCfg level = liftIO . tryLog logCfg level

-- | Dedup key for a cron job at a given tick, derived from the job's
-- code-defined name, overlap policy and timezone (runtime overrides are not
-- consulted here).
makeDedupKey :: CronJob payload -> UTCTime -> Text
makeDedupKey cj = makeDedupKeyFromParts (name cj) (overlap cj) (timezone cj)

-- | Build a dedup key from explicit parts.
--
-- * 'SkipOverlap': @arbiter_cron:\<job\>@ — the same key for every tick.
-- * 'AllowOverlap': @arbiter_cron:\<job\>:\<minute\>@, where the minute is
--   rendered by 'formatMinuteInTimezone' in the schedule's timezone, so a DST
--   fall-back fires once instead of twice.
makeDedupKeyFromParts :: Text -> OverlapPolicy -> Maybe Text -> UTCTime -> Text
makeDedupKeyFromParts jobName ov tz tick =
  let base = "arbiter_cron:" <> jobName
   in case ov of
        SkipOverlap -> base
        AllowOverlap -> T.intercalate ":" [base, formatMinuteInTimezone tz tick]

-- | Microseconds from @now@ until the next minute boundary, rounded up and
-- clamped to @[0, 120_000_000]@.
computeDelayMicros :: UTCTime -> Int
computeDelayMicros now = clamp (ceiling (secondsLeft * 1_000_000))
  where
    -- Distance from now to the start of the following minute.
    secondsLeft = diffUTCTime (truncateToMinute (addUTCTime 60 now)) now

    clamp :: Int -> Int
    clamp = max 0 . min 120_000_000

-- | Block until either the next minute boundary is reached or the worker
-- state becomes 'ShuttingDown'.
--
-- Returns 'True' when shutdown was observed, 'False' when the timer elapsed.
-- The shutdown check comes first inside the transaction, so shutdown wins if
-- both conditions hold at once.
waitUntilNextMinuteOrShutdown :: (MonadIO m) => TVar WorkerState -> m Bool
waitUntilNextMinuteOrShutdown stateVar = liftIO $ do
  deadline <- registerDelay . computeDelayMicros =<< getCurrentTime
  atomically $ do
    st <- readTVar stateVar
    case st of
      ShuttingDown -> pure True
      _ -> do
        expired <- readTVar deadline
        -- Neither condition holds yet: block until one of the TVars changes.
        unless expired retry
        pure False

-- | Read the current 'WorkerState' outside of a transaction and report
-- whether it is 'ShuttingDown'. The answer is a point-in-time snapshot.
isShuttingDown :: (MonadIO m) => TVar WorkerState -> m Bool
isShuttingDown stateVar = shuttingDown <$> readTVarIO stateVar
  where
    shuttingDown ShuttingDown = True
    shuttingDown _ = False

-- | Round a 'UTCTime' down to the start of its minute, dropping whole and
-- fractional seconds. The day component is left untouched.
truncateToMinute :: UTCTime -> UTCTime
truncateToMinute t = t {utctDayTime = secondsToDiffTime wholeMinuteSeconds}
  where
    elapsedSeconds :: Integer
    elapsedSeconds = floor (utctDayTime t)

    -- Strip the seconds-within-the-minute remainder.
    wholeMinuteSeconds = elapsedSeconds - elapsedSeconds `mod` 60

-- | Render a 'UTCTime' as @YYYY-MM-DDTHH:MM@ (minute resolution), the form
-- used for dedup key buckets.
formatMinute :: UTCTime -> Text
formatMinute t = T.pack (formatTime defaultTimeLocale "%Y-%m-%dT%H:%M" t)

-- | List the times @start@, @start + 60s@, @start + 120s@, … that do not
-- exceed @end@ (so @end@ itself is included when it lies on the stride).
-- Yields @[]@ when @start@ is after @end@. Both arguments should already be
-- truncated to minute boundaries.
enumMinutes :: UTCTime -> UTCTime -> [UTCTime]
enumMinutes start end = unfoldr step start
  where
    step t
      | t > end = Nothing
      | otherwise = Just (t, addUTCTime 60 t)