{-# LANGUAGE AllowAmbiguousTypes #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeFamilies #-}
module Arbiter.Servant.Server
(
arbiterServer
, arbiterApp
, runArbiterAPI
, ArbiterServerConfig (..)
, initArbiterServer
, BuildServer (..)
) where
import Arbiter.Core.CronSchedule qualified as CS
import Arbiter.Core.Job.DLQ qualified as DLQ
import Arbiter.Core.Job.Schema qualified as Schema
import Arbiter.Core.Job.Types (Job (..), JobPayload, JobRead)
import Arbiter.Core.MonadArbiter (withDbTransaction)
import Arbiter.Core.Operations qualified as Ops
import Arbiter.Core.PoolConfig (PoolConfig (..))
import Arbiter.Core.QueueRegistry (AllQueuesUnique, RegistryTables (..))
import Arbiter.Core.SqlTemplates (JobFilter (..))
import Arbiter.Simple (SimpleConnectionPool (..), SimpleEnv (..), createSimpleEnvWithConfig, runSimpleDb)
import Arbiter.Worker.Cron (overlapPolicyFromText)
import Control.Exception (SomeException, bracket, catch)
import Control.Monad (void, when)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Aeson (toJSON)
import Data.ByteString (ByteString)
import Data.ByteString.Builder qualified as Builder
import Data.Int (Int64)
import Data.Kind (Type)
import Data.Map.Strict qualified as Map
import Data.Maybe (catMaybes, fromJust, fromMaybe)
import Data.Pool qualified as Pool
import Data.String (fromString)
import Data.Text (Text)
import Data.Text qualified as T
import Data.Time (getCurrentTime)
import Data.Time.Format (defaultTimeLocale, formatTime)
import Database.PostgreSQL.Simple qualified as PG
import Database.PostgreSQL.Simple.Notification (Notification (..), getNotification)
import GHC.TypeLits (KnownSymbol, Symbol, symbolVal)
import Network.HTTP.Types (status200)
import Network.Wai (responseStream)
import Network.Wai.Handler.Warp (Port, defaultSettings, runSettings, setPort, setTimeout)
import Servant
import Servant.Server.Generic (AsServerT)
import System.Cron (parseCronSchedule)
import System.Timeout (timeout)
import Arbiter.Servant.API
( ArbiterAPI
, CronAPI (..)
, DLQAPI (..)
, JobsAPI (..)
, QueuesAPI (..)
, RegistryToAPI
, StatsAPI (..)
, TableAPI (..)
)
import Arbiter.Servant.Types
-- | Configuration shared by every handler of the Arbiter HTTP server.
--
-- The @registry@ type parameter is only threaded through to 'SimpleEnv'.
data ArbiterServerConfig registry = ArbiterServerConfig
  { serverEnv :: SimpleEnv registry
  -- ^ Database environment the handlers pass to 'runSimpleDb'.
  , enableSSE :: Bool
  -- ^ NOTE(review): presumably toggles the server-sent-events endpoint;
  -- its consumer is not visible in this section — confirm.
  }
-- | Connection-pool settings used by 'initArbiterServer'.
--
-- Five connections, an idle timeout of 60 (units are defined by 'PoolConfig';
-- presumably seconds — confirm) and a single stripe.
serverPoolConfig :: PoolConfig
serverPoolConfig =
  PoolConfig
    { poolSize = 5
    , poolIdleTimeout = 60
    , poolStripes = Just 1
    }
-- | Build an 'ArbiterServerConfig' for the given registry.
--
-- Creates a 'SimpleEnv' from the connection string and schema name using
-- 'serverPoolConfig', and returns a configuration with 'enableSSE' set to
-- 'True'. The proxy argument only fixes the @registry@ type; its value is
-- ignored.
initArbiterServer
  :: forall registry
   . (AllQueuesUnique registry)
  => Proxy registry
  -- ^ Selects the registry type
  -> ByteString
  -- ^ PostgreSQL connection string
  -> Text
  -- ^ Schema name
  -> IO (ArbiterServerConfig registry)
initArbiterServer _proxy connStr schemaName = do
  env <- createSimpleEnvWithConfig (Proxy @registry) connStr schemaName serverPoolConfig
  pure ArbiterServerConfig {serverEnv = env, enableSSE = True}
-- | Wrap a list of jobs as 'ApiJob's, pairing every job with the same
-- timestamp, read once from the system clock.
toApiJobs :: (MonadIO m) => [JobRead payload] -> m [ApiJob payload]
toApiJobs jobs = do
  now <- liftIO getCurrentTime
  pure $ map (`ApiJob` now) jobs
-- | Wrap a single job as an 'ApiJob', pairing it with the current time.
toApiJob :: (MonadIO m) => JobRead payload -> m (ApiJob payload)
toApiJob job = do
  now <- liftIO getCurrentTime
  pure $ ApiJob job now
-- | Wrap a list of dead-letter-queue jobs as 'ApiDLQJob's, pairing every
-- job with the same timestamp, read once from the system clock.
toApiDLQJobs :: (MonadIO m) => [DLQ.DLQJob payload] -> m [ApiDLQJob payload]
toApiDLQJobs jobs = do
  now <- liftIO getCurrentTime
  pure $ map (`ApiDLQJob` now) jobs
-- | Normalise optional pagination parameters into a @(limit, offset)@ pair.
--
-- * The limit defaults to 50 and is clamped to the range @[1, 1000]@.
-- * The offset defaults to 0 and is never negative.
validatePagination :: Maybe Int -> Maybe Int -> (Int, Int)
validatePagination mLimit mOffset =
  let limit = max 1 $ min 1000 $ fromMaybe 50 mLimit
      offset = max 0 $ fromMaybe 0 mOffset
   in (limit, offset)
-- | Assemble the 'JobsAPI' record of handlers for one queue table.
--
-- Every field is the matching @*Handler@ function partially applied to the
-- table name and the server configuration. Route shapes noted below are
-- relative to wherever 'JobsAPI' is mounted.
jobsServer
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -- ^ Table name of the queue
  -> ArbiterServerConfig registry
  -> JobsAPI payload (AsServerT Handler)
jobsServer table config =
  JobsAPI
    { -- GET, query params: limit, offset, group_key, parent_id, suspended
      listJobs = listJobsHandler @registry @payload table config
    , -- POST, JSON body: ApiJobWrite
      insertJob = insertJobHandler @registry @payload table config
    , -- POST batch, JSON body: BatchInsertRequest
      insertJobsBatch = insertJobsBatchHandler @registry @payload table config
    , -- GET :id
      getJob = getJobHandler @registry @payload table config
    , -- GET in-flight, query params: limit, offset
      getInFlightJobs = getInFlightJobsHandler @registry @payload table config
    , -- DELETE :id
      cancelJob = cancelJobHandler @registry table config
    , -- POST :id/promote
      promoteJob = promoteJobHandler @registry @payload table config
    , -- POST :id/move-to-dlq
      moveToDLQ = moveToDLQHandler @registry @payload table config
    , -- POST :id/pause-children
      pauseChildren = pauseChildrenHandler @registry table config
    , -- POST :id/resume-children
      resumeChildren = resumeChildrenHandler @registry table config
    , -- POST :id/suspend
      suspendJob = suspendJobHandler @registry @payload table config
    , -- POST :id/resume
      resumeJob = resumeJobHandler @registry @payload table config
    }
-- | List one page of jobs from a queue table, optionally filtered.
--
-- Pagination parameters are normalised with 'validatePagination'. Each
-- supplied optional filter (group key, parent id, suspended flag) becomes a
-- 'JobFilter'; absent ones are dropped. The page, the filtered total and the
-- child counts are fetched inside one 'withDbTransaction' block.
--
-- Child counts are only queried when the page is non-empty and at least one
-- job on it satisfies 'isRollup'; otherwise both maps are empty.
listJobsHandler
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -- ^ Table name
  -> ArbiterServerConfig registry
  -> Maybe Int
  -- ^ Limit
  -> Maybe Int
  -- ^ Offset
  -> Maybe Text
  -- ^ Group-key filter
  -> Maybe Int64
  -- ^ Parent-id filter
  -> Maybe Bool
  -- ^ Suspended filter
  -> Handler (JobsResponse payload)
listJobsHandler tableName config mLimit mOffset mGroupKey mParentId mSuspended = liftIO $ do
  let (limit, offset) = validatePagination mLimit mOffset
      env = serverEnv config
      schemaName = schema env
      filters =
        catMaybes
          [ FilterGroupKey <$> mGroupKey
          , FilterParentId <$> mParentId
          , FilterSuspended <$> mSuspended
          ]
  (jobs, total, combined, dlqCounts) <- runSimpleDb env $ withDbTransaction $ do
    j <- Ops.listJobsFiltered schemaName tableName filters limit offset
    c <- Ops.countJobsFiltered schemaName tableName filters
    let jobIds = map primaryKey j
        hasParents = any isRollup j
    if null j || not hasParents
      then pure (j, c, Map.empty, Map.empty)
      else do
        cc <- Ops.countChildrenBatch schemaName tableName jobIds
        dc <- Ops.countDLQChildrenBatch schemaName tableName jobIds
        pure (j, c, cc, dc)
  -- The combined map carries a pair per parent; the first component is
  -- reported as the child count. NOTE(review): the pair is presumably
  -- (total children, paused children) — confirm against countChildrenBatch.
  let childCounts = fmap fst combined
      -- Parents whose two counters are equal.
      pausedParents = Map.keys $ Map.filter (\(t, p) -> p == t) combined
  apiJobs <- toApiJobs jobs
  pure $
    JobsResponse
      { jobs = apiJobs
      , jobsTotal = fromIntegral total
      , jobsOffset = offset
      , jobsLimit = limit
      , childCounts = childCounts
      , pausedParents = pausedParents
      , dlqChildCounts = dlqCounts
      }
-- | Insert a single job into a queue table.
--
-- Responds with the inserted job, or with HTTP 409 when 'Ops.insertJob'
-- returns 'Nothing'.
insertJobHandler
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -- ^ Table name
  -> ArbiterServerConfig registry
  -> ApiJobWrite payload
  -> Handler (JobResponse payload)
insertJobHandler tableName config (ApiJobWrite jobWrite) = do
  let env = serverEnv config
      schemaName = schema env
  mJob <- liftIO $ runSimpleDb env $ Ops.insertJob schemaName tableName jobWrite
  case mJob of
    Nothing -> throwError err409 {errBody = "Job insert failed (duplicate dedup key)"}
    Just j -> do
      aj <- toApiJob j
      pure $ JobResponse {job = aj}
-- | Insert a batch of jobs into a queue table.
--
-- The response carries the jobs returned by 'Ops.insertJobsBatch' together
-- with their count.
insertJobsBatchHandler
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -- ^ Table name
  -> ArbiterServerConfig registry
  -> BatchInsertRequest payload
  -> Handler (BatchInsertResponse payload)
insertJobsBatchHandler tableName config (BatchInsertRequest jobWrites) = do
  let env = serverEnv config
      schemaName = schema env
      writes = map unApiJobWrite jobWrites
  inserted <- liftIO $ runSimpleDb env $ Ops.insertJobsBatch schemaName tableName writes
  apiJobs <- toApiJobs inserted
  pure $ BatchInsertResponse {inserted = apiJobs, insertedCount = length apiJobs}
-- | Fetch a single job by id.
--
-- Responds with HTTP 404 when 'Ops.getJobById' finds no job.
getJobHandler
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -- ^ Table name
  -> ArbiterServerConfig registry
  -> Int64
  -- ^ Job id
  -> Handler (JobResponse payload)
getJobHandler tableName config jobId = do
  let env = serverEnv config
      schemaName = schema env
  mJob <- liftIO $ runSimpleDb env $ Ops.getJobById schemaName tableName jobId
  case mJob of
    Nothing -> throwError err404 {errBody = "Job not found"}
    Just j -> do
      aj <- toApiJob j
      pure $ JobResponse {job = aj}
-- | List one page of in-flight jobs from a queue table.
--
-- Pagination parameters are normalised with 'validatePagination'. The page,
-- the in-flight total and the child counts are fetched inside one
-- 'withDbTransaction' block.
--
-- Child counts are only queried when the page is non-empty and at least one
-- job on it satisfies 'isRollup'; otherwise both maps are empty.
getInFlightJobsHandler
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -- ^ Table name
  -> ArbiterServerConfig registry
  -> Maybe Int
  -- ^ Limit
  -> Maybe Int
  -- ^ Offset
  -> Handler (JobsResponse payload)
getInFlightJobsHandler tableName config mLimit mOffset = do
  let (limit, offset) = validatePagination mLimit mOffset
      env = serverEnv config
      schemaName = schema env
  (jobs, total, combined, dlqCounts) <- liftIO $ runSimpleDb env $ withDbTransaction $ do
    j <- Ops.getInFlightJobs schemaName tableName limit offset
    c <- Ops.countInFlightJobs schemaName tableName
    let jobIds = map primaryKey j
        hasParents = any isRollup j
    if null j || not hasParents
      then pure (j, c, Map.empty, Map.empty)
      else do
        cc <- Ops.countChildrenBatch schemaName tableName jobIds
        dc <- Ops.countDLQChildrenBatch schemaName tableName jobIds
        pure (j, c, cc, dc)
  -- The combined map carries a pair per parent; the first component is
  -- reported as the child count. NOTE(review): the pair is presumably
  -- (total children, paused children) — confirm against countChildrenBatch.
  let childCounts = fmap fst combined
      -- Parents whose two counters are equal.
      pausedParents = Map.keys $ Map.filter (\(t, p) -> p == t) combined
  apiJobs <- toApiJobs jobs
  pure $
    JobsResponse
      { jobs = apiJobs
      , jobsTotal = fromIntegral total
      , jobsOffset = offset
      , jobsLimit = limit
      , childCounts = childCounts
      , pausedParents = pausedParents
      , dlqChildCounts = dlqCounts
      }
-- | Cancel a job via 'Ops.cancelJobCascade'.
--
-- Responds with 'NoContent' when at least one row was affected, and with
-- HTTP 404 otherwise.
cancelJobHandler
  :: forall registry
   . Text
  -- ^ Table name
  -> ArbiterServerConfig registry
  -> Int64
  -- ^ Job id
  -> Handler NoContent
cancelJobHandler tableName config jobId = do
  let env = serverEnv config
      schemaName = schema env
  rowsAffected <- liftIO $ runSimpleDb env $ Ops.cancelJobCascade schemaName tableName jobId
  if rowsAffected > 0
    then pure NoContent
    else throwError err404 {errBody = "Job not found"}
-- | Promote a job via 'Ops.promoteJob'.
--
-- When the promotion affects at least one row the handler responds with
-- 'NoContent'. Otherwise the job is looked up to pick a precise error:
--
-- * job missing: 404 @Job not found@;
-- * job is suspended: 409 pointing the caller at the resume endpoint;
-- * anything else: 409 @Job is already visible@.
--
-- The error is computed inside the database action and only thrown once
-- control is back in 'Handler'.
promoteJobHandler
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -- ^ Table name of the queue the job lives in
  -> ArbiterServerConfig registry
  -- ^ Server configuration; supplies the database environment
  -> Int64
  -- ^ Id of the job to promote
  -> Handler NoContent
promoteJobHandler tableName config jobId = do
  let env = serverEnv config
      schemaName = schema env
  result <- liftIO $ runSimpleDb env $ do
    rowsAffected <- Ops.promoteJob schemaName tableName jobId
    if rowsAffected > 0
      then pure (Right ())
      else do
        mJob <- Ops.getJobById @_ @payload schemaName tableName jobId
        pure . Left $ case mJob of
          Nothing -> err404 {errBody = "Job not found"}
          Just job
            -- 'errBody' is a lazy ByteString built through OverloadedStrings,
            -- whose 'fromString' truncates every Char to 8 bits. The message
            -- is therefore kept ASCII-only (a plain hyphen, not an em dash).
            | suspended job ->
                err409 {errBody = "Job is suspended - use resume endpoint"}
            | otherwise ->
                err409 {errBody = "Job is already visible"}
  case result of
    Left err -> throwError err
    Right () -> pure NoContent
-- | Manually move a job to the dead-letter queue.
--
-- Everything below runs inside a single 'withDbTransaction':
--
-- 1. The job is fetched with 'Ops.getJobById'; a missing job yields 404
--    @Job not found@.
-- 2. If the job is a rollup ('isRollup'), its raw child results are read and
--    merged with 'Ops.mergeRawChildResults'; a non-empty merge is written
--    with 'Ops.persistParentState' before the move.
-- 3. The job is moved with 'Ops.moveToDLQ'. Zero affected rows is reported
--    as 409 @Job was concurrently modified@; any other count is success.
moveToDLQHandler
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -- ^ Table name of the queue the job lives in
  -> ArbiterServerConfig registry
  -- ^ Server configuration; supplies the database environment
  -> Int64
  -- ^ Id of the job to move
  -> Handler NoContent
moveToDLQHandler tableName config jobId = do
  let env = serverEnv config
      schemaName = schema env
  result <- liftIO $ runSimpleDb env $ withDbTransaction $ do
    mJob <- Ops.getJobById @_ @payload schemaName tableName jobId
    case mJob of
      Nothing -> pure Nothing
      Just job -> do
        when (isRollup job) $ do
          (results, failures, mSnapshot, _dlqFailures) <-
            Ops.readChildResultsRaw schemaName tableName (primaryKey job)
          let merged = Ops.mergeRawChildResults results failures mSnapshot
          -- Only persist when there is something to persist.
          when (not (Map.null merged)) $
            void $
              Ops.persistParentState schemaName tableName (primaryKey job) (toJSON merged)
        Just <$> Ops.moveToDLQ schemaName tableName "Manually moved to DLQ via admin API" job
  case result of
    Nothing -> throwError err404 {errBody = "Job not found"}
    Just 0 -> throwError err409 {errBody = "Job was concurrently modified"}
    Just _ -> pure NoContent
-- | Pause the children of a job via 'Ops.pauseChildren'.
--
-- The affected-row count returned by the operation is discarded, so the
-- handler responds with 'NoContent' whenever the database action completes,
-- regardless of how many rows were touched.
pauseChildrenHandler
  :: forall registry
   . Text
  -- ^ Table name of the queue the job lives in
  -> ArbiterServerConfig registry
  -- ^ Server configuration; supplies the database environment
  -> Int64
  -- ^ Job id passed to 'Ops.pauseChildren'
  -> Handler NoContent
pauseChildrenHandler tableName config jobId = do
  let env = serverEnv config
      schemaName = schema env
  void . liftIO . runSimpleDb env $
    Ops.pauseChildren schemaName tableName jobId
  pure NoContent
-- | Resume the children of a job via 'Ops.resumeChildren'.
--
-- The affected-row count returned by the operation is discarded, so the
-- handler responds with 'NoContent' whenever the database action completes,
-- regardless of how many rows were touched.
resumeChildrenHandler
  :: forall registry
   . Text
  -- ^ Table name of the queue the job lives in
  -> ArbiterServerConfig registry
  -- ^ Server configuration; supplies the database environment
  -> Int64
  -- ^ Job id passed to 'Ops.resumeChildren'
  -> Handler NoContent
resumeChildrenHandler tableName config jobId = do
  let env = serverEnv config
      schemaName = schema env
  void . liftIO . runSimpleDb env $
    Ops.resumeChildren schemaName tableName jobId
  pure NoContent
-- | Suspend a job via 'Ops.suspendJob'.
--
-- When the suspension affects at least one row the handler responds with
-- 'NoContent'. Otherwise the job is looked up to pick a precise error:
--
-- * job missing: 404 @Job not found@;
-- * job already suspended: 409 @Job is already suspended@;
-- * anything else: 409 reporting the job as in-flight.
--
-- The error is computed inside the database action and only thrown once
-- control is back in 'Handler'.
suspendJobHandler
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -- ^ Table name of the queue the job lives in
  -> ArbiterServerConfig registry
  -- ^ Server configuration; supplies the database environment
  -> Int64
  -- ^ Id of the job to suspend
  -> Handler NoContent
suspendJobHandler tableName config jobId = do
  let env = serverEnv config
      schemaName = schema env
  result <- liftIO $ runSimpleDb env $ do
    rowsAffected <- Ops.suspendJob schemaName tableName jobId
    if rowsAffected > 0
      then pure (Right ())
      else do
        mJob <- Ops.getJobById @_ @payload schemaName tableName jobId
        pure . Left $ case mJob of
          Nothing -> err404 {errBody = "Job not found"}
          Just job
            | suspended job ->
                err409 {errBody = "Job is already suspended"}
            -- 'errBody' is a lazy ByteString built through OverloadedStrings,
            -- whose 'fromString' truncates every Char to 8 bits. The message
            -- is therefore kept ASCII-only (a plain hyphen, not an em dash).
            | otherwise ->
                err409 {errBody = "Job is in-flight - cannot suspend"}
  case result of
    Left err -> throwError err
    Right () -> pure NoContent
-- | Resume a suspended job via 'Ops.resumeJob'.
--
-- When the resume affects at least one row the handler responds with
-- 'NoContent'. Otherwise the job is looked up to pick a precise error, with
-- the checks applied in this order:
--
-- * job missing: 404 @Job not found@;
-- * job not suspended: 409 @Job is not suspended@;
-- * job is a rollup ('isRollup'): 409 refusing to resume a rollup finalizer
--   with active children;
-- * anything else: 409 attributing the failure to concurrent modification.
--
-- The error is computed inside the database action and only thrown once
-- control is back in 'Handler'.
resumeJobHandler
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -- ^ Table name of the queue the job lives in
  -> ArbiterServerConfig registry
  -- ^ Server configuration; supplies the database environment
  -> Int64
  -- ^ Id of the job to resume
  -> Handler NoContent
resumeJobHandler tableName config jobId = do
  let env = serverEnv config
      schemaName = schema env
  result <- liftIO $ runSimpleDb env $ do
    rowsAffected <- Ops.resumeJob schemaName tableName jobId
    if rowsAffected > 0
      then pure (Right ())
      else do
        mJob <- Ops.getJobById @_ @payload schemaName tableName jobId
        pure . Left $ case mJob of
          Nothing -> err404 {errBody = "Job not found"}
          Just job
            | not (suspended job) ->
                err409 {errBody = "Job is not suspended"}
            | isRollup job ->
                err409 {errBody = "Cannot resume a rollup finalizer with active children"}
            | otherwise ->
                err409 {errBody = "Job could not be resumed (concurrent modification)"}
  case result of
    Left err -> throwError err
    Right () -> pure NoContent
-- | Assemble the 'DLQAPI' record of handlers for one queue table.
--
-- Each route is the matching @*Handler@ partially applied to the table name
-- and server configuration. The explicit type applications pin the
-- @registry@ (and, where needed, @payload@) type variables, which would
-- otherwise be ambiguous.
dlqServer
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -- ^ Table name of the queue
  -> ArbiterServerConfig registry
  -- ^ Server configuration shared by all handlers
  -> DLQAPI payload (AsServerT Handler)
dlqServer table config =
  DLQAPI
    { listDLQ = listDLQHandler @registry @payload table config
    , retryFromDLQ = retryFromDLQHandler @registry @payload table config
    , deleteDLQ = deleteDLQHandler @registry table config
    , deleteDLQBatch = deleteDLQBatchHandler @registry table config
    }
-- | List dead-letter-queue entries for a table, with pagination and
-- optional filters.
--
-- The raw @limit@ / @offset@ query parameters are normalised by
-- 'validatePagination'. The page of jobs and the total count are fetched
-- with the same filter list inside a single 'withDbTransaction'. The
-- response echoes back the effective limit and offset.
listDLQHandler
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -- ^ Table name of the queue
  -> ArbiterServerConfig registry
  -- ^ Server configuration; supplies the database environment
  -> Maybe Int
  -- ^ Requested page size (@limit@ query parameter)
  -> Maybe Int
  -- ^ Requested offset (@offset@ query parameter)
  -> Maybe Int64
  -- ^ Optional @parent_id@ filter
  -> Maybe Text
  -- ^ Optional @group_key@ filter
  -> Handler (DLQResponse payload)
listDLQHandler tableName config mLimit mOffset mParentId mGroupKey = do
  let (limit, offset) = validatePagination mLimit mOffset
      env = serverEnv config
      schemaName = schema env
      -- Only the filters the caller actually supplied are applied.
      filters =
        catMaybes
          [ FilterParentId <$> mParentId
          , FilterGroupKey <$> mGroupKey
          ]
  (rows, total) <- liftIO $ runSimpleDb env $ withDbTransaction $ do
    page <- Ops.listDLQFiltered schemaName tableName filters limit offset
    count <- Ops.countDLQFiltered schemaName tableName filters
    pure (page, count)
  apiDlqJobs <- toApiDLQJobs rows
  pure $
    DLQResponse
      { dlqJobs = apiDlqJobs
      , dlqTotal = fromIntegral total
      , dlqOffset = offset
      , dlqLimit = limit
      }
-- | Retry a dead-letter-queue entry via 'Ops.retryFromDLQ'.
--
-- The retry and the follow-up existence check run inside one
-- 'withDbTransaction':
--
-- * the retry returns a job: responds with 'NoContent';
-- * the retry returns nothing but 'Ops.dlqJobExists' is true: 409 reporting
--   that the parent job no longer exists;
-- * the retry returns nothing and the entry does not exist: 404
--   @DLQ job not found@.
retryFromDLQHandler
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -- ^ Table name of the queue
  -> ArbiterServerConfig registry
  -- ^ Server configuration; supplies the database environment
  -> Int64
  -- ^ Id of the DLQ entry to retry
  -> Handler NoContent
retryFromDLQHandler tableName config dlqId = do
  let env = serverEnv config
      schemaName = schema env
  -- 'Left' carries whether the DLQ entry still exists after a failed retry.
  result <- liftIO $ runSimpleDb env $ withDbTransaction $ do
    mJob <- Ops.retryFromDLQ @_ @payload schemaName tableName dlqId
    case mJob of
      Just _ -> pure (Right ())
      Nothing -> Left <$> Ops.dlqJobExists schemaName tableName dlqId
  case result of
    Right () -> pure NoContent
    Left True ->
      throwError
        err409 {errBody = "Cannot retry: parent job no longer exists (not in queue or DLQ)"}
    Left False ->
      throwError err404 {errBody = "DLQ job not found"}
-- | Delete a single dead-letter-queue entry via 'Ops.deleteDLQJob'.
--
-- Responds with 'NoContent' when at least one row was affected, and with
-- 404 @DLQ job not found@ otherwise.
deleteDLQHandler
  :: forall registry
   . Text
  -- ^ Table name of the queue
  -> ArbiterServerConfig registry
  -- ^ Server configuration; supplies the database environment
  -> Int64
  -- ^ Id of the DLQ entry to delete
  -> Handler NoContent
deleteDLQHandler tableName config dlqId = do
  let env = serverEnv config
      schemaName = schema env
  rowsAffected <-
    liftIO $
      runSimpleDb env $
        Ops.deleteDLQJob schemaName tableName dlqId
  if rowsAffected > 0
    then pure NoContent
    else throwError err404 {errBody = "DLQ job not found"}
-- | Delete several dead-letter-queue entries in one call.
--
-- The ids from the request body are handed to 'Ops.deleteDLQJobsBatch';
-- the row count it reports is returned in the @deleted@ field of the
-- response.
deleteDLQBatchHandler
  :: forall registry
   . Text
  -- ^ Table name of the queue
  -> ArbiterServerConfig registry
  -- ^ Server configuration; supplies the database environment
  -> BatchDeleteRequest
  -- ^ Request body carrying the DLQ ids to delete
  -> Handler BatchDeleteResponse
deleteDLQBatchHandler tableName config (BatchDeleteRequest dlqIds) = do
  let env = serverEnv config
      schemaName = schema env
  rowsDeleted <-
    liftIO $
      runSimpleDb env $
        Ops.deleteDLQJobsBatch schemaName tableName dlqIds
  pure $ BatchDeleteResponse {deleted = rowsDeleted}
-- | Assemble the 'StatsAPI' record of handlers for one queue table.
--
-- The single route is 'getStatsHandler' applied to the table name and
-- server configuration, with @registry@ pinned by type application.
statsServer
  :: forall registry
   . Text
  -- ^ Table name of the queue
  -> ArbiterServerConfig registry
  -- ^ Server configuration passed through to the handler
  -> StatsAPI (AsServerT Handler)
statsServer tableName config =
  StatsAPI
    { getStats = getStatsHandler @registry tableName config
    }
-- | Return queue statistics for a table together with a timestamp.
--
-- Statistics come from 'Ops.getQueueStats'. The timestamp is the current
-- UTC time, read after the statistics query and rendered with the format
-- string @%Y-%m-%dT%H:%M:%S%z@.
getStatsHandler
  :: forall registry
   . Text
  -- ^ Table name of the queue
  -> ArbiterServerConfig registry
  -- ^ Server configuration; supplies the database environment
  -> Handler StatsResponse
getStatsHandler tableName config = do
  let env = serverEnv config
      schemaName = schema env
  queueStats <-
    liftIO $
      runSimpleDb env $
        Ops.getQueueStats schemaName tableName
  now <- liftIO getCurrentTime
  let renderedAt =
        T.pack $
          formatTime defaultTimeLocale "%Y-%m-%dT%H:%M:%S%z" now
  pure $ StatsResponse {stats = queueStats, timestamp = renderedAt}
-- | Assemble the routes served for a single queue table.
--
-- Each field of 'TableAPI' is filled by the matching sub-server, all of
-- which receive the same table name and server configuration.
tableServer
  :: forall registry payload
   . (JobPayload payload)
  => Text
  -> ArbiterServerConfig registry
  -> TableAPI payload (AsServerT Handler)
tableServer table config =
  TableAPI
    { jobs = jobsServer @registry @payload table config -- routes under "jobs"
    , dlq = dlqServer @registry @payload table config -- routes under "dlq"
    , stats = statsServer @registry table config -- routes under "stats"
    }
-- | Server for the queue-listing endpoint.
--
-- The response is computed purely from the registry type: it contains
-- whatever 'registryTableNames' returns for the supplied proxy.
queuesServer
  :: forall registry
   . (RegistryTables registry)
  => Proxy registry
  -> QueuesAPI (AsServerT Handler)
queuesServer registryProxy =
  QueuesAPI {listQueues = pure queuesResponse}
  where
    queuesResponse = QueuesResponse {queues = registryTableNames registryProxy}
-- | Raw WAI endpoint that streams events to the client as Server-Sent
-- Events.
--
-- * When 'enableSSE' is off, a single @disabled@ event is written and the
--   stream ends.
-- * Otherwise a connection is taken from the pool, @LISTEN@ is issued on
--   'Schema.eventStreamingChannel', and every notification payload is
--   forwarded as a @data:@ frame. If no notification arrives within 15
--   seconds a @: keepalive@ comment frame is written instead.
--
-- The connection is held for the whole lifetime of the stream and is
-- released by 'bracket' when the stream ends for any reason.
--
-- Fails with an 'IOError' (instead of the former bare 'fromJust' crash) when
-- the environment has no connection pool.
eventsServer
  :: forall registry
   . ArbiterServerConfig registry
  -> Tagged Handler Application
eventsServer config = Tagged $ \_req sendResponse ->
  if not (enableSSE config)
    then
      sendResponse $ responseStream status200 sseHeaders $ \write flush -> do
        write "data: {\"event\":\"disabled\"}\n\n"
        flush
    else case connectionPool (simplePool (serverEnv config)) of
      Nothing ->
        ioError $
          userError
            "Arbiter.Servant.Server.eventsServer: no connection pool available for event streaming"
      Just connPool ->
        bracket (Pool.takeResource connPool) (releaseListener connPool) $ \(conn, _) -> do
          _ <- PG.execute_ conn $ "LISTEN " <> fromString (T.unpack Schema.eventStreamingChannel)
          sendResponse $ responseStream status200 sseHeaders $ \write flush -> do
            write "data: {\"event\":\"connected\",\"message\":\"Stream connected\"}\n\n"
            flush
            let go = do
                  -- Wait at most 15 seconds for the next notification.
                  mNotification <- timeout 15_000_000 (getNotification conn)
                  case mNotification of
                    Just notification ->
                      write ("data: " <> Builder.byteString (notificationData notification) <> "\n\n")
                    Nothing -> write ": keepalive\n\n"
                  flush
                  go
            -- Any exception raised inside the loop (e.g. by write/flush or
            -- getNotification) ends the stream quietly; cleanup is left to
            -- the surrounding bracket.
            go `catch` (\(_ :: SomeException) -> pure ())
  where
    -- Release action for the bracket: try to drop the LISTEN registration.
    -- A connection on which UNLISTEN fails is destroyed rather than being
    -- handed back to the pool.
    releaseListener connPool (conn, localPool) = do
      unlistened <-
        (PG.execute_ conn "UNLISTEN *" >> pure True)
          `catch` (\(_ :: SomeException) -> pure False)
      if unlistened
        then Pool.putResource localPool conn
        else Pool.destroyResource connPool localPool conn

    sseHeaders =
      [ ("Content-Type", "text/event-stream")
      , ("Cache-Control", "no-cache")
      , ("Connection", "keep-alive")
      , ("X-Accel-Buffering", "no")
      ]
-- | Server for the cron-schedule routes.
--
-- Both routes share the same configuration, which is partially applied to
-- the two handlers here.
cronServer
  :: forall registry
   . ArbiterServerConfig registry
  -> CronAPI (AsServerT Handler)
cronServer config =
  CronAPI
    { listSchedules = listCronSchedulesHandler config
    , updateSchedule = updateCronScheduleHandler config
    }
-- | List the cron schedules stored in the server's schema.
--
-- Borrows a connection from the environment's pool and returns the rows
-- produced by 'CS.listCronSchedules'.
--
-- Responds with HTTP 500 when the environment has no connection pool
-- (previously this case crashed through 'fromJust').
listCronSchedulesHandler
  :: forall registry
   . ArbiterServerConfig registry
  -> Handler CronSchedulesResponse
listCronSchedulesHandler config =
  case connectionPool (simplePool env) of
    Nothing ->
      throwError err500 {errBody = "Cron schedules unavailable: no connection pool configured"}
    Just connPool -> do
      rows <-
        liftIO $
          Pool.withResource connPool $ \conn ->
            CS.listCronSchedules conn (schema env)
      pure $ CronSchedulesResponse {cronSchedules = rows}
  where
    env = serverEnv config
-- | Apply an update to the cron schedule with the given name and return the
-- updated row.
--
-- Responses:
--
-- * 400 when @overrideExpression@ carries a value that 'parseCronSchedule'
--   rejects, or @overrideOverlap@ carries a value that
--   'overlapPolicyFromText' does not recognise.
-- * 404 when the update touched no rows, or the row cannot be read back.
-- * 500 when the environment has no connection pool (previously this case
--   crashed through 'fromJust').
--
-- Validation runs before the pool is looked at, so malformed requests are
-- still answered with 400 regardless of pool availability.
updateCronScheduleHandler
  :: forall registry
   . ArbiterServerConfig registry
  -> Text
  -> CronScheduleUpdate
  -> Handler CronScheduleRow
updateCronScheduleHandler config name update = do
  -- Only a present, non-null override is validated; every other shape of
  -- the field is passed through untouched.
  case update.overrideExpression of
    Just (Just expr) ->
      case parseCronSchedule expr of
        Left err -> throwError err400 {errBody = "Invalid cron expression: " <> fromString err}
        Right _ -> pure ()
    _ -> pure ()
  case update.overrideOverlap of
    Just (Just ov) ->
      case overlapPolicyFromText ov of
        Nothing ->
          throwError
            err400
              { errBody = "Invalid overlap policy: must be SkipOverlap or AllowOverlap"
              }
        Just _ -> pure ()
    _ -> pure ()
  connPool <-
    maybe
      (throwError err500 {errBody = "Cron schedules unavailable: no connection pool configured"})
      pure
      (connectionPool (simplePool env))
  result <- liftIO $ Pool.withResource connPool $ \conn -> do
    rowsAffected <- CS.updateCronSchedule conn schemaName name update
    -- Zero affected rows means no schedule with this name exists.
    if rowsAffected == 0
      then pure Nothing
      else CS.getCronScheduleByName conn schemaName name
  maybe (throwError err404 {errBody = "Cron schedule not found"}) pure result
  where
    env = serverEnv config
    schemaName = schema env
-- | Builds the Servant server for a type-level list of
-- @(table name, payload type)@ pairs.
--
-- @registry@ is the registry the configuration is indexed by; @reg@ is the
-- list the server is being built for, which determines the API through
-- 'RegistryToAPI'.
class BuildServer registry (reg :: [(Symbol, Type)]) where
  -- | Produce the server for @RegistryToAPI reg@ from the configuration.
  buildServer
    :: ArbiterServerConfig registry
    -> ServerT (RegistryToAPI reg) Handler
-- | Empty list: only the routes that are not tied to a table remain — the
-- queue listing, the event stream and the cron routes, in that order.
instance
  (RegistryTables registry)
  => BuildServer registry '[]
  where
  buildServer config = queueRoutes :<|> eventsServer config :<|> cronServer config
    where
      queueRoutes = queuesServer @registry (Proxy @registry)
-- | Exactly one table: its table routes, followed by the queue listing, the
-- event stream and the cron routes.
--
-- The table name passed to 'tableServer' is the type-level symbol
-- @tableName@ rendered as 'Text'.
instance
  ( JobPayload payload
  , KnownSymbol tableName
  , RegistryTables registry
  )
  => BuildServer registry ('(tableName, payload) ': '[])
  where
  buildServer config =
    tableServer @registry @payload tableText config
      :<|> queuesServer @registry (Proxy @registry)
      :<|> eventsServer config
      :<|> cronServer config
    where
      tableText = T.pack (symbolVal (Proxy @tableName))
-- | Two or more tables: the routes of the head table, followed by the
-- server built recursively for the remaining tables.
--
-- The table name passed to 'tableServer' is the type-level symbol
-- @tableName@ rendered as 'Text'.
instance
  ( BuildServer registry (nextTable ': moreRest)
  , JobPayload payload
  , KnownSymbol tableName
  )
  => BuildServer registry ('(tableName, payload) ': (nextTable ': moreRest))
  where
  buildServer config =
    tableServer @registry @payload tableText config
      :<|> buildServer @registry @(nextTable ': moreRest) config
    where
      tableText = T.pack (symbolVal (Proxy @tableName))
-- | The complete Arbiter server for a registry.
--
-- Runs 'buildServer' with the registry used both as the configuration index
-- and as the list of tables to serve.
arbiterServer
  :: forall registry
   . (BuildServer registry registry)
  => ArbiterServerConfig registry
  -> ServerT (ArbiterAPI registry) Handler
arbiterServer config = buildServer @registry @registry config
-- | Turn the Arbiter server into a WAI 'Application' by passing
-- 'arbiterServer' to Servant's 'serve'.
arbiterApp
  :: forall registry
   . ( BuildServer registry registry
     , HasServer (ArbiterAPI registry) '[]
     )
  => ArbiterServerConfig registry
  -> Application
arbiterApp config = serve apiProxy (arbiterServer config)
  where
    apiProxy = Proxy @(ArbiterAPI registry)
-- | Print a startup line and serve 'arbiterApp' with Warp on the given port.
--
-- The Warp settings are the defaults with the port overridden and the
-- timeout set to @0@.
-- NOTE(review): the zero timeout is presumably there so long-lived event
-- streams are not cut off — confirm against the Warp documentation.
runArbiterAPI
  :: forall registry
   . ( BuildServer registry registry
     , HasServer (ArbiterAPI registry) '[]
     )
  => Port
  -> ArbiterServerConfig registry
  -> IO ()
runArbiterAPI port config = do
  putStrLn ("Starting Arbiter API server on port " <> show port)
  runSettings warpSettings (arbiterApp config)
  where
    warpSettings = setPort port (setTimeout 0 defaultSettings)