{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE OverloadedStrings #-}

-- | Compositional DSL for building multi-level job trees.
--
-- A 'JobTree' describes a hierarchy of parent-child jobs that is inserted
-- atomically (in a single transaction). Trees can be arbitrarily deep.
--
-- Children run immediately; the finalizer (parent) is suspended until all
-- children complete, then becomes claimable for a completion round.
-- Child results are auto-stored in @{queue}_results@ and passed to the
-- finalizer handler as @Map Int64 (Either Text result)@.
--
-- __Important:__ The results table is a transient coordination buffer, not
-- persistent storage. When the finalizer is acked (deleted), @ON DELETE CASCADE@
-- wipes all associated result rows. If you need results to survive beyond the
-- job tree's lifetime, persist them in your finalizer handler (e.g. write to
-- your own database table, publish to a message broker, etc.).
--
-- @
-- import Arbiter.Core.JobTree
--
-- -- Flat (leaf-only children):
-- myTree = defaultJob (CompileReport "q4")
--   \<~~ (defaultJob (RenderChart "sales") :| [defaultJob (RenderChart "growth")])
--
-- -- Nested:
-- myTree = rollup (defaultJob root)
--   ( (defaultJob mid \<~~ (defaultJob leaf1 :| [defaultJob leaf2]))
--   :| [leaf (defaultJob leaf3)]
--   )
--
-- result <- insertJobTree "arbiter" "reports" myTree
-- @
module Arbiter.Core.JobTree
  ( -- * Tree type
    JobTree

    -- * Smart constructors
  , leaf
  , rollup

    -- * Operators
  , (<~~)

    -- * Interpreter
  , insertJobTree
  ) where

import Control.Exception (Exception)
import Control.Monad (when)
import Data.Either (partitionEithers)
import Data.Int (Int64)
import Data.List.NonEmpty (NonEmpty (..))
import Data.List.NonEmpty qualified as NE
import Data.Text (Text)
import Data.Typeable (Typeable)
import UnliftIO (MonadUnliftIO)
import UnliftIO.Exception qualified as UE

import Arbiter.Core.Job.Types
  ( Job (..)
  , JobPayload
  , JobRead
  , JobWrite
  )
import Arbiter.Core.MonadArbiter (MonadArbiter (..))
import Arbiter.Core.Operations qualified as Ops

-- | Internal exception used to abort a tree insertion transaction.
-- Not exported — caught and converted to @Left@ by 'insertJobTree'.
--
-- The wrapped 'Text' is the failure message that 'insertJobTree' returns
-- to its caller in the @Left@ case.
newtype TreeInsertFailed = TreeInsertFailed Text
  deriving stock (Show, Typeable)
  deriving anyclass (Exception)

-- | A tree of jobs. Leaves are single jobs; finalizers are parents with
-- children that run immediately while the parent waits for completion.
--
-- The type parameter @payload@ is the job payload carried by every node.
-- The constructors are not exported from this module; build values with
-- 'leaf', 'rollup' and '<~~'.
data JobTree payload
  = -- | A single job with no children.
    Leaf (JobWrite payload)
  | -- | A finalizer job with children. The finalizer is suspended until
    -- all children complete, then it becomes claimable for a completion round.
    -- The 'NonEmpty' child list rules out a finalizer without children.
    Finalizer (JobWrite payload) (NonEmpty (JobTree payload))

-- ---------------------------------------------------------------------------
-- Smart constructors
-- ---------------------------------------------------------------------------

-- | A single job (leaf node) — a terminal node with no children.
--
-- The job is wrapped as-is; none of its fields are modified here.
leaf :: JobWrite payload -> JobTree payload
leaf = Leaf

