{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE UndecidableInstances #-}

-- | Entry point for running a worker pool that fetches and executes jobs.
module Arbiter.Worker
  ( -- * Running Workers
    runWorkerPool

    -- * Multi-Queue Workers
  , NamedWorkerPool (..)
  , namedWorkerPool
  , runWorkerPools
  , runSelectedWorkerPools
  , getEnabledQueues

    -- * Job Result
  , JobResult (..)

    -- * Re-exports
  , module Arbiter.Worker.Config
  , module Arbiter.Worker.BackoffStrategy
  , module Arbiter.Worker.Logger
  , module Arbiter.Worker.WorkerState

    -- * Cron
  , CronJob (..)
  , OverlapPolicy (..)
  , cronJob
  , initCronSchedules
  , overlapPolicyToText
  , overlapPolicyFromText
  ) where

import Arbiter.Core.Codec (Col (..), pval)
import Arbiter.Core.Exceptions
  ( BranchCancelException (..)
  , JobException (..)
  , JobPermanentException (..)
  , JobRetryableException (..)
  , ParsingException (..)
  , TreeCancelException (..)
  , throwJobNotFound
  )
import Arbiter.Core.HasArbiterSchema (HasArbiterSchema (..))
import Arbiter.Core.HighLevel (JobOperation, QueueOperation)
import Arbiter.Core.HighLevel qualified as Arb
import Arbiter.Core.Job.Types qualified as Job
import Arbiter.Core.MonadArbiter (MonadArbiter (..))
import Arbiter.Core.Operations qualified as Ops
import Arbiter.Core.QueueRegistry (RegistryTables (..), TableForPayload)
import Control.Exception (SomeException, fromException)
import Control.Monad (forever, replicateM, void, when)
import Control.Monad.Catch (MonadMask)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Cont (ContT (..), evalContT)
import Data.Aeson (FromJSON, ToJSON, Value, toJSON)
import Data.Aeson qualified as Aeson
import Data.Foldable (toList)
import Data.Int (Int32, Int64)
import Data.List.NonEmpty (NonEmpty (..))
import Data.Map.Strict qualified as Map
import Data.Maybe (fromMaybe)
import Data.Proxy (Proxy (..))
import Data.Text (Text)
import Data.Text qualified as T
import Data.Time (NominalDiffTime, UTCTime, getCurrentTime)
import Data.Traversable (for)
import Database.PostgreSQL.Simple (close, connectPostgreSQL)
import GHC.TypeLits (symbolVal)
import System.Directory (removeFile)
import System.Environment (lookupEnv)
import UnliftIO
  ( MonadUnliftIO
  , atomically
  , bracket
  , checkSTM
  , finally
  , isAsyncException
  , isEmptyTBQueue
  , lengthTBQueue
  , mask
  , modifyTVar'
  , newTBQueueIO
  , newTVarIO
  , readTBQueue
  , readTVar
  , throwIO
  , tryAny
  , trySyncOrAsync
  , waitAnyCatch
  , withAsync
  , writeTVar
  )
import UnliftIO.Async (race)
import UnliftIO.Async qualified as Async
import UnliftIO.Concurrent (threadDelay)
import UnliftIO.Concurrent qualified as Conc
import UnliftIO.MVar qualified as MVar
import UnliftIO.STM (TBQueue, TVar)

import Arbiter.Worker.BackoffStrategy
import Arbiter.Worker.Config
import Arbiter.Worker.Cron
  ( CronJob (..)
  , OverlapPolicy (..)
  , cronJob
  , initCronSchedules
  , overlapPolicyFromText
  , overlapPolicyToText
  , runCronScheduler
  )
import Arbiter.Worker.Dispatcher
import Arbiter.Worker.Heartbeat (withJobsHeartbeat)
import Arbiter.Worker.Logger
import Arbiter.Worker.Logger.Internal (logMessage, runHook, withJobContext)
import Arbiter.Worker.Retry (retryOnException)
import Arbiter.Worker.WorkerState

-- ---------------------------------------------------------------------------
-- Job Result
-- ---------------------------------------------------------------------------

-- | Types a job handler may return.
--
-- 'encodeJobResult' yields the value to persist for a result ('Nothing' means
-- there is nothing to store); 'decodeJobResult' turns a stored 'Value' back
-- into the result type, reporting failures as 'Text'. Results are stored in
-- the results table when the job has a parent and decoded when read by a
-- rollup finalizer.
--
-- @()@ is fire-and-forget. Any other type with both 'ToJSON' and 'FromJSON'
-- instances is covered by the overlappable JSON instance below.
class JobResult a where
  -- | The value to store for a handler result, if any.
  encodeJobResult :: a -> Maybe Value

  -- | Recover a handler result from its stored JSON form.
  decodeJobResult :: Value -> Either Text a

-- | Fire-and-forget results: nothing is stored, and decoding always
-- succeeds with @()@ regardless of the stored value.
instance JobResult () where
  encodeJobResult = const Nothing
  decodeJobResult = const (Right ())

-- | Generic JSON round-trip. Marked OVERLAPPABLE so that the more specific
-- @()@ instance is chosen for unit results.
instance {-# OVERLAPPABLE #-} (FromJSON a, ToJSON a) => JobResult a where
  encodeJobResult value = Just (toJSON value)
  decodeJobResult stored =
    case Aeson.fromJSON stored of
      Aeson.Error msg -> Left (T.pack msg)
      Aeson.Success value -> Right value

-- ---------------------------------------------------------------------------
-- Multi-Queue Workers
-- ---------------------------------------------------------------------------

-- | A 'WorkerConfig' paired with the name of the queue it serves.
--
-- The payload and result types are hidden existentially, so pools for
-- different queues can be collected in one list. When a value is built with
-- 'namedWorkerPool', its name is computed from the type-level registry and
-- therefore always matches the registry's table name for that payload.
--
-- Build values with 'namedWorkerPool', then start them with 'runWorkerPools'
-- or 'runSelectedWorkerPools'; the latter only starts pools whose name is in
-- a runtime-supplied list.
--
-- Example:
--
-- @
-- allWorkers :: [NamedWorkerPool (SimpleDb MyRegistry IO)]
-- allWorkers =
--   [ namedWorkerPool emailConfig      -- name derived from registry: "email_jobs"
--   , namedWorkerPool imageConfig      -- name derived from registry: "image_jobs"
--   , namedWorkerPool notifConfig      -- name derived from registry: "notifications"
--   ]
--
-- main = runWorkerPools (Proxy \@MyRegistry) allWorkers (\\_ -> pure ())
-- @
data NamedWorkerPool m
  = forall registry payload result.
    (JobResult result, QueueOperation m registry payload) =>
    NamedWorkerPool
      { workerPoolName :: Text
      -- ^ Name of the queue (registry table) this pool consumes.
      , workerPoolConfig :: WorkerConfig m payload result
      -- ^ Configuration handed to 'runWorkerPool' for this queue.
      }

-- | Wrap a 'WorkerConfig' as a 'NamedWorkerPool'. The pool's name is the
-- registry table assigned to @payload@, i.e. the symbol
-- @TableForPayload payload registry@, reflected to 'Text' at runtime.
namedWorkerPool
  :: forall m registry payload result
   . (JobResult result, QueueOperation m registry payload)
  => WorkerConfig m payload result
  -> NamedWorkerPool m
namedWorkerPool cfg = NamedWorkerPool tableName cfg
  where
    -- Bring the type-level table name down to a value.
    tableName = T.pack (symbolVal (Proxy @(TableForPayload payload registry)))

-- | Run a collection of named worker pools that share one 'WorkerState'.
--
-- A fresh 'WorkerState' is created and passed to @setup@ first, so the
-- caller can install signal handlers (or similar) that drive a coordinated
-- shutdown. The queues to run are then read from the
-- @ARBITER_ENABLED_QUEUES@ environment variable (see 'getEnabledQueues'),
-- and the matching pools are started with 'runSelectedWorkerPools'.
runWorkerPools
  :: forall m registry
   . (MonadMask m, MonadUnliftIO m, RegistryTables registry)
  => Proxy registry
  -> [NamedWorkerPool m]
  -> (TVar WorkerState -> IO ())
  -> m ()
runWorkerPools registry pools setup = do
  (sharedVar, enabledQueues) <- liftIO $ do
    st <- newWorkerState
    setup st
    queues <- getEnabledQueues "ARBITER_ENABLED_QUEUES" registry
    pure (st, queues)
  runSelectedWorkerPools sharedVar enabledQueues pools

-- | Run only the worker pools whose 'workerPoolName' appears in @enabled@.
--
-- Each selected pool has its 'workerStateVar' replaced with @sharedState@,
-- so every pool reads and writes the same 'WorkerState'. The pools run
-- concurrently, each in its own 'Async.withAsync' thread, and this function
-- returns once all of them have finished. If no pool is selected it returns
-- immediately.
--
-- A pool that exits with an exception does not stop the others. Its
-- exception is logged at 'Error' level through that pool's 'logConfig'
-- (best-effort) instead of being silently discarded; it is not rethrown.
runSelectedWorkerPools
  :: (MonadMask m, MonadUnliftIO m)
  => TVar WorkerState
  -> [Text]
  -> [NamedWorkerPool m]
  -> m ()
runSelectedWorkerPools sharedState enabled pools =
  case filter (\(NamedWorkerPool name _) -> name `elem` enabled) pools of
    [] -> pure ()
    selected -> evalContT $ do
      running <- for selected $ \(NamedWorkerPool name cfg) -> do
        let cfg' = cfg {workerStateVar = sharedState}
        poolThread <- ContT $ Async.withAsync (runWorkerPool cfg')
        -- Keep only non-existential data so it can leave the pattern match.
        pure (name, logConfig cfg', poolThread)
      lift $ mapM_ awaitPool running
  where
    -- Wait for one pool. An abnormal exit is reported, not rethrown, so the
    -- remaining pools keep being awaited. Logging failures are ignored.
    awaitPool (name, logCfg, poolThread) = do
      outcome <- Async.waitCatch poolThread
      case outcome of
        Right () -> pure ()
        Left e ->
          void . tryAny . liftIO . logMessage logCfg Error $
            "Worker pool " <> name <> " exited with an exception: " <> T.pack (show e)

-- | Read the list of enabled queue names from an environment variable.
--
-- The variable's value is split on commas. Whitespace around each name is
-- stripped, and entries that end up empty are ignored, for example those
-- produced by a trailing comma (@"a,b,"@) or a doubled comma (@"a,,b"@).
-- Every remaining name must be one of the registry's table names. Otherwise
-- an 'IOError' (built with 'userError') is thrown that lists the unknown
-- names.
--
-- If the variable is unset, blank, or contains no non-empty names, all queue
-- names from the registry are returned.
--
-- Example:
--
-- @
-- -- With ENABLED_QUEUES="email_jobs,notifications"
-- queues <- getEnabledQueues "ENABLED_QUEUES" (Proxy \@MyRegistry)
-- -- Returns: ["email_jobs", "notifications"]
--
-- -- With ENABLED_QUEUES unset or empty
-- queues <- getEnabledQueues "ENABLED_QUEUES" (Proxy \@MyRegistry)
-- -- Returns: all queues from registry
--
-- -- With ENABLED_QUEUES="email_jobs,invalid_queue"
-- -- Throws error: "Unknown queue names: invalid_queue"
-- @
getEnabledQueues
  :: (RegistryTables registry)
  => String
  -- ^ Environment variable name
  -> Proxy registry
  -- ^ Registry proxy
  -> IO [Text]
getEnabledQueues envVar registry = do
  raw <- lookupEnv envVar
  let allQueues = registryTableNames registry
      requested = maybe [] parseNames raw
      unknown = filter (`notElem` allQueues) requested
  case requested of
    -- Unset, blank, or only separators/whitespace: enable everything.
    [] -> pure allQueues
    _
      | null unknown -> pure requested
      | otherwise ->
          throwIO . userError $
            "Unknown queue names: " <> T.unpack (T.intercalate ", " unknown)
  where
    -- Split on commas, trim each entry, and drop entries left empty so that
    -- stray commas are not reported as unknown (blank) queue names.
    parseNames = filter (not . T.null) . map T.strip . T.splitOn "," . T.pack

-- ---------------------------------------------------------------------------
-- Worker Pool
-- ---------------------------------------------------------------------------

-- | Run one worker pool until it shuts down.
--
-- All threads are started inside a single 'evalContT' scope using
-- 'withAsync', so they are all cancelled when the scope exits. In order, it
-- starts:
--
--   * a liveness-probe thread ('refreshLiveness'), if 'livenessConfig' is set;
--     the probe file is removed when that thread ends;
--   * the dispatcher ('runDispatcher'), sharing a bounded work queue (capacity
--     'workerCount', holding non-empty groups of jobs) with the workers;
--   * 'workerCount' worker threads ('workerLoop');
--   * a cron scheduler on its own PostgreSQL connection, if 'cronJobs' is
--     non-empty, wrapped in 'retryOnException';
--   * the groups-table reaper ('groupReaperLoop'), wrapped in
--     'retryOnException'.
--
-- When any one of these threads exits, normally or with an exception (which
-- is logged), the pool calls 'shutdownWorker'. It then waits until the work
-- queue is empty and the busy-worker count is zero. With no
-- 'gracefulShutdownTimeout' it waits indefinitely, logging progress every
-- 10 seconds; otherwise it stops waiting after the timeout and logs a warning.
runWorkerPool
  :: forall m registry payload result
   . ( JobResult result
     , MonadMask m
     , MonadUnliftIO m
     , QueueOperation m registry payload
     )
  => WorkerConfig m payload result
  -> m ()
runWorkerPool config = do
  let cap = workerCount config
      logCfg = logConfig config
      mLiveness = livenessConfig config
      -- Logging is best-effort: a failing logger must never abort shutdown.
      logSafe level msg = void . tryAny . liftIO $ logMessage logCfg level msg

  -- State shared between the dispatcher and the workers.
  jobQueue <- liftIO . newTBQueueIO $ fromIntegral cap
  busyCount <- liftIO $ newTVarIO 0
  finishedVar <- liftIO $ newTVarIO False

  let -- Blocks until the queue is empty and no worker is mid-job.
      drained = liftIO . atomically $ do
        isEmptyTBQueue jobQueue >>= checkSTM
        readTVar busyCount >>= checkSTM . (== 0)

      reportProgress = do
        (busy, qLen) <-
          liftIO . atomically $
            (,) <$> readTVar busyCount <*> (fromIntegral <$> lengthTBQueue jobQueue)
        logSafe Info . T.pack $
          "Graceful shutdown: waiting for "
            <> show (busy :: Int)
            <> " busy worker(s), "
            <> show (qLen :: Int)
            <> " job(s) in queue..."

      -- No timeout configured: wake every 10 seconds to log progress.
      drainForever = do
        tick <- race (liftIO $ threadDelay 10_000_000) drained
        case tick of
          Left () -> reportProgress >> drainForever
          Right () -> pure ()

  evalContT $ do
    probe <- case mLiveness of
      Nothing -> pure []
      Just lc -> do
        let path = livenessPath lc
        probeThread <-
          ContT . withAsync $
            refreshLiveness logCfg path (livenessSignal lc) (livenessInterval lc)
              `finally` void (tryAny . liftIO $ removeFile path)
        pure [probeThread]

    dispatcher <-
      ContT . withAsync $
        runDispatcher config cap jobQueue busyCount (livenessSignal <$> mLiveness) finishedVar

    workers <-
      replicateM cap . ContT . withAsync $
        workerLoop config jobQueue busyCount finishedVar

    -- The cron scheduler only runs when there are cron jobs to schedule.
    cron <- case cronJobs config of
      [] -> pure []
      jobs -> do
        schema <- lift getSchema
        fmap (: []) . ContT . withAsync $
          retryOnException (workerStateVar config) logCfg "Cron scheduler" $
            bracket
              (liftIO $ connectPostgreSQL (connStr config))
              (liftIO . close)
              (\conn -> runCronScheduler conn logCfg schema jobs)

    -- Groups-table reaper (corrects drift in job_count, min_priority, min_id).
    reaper <-
      fmap (: []) . ContT . withAsync $
        retryOnException (workerStateVar config) logCfg "Group reaper" $
          groupReaperLoop config (groupReaperInterval config)

    -- The first thread to exit, for any reason, triggers shutdown.
    (_, outcome) <- waitAnyCatch (dispatcher : cron ++ reaper ++ probe ++ workers)
    either
      (\e -> lift . logSafe Error . T.pack $ "Thread pool exception: " <> show e)
      (const (pure ()))
      outcome

    shutdownWorker config

    -- Drain while all threads are still alive; they are cancelled only when
    -- the ContT scope ends.
    lift $ do
      logSafe Info (T.pack "Starting graceful shutdown. Draining in-flight jobs...")
      finishedInTime <- case gracefulShutdownTimeout config of
        Nothing -> Right () <$ drainForever
        Just secs -> race (liftIO . threadDelay $ ceiling (secs * 1_000_000)) drained
      case finishedInTime of
        Right () ->
          logSafe Info (T.pack "All workers are now idle. Graceful shutdown complete.")
        Left () ->
          logSafe Warning (T.pack "Graceful shutdown timed out. Some jobs may still be in-flight.")

-- | Keep a liveness file fresh while the pool is making progress.
--
-- Each iteration writes @healthcheckPath@ as an empty file. It then waits
-- until /both/ a token can be taken from @livenessMVar@ (signalled by the
-- dispatcher or heartbeat) /and/ @n@ seconds have elapsed. The first write
-- happens immediately. Every later write therefore needs proof of work and
-- happens at most once per @n@ seconds.
--
-- If nothing signals the MVar (for example a broken DB connection or a
-- deadlocked dispatcher), the file goes stale and the process should be
-- restarted. A failed write is logged at 'Error' level and the loop carries
-- on. The loop never returns on its own.
refreshLiveness :: (MonadUnliftIO m) => LogConfig -> FilePath -> MVar.MVar () -> Int -> m ()
refreshLiveness logCfg healthcheckPath livenessMVar n = forever $ do
  writeOutcome <- tryAny (liftIO (writeFile healthcheckPath ""))
  either reportFailure pure writeOutcome
  -- Proof of work (signal) AND rate limit (delay) must both complete.
  Async.concurrently_ (MVar.takeMVar livenessMVar) (Conc.threadDelay $ n * 1_000_000)
  where
    reportFailure e =
      void . tryAny . liftIO . logMessage logCfg Error $
        "Liveness probe write failed: " <> T.pack (show e)

-- | Main loop for a single worker thread.
--
-- Runs forever. Each iteration executes with async exceptions masked:
--
-- 1. Atomically takes one batch from the work queue (a singleton in
--    single-job mode, possibly several jobs in batched mode) and increments
--    the busy counter in the same STM transaction.
-- 2. Inside 'withJobContext', fires @onJobClaimed@ for every job in the
--    batch, then runs 'processJobsWithRetry' /unmasked/ so the work itself
--    can be cancelled. The outcome is handed to 'handleWorkerException'.
-- 3. Whatever happens in step 2, decrements the busy counter and sets the
--    worker-finished flag.
workerLoop
  :: forall m registry payload result
   . ( JobOperation m registry payload
     , JobResult result
     , MonadMask m
     , MonadUnliftIO m
     )
  => WorkerConfig m payload result
  -> TBQueue (NonEmpty (Job.JobRead payload))
  -> TVar Int
  -- ^ Busy worker count
  -> TVar Bool
  -- ^ Worker finished signal
  -> m ()
workerLoop config workQueue busyCount workerFinishedVar =
  forever $ mask $ \unmask -> do
    -- Dequeue and mark busy in one transaction so the counter never
    -- misses a claimed batch.
    jobBatch <- atomically $ do
      claimed <- readTBQueue workQueue
      modifyTVar' busyCount (+ 1)
      pure claimed
    let cfg = logConfig config
        -- Undo the busy mark and raise the worker-finished flag.
        releaseSlot = atomically $ do
          modifyTVar' busyCount (subtract 1)
          writeTVar workerFinishedVar True
        processBatch = withJobContext jobBatch $ do
          claimedAt <- liftIO getCurrentTime
          mapM_
            ( \job ->
                runHook cfg "onJobClaimed" $
                  Job.onJobClaimed (observabilityHooks config) job claimedAt
            )
            jobBatch
          -- Unmask for actual job processing (allow cancellation during work)
          outcome <- trySyncOrAsync $ unmask $ processJobsWithRetry config jobBatch
          liftIO $ handleWorkerException cfg outcome
    processBatch `finally` releaseSlot

-- | Common exception handling for worker loops.
--
-- A successful outcome is a no-op. A failed outcome is logged at 'Error'
-- level (a failure of the logger itself is swallowed). Asynchronous
-- exceptions are then re-thrown so cancellation keeps propagating. Any other
-- exception is followed by a two-second pause before returning, presumably
-- to keep a persistently failing worker from spinning.
handleWorkerException :: LogConfig -> Either SomeException () -> IO ()
handleWorkerException _ (Right ()) = pure ()
handleWorkerException cfg (Left e) = do
  void . tryAny . logMessage cfg Error . T.pack $ "Worker exception: " <> show e
  when (isAsyncException e) $ throwIO e
  threadDelay 2_000_000

-- | Read and decode child results for a rollup finalizer.
--
-- Decode failures are represented as @Left@ entries in the result map,
-- allowing the handler to process valid results while seeing failures.
--
-- The raw rows come from 'Ops.readChildResultsRaw' for this job's queue and
-- primary key, and are merged with 'Ops.mergeRawChildResults'. Every
-- @Right@ value is then decoded with 'decodeJobResult'; entries that were
-- already @Left@ are passed through unchanged. The second component is the
-- DLQ failure map returned by 'Ops.readChildResultsRaw', untouched.
readChildResults
  :: (JobResult a, MonadArbiter m)
  => Text
  -> Job.JobRead payload
  -> m (Map.Map Int64 (Either Text a), Map.Map Int64 T.Text)
readChildResults schemaName job = do
  (okResults, failedResults, snapshot, dlqFailures) <-
    Ops.readChildResultsRaw schemaName (Job.queueName job) (Job.primaryKey job)
  let decoded =
        either Left decodeJobResult
          <$> Ops.mergeRawChildResults okResults failedResults snapshot
  pure (decoded, dlqFailures)

-- | Run a claimed batch through the configured handler and settle the outcome.
--
-- The handler always runs under 'withJobsHeartbeat'.
--
-- * With 'useWorkerTransaction' set, everything runs in one database
--   transaction, optionally with a @statement_timeout@ taken from
--   'transactionTimeout'. A single-job handler receives its child results
--   when the job is a rollup. Its encoded result is recorded against the
--   parent when both exist. The job(s) are then acked in the same
--   transaction. An ack whose row count does not match the batch throws via
--   'throwJobNotFound', rolling the handler transaction back.
-- * Without it (manual mode), the handler runs with no transaction and no
--   automatic ack.
--
-- On success, @onJobSuccess@ fires for each job, but only in transaction
-- mode. On failure, jobs that are already gone ('isJobGoneException') are
-- skipped with an info log. Otherwise every job goes through
-- 'handleJobFailure' in a separate transaction, using the smallest
-- @maxAttempts@ found in the batch.
processJobsWithRetry
  :: forall m registry payload result
   . ( JobOperation m registry payload
     , JobResult result
     , MonadUnliftIO m
     )
  => WorkerConfig m payload result
  -> NonEmpty (Job.JobRead payload)
  -> m ()
processJobsWithRetry config jobs = do
  startTime <- liftIO getCurrentTime
  outcome <-
    tryAny $
      withJobsHeartbeat
        hooks
        (heartbeatInterval config)
        (visibilityTimeout config)
        startTime
        jobs
        (logConfig config)
        (livenessSignal <$> livenessConfig config)
        runJobs
  endTime <- liftIO getCurrentTime
  case outcome of
    Right () -> onSuccess startTime endTime
    Left err -> onFailure startTime endTime err
  where
    hooks :: Job.ObservabilityHooks m payload
    hooks = observabilityHooks config

    -- Use minimum maxAttempts across all jobs in the batch
    maxAtts :: Int32
    maxAtts = minimum (fmap (fromMaybe (maxAttempts config) . Job.maxAttempts) jobs)

    runJobs :: m ()
    runJobs
      | useWorkerTransaction config = withDbTransaction transactional
      | otherwise = manual

    -- Automatic mode: handler, result recording and ack share one transaction.
    transactional :: m ()
    transactional = do
      schemaName <- Arb.getSchema
      maybe (pure ()) applyStatementTimeout (transactionTimeout config)
      case handlerMode config of
        SingleJobMode handler -> do
          let (job :| _) = jobs
          (childResults, dlqFailures) <- fetchChildResults schemaName job
          handlerResult <- runHandlerWithConnection (handler childResults dlqFailures) job :: m result
          case (Job.parentId job, encodeJobResult handlerResult) of
            (Just pid, Just val) ->
              void $ Ops.insertResult schemaName (Job.queueName job) pid (Job.primaryKey job) val
            _ -> pure ()
          rowsAffected <- Arb.ackJob job
          when (rowsAffected == 0) $
            throwJobNotFound $
              "Job "
                <> T.pack (show (Job.primaryKey job))
                <> " was reclaimed during processing - rolling back handler transaction"
        BatchedJobsMode _ handler -> do
          void (runHandlerWithConnection handler jobs :: m result)
          rowsAffected <- Arb.ackJobsBulk (toList jobs)
          when (rowsAffected /= fromIntegral (length jobs)) $
            throwJobNotFound $
              "Expected to ack "
                <> T.pack (show (length jobs))
                <> " jobs but only "
                <> T.pack (show rowsAffected)
                <> " were deleted - rolling back handler transaction"

    -- Manual mode: no transaction, no automatic acking
    manual :: m ()
    manual = do
      schemaName' <- Arb.getSchema
      case handlerMode config of
        SingleJobMode handler -> do
          let (job :| _) = jobs
          (childResults, dlqFailures) <- fetchChildResults schemaName' job
          void (runHandlerWithConnection (handler childResults dlqFailures) job :: m result)
        BatchedJobsMode _ handler ->
          void (runHandlerWithConnection handler jobs :: m result)

    -- Only rollup jobs have child results; everyone else gets empty maps.
    fetchChildResults
      :: Text
      -> Job.JobRead payload
      -> m (Map.Map Int64 (Either Text result), Map.Map Int64 Text)
    fetchChildResults schemaName job
      | Job.isRollup job = readChildResults schemaName job
      | otherwise = pure (Map.empty, Map.empty)

    -- Convert the configured timeout to whole milliseconds (rounded up).
    applyStatementTimeout :: NominalDiffTime -> m ()
    applyStatementTimeout timeout =
      let timeoutMs = ceiling (timeout * 1000) :: Int64
       in void $ executeStatement "SET LOCAL statement_timeout = ?" [pval CInt8 timeoutMs]

    -- Success: only call onJobSuccess in automatic transaction mode.
    -- In manual mode, users are responsible for acking and success observability.
    onSuccess :: UTCTime -> UTCTime -> m ()
    onSuccess startTime endTime =
      when (useWorkerTransaction config) $
        mapM_
          ( \job ->
              runHook (logConfig config) "onJobSuccess" $
                Job.onJobSuccess hooks job startTime endTime
          )
          jobs

    onFailure :: UTCTime -> UTCTime -> SomeException -> m ()
    onFailure startTime endTime err
      | isJobGoneException err =
          void . tryAny . liftIO $
            logMessage (logConfig config) Info $
              "Job(s) no longer available, skipping retry: " <> T.pack (show err)
      | otherwise =
          -- Update all jobs for retry or move to DLQ in a separate transaction
          withDbTransaction $
            mapM_ (handleJobFailure config hooks err maxAtts startTime endTime) jobs

-- | Check if an exception indicates the job is gone (stolen or not found).
-- These jobs should not be retried or moved to DLQ.
--
-- True exactly for a 'JobException' built with 'JobNotFound' or 'JobStolen';
-- every other exception, 'JobException' or not, yields False.
isJobGoneException :: SomeException -> Bool
isJobGoneException = maybe False isGone . fromException
  where
    isGone :: JobException -> Bool
    isGone (JobNotFound _) = True
    isGone (JobStolen _) = True
    isGone _ = False

-- | How a failed job is disposed of, as decided by 'classifyException'.
--
-- * 'RetryFailure' — reschedule with backoff, or move to the DLQ once the
--   job's attempts reach the limit.
-- * 'PermanentFailure' — move straight to the DLQ.
-- * 'TreeCancelFailure' — delete the job's entire tree, from the root down.
-- * 'BranchCancelFailure' — cascade-delete the job's parent (or just the job
--   itself when it has no parent).
--
-- Note: 'JobNotFound' and 'JobStolen' are intercepted by 'isJobGoneException'
-- before reaching 'handleJobFailure', so they never get a 'FailureKind'
-- acted upon.
data FailureKind
  = RetryFailure
  | PermanentFailure
  | TreeCancelFailure
  | BranchCancelFailure
  deriving stock (Eq)

-- | Classify a handler exception into an error message and failure disposition.
--
-- The recognised 'JobException' variants carry their own message.
-- 'ParseFailure' is treated as permanent. Any other exception, including
-- other 'JobException' constructors, is rendered with 'show' and treated as
-- retryable.
classifyException :: SomeException -> (T.Text, FailureKind)
classifyException e = maybe unrecognised classify (fromException e)
  where
    -- Unknown or non-JobException: treat as retryable
    unrecognised = (T.pack (show e), RetryFailure)

    classify :: JobException -> (T.Text, FailureKind)
    classify (Retryable (JobRetryableException msg)) = (msg, RetryFailure)
    classify (Permanent (JobPermanentException msg)) = (msg, PermanentFailure)
    classify (TreeCancel (TreeCancelException msg)) = (msg, TreeCancelFailure)
    classify (BranchCancel (BranchCancelException msg)) = (msg, BranchCancelFailure)
    classify (ParseFailure (ParsingException msg)) = (msg, PermanentFailure)
    classify _ = unrecognised

-- | Handle failure for a single job (retry or move to DLQ).
--
-- The exception is classified with 'classifyException':
--
-- * tree cancel — the job's tree is deleted via 'Ops.cancelJobTree';
-- * branch cancel — the job's parent, or the job itself when it has no
--   parent, is cascade-deleted via 'Ops.cancelJobCascade';
-- * permanent failure, or @Job.attempts job >= maxAtts@ — a rollup job
--   first snapshots its merged child results into parent state, then the
--   job is moved to the DLQ;
-- * anything else — the job is rescheduled after a jittered backoff.
--
-- The @onJobFailure@ hook, plus @onJobFailedAndMovedToDLQ@ or @onJobRetry@
-- where relevant, only fires when the database operation affected at least
-- one row. When no row was affected, the job is no longer available and a
-- best-effort warning is logged instead. This applies uniformly to all four
-- dispositions.
handleJobFailure
  :: forall m registry payload result
   . ( JobOperation m registry payload
     , MonadUnliftIO m
     )
  => WorkerConfig m payload result
  -> Job.ObservabilityHooks m payload
  -> SomeException
  -> Int32
  -> UTCTime
  -> UTCTime
  -> Job.JobRead payload
  -> m ()
handleJobFailure config hooks e maxAtts startTime endTime job = do
  let (errorMsg, failureKind) = classifyException e
      cfg :: LogConfig
      cfg = logConfig config
      queue = Job.queueName job
      jobId = Job.primaryKey job
      -- Generic failure hook, shared by every disposition below.
      reportFailure :: m ()
      reportFailure =
        runHook cfg "onJobFailure" $
          Job.onJobFailure hooks job errorMsg startTime endTime
      -- Best-effort warning for when the job row was already gone;
      -- a failure of the logger itself is swallowed.
      warnUnavailable :: Text -> m ()
      warnUnavailable action =
        void . tryAny . liftIO $
          logMessage cfg Warning $
            "Job " <> T.pack (show jobId) <> " not available for " <> action
  case failureKind of
    TreeCancelFailure -> do
      -- TreeCancel: delete the entire tree from root down (including this job)
      schemaName <- getSchema
      deleted <- Ops.cancelJobTree schemaName queue jobId
      if deleted > 0
        then reportFailure
        else warnUnavailable "tree cancellation"
    BranchCancelFailure -> do
      -- BranchCancel: cascade-delete the parent + all siblings (including this job).
      -- If no parent, just delete this job.
      schemaName <- getSchema
      let target = fromMaybe jobId (Job.parentId job)
      deleted <- Ops.cancelJobCascade schemaName queue target
      if deleted > 0
        then reportFailure
        else warnUnavailable "branch cancellation"
    _
      | failureKind == PermanentFailure || Job.attempts job >= maxAtts -> do
          -- Snapshot into parent_state before DLQ move (survives CASCADE delete).
          -- Merges old snapshot so repeated DLQ round-trips don't lose data.
          when (Job.isRollup job) $ do
            schemaName <- getSchema
            (results, failures, mSnapshot, _dlqFailures) <-
              Ops.readChildResultsRaw schemaName queue jobId
            let merged = Ops.mergeRawChildResults results failures mSnapshot
            when (not (Map.null merged)) $
              void $
                Ops.persistParentState schemaName queue jobId (toJSON merged)
          -- Permanent failure or max attempts reached - move to DLQ
          rowsAffected <- Arb.moveToDLQ errorMsg job
          if rowsAffected == 0
            then warnUnavailable "moving to DLQ"
            else do
              -- Successfully moved to DLQ
              reportFailure
              runHook cfg "onJobFailedAndMovedToDLQ" $
                Job.onJobFailedAndMovedToDLQ hooks errorMsg job
      | otherwise -> do
          -- Retry with configured backoff strategy and jitter
          let baseDelay :: NominalDiffTime
              baseDelay = calculateBackoff (backoffStrategy config) (Job.attempts job)
          backoffSecs <- liftIO $ applyJitter (jitter config) baseDelay
          rowsAffected <- Arb.updateJobForRetry backoffSecs errorMsg job
          if rowsAffected == 0
            then warnUnavailable "retry"
            else do
              -- Successfully updated for retry
              reportFailure
              runHook cfg "onJobRetry" $ Job.onJobRetry hooks job backoffSecs

groupReaperLoop
  :: forall m registry payload result
   . ( MonadUnliftIO m
     , QueueOperation m registry payload
     )
  => WorkerConfig m payload result
  -> NominalDiffTime
  -> m ()
groupReaperLoop :: forall (m :: * -> *) (registry :: JobPayloadRegistry) payload
       result.
(MonadUnliftIO m, QueueOperation m registry payload) =>
WorkerConfig m payload result -> NominalDiffTime -> m ()
groupReaperLoop WorkerConfig m payload result
_config NominalDiffTime
interval = do
  let intervalSecs :: Int
intervalSecs = NominalDiffTime -> Int
forall b. Integral b => NominalDiffTime -> b
forall a b. (RealFrac a, Integral b) => a -> b
ceiling NominalDiffTime
interval
  forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int -> m ()
Arb.refreshGroups @m @registry @payload Int
intervalSecs
  m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
forall (m :: * -> *). MonadIO m => Int -> m ()
threadDelay (Int
intervalSecs Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1_000_000)
    forall (m :: * -> *) (registry :: JobPayloadRegistry) payload.
QueueOperation m registry payload =>
Int -> m ()
Arb.refreshGroups @m @registry @payload Int
intervalSecs