{-# LANGUAGE OverloadedStrings #-}
module Arbiter.Worker.Cron
(
CronJob (..)
, OverlapPolicy (..)
, cronJob
, overlapPolicyToText
, overlapPolicyFromText
, initCronSchedules
, runCronScheduler
, processCronTick
, truncateToMinute
, formatMinute
, makeDedupKey
, computeDelayMicros
) where
import Arbiter.Core.CronSchedule qualified as CS
import Arbiter.Core.HighLevel (QueueOperation)
import Arbiter.Core.HighLevel qualified as HL
import Arbiter.Core.Job.Types (DedupKey (IgnoreDuplicate), JobWrite, dedupKey)
import Control.Monad (forM_, forever, when)
import Control.Monad.IO.Class (MonadIO)
import Data.Maybe (fromMaybe)
import Data.Text (Text)
import Data.Text qualified as T
import Data.Time
  ( UTCTime (..)
  , defaultTimeLocale
  , diffUTCTime
  , formatTime
  , getCurrentTime
  , secondsToDiffTime
  )
import Database.PostgreSQL.Simple (Connection)
import GHC.Generics (Generic)
import System.Cron (CronSchedule, parseCronSchedule, scheduleMatches)
import UnliftIO (MonadUnliftIO, liftIO, tryAny)
import UnliftIO.Concurrent (threadDelay)

import Arbiter.Worker.Logger (LogConfig, LogLevel (..))
import Arbiter.Worker.Logger.Internal (logMessage)
-- | Policy that selects how the dedup key of a fired cron job is built.
--
-- The policy is stored as text via 'overlapPolicyToText' and read back with
-- 'overlapPolicyFromText'.
data OverlapPolicy
  = -- | The dedup key is derived from the schedule name only, so every tick
    -- of the same schedule shares one key.
    SkipOverlap
  | -- | The dedup key is derived from the schedule name plus the tick's
    -- minute, so each minute gets its own key.
    AllowOverlap
  deriving stock (Eq, Generic, Show)
-- | Render an 'OverlapPolicy' as its textual name.
--
-- The output is exactly the text that 'overlapPolicyFromText' accepts.
overlapPolicyToText :: OverlapPolicy -> Text
overlapPolicyToText policy =
  case policy of
    SkipOverlap -> "SkipOverlap"
    AllowOverlap -> "AllowOverlap"
-- | Parse the textual name of an 'OverlapPolicy'.
--
-- Matching is exact (case-sensitive, no trimming). Any other input yields
-- 'Nothing'.
overlapPolicyFromText :: Text -> Maybe OverlapPolicy
overlapPolicyFromText raw =
  case raw of
    "SkipOverlap" -> Just SkipOverlap
    "AllowOverlap" -> Just AllowOverlap
    _ -> Nothing
-- | A code-defined cron job.
--
-- Build values with 'cronJob', which parses the expression and fills in
-- 'schedule' so the two fields agree.
data CronJob payload = CronJob
  { name :: Text
  -- ^ Schedule name; used to look up the stored schedule row and as part of
  -- the dedup key.
  , cronExpression :: Text
  -- ^ The cron expression as written in code.
  , schedule :: CronSchedule
  -- ^ Parsed form of 'cronExpression'.
  , overlap :: OverlapPolicy
  -- ^ Overlap policy declared in code.
  , builder :: UTCTime -> JobWrite payload
  -- ^ Builds the job to insert for a given tick time.
  }
  deriving stock (Generic)
-- | Smart constructor for 'CronJob'.
--
-- Parses the cron expression with 'parseCronSchedule'. A parse failure is
-- returned unchanged as 'Left'; on success the parsed schedule is stored
-- next to the original expression text.
cronJob
  :: Text
  -- ^ Schedule name
  -> Text
  -- ^ Cron expression
  -> OverlapPolicy
  -- ^ Overlap policy declared in code
  -> (UTCTime -> JobWrite payload)
  -- ^ Job builder, given the tick time
  -> Either String (CronJob payload)
cronJob jobName expr policy mk = assemble <$> parseCronSchedule expr
  where
    assemble parsed =
      CronJob
        { name = jobName
        , cronExpression = expr
        , schedule = parsed
        , overlap = policy
        , builder = mk
        }
-- | Register every code-defined schedule in the database, then log how many
-- were processed.
--
-- For each job this calls 'CS.upsertCronScheduleDefault' with the job's name,
-- cron expression and overlap policy (as text). Exceptions from that call are
-- not caught here.
initCronSchedules :: Connection -> Text -> [CronJob payload] -> LogConfig -> IO ()
initCronSchedules conn schemaName jobs logCfg = do
  forM_ jobs $ \cj ->
    CS.upsertCronScheduleDefault
      conn
      schemaName
      (name cj)
      (cronExpression cj)
      (overlapPolicyToText (overlap cj))
  logMessage logCfg Info $
    "Cron schedules initialized: "
      <> T.pack (show (length jobs))
      <> " schedule(s) upserted"
-- | Run the cron scheduler loop. This action never returns.
--
-- On start it registers the schedules via 'initCronSchedules' and logs a
-- start message. It then repeats forever: sleep until the next minute
-- boundary, read the clock, and hand the current time truncated to the
-- minute to 'processCronTick'.
runCronScheduler
  :: (MonadUnliftIO m, QueueOperation m registry payload)
  => Connection
  -> LogConfig
  -> Text
  -> [CronJob payload]
  -> m ()
runCronScheduler conn logCfg schemaName jobs = do
  liftIO $ initCronSchedules conn schemaName jobs logCfg
  liftIO . logMessage logCfg Info $
    "Cron scheduler started with "
      <> T.pack (show (length jobs))
      <> " schedule(s)"
  forever $ do
    waitUntilNextMinute
    now <- liftIO getCurrentTime
    processCronTick conn logCfg schemaName jobs (truncateToMinute now)
-- | Evaluate every code-defined cron job against a single tick.
--
-- Steps, in order:
--
-- 1. Fetch the stored schedule rows. If the fetch throws, the error is logged
--    and every job falls back to its code-defined expression and overlap
--    policy, and is treated as enabled.
-- 2. For each job, resolve the effective expression, overlap policy and
--    enabled flag from its row (matched by name). An overlap text that
--    'overlapPolicyFromText' does not recognise falls back to the job's own
--    policy.
-- 3. For enabled jobs whose effective expression parses and matches @tick@,
--    build the job with the job's 'builder', attach an 'IgnoreDuplicate'
--    dedup key, and insert it. A 'Nothing' insert result is logged as a
--    dedup skip; a 'Just' result is logged as fired and followed by
--    'CS.touchCronScheduleLastFired'.
-- 4. Finally call 'CS.touchCronSchedulesChecked' with the names of all jobs.
--
-- Every database and insert call is wrapped in 'tryAny', so synchronous
-- exceptions from those calls are logged at 'Error' level instead of
-- propagating.
processCronTick
  :: (MonadUnliftIO m, QueueOperation m registry payload)
  => Connection
  -> LogConfig
  -> Text
  -> [CronJob payload]
  -> UTCTime
  -> m ()
processCronTick conn logCfg schemaName jobs tick = do
  rowMap <- do
    fetched <- tryAny . liftIO $ CS.listCronSchedules conn schemaName
    case fetched of
      Right rows -> pure [(CS.name r, r) | r <- rows]
      Left e -> do
        liftIO . logIO Error $
          "Failed to fetch cron schedules from DB, using code defaults: "
            <> T.pack (show e)
        pure []

  forM_ jobs $ \cj -> do
    -- A stored row overrides the code-defined settings; no row means the
    -- job runs with its code defaults and counts as enabled.
    let (effectiveExpr, effectiveOv, isEnabled) =
          case lookup (name cj) rowMap of
            Nothing -> (cronExpression cj, overlap cj, True)
            Just row@CS.CronScheduleRow {CS.enabled = rowEnabled} ->
              ( CS.effectiveExpression row
              , fromMaybe (overlap cj) (overlapPolicyFromText (CS.effectiveOverlap row))
              , rowEnabled
              )
    when isEnabled $
      case parseCronSchedule effectiveExpr of
        Left err ->
          liftIO . logIO Error $
            jobLabel cj
              <> " has invalid effective expression '"
              <> effectiveExpr
              <> "': "
              <> T.pack err
        Right effectiveSched ->
          when (scheduleMatches effectiveSched tick) $ do
            let key = makeDedupKeyFromParts (name cj) effectiveOv tick
                jobWrite = (builder cj tick) {dedupKey = Just (IgnoreDuplicate key)}
            inserted <- tryAny $ HL.insertJob jobWrite
            liftIO $ case inserted of
              Left e ->
                logIO Error $
                  jobLabel cj <> " failed to insert: " <> T.pack (show e)
              Right Nothing ->
                logIO Debug $
                  jobLabel cj <> " skipped (dedup key exists): " <> key
              Right (Just _) -> do
                logIO Info $
                  jobLabel cj <> " fired at " <> formatMinute tick
                touched <-
                  tryAny $
                    CS.touchCronScheduleLastFired conn schemaName (name cj)
                case touched of
                  Right () -> pure ()
                  Left e ->
                    logIO Error $
                      jobLabel cj
                        <> " failed to update last_fired_at: "
                        <> T.pack (show e)

  checked <-
    tryAny . liftIO $
      CS.touchCronSchedulesChecked conn schemaName (map name jobs)
  case checked of
    Right () -> pure ()
    Left e ->
      liftIO . logIO Error $
        "Failed to update last_checked_at: " <> T.pack (show e)
  where
    -- Logger pinned to IO so it can be used both directly and under liftIO.
    logIO :: LogLevel -> Text -> IO ()
    logIO = logMessage logCfg

    -- Common prefix of every per-job log line, including the closing quote.
    jobLabel :: CronJob p -> Text
    jobLabel cj = "Cron schedule '" <> name cj <> "'"
-- | Dedup key for a job at a given tick, using the job's code-defined name
-- and overlap policy.
--
-- 'processCronTick' builds its key from the effective overlap policy
-- instead, which can differ from the code-defined one when a stored schedule
-- row overrides it.
makeDedupKey :: CronJob payload -> UTCTime -> Text
makeDedupKey cj = makeDedupKeyFromParts (name cj) (overlap cj)
-- | Build a dedup key from a schedule name, an overlap policy and a tick.
--
-- * 'SkipOverlap': @arbiter_cron:\<name\>@. The tick is not part of the key.
-- * 'AllowOverlap': @arbiter_cron:\<name\>:\<minute\>@, where the minute is
--   rendered by 'formatMinute'.
makeDedupKeyFromParts :: Text -> OverlapPolicy -> UTCTime -> Text
makeDedupKeyFromParts jobName ov tick =
  case ov of
    SkipOverlap -> perSchedule
    AllowOverlap -> perSchedule <> ":" <> formatMinute tick
  where
    perSchedule = "arbiter_cron:" <> jobName
-- | Microseconds from @now@ until the start of the next minute.
--
-- The next minute is found by adding 60 seconds to the time of day and
-- truncating to the minute. The result is rounded up to a whole microsecond
-- and clamped to the range @[0, 120_000_000]@.
computeDelayMicros :: UTCTime -> Int
computeDelayMicros now = max 0 (min 120_000_000 rawMicros)
  where
    -- Parenthesised to make explicit that the record update applies to
    -- @now@ before truncation.
    nextMinute = truncateToMinute (now {utctDayTime = utctDayTime now + 60})
    rawMicros = ceiling (diffUTCTime nextMinute now * 1_000_000) :: Int
-- | Sleep for the delay that 'computeDelayMicros' returns for the current
-- time.
waitUntilNextMinute :: (MonadIO m) => m ()
waitUntilNextMinute =
  liftIO $ getCurrentTime >>= threadDelay . computeDelayMicros
-- | Drop the seconds and sub-second part of a 'UTCTime', keeping the day and
-- the whole minute of its time of day.
truncateToMinute :: UTCTime -> UTCTime
truncateToMinute t = t {utctDayTime = secondsToDiffTime minuteStart}
  where
    wholeSeconds = floor (utctDayTime t) :: Integer
    -- Integer division discards the seconds within the current minute.
    minuteStart = (wholeSeconds `div` 60) * 60
-- | Render a time at minute precision using the format @%Y-%m-%dT%H:%M@,
-- e.g. @2024-01-02T12:34@.
formatMinute :: UTCTime -> Text
formatMinute t = T.pack (formatTime defaultTimeLocale "%Y-%m-%dT%H:%M" t)