{-# LANGUAGE OverloadedStrings #-}
module Arbiter.Worker.Cron
(
CronJob (..)
, OverlapPolicy (..)
, BackfillPolicy (..)
, TickKind (..)
, cronJob
, cronJobInTimezone
, overlapPolicyToText
, overlapPolicyFromText
, resolveTZ
, matchesInTimezone
, formatMinuteInTimezone
, runCronScheduler
, initCronSchedules
, processCronCatchUp
, enumerateCatchUpTicks
, truncateToMinute
, formatMinute
, makeDedupKey
, computeDelayMicros
, enumMinutes
) where
import Arbiter.Core.CronSchedule qualified as CS
import Arbiter.Core.HighLevel (QueueOperation)
import Arbiter.Core.HighLevel qualified as HL
import Arbiter.Core.Job.Types (DedupKey (IgnoreDuplicate), JobWrite, dedupKey)
import Arbiter.Core.MonadArbiter (MonadArbiter, withDbTransaction)
import Arbiter.Core.Operations qualified as Ops
import Control.Concurrent.STM (retry)
import Control.Monad (forM_, unless, void, when)
import Control.Monad.IO.Class (MonadIO)
import Data.List (unfoldr)
import Data.Maybe (fromMaybe, isNothing)
import Data.Text (Text)
import Data.Text qualified as T
import Data.Text.Encoding (encodeUtf8)
import Data.Time
( NominalDiffTime
, UTCTime (..)
, addUTCTime
, defaultTimeLocale
, diffUTCTime
, formatTime
, getCurrentTime
, localTimeToUTC
, secondsToDiffTime
, utc
)
import Data.Time.Zones (TZ, utcToLocalTimeTZ)
import Data.Time.Zones.All (fromTZName, tzByLabel)
import GHC.Generics (Generic)
import System.Cron (CronSchedule, parseCronSchedule, scheduleMatches)
import UnliftIO (MonadUnliftIO, TVar, atomically, liftIO, readTVar, readTVarIO, registerDelay, tryAny)
import Arbiter.Worker.Logger (LogConfig, LogLevel (..))
import Arbiter.Worker.Logger.Internal (tryLog)
import Arbiter.Worker.WorkerState (WorkerState (..))
-- | Overlap behaviour of a cron schedule.
--
-- Inside this module the policy selects how many matching ticks are fired per
-- pass ('pickTicksToFire') and which dedup key the inserted job carries
-- ('makeDedupKeyFromParts').
data OverlapPolicy
  = -- | Fire at most the first matching tick of a pass; the dedup key is
    -- @arbiter_cron:\<name\>@ with no timestamp component.
    SkipOverlap
  | -- | Fire every matching tick; the dedup key additionally carries the
    -- tick's formatted minute.
    AllowOverlap
  deriving stock (Eq, Generic, Show)
-- | Whether ticks missed since the last check are replayed.
--
-- Consumed by 'enumerateCatchUpTicks' to build the list of candidate minutes
-- and by 'processCronCatchUp' to decide whether the cron leader lock is
-- required.
data BackfillPolicy
  = -- | Only the current tick is considered; no leader lock is taken.
    NoBackfill
  | -- | Consider every minute after the last checked one, but never further
    -- back than the given window before the current tick.
    Backfill NominalDiffTime
  deriving stock (Eq, Generic, Show)
-- | Tag handed to a job 'builder' describing the tick being fired.
--
-- 'Live' is used when the tick equals the current (truncated) minute of the
-- pass; every other tick of the pass is a 'Replay' (see 'tickKindFor').
data TickKind = Live | Replay
  deriving stock (Eq, Generic, Show)
-- | Textual form of an 'OverlapPolicy'; the exact inverse of
-- 'overlapPolicyFromText' on valid input.
overlapPolicyToText :: OverlapPolicy -> Text
overlapPolicyToText policy = case policy of
  SkipOverlap -> "SkipOverlap"
  AllowOverlap -> "AllowOverlap"
-- | Parse the textual form produced by 'overlapPolicyToText'.
--
-- Matching is exact (case-sensitive); any other input yields 'Nothing'.
overlapPolicyFromText :: Text -> Maybe OverlapPolicy
overlapPolicyFromText "SkipOverlap" = Just SkipOverlap
overlapPolicyFromText "AllowOverlap" = Just AllowOverlap
overlapPolicyFromText _ = Nothing
-- | Code-side definition of a cron schedule.
--
-- When a stored schedule row exists, 'resolveAndParse' takes the expression,
-- overlap policy, timezone and enabled flag from that row; the values here
-- are then only the fallback for an unparseable stored overlap policy.
data CronJob payload = CronJob
  { name :: Text
  -- ^ Schedule name; used as the key in every @Ops@ call and in dedup keys.
  , cronExpression :: Text
  -- ^ Cron expression text, parsed with 'parseCronSchedule'.
  , overlap :: OverlapPolicy
  -- ^ Default overlap policy.
  , backfill :: BackfillPolicy
  -- ^ Whether missed ticks are replayed.
  , timezone :: Maybe Text
  -- ^ Timezone name resolved through 'resolveTZ'; 'Nothing' matches the
  -- schedule against the UTC timestamp directly.
  , builder :: TickKind -> UTCTime -> JobWrite payload
  -- ^ Builds the job to insert for a given tick.
  }
  deriving stock (Generic)
-- | Smart constructor for a 'CronJob' without timezone or backfill.
--
-- The expression is validated with 'parseCronSchedule'; its error message is
-- returned unchanged in 'Left'. On success the job is created with
-- 'NoBackfill' and no timezone.
cronJob
  :: Text
  -- ^ schedule name
  -> Text
  -- ^ cron expression
  -> OverlapPolicy
  -> (TickKind -> UTCTime -> JobWrite payload)
  -- ^ job builder
  -> Either String (CronJob payload)
cronJob cronName expr ov mk =
  -- Only the success/failure of the parse matters; the parsed schedule itself
  -- is discarded and the expression is kept as text.
  job <$ parseCronSchedule expr
  where
    job =
      CronJob
        { name = cronName
        , cronExpression = expr
        , overlap = ov
        , backfill = NoBackfill
        , timezone = Nothing
        , builder = mk
        }
-- | Like 'cronJob', but the schedule is evaluated in the named timezone.
--
-- Returns @Left "Unknown timezone: \<name\>"@ when 'resolveTZ' cannot resolve
-- the name; the timezone check happens before the expression is parsed.
cronJobInTimezone
  :: Text
  -- ^ schedule name
  -> Text
  -- ^ timezone name
  -> Text
  -- ^ cron expression
  -> OverlapPolicy
  -> (TickKind -> UTCTime -> JobWrite payload)
  -- ^ job builder
  -> Either String (CronJob payload)
cronJobInTimezone cronName tzName expr ov mk =
  case resolveTZ tzName of
    Nothing -> Left ("Unknown timezone: " <> T.unpack tzName)
    Just _ -> (\cj -> cj {timezone = Just tzName}) <$> cronJob cronName expr ov mk
-- | Look up a timezone by name.
--
-- The name is UTF-8 encoded and passed to 'fromTZName'; 'Nothing' is returned
-- when that lookup fails.
resolveTZ :: Text -> Maybe TZ
resolveTZ tzName = tzByLabel <$> fromTZName (encodeUtf8 tzName)
-- | Does the schedule match the given instant, evaluated in the given zone?
--
-- * No timezone: the UTC timestamp is matched directly.
-- * Unknown timezone name: never matches.
-- * Known timezone: the instant is converted to that zone's local wall-clock
--   time, which is then re-labelled as UTC so that 'scheduleMatches' compares
--   the schedule against the local clock fields.
--
-- The zone is resolved before the timestamp argument is taken, so a partially
-- applied matcher (as used with @filter@) can reuse the resolved zone instead
-- of looking it up for every timestamp.
matchesInTimezone :: Maybe Text -> CronSchedule -> UTCTime -> Bool
matchesInTimezone Nothing sched = scheduleMatches sched
matchesInTimezone (Just tzName) sched =
  case resolveTZ tzName of
    Nothing -> const False
    Just tz -> scheduleMatches sched . localTimeToUTC utc . utcToLocalTimeTZ tz
-- | Render a tick at minute resolution in the given timezone.
--
-- With a resolvable timezone the local time is formatted as
-- @%Y-%m-%dT%H:%M@. With no timezone, or a name 'resolveTZ' rejects, the
-- result is whatever 'formatMinute' produces for the UTC timestamp.
formatMinuteInTimezone :: Maybe Text -> UTCTime -> Text
formatMinuteInTimezone mTzName t =
  case mTzName >>= resolveTZ of
    Nothing -> formatMinute t
    Just tz ->
      T.pack (formatTime defaultTimeLocale "%Y-%m-%dT%H:%M" (utcToLocalTimeTZ tz t))
-- | Upsert the code-side defaults of every job via 'Ops.upsertCronDefault'
-- and log how many schedules were handled.
--
-- The value returned by each upsert is discarded.
initCronSchedules
  :: (MonadArbiter m)
  => Text
  -- ^ schema name
  -> [CronJob payload]
  -> LogConfig
  -> m ()
initCronSchedules schemaName jobs logCfg = do
  forM_ jobs $ \cj ->
    Ops.upsertCronDefault
      schemaName
      (name cj)
      (cronExpression cj)
      (overlapPolicyToText (overlap cj))
      (timezone cj)
  logCron logCfg Info $
    "Cron schedules initialized: " <> T.pack (show (length jobs)) <> " schedule(s) upserted"
-- | Entry point of the cron scheduler loop.
--
-- Steps:
--
-- 1. upsert the schedule defaults ('initCronSchedules');
-- 2. run one catch-up pass for the startup time, unless the worker is already
--    shutting down;
-- 3. repeatedly wait for the next minute boundary and run a pass, until
--    'waitUntilNextMinuteOrShutdown' reports shutdown.
runCronScheduler
  :: (MonadUnliftIO m, QueueOperation m registry payload)
  => TVar WorkerState
  -> LogConfig
  -> Text
  -- ^ schema name
  -> [CronJob payload]
  -> m ()
runCronScheduler stateVar logCfg schemaName jobs = do
  initCronSchedules schemaName jobs logCfg
  startupNow <- liftIO getCurrentTime
  shuttingDown <- isShuttingDown stateVar
  unless shuttingDown $
    processCronCatchUp logCfg schemaName jobs startupNow
  logCron logCfg Info $ "Cron scheduler started with " <> T.pack (show (length jobs)) <> " schedule(s)"
  loop
  where
    -- One iteration per minute; returns (ending the scheduler) on shutdown.
    loop = do
      stop <- waitUntilNextMinuteOrShutdown stateVar
      unless stop $ do
        now <- liftIO getCurrentTime
        processCronCatchUp logCfg schemaName jobs now
        loop
-- | Run one scheduler pass over all jobs for the minute containing @now@.
--
-- Each job is handled independently inside its own 'withDbTransaction'
-- wrapped in 'tryAny', so an exception in one job is logged and does not
-- prevent the remaining jobs from being processed.
processCronCatchUp
  :: (MonadUnliftIO m, QueueOperation m registry payload)
  => LogConfig
  -> Text
  -- ^ schema name
  -> [CronJob payload]
  -> UTCTime
  -- ^ current time; truncated with 'truncateToMinute' to obtain the tick
  -> m ()
processCronCatchUp logCfg schemaName jobs now = do
  let currentTick = truncateToMinute now
  forM_ jobs (processOneCron currentTick)
  where
    -- Handle a single job: acquire leadership if the job backfills, load its
    -- stored row, fire the due ticks, then record the current tick as checked.
    processOneCron currentTick cj = do
      outcome <- tryAny . withDbTransaction $ do
        -- Only backfilling jobs compete for the leader lock.
        haveLeader <- case backfill cj of
          NoBackfill -> pure True
          Backfill _ -> Ops.tryAcquireCronLeader schemaName (name cj)
        if not haveLeader
          then pure NotLeader
          else do
            mRow <- Ops.getCronScheduleByName schemaName (name cj)
            processOne mRow currentTick cj
            void $ Ops.touchCronChecked schemaName currentTick [name cj]
            pure Ran
      case outcome of
        Left e ->
          logCron logCfg Error $ "Cron '" <> name cj <> "' tick aborted: " <> T.pack (show e)
        Right NotLeader ->
          logCron logCfg Debug $ "Cron '" <> name cj <> "' skipped, another pool holds the lock"
        Right Ran -> pure ()

    -- Resolve the effective configuration and fire every due tick.
    processOne mRow currentTick cj = case resolveAndParse cj mRow of
      Disabled -> pure ()
      ParseError expr err ->
        logCron logCfg Error $
          "Cron schedule '"
            <> name cj
            <> "' has invalid effective expression '"
            <> expr
            <> "': "
            <> T.pack err
      InvalidTimezone tzName ->
        logCron logCfg Error $
          "Cron schedule '"
            <> name cj
            <> "' has unknown timezone '"
            <> tzName
            <> "'"
      Effective effectiveOv sched effectiveTz -> do
        let ticksInWindow = enumerateCatchUpTicks (backfill cj) (mRow >>= CS.lastCheckedAt) currentTick
            ticksToFire = pickTicksToFire sched effectiveTz effectiveOv ticksInWindow
            -- Everything that is not the current tick is a replay.
            replayCount = length (filter (/= currentTick) ticksToFire)
        when (replayCount > 0) $
          logCron logCfg Info $
            "Replaying " <> T.pack (show replayCount) <> " missed tick(s) for '" <> name cj <> "'"
        forM_ ticksToFire $ \t ->
          void $ tryInsertCronJob logCfg schemaName cj effectiveOv effectiveTz (tickKindFor currentTick t) t
-- | Result of one per-job pass in 'processCronCatchUp': either the cron
-- leader lock could not be acquired ('NotLeader') or the job was processed
-- ('Ran').
data TickOutcome = NotLeader | Ran
-- | Classify a tick relative to the pass's current tick: 'Live' when they are
-- equal, 'Replay' otherwise.
tickKindFor :: UTCTime -> UTCTime -> TickKind
tickKindFor currentTick t
  | t == currentTick = Live
  | otherwise = Replay
-- | Select the candidate ticks that should actually be fired.
--
-- Candidates are first filtered with 'matchesInTimezone'. Under
-- 'SkipOverlap' only the first match (in input order) is kept; under
-- 'AllowOverlap' all matches are kept.
pickTicksToFire :: CronSchedule -> Maybe Text -> OverlapPolicy -> [UTCTime] -> [UTCTime]
pickTicksToFire sched tz ov ticks =
  case ov of
    SkipOverlap -> take 1 matching
    AllowOverlap -> matching
  where
    matching = filter (matchesInTimezone tz sched) ticks
-- | Candidate minutes to examine in one pass.
--
-- The equations are tried in order:
--
-- 1. The last check (truncated to the minute) is not before the current
--    tick: nothing to do, regardless of policy.
-- 2. 'NoBackfill': just the current tick.
-- 3. No recorded last check: just the current tick.
-- 4. 'Backfill' with a recorded last check: the minutes passed to
--    'enumMinutes', starting one minute after the last check but no earlier
--    than @currentTick - window@ (truncated), up to the current tick.
enumerateCatchUpTicks :: BackfillPolicy -> Maybe UTCTime -> UTCTime -> [UTCTime]
enumerateCatchUpTicks _ (Just lastChecked) currentTick
  | truncateToMinute lastChecked >= currentTick = []
enumerateCatchUpTicks NoBackfill _ currentTick = [currentTick]
enumerateCatchUpTicks _ Nothing currentTick = [currentTick]
enumerateCatchUpTicks (Backfill window) (Just lastChecked) currentTick =
  enumMinutes startMinute currentTick
  where
    truncatedLast = truncateToMinute lastChecked
    -- Oldest minute the backfill window allows.
    windowFloor = truncateToMinute (addUTCTime (negate window) currentTick)
    -- First unchecked minute, clamped to the window.
    startMinute = max (addUTCTime 60 truncatedLast) windowFloor
-- | Outcome of resolving a job's effective configuration
-- (see 'resolveAndParse').
data Resolved
  = -- | The schedule is disabled.
    Disabled
  | -- | The effective expression (first field) failed to parse; the second
    -- field is the parser's error message.
    ParseError Text String
  | -- | The effective timezone name could not be resolved.
    InvalidTimezone Text
  | -- | Usable configuration: overlap policy, parsed schedule and optional
    -- timezone name.
    Effective OverlapPolicy CronSchedule (Maybe Text)
-- | Combine the code-side job with its stored row (if any) and validate the
-- result.
--
-- Without a row the job's own expression, overlap policy and timezone are
-- used and the schedule counts as enabled. With a row, its effective
-- expression, overlap and timezone are used; an overlap text that
-- 'overlapPolicyFromText' rejects falls back to the job's own policy.
--
-- Checks are applied in this order: enabled flag, expression parse, timezone
-- resolution.
resolveAndParse :: CronJob payload -> Maybe CS.CronScheduleRow -> Resolved
resolveAndParse cj mRow
  | not isEnabled = Disabled
  | otherwise =
      case parseCronSchedule expr of
        Left err -> ParseError expr err
        Right sched -> case tz of
          Just tzName | isNothing (resolveTZ tzName) -> InvalidTimezone tzName
          _ -> Effective ov sched tz
  where
    (expr, ov, tz, isEnabled) = case mRow of
      Nothing -> (cronExpression cj, overlap cj, timezone cj, True)
      Just row@CS.CronScheduleRow {CS.enabled = rowEnabled} ->
        ( CS.effectiveExpression row
        , fromMaybe (overlap cj) (overlapPolicyFromText (CS.effectiveOverlap row))
        , CS.effectiveTimezone row
        , rowEnabled
        )
-- | Attempt to fire one tick of a schedule.
--
-- Inside a 'withDbTransaction', 'Ops.tryFireCronGate' is consulted for the
-- tick; when it reports 'True' the job built by the schedule's 'builder' is
-- inserted with an 'IgnoreDuplicate' dedup key and the tick is recorded via
-- 'Ops.touchCronChecked'.
--
-- Returns 'False' (after logging at 'Error') when the transaction raised a
-- synchronous exception, and 'True' otherwise, including when the gate did
-- not fire.
tryInsertCronJob
  :: (MonadUnliftIO m, QueueOperation m registry payload)
  => LogConfig
  -> Text
  -- ^ schema name
  -> CronJob payload
  -> OverlapPolicy
  -- ^ effective overlap policy
  -> Maybe Text
  -- ^ effective timezone name
  -> TickKind
  -> UTCTime
  -- ^ tick to fire
  -> m Bool
tryInsertCronJob logCfg schemaName cj effectiveOv effectiveTz kind tick = do
  result <- tryAny . withDbTransaction $ do
    fired <- Ops.tryFireCronGate schemaName (name cj) tick
    when fired $ do
      let key = makeDedupKeyFromParts (name cj) effectiveOv effectiveTz tick
          jobWrite = (builder cj kind tick) {dedupKey = Just (IgnoreDuplicate key)}
      void $ HL.insertJob jobWrite
      -- NOTE(review): confirm this touch is meant to run only when the gate
      -- fired rather than for every attempted tick.
      void $ Ops.touchCronChecked schemaName tick [name cj]
  case result of
    Left e -> do
      logCron logCfg Error $ "Cron schedule '" <> name cj <> "' failed to insert: " <> T.pack (show e)
      pure False
    Right () -> do
      logCron logCfg Debug $ "Cron schedule '" <> name cj <> "' processed at " <> formatMinute tick
      pure True
-- | Emit a log message through 'tryLog', lifted into any 'MonadIO'.
logCron :: (MonadIO m) => LogConfig -> LogLevel -> Text -> m ()
logCron logCfg level msg = liftIO (tryLog logCfg level msg)
-- | Dedup key for a tick, derived from the job's own (code-side) name,
-- overlap policy and timezone.
makeDedupKey :: CronJob payload -> UTCTime -> Text
makeDedupKey cj = makeDedupKeyFromParts (name cj) (overlap cj) (timezone cj)
-- | Build the dedup key for a schedule tick.
--
-- * 'SkipOverlap': @arbiter_cron:\<name\>@, identical for every tick.
-- * 'AllowOverlap': @arbiter_cron:\<name\>:\<minute\>@, where the minute is
--   rendered by 'formatMinuteInTimezone'.
makeDedupKeyFromParts :: Text -> OverlapPolicy -> Maybe Text -> UTCTime -> Text
makeDedupKeyFromParts jobName ov tz tick =
  case ov of
    SkipOverlap -> prefix
    AllowOverlap -> prefix <> ":" <> formatMinuteInTimezone tz tick
  where
    prefix = "arbiter_cron:" <> jobName
-- | Microseconds to wait from @now@ until @truncateToMinute (now + 60s)@.
--
-- The fractional microsecond count is rounded up and the result is clamped
-- to the range @[0, 120_000_000]@.
computeDelayMicros :: UTCTime -> Int
computeDelayMicros now = max 0 (min 120_000_000 rawMicros)
  where
    nextMinute = truncateToMinute (addUTCTime 60 now)
    delaySeconds = diffUTCTime nextMinute now
    rawMicros = ceiling (delaySeconds * 1_000_000) :: Int
-- | Block until either the next minute boundary is reached or the worker
-- state becomes 'ShuttingDown'.
--
-- Returns 'True' when the worker is shutting down and 'False' when the timer
-- (armed for 'computeDelayMicros' of the current time) has fired.
waitUntilNextMinuteOrShutdown :: (MonadIO m) => TVar WorkerState -> m Bool
waitUntilNextMinuteOrShutdown stateVar = liftIO $ do
  now <- getCurrentTime
  timerVar <- registerDelay (computeDelayMicros now)
  atomically $ do
    st <- readTVar stateVar
    case st of
      -- Shutdown is checked first, so it wins if both conditions hold.
      ShuttingDown -> pure True
      _ -> do
        timedOut <- readTVar timerVar
        -- 'retry' suspends the transaction until one of the TVars changes.
        if timedOut then pure False else retry
-- | Read the worker state once, without blocking, and report whether it is
-- currently 'ShuttingDown'.
isShuttingDown :: (MonadIO m) => TVar WorkerState -> m Bool
isShuttingDown stateVar = do
  st <- readTVarIO stateVar
  pure $ case st of
    ShuttingDown -> True
    _ -> False
-- | Round a 'UTCTime' down to the start of its minute.
--
-- The day is left untouched; the time of day is floored to whole seconds and
-- then to the nearest lower multiple of 60 seconds.
truncateToMinute :: UTCTime -> UTCTime
truncateToMinute t = t {utctDayTime = truncated}
  where
    wholeMinutes = floor (utctDayTime t) `div` 60
    truncated = secondsToDiffTime (wholeMinutes * 60)
-- | Render a 'UTCTime' at minute precision as @YYYY-MM-DDTHH:MM@.
--
-- Seconds and smaller units are not part of the output.
formatMinute :: UTCTime -> Text
formatMinute t = T.pack (formatTime defaultTimeLocale "%Y-%m-%dT%H:%M" t)
-- | List the instants from @start@ to @end@ in 60-second steps.
--
-- Both bounds are inclusive: @start@ is emitted whenever @start <= end@, and
-- @end@ is emitted when it lies an exact multiple of 60 seconds after
-- @start@. The result is empty when @start > end@.
enumMinutes :: UTCTime -> UTCTime -> [UTCTime]
enumMinutes start end = unfoldr step start
  where
    step t
      | t > end = Nothing
      | otherwise = Just (t, addUTCTime 60 t)