{-# LANGUAGE OverloadedStrings #-}

module Arbiter.Core.Job.Types
  ( -- * Core Job Type
    Job (..)
  , JobRead
  , JobWrite
  , defaultJob
  , defaultGroupedJob
  , isRollup

    -- * Type Constraints
  , JobPayload

    -- * Deduplication
  , DedupKey (..)

    -- * Observability
  , ObservabilityHooks (..)
  , defaultObservabilityHooks
  , ClaimTime
  , CurrentTime
  , StartTime
  , EndTime
  , ErrorMsg
  , BackoffDelay
  ) where

import Data.Aeson (FromJSON (..), ToJSON (..), Value, object, withObject, (.:), (.=))
import Data.Aeson.Types (Parser)
import Data.Int (Int32, Int64)
import Data.Maybe (isJust)
import Data.Text (Text)
import Data.Time (NominalDiffTime, UTCTime)
import GHC.Generics (Generic)

-- | A job in the queue. Parametrized over payload, primary key, queue name,
-- and inserted-at timestamp. See 'JobWrite' (for insertion) and 'JobRead'
-- (returned from claims/queries).
data Job payload key q insertedAt = Job
  { forall payload key q insertedAt.
Job payload key q insertedAt -> key
primaryKey :: key
  -- ^ @()@ in 'JobWrite' (assigned by DB), @Int64@ in 'JobRead'.
  , forall payload key q insertedAt.
Job payload key q insertedAt -> payload
payload :: payload
  -- ^ User-defined payload, stored as JSONB.
  , forall payload key q insertedAt. Job payload key q insertedAt -> q
queueName :: q
  -- ^ @()@ in 'JobWrite', table name ('Text') in 'JobRead'. Not serialized.
  , forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe Text
groupKey :: Maybe Text
  -- ^ Jobs with the same group key are processed serially (head-of-line blocking).
  -- @Nothing@ for ungrouped jobs that can run in parallel.
  , forall payload key q insertedAt.
Job payload key q insertedAt -> insertedAt
insertedAt :: insertedAt
  -- ^ @()@ in 'JobWrite' (set by DB), @UTCTime@ in 'JobRead'.
  , forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe UTCTime
updatedAt :: Maybe UTCTime
  -- ^ The time the job was last updated.
  , forall payload key q insertedAt.
Job payload key q insertedAt -> Int32
attempts :: Int32
  -- ^ The number of times this job has been attempted.
  , forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe Text
lastError :: Maybe Text
  -- ^ The error message from the last failed attempt.
  , forall payload key q insertedAt.
Job payload key q insertedAt -> Int32
priority :: Int32
  -- ^ The job's priority. Lower numbers are higher priority.
  , forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe UTCTime
lastAttemptedAt :: Maybe UTCTime
  -- ^ The time this job was last claimed by a worker.
  , forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe UTCTime
notVisibleUntil :: Maybe UTCTime
  -- ^ When this job becomes visible for claiming.
  , forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe DedupKey
dedupKey :: Maybe DedupKey
  -- ^ The deduplication strategy for this job.
  , forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe Int32
maxAttempts :: Maybe Int32
  -- ^ Override the global maxAttempts config for this specific job.
  -- If @Nothing@, uses the worker config's global @maxAttempts@ value.
  , forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe Int64
parentId :: Maybe Int64
  -- ^ Parent job ID. Set by 'insertJobTree', not manually. When this child
  -- is the last to complete, the parent (if a rollup finalizer) is resumed.
  , forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe Value
parentState :: Maybe Value
  -- ^ Snapshot of accumulated child results for rollup finalizers. The
  -- engine sets this to an empty object on insert when the job is a
  -- rollup finalizer, and overwrites it with the final results map before
  -- a DLQ move so the snapshot survives the @ON DELETE CASCADE@ on the
  -- results table. 'isRollup' is derived from whether this is non-null.
  , forall payload key q insertedAt.
Job payload key q insertedAt -> Bool
suspended :: Bool
  -- ^ Whether this job is suspended (not claimable).
  -- @TRUE@ for: finalizers waiting for children to complete,
  -- or operator-paused jobs.
  }
  deriving stock (Job payload key q insertedAt
-> Job payload key q insertedAt -> Bool
(Job payload key q insertedAt
 -> Job payload key q insertedAt -> Bool)
-> (Job payload key q insertedAt
    -> Job payload key q insertedAt -> Bool)
-> Eq (Job payload key q insertedAt)
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
forall payload key q insertedAt.
(Eq key, Eq payload, Eq q, Eq insertedAt) =>
Job payload key q insertedAt
-> Job payload key q insertedAt -> Bool
$c== :: forall payload key q insertedAt.
(Eq key, Eq payload, Eq q, Eq insertedAt) =>
Job payload key q insertedAt
-> Job payload key q insertedAt -> Bool
== :: Job payload key q insertedAt
-> Job payload key q insertedAt -> Bool
$c/= :: forall payload key q insertedAt.
(Eq key, Eq payload, Eq q, Eq insertedAt) =>
Job payload key q insertedAt
-> Job payload key q insertedAt -> Bool
/= :: Job payload key q insertedAt
-> Job payload key q insertedAt -> Bool
Eq, (forall x.
 Job payload key q insertedAt
 -> Rep (Job payload key q insertedAt) x)
-> (forall x.
    Rep (Job payload key q insertedAt) x
    -> Job payload key q insertedAt)
-> Generic (Job payload key q insertedAt)
forall x.
Rep (Job payload key q insertedAt) x
-> Job payload key q insertedAt
forall x.
Job payload key q insertedAt
-> Rep (Job payload key q insertedAt) x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
forall payload key q insertedAt x.
Rep (Job payload key q insertedAt) x
-> Job payload key q insertedAt
forall payload key q insertedAt x.
Job payload key q insertedAt
-> Rep (Job payload key q insertedAt) x
$cfrom :: forall payload key q insertedAt x.
Job payload key q insertedAt
-> Rep (Job payload key q insertedAt) x
from :: forall x.
Job payload key q insertedAt
-> Rep (Job payload key q insertedAt) x
$cto :: forall payload key q insertedAt x.
Rep (Job payload key q insertedAt) x
-> Job payload key q insertedAt
to :: forall x.
Rep (Job payload key q insertedAt) x
-> Job payload key q insertedAt
Generic, Int -> Job payload key q insertedAt -> ShowS
[Job payload key q insertedAt] -> ShowS
Job payload key q insertedAt -> String
(Int -> Job payload key q insertedAt -> ShowS)
-> (Job payload key q insertedAt -> String)
-> ([Job payload key q insertedAt] -> ShowS)
-> Show (Job payload key q insertedAt)
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
forall payload key q insertedAt.
(Show key, Show payload, Show q, Show insertedAt) =>
Int -> Job payload key q insertedAt -> ShowS
forall payload key q insertedAt.
(Show key, Show payload, Show q, Show insertedAt) =>
[Job payload key q insertedAt] -> ShowS
forall payload key q insertedAt.
(Show key, Show payload, Show q, Show insertedAt) =>
Job payload key q insertedAt -> String
$cshowsPrec :: forall payload key q insertedAt.
(Show key, Show payload, Show q, Show insertedAt) =>
Int -> Job payload key q insertedAt -> ShowS
showsPrec :: Int -> Job payload key q insertedAt -> ShowS
$cshow :: forall payload key q insertedAt.
(Show key, Show payload, Show q, Show insertedAt) =>
Job payload key q insertedAt -> String
show :: Job payload key q insertedAt -> String
$cshowList :: forall payload key q insertedAt.
(Show key, Show payload, Show q, Show insertedAt) =>
[Job payload key q insertedAt] -> ShowS
showList :: [Job payload key q insertedAt] -> ShowS
Show)

-- | A rollup finalizer is any job whose 'parentState' snapshot is present
-- (an empty object on insert; the merged child results before a DLQ move).
isRollup :: Job p k q t -> Bool
isRollup :: forall payload key q insertedAt.
Job payload key q insertedAt -> Bool
isRollup = Maybe Value -> Bool
forall a. Maybe a -> Bool
isJust (Maybe Value -> Bool)
-> (Job p k q t -> Maybe Value) -> Job p k q t -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Job p k q t -> Maybe Value
forall payload key q insertedAt.
Job payload key q insertedAt -> Maybe Value
parentState

-- | Ungrouped 'JobWrite' with default values. For serial processing within a
-- group, use 'defaultGroupedJob'.
defaultJob :: payload -> JobWrite payload
defaultJob :: forall payload. payload -> JobWrite payload
defaultJob payload
p =
  Job
    { primaryKey :: ()
primaryKey = ()
    , payload :: payload
payload = payload
p
    , queueName :: ()
queueName = ()
    , groupKey :: Maybe Text
groupKey = Maybe Text
forall a. Maybe a
Nothing
    , insertedAt :: ()
insertedAt = ()
    , updatedAt :: Maybe UTCTime
updatedAt = Maybe UTCTime
forall a. Maybe a
Nothing
    , attempts :: Int32
attempts = Int32
0
    , lastError :: Maybe Text
lastError = Maybe Text
forall a. Maybe a
Nothing
    , priority :: Int32
priority = Int32
0
    , lastAttemptedAt :: Maybe UTCTime
lastAttemptedAt = Maybe UTCTime
forall a. Maybe a
Nothing
    , notVisibleUntil :: Maybe UTCTime
notVisibleUntil = Maybe UTCTime
forall a. Maybe a
Nothing
    , dedupKey :: Maybe DedupKey
dedupKey = Maybe DedupKey
forall a. Maybe a
Nothing
    , maxAttempts :: Maybe Int32
maxAttempts = Maybe Int32
forall a. Maybe a
Nothing
    , parentId :: Maybe Int64
parentId = Maybe Int64
forall a. Maybe a
Nothing
    , parentState :: Maybe Value
parentState = Maybe Value
forall a. Maybe a
Nothing
    , suspended :: Bool
suspended = Bool
False
    }

-- | Grouped 'JobWrite'. Jobs sharing a group key are processed serially.
--
-- @
-- defaultGroupedJob "user-123" (ProcessEvent eventData)
-- @
defaultGroupedJob :: Text -> payload -> JobWrite payload
defaultGroupedJob :: forall payload. Text -> payload -> JobWrite payload
defaultGroupedJob Text
gk payload
p =
  Job
    { primaryKey :: ()
primaryKey = ()
    , payload :: payload
payload = payload
p
    , queueName :: ()
queueName = ()
    , groupKey :: Maybe Text
groupKey = Text -> Maybe Text
forall a. a -> Maybe a
Just Text
gk
    , insertedAt :: ()
insertedAt = ()
    , updatedAt :: Maybe UTCTime
updatedAt = Maybe UTCTime
forall a. Maybe a
Nothing
    , attempts :: Int32
attempts = Int32
0
    , lastError :: Maybe Text
lastError = Maybe Text
forall a. Maybe a
Nothing
    , priority :: Int32
priority = Int32
0
    , lastAttemptedAt :: Maybe UTCTime
lastAttemptedAt = Maybe UTCTime
forall a. Maybe a
Nothing
    , notVisibleUntil :: Maybe UTCTime
notVisibleUntil = Maybe UTCTime
forall a. Maybe a
Nothing
    , dedupKey :: Maybe DedupKey
dedupKey = Maybe DedupKey
forall a. Maybe a
Nothing
    , maxAttempts :: Maybe Int32
maxAttempts = Maybe Int32
forall a. Maybe a
Nothing
    , parentId :: Maybe Int64
parentId = Maybe Int64
forall a. Maybe a
Nothing
    , parentState :: Maybe Value
parentState = Maybe Value
forall a. Maybe a
Nothing
    , suspended :: Bool
suspended = Bool
False
    }

-- | A type alias for a job that has been read from the database.
type JobRead payload = Job payload Int64 Text UTCTime

-- | A type alias for a job that is ready to be written to the database.
-- It does not yet have an ID or insertion timestamp.
type JobWrite payload = Job payload () () ()

-- | Payloads must round-trip through JSON for PostgreSQL JSONB storage.
type JobPayload payload = (FromJSON payload, ToJSON payload)

-- | Deduplication strategy, checked on INSERT via @ON CONFLICT@ on the dedup key.
data DedupKey
  = -- | Skip if a job with this key exists (@DO NOTHING@).
    IgnoreDuplicate Text
  | -- | Replace the existing job with this key (@DO UPDATE@), unless it's
    -- actively in-flight on its first attempt.
    ReplaceDuplicate Text
  deriving stock (DedupKey -> DedupKey -> Bool
(DedupKey -> DedupKey -> Bool)
-> (DedupKey -> DedupKey -> Bool) -> Eq DedupKey
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: DedupKey -> DedupKey -> Bool
== :: DedupKey -> DedupKey -> Bool
$c/= :: DedupKey -> DedupKey -> Bool
/= :: DedupKey -> DedupKey -> Bool
Eq, (forall x. DedupKey -> Rep DedupKey x)
-> (forall x. Rep DedupKey x -> DedupKey) -> Generic DedupKey
forall x. Rep DedupKey x -> DedupKey
forall x. DedupKey -> Rep DedupKey x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cfrom :: forall x. DedupKey -> Rep DedupKey x
from :: forall x. DedupKey -> Rep DedupKey x
$cto :: forall x. Rep DedupKey x -> DedupKey
to :: forall x. Rep DedupKey x -> DedupKey
Generic, Int -> DedupKey -> ShowS
[DedupKey] -> ShowS
DedupKey -> String
(Int -> DedupKey -> ShowS)
-> (DedupKey -> String) -> ([DedupKey] -> ShowS) -> Show DedupKey
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> DedupKey -> ShowS
showsPrec :: Int -> DedupKey -> ShowS
$cshow :: DedupKey -> String
show :: DedupKey -> String
$cshowList :: [DedupKey] -> ShowS
showList :: [DedupKey] -> ShowS
Show)

instance ToJSON DedupKey where
  toJSON :: DedupKey -> Value
toJSON (IgnoreDuplicate Text
k) = [Pair] -> Value
object [Key
"key" Key -> Text -> Pair
forall v. ToJSON v => Key -> v -> Pair
forall e kv v. (KeyValue e kv, ToJSON v) => Key -> v -> kv
.= Text
k, Key
"strategy" Key -> Text -> Pair
forall v. ToJSON v => Key -> v -> Pair
forall e kv v. (KeyValue e kv, ToJSON v) => Key -> v -> kv
.= (Text
"ignore" :: Text)]
  toJSON (ReplaceDuplicate Text
k) = [Pair] -> Value
object [Key
"key" Key -> Text -> Pair
forall v. ToJSON v => Key -> v -> Pair
forall e kv v. (KeyValue e kv, ToJSON v) => Key -> v -> kv
.= Text
k, Key
"strategy" Key -> Text -> Pair
forall v. ToJSON v => Key -> v -> Pair
forall e kv v. (KeyValue e kv, ToJSON v) => Key -> v -> kv
.= (Text
"replace" :: Text)]

instance FromJSON DedupKey where
  parseJSON :: Value -> Parser DedupKey
parseJSON = String -> (Object -> Parser DedupKey) -> Value -> Parser DedupKey
forall a. String -> (Object -> Parser a) -> Value -> Parser a
withObject String
"DedupKey" ((Object -> Parser DedupKey) -> Value -> Parser DedupKey)
-> (Object -> Parser DedupKey) -> Value -> Parser DedupKey
forall a b. (a -> b) -> a -> b
$ \Object
v -> do
    key <- Object
v Object -> Key -> Parser Text
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"key"
    strategy <- v .: "strategy" :: Parser Text
    case strategy of
      Text
"ignore" -> DedupKey -> Parser DedupKey
forall a. a -> Parser a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DedupKey -> Parser DedupKey) -> DedupKey -> Parser DedupKey
forall a b. (a -> b) -> a -> b
$ Text -> DedupKey
IgnoreDuplicate Text
key
      Text
