arbiter-core-0.1.0.0: Core types and logic for PostgreSQL-backed job queue
Safe Haskell: None
Language: GHC2024

Arbiter.Core.Operations

Job Insertion

insertJob Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> Text

PostgreSQL schema name

-> Text

Table name

-> JobWrite payload 
-> m (Maybe (JobRead payload)) 

Inserts a job into the queue.

Returns the inserted job with database-generated fields populated.

Ordering and concurrency

Jobs are claimed in ID order (lowest ID first within a priority level). Concurrent inserts to the same group are serialized at the trigger level: the AFTER INSERT trigger's ON CONFLICT DO UPDATE on the groups table takes a row-level lock, preventing out-of-order commits within a group.

Deduplication

  • Nothing: Always insert (dedup_key is NULL)
  • Just (IgnoreDuplicate k): Skip if dedup_key exists, return Nothing
  • Just (ReplaceDuplicate k): Replace the existing job unless it is in flight on its first attempt. Returns Nothing only when attempts > 0, not_visible_until > NOW(), and last_error IS NULL (i.e., the job is being processed for the first time). A job that has previously failed (last_error IS NOT NULL) can always be replaced, even while in flight on a retry attempt. This is by design: a fresh replacement takes priority over a failing job.

parentId is validated: if set to a non-existent job ID, returns Nothing. For building parent-child trees, prefer insertJobTree which handles parentId, isRollup, and suspended atomically.
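
The ReplaceDuplicate rule above can be modelled as a pure predicate. This is a minimal sketch, not the library's SQL: ExistingJob, firstAttemptInFlight, and canReplace are hypothetical names, and timestamps are simplified to whole seconds.

```haskell
import Data.Maybe (isNothing)

-- Hypothetical stand-in for the columns named in the text
-- (attempts, not_visible_until, last_error). Not a library type.
data ExistingJob = ExistingJob
  { attempts        :: Int
  , notVisibleUntil :: Integer       -- ASSUMPTION: seconds, standing in for a timestamp
  , lastError       :: Maybe String
  } deriving (Show, Eq)

-- | True when the existing row is in flight on its first attempt:
-- attempts > 0, not_visible_until > now, and last_error IS NULL.
firstAttemptInFlight :: Integer -> ExistingJob -> Bool
firstAttemptInFlight now job =
  attempts job > 0 && notVisibleUntil job > now && isNothing (lastError job)

-- | In this model a replacement is refused only in the case above.
canReplace :: Integer -> ExistingJob -> Bool
canReplace now = not . firstAttemptInFlight now
```

Under this model a job that is in flight on a retry (last_error set) is still replaceable, which matches the behaviour described for ReplaceDuplicate.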

insertJobUnsafe Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> Text

PostgreSQL schema name

-> Text

Table name

-> JobWrite payload 
-> m (Maybe (JobRead payload)) 

Insert a job without validating that the parent exists.

This is an internal fast path for callers that already guarantee the parent is present (e.g. insertJobTree). External callers should use insertJob which validates the parent first.

insertJobsBatch Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> Text

PostgreSQL schema name (e.g., "arbiter")

-> Text

Table name (e.g., "email_jobs")

-> [JobWrite payload]

Jobs to insert

-> m [JobRead payload] 

Insert multiple jobs in a single batch operation.

Supports dedup keys: IgnoreDuplicate jobs are silently skipped on conflict, ReplaceDuplicate jobs update the existing row (unless actively in-flight). Only actually inserted or replaced jobs are returned.

If multiple jobs in the batch share the same dedup key, only the last occurrence is kept (last writer wins), consistent with sequential insertJob calls.
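
The last-writer-wins rule can be illustrated with a small pure function. This is a sketch of the rule only, under the assumption that jobs without a dedup key are never collapsed; lastWriterWins and its key-extraction argument are hypothetical and not part of the API.

```haskell
import qualified Data.Map.Strict as Map

-- | Keep only the last occurrence of each dedup key. Items without a key
-- are always kept. Surviving items keep their original relative order.
lastWriterWins :: Ord k => (a -> Maybe k) -> [a] -> [a]
lastWriterWins keyOf jobs = [ job | (i, job) <- indexed, keep i job ]
  where
    indexed = zip [0 :: Int ..] jobs
    -- Map.fromList retains the last value for a repeated key, so this maps
    -- each dedup key to the index of its final occurrence.
    lastIndex = Map.fromList [ (k, i) | (i, job) <- indexed, Just k <- [keyOf job] ]
    keep i job = case keyOf job of
      Nothing -> True
      Just k  -> Map.lookup k lastIndex == Just i
```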

Does not validate parentId — callers must ensure referenced parents exist. For parent-child trees, use insertJobTree instead.

insertJobsBatch_ :: forall m payload. (JobPayload payload, MonadArbiter m) => Text -> Text -> [JobWrite payload] -> m Int64 Source #

insertResult Source #

Arguments

:: MonadArbiter m 
=> Text

PostgreSQL schema name

-> Text

Table name

-> Int64

Parent job ID

-> Int64

Child job ID

-> Value

Encoded result value

-> m Int64 

Insert a child's result into the results table.

Each child gets its own row keyed by (parent_id, child_id). The FK ON DELETE CASCADE ensures cleanup when the parent is acked.

Returns the number of rows inserted (1 on success).

getResultsByParent Source #

Arguments

:: MonadArbiter m 
=> Text

PostgreSQL schema name

-> Text

Table name

-> Int64

Parent job ID

-> m (Map Int64 Value) 

Get all child results for a parent from the results table.

Returns a Map from child ID to the result Value.

getDLQChildErrorsByParent Source #

Arguments

:: MonadArbiter m 
=> Text

PostgreSQL schema name

-> Text

Table name

-> Int64

Parent job ID

-> m (Map Int64 Text) 

Get DLQ child errors for a parent.

Returns a Map from child job ID to the last error message.

persistParentState Source #

Arguments

:: MonadArbiter m 
=> Text

PostgreSQL schema name

-> Text

Table name

-> Int64

Job ID

-> Value

The pre-populated parent state to persist

-> m Int64 

Snapshot results into parent_state before DLQ move.

claimNextVisibleJobs Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> Text

PostgreSQL schema name (e.g., "arbiter")

-> Text

Table name (e.g., "email_jobs")

-> Int

Maximum number of jobs to claim

-> NominalDiffTime

Visibility timeout in seconds

-> m [JobRead payload] 

Claim up to maxJobs visible jobs, respecting head-of-line blocking (one job per group). Uses a single-CTE claim with the groups table.
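
As an illustration of "one job per group", the following pure model picks the head of each group and then takes up to maxJobs of those heads. It is a sketch under stated assumptions, not the CTE the library runs: the Job record is hypothetical, every job is assumed to have a group key, the input is assumed to contain only visible jobs from groups that are not already blocked, and a lower priority number is assumed to be claimed first.

```haskell
import Data.List (sortOn)
import qualified Data.Map.Strict as Map

-- Hypothetical, simplified job row.
data Job = Job
  { jobId    :: Int
  , groupKey :: String
  , priority :: Int      -- ASSUMPTION: lower number is claimed first
  } deriving (Show, Eq)

-- | Select at most one job per group (the group's head), then take up to
-- maxJobs heads ordered by (priority, id).
claimHeads :: Int -> [Job] -> [Job]
claimHeads maxJobs visible = take maxJobs (sortOn rank (Map.elems heads))
  where
    rank j = (priority j, jobId j)
    -- For each group keep the job with the smallest (priority, id).
    heads = Map.fromListWith pickHead [ (groupKey j, j) | j <- visible ]
    pickHead a b = if rank a <= rank b then a else b
```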

claimNextVisibleJobsBatched Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> Text

PostgreSQL schema name (e.g., "arbiter")

-> Text

Table name (e.g., "email_jobs")

-> Int

Batch size per group (how many jobs to claim from each group)

-> Int

Maximum number of groups/batches to claim

-> NominalDiffTime

Visibility timeout in seconds

-> m [NonEmpty (JobRead payload)] 

Batched variant of claimNextVisibleJobs — claims up to batchSize jobs per group, across up to maxBatches groups.

ackJob Source #

Arguments

:: MonadArbiter m 
=> Text

PostgreSQL schema name (e.g., "arbiter")

-> Text

Table name (e.g., "email_jobs")

-> JobRead payload 
-> m Int64 

Acknowledge a job as completed (smart ack).

Deletes standalone jobs; suspends parents waiting for children; wakes parents when the last sibling completes. Uses an advisory lock for child jobs to serialize with concurrent sibling acks.

Returns 1 on success, 0 if the job was already gone.

ackJobsBatch Source #

Arguments

:: MonadArbiter m 
=> Text

PostgreSQL schema name (e.g., "arbiter")

-> Text

Table name (e.g., "email_jobs")

-> [JobRead payload] 
-> m Int64 

Acknowledge multiple jobs as completed.

Calls ackJob for each job in turn, so every job gets smart-ack treatment (parent suspend/wake logic). Returns the total number of rows affected.

ackJobsBulk Source #

Arguments

:: MonadArbiter m 
=> Text

PostgreSQL schema name (e.g., "arbiter")

-> Text

Table name (e.g., "email_jobs")

-> [JobRead payload] 
-> m Int64 

Bulk ack for standalone jobs (no parent, no tree logic).

A single DELETE with unnest — one round trip for N jobs. Only valid for jobs claimed in BatchedJobsMode, which guarantees parent_id IS NULL AND parent_state IS NULL.

Returns the number of rows deleted.

setVisibilityTimeout Source #

Arguments

:: MonadArbiter m 
=> Text

PostgreSQL schema name (e.g., "arbiter")

-> Text

Table name (e.g., "email_jobs")

-> NominalDiffTime

Timeout in seconds

-> JobRead payload 
-> m Int64

Returns the number of rows updated (0 if job was reclaimed by another worker)

Set the visibility timeout for a job

setVisibilityTimeoutBatch Source #

Arguments

:: MonadArbiter m 
=> Text

PostgreSQL schema name (e.g., "arbiter")

-> Text

Table name (e.g., "email_jobs")

-> NominalDiffTime

Timeout in seconds

-> [JobRead payload] 
-> m [VisibilityUpdateInfo]

Returns a list of status records, one for each job that was targeted.

Batch variant of setVisibilityTimeout. Returns per-job status (success, acked, or stolen).
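
One way a caller might map the per-job records onto the three outcomes is sketched below. The record is a local mirror using the field names and types that appear in the Generic representation of VisibilityUpdateInfo; the Outcome type, the classify function, and the interpretation of vuiCurrentDbAttempts are assumptions and are not confirmed by this page.

```haskell
import Data.Int (Int32, Int64)

-- Local mirror of the record, for illustration only.
data VisibilityUpdateInfo = VisibilityUpdateInfo
  { vuiJobId             :: Int64
  , vuiWasUpdated        :: Bool
  , vuiCurrentDbAttempts :: Maybe Int32
  } deriving (Show, Eq)

-- Hypothetical outcome type named after the statuses in the text.
data Outcome = Extended | Acked | Stolen
  deriving (Show, Eq)

classify :: VisibilityUpdateInfo -> Outcome
classify info
  | vuiWasUpdated info = Extended
  | otherwise =
      case vuiCurrentDbAttempts info of
        -- ASSUMPTION: no attempts value means the row no longer exists.
        Nothing -> Acked
        -- ASSUMPTION: the row exists but was reclaimed by another worker.
        Just _  -> Stolen
```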

data VisibilityUpdateInfo Source #

Detailed information about the result of a visibility update operation for a single job.

Constructors

VisibilityUpdateInfo 

Fields

  • vuiJobId :: Int64
  • vuiWasUpdated :: Bool
  • vuiCurrentDbAttempts :: Maybe Int32

Instances

Generic VisibilityUpdateInfo Source # 

Defined in Arbiter.Core.Operations

Associated Types

type Rep VisibilityUpdateInfo 

Defined in Arbiter.Core.Operations

type Rep VisibilityUpdateInfo = D1 ('MetaData "VisibilityUpdateInfo" "Arbiter.Core.Operations" "arbiter-core-0.1.0.0-inplace" 'False) (C1 ('MetaCons "VisibilityUpdateInfo" 'PrefixI 'True) (S1 ('MetaSel ('Just "vuiJobId") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64) :*: (S1 ('MetaSel ('Just "vuiWasUpdated") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Bool) :*: S1 ('MetaSel ('Just "vuiCurrentDbAttempts") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Int32)))))

Show VisibilityUpdateInfo Source # 

Defined in Arbiter.Core.Operations

Eq VisibilityUpdateInfo Source # 

Defined in Arbiter.Core.Operations

updateJobForRetry Source #

Arguments

:: MonadArbiter m 
=> Text

PostgreSQL schema name (e.g., "arbiter")

-> Text

Table name (e.g., "email_jobs")

-> NominalDiffTime

Backoff timeout in seconds

-> Text

Error message

-> JobRead payload 
-> m Int64 

Update a job for retry with backoff and error tracking

Returns the number of rows updated (0 if job was already claimed by another worker).

moveToDLQ Source #

Arguments

:: MonadArbiter m 
=> Text

PostgreSQL schema name (e.g., "arbiter")

-> Text

Table name (e.g., "email_jobs")

-> Text

Error message (the final error that caused the DLQ move)

-> JobRead payload 
-> m Int64 

Move a job to the DLQ. Cascades descendants for rollup parents. Wakes the parent if this was a child job.

Returns 0 if the job was already claimed by another worker.

moveToDLQBatch Source #

Arguments

:: MonadArbiter m 
=> Text

PostgreSQL schema name (e.g., "arbiter")

-> Text

Table name (e.g., "email_jobs")

-> [(JobRead payload, Text)]

List of (job, error message) pairs

-> m Int64 

Moves multiple jobs from the main queue to the dead-letter queue.

Each job is moved with its own error message. Jobs that have already been claimed by another worker (attempts mismatch) are silently skipped.

Returns the total number of jobs moved to DLQ.

retryFromDLQ Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> Text

PostgreSQL schema name (e.g., "arbiter")

-> Text

Table name (e.g., "email_jobs")

-> Int64

DLQ job ID

-> m (Maybe (JobRead payload)) 

Retry a job from the DLQ (re-inserts with attempts reset to 0). The dedup_key is NOT restored — retried jobs won't conflict with new dedup inserts.

dlqJobExists :: MonadArbiter m => Text -> Text -> Int64 -> m Bool Source #

Check whether a DLQ job exists by ID.

listDLQJobs Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> Text

PostgreSQL schema name (e.g., "arbiter")

-> Text

Table name (e.g., "email_jobs")

-> Int

Limit

-> Int

Offset

-> m [DLQJob payload] 

List jobs in the dead letter queue

Returns jobs ordered by failed_at (most recent first).

listDLQJobsByParent Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> Text

PostgreSQL schema name

-> Text

Table name

-> Int64

Parent job ID

-> Int

Limit

-> Int

Offset

-> m [DLQJob payload] 

List DLQ jobs filtered by parent_id.

Returns jobs ordered by failed_at (most recent first).

countDLQJobsByParent :: MonadArbiter m => Text -> Text -> Int64 -> m Int64 Source #

Count DLQ jobs matching a parent_id.

deleteDLQJob Source #

Arguments

:: MonadArbiter m 
=> Text

PostgreSQL schema name (e.g., "arbiter")

-> Text

Table name (e.g., "email_jobs")

-> Int64

DLQ job ID

-> m Int64 

Delete a job from the dead letter queue.

This permanently removes the job from the DLQ without retrying it. If the deleted job was a child, tries to resume the parent when no siblings remain in the main queue.

deleteDLQJobsBatch Source #

Arguments

:: MonadArbiter m 
=> Text

PostgreSQL schema name (e.g., "arbiter")

-> Text

Table name (e.g., "email_jobs")

-> [Int64]

DLQ job IDs

-> m Int64 

Delete multiple jobs from the dead letter queue.

If any deleted jobs were children, tries to resume their parents when no siblings remain. Parent IDs are deduplicated and sorted to prevent deadlocks between concurrent batch deletes.

Returns the total number of DLQ jobs deleted.
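
The deduplicate-and-sort step can be sketched as a pure function. When every transaction acquires its locks in the same ascending order, two concurrent batches cannot each hold a lock the other is waiting for. The function name and the list-of-Maybe input are hypothetical.

```haskell
import Data.Int (Int64)
import Data.Maybe (catMaybes)
import qualified Data.Set as Set

-- | Given the parent_id of each deleted DLQ job (Nothing for jobs without a
-- parent), return each parent once, in ascending order.
parentLockOrder :: [Maybe Int64] -> [Int64]
parentLockOrder = Set.toAscList . Set.fromList . catMaybes
```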

Filtered Query Operations

data JobFilter Source #

Filter predicates for job listing queries.

Instances

Show JobFilter Source # 

Defined in Arbiter.Core.SqlTemplates

Eq JobFilter Source # 

Defined in Arbiter.Core.SqlTemplates

listJobsFiltered Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> Text

PostgreSQL schema name

-> Text

Table name

-> [JobFilter]

Composable filters

-> Int

Limit

-> Int

Offset

-> m [JobRead payload] 

List jobs with composable filters.

Returns jobs ordered by ID (descending, newest first).

countJobsFiltered Source #

Arguments

:: MonadArbiter m 
=> Text

PostgreSQL schema name

-> Text

Table name

-> [JobFilter]

Composable filters

-> m Int64 

Count jobs with composable filters.

listDLQFiltered Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> Text

PostgreSQL schema name

-> Text

Table name

-> [JobFilter]

Composable filters

-> Int

Limit

-> Int

Offset

-> m [DLQJob payload] 

List DLQ jobs with composable filters.

Returns jobs ordered by failed_at (most recent first).

countDLQFiltered Source #

Arguments

:: MonadArbiter m 
=> Text

PostgreSQL schema name

-> Text

Table name

-> [JobFilter]

Composable filters

-> m Int64 

Count DLQ jobs with composable filters.

Admin Operations

listJobs Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> Text

PostgreSQL schema name (e.g., "arbiter")

-> Text

Table name (e.g., "email_jobs")

-> Int

Limit

-> Int

Offset

-> m [JobRead payload] 

List jobs in the queue with pagination.

Returns jobs ordered by ID (descending).
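
Because the listing functions take a limit and an offset as their last two arguments, a generic helper can walk every page. The helper below is a sketch and not part of the library; fetchAllPages is a hypothetical name. It stops at the first page shorter than the page size.

```haskell
-- | Collect every page from a limit/offset style listing action.
-- The second argument takes the limit first and the offset second.
fetchAllPages :: Monad m => Int -> (Int -> Int -> m [a]) -> m [a]
fetchAllPages pageSize fetchPage
  | pageSize <= 0 = pure []          -- avoid looping forever on a bad page size
  | otherwise     = go 0
  where
    go offset = do
      page <- fetchPage pageSize offset
      if length page < pageSize
        then pure page               -- short page: nothing further to fetch
        else (page ++) <$> go (offset + pageSize)
```

Given the signature shown above, a call such as fetchAllPages 100 (listJobs "arbiter" "email_jobs") should type-check, though that usage is untested here. Offset pagination over a queue that changes between calls can skip or repeat rows, so this suits inspection more than exact accounting.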

getJobById Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> Text

PostgreSQL schema name (e.g., "arbiter")

-> Text

Table name (e.g., "email_jobs")

-> Int64

Job ID

-> m (Maybe (JobRead payload)) 

Get a single job by its ID

getJobsByGroup Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> Text

PostgreSQL schema name (e.g., "arbiter")

-> Text

Table name (e.g., "email_jobs")

-> Text

Group key to filter by

-> Int

Limit

-> Int

Offset

-> m [JobRead payload] 

Get all jobs for a specific group key

Useful for debugging or admin UI to see all jobs for a specific entity.

getInFlightJobs Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> Text

PostgreSQL schema name (e.g., "arbiter")

-> Text

Table name (e.g., "email_jobs")

-> Int

Limit

-> Int

Offset

-> m [JobRead payload] 

Get all in-flight jobs (currently being processed by workers)

A job is considered in-flight if it has been claimed (attempts > 0) and its visibility timeout hasn't expired yet.

Useful for monitoring active work and detecting stuck jobs.

cancelJob Source #

Arguments

:: MonadArbiter m 
=> Text

Schema name

-> Text

Table name

-> Int64

Job ID

-> m Int64 

Cancels (deletes) a job by ID.

Returns 0 if the job has children — use cancelJobCascade to delete a parent and all its descendants.

If the deleted job was a child and no siblings remain, the parent is resumed for its completion round.

cancelJobsBatch Source #

Arguments

:: MonadArbiter m 
=> Text

Schema name

-> Text

Table name

-> [Int64]

Job IDs

-> m Int64 

Cancels (deletes) multiple jobs by ID.

Each job gets the full wake-parent logic (same as cancelJob). The batch is wrapped in a transaction so that cancelling multiple children of the same parent sees a consistent view: the last cancel's CTE correctly detects that no siblings remain and resumes the parent. Returns the total number of jobs deleted.

promoteJob Source #

Arguments

:: MonadArbiter m 
=> Text

PostgreSQL schema name (e.g., "arbiter")

-> Text

Table name (e.g., "email_jobs")

-> Int64

Job ID

-> m Int64

Number of rows updated

Promote a delayed or retrying job to be immediately visible.

Refuses in-flight jobs (attempts > 0 with no last_error). Returns 1 on success, 0 if not found, already visible, or in-flight.

data QueueStats Source #

Statistics about the job queue

Constructors

QueueStats 

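Fields

  • totalJobs :: Int64
  • visibleJobs :: Int64
  • invisibleJobs :: Int64
  • oldestJobAgeSeconds :: Maybe Double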

Instances

FromJSON QueueStats Source # 

Defined in Arbiter.Core.Operations

Methods

parseJSON :: Value -> Parser QueueStats

parseJSONList :: Value -> Parser [QueueStats]

omittedField :: Maybe QueueStats

ToJSON QueueStats Source # 

Defined in Arbiter.Core.Operations

Methods

toJSON :: QueueStats -> Value

toEncoding :: QueueStats -> Encoding

toJSONList :: [QueueStats] -> Value

toEncodingList :: [QueueStats] -> Encoding

omitField :: QueueStats -> Bool

Generic QueueStats Source # 

Defined in Arbiter.Core.Operations

Associated Types

type Rep QueueStats 

Defined in Arbiter.Core.Operations

type Rep QueueStats = D1 ('MetaData "QueueStats" "Arbiter.Core.Operations" "arbiter-core-0.1.0.0-inplace" 'False) (C1 ('MetaCons "QueueStats" 'PrefixI 'True) ((S1 ('MetaSel ('Just "totalJobs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64) :*: S1 ('MetaSel ('Just "visibleJobs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64)) :*: (S1 ('MetaSel ('Just "invisibleJobs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64) :*: S1 ('MetaSel ('Just "oldestJobAgeSeconds") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Double)))))

Show QueueStats Source # 

Defined in Arbiter.Core.Operations

Eq QueueStats Source # 

Defined in Arbiter.Core.Operations

getQueueStats Source #

Arguments

:: MonadArbiter m 
=> Text

PostgreSQL schema name (e.g., "arbiter")

-> Text

Table name (e.g., "email_jobs")

-> m QueueStats 

Get statistics about the job queue

Count Operations

countJobs :: MonadArbiter m => Text -> Text -> m Int64 Source #

Count all jobs in a table

countJobsByGroup :: MonadArbiter m => Text -> Text -> Text -> m Int64 Source #

Count jobs matching a group key

countInFlightJobs :: MonadArbiter m => Text -> Text -> m Int64 Source #

Count in-flight jobs

countDLQJobs :: MonadArbiter m => Text -> Text -> m Int64 Source #

Count DLQ jobs

Parent-Child Operations

getJobsByParent Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> Text

Schema name

-> Text

Table name

-> Int64

Parent ID

-> Int

Limit

-> Int

Offset

-> m [JobRead payload] 

List jobs filtered by parent_id with pagination.

countJobsByParent :: MonadArbiter m => Text -> Text -> Int64 -> m Int64 Source #

Count jobs matching a parent_id.

countChildrenBatch :: MonadArbiter m => Text -> Text -> [Int64] -> m (Map Int64 (Int64, Int64)) Source #

Count children for a batch of potential parent IDs.

Returns a Map from parent_id to (total, paused) counts (only non-zero entries).
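
Since only non-zero entries are present, a parent with no children is simply absent from the Map. A caller that wants a count for every requested ID can default missing keys to zero. The helper name below is hypothetical.

```haskell
import Data.Int (Int64)
import qualified Data.Map.Strict as Map

-- | Look up the (total, paused) counts for a parent, treating an absent
-- key as "no children".
childCountsFor :: Map.Map Int64 (Int64, Int64) -> Int64 -> (Int64, Int64)
childCountsFor counts parentId = Map.findWithDefault (0, 0) parentId counts
```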

countDLQChildrenBatch :: MonadArbiter m => Text -> Text -> [Int64] -> m (Map Int64 Int64) Source #

Count children in the DLQ for a batch of potential parent IDs.

Returns a Map from parent_id to DLQ child count (only non-zero entries).

Job Dependency Operations

pauseChildren Source #

Arguments

:: MonadArbiter m 
=> Text

Schema name

-> Text

Table name

-> Int64

Parent job ID

-> m Int64 

Pause all visible children of a parent job.

Sets suspended = TRUE for claimable children, making them unclaimable. In-flight children (currently being processed by workers) are left alone so their visibility timeout can expire normally if the worker crashes. Returns the number of children paused.

resumeChildren Source #

Arguments

:: MonadArbiter m 
=> Text

Schema name

-> Text

Table name

-> Int64

Parent job ID

-> m Int64 

Resume all suspended children of a parent job.

Only affects children whose suspended = TRUE. Returns the number of children resumed.

cancelJobCascade Source #

Arguments

:: MonadArbiter m 
=> Text

Schema name

-> Text

Table name

-> Int64

Root job ID

-> m Int64 

Cancel a job and all its descendants recursively.

Uses a recursive CTE to find all descendants, then deletes the root job and every descendant. If the root job is itself a child, resumes its parent for a completion round. Returns the total number of jobs deleted (the root plus all descendants).

cancelJobTree Source #

Arguments

:: MonadArbiter m 
=> Text

Schema name

-> Text

Table name

-> Int64

Any job ID within the tree

-> m Int64 

Cancel an entire job tree by walking up from any node to the root, then cascade-deleting everything from the root down.

Unlike cancelJobCascade, this does NOT call tryResumeParent — the root by definition has no parent. Returns the total number of jobs deleted.
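
The difference between cascading from a given node and cancelling the whole tree can be shown with an in-memory model. This sketch is not the library's recursive CTE: ParentOf, findRoot, and subtree are hypothetical names, and the parent links are assumed to be acyclic.

```haskell
import Data.Int (Int64)
import qualified Data.Map.Strict as Map

-- Hypothetical model: job ID mapped to its parent ID (Nothing for a root).
type ParentOf = Map.Map Int64 (Maybe Int64)

-- | Walk parent links upward until a job with no parent is reached.
-- Returns Nothing if the starting job (or a referenced parent) is unknown.
findRoot :: ParentOf -> Int64 -> Maybe Int64
findRoot parents = go
  where
    go jid = case Map.lookup jid parents of
      Nothing         -> Nothing
      Just Nothing    -> Just jid
      Just (Just pid) -> go pid

-- | A job followed by all of its descendants (depth first).
subtree :: ParentOf -> Int64 -> [Int64]
subtree parents root = root : concatMap (subtree parents) children
  where
    children = [ c | (c, Just p) <- Map.toList parents, p == root ]
```

In this model, subtree applied to a node corresponds to the set cancelJobCascade would delete, while applying subtree to the result of findRoot corresponds to cancelJobTree.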

Suspend/Resume Operations

suspendJob Source #

Arguments

:: MonadArbiter m 
=> Text

Schema name

-> Text

Table name

-> Int64

Job ID

-> m Int64 

Suspend a job, making it unclaimable.

Only suspends non-in-flight jobs (not currently being processed by workers). Returns the number of rows updated (0 if job doesn't exist, is in-flight, or already suspended).

resumeJob Source #

Arguments

:: MonadArbiter m 
=> Text

Schema name

-> Text

Table name

-> Int64

Job ID

-> m Int64 

Resume a suspended job, making it claimable again.

Returns the number of rows updated (0 if job doesn't exist or isn't suspended).

Groups Table Operations

refreshGroups Source #

Arguments

:: MonadArbiter m 
=> Text

PostgreSQL schema name

-> Text

Table name

-> Int

Minimum interval between runs (seconds)

-> m () 

Full recompute of the groups table from the main queue.

Corrects any drift in job_count, min_priority, min_id, and in_flight_until. Checks the reaper sequence to skip if a recent run occurred within the given interval. Uses an advisory lock to serialize concurrent attempts, then locks all groups rows to prevent trigger interleaving.

Internal Operations

getParentStateSnapshot Source #

Arguments

:: MonadArbiter m 
=> Text

PostgreSQL schema name

-> Text

Table name

-> Int64

Job ID

-> m (Maybe Value) 

Read the raw parent_state snapshot from the DB.

Internal operation used by the worker for DLQ-retried finalizers that have a persisted snapshot.

readChildResultsRaw Source #

Arguments

:: MonadArbiter m 
=> Text

PostgreSQL schema name

-> Text

Table name

-> Int64

Parent job ID

-> m (Map Int64 Value, Map Int64 Text, Maybe Value, Map Int64 Text) 

Read child results, DLQ errors, parent_state snapshot, and DLQ failures for a rollup finalizer in a single query.

mergeRawChildResults :: Map Int64 Value -> Map Int64 Text -> Maybe Value -> Map Int64 (Either Text Value) Source #

Merge raw child results from three sources.

Precedence (left-biased union): DLQ errors > results > snapshot.
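
The stated precedence can be reproduced with Data.Map's left-biased unions. This is a simplified model rather than the function itself: the value type is left polymorphic, errors use String instead of Text, and the snapshot is assumed to be already decoded into a Map, which is not how the real signature receives it.

```haskell
import Data.Int (Int64)
import qualified Data.Map.Strict as Map

-- | Left-biased merge: DLQ errors beat results, which beat snapshot entries.
mergeByPrecedence
  :: Map.Map Int64 String               -- DLQ errors, keyed by child ID
  -> Map.Map Int64 v                    -- child results
  -> Map.Map Int64 (Either String v)    -- ASSUMPTION: pre-decoded snapshot
  -> Map.Map Int64 (Either String v)
mergeByPrecedence dlqErrors results snapshot =
  -- Map.unions prefers the earliest map in the list when keys collide.
  Map.unions [Left <$> dlqErrors, Right <$> results, snapshot]
```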