{-# LANGUAGE OverloadedStrings #-}

-- | Cron scheduler for the Arbiter worker pool.
--
-- When 'cronJobs' is non-empty in 'WorkerConfig', 'runWorkerPool' spawns an
-- additional thread that inserts jobs based on 5-field cron expressions.
-- Uses 'IgnoreDuplicate' dedup keys for multi-instance idempotency.
--
-- __All cron expressions are evaluated in UTC.__ There is no local-timezone
-- support — @\"0 3 * * *\"@ means 03:00 UTC, not 03:00 in the server's
-- local time. Account for your timezone offset when writing expressions.
--
-- When a connection pool and schema are provided, the scheduler consults the
-- @cron_schedules@ table for runtime overrides (expression, overlap, enabled).
module Arbiter.Worker.Cron
  ( -- * Types
    CronJob (..)
  , OverlapPolicy (..)

    -- * Smart Constructor
  , cronJob

    -- * Helpers
  , overlapPolicyToText
  , overlapPolicyFromText

    -- * DB Init
  , initCronSchedules

    -- * Internal
  , runCronScheduler
  , processCronTick
  , truncateToMinute
  , formatMinute
  , makeDedupKey
  , computeDelayMicros
  ) where

import Arbiter.Core.CronSchedule qualified as CS
import Arbiter.Core.HighLevel (QueueOperation)
import Arbiter.Core.HighLevel qualified as HL
import Arbiter.Core.Job.Types (DedupKey (IgnoreDuplicate), JobWrite, dedupKey)
import Control.Monad (forM_, forever, when)
import Control.Monad.IO.Class (MonadIO)
import Data.Text (Text)
import Data.Text qualified as T
import Data.Time
  ( UTCTime (..)
  , defaultTimeLocale
  , diffUTCTime
  , formatTime
  , getCurrentTime
  , secondsToDiffTime
  )
import Database.PostgreSQL.Simple (Connection)
import GHC.Generics (Generic)
import System.Cron (CronSchedule, parseCronSchedule, scheduleMatches)
import UnliftIO (MonadUnliftIO, liftIO, tryAny)
import UnliftIO.Concurrent (threadDelay)

import Arbiter.Worker.Logger (LogConfig, LogLevel (..))
import Arbiter.Worker.Logger.Internal (logMessage)

-- | Determines how overlapping cron ticks are deduplicated.
--
-- The policy selects the shape of the dedup key built by
-- 'makeDedupKeyFromParts': 'SkipOverlap' yields one key per schedule name,
-- 'AllowOverlap' yields one key per schedule name and minute.
data OverlapPolicy
  = -- | At most one pending or running job per schedule name.
    SkipOverlap
  | -- | One job per tick. Allows concurrent execution of prior ticks.
    AllowOverlap
  deriving stock (Eq, Generic, Show)

-- | Convert an 'OverlapPolicy' to its text representation.
--
-- The result is the constructor name; 'overlapPolicyFromText' accepts exactly
-- these strings.
overlapPolicyToText :: OverlapPolicy -> Text
overlapPolicyToText SkipOverlap = "SkipOverlap"
overlapPolicyToText AllowOverlap = "AllowOverlap"

-- | Parse an 'OverlapPolicy' from text.
--
-- Matching is exact (case-sensitive, no trimming). Any input other than
-- @\"SkipOverlap\"@ or @\"AllowOverlap\"@ yields 'Nothing'.
overlapPolicyFromText :: Text -> Maybe OverlapPolicy
overlapPolicyFromText "SkipOverlap" = Just SkipOverlap
overlapPolicyFromText "AllowOverlap" = Just AllowOverlap
overlapPolicyFromText _ = Nothing

-- | A cron schedule definition.
--
-- Use 'cronJob' to construct — it parses the cron expression eagerly,
-- so invalid expressions are caught at construction time.
data CronJob payload = CronJob
  { name :: Text
  -- ^ Human-readable name for logging and dedup keys
  , cronExpression :: Text
  -- ^ Original cron expression text (for DB storage)
  , schedule :: CronSchedule
  -- ^ Parsed cron schedule (internal)
  , overlap :: OverlapPolicy
  -- ^ How to handle overlapping ticks
  , builder :: UTCTime -> JobWrite payload
  -- ^ Build a job for the given tick time (truncated to minute)
  }
  deriving stock (Generic)

-- | Smart constructor for 'CronJob'. Parses the cron expression eagerly.
--
-- Returns @Left@ with the parser's error message if the cron expression is
-- invalid; otherwise returns a 'CronJob' holding both the original
-- expression text and the parsed schedule.
--
-- Note: cron expressions are evaluated in __UTC__. @\"0 3 * * *\"@ fires at
-- 03:00 UTC regardless of the server's local timezone.
--
-- Example:
--
-- @
-- cronJob "nightly-report" "0 3 * * *" SkipOverlap
--   (\\_ -> defaultJob (GenerateReport "nightly"))
-- @
cronJob
  :: Text
  -- ^ Schedule name (used in dedup keys and logging)
  -> Text
  -- ^ Cron expression (5-field: minute hour day-of-month month day-of-week)
  -> OverlapPolicy
  -- ^ Code-default overlap policy for this schedule
  -> (UTCTime -> JobWrite payload)
  -- ^ Job builder, called with the tick time
  -> Either String (CronJob payload)
cronJob name expr overlap mk =
  case parseCronSchedule expr of
    Left err -> Left err
    Right schedule ->
      Right
        CronJob
          { name = name
          , cronExpression = expr
          , schedule = schedule
          , overlap = overlap
          , builder = mk
          }

-- | Initialize the @cron_schedules@ table and upsert defaults for all cron jobs.
--
-- Called once at scheduler startup. For each 'CronJob' this passes the
-- schedule name, its code-defined expression text and its code-defined
-- overlap policy (as text) to 'CS.upsertCronScheduleDefault', then logs how
-- many schedules were processed at 'Info' level.
--
-- Exceptions raised by the upsert are not caught here.
initCronSchedules :: Connection -> Text -> [CronJob payload] -> LogConfig -> IO ()
initCronSchedules conn schemaName jobs logCfg = do
  forM_ jobs $ \cj ->
    CS.upsertCronScheduleDefault
      conn
      schemaName
      (name cj)
      (cronExpression cj)
      (overlapPolicyToText (overlap cj))
  logMessage logCfg Info $
    "Cron schedules initialized: " <> T.pack (show (length jobs)) <> " schedule(s) upserted"

-- | Run the cron scheduler loop. Called by 'runWorkerPool' when 'cronJobs'
-- is non-empty.
--
-- Startup: upserts schedule defaults via 'initCronSchedules' and logs the
-- number of schedules. Then loops forever: sleep until the next minute
-- boundary, read the clock, truncate it to the minute and hand that tick to
-- 'processCronTick'.
--
-- Per-schedule failures are caught and logged inside 'processCronTick', so
-- the loop continues. Failures during the initial 'initCronSchedules' call
-- are not caught here.
--
-- The provided 'Connection' is used to consult the @cron_schedules@ table
-- for effective expression, overlap, and enabled state on each tick.
runCronScheduler
  :: (MonadUnliftIO m, QueueOperation m registry payload)
  => Connection
  -> LogConfig
  -> Text
  -- ^ Schema name passed through to the @cron_schedules@ queries
  -> [CronJob payload]
  -> m ()
runCronScheduler conn logCfg schemaName jobs = do
  liftIO $ initCronSchedules conn schemaName jobs logCfg
  liftIO $
    logMessage logCfg Info $
      "Cron scheduler started with " <> T.pack (show (length jobs)) <> " schedule(s)"
  forever $ do
    waitUntilNextMinute
    -- Re-read the clock after waking; the tick is whatever minute we woke in.
    now <- liftIO getCurrentTime
    processCronTick conn logCfg schemaName jobs (truncateToMinute now)

-- | Process a single cron tick at the given time.
--
-- Steps:
--
-- 1. Fetch all rows from the @cron_schedules@ table once. If the fetch
--    throws, the error is logged and every schedule falls back to its
--    code-defined expression and overlap policy (and is treated as enabled).
-- 2. For each 'CronJob', resolve the effective expression, overlap policy and
--    enabled flag. Disabled schedules are skipped. An effective expression
--    that fails to parse is logged and skipped. An unrecognised overlap text
--    falls back to the code-defined policy.
-- 3. If the effective schedule matches the tick, insert the built job with an
--    'IgnoreDuplicate' dedup key. Insert failures and dedup skips are logged;
--    on a successful insert, @last_fired_at@ is updated (best effort).
-- 4. Finally mark all schedules as checked (best effort, failure is logged).
processCronTick
  :: (MonadUnliftIO m, QueueOperation m registry payload)
  => Connection
  -> LogConfig
  -> Text
  -- ^ Schema name passed through to the @cron_schedules@ queries
  -> [CronJob payload]
  -> UTCTime
  -- ^ Tick time; the caller is expected to have truncated it to the minute
  -> m ()
processCronTick conn logCfg schemaName jobs tick = do
  -- Batch-fetch all schedule rows once; fall back to code defaults on DB error
  rowMap <- do
    result <- tryAny . liftIO $ CS.listCronSchedules conn schemaName
    case result of
      Right rows -> pure [(CS.name r, r) | r <- rows]
      Left e -> do
        liftIO $
          logMessage logCfg Error $
            "Failed to fetch cron schedules from DB, using code defaults: "
              <> T.pack (show e)
        pure []

  forM_ jobs $ \cj -> do
    let mRow = lookup (name cj) rowMap

    let (effectiveExpr, effectiveOv, isEnabled) = case mRow of
          Nothing ->
            -- No DB row yet (shouldn't happen after init, but be safe)
            (cronExpression cj, overlap cj, True)
          Just row@CS.CronScheduleRow {CS.enabled = rowEnabled} ->
            let expr = CS.effectiveExpression row
                ovText = CS.effectiveOverlap row
                ov = case overlapPolicyFromText ovText of
                  Just p -> p
                  Nothing -> overlap cj -- fallback to code default
             in (expr, ov, rowEnabled)

    when isEnabled $ do
      -- Parse the effective expression
      case parseCronSchedule effectiveExpr of
        Left err ->
          liftIO $
            logMessage logCfg Error $
              "Cron schedule '"
                <> name cj
                <> "' has invalid effective expression '"
                <> effectiveExpr
                <> "': "
                <> T.pack err
        Right effectiveSched ->
          when (scheduleMatches effectiveSched tick) $ do
            -- The dedup key is derived from the effective (possibly
            -- DB-overridden) overlap policy, not the code default.
            let key = makeDedupKeyFromParts (name cj) effectiveOv tick
                jobWrite = (builder cj tick) {dedupKey = Just (IgnoreDuplicate key)}
            result <- tryAny $ HL.insertJob jobWrite
            liftIO $ case result of
              Left e ->
                logMessage logCfg Error $
                  "Cron schedule '"
                    <> name cj
                    <> "' failed to insert: "
                    <> T.pack (show e)
              Right Nothing ->
                logMessage logCfg Debug $
                  "Cron schedule '"
                    <> name cj
                    <> "' skipped (dedup key exists): "
                    <> key
              Right (Just _) -> do
                logMessage logCfg Info $
                  "Cron schedule '"
                    <> name cj
                    <> "' fired at "
                    <> formatMinute tick
                -- Update last_fired_at
                fireResult <- tryAny . liftIO $ CS.touchCronScheduleLastFired conn schemaName (name cj)
                case fireResult of
                  Right () -> pure ()
                  Left e ->
                    liftIO $
                      logMessage logCfg Error $
                        "Cron schedule '"
                          <> name cj
                          <> "' failed to update last_fired_at: "
                          <> T.pack (show e)

  -- Mark all schedules as checked
  result <- tryAny . liftIO $ CS.touchCronSchedulesChecked conn schemaName (map name jobs)
  case result of
    Right () -> pure ()
    Left e ->
      liftIO $
        logMessage logCfg Error $
          "Failed to update last_checked_at: " <> T.pack (show e)

-- | Compute the dedup key for a cron job at the given tick time.
--
-- Uses the code-defined overlap policy. If the schedule has a DB override,
-- use 'makeDedupKeyFromParts' with the effective policy instead.
makeDedupKey :: CronJob payload -> UTCTime -> Text
makeDedupKey cj tick = makeDedupKeyFromParts (name cj) (overlap cj) tick

-- | Internal helper: compute dedup key from components.
--
-- * 'SkipOverlap': @arbiter_cron:\<name\>@ — the same key on every tick.
-- * 'AllowOverlap': @arbiter_cron:\<name\>:\<minute\>@, where the minute is
--   rendered by 'formatMinute' — a distinct key per tick minute.
makeDedupKeyFromParts :: Text -> OverlapPolicy -> UTCTime -> Text
makeDedupKeyFromParts jobName ov tick = case ov of
  SkipOverlap -> "arbiter_cron:" <> jobName
  AllowOverlap -> "arbiter_cron:" <> jobName <> ":" <> formatMinute tick

-- | Compute the delay in microseconds until the next minute boundary,
-- clamped to @[0, 120_000_000]@.
--
-- The delay is rounded up to a whole microsecond. When @now@ is already
-- exactly on a minute boundary the result is a full 60 seconds, not 0.
computeDelayMicros :: UTCTime -> Int
computeDelayMicros now =
  let -- Record update binds tighter than application, so 60 seconds are
      -- added to the time-of-day first and the result is then truncated.
      nextMinute = truncateToMinute (now {utctDayTime = utctDayTime now + 60})
      delaySeconds = diffUTCTime nextMinute now
      rawMicros = ceiling (delaySeconds * 1_000_000) :: Int
   in max 0 (min 120_000_000 rawMicros)

-- | Sleep until the next minute boundary (:00 seconds).
--
-- Reads the current time once and sleeps for 'computeDelayMicros' of it.
waitUntilNextMinute :: (MonadIO m) => m ()
waitUntilNextMinute = liftIO $ do
  now <- getCurrentTime
  threadDelay (computeDelayMicros now)

-- | Truncate a 'UTCTime' to the current minute (zero out seconds).
--
-- Only the time-of-day component is changed; the day is left as is.
-- Fractional seconds are dropped along with the whole seconds.
truncateToMinute :: UTCTime -> UTCTime
truncateToMinute t =
  let secs = utctDayTime t
      -- Whole seconds since midnight, rounded down to a multiple of 60.
      truncated = secondsToDiffTime (floor secs `div` 60 * 60)
   in t {utctDayTime = truncated}

-- | Format a 'UTCTime' as @YYYY-MM-DDTHH:MM@ for dedup key buckets.
--
-- Seconds are not part of the output, so all instants within the same minute
-- render to the same text.
formatMinute :: UTCTime -> Text
formatMinute = T.pack . formatTime defaultTimeLocale "%Y-%m-%dT%H:%M"