"replace" -> DedupKey -> Parser DedupKey
forall a. a -> Parser a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (DedupKey -> Parser DedupKey) -> DedupKey -> Parser DedupKey
forall a b. (a -> b) -> a -> b
$ Text -> DedupKey
ReplaceDuplicate Text
key
      Text
_ -> String -> Parser DedupKey
forall a. String -> Parser a
forall (m :: * -> *) a. MonadFail m => String -> m a
fail (String -> Parser DedupKey) -> String -> Parser DedupKey
forall a b. (a -> b) -> a -> b
$ String
"Unknown dedup strategy: " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> Text -> String
forall a. Show a => a -> String
show Text
strategy

type ClaimTime = UTCTime
type CurrentTime = UTCTime
type StartTime = UTCTime
type EndTime = UTCTime
type ErrorMsg = Text
type BackoffDelay = NominalDiffTime

-- | A set of callbacks invoked at key points in the job lifecycle.
--
-- Use these hooks to integrate with metrics, logging, or tracing systems.
-- Hooks are exception-safe; any exception thrown within a hook is caught
-- and ignored to prevent crashing the worker.
data ObservabilityHooks m payload = ObservabilityHooks
  { forall (m :: * -> *) payload.
ObservabilityHooks m payload
-> JobPayload payload => JobRead payload -> UTCTime -> m ()
onJobClaimed
      :: (JobPayload payload)
      => JobRead payload
      -> ClaimTime
      -> m ()
  -- ^ Called immediately after a job is claimed by a worker.
  , forall (m :: * -> *) payload.
ObservabilityHooks m payload
-> JobPayload payload =>
   JobRead payload -> UTCTime -> UTCTime -> m ()
onJobSuccess
      :: (JobPayload payload)
      => JobRead payload
      -> StartTime
      -> EndTime
      -> m ()
  -- ^ Called after a job handler succeeds. Use @diffUTCTime@ on the timestamps
  -- to calculate job duration.
  , forall (m :: * -> *) payload.
ObservabilityHooks m payload
-> JobPayload payload =>
   JobRead payload -> Text -> UTCTime -> UTCTime -> m ()
onJobFailure
      :: (JobPayload payload)
      => JobRead payload
      -> ErrorMsg
      -> StartTime
      -> EndTime
      -> m ()
  -- ^ Called after a job handler fails. Use @diffUTCTime@ on the timestamps
  -- to calculate job duration.
  , forall (m :: * -> *) payload.
ObservabilityHooks m payload
-> JobPayload payload => JobRead payload -> BackoffDelay -> m ()
onJobRetry
      :: (JobPayload payload)
      => JobRead payload
      -> BackoffDelay
      -> m ()
  -- ^ Called when a failed job is successfully scheduled for retry.
  , forall (m :: * -> *) payload.
ObservabilityHooks m payload
-> JobPayload payload => Text -> JobRead payload -> m ()
onJobFailedAndMovedToDLQ
      :: (JobPayload payload)
      => ErrorMsg
      -> JobRead payload
      -> m ()
  -- ^ Called when a job is successfully moved to the dead-letter queue.
  , forall (m :: * -> *) payload.
ObservabilityHooks m payload
-> JobPayload payload =>
   JobRead payload -> UTCTime -> UTCTime -> m ()
onJobHeartbeat
      :: (JobPayload payload)
      => JobRead payload
      -> CurrentTime
      -> StartTime
      -> m ()
  -- ^ Called periodically for a running job.
  }

