| Safe Haskell | None |
|---|---|
| Language | GHC2024 |
Arbiter.Core.HighLevel
Description
High-level API for job queue operations.
Table names are automatically extracted from the payload type using the registry. Compile-time checks ensure payloads are only used with registered tables.
Synopsis
- type QueueOperation (m :: Type -> Type) (registry :: JobPayloadRegistry) payload = (HasArbiterSchema m registry, JobPayload payload, KnownSymbol (TableForPayload payload registry), MonadArbiter m)
- type JobOperation (m :: Type -> Type) (registry :: JobPayloadRegistry) payload = (HasArbiterSchema m registry, JobPayload payload, MonadArbiter m)
- insertJob :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => JobWrite payload -> m (Maybe (JobRead payload))
- insertJobsBatch :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [JobWrite payload] -> m [JobRead payload]
- insertJobsBatch_ :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [JobWrite payload] -> m Int64
- claimNextVisibleJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int -> NominalDiffTime -> m [JobRead payload]
- claimNextVisibleJobsBatched :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int -> Int -> NominalDiffTime -> m [NonEmpty (JobRead payload)]
- ackJob :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => JobRead payload -> m Int64
- ackJobsBatch :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => [JobRead payload] -> m Int64
- ackJobsBulk :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => [JobRead payload] -> m Int64
- updateJobForRetry :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => NominalDiffTime -> Text -> JobRead payload -> m Int64
- setVisibilityTimeout :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => NominalDiffTime -> JobRead payload -> m Int64
- setVisibilityTimeoutBatch :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => NominalDiffTime -> [JobRead payload] -> m [SetVisibilityResult]
- data SetVisibilityResult
- data JobFilter
- listJobsFiltered :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [JobFilter] -> Int -> Int -> m [JobRead payload]
- countJobsFiltered :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [JobFilter] -> m Int64
- listDLQFiltered :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [JobFilter] -> Int -> Int -> m [DLQJob payload]
- countDLQFiltered :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [JobFilter] -> m Int64
- moveToDLQ :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => Text -> JobRead payload -> m Int64
- moveToDLQBatch :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => [(JobRead payload, Text)] -> m Int64
- listDLQJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int -> Int -> m [DLQJob payload]
- retryFromDLQ :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m (Maybe (JobRead payload))
- dlqJobExists :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Bool
- deleteDLQJob :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Int64
- deleteDLQJobsBatch :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [Int64] -> m Int64
- listJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int -> Int -> m [JobRead payload]
- getJobById :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m (Maybe (JobRead payload))
- getJobsByGroup :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Text -> Int -> Int -> m [JobRead payload]
- getJobsByParent :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> Int -> Int -> m [JobRead payload]
- getInFlightJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int -> Int -> m [JobRead payload]
- cancelJob :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Int64
- cancelJobsBatch :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [Int64] -> m Int64
- promoteJob :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Int64
- data QueueStats = QueueStats {}
- getQueueStats :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => m QueueStats
- countJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => m Int64
- countJobsByGroup :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Text -> m Int64
- countJobsByParent :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Int64
- countInFlightJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => m Int64
- countDLQJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => m Int64
- countChildrenBatch :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [Int64] -> m (Map Int64 (Int64, Int64))
- countDLQChildren :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Int64
- countDLQChildrenBatch :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [Int64] -> m (Map Int64 Int64)
- pauseChildren :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Int64
- resumeChildren :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Int64
- cancelJobCascade :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Int64
- suspendJob :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Int64
- resumeJob :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Int64
- insertResult :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> Int64 -> Value -> m Int64
- getResultsByParent :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m (Map Int64 Value)
- getDLQChildErrorsByParent :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m (Map Int64 Text)
- readChildResultsRaw :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m (Map Int64 Value, Map Int64 Text, Maybe Value, Map Int64 Text)
- mergeRawChildResults :: Map Int64 Value -> Map Int64 Text -> Maybe Value -> Map Int64 (Either Text Value)
- persistParentState :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> Value -> m Int64
- getParentStateSnapshot :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m (Maybe Value)
- refreshGroups :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int -> m ()
- insertJobTree :: forall m (registry :: JobPayloadRegistry) payload. (MonadUnliftIO m, QueueOperation m registry payload) => JobTree payload -> m (Either Text (NonEmpty (JobRead payload)))
- getSchema :: HasArbiterSchema m registry => m Text
Constraint Aliases
type QueueOperation (m :: Type -> Type) (registry :: JobPayloadRegistry) payload = (HasArbiterSchema m registry, JobPayload payload, KnownSymbol (TableForPayload payload registry), MonadArbiter m) Source #
Constraints for queue operations (requires table name lookup from registry).
type JobOperation (m :: Type -> Type) (registry :: JobPayloadRegistry) payload = (HasArbiterSchema m registry, JobPayload payload, MonadArbiter m) Source #
Constraints for job operations (table name stored in job).
Job Operations
insertJob :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => JobWrite payload -> m (Maybe (JobRead payload)) Source #
Inserts a job into the queue.
Returns Nothing if a job with the same deduplication key already
exists (with IgnoreDuplicate strategy).
insertJobsBatch :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [JobWrite payload] -> m [JobRead payload] Source #
Insert multiple jobs in a single batch operation.
Supports dedup keys: duplicate keys within the batch are resolved first
(the last ReplaceDuplicate wins), and conflicts with existing rows are
then resolved via ON CONFLICT.
Does not validate parentId. Use insertJobTree for parent-child relationships.
insertJobsBatch_ :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [JobWrite payload] -> m Int64 Source #
claimNextVisibleJobs Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int | Maximum number of jobs to claim. |
| -> NominalDiffTime | How long the claimed jobs should remain invisible (in seconds). |
| -> m [JobRead payload] |
Claims visible jobs from the queue. At most one job per group is claimed to enforce head-of-line blocking.
claimNextVisibleJobsBatched Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int | Batch size: maximum number of jobs to claim per group. |
| -> Int | Max groups: maximum number of groups/batches to claim. |
| -> NominalDiffTime | How long the claimed jobs should remain invisible (in seconds). |
| -> m [NonEmpty (JobRead payload)] |
Claims multiple jobs per group. Unlike claimNextVisibleJobs, this can
claim up to batchSize jobs from each group while still respecting
head-of-line blocking between batches.
ackJob :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => JobRead payload -> m Int64 Source #
Acknowledges a job as complete, permanently deleting it from the queue.
Returns 1 on success (job deleted or parent suspended), 0 if already gone.
ackJobsBatch :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => [JobRead payload] -> m Int64 Source #
Acknowledges multiple jobs as complete. All jobs must be from the same queue. Returns the total number of rows deleted.
ackJobsBulk :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => [JobRead payload] -> m Int64 Source #
Bulk ack for standalone jobs (no parent, no tree logic).
A single DELETE with unnest — one round trip for N jobs.
Only valid for jobs claimed in BatchedJobsMode.
Returns the number of rows deleted.
updateJobForRetry Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload | |
| => NominalDiffTime | The delay before this job becomes visible again for retry. |
| -> Text | An error message to store with the job. |
| -> JobRead payload | |
| -> m Int64 |
Marks a failed job for retry at a later time.
Returns the number of rows updated (0 if job was already claimed by another worker).
setVisibilityTimeout Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload | |
| => NominalDiffTime | The new visibility timeout (in seconds) from the current time. |
| -> JobRead payload | |
| -> m Int64 |
Manually extends a job's visibility timeout, useful for long-running jobs.
Returns the number of rows updated (0 if job was already reclaimed by another worker).
setVisibilityTimeoutBatch Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload | |
| => NominalDiffTime | The new visibility timeout (in seconds) from the current time. |
| -> [JobRead payload] | Jobs to heartbeat (all must be from the same queue) |
| -> m [SetVisibilityResult] |
Extends visibility timeout for multiple jobs. All jobs must be from the same queue.
data SetVisibilityResult Source #
Result of setting visibility timeout for a single job in a batch.
Constructors
| VisibilityExtended Int64 | Visibility timeout was successfully extended. Contains job ID. |
| JobGone Int64 | Job no longer exists (was deleted/acked). Contains job ID. |
| JobReclaimed Int64 Int32 Int32 | Job was reclaimed by another worker (attempts count changed). Contains: job ID, expected attempts, actual attempts. |
Instances
| Show SetVisibilityResult Source # | |
Defined in Arbiter.Core.HighLevel Methods showsPrec :: Int -> SetVisibilityResult -> ShowS # show :: SetVisibilityResult -> String # showList :: [SetVisibilityResult] -> ShowS # | |
| Eq SetVisibilityResult Source # | |
Defined in Arbiter.Core.HighLevel Methods (==) :: SetVisibilityResult -> SetVisibilityResult -> Bool # (/=) :: SetVisibilityResult -> SetVisibilityResult -> Bool # | |
Filtered Query Operations
data JobFilter Source #
Filter predicates for job listing queries.
Constructors
| FilterGroupKey Text | |
| FilterParentId Int64 | |
| FilterSuspended Bool | |
| FilterInFlight |
listJobsFiltered Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => [JobFilter] | Composable filters |
| -> Int | Maximum number of jobs to return. |
| -> Int | Number of jobs to skip (for pagination). |
| -> m [JobRead payload] |
Lists jobs with composable filters.
Returns jobs ordered by ID (descending, newest first).
countJobsFiltered Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => [JobFilter] | Composable filters |
| -> m Int64 |
Counts jobs with composable filters.
listDLQFiltered Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => [JobFilter] | Composable filters |
| -> Int | Maximum number of jobs to return. |
| -> Int | Number of jobs to skip (for pagination). |
| -> m [DLQJob payload] |
Lists DLQ jobs with composable filters.
Returns jobs ordered by failed_at (most recent first).
countDLQFiltered Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => [JobFilter] | Composable filters |
| -> m Int64 |
Counts DLQ jobs with composable filters.
Dead Letter Queue Operations
moveToDLQ Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload | |
| => Text | Error message (the final error that caused the DLQ move) |
| -> JobRead payload | |
| -> m Int64 |
Moves a job from the main queue to the dead-letter queue (DLQ).
Returns the number of rows deleted from main queue (0 if job was already claimed by another worker).
moveToDLQBatch Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload | |
| => [(JobRead payload, Text)] | List of (job, error message) pairs. All jobs must be from the same queue. |
| -> m Int64 |
Moves multiple jobs to the dead-letter queue.
Each job is moved with its own error message. Jobs that have already been claimed by another worker (attempts mismatch) are silently skipped.
Returns the total number of jobs moved to DLQ.
listDLQJobs Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int | The maximum number of jobs to return. |
| -> Int | The number of jobs to skip (for pagination). |
| -> m [DLQJob payload] |
Lists jobs in the dead-letter queue with pagination.
retryFromDLQ Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | DLQ job ID |
| -> m (Maybe (JobRead payload)) |
Moves a job from the dead-letter queue back into the main queue to be retried.
Returns Nothing if the DLQ job no longer exists.
dlqJobExists :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Bool Source #
Check whether a DLQ job exists by ID.
deleteDLQJob Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | DLQ job ID |
| -> m Int64 |
Permanently deletes a job from the dead-letter queue.
Returns the number of rows deleted (0 if the DLQ job no longer exists).
deleteDLQJobsBatch Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => [Int64] | DLQ job IDs |
| -> m Int64 |
Permanently deletes multiple jobs from the dead-letter queue.
Returns the total number of DLQ jobs deleted.
Admin Operations
listJobs Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int | Maximum number of jobs to return. |
| -> Int | Number of jobs to skip (for pagination). |
| -> m [JobRead payload] |
Lists jobs in the queue with pagination.
Returns jobs ordered by ID (descending, newest first).
getJobById Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Job ID |
| -> m (Maybe (JobRead payload)) |
Gets a single job by its ID.
Returns Nothing if the job doesn't exist.
getJobsByGroup Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Text | Group key to filter by |
| -> Int | Maximum number of jobs to return. |
| -> Int | Number of jobs to skip (for pagination). |
| -> m [JobRead payload] |
Gets all jobs for a specific group key with pagination.
Useful for debugging or admin UI to see all jobs for a specific entity.
getJobsByParent Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Parent ID to filter by |
| -> Int | Maximum number of jobs to return. |
| -> Int | Number of jobs to skip (for pagination). |
| -> m [JobRead payload] |
Gets all jobs for a specific parent ID with pagination.
getInFlightJobs Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int | Maximum number of jobs to return. |
| -> Int | Number of jobs to skip (for pagination). |
| -> m [JobRead payload] |
Gets all in-flight jobs (currently being processed by workers).
A job is considered in-flight if it has been claimed (attempts > 0) and its visibility timeout hasn't expired yet.
Useful for monitoring active work and detecting stuck jobs.
cancelJob Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Job ID |
| -> m Int64 |
Cancels (deletes) a job by ID.
Returns 0 if the job has children — use cancelJobCascade to delete
a parent and all its descendants.
cancelJobsBatch Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => [Int64] | Job IDs |
| -> m Int64 |
Cancels (deletes) multiple jobs by ID.
Returns the total number of jobs deleted.
promoteJob Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Job ID |
| -> m Int64 |
Promote a delayed or retrying job to be immediately visible.
Refuses in-flight jobs (attempts > 0 with no last_error). Returns 1 on success, 0 if not found, already visible, or in-flight.
data QueueStats Source #
Statistics about the job queue
Constructors
| QueueStats | |
Fields
| |
Instances
getQueueStats :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => m QueueStats Source #
Gets statistics about the job queue.
Count Operations
countJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => m Int64 Source #
Counts all jobs in the queue.
countJobsByGroup Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Text | Group key to count |
| -> m Int64 |
Counts jobs matching a group key.
countJobsByParent Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Parent ID to count children of |
| -> m Int64 |
Counts jobs matching a parent ID.
countInFlightJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => m Int64 Source #
Counts in-flight jobs (currently being processed by workers).
countDLQJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => m Int64 Source #
Counts jobs in the dead-letter queue.
countChildrenBatch :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [Int64] -> m (Map Int64 (Int64, Int64)) Source #
Counts children for a batch of potential parent IDs.
Returns a Map from parent_id to (total, paused) counts (only non-zero entries).
countDLQChildren Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Parent job ID |
| -> m Int64 |
Count how many children of a parent are in the DLQ. Useful inside finalizer handlers to detect failed children.
countDLQChildrenBatch :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [Int64] -> m (Map Int64 Int64) Source #
Counts children in the DLQ for a batch of potential parent IDs.
Returns a Map from parent_id to DLQ child count (only non-zero entries).
Job Dependency Operations
pauseChildren Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Parent job ID |
| -> m Int64 |
Pause all visible children of a parent job, making them unclaimable.
Only affects children that are currently claimable. In-flight children are left alone so their visibility timeout can expire normally if the worker crashes.
Returns the number of children paused.
resumeChildren Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Parent job ID |
| -> m Int64 |
Resume all suspended children of a parent job.
Returns the number of children resumed.
cancelJobCascade Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Root job ID |
| -> m Int64 |
Cancel a job and all its descendants recursively.
Returns the total number of jobs deleted (parent + all descendants).
Suspend/Resume Operations
suspendJob Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Job ID |
| -> m Int64 |
Suspend a job, making it unclaimable.
Only suspends non-in-flight jobs (not currently being processed by workers). Returns the number of rows updated (0 if job doesn't exist, is in-flight, or already suspended).
resumeJob Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Job ID |
| -> m Int64 |
Resume a suspended job, making it claimable again.
Returns the number of rows updated (0 if job doesn't exist or isn't suspended).
Results Table Operations
insertResult Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Parent job ID |
| -> Int64 | Child job ID |
| -> Value | Encoded result value |
| -> m Int64 |
Insert a child's result into the results table.
Each child gets its own row keyed by (parent_id, child_id).
The FK ON DELETE CASCADE ensures cleanup when the parent is acked.
Returns the number of rows inserted (1 on success).
getResultsByParent Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Parent job ID |
| -> m (Map Int64 Value) |
Get all child results for a parent from the results table.
getDLQChildErrorsByParent Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Parent job ID |
| -> m (Map Int64 Text) |
Get DLQ child errors for a parent.
Returns a Map from child job ID to the last error message.
readChildResultsRaw Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Parent job ID |
| -> m (Map Int64 Value, Map Int64 Text, Maybe Value, Map Int64 Text) |
Read child results, DLQ errors, parent_state snapshot, and DLQ failures for a rollup finalizer in a single query.
mergeRawChildResults :: Map Int64 Value -> Map Int64 Text -> Maybe Value -> Map Int64 (Either Text Value) Source #
Merge raw child results from three sources.
Precedence (left-biased union): DLQ errors > results > snapshot.
persistParentState :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> Value -> m Int64 Source #
Snapshot results into parent_state before DLQ move.
getParentStateSnapshot :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m (Maybe Value) Source #
Read raw parent_state snapshot from the DB.
Groups Table Operations
refreshGroups Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int | Minimum interval between runs (seconds) |
| -> m () |
Full recompute of the groups table from the main queue.
Corrects any drift in job_count, min_priority, min_id, and in_flight_until.
Job Tree DSL
insertJobTree :: forall m (registry :: JobPayloadRegistry) payload. (MonadUnliftIO m, QueueOperation m registry payload) => JobTree payload -> m (Either Text (NonEmpty (JobRead payload))) Source #
Insert a JobTree atomically in a single transaction.
HighLevel wrapper that resolves schema/table from the registry.
See the underlying (lower-level) insertJobTree operation that this wraps for details.
Re-exports
getSchema :: HasArbiterSchema m registry => m Text Source #