arbiter-core-0.1.0.0: Core types and logic for PostgreSQL-backed job queue
Safe HaskellNone
LanguageGHC2024

Arbiter.Core.JobTree

Description

Compositional DSL for building multi-level job trees.

A JobTree describes a hierarchy of parent-child jobs that is inserted atomically (in a single transaction). Trees can be arbitrarily deep.

Children run immediately; the finalizer (parent) is suspended until all children complete, then becomes claimable for a completion round. Child results are auto-stored in {queue}_results and passed to the finalizer handler as Map Int64 (Either Text result).

Important: The results table is a transient coordination buffer, not persistent storage. When the finalizer is acked (deleted), ON DELETE CASCADE wipes all associated result rows. If you need results to survive beyond the job tree's lifetime, persist them in your finalizer handler (e.g. write to your own database table, publish to a message broker, etc.).

import Arbiter.Core.JobTree

-- Flat (leaf-only children):
myTree = defaultJob (CompileReport "q4")
  <~~ (defaultJob (RenderChart "sales") :| [defaultJob (RenderChart "growth")])

-- Nested:
myTree = rollup (defaultJob root)
  ( (defaultJob mid <~~ (defaultJob leaf1 :| [defaultJob leaf2]))
  :| [leaf (defaultJob leaf3)]
  )

result <- insertJobTree "arbiter" "reports" myTree
Synopsis

Tree type

data JobTree payload Source #

A tree of jobs. Leaves are single jobs; finalizers are parents with children that run immediately while the parent waits for completion.

Smart constructors

leaf :: JobWrite payload -> JobTree payload Source #

A single job (leaf node) — a terminal node with no children.

rollup :: JobWrite payload -> NonEmpty (JobTree payload) -> JobTree payload Source #

Finalizer that runs after all children finish.

For nested rollups, intermediate finalizers must explicitly return the merged value to propagate results upward. This is not automatic. When no children remain in the main queue (all completed or DLQ'd), the finalizer wakes.

rollup (defaultJob root)
  ( leaf (defaultJob leaf1)
  :| [leaf (defaultJob leaf2)]
  )

Operators

(<~~) :: JobWrite payload -> NonEmpty (JobWrite payload) -> JobTree payload infixr 6 Source #

Infix rollup for leaf-only children.

defaultJob reducer <~~ (defaultJob mapper1 :| [defaultJob mapper2])

Interpreter

insertJobTree Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m, MonadUnliftIO m) 
=> Text

PostgreSQL schema name

-> Text

Table name

-> JobTree payload 
-> m (Either Text (NonEmpty (JobRead payload))) 

Insert a JobTree atomically in a single transaction.

Returns a flat NonEmpty list of all inserted jobs (pre-order: root first). Returns Left errMsg if any insertion fails (e.g. dedup conflict on root, phantom parent). The entire transaction is rolled back on failure — no partial trees are committed.