-- | No-op hooks. Override fields to add observability:
--
-- @
-- myHooks = defaultObservabilityHooks
--   { onJobSuccess = \\job startTime endTime -> do
--       let duration = diffUTCTime endTime startTime
--       logInfo $ "Job " <> show (primaryKey job) <> " took " <> show duration
--   }
-- @
defaultObservabilityHooks :: (Applicative m) => ObservabilityHooks m payload
defaultObservabilityHooks :: forall (m :: * -> *) payload.
Applicative m =>
ObservabilityHooks m payload
defaultObservabilityHooks =
  ObservabilityHooks
    { onJobClaimed :: JobPayload payload => JobRead payload -> UTCTime -> m ()
onJobClaimed = \JobRead payload
_ UTCTime
_ -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    , onJobSuccess :: JobPayload payload => JobRead payload -> UTCTime -> UTCTime -> m ()
onJobSuccess = \JobRead payload
_ UTCTime
_ UTCTime
_ -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    , onJobFailure :: JobPayload payload =>
JobRead payload -> Text -> UTCTime -> UTCTime -> m ()
onJobFailure = \JobRead payload
_ Text
_ UTCTime
_ UTCTime
_ -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    , onJobRetry :: JobPayload payload => JobRead payload -> BackoffDelay -> m ()
onJobRetry = \JobRead payload
_ BackoffDelay
_ -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    , onJobFailedAndMovedToDLQ :: JobPayload payload => Text -> JobRead payload -> m ()
onJobFailedAndMovedToDLQ = \Text
_ JobRead payload
_ -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    , onJobHeartbeat :: JobPayload payload => JobRead payload -> UTCTime -> UTCTime -> m ()
onJobHeartbeat = \JobRead payload
_ UTCTime
_ UTCTime
_ -> () -> m ()
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    }