{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE UndecidableInstances #-}

-- | Entry point for running a worker pool that fetches and executes jobs.
module Arbiter.Worker
  ( -- * Running Workers
    runWorkerPool

    -- * Multi-Queue Workers
  , NamedWorkerPool (..)
  , namedWorkerPool
  , runWorkerPools
  , runSelectedWorkerPools
  , getEnabledQueues

    -- * Job Result
  , JobResult (..)

    -- * Re-exports
  , module Arbiter.Worker.Config
  , module Arbiter.Worker.BackoffStrategy
  , module Arbiter.Worker.Logger
  , module Arbiter.Worker.WorkerState

    -- * Cron
  , CronJob (..)
  , OverlapPolicy (..)
  , BackfillPolicy (..)
  , cronJob
  , initCronSchedules
  , overlapPolicyToText
  , overlapPolicyFromText
  ) where

import Arbiter.Core.Exceptions
  ( BranchCancelException (..)
  , JobException (..)
  , JobNotFoundException (..)
  , JobPermanentException (..)
  , JobRetryableException (..)
  , JobStolenException
  , ParsingException (..)
  , TreeCancelException (..)
  , throwJobNotFound
  )
import Arbiter.Core.HasArbiterSchema (HasArbiterSchema (..))
import Arbiter.Core.HighLevel (JobOperation, QueueOperation)
import Arbiter.Core.HighLevel qualified as Arb
import Arbiter.Core.Job.Types qualified as Job
import Arbiter.Core.MonadArbiter (MonadArbiter (..))
import Arbiter.Core.Operations qualified as Ops
import Arbiter.Core.QueueRegistry (RegistryTables (..), TableForPayload)
import Control.Exception (SomeException, fromException)
import Control.Monad (forever, replicateM, unless, void, when)
import Control.Monad.Catch (MonadMask)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Cont (ContT (..), evalContT)
import Data.Aeson (FromJSON, ToJSON, Value, toJSON)
import Data.Aeson qualified as Aeson
import Data.Foldable (toList)
import Data.Int (Int32, Int64)
import Data.List.NonEmpty (NonEmpty (..))
import Data.Map.Strict qualified as Map
import Data.Maybe (fromMaybe)
import Data.Proxy (Proxy (..))
import Data.Text (Text)
import Data.Text qualified as T
import Data.Time (NominalDiffTime, UTCTime, getCurrentTime)
import Data.Traversable (for)
import GHC.TypeLits (symbolVal)
import System.Directory (removeFile)
import System.Environment (lookupEnv)
import UnliftIO
  ( MonadUnliftIO
  , atomically
  , checkSTM
  , finally
  , isEmptyTBQueue
  , lengthTBQueue
  , mask
  , modifyTVar'
  , newTBQueueIO
  , newTVarIO
  , readTBQueue
  , readTVar
  , throwIO
  , tryAny
  , waitAnyCatch
  , withAsync
  , writeTVar
  )
import UnliftIO.Async (race)
import UnliftIO.Async qualified as Async
import UnliftIO.Concurrent (threadDelay)
import UnliftIO.Concurrent qualified as Conc
import UnliftIO.MVar qualified as MVar
import UnliftIO.STM (TBQueue, TVar)

import Arbiter.Worker.BackoffStrategy
import Arbiter.Worker.Config
import Arbiter.Worker.Cron
  ( BackfillPolicy (..)
  , CronJob (..)
  , OverlapPolicy (..)
  , cronJob
  , initCronSchedules
  , overlapPolicyFromText
  , overlapPolicyToText
  , runCronScheduler
  )
import Arbiter.Worker.Dispatcher
import Arbiter.Worker.Heartbeat (withJobsHeartbeat)
import Arbiter.Worker.Logger
import Arbiter.Worker.Logger.Internal (runHook, tryLog, withJobContext)
import Arbiter.Worker.Retry (retryOnException)
import Arbiter.Worker.WorkerState

-- ---------------------------------------------------------------------------
-- Job Result
-- ---------------------------------------------------------------------------

-- | Handler result types. @()@ is fire-and-forget; any @(ToJSON a, FromJSON a)@
-- is stored in the results table when the job has a parent and decoded when
-- read by a rollup finalizer.
class JobResult a where
  -- | Encode a handler result for storage. 'Nothing' indicates there is no
  -- value to store (the @()@ instance always returns it).
  encodeJobResult :: a -> Maybe Value

  -- | Decode a previously stored result, reporting a textual error on
  -- failure.
  decodeJobResult :: Value -> Either Text a

-- | Fire-and-forget results: nothing is ever encoded, and decoding ignores
-- its input and always succeeds.
instance JobResult () where
  encodeJobResult _ = Nothing
  decodeJobResult _ = Right ()

-- | Catch-all for JSON round-trippable results: encoded with 'toJSON' and
-- decoded with 'Aeson.fromJSON'. Marked @OVERLAPPABLE@ so the more specific
-- @()@ instance is chosen for unit results.
instance {-# OVERLAPPABLE #-} (FromJSON a, ToJSON a) => JobResult a where
  encodeJobResult = Just . toJSON
  decodeJobResult v = case Aeson.fromJSON v of
    Aeson.Success a -> Right a
    Aeson.Error err -> Left (T.pack err)

-- ---------------------------------------------------------------------------
-- Multi-Queue Workers
-- ---------------------------------------------------------------------------

-- | A worker pool paired with its queue name (derived from the registry).
--
-- The registry, payload and result types are existentially hidden, so pools
-- for different queues can be collected in one list.
--
-- @
-- allWorkers =
--   [ namedWorkerPool emailConfig      -- "email_jobs"
--   , namedWorkerPool imageConfig      -- "image_jobs"
--   ]
--
-- main = runWorkerPools (Proxy \@MyRegistry) allWorkers (\\_ -> pure ())
-- @
data NamedWorkerPool m
  = forall registry payload result.
    (JobResult result, QueueOperation m registry payload) =>
    NamedWorkerPool
    { workerPoolName :: Text
    -- ^ Queue name from the type-level registry
    , workerPoolConfig :: WorkerConfig m payload result
    -- ^ The worker configuration
    }

-- | Create a named worker pool, deriving the name from the type-level registry.
--
-- The name is the 'Symbol' that @TableForPayload payload registry@ reduces
-- to, converted to 'Text'.
namedWorkerPool
  :: forall m registry payload result
   . (JobResult result, QueueOperation m registry payload)
  => WorkerConfig m payload result
  -> NamedWorkerPool m
namedWorkerPool cfg =
  NamedWorkerPool
    { workerPoolName = T.pack (symbolVal (Proxy @(TableForPayload payload registry)))
    , workerPoolConfig = cfg
    }

-- | Run worker pools with shared shutdown state. Filters to queues listed
-- in @ARBITER_ENABLED_QUEUES@ (all if unset). The setup action receives the
-- shared 'TVar' for installing signal handlers.
--
-- The setup action runs before the environment variable is read. An
-- 'IOError' is thrown (via 'getEnabledQueues') if the variable names a queue
-- that is not in the registry.
runWorkerPools
  :: forall m registry
   . (MonadMask m, MonadUnliftIO m, RegistryTables registry)
  => Proxy registry
  -> [NamedWorkerPool m]
  -> (TVar WorkerState -> IO ())
  -> m ()
runWorkerPools registry pools setup = do
  sharedState <- liftIO newWorkerState
  liftIO (setup sharedState)
  enabled <- liftIO (getEnabledQueues "ARBITER_ENABLED_QUEUES" registry)
  runSelectedWorkerPools sharedState enabled pools

-- | Run only the worker pools whose names appear in the enabled list.
--
-- Each selected pool runs 'runWorkerPool' in its own thread. Its
-- 'workerStateVar' is replaced by the shared state, and a @"pool"@ entry is
-- added to its log context. Blocks until every selected pool has finished,
-- and returns immediately if none are selected. A pool that ends with an
-- exception is logged at 'Error' through that pool's log config. The
-- exception is not rethrown, so the remaining pools keep running.
runSelectedWorkerPools
  :: (MonadMask m, MonadUnliftIO m)
  => TVar WorkerState
  -> [Text]
  -> [NamedWorkerPool m]
  -> m ()
runSelectedWorkerPools sharedState enabled pools =
  case filter (\(NamedWorkerPool name _) -> name `elem` enabled) pools of
    [] -> pure ()
    selected -> evalContT $ do
      running <- for selected $ \(NamedWorkerPool name cfg) -> do
        let cfg' =
              cfg
                { workerStateVar = sharedState
                , logConfig = withPoolContext name (logConfig cfg)
                }
        -- Scoped to the enclosing ContT: the pool thread is cancelled if
        -- this function exits early.
        poolThread <- ContT (Async.withAsync (runWorkerPool cfg'))
        pure (logConfig cfg', poolThread)
      -- Wait for every pool, reporting (rather than dropping) any failure.
      lift $ mapM_ reportExit running
  where
    reportExit (poolLog, poolThread) = do
      outcome <- Async.waitCatch poolThread
      case outcome of
        Left e ->
          tryLog poolLog Error $
            "Worker pool exited with exception: " <> T.pack (show e)
        Right () -> pure ()

-- | Inject the pool name into log context.
--
-- Prepends a @"pool"@ pair to whatever 'additionalContext' produces.
-- User-supplied pairs come after it, so they win on key collision.
withPoolContext :: Text -> LogConfig -> LogConfig
withPoolContext poolName lc =
  lc {additionalContext = (("pool" .= poolName) :) <$> additionalContext lc}

-- | Get enabled queues from an environment variable.
--
-- If the environment variable is set, its value is parsed as a
-- comma-separated list of queue names:
--
-- * Each name is trimmed of surrounding whitespace.
-- * Blank entries (for example from a trailing or doubled comma) are ignored.
-- * Every remaining name is validated against the registry. Unknown names
--   cause an 'IOError' to be thrown.
--
-- If the variable is unset, or contains no non-blank names, all queue names
-- from the registry are returned.
--
-- Example:
--
-- @
-- -- With ENABLED_QUEUES="email_jobs,notifications"
-- queues <- getEnabledQueues "ENABLED_QUEUES" (Proxy \@MyRegistry)
-- -- Returns: ["email_jobs", "notifications"]
--
-- -- With ENABLED_QUEUES="email_jobs,"
-- -- Returns: ["email_jobs"]
--
-- -- With ENABLED_QUEUES unset or empty
-- queues <- getEnabledQueues "ENABLED_QUEUES" (Proxy \@MyRegistry)
-- -- Returns: all queues from registry
--
-- -- With ENABLED_QUEUES="email_jobs,invalid_queue"
-- -- Throws error: "Unknown queue names: invalid_queue"
-- @
getEnabledQueues
  :: (RegistryTables registry)
  => String
  -- ^ Environment variable name
  -> Proxy registry
  -- ^ Registry proxy
  -> IO [Text]
getEnabledQueues envVar registry = do
  mVal <- lookupEnv envVar
  case parseNames <$> mVal of
    Nothing -> pure allQueues
    -- Set but blank (or only commas/whitespace): same as unset.
    Just [] -> pure allQueues
    Just requested ->
      case filter (`notElem` allQueues) requested of
        [] -> pure requested
        invalid ->
          throwIO . userError $
            "Unknown queue names: " <> T.unpack (T.intercalate ", " invalid)
  where
    allQueues = registryTableNames registry
    -- Split on commas, trim each entry, and drop blank entries.
    parseNames = filter (not . T.null) . map T.strip . T.splitOn "," . T.pack

-- ---------------------------------------------------------------------------
-- Worker Pool
-- ---------------------------------------------------------------------------

-- | Starts a worker pool with a dispatcher and N worker threads.
--
-- Besides the dispatcher and the workers, it also runs:
--
-- * a group reaper;
-- * a cron scheduler, only when 'cronJobs' is non-empty;
-- * a liveness-file writer, only when 'livenessConfig' is set.
--
-- All threads are scoped with 'withAsync'. As soon as any one of them exits,
-- normally or by exception:
--
-- 1. 'shutdownWorker' is invoked.
-- 2. In-flight work is drained: the queue must be empty and no worker busy.
--    The wait is bounded by 'gracefulShutdownTimeout' when set.
-- 3. The remaining threads are cancelled when this function returns.
runWorkerPool
  :: forall m registry payload result
   . ( JobResult result
     , MonadMask m
     , MonadUnliftIO m
     , QueueOperation m registry payload
     )
  => WorkerConfig m payload result
  -> m ()
runWorkerPool config = do
  let workerCap = workerCount config
      poolLog = logConfig config
      poolState = workerStateVar config
      mLiveness = livenessConfig config
      mLivenessSignal = livenessSignal <$> mLiveness

  -- Shared state: the work queue holds at most one batch per worker, and
  -- busyWorkerCount tracks how many workers are currently processing.
  workQueue <- liftIO $ newTBQueueIO (fromIntegral workerCap)
  busyWorkerCount <- liftIO $ newTVarIO 0
  workerFinishedVar <- liftIO $ newTVarIO False

  evalContT $ do
    -- Liveness probe. The probe file is removed (best effort) when the
    -- probe thread ends.
    liveness <-
      case mLiveness of
        Nothing -> pure []
        Just lc ->
          let probe =
                refreshLiveness poolLog (livenessPath lc) (livenessSignal lc) (livenessInterval lc)
                  `finally` void (tryAny (liftIO (removeFile (livenessPath lc))))
           in (: []) <$> ContT (withAsync probe)

    -- Dispatcher
    dispatcher <-
      ContT . withAsync $
        runDispatcher config workerCap workQueue busyWorkerCount mLivenessSignal workerFinishedVar

    -- Spawn workers. The worker loop already swallows per-job failures via
    -- 'tryAny', so anything reaching this layer is an exception in the loop's
    -- own scaffolding (queue read, hook, context setup, etc.). Retry the
    -- worker rather than collapsing the whole pool.
    workers <-
      replicateM workerCap . ContT . withAsync $
        retryOnException poolState poolLog "Worker" $
          workerLoop config workQueue busyWorkerCount workerFinishedVar

    -- Cron scheduler (only when cronJobs is non-empty)
    cron <-
      case cronJobs config of
        [] -> pure []
        jobs -> do
          sch <- lift getSchema
          fmap (: []) . ContT . withAsync $
            retryOnException poolState poolLog "Cron scheduler" $
              runCronScheduler poolState poolLog sch jobs

    -- Groups table reaper (corrects drift in job_count, min_priority, min_id)
    reaper <-
      ContT . withAsync $
        retryOnException poolState poolLog "Group reaper" $
          groupReaperLoop @m @registry @payload (groupReaperInterval config)

    -- Wait for any thread to exit (normally or exceptionally).
    (_, outcome) <- waitAnyCatch (dispatcher : reaper : cron ++ liveness ++ workers)
    case outcome of
      Left e ->
        lift . tryLog poolLog Error $
          "Thread pool exception: " <> T.pack (show e)
      Right () -> pure ()

    -- Set shutdown state if not already set
    shutdownWorker config

    -- Graceful shutdown: drain in-flight work, with optional timeout.
    lift $ tryLog poolLog Info "Starting graceful shutdown. Draining in-flight jobs..."
    fullyDrained <-
      lift $
        drainInFlight poolLog (gracefulShutdownTimeout config) workQueue busyWorkerCount
    lift $
      if fullyDrained
        then tryLog poolLog Info "All workers are now idle. Graceful shutdown complete."
        else tryLog poolLog Warning "Graceful shutdown timed out. Some jobs may still be in-flight."
  where
    -- Wait until the work queue is empty and the busy count is zero.
    --
    -- * With a timeout: returns 'False' if the timeout elapses first.
    -- * Without one: waits indefinitely and logs progress every 10 seconds.
    --
    -- Returns 'True' once the pool is idle.
    drainInFlight
      :: forall n q
       . (MonadUnliftIO n)
      => LogConfig
      -> Maybe NominalDiffTime
      -> TBQueue q
      -> TVar Int
      -> n Bool
    drainInFlight logger mTimeout queue busyVar =
      case mTimeout of
        Just timeoutSecs ->
          either (const False) (const True)
            <$> race (threadDelay (ceiling (timeoutSecs * 1_000_000))) idle
        Nothing -> reportUntilIdle
      where
        -- Blocks (via STM retry) until the queue is empty and nobody is busy.
        idle = atomically $ do
          checkSTM =<< isEmptyTBQueue queue
          busy <- readTVar busyVar
          checkSTM (busy == 0)

        reportUntilIdle = do
          tick <- race (threadDelay 10_000_000) idle
          case tick of
            Right () -> pure True
            Left () -> do
              (busy, qLen) <-
                atomically $
                  (,) <$> readTVar busyVar <*> (fromIntegral <$> lengthTBQueue queue)
              tryLog logger Info $
                "Graceful shutdown: waiting for "
                  <> T.pack (show (busy :: Int))
                  <> " busy worker(s), "
                  <> T.pack (show (qLen :: Int))
                  <> " job(s) in queue..."
              reportUntilIdle

-- | Periodically writes to a file to signal liveness.
--
-- Requires both a signal (from dispatcher or heartbeat) and a minimum delay
-- before writing - proof of work, rate-limited. If neither the dispatcher
-- nor heartbeat signals (e.g., broken DB connection, deadlocked dispatcher),
-- the file goes stale and the process should be restarted.
--
-- The first write happens as soon as the first signal arrives. Each later
-- write waits for both a new signal and @intervalSecs@ seconds, waited on
-- concurrently. A failed write (an empty file is written) is logged at
-- 'Error' and does not stop the loop. This function never returns normally.
refreshLiveness :: (MonadUnliftIO m) => LogConfig -> FilePath -> MVar.MVar () -> Int -> m ()
refreshLiveness logCfg healthcheckPath livenessMVar intervalSecs = do
  awaitSignal
  writeProbe
  forever $ do
    Async.concurrently_ awaitSignal (Conc.threadDelay (intervalSecs * 1_000_000))
    writeProbe
  where
    awaitSignal = MVar.takeMVar livenessMVar

    writeProbe =
      tryAny (liftIO (writeFile healthcheckPath ""))
        >>= either reportFailure pure

    reportFailure e =
      tryLog logCfg Error ("Liveness probe write failed: " <> T.pack (show e))

-- | Main loop for a single worker thread.
--
-- Every iteration runs with asynchronous exceptions masked and:
--
--   1. atomically takes the next batch from the work queue (a singleton in
--      single mode, several jobs in batched mode) and increments the
--      busy-worker counter in the same STM transaction;
--   2. inside 'withJobContext', fires the @onJobClaimed@ hook for each job,
--      then runs 'processJobsWithRetry' with async exceptions restored;
--   3. via 'finally', always decrements the busy counter and sets the
--      worker-finished flag, however step 2 ends.
--
-- A synchronous exception escaping 'processJobsWithRetry' is logged at
-- 'Error' level and followed by a two-second pause. Asynchronous exceptions
-- are not caught by 'tryAny', so they still propagate.
workerLoop
  :: forall m registry payload result
   . ( JobOperation m registry payload
     , JobResult result
     , MonadMask m
     , MonadUnliftIO m
     )
  => WorkerConfig m payload result
  -> TBQueue (NonEmpty (Job.JobRead payload))
  -> TVar Int
  -- ^ Busy worker count
  -> TVar Bool
  -- ^ Worker finished signal
  -> m ()
workerLoop config workQueue busyCount workerFinishedVar =
  forever $ mask $ \restore -> do
    -- The mask covers the gap between the claim (which bumps busyCount) and
    -- the installation of 'finally', so the decrement can never be skipped.
    batch <- atomically claimBatch
    let process = withJobContext batch $ do
          claimedAt <- liftIO getCurrentTime
          mapM_ (announceClaim claimedAt) batch
          -- Only the real work runs unmasked so it can be cancelled; tryAny
          -- catches synchronous failures and rethrows asynchronous ones.
          outcome <- tryAny (restore (processJobsWithRetry config batch))
          either recover pure outcome
    process `finally` atomically releaseSlot
  where
    logCfg = logConfig config

    -- Take one batch and count this worker as busy, in one transaction.
    claimBatch = do
      claimed <- readTBQueue workQueue
      modifyTVar' busyCount (+ 1)
      pure claimed

    -- Undo the busy count and signal that this worker finished a batch.
    releaseSlot = do
      modifyTVar' busyCount (subtract 1)
      writeTVar workerFinishedVar True

    announceClaim claimedAt job =
      runHook logCfg "onJobClaimed" $
        Job.onJobClaimed (observabilityHooks config) job claimedAt

    -- Log the failure, then back off briefly (threadDelay is interruptible).
    recover err = do
      tryLog logCfg Error $ "Worker exception: " <> T.pack (show err)
      threadDelay 2_000_000

-- | Read and decode child results for a rollup finalizer.
--
-- Fetches the raw data for @job@ with 'Ops.readChildResultsRaw': child
-- results, child failures, a saved snapshot and DLQ failures. The first
-- three are combined with 'Ops.mergeRawChildResults', and every successful
-- entry is decoded with 'decodeJobResult'. The DLQ failure map is returned
-- unchanged as the second component.
--
-- Decode failures appear as @Left decodeError@ - the child succeeded but
-- its result JSON doesn't match the expected type.
readChildResults
  :: (JobResult a, MonadArbiter m)
  => Text
  -> Job.JobRead payload
  -> m (Map.Map Int64 (Either Text a), Map.Map Int64 T.Text)
readChildResults schemaName job = do
  (okValues, childErrors, snapshot, dlqFailures) <-
    Ops.readChildResultsRaw schemaName (Job.queueName job) (Job.primaryKey job)
  let decodeEntry entry = entry >>= decodeJobResult
      combined = Ops.mergeRawChildResults okValues childErrors snapshot
  pure (Map.map decodeEntry combined, dlqFailures)

-- | Process one claimed batch: run the handler under the job heartbeat, then
-- record the outcome.
--
-- With 'useWorkerTransaction' enabled, the handler runs inside
-- 'withDbTransaction' together with acknowledgement of the job(s):
--
--   * single-job mode first stores the handler's encoded result for the
--     parent (when the job has a parent and 'encodeJobResult' yields a
--     value), then acks the job;
--   * batched mode bulk-acks every job in the batch.
--
-- If the ack does not touch the expected number of rows, 'throwJobNotFound'
-- aborts the transaction. Without 'useWorkerTransaction' the handler simply
-- runs and acking is left to the user.
--
-- Afterwards:
--
--   * success fires @onJobSuccess@ for every job, but only in
--     worker-transaction mode;
--   * exceptions accepted by 'isJobGoneException' are logged at 'Info' and
--     not retried;
--   * any other synchronous exception sends every job through
--     'handleJobFailure' inside one 'withDbTransaction'. It uses the smallest
--     effective max-attempts value in the batch.
processJobsWithRetry
  :: forall m registry payload result
   . ( JobOperation m registry payload
     , JobResult result
     , MonadUnliftIO m
     )
  => WorkerConfig m payload result
  -> NonEmpty (Job.JobRead payload)
  -> m ()
processJobsWithRetry config jobs = do
  startTime <- liftIO getCurrentTime
  schemaName <- Arb.getSchema
  outcome <-
    tryAny $
      withJobsHeartbeat
        hooks
        (heartbeatInterval config)
        (visibilityTimeout config)
        startTime
        jobs
        (logConfig config)
        (livenessSignal <$> livenessConfig config)
        (work schemaName)
  endTime <- liftIO getCurrentTime
  case outcome of
    Right () ->
      -- In manual mode users own acking and success observability.
      when (useWorkerTransaction config) $
        mapM_ (announceSuccess startTime endTime) jobs
    Left err
      | isJobGoneException err ->
          tryLog (logConfig config) Info $
            "Job(s) no longer available, skipping retry: " <> T.pack (show err)
      | otherwise ->
          -- Retry / DLQ bookkeeping happens in its own transaction.
          withDbTransaction $
            mapM_ (handleJobFailure config hooks err maxAtts startTime endTime) jobs
  where
    hooks :: Job.ObservabilityHooks m payload
    hooks = observabilityHooks config

    -- Strictest (smallest) max attempts across the batch; jobs without their
    -- own limit fall back to the worker-wide default.
    maxAtts :: Int32
    maxAtts = minimum (fromMaybe (maxAttempts config) . Job.maxAttempts <$> jobs)

    -- The work performed while the heartbeat is running.
    work schemaName
      | useWorkerTransaction config = withDbTransaction $ do
          handlerResult <- runHandler config schemaName jobs
          case handlerMode config of
            SingleJobMode _ -> settleSingle schemaName handlerResult
            BatchedJobsMode _ _ -> settleBatch
      | otherwise =
          -- Manual mode: no transaction, no automatic acking
          void (runHandler config schemaName jobs)

    -- Store the result for a parent rollup (if applicable), then ack the job.
    settleSingle schemaName handlerResult = do
      let (job :| _) = jobs
      case Job.parentId job of
        Just pid
          | Just val <- encodeJobResult handlerResult ->
              void $
                Ops.insertResult schemaName (Job.queueName job) pid (Job.primaryKey job) val
        _ -> pure ()
      acked <- Arb.ackJob job
      when (acked == 0) $
        throwJobNotFound $
          "Job "
            <> T.pack (show (Job.primaryKey job))
            <> " was reclaimed during processing - rolling back handler transaction"

    -- Ack the whole batch; any shortfall means some jobs were reclaimed.
    settleBatch = do
      acked <- Arb.ackJobsBulk (toList jobs)
      let expected = length jobs
      when (acked /= fromIntegral expected) $
        throwJobNotFound $
          "Expected to ack "
            <> T.pack (show expected)
            <> " jobs but only "
            <> T.pack (show acked)
            <> " were deleted - rolling back handler transaction"

    announceSuccess started finished job =
      runHook (logConfig config) "onJobSuccess" $
        Job.onJobSuccess hooks job started finished

-- | Run the user's handler, reading child results for rollup jobs.
--
-- Batched mode hands the whole batch to the handler.
--
-- Single-job mode uses the first job of the batch. The handler is also
-- given that job's child results and DLQ failures. Both maps are read with
-- 'readChildResults' when the job is a rollup, and are empty otherwise.
runHandler
  :: forall m registry payload result
   . ( JobOperation m registry payload
     , JobResult result
     )
  => WorkerConfig m payload result
  -> Text
  -> NonEmpty (Job.JobRead payload)
  -> m result
runHandler config schemaName jobs =
  case handlerMode config of
    BatchedJobsMode _ handler ->
      runHandlerWithConnection handler jobs
    SingleJobMode handler -> do
      (childResults, dlqFailures) <- rollupInputs
      runHandlerWithConnection (handler childResults dlqFailures) headJob
  where
    -- Lazy pattern binding: only forced when single-job mode needs it.
    (headJob :| _) = jobs

    rollupInputs
      | Job.isRollup headJob = readChildResults schemaName headJob
      | otherwise = pure (Map.empty, Map.empty)

-- | Check if an exception indicates the job is gone (stolen or not found).
-- These jobs should not be retried or moved to DLQ.
--
-- True exactly when the exception is a 'JobNotFoundException' or a
-- 'JobStolenException'.
isJobGoneException :: SomeException -> Bool
isJobGoneException e = notFound || stolen
  where
    notFound = maybe False (const True) (fromException e :: Maybe JobNotFoundException)
    stolen = maybe False (const True) (fromException e :: Maybe JobStolenException)

-- | How a failed job is disposed of, as decided by 'classifyException' and
-- carried out by 'handleJobFailure'.
--
-- Note: 'JobNotFoundException' and 'JobStolenException' are intercepted by
-- 'isJobGoneException' before reaching 'handleJobFailure', so there is no
-- constructor for them.
data FailureKind
  = -- | Retry with backoff, or move to the DLQ once attempts are exhausted.
    RetryFailure
  | -- | Move the job to the DLQ without retrying.
    PermanentFailure
  | -- | Delete the job's entire tree.
    TreeCancelFailure
  | -- | Cascade-delete the job's parent and siblings (or just the job when
    -- it has no parent).
    BranchCancelFailure
  deriving stock (Eq)

-- | Classify a handler exception into an error message and failure disposition.
--
-- Explicit 'JobException' signals carry their own message. A
-- 'ParsingException' is permanent. Anything else is rendered with 'show'
-- and treated as retryable.
classifyException :: SomeException -> (T.Text, FailureKind)
classifyException e =
  case fromException e of
    Just (Retryable (JobRetryableException msg)) -> (msg, RetryFailure)
    Just (Permanent (JobPermanentException msg)) -> (msg, PermanentFailure)
    Just (TreeCancel (TreeCancelException msg)) -> (msg, TreeCancelFailure)
    Just (BranchCancel (BranchCancelException msg)) -> (msg, BranchCancelFailure)
    _ -> fallback
  where
    fallback = case fromException e of
      Just (ParsingException msg) -> (msg, PermanentFailure)
      -- Unknown exception, treat as retryable
      _ -> (T.pack (show e), RetryFailure)

-- | Handle failure for a single job (retry or move to DLQ).
--
-- The exception is classified with 'classifyException', and the result
-- selects one branch:
--
--   * tree cancel: the job's tree is deleted with 'Ops.cancelJobTree';
--   * branch cancel: the parent (or the job itself when it has no parent) is
--     cascade-deleted with 'Ops.cancelJobCascade';
--   * permanent failure, or @attempts >= maxAtts@: the job is moved to the
--     DLQ. For a rollup job, merged child results are first persisted to
--     the parent state;
--   * anything else: the job is rescheduled after the configured backoff
--     plus jitter.
--
-- Hooks fire only when the database operation affected at least one row.
-- Otherwise a warning is logged, for every branch alike.
handleJobFailure
  :: forall m registry payload result
   . ( JobOperation m registry payload
     , MonadUnliftIO m
     )
  => WorkerConfig m payload result
  -> Job.ObservabilityHooks m payload
  -> SomeException
  -> Int32
  -- ^ Attempt limit; at or above it the job goes to the DLQ
  -> UTCTime
  -- ^ Processing start time (passed to hooks)
  -> UTCTime
  -- ^ Processing end time (passed to hooks)
  -> Job.JobRead payload
  -> m ()
handleJobFailure config hooks e maxAtts startTime endTime job = do
  let (errorMsg, failureKind) = classifyException e
      cfg = logConfig config
      jobIdText = T.pack (show (Job.primaryKey job))
      reportFailure =
        runHook cfg "onJobFailure" $
          Job.onJobFailure hooks job errorMsg startTime endTime
  schemaName <- getSchema
  case failureKind of
    TreeCancelFailure -> do
      -- TreeCancel: delete the entire tree from root down (including this job)
      deleted <- Ops.cancelJobTree schemaName (Job.queueName job) (Job.primaryKey job)
      if deleted > 0
        then reportFailure
        else
          -- Nothing was deleted, so the job is already gone; warn like the
          -- DLQ and retry branches do instead of silently skipping.
          tryLog cfg Warning $
            "Job " <> jobIdText <> " not available for tree cancellation"
    BranchCancelFailure -> do
      -- BranchCancel: cascade-delete the parent + all siblings (including this job).
      -- If no parent, just delete this job.
      let target = fromMaybe (Job.primaryKey job) (Job.parentId job)
      deleted <- Ops.cancelJobCascade schemaName (Job.queueName job) target
      if deleted > 0
        then reportFailure
        else
          tryLog cfg Warning $
            "Job " <> jobIdText <> " not available for branch cancellation"
    _
      | failureKind == PermanentFailure || Job.attempts job >= maxAtts -> do
          -- Snapshot into parent_state before DLQ move (survives CASCADE delete).
          -- Merges old snapshot so repeated DLQ round-trips don't lose data.
          when (Job.isRollup job) $ do
            (results, failures, mSnapshot, _dlqFailures) <-
              Ops.readChildResultsRaw schemaName (Job.queueName job) (Job.primaryKey job)
            let merged = Ops.mergeRawChildResults results failures mSnapshot
            unless (Map.null merged) $
              void $
                Ops.persistParentState schemaName (Job.queueName job) (Job.primaryKey job) (toJSON merged)
          -- Permanent failure or max attempts reached - move to DLQ
          rowsAffected <- Arb.moveToDLQ errorMsg job
          if rowsAffected == 0
            then
              tryLog cfg Warning $
                "Job " <> jobIdText <> " not available for moving to DLQ"
            else do
              -- Successfully moved to DLQ
              reportFailure
              runHook cfg "onJobFailedAndMovedToDLQ" $ Job.onJobFailedAndMovedToDLQ hooks errorMsg job
      | otherwise -> do
          -- Retry with configured backoff strategy and jitter
          let baseDelay = calculateBackoff (backoffStrategy config) (Job.attempts job)
          backoffSecs <- liftIO $ applyJitter (jitter config) baseDelay
          rowsAffected <- Arb.updateJobForRetry backoffSecs errorMsg job
          if rowsAffected == 0
            then
              tryLog cfg Warning $
                "Job " <> jobIdText <> " not available for retry"
            else do
              -- Successfully updated for retry
              reportFailure
              runHook cfg "onJobRetry" $ Job.onJobRetry hooks job backoffSecs

-- | Forever call 'Arb.refreshGroups', then sleep.
--
-- The interval is rounded up to whole seconds. That value is passed to
-- 'Arb.refreshGroups' and also sets the sleep between calls.
--
-- The sleep is clamped to at least one second. A zero or negative interval
-- would otherwise make 'threadDelay' return immediately and turn the loop
-- into a busy spin against the database.
groupReaperLoop
  :: forall m registry payload
   . ( MonadUnliftIO m
     , QueueOperation m registry payload
     )
  => NominalDiffTime
  -> m ()
groupReaperLoop interval =
  forever $ do
    Arb.refreshGroups @m @registry @payload intervalSecs
    threadDelay (sleepSecs * 1_000_000)
  where
    intervalSecs :: Int
    intervalSecs = ceiling interval

    -- Never sleep for less than a second between refreshes.
    sleepSecs :: Int
    sleepSecs = max 1 intervalSecs