{-# LANGUAGE OverloadedStrings #-}

module Arbiter.Core.Job.Types
  ( -- * Core Job Type
    Job (..)
  , JobRead
  , JobWrite
  , defaultJob
  , defaultGroupedJob

    -- * Type Constraints
  , JobPayload

    -- * Deduplication
  , DedupKey (..)

    -- * Observability
  , ObservabilityHooks (..)
  , defaultObservabilityHooks
  ) where

import Data.Aeson (FromJSON (..), ToJSON (..), object, withObject, (.:), (.=))
import Data.Aeson.Types (Parser)
import Data.Int (Int32, Int64)
import Data.Text (Text)
import Data.Time (NominalDiffTime, UTCTime)
import GHC.Generics (Generic)

-- | The core t'Job' type, representing a unit of work in the queue.
--
-- The record is parameterised so the same shape can describe a job both
-- before and after it is stored:
--
-- * @payload@    – the user-defined job body.
-- * @key@        – the primary key type (@()@ in 'JobWrite', @Int64@ in 'JobRead').
-- * @q@          – the queue name type (@()@ in 'JobWrite', @Text@ in 'JobRead').
-- * @insertedAt@ – the insertion timestamp type (@()@ in 'JobWrite',
--   @UTCTime@ in 'JobRead').
data Job payload key q insertedAt = Job
  { primaryKey :: key
  -- ^ The job's primary key (unit or @Int64@).
  , payload :: payload
  -- ^ The user-defined payload, stored as JSONB.
  , queueName :: q
  -- ^ The name of the queue (table) this job belongs to.
  -- This is a read-only virtual field, it does not get serialized.
  , groupKey :: Maybe Text
  -- ^ An optional key for grouping jobs to be processed serially.
  -- Jobs with the same group key are processed in order.
  -- Use @Nothing@ for ungrouped jobs that can run in parallel.
  , insertedAt :: insertedAt
  -- ^ The time the job was inserted (unit or @UTCTime@).
  , updatedAt :: Maybe UTCTime
  -- ^ The time the job was last updated.
  , attempts :: Int32
  -- ^ The number of times this job has been attempted.
  , lastError :: Maybe Text
  -- ^ The error message from the last failed attempt.
  , priority :: Int32
  -- ^ The job's priority. Lower numbers are higher priority.
  , lastAttemptedAt :: Maybe UTCTime
  -- ^ The time this job was last claimed by a worker.
  , notVisibleUntil :: Maybe UTCTime
  -- ^ When this job becomes visible for claiming.
  , dedupKey :: Maybe DedupKey
  -- ^ The deduplication strategy for this job.
  , maxAttempts :: Maybe Int32
  -- ^ Override the global maxAttempts config for this specific job.
  -- If @Nothing@, uses the worker config's global @maxAttempts@ value.
  , parentId :: Maybe Int64
  -- ^ Optional parent job ID for job dependencies.
  -- When set, this job is a child of the specified parent.
  -- The parent is automatically suspended when acked if children exist,
  -- and resumed for a completion round when the last child finishes.
  , isRollup :: Bool
  -- ^ Whether this job is a rollup finalizer (has children whose results
  -- are collected). Set by 'rollup' / '<~~'. When @True@, the worker
  -- passes child results as a typed argument to the handler.
  , suspended :: Bool
  -- ^ Whether this job is suspended (not claimable).
  -- @TRUE@ for: finalizers waiting for children to complete,
  -- or operator-paused jobs.
  }
  deriving stock (Eq, Generic, Show)

-- | Creates an ungrouped 'JobWrite' with default values.
--
-- Only the payload is supplied by the caller. Every other field starts at
-- its neutral value: the unit placeholders for 'primaryKey', 'queueName' and
-- 'insertedAt'; @Nothing@ for all optional fields (including 'groupKey');
-- @0@ for 'attempts' and 'priority'; @False@ for 'isRollup' and 'suspended'.
--
-- Jobs created with this function can be processed in parallel by multiple workers.
-- For serial processing within a group, use 'defaultGroupedJob'.
defaultJob :: payload -> JobWrite payload
defaultJob p =
  Job
    { primaryKey = ()
    , payload = p
    , queueName = ()
    , groupKey = Nothing
    , insertedAt = ()
    , updatedAt = Nothing
    , attempts = 0
    , lastError = Nothing
    , priority = 0
    , lastAttemptedAt = Nothing
    , notVisibleUntil = Nothing
    , dedupKey = Nothing
    , maxAttempts = Nothing
    , parentId = Nothing
    , isRollup = False
    , suspended = False
    }

-- | Creates a grouped 'JobWrite' with default values.
--
-- The result is exactly 'defaultJob' applied to the payload, with 'groupKey'
-- set to @Just@ the supplied key; all other fields keep their defaults.
--
-- Jobs with the same group key are processed serially (head-of-line blocking),
-- ensuring that only one job per group is processed at a time.
-- Use for operations that must be ordered, like processing events for a specific user.
--
-- @
-- let job = defaultGroupedJob "user-123" (MyPayload x)
-- @
defaultGroupedJob :: Text -> payload -> JobWrite payload
defaultGroupedJob gk p = (defaultJob p) {groupKey = Just gk}

-- | A type alias for a job that has been read from the database.
--
-- Instantiates t'Job' with an @Int64@ primary key, a @Text@ queue name and
-- a @UTCTime@ insertion timestamp.
type JobRead payload = Job payload Int64 Text UTCTime

-- | A type alias for a job that is ready to be written to the database.
-- It does not yet have an ID or insertion timestamp.
--
-- The primary key, queue name and insertion timestamp parameters of t'Job'
-- are all instantiated to @()@.
type JobWrite payload = Job payload () () ()

-- | Constraint for types that can be used as a job payload.
--
-- This is a constraint synonym: @JobPayload p@ stands for
-- @(FromJSON p, ToJSON p)@.
--
-- Payloads are serialized to and from JSON for storage in the database.
-- The table name is looked up from the registry via @TableForPayload@.
type JobPayload payload = (FromJSON payload, ToJSON payload)

-- | Defines the deduplication strategy for a job upon insertion.
--
-- Both constructors carry the @Text@ key used to detect a conflicting job;
-- they differ only in what happens when such a conflict is found.
data DedupKey
  = -- | On conflict, keep the existing job. Corresponds to @ON CONFLICT DO NOTHING@.
    IgnoreDuplicate Text
  | -- | On conflict, replace the existing job with the new one. Corresponds to @ON CONFLICT DO UPDATE@.
    ReplaceDuplicate Text
  deriving stock (Eq, Generic, Show)

-- | Encodes a 'DedupKey' as a JSON object with two fields:
-- @"key"@ (the deduplication key) and @"strategy"@, which is @"ignore"@ for
-- 'IgnoreDuplicate' and @"replace"@ for 'ReplaceDuplicate'.
instance ToJSON DedupKey where
  toJSON (IgnoreDuplicate k) = object ["key" .= k, "strategy" .= ("ignore" :: Text)]
  toJSON (ReplaceDuplicate k) = object ["key" .= k, "strategy" .= ("replace" :: Text)]

-- | Decodes a 'DedupKey' from the object shape produced by its 'ToJSON'
-- instance: a @"key"@ field plus a @"strategy"@ field that must be either
-- @"ignore"@ or @"replace"@. Any other strategy value fails the parse.
instance FromJSON DedupKey where
  parseJSON = withObject "DedupKey" $ \v -> do
    key <- v .: "key"
    -- The annotation pins the field to 'Text' so the string-literal
    -- patterns below are matched against 'Text'.
    strategy <- v .: "strategy" :: Parser Text
    case strategy of
      "ignore" -> pure $ IgnoreDuplicate key
      "replace" -> pure $ ReplaceDuplicate key
      _ -> fail $ "Unknown dedup strategy: " <> show strategy

-- | A set of callbacks invoked at key points in the job lifecycle.
--
-- Every hook runs in the monad @m@, returns @()@, and requires
-- 'JobPayload' for the payload type at the point of use.
--
-- Use these hooks to integrate with metrics, logging, or tracing systems.
-- Hooks are exception-safe; any exception thrown within a hook is caught
-- and ignored to prevent crashing the worker.
data ObservabilityHooks m payload = ObservabilityHooks
  { onJobClaimed
      :: (JobPayload payload)
      => JobRead payload
      -> UTCTime
      -- \^ Claim time.
      -> m ()
  -- ^ Called immediately after a job is claimed by a worker.
  , onJobSuccess
      :: (JobPayload payload)
      => JobRead payload
      -> UTCTime
      -- \^ Start time.
      -> UTCTime
      -- \^ End time.
      -> m ()
  -- ^ Called after a job handler succeeds. Use @diffUTCTime@ on the timestamps
  -- to calculate job duration.
  , onJobFailure
      :: (JobPayload payload)
      => JobRead payload
      -> Text
      -- \^ Error message.
      -> UTCTime
      -- \^ Start time.
      -> UTCTime
      -- \^ End time.
      -> m ()
  -- ^ Called after a job handler fails. Use @diffUTCTime@ on the timestamps
  -- to calculate job duration.
  , onJobRetry
      :: (JobPayload payload)
      => JobRead payload
      -> NominalDiffTime
      -- \^ The backoff delay until the job becomes visible again.
      -> m ()
  -- ^ Called when a failed job is successfully scheduled for retry.
  , onJobFailedAndMovedToDLQ
      :: (JobPayload payload)
      => Text
      -- \^ Error message.
      -> JobRead payload
      -> m ()
  -- ^ Called when a job is successfully moved to the dead-letter queue.
  , onJobHeartbeat
      :: (JobPayload payload)
      => JobRead payload
      -> UTCTime
      -- \^ Current time.
      -> UTCTime
      -- \^ Start time.
      -> m ()
  -- ^ Called periodically for a running job.
  }

-- | A default set of t'ObservabilityHooks' that do nothing.
--
-- Each hook ignores all of its arguments and returns @pure ()@, which is
-- why only an 'Applicative' constraint on @m@ is needed.
--
-- This can be used as a starting point for your own custom hooks.
--
-- @
-- myHooks = defaultObservabilityHooks
--   { onJobSuccess = \\job startTime endTime -> do
--       let duration = diffUTCTime endTime startTime
--       logInfo $ "Job " <> show (primaryKey job) <> " succeeded in " <> show duration
--   }
-- @
defaultObservabilityHooks :: (Applicative m) => ObservabilityHooks m payload
defaultObservabilityHooks =
  ObservabilityHooks
    { onJobClaimed = \_ _ -> pure ()
    , onJobSuccess = \_ _ _ -> pure ()
    , onJobFailure = \_ _ _ _ -> pure ()
    , onJobRetry = \_ _ -> pure ()
    , onJobFailedAndMovedToDLQ = \_ _ -> pure ()
    , onJobHeartbeat = \_ _ _ -> pure ()
    }