| Safe Haskell | None |
|---|---|
| Language | GHC2024 |
Arbiter.Core.HighLevel
Description
High-level API for job queue operations.
Table names are automatically extracted from the payload type using the registry. Compile-time checks ensure payloads are only used with registered tables.
Synopsis
- type QueueOperation (m :: Type -> Type) (registry :: JobPayloadRegistry) payload = (HasArbiterSchema m registry, JobPayload payload, KnownSymbol (TableForPayload payload registry), MonadArbiter m)
- type JobOperation (m :: Type -> Type) (registry :: JobPayloadRegistry) payload = (HasArbiterSchema m registry, JobPayload payload, MonadArbiter m)
- insertJob :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => JobWrite payload -> m (Maybe (JobRead payload))
- insertJobsBatch :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [JobWrite payload] -> m [JobRead payload]
- insertJobsBatch_ :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [JobWrite payload] -> m Int64
- claimNextVisibleJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int -> NominalDiffTime -> m [JobRead payload]
- claimNextVisibleJobsBatched :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int -> Int -> NominalDiffTime -> m [NonEmpty (JobRead payload)]
- ackJob :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => JobRead payload -> m Int64
- ackJobsBatch :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => [JobRead payload] -> m Int64
- ackJobsBulk :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => [JobRead payload] -> m Int64
- updateJobForRetry :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => NominalDiffTime -> Text -> JobRead payload -> m Int64
- setVisibilityTimeout :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => NominalDiffTime -> JobRead payload -> m Int64
- setVisibilityTimeoutBatch :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => NominalDiffTime -> [JobRead payload] -> m [SetVisibilityResult]
- data SetVisibilityResult
- data JobFilter
- listJobsFiltered :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [JobFilter] -> Int -> Int -> m [JobRead payload]
- countJobsFiltered :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [JobFilter] -> m Int64
- listDLQFiltered :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [JobFilter] -> Int -> Int -> m [DLQJob payload]
- countDLQFiltered :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [JobFilter] -> m Int64
- moveToDLQ :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => Text -> JobRead payload -> m Int64
- moveToDLQBatch :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => [(JobRead payload, Text)] -> m Int64
- listDLQJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int -> Int -> m [DLQJob payload]
- retryFromDLQ :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m (Maybe (JobRead payload))
- dlqJobExists :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Bool
- deleteDLQJob :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Int64
- deleteDLQJobsBatch :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [Int64] -> m Int64
- listJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int -> Int -> m [JobRead payload]
- getJobById :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m (Maybe (JobRead payload))
- getJobsByGroup :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Text -> Int -> Int -> m [JobRead payload]
- getJobsByParent :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> Int -> Int -> m [JobRead payload]
- getInFlightJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int -> Int -> m [JobRead payload]
- cancelJob :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Int64
- cancelJobsBatch :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [Int64] -> m Int64
- promoteJob :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Int64
- data QueueStats = QueueStats {}
- getQueueStats :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => m QueueStats
- countJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => m Int64
- countJobsByGroup :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Text -> m Int64
- countJobsByParent :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Int64
- countInFlightJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => m Int64
- countDLQJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => m Int64
- countChildrenBatch :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [Int64] -> m (Map Int64 (Int64, Int64))
- countDLQChildren :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Int64
- countDLQChildrenBatch :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [Int64] -> m (Map Int64 Int64)
- pauseChildren :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Int64
- resumeChildren :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Int64
- cancelJobCascade :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Int64
- suspendJob :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Int64
- resumeJob :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Int64
- insertResult :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> Int64 -> Value -> m Int64
- getResultsByParent :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m (Map Int64 Value)
- getDLQChildErrorsByParent :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m (Map Int64 Text)
- readChildResultsRaw :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m (Map Int64 Value, Map Int64 Text, Maybe Value, Map Int64 Text)
- mergeRawChildResults :: Map Int64 Value -> Map Int64 Text -> Maybe Value -> Map Int64 (Either Text Value)
- persistParentState :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> Value -> m Int64
- getParentStateSnapshot :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m (Maybe Value)
- refreshGroups :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int -> m ()
- insertJobTree :: forall m (registry :: JobPayloadRegistry) payload. (MonadUnliftIO m, QueueOperation m registry payload) => JobTree payload -> m (Either Text (NonEmpty (JobRead payload)))
- getSchema :: HasArbiterSchema m registry => m SchemaName
Constraint Aliases
type QueueOperation (m :: Type -> Type) (registry :: JobPayloadRegistry) payload = (HasArbiterSchema m registry, JobPayload payload, KnownSymbol (TableForPayload payload registry), MonadArbiter m) Source #
Constraints for queue operations (requires table name lookup from registry).
type JobOperation (m :: Type -> Type) (registry :: JobPayloadRegistry) payload = (HasArbiterSchema m registry, JobPayload payload, MonadArbiter m) Source #
Constraints for job operations (table name stored in job).
Job Operations
insertJob :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => JobWrite payload -> m (Maybe (JobRead payload)) Source #
Insert a job. Returns the inserted job, or Nothing if skipped by dedup
(IgnoreDuplicate) or if parentId references a non-existent job.
insertJobsBatch :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [JobWrite payload] -> m [JobRead payload] Source #
Insert multiple jobs in one round-trip. Returns only the jobs that were
actually inserted (dedup'd jobs are excluded). Does not validate parentId -
use insertJobTree for parent-child relationships.
insertJobsBatch_ :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [JobWrite payload] -> m Int64 Source #
Like insertJobsBatch but returns only the count of inserted rows.
claimNextVisibleJobs Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int | Maximum number of jobs to claim. |
| -> NominalDiffTime | How long the claimed jobs should remain invisible (in seconds). |
| -> m [JobRead payload] |
Claim visible jobs (at most one per group). May return fewer than the limit if groups are exhausted.
claimNextVisibleJobsBatched Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int | Batch size: maximum number of jobs to claim per group. |
| -> Int | Max groups: maximum number of groups/batches to claim. |
| -> NominalDiffTime | How long the claimed jobs should remain invisible (in seconds). |
| -> m [NonEmpty (JobRead payload)] |
Claims multiple jobs per group. Unlike claimNextVisibleJobs, this can
claim up to batchSize jobs from each group while still respecting
head-of-line blocking between batches.
ackJob :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => JobRead payload -> m Int64 Source #
Acknowledge a job as complete. Deletes it from the queue, or suspends it if it's a parent with unfinished children. Returns 1 on success, 0 if gone.
ackJobsBatch :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => [JobRead payload] -> m Int64 Source #
Acknowledges multiple jobs as complete. All jobs must be from the same queue. Returns the total number of rows deleted.
ackJobsBulk :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => [JobRead payload] -> m Int64 Source #
Bulk ack for standalone jobs (no parent, no tree logic).
A single DELETE with unnest - one round trip for N jobs.
Only valid for jobs claimed in BatchedJobsMode.
Returns the number of rows deleted.
updateJobForRetry Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload | |
| => NominalDiffTime | The delay before this job becomes visible again for retry. |
| -> Text | An error message to store with the job. |
| -> JobRead payload | |
| -> m Int64 |
Marks a failed job for retry at a later time.
Returns the number of rows updated (0 if job was already claimed by another worker).
setVisibilityTimeout Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload | |
| => NominalDiffTime | The new visibility timeout (in seconds) from the current time. |
| -> JobRead payload | |
| -> m Int64 |
Manually extends a job's visibility timeout, useful for long-running jobs.
Returns the number of rows updated (0 if job was already reclaimed by another worker).
setVisibilityTimeoutBatch Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload | |
| => NominalDiffTime | The new visibility timeout (in seconds) from the current time. |
| -> [JobRead payload] | Jobs to heartbeat (all must be from the same queue). |
| -> m [SetVisibilityResult] |
Extends visibility timeout for multiple jobs. All jobs must be from the same queue.
data SetVisibilityResult Source #
Result of setting visibility timeout for a single job in a batch.
Constructors
| VisibilityExtended Int64 | Visibility timeout was successfully extended. Contains job ID. |
| JobGone Int64 | Job no longer exists (was deleted/acked). Contains job ID. |
| JobReclaimed Int64 Int32 Int32 | Job was reclaimed by another worker (attempts count changed). Contains: job ID, expected attempts, actual attempts. |
Instances
| Show SetVisibilityResult Source # | |
Defined in Arbiter.Core.HighLevel Methods showsPrec :: Int -> SetVisibilityResult -> ShowS # show :: SetVisibilityResult -> String # showList :: [SetVisibilityResult] -> ShowS # | |
| Eq SetVisibilityResult Source # | |
Defined in Arbiter.Core.HighLevel Methods (==) :: SetVisibilityResult -> SetVisibilityResult -> Bool # (/=) :: SetVisibilityResult -> SetVisibilityResult -> Bool # | |
Filtered Query Operations
data JobFilter Source #
Filter predicates for job listing queries.
Constructors
| FilterGroupKey Text | |
| FilterParentId Int64 | |
| FilterSuspended Bool | |
| FilterInFlight | |
| FilterRootsOnly |
listJobsFiltered Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => [JobFilter] | Composable filters |
| -> Int | Maximum number of jobs to return. |
| -> Int | Number of jobs to skip (for pagination). |
| -> m [JobRead payload] |
Lists jobs with composable filters.
Returns jobs ordered by ID (descending, newest first).
countJobsFiltered Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => [JobFilter] | Composable filters |
| -> m Int64 |
Counts jobs with composable filters.
listDLQFiltered Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => [JobFilter] | Composable filters |
| -> Int | Maximum number of jobs to return. |
| -> Int | Number of jobs to skip (for pagination). |
| -> m [DLQJob payload] |
Lists DLQ jobs with composable filters.
Returns jobs ordered by failed_at (most recent first).
countDLQFiltered Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => [JobFilter] | Composable filters |
| -> m Int64 |
Counts DLQ jobs with composable filters.
Dead Letter Queue Operations
moveToDLQ Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload | |
| => Text | Error message (the final error that caused the DLQ move) |
| -> JobRead payload | |
| -> m Int64 |
Move a job to the DLQ. Returns 0 if already claimed by another worker.
moveToDLQBatch Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload | |
| => [(JobRead payload, Text)] | List of (job, error message) pairs. All jobs must be from the same queue. |
| -> m Int64 |
Move multiple jobs to the DLQ. Jobs already claimed by another worker are skipped. Returns the count of jobs moved.
listDLQJobs Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int | The maximum number of jobs to return. |
| -> Int | The number of jobs to skip (for pagination). |
| -> m [DLQJob payload] |
Lists jobs in the dead-letter queue with pagination.
retryFromDLQ Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | DLQ job ID |
| -> m (Maybe (JobRead payload)) |
Retry a DLQ job (re-insert into main queue with attempts reset).
Returns Nothing if the DLQ job no longer exists.
dlqJobExists :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Bool Source #
Check whether a DLQ job exists by ID.
deleteDLQJob Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | DLQ job ID |
| -> m Int64 |
Permanently deletes a job from the dead-letter queue.
Returns the number of rows deleted (0 if the DLQ job no longer exists).
deleteDLQJobsBatch Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => [Int64] | DLQ job IDs |
| -> m Int64 |
Permanently deletes multiple jobs from the dead-letter queue.
Returns the total number of DLQ jobs deleted.
Admin Operations
listJobs Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int | Maximum number of jobs to return. |
| -> Int | Number of jobs to skip (for pagination). |
| -> m [JobRead payload] |
Lists jobs in the queue with pagination.
Returns jobs ordered by ID (descending, newest first).
getJobById Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Job ID |
| -> m (Maybe (JobRead payload)) |
Gets a single job by its ID.
Returns Nothing if the job doesn't exist.
getJobsByGroup Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Text | Group key to filter by |
| -> Int | Maximum number of jobs to return. |
| -> Int | Number of jobs to skip (for pagination). |
| -> m [JobRead payload] |
Gets all jobs for a specific group key with pagination.
getJobsByParent Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Parent ID to filter by |
| -> Int | Maximum number of jobs to return. |
| -> Int | Number of jobs to skip (for pagination). |
| -> m [JobRead payload] |
Gets all jobs for a specific parent ID with pagination.
getInFlightJobs Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int | Maximum number of jobs to return. |
| -> Int | Number of jobs to skip (for pagination). |
| -> m [JobRead payload] |
Gets all in-flight jobs (claimed, visibility timeout not expired).
cancelJob Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Job ID |
| -> m Int64 |
Cancels (deletes) a job by ID.
Returns 0 if the job has children - use cancelJobCascade to delete
a parent and all its descendants.
cancelJobsBatch Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => [Int64] | Job IDs |
| -> m Int64 |
Cancels (deletes) multiple jobs by ID.
Returns the total number of jobs deleted.
promoteJob Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Job ID |
| -> m Int64 |
Promote a delayed or retrying job to be immediately visible.
Refuses in-flight jobs (attempts > 0 with no last_error). Returns 1 on success, 0 if not found, already visible, or in-flight.
data QueueStats Source #
Statistics about the job queue
Constructors
| QueueStats | |
Fields
| |
Instances
getQueueStats :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => m QueueStats Source #
Gets statistics about the job queue.
Count Operations
countJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => m Int64 Source #
Counts all jobs in the queue.
countJobsByGroup Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Text | Group key to count |
| -> m Int64 |
Counts jobs matching a group key.
countJobsByParent Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Parent ID to count children of |
| -> m Int64 |
Counts jobs matching a parent ID.
countInFlightJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => m Int64 Source #
Counts in-flight jobs (currently being processed by workers).
countDLQJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => m Int64 Source #
Counts jobs in the dead-letter queue.
countChildrenBatch :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [Int64] -> m (Map Int64 (Int64, Int64)) Source #
Counts children for a batch of potential parent IDs.
Returns a Map from parent_id to (total, paused) counts (only non-zero entries).
countDLQChildren Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Parent job ID |
| -> m Int64 |
Count how many children of a parent are in the DLQ. Useful inside finalizer handlers to detect failed children.
countDLQChildrenBatch :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [Int64] -> m (Map Int64 Int64) Source #
Counts children in the DLQ for a batch of potential parent IDs.
Returns a Map from parent_id to DLQ child count (only non-zero entries).
Job Dependency Operations
pauseChildren Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Parent job ID |
| -> m Int64 |
Pause all visible children of a parent job, making them unclaimable.
Only affects children that are currently claimable. In-flight children are left alone so their visibility timeout can expire normally if the worker crashes.
Returns the number of children paused.
resumeChildren Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Parent job ID |
| -> m Int64 |
Resume all suspended children of a parent job.
Returns the number of children resumed.
cancelJobCascade Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Root job ID |
| -> m Int64 |
Cancel a job and all its descendants recursively.
Returns the total number of jobs deleted (parent + all descendants).
Suspend/Resume Operations
suspendJob Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Job ID |
| -> m Int64 |
Suspend a job, making it unclaimable.
Only suspends non-in-flight jobs (not currently being processed by workers). Returns the number of rows updated (0 if job doesn't exist, is in-flight, or already suspended).
resumeJob Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Job ID |
| -> m Int64 |
Resume a suspended job, making it claimable again.
Returns the number of rows updated (0 if job doesn't exist or isn't suspended).
Results Table Operations
insertResult Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Parent job ID |
| -> Int64 | Child job ID |
| -> Value | Encoded result value |
| -> m Int64 |
Insert a child's result into the results table.
Each child gets its own row keyed by (parent_id, child_id).
The FK ON DELETE CASCADE ensures cleanup when the parent is acked.
Returns the number of rows inserted (1 on success).
getResultsByParent Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Parent job ID |
| -> m (Map Int64 Value) |
Get all child results for a parent from the results table.
getDLQChildErrorsByParent Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Parent job ID |
| -> m (Map Int64 Text) |
Get DLQ child errors for a parent.
Returns a Map from child job ID to the last error message.
readChildResultsRaw Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int64 | Parent job ID |
| -> m (Map Int64 Value, Map Int64 Text, Maybe Value, Map Int64 Text) |
Read all child data for a rollup finalizer. Returns
(childId->result, childId->error, parentStateSnapshot, dlqPK->error).
mergeRawChildResults :: Map Int64 Value -> Map Int64 Text -> Maybe Value -> Map Int64 (Either Text Value) Source #
Merge raw child results from three sources.
Precedence (left-biased union): DLQ errors > results > snapshot.
persistParentState :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> Value -> m Int64 Source #
Snapshot results into parent_state before DLQ move.
getParentStateSnapshot :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m (Maybe Value) Source #
Read raw parent_state snapshot from the DB.
Groups Table Operations
refreshGroups Source #
Arguments
| :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload | |
| => Int | Minimum interval between runs (seconds) |
| -> m () |
Full recompute of the groups table from the main queue.
Corrects any drift in job_count, min_priority, min_id, and in_flight_until.
Job Tree DSL
insertJobTree :: forall m (registry :: JobPayloadRegistry) payload. (MonadUnliftIO m, QueueOperation m registry payload) => JobTree payload -> m (Either Text (NonEmpty (JobRead payload))) Source #
Insert a JobTree atomically. Returns all inserted jobs (pre-order),
or Left if the root has a dedup conflict. Rolls back on any failure.
Re-exports
getSchema :: HasArbiterSchema m registry => m SchemaName Source #
The schema name for this monad's Arbiter tables.