-- | Finalizer that runs after all children finish.
--
-- The parent job is stored with @isRollup = True@; whatever value the caller
-- had set for that field is overwritten. The children are kept unchanged.
--
-- When no children remain in the main queue (all completed or DLQ'd), the
-- finalizer wakes.
--
-- For nested rollups, intermediate finalizers must explicitly return the
-- merged value to propagate results upward. This is not automatic.
--
-- @
-- rollup (defaultJob root)
--   ( leaf (defaultJob leaf1)
--   :| [leaf (defaultJob leaf2)]
--   )
-- @
rollup :: JobWrite payload -> NonEmpty (JobTree payload) -> JobTree payload
rollup parent children =
  Finalizer (parent {isRollup = True}) children

-- ---------------------------------------------------------------------------
-- Operators
-- ---------------------------------------------------------------------------

infixr 6 <~~

-- | Infix 'rollup' for leaf-only children.
--
-- Every element of the right-hand list becomes a leaf node, and the
-- left-hand job becomes their finalizer (with @isRollup = True@, exactly as
-- 'rollup' sets it).
--
-- The operator is @infixr 6@, which binds tighter than ':|', so the child
-- list must be parenthesised:
--
-- @
-- defaultJob reducer \<~~ (defaultJob mapper1 :| [defaultJob mapper2])
-- @
(<~~) :: JobWrite payload -> NonEmpty (JobWrite payload) -> JobTree payload
parent <~~ children = rollup parent (Leaf <$> children)

-- ---------------------------------------------------------------------------
-- Interpreter
-- ---------------------------------------------------------------------------

-- | Insert a 'JobTree' atomically in a single transaction.
--
-- Returns a flat 'NonEmpty' list of all inserted jobs, root first. For each
-- finalizer the order is: the finalizer itself, then the complete subtrees of
-- its finalizer children (in their original relative order), then its leaf
-- children as returned by the batch insert. This coincides with strict
-- pre-order only when no leaf child precedes a finalizer sibling, because
-- leaf children are batch-inserted after the sub-finalizers.
--
-- Returns @Left errMsg@ if any insertion fails (e.g. dedup conflict on root,
-- phantom parent). The entire transaction is rolled back on failure — no
-- partial trees are committed.
--
-- Only the internal 'TreeInsertFailed' exception is converted to @Left@;
-- any other exception raised during the transaction propagates to the caller.
insertJobTree
  :: forall m payload
   . (JobPayload payload, MonadArbiter m, MonadUnliftIO m)
  => Text
  -- ^ PostgreSQL schema name
  -> Text
  -- ^ Table name
  -> JobTree payload
  -> m (Either Text (NonEmpty (JobRead payload)))
insertJobTree schemaName tableName tree = do
  -- The exception type caught by 'UE.try' is fixed to 'TreeInsertFailed' by
  -- the pattern match below.
  result <- UE.try $ withDbTransaction $ go Nothing (rootSuspended tree) tree
  pure $ case result of
    Left (TreeInsertFailed msg) -> Left msg
    Right jobs -> Right jobs
  where
    -- Finalizer roots are suspended (waiting for children to complete).
    rootSuspended :: JobTree payload -> Bool
    rootSuspended (Finalizer _ _) = True
    rootSuspended (Leaf _) = False

    -- Split a child into "plain leaf job" (Left) or "nested finalizer" (Right).
    classify :: JobTree payload -> Either (JobWrite payload) (JobTree payload)
    classify (Leaf j) = Left j
    classify t = Right t

    -- Insert one job, aborting the whole transaction with the given message
    -- when the insert reports no row.
    insertNode :: Text -> JobWrite payload -> m (JobRead payload)
    insertNode failureMsg jobW =
      Ops.insertJobUnsafe schemaName tableName jobW
        >>= maybe (UE.throwIO (TreeInsertFailed failureMsg)) pure

    go
      :: Maybe Int64
      -- \^ Parent primary key (Nothing for root)
      -> Bool
      -- \^ Whether this node should be inserted suspended
      -> JobTree payload
      -> m (NonEmpty (JobRead payload))
    go mParentId susp (Leaf jobW) = do
      inserted <-
        insertNode
          "insertJobTree: job insert failed (dedup conflict or invalid parent)"
          (jobW {parentId = mParentId, suspended = susp})
      pure (inserted :| [])
    go mParentId susp (Finalizer jobW children) = do
      inserted <-
        insertNode
          "insertJobTree: parent insert failed (dedup conflict or invalid parent)"
          (jobW {parentId = mParentId, suspended = susp})
      let parentPK = primaryKey inserted
          (leaves, subTrees) = partitionEithers (map classify (NE.toList children))

      -- Recursively insert sub-finalizers first; each one is inserted
      -- suspended because it has children of its own to wait for.
      subTreeJobs <- traverse (go (Just parentPK) True) subTrees

      -- Batch-insert all leaf children in one roundtrip. Leaves are never
      -- suspended. When this finalizer has no leaf children the batch call
      -- is skipped entirely.
      let leafWrites = [j {parentId = Just parentPK, suspended = False} | j <- leaves]
      leafJobs <-
        if null leafWrites
          then pure []
          else Ops.insertJobsBatch schemaName tableName leafWrites
      -- Fewer rows back than requested means some leaves were not inserted.
      when (length leafJobs /= length leafWrites) $
        UE.throwIO $
          TreeInsertFailed "insertJobTree: leaf batch insert had dedup conflicts"

      pure (inserted :| concatMap NE.toList subTreeJobs <> leafJobs)