{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeFamilies #-}

-- | Servant server implementation for the Arbiter job queue REST API.
--
-- Hard-coded to use the SimpleDb backend from arbiter-simple.
--
-- __Security:__ This module provides no built-in authentication or
-- authorization. All endpoints (including job deletion, DLQ management, and
-- cron schedule updates) are publicly accessible. Add your own auth
-- middleware before exposing this to untrusted networks.
module Arbiter.Servant.Server
  ( -- * Server handlers
    arbiterServer
  , arbiterApp
  , runArbiterAPI
  , ArbiterServerConfig (..)
  , initArbiterServer
  , BuildServer (..)
  ) where

import Arbiter.Core.CronSchedule qualified as CS
import Arbiter.Core.Job.DLQ qualified as DLQ
import Arbiter.Core.Job.Schema qualified as Schema
import Arbiter.Core.Job.Types (Job (..), JobPayload, JobRead)
import Arbiter.Core.MonadArbiter (withDbTransaction)
import Arbiter.Core.Operations qualified as Ops
import Arbiter.Core.PoolConfig (PoolConfig (..))
import Arbiter.Core.QueueRegistry (AllQueuesUnique, RegistryTables (..))
import Arbiter.Core.SqlTemplates (JobFilter (..))
import Arbiter.Simple (SimpleConnectionPool (..), SimpleEnv (..), createSimpleEnvWithConfig, runSimpleDb)
import Arbiter.Worker.Cron (overlapPolicyFromText)
import Control.Exception (SomeException, bracket, catch)
import Control.Monad (void, when)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Aeson (toJSON)
import Data.ByteString (ByteString)
import Data.ByteString.Builder qualified as Builder
import Data.Int (Int64)
import Data.Kind (Type)
import Data.Map.Strict qualified as Map
import Data.Maybe (catMaybes, fromJust, fromMaybe)
import Data.Pool qualified as Pool
import Data.String (fromString)
import Data.Text (Text)
import Data.Text qualified as T
import Data.Time (getCurrentTime)
import Data.Time.Format (defaultTimeLocale, formatTime)
import Database.PostgreSQL.Simple qualified as PG
import Database.PostgreSQL.Simple.Notification (Notification (..), getNotification)
import GHC.TypeLits (KnownSymbol, Symbol, symbolVal)
import Network.HTTP.Types (status200)
import Network.Wai (responseStream)
import Network.Wai.Handler.Warp (Port, defaultSettings, runSettings, setPort, setTimeout)
import Servant
import Servant.Server.Generic (AsServerT)
import System.Cron (parseCronSchedule)
import System.Timeout (timeout)

import Arbiter.Servant.API
  ( ArbiterAPI
  , CronAPI (..)
  , DLQAPI (..)
  , JobsAPI (..)
  , QueuesAPI (..)
  , RegistryToAPI
  , StatsAPI (..)
  , TableAPI (..)
  )
import Arbiter.Servant.Types

-- | Configuration for the API server.
--
-- The @registry@ parameter is only threaded through to the underlying
-- 'SimpleEnv'; no value of that type is stored here directly.
data ArbiterServerConfig registry = ArbiterServerConfig
  { serverEnv :: SimpleEnv registry
  -- ^ The SimpleEnv containing schema and connection pool
  , enableSSE :: Bool
  -- ^ Enable Server-Sent Events streaming endpoint. When 'False', the
  -- @\/events\/stream@ endpoint returns a single \"disabled\" event and
  -- closes immediately, avoiding long-lived connections. The admin UI
  -- falls back to polling-only mode. Default: 'True'.
  }

-- | Small pool configuration for admin API traffic.
--
-- Used by 'initArbiterServer' when it creates the server's own
-- connection pool.
serverPoolConfig :: PoolConfig
serverPoolConfig =
  PoolConfig
    { poolSize = 5
    , poolIdleTimeout = 60 -- NOTE(review): presumably seconds; confirm against 'PoolConfig'
    , poolStripes = Just 1
    }

-- | Create an 'ArbiterServerConfig' with its own internal connection pool.
--
-- The pool is built from 'serverPoolConfig' via 'createSimpleEnvWithConfig',
-- and the returned config has 'enableSSE' set to 'True'.
--
-- __Note__: Use 'Arbiter.Migrations.runMigrationsForRegistry' with
-- @enableEventStreaming = True@ to set up the database triggers for SSE.
initArbiterServer
  :: forall registry
   . (AllQueuesUnique registry)
  => Proxy registry
  -- ^ Selects the queue registry; only its type is used
  -> ByteString
  -- ^ Connection string handed to 'createSimpleEnvWithConfig'
  -> Text
  -- ^ Schema name handed to 'createSimpleEnvWithConfig'
  -> IO (ArbiterServerConfig registry)
initArbiterServer _proxy connStr schemaName = do
  env <- createSimpleEnvWithConfig (Proxy @registry) connStr schemaName serverPoolConfig
  pure ArbiterServerConfig {serverEnv = env, enableSSE = True}

-- | Wrap jobs with current timestamp for status derivation.
--
-- The clock is read once, so every job in the list is paired with the
-- same instant.
toApiJobs :: (MonadIO m) => [JobRead payload] -> m [ApiJob payload]
toApiJobs jobs = do
  now <- liftIO getCurrentTime
  pure [ApiJob j now | j <- jobs]

-- | Wrap a single job with the current timestamp (single-job counterpart
-- of 'toApiJobs').
toApiJob :: (MonadIO m) => JobRead payload -> m (ApiJob payload)
toApiJob job = ApiJob job <$> liftIO getCurrentTime

-- | Wrap dead-letter-queue jobs with the current timestamp.
--
-- The clock is read once, so every job in the list is paired with the
-- same instant.
toApiDLQJobs :: (MonadIO m) => [DLQ.DLQJob payload] -> m [ApiDLQJob payload]
toApiDLQJobs jobs = do
  now <- liftIO getCurrentTime
  pure [ApiDLQJob j now | j <- jobs]

-- | Validate and sanitize pagination parameters.
--
-- Takes the optional @limit@ and @offset@ query parameters and returns a
-- @(limit, offset)@ pair that is always safe to pass on:
--
-- * a missing limit defaults to 50, and any limit is clamped to @[1, 1000]@;
-- * a missing offset defaults to 0, and negative offsets are raised to 0.
validatePagination :: Maybe Int -> Maybe Int -> (Int, Int)
validatePagination mLimit mOffset = (limit, offset)
  where
    defaultLimit, minLimit, maxLimit :: Int
    defaultLimit = 50
    minLimit = 1
    maxLimit = 1000

    -- Clamp between 1 and 1000
    limit = max minLimit (min maxLimit (fromMaybe defaultLimit mLimit))
    -- Must be non-negative
    offset = max 0 (fromMaybe 0 mOffset)

-- | Jobs API handlers for a specific table.
--
-- Builds the 'JobsAPI' record by partially applying each handler to the
-- table name and server config; the remaining arguments of every handler
-- are supplied by the corresponding route.
--
-- The explicit type applications pin @registry@ and @payload@ for each
-- handler. Several handlers mention @payload@ only in their 'JobPayload'
-- constraint, so it could not be inferred from their arguments alone.
jobsServer
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -- ^ Table name passed to every handler
  -> ArbiterServerConfig registry
  -> JobsAPI payload (AsServerT Handler)
jobsServer table config =
  JobsAPI
    { listJobs = listJobsHandler @registry @payload table config
    , insertJob = insertJobHandler @registry @payload table config
    , insertJobsBatch = insertJobsBatchHandler @registry @payload table config
    , getJob = getJobHandler @registry @payload table config
    , getInFlightJobs = getInFlightJobsHandler @registry @payload table config
    , cancelJob = cancelJobHandler @registry table config
    , promoteJob = promoteJobHandler @registry @payload table config
    , moveToDLQ = moveToDLQHandler @registry @payload table config
    , pauseChildren = pauseChildrenHandler @registry table config
    , resumeChildren = resumeChildrenHandler @registry table config
    , suspendJob = suspendJobHandler @registry @payload table config
    , resumeJob = resumeJobHandler @registry @payload table config
    }

-- | List jobs with pagination and composable filters.
--
-- Pagination is sanitized with 'validatePagination'. Each optional query
-- parameter that is present contributes one 'JobFilter'; absent parameters
-- contribute nothing. The page of jobs, the filtered total and the
-- child/DLQ-child counts are all read inside one 'withDbTransaction'.
listJobsHandler
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -- ^ Table name
  -> ArbiterServerConfig registry
  -> Maybe Int
  -- ^ @limit@ query parameter
  -> Maybe Int
  -- ^ @offset@ query parameter
  -> Maybe Text
  -- ^ @group_key@ filter
  -> Maybe Int64
  -- ^ @parent_id@ filter
  -> Maybe Bool
  -- ^ @suspended@ filter
  -> Handler (JobsResponse payload)
listJobsHandler tableName config mLimit mOffset mGroupKey mParentId mSuspended = liftIO $ do
  let (limit, offset) = validatePagination mLimit mOffset
      env = serverEnv config
      schemaName = schema env
      filters =
        catMaybes
          [ FilterGroupKey <$> mGroupKey
          , FilterParentId <$> mParentId
          , FilterSuspended <$> mSuspended
          ]

  (jobs, total, combined, dlqCounts) <- runSimpleDb env $ withDbTransaction $ do
    j <- Ops.listJobsFiltered schemaName tableName filters limit offset
    c <- Ops.countJobsFiltered schemaName tableName filters
    -- Only query child/DLQ counts if any returned job could be a parent.
    -- All parents are rollup finalizers (isRollup = True), so we
    -- skip the extra queries for queues that don't use job trees.
    let jobIds = map primaryKey j
        -- 'any' over an empty page is False, so this also covers "no jobs".
        hasParents = any isRollup j
    if hasParents
      then do
        cc <- Ops.countChildrenBatch schemaName tableName jobIds
        dc <- Ops.countDLQChildrenBatch schemaName tableName jobIds
        pure (j, c, cc, dc)
      else pure (j, c, Map.empty, Map.empty)

  let childCounts = fmap fst combined
      -- A parent is listed here when both components of its count pair are
      -- equal. NOTE(review): presumably (total, paused) children; confirm
      -- against 'Ops.countChildrenBatch'.
      pausedParents = Map.keys $ Map.filter (\(t, p) -> p == t) combined

  apiJobs <- toApiJobs jobs
  pure $
    JobsResponse
      { jobs = apiJobs
      , jobsTotal = fromIntegral total
      , jobsOffset = offset
      , jobsLimit = limit
      , childCounts = childCounts
      , pausedParents = pausedParents
      , dlqChildCounts = dlqCounts
      }

-- | Insert a new job into the queue.
--
-- Unwraps the request body and hands the job to 'Ops.insertJob'. When that
-- returns 'Nothing' the handler fails with HTTP 409; otherwise the stored
-- job is returned, stamped with the current time via 'toApiJob'.
insertJobHandler
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -- ^ Table name
  -> ArbiterServerConfig registry
  -> ApiJobWrite payload
  -> Handler (JobResponse payload)
insertJobHandler tableName config (ApiJobWrite jobWrite) = do
  let env = serverEnv config
      schemaName = schema env

  mJob <- liftIO $ runSimpleDb env $ Ops.insertJob schemaName tableName jobWrite
  case mJob of
    Nothing -> throwError err409 {errBody = "Job insert failed (duplicate dedup key)"}
    Just j -> do
      aj <- toApiJob j
      pure $ JobResponse {job = aj}

-- | Insert multiple jobs in a single batch operation.
--
-- The response carries the jobs returned by 'Ops.insertJobsBatch' together
-- with their count, so 'insertedCount' reflects what the backend reported
-- rather than the length of the request.
insertJobsBatchHandler
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -- ^ Table name
  -> ArbiterServerConfig registry
  -> BatchInsertRequest payload
  -> Handler (BatchInsertResponse payload)
insertJobsBatchHandler tableName config (BatchInsertRequest jobWrites) = do
  let env = serverEnv config
      schemaName = schema env
      writes = map unApiJobWrite jobWrites

  inserted <- liftIO $ runSimpleDb env $ Ops.insertJobsBatch schemaName tableName writes
  apiJobs <- toApiJobs inserted
  pure $ BatchInsertResponse {inserted = apiJobs, insertedCount = length apiJobs}

-- | Get a specific job by ID.
--
-- Fails with HTTP 404 when 'Ops.getJobById' finds nothing; otherwise the
-- job is returned, stamped with the current time via 'toApiJob'.
getJobHandler
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -- ^ Table name
  -> ArbiterServerConfig registry
  -> Int64
  -- ^ Job ID captured from the route
  -> Handler (JobResponse payload)
getJobHandler tableName config jobId = do
  let env = serverEnv config
      schemaName = schema env

  mJob <- liftIO $ runSimpleDb env $ Ops.getJobById schemaName tableName jobId
  case mJob of
    Nothing -> throwError err404 {errBody = "Job not found"}
    Just j -> do
      aj <- toApiJob j
      pure $ JobResponse {job = aj}

-- | Get all in-flight (invisible) jobs.
--
-- Pagination is sanitized with 'validatePagination'. The page of jobs, the
-- in-flight total and the child/DLQ-child counts are all read inside one
-- 'withDbTransaction'.
getInFlightJobsHandler
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -- ^ Table name
  -> ArbiterServerConfig registry
  -> Maybe Int
  -- ^ @limit@ query parameter
  -> Maybe Int
  -- ^ @offset@ query parameter
  -> Handler (JobsResponse payload)
getInFlightJobsHandler tableName config mLimit mOffset = do
  let (limit, offset) = validatePagination mLimit mOffset
      env = serverEnv config
      schemaName = schema env

  (jobs, total, combined, dlqCounts) <- liftIO $ runSimpleDb env $ withDbTransaction $ do
    j <- Ops.getInFlightJobs schemaName tableName limit offset
    c <- Ops.countInFlightJobs schemaName tableName
    let jobIds = map primaryKey j
        -- 'any' over an empty page is False, so this also covers "no jobs".
        hasParents = any isRollup j
    -- The child/DLQ count queries only run when at least one returned job
    -- is a rollup.
    if hasParents
      then do
        cc <- Ops.countChildrenBatch schemaName tableName jobIds
        dc <- Ops.countDLQChildrenBatch schemaName tableName jobIds
        pure (j, c, cc, dc)
      else pure (j, c, Map.empty, Map.empty)

  let childCounts = fmap fst combined
      -- A parent is listed here when both components of its count pair are
      -- equal. NOTE(review): presumably (total, paused) children; confirm
      -- against 'Ops.countChildrenBatch'.
      pausedParents = Map.keys $ Map.filter (\(t, p) -> p == t) combined

  apiJobs <- toApiJobs jobs
  pure $
    JobsResponse
      { jobs = apiJobs
      , jobsTotal = fromIntegral total
      , jobsOffset = offset
      , jobsLimit = limit
      , childCounts = childCounts
      , pausedParents = pausedParents
      , dlqChildCounts = dlqCounts
      }

-- | Cancel a job (delete it from the queue).
--
-- Delegates to 'Ops.cancelJobCascade'. Responds with no content when at
-- least one row was affected, and with HTTP 404 otherwise.
cancelJobHandler
  :: forall registry
   . Text
  -- ^ Table name
  -> ArbiterServerConfig registry
  -> Int64
  -- ^ Job ID captured from the route
  -> Handler NoContent
cancelJobHandler tableName config jobId = do
  let env = serverEnv config
      schemaName = schema env

  rowsAffected <- liftIO $ runSimpleDb env $ Ops.cancelJobCascade schemaName tableName jobId
  if rowsAffected > 0
    then pure NoContent
    else throwError err404 {errBody = "Job not found"}

-- | Promote a job (make it immediately visible).
--
-- Runs 'Ops.promoteJob'. When it reports no affected rows, the job is looked
-- up with 'Ops.getJobById' so the client gets a specific reason:
--
-- * 404 when the job does not exist;
-- * 409 when the job is suspended;
-- * 409 otherwise (reported as already visible).
promoteJobHandler
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -> ArbiterServerConfig registry
  -> Int64
  -> Handler NoContent
promoteJobHandler tableName config jobId = do
  let env = serverEnv config
      schemaName = schema env

  result <- liftIO $ runSimpleDb env $ do
    rowsAffected <- Ops.promoteJob schemaName tableName jobId
    if rowsAffected > 0
      then pure (Right ())
      else do
        -- Nothing was promoted: work out which error to report.
        mJob <- Ops.getJobById @_ @payload schemaName tableName jobId
        pure . Left $ case mJob of
          Nothing -> err404 {errBody = "Job not found"}
          Just job
            -- Keep error bodies ASCII-only: 'errBody' is a ByteString built
            -- via OverloadedStrings, which truncates every Char to 8 bits
            -- and would turn a Unicode dash into a control byte.
            | suspended job ->
                err409 {errBody = "Job is suspended - use resume endpoint"}
            | otherwise ->
                err409 {errBody = "Job is already visible"}

  either throwError (const (pure NoContent)) result

-- | Move a job to the dead letter queue.
--
-- Everything below runs inside a single 'withDbTransaction':
--
-- 1. The job is fetched with 'Ops.getJobById'; a missing job yields a 404.
-- 2. If the job is a rollup ('isRollup'), its child results are read and
--    merged, and a non-empty merge is persisted via 'Ops.persistParentState'.
-- 3. The job is moved with 'Ops.moveToDLQ', tagged with a fixed
--    "manually moved" reason.
--
-- A move that reports zero rows is answered with a 409
-- (@"Job was concurrently modified"@); any other count yields 'NoContent'.
moveToDLQHandler
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -> ArbiterServerConfig registry
  -> Int64
  -> Handler NoContent
moveToDLQHandler tableName config jobId = do
  let env = serverEnv config
      schemaName = schema env

  result <- liftIO $ runSimpleDb env $ withDbTransaction $ do
    mJob <- Ops.getJobById @_ @payload schemaName tableName jobId
    case mJob of
      Nothing -> pure Nothing
      Just job -> do
        -- Snapshot into parent_state before DLQ move (survives CASCADE delete).
        when (isRollup job) $ do
          (results, failures, mSnapshot, _dlqFailures) <-
            Ops.readChildResultsRaw schemaName tableName (primaryKey job)
          let merged = Ops.mergeRawChildResults results failures mSnapshot
          -- Only write a snapshot when there is something to preserve.
          when (not (Map.null merged)) $
            void $
              Ops.persistParentState schemaName tableName (primaryKey job) (toJSON merged)
        Just <$> Ops.moveToDLQ schemaName tableName "Manually moved to DLQ via admin API" job

  case result of
    Nothing -> throwError err404 {errBody = "Job not found"}
    -- The job was found but the move touched no rows.
    Just 0 -> throwError err409 {errBody = "Job was concurrently modified"}
    Just _ -> pure NoContent

-- | Pause all children of a parent job.
--
-- Runs 'Ops.pauseChildren' for @jobId@ and discards the row count it
-- returns; the handler always answers 'NoContent'.
pauseChildrenHandler
  :: forall registry
   . Text
  -> ArbiterServerConfig registry
  -> Int64
  -> Handler NoContent
pauseChildrenHandler tableName config jobId = do
  let env = serverEnv config
      schemaName = schema env

  void . liftIO . runSimpleDb env $
    Ops.pauseChildren schemaName tableName jobId

  -- Returns NoContent even if no children were paused (they may all be
  -- in-flight, already suspended, or completed). This is not an error.
  pure NoContent

-- | Resume all suspended children of a parent job.
--
-- Runs 'Ops.resumeChildren' for @jobId@ and discards the row count it
-- returns; the handler always answers 'NoContent'.
resumeChildrenHandler
  :: forall registry
   . Text
  -> ArbiterServerConfig registry
  -> Int64
  -> Handler NoContent
resumeChildrenHandler tableName config jobId = do
  let env = serverEnv config
      schemaName = schema env

  void . liftIO . runSimpleDb env $
    Ops.resumeChildren schemaName tableName jobId

  -- Returns NoContent even if no children were resumed (they may not be
  -- suspended, or may have already completed). This is not an error.
  pure NoContent

-- | Suspend a job (make it unclaimable).
--
-- Runs 'Ops.suspendJob'. When it reports no affected rows, the job is looked
-- up with 'Ops.getJobById' so the client gets a specific reason:
--
-- * 404 when the job does not exist;
-- * 409 when the job is already suspended;
-- * 409 otherwise (reported as in-flight).
suspendJobHandler
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -> ArbiterServerConfig registry
  -> Int64
  -> Handler NoContent
suspendJobHandler tableName config jobId = do
  let env = serverEnv config
      schemaName = schema env

  result <- liftIO $ runSimpleDb env $ do
    rowsAffected <- Ops.suspendJob schemaName tableName jobId
    if rowsAffected > 0
      then pure (Right ())
      else do
        -- Nothing was suspended: work out which error to report.
        mJob <- Ops.getJobById @_ @payload schemaName tableName jobId
        pure . Left $ case mJob of
          Nothing -> err404 {errBody = "Job not found"}
          Just job
            | suspended job ->
                err409 {errBody = "Job is already suspended"}
            -- Keep error bodies ASCII-only: 'errBody' is a ByteString built
            -- via OverloadedStrings, which truncates every Char to 8 bits
            -- and would turn a Unicode dash into a control byte.
            | otherwise ->
                err409 {errBody = "Job is in-flight - cannot suspend"}

  either throwError (const (pure NoContent)) result

-- | Resume a suspended job, making it claimable again.
--
-- Refuses to resume a rollup finalizer that still has children, preventing
-- premature handler execution.
--
-- Runs 'Ops.resumeJob'. When it reports no affected rows, the job is looked
-- up with 'Ops.getJobById' and the failure is classified:
--
-- * 404 when the job does not exist;
-- * 409 when the job is not suspended;
-- * 409 when the job is a rollup ('isRollup');
-- * 409 otherwise (reported as a concurrent modification).
resumeJobHandler
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -> ArbiterServerConfig registry
  -> Int64
  -> Handler NoContent
resumeJobHandler tableName config jobId = do
  let env = serverEnv config
      schemaName = schema env

  result <- liftIO $ runSimpleDb env $ do
    rowsAffected <- Ops.resumeJob schemaName tableName jobId
    if rowsAffected > 0
      then pure (Right ())
      else do
        -- Nothing was resumed: work out which error to report.
        mJob <- Ops.getJobById @_ @payload schemaName tableName jobId
        pure . Left $ case mJob of
          Nothing -> err404 {errBody = "Job not found"}
          Just job
            | not (suspended job) ->
                err409 {errBody = "Job is not suspended"}
            | isRollup job ->
                err409 {errBody = "Cannot resume a rollup finalizer with active children"}
            | otherwise ->
                err409 {errBody = "Job could not be resumed (concurrent modification)"}

  either throwError (const (pure NoContent)) result

-- | DLQ API handlers for a specific table.
--
-- Builds the 'DLQAPI' record by partially applying each DLQ handler to the
-- table name and server configuration. The @registry@ and @payload@ type
-- variables are passed explicitly by type application.
dlqServer
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -> ArbiterServerConfig registry
  -> DLQAPI payload (AsServerT Handler)
dlqServer table config =
  DLQAPI
    { listDLQ = listDLQHandler @registry @payload table config
    , retryFromDLQ = retryFromDLQHandler @registry @payload table config
    , deleteDLQ = deleteDLQHandler @registry table config
    , deleteDLQBatch = deleteDLQBatchHandler @registry table config
    }

-- | List DLQ jobs with pagination and composable filters.
--
-- The optional @limit@ / @offset@ query parameters are passed through
-- 'validatePagination'. The optional @parent_id@ and @group_key@ parameters
-- each contribute a 'JobFilter' when present. The page of jobs and the total
-- count are fetched inside a single 'withDbTransaction', and the response
-- carries the limit and offset that 'validatePagination' returned.
listDLQHandler
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -> ArbiterServerConfig registry
  -> Maybe Int
  -> Maybe Int
  -> Maybe Int64
  -> Maybe Text
  -> Handler (DLQResponse payload)
listDLQHandler tableName config mLimit mOffset mParentId mGroupKey = do
  let (limit, offset) = validatePagination mLimit mOffset
      env = serverEnv config
      schemaName = schema env
      -- Absent query parameters simply drop out of the filter list.
      filters =
        catMaybes
          [ FilterParentId <$> mParentId
          , FilterGroupKey <$> mGroupKey
          ]

  (rows, total) <- liftIO $ runSimpleDb env $ withDbTransaction $ do
    page <- Ops.listDLQFiltered schemaName tableName filters limit offset
    count <- Ops.countDLQFiltered schemaName tableName filters
    pure (page, count)

  apiDlqJobs <- toApiDLQJobs rows
  pure $
    DLQResponse
      { dlqJobs = apiDlqJobs
      , dlqTotal = fromIntegral total
      , dlqOffset = offset
      , dlqLimit = limit
      }

-- | Retry a job from DLQ (move back to main queue).
--
-- Returns 409 if the job has a parent_id that no longer exists (orphaned child).
--
-- Runs 'Ops.retryFromDLQ' inside a 'withDbTransaction'. When it yields no
-- job, 'Ops.dlqJobExists' decides the error: the DLQ entry still being
-- present means the retry was refused (409), otherwise the entry does not
-- exist (404).
retryFromDLQHandler
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -> ArbiterServerConfig registry
  -> Int64
  -> Handler NoContent
retryFromDLQHandler tableName config dlqId = do
  let env = serverEnv config
      schemaName = schema env

  result <- liftIO $ runSimpleDb env $ withDbTransaction $ do
    mJob <- Ops.retryFromDLQ @_ @payload schemaName tableName dlqId
    case mJob of
      Just _ -> pure (Right ())
      Nothing -> do
        -- Distinguish "entry exists but could not be retried" from "no such entry".
        stillInDLQ <- Ops.dlqJobExists schemaName tableName dlqId
        pure . Left $
          if stillInDLQ
            then err409 {errBody = "Cannot retry: parent job no longer exists (not in queue or DLQ)"}
            else err404 {errBody = "DLQ job not found"}

  either throwError (const (pure NoContent)) result

-- | Delete a job from DLQ permanently.
--
-- Runs 'Ops.deleteDLQJob' for @dlqId@. Responds with 'NoContent' when the
-- operation reports at least one affected row, and with a 404
-- (@"DLQ job not found"@) when it reports none.
deleteDLQHandler
  :: forall registry
   . Text
  -> ArbiterServerConfig registry
  -> Int64
  -> Handler NoContent
deleteDLQHandler tableName config dlqId = do
  let env = serverEnv config
      schemaName = schema env

  rowsAffected <-
    liftIO . runSimpleDb env $
      Ops.deleteDLQJob schemaName tableName dlqId

  -- Zero affected rows is reported to the client as "no such DLQ job".
  if rowsAffected > 0
    then pure NoContent
    else throwError err404 {errBody = "DLQ job not found"}

-- | Batch delete jobs from DLQ permanently.
--
-- Passes the IDs from the request body to 'Ops.deleteDLQJobsBatch' and
-- returns the row count it reports in the @deleted@ field. An empty ID list
-- is answered with @deleted = 0@ without contacting the database.
deleteDLQBatchHandler
  :: forall registry
   . Text
  -> ArbiterServerConfig registry
  -> BatchDeleteRequest
  -> Handler BatchDeleteResponse
deleteDLQBatchHandler tableName config (BatchDeleteRequest dlqIds) = do
  let env = serverEnv config
      schemaName = schema env
  rowsDeleted <-
    if null dlqIds
      -- Nothing to delete: skip the round trip entirely.
      then pure 0
      else
        liftIO . runSimpleDb env $
          Ops.deleteDLQJobsBatch schemaName tableName dlqIds
  pure $ BatchDeleteResponse {deleted = rowsDeleted}

-- | Build the 'StatsAPI' record of handlers for one table.
--
-- The only route, @getStats@, is served by 'getStatsHandler' applied to the
-- given table name and server configuration.
statsServer
  :: forall registry
   . Text
  -> ArbiterServerConfig registry
  -> StatsAPI (AsServerT Handler)
statsServer tableName config =
  StatsAPI {getStats = statsForTable}
  where
    statsForTable = getStatsHandler @registry tableName config

-- | Fetch the current statistics for one queue table.
--
-- Reads the stats via 'Ops.getQueueStats', then stamps the response with the
-- wall-clock time taken after the query returned.
getStatsHandler
  :: forall registry
   . Text
  -> ArbiterServerConfig registry
  -> Handler StatsResponse
getStatsHandler tableName config = do
  queueStats <-
    liftIO (runSimpleDb env (Ops.getQueueStats (schema env) tableName))
  now <- liftIO getCurrentTime
  pure StatsResponse {stats = queueStats, timestamp = renderTimestamp now}
  where
    env = serverEnv config

    -- NOTE(review): for a UTCTime, @%z@ should render the offset as @+0000@
    -- (no colon) — confirm that API consumers accept that form.
    renderTimestamp t =
      T.pack (formatTime defaultTimeLocale "%Y-%m-%dT%H:%M:%S%z" t)

-- | Assemble every per-table route group for a single queue table.
--
-- The resulting 'TableAPI' record wires the @jobs@, @dlq@ and @stats@
-- sub-APIs to 'jobsServer', 'dlqServer' and 'statsServer' respectively, all
-- sharing the same table name and configuration.
tableServer
  :: forall registry payload
   . (JobPayload payload)
  => Text -- table
  -> ArbiterServerConfig registry
  -> TableAPI payload (AsServerT Handler)
tableServer table config =
  TableAPI
    { jobs = jobRoutes
    , dlq = dlqRoutes
    , stats = statsRoutes
    }
  where
    jobRoutes = jobsServer @registry @payload table config
    dlqRoutes = dlqServer @registry @payload table config
    statsRoutes = statsServer @registry table config

-- | Build the 'QueuesAPI' record of handlers.
--
-- @listQueues@ performs no I/O: it returns the table names that
-- 'registryTableNames' yields for the registry proxy.
queuesServer
  :: forall registry
   . (RegistryTables registry)
  => Proxy registry
  -> QueuesAPI (AsServerT Handler)
queuesServer registryProxy =
  QueuesAPI {listQueues = pure queueListing}
  where
    queueListing =
      QueuesResponse {queues = registryTableNames registryProxy}

-- | Events server — raw WAI application for SSE streaming.
--
-- Uses WAI 'responseStream' with explicit flush after every event so the
-- browser receives data immediately.  Sends a @: keepalive@ comment every
-- 15 seconds to keep the connection alive through reverse proxies.
--
-- When 'enableSSE' is 'False', sends a single @disabled@ event and closes
-- immediately. The admin UI detects this and skips reconnection.
--
-- The same @disabled@ response is sent when the environment carries no
-- connection pool ('connectionPool' is 'Nothing'): streaming needs a
-- dedicated connection to @LISTEN@ on, so without a pool the stream cannot
-- work and the request is answered gracefully instead of crashing.
eventsServer
  :: forall registry
   . ArbiterServerConfig registry
  -> Tagged Handler Application
eventsServer config = Tagged $ \_req sendResponse ->
  if not (enableSSE config)
    then sendResponse disabledResponse
    else case connectionPool (simplePool (serverEnv config)) of
      Nothing -> sendResponse disabledResponse
      Just connPool -> streamFromPool connPool sendResponse
  where
    -- One-shot stream telling the client that SSE is unavailable.
    disabledResponse =
      responseStream status200 sseHeaders $ \write flush -> do
        write "data: {\"event\":\"disabled\"}\n\n"
        flush

    -- Take a connection out of the pool for the lifetime of the stream and
    -- hand it back (or destroy it) when the stream ends for any reason.
    streamFromPool connPool sendResponse =
      bracket
        (Pool.takeResource connPool)
        (releaseListener connPool)
        $ \(conn, _) -> do
          _ <-
            PG.execute_ conn $
              "LISTEN " <> fromString (T.unpack Schema.eventStreamingChannel)
          sendResponse $ responseStream status200 sseHeaders $ \write flush -> do
            -- Send immediate "connected" event
            write "data: {\"event\":\"connected\",\"message\":\"Stream connected\"}\n\n"
            flush
            -- IOException from getNotification = PG connection lost
            -- IOException from write/flush = client disconnected
            -- Either way the stream simply ends; cleanup happens in the
            -- bracket release action.
            notificationLoop conn write flush
              `catch` (\(_ :: SomeException) -> pure ())

    -- Clean up LISTEN state before returning to pool so the connection
    -- doesn't accumulate buffered notifications while idle.
    -- If UNLISTEN fails, the connection is likely broken — destroy it
    -- instead of returning a dead connection to the pool.
    releaseListener connPool (conn, localPool) = do
      unlistened <-
        (PG.execute_ conn "UNLISTEN *" >> pure True)
          `catch` (\(_ :: SomeException) -> pure False)
      if unlistened
        then Pool.putResource localPool conn
        else Pool.destroyResource connPool localPool conn

    -- Loop: wait for notifications with a 15s keepalive heartbeat.
    -- The heartbeat keeps Warp and any reverse proxies from timing out.
    notificationLoop conn write flush = do
      mNotification <- timeout 15_000_000 (getNotification conn)
      case mNotification of
        Just notification ->
          write ("data: " <> Builder.byteString (notificationData notification) <> "\n\n")
        -- Timeout: send SSE comment to keep connection alive
        Nothing ->
          write ": keepalive\n\n"
      flush
      notificationLoop conn write flush

    sseHeaders =
      [ ("Content-Type", "text/event-stream")
      , ("Cache-Control", "no-cache")
      , ("Connection", "keep-alive")
      , ("X-Accel-Buffering", "no")
      ]

-- | Build the 'CronAPI' record of handlers.
--
-- @listSchedules@ is served by 'listCronSchedulesHandler' and
-- @updateSchedule@ by 'updateCronScheduleHandler', both closed over the
-- server configuration.
cronServer
  :: forall registry
   . ArbiterServerConfig registry
  -> CronAPI (AsServerT Handler)
cronServer config =
  CronAPI
    { listSchedules = listAll
    , updateSchedule = updateOne
    }
  where
    listAll = listCronSchedulesHandler config
    updateOne scheduleName patch =
      updateCronScheduleHandler config scheduleName patch

-- | Return every cron schedule stored in the configured schema.
--
-- Borrows one connection from the environment's pool for the duration of
-- the 'CS.listCronSchedules' query and wraps the rows in a
-- 'CronSchedulesResponse'.
listCronSchedulesHandler
  :: forall registry
   . ArbiterServerConfig registry
  -> Handler CronSchedulesResponse
listCronSchedulesHandler config = do
  schedules <- liftIO (Pool.withResource connPool fetchAll)
  pure CronSchedulesResponse {cronSchedules = schedules}
  where
    env = serverEnv config

    -- NOTE(review): 'fromJust' is partial; this assumes the environment
    -- always carries a connection pool — confirm against how the config is
    -- constructed.
    connPool = fromJust (connectionPool (simplePool env))

    fetchAll conn = CS.listCronSchedules conn (schema env)

-- | Apply a partial update to the cron schedule with the given name.
--
-- Steps, in order:
--
-- 1. If an override expression is supplied, it must parse with
--    'parseCronSchedule'; otherwise the request fails with 'err400'.
-- 2. If an override overlap policy is supplied, it must be recognised by
--    'overlapPolicyFromText'; otherwise the request fails with 'err400'.
-- 3. The update is written and the row is read back on the same pooled
--    connection.
--
-- Fails with 'err404' when the update touched no rows or the row could not
-- be read back.
updateCronScheduleHandler
  :: forall registry
   . ArbiterServerConfig registry
  -> Text
  -> CronScheduleUpdate
  -> Handler CronScheduleRow
updateCronScheduleHandler config name update = do
  validateExpression
  validateOverlap
  -- Apply update and read back in a single connection
  updatedRow <- liftIO (Pool.withResource connPool applyAndReload)
  maybe (throwError err404 {errBody = "Cron schedule not found"}) pure updatedRow
  where
    env = serverEnv config
    schemaName = schema env

    -- NOTE(review): 'fromJust' is partial; this assumes the environment
    -- always carries a connection pool — confirm against how the config is
    -- constructed.
    connPool = fromJust (connectionPool (simplePool env))

    -- Validate cron expression if provided
    validateExpression :: Handler ()
    validateExpression =
      case update.overrideExpression of
        Just (Just expr)
          | Left err <- parseCronSchedule expr ->
              throwError err400 {errBody = "Invalid cron expression: " <> fromString err}
        _ -> pure ()

    -- Validate overlap policy if provided
    validateOverlap :: Handler ()
    validateOverlap =
      case update.overrideOverlap of
        Just (Just ov)
          | Nothing <- overlapPolicyFromText ov ->
              throwError
                err400
                  { errBody = "Invalid overlap policy: must be SkipOverlap or AllowOverlap"
                  }
        _ -> pure ()

    -- Zero affected rows means no schedule matched the name, so skip the
    -- read-back and report "not found".
    applyAndReload conn = do
      rowsAffected <- CS.updateCronSchedule conn schemaName name update
      if rowsAffected == 0
        then pure Nothing
        else CS.getCronScheduleByName conn schemaName name

-- | Type class to build server implementations for registry entries
--
-- @registry@ is the type index of the 'ArbiterServerConfig' that every
-- handler receives; @reg@ is the type-level list of @(table name, payload)@
-- pairs that the instances recurse over. Keeping them separate lets the
-- instances walk down @reg@ while still passing the same configuration to
-- each handler.
class BuildServer registry (reg :: [(Symbol, Type)]) where
  -- | Produce the handlers for the API that 'RegistryToAPI' derives from
  -- @reg@, all sharing the given server configuration.
  buildServer :: ArbiterServerConfig registry -> ServerT (RegistryToAPI reg) Handler

-- Base case: an empty registry exposes no table routes, only the queues
-- listing, the SSE events stream and the cron endpoints (in that order).
instance
  (RegistryTables registry)
  => BuildServer registry '[]
  where
  buildServer config = queueRoutes :<|> eventRoutes :<|> cronRoutes
    where
      queueRoutes = queuesServer @registry (Proxy @registry)
      eventRoutes = eventsServer config
      cronRoutes = cronServer config

-- Single table case: the last table's routes are followed by the shared
-- endpoints, i.e. table :<|> queues :<|> events :<|> cron.
instance
  ( JobPayload payload
  , KnownSymbol tableName
  , RegistryTables registry
  )
  => BuildServer registry ('(tableName, payload) ': '[])
  where
  buildServer config =
    tableRoutes :<|> queueRoutes :<|> eventRoutes :<|> cronRoutes
    where
      -- The table name is recovered from the type-level symbol.
      tableRoutes =
        tableServer @registry @payload (T.pack (symbolVal (Proxy @tableName))) config
      queueRoutes = queuesServer @registry (Proxy @registry)
      eventRoutes = eventsServer config
      cronRoutes = cronServer config

-- Recursive case (at least two tables remain): serve the head table's
-- routes, then recurse on the rest of the registry list.
instance
  ( BuildServer registry (nextTable ': moreRest)
  , JobPayload payload
  , KnownSymbol tableName
  )
  => BuildServer registry ('(tableName, payload) ': (nextTable ': moreRest))
  where
  buildServer config = headRoutes :<|> remainingRoutes
    where
      -- The table name is recovered from the type-level symbol.
      headRoutes =
        tableServer @registry @payload (T.pack (symbolVal (Proxy @tableName))) config
      remainingRoutes =
        buildServer @registry @(nextTable ': moreRest) config

-- | Complete Arbiter server at @\/api\/v1\/...@
--
-- Delegates to 'buildServer', using the registry both as the configuration
-- index and as the list of tables to generate routes for.
arbiterServer
  :: forall registry
   . (BuildServer registry registry)
  => ArbiterServerConfig registry
  -> ServerT (ArbiterAPI registry) Handler
arbiterServer config = buildServer @registry @registry config

-- | Turn a server configuration into a WAI 'Application'.
--
-- Serves 'arbiterServer' for the 'ArbiterAPI' of the registry via Servant's
-- 'serve'.
arbiterApp
  :: forall registry
   . ( BuildServer registry registry
     , HasServer (ArbiterAPI registry) '[]
     )
  => ArbiterServerConfig registry
  -> Application
arbiterApp config = serve apiProxy (arbiterServer config)
  where
    apiProxy = Proxy @(ArbiterAPI registry)

-- | Run the API server on a specific port.
--
-- Prints a startup line to stdout, then blocks serving 'arbiterApp'.
--
-- Uses @setTimeout 0@ (no idle timeout) so that SSE streaming connections
-- are not killed by Warp.
runArbiterAPI
  :: forall registry
   . ( BuildServer registry registry
     , HasServer (ArbiterAPI registry) '[]
     )
  => Port
  -> ArbiterServerConfig registry
  -> IO ()
runArbiterAPI port config = do
  putStrLn ("Starting Arbiter API server on port " <> show port)
  runSettings warpSettings (arbiterApp config)
  where
    warpSettings = (setPort port . setTimeout 0) defaultSettings