| Safe Haskell | None |
|---|---|
| Language | GHC2024 |
Arbiter.Core.Operations
Synopsis
- insertJob :: forall m payload. (JobPayload payload, MonadArbiter m) => Text -> Text -> JobWrite payload -> m (Maybe (JobRead payload))
- insertJobUnsafe :: forall m payload. (JobPayload payload, MonadArbiter m) => Text -> Text -> JobWrite payload -> m (Maybe (JobRead payload))
- insertJobsBatch :: forall m payload. (JobPayload payload, MonadArbiter m) => Text -> Text -> [JobWrite payload] -> m [JobRead payload]
- insertJobsBatch_ :: forall m payload. (JobPayload payload, MonadArbiter m) => Text -> Text -> [JobWrite payload] -> m Int64
- insertResult :: MonadArbiter m => Text -> Text -> Int64 -> Int64 -> Value -> m Int64
- getResultsByParent :: MonadArbiter m => Text -> Text -> Int64 -> m (Map Int64 Value)
- getDLQChildErrorsByParent :: MonadArbiter m => Text -> Text -> Int64 -> m (Map Int64 Text)
- persistParentState :: MonadArbiter m => Text -> Text -> Int64 -> Value -> m Int64
- claimNextVisibleJobs :: forall m payload. (JobPayload payload, MonadArbiter m) => Text -> Text -> Int -> NominalDiffTime -> m [JobRead payload]
- claimNextVisibleJobsBatched :: forall m payload. (JobPayload payload, MonadArbiter m) => Text -> Text -> Int -> Int -> NominalDiffTime -> m [NonEmpty (JobRead payload)]
- ackJob :: MonadArbiter m => Text -> Text -> JobRead payload -> m Int64
- ackJobsBatch :: MonadArbiter m => Text -> Text -> [JobRead payload] -> m Int64
- ackJobsBulk :: MonadArbiter m => Text -> Text -> [JobRead payload] -> m Int64
- setVisibilityTimeout :: MonadArbiter m => Text -> Text -> NominalDiffTime -> JobRead payload -> m Int64
- setVisibilityTimeoutBatch :: MonadArbiter m => Text -> Text -> NominalDiffTime -> [JobRead payload] -> m [VisibilityUpdateInfo]
- data VisibilityUpdateInfo = VisibilityUpdateInfo {}
- updateJobForRetry :: MonadArbiter m => Text -> Text -> NominalDiffTime -> Text -> JobRead payload -> m Int64
- moveToDLQ :: MonadArbiter m => Text -> Text -> Text -> JobRead payload -> m Int64
- moveToDLQBatch :: MonadArbiter m => Text -> Text -> [(JobRead payload, Text)] -> m Int64
- retryFromDLQ :: forall m payload. (JobPayload payload, MonadArbiter m) => Text -> Text -> Int64 -> m (Maybe (JobRead payload))
- dlqJobExists :: MonadArbiter m => Text -> Text -> Int64 -> m Bool
- listDLQJobs :: forall m payload. (JobPayload payload, MonadArbiter m) => Text -> Text -> Int -> Int -> m [DLQJob payload]
- listDLQJobsByParent :: forall m payload. (JobPayload payload, MonadArbiter m) => Text -> Text -> Int64 -> Int -> Int -> m [DLQJob payload]
- countDLQJobsByParent :: MonadArbiter m => Text -> Text -> Int64 -> m Int64
- deleteDLQJob :: MonadArbiter m => Text -> Text -> Int64 -> m Int64
- deleteDLQJobsBatch :: MonadArbiter m => Text -> Text -> [Int64] -> m Int64
- data JobFilter
- listJobsFiltered :: forall m payload. (JobPayload payload, MonadArbiter m) => Text -> Text -> [JobFilter] -> Int -> Int -> m [JobRead payload]
- countJobsFiltered :: MonadArbiter m => Text -> Text -> [JobFilter] -> m Int64
- listDLQFiltered :: forall m payload. (JobPayload payload, MonadArbiter m) => Text -> Text -> [JobFilter] -> Int -> Int -> m [DLQJob payload]
- countDLQFiltered :: MonadArbiter m => Text -> Text -> [JobFilter] -> m Int64
- listJobs :: forall m payload. (JobPayload payload, MonadArbiter m) => Text -> Text -> Int -> Int -> m [JobRead payload]
- getJobById :: forall m payload. (JobPayload payload, MonadArbiter m) => Text -> Text -> Int64 -> m (Maybe (JobRead payload))
- getJobsByGroup :: forall m payload. (JobPayload payload, MonadArbiter m) => Text -> Text -> Text -> Int -> Int -> m [JobRead payload]
- getInFlightJobs :: forall m payload. (JobPayload payload, MonadArbiter m) => Text -> Text -> Int -> Int -> m [JobRead payload]
- cancelJob :: MonadArbiter m => Text -> Text -> Int64 -> m Int64
- cancelJobsBatch :: MonadArbiter m => Text -> Text -> [Int64] -> m Int64
- promoteJob :: MonadArbiter m => Text -> Text -> Int64 -> m Int64
- data QueueStats = QueueStats {}
- getQueueStats :: MonadArbiter m => Text -> Text -> m QueueStats
- countJobs :: MonadArbiter m => Text -> Text -> m Int64
- countJobsByGroup :: MonadArbiter m => Text -> Text -> Text -> m Int64
- countInFlightJobs :: MonadArbiter m => Text -> Text -> m Int64
- countDLQJobs :: MonadArbiter m => Text -> Text -> m Int64
- getJobsByParent :: forall m payload. (JobPayload payload, MonadArbiter m) => Text -> Text -> Int64 -> Int -> Int -> m [JobRead payload]
- countJobsByParent :: MonadArbiter m => Text -> Text -> Int64 -> m Int64
- countChildrenBatch :: MonadArbiter m => Text -> Text -> [Int64] -> m (Map Int64 (Int64, Int64))
- countDLQChildrenBatch :: MonadArbiter m => Text -> Text -> [Int64] -> m (Map Int64 Int64)
- pauseChildren :: MonadArbiter m => Text -> Text -> Int64 -> m Int64
- resumeChildren :: MonadArbiter m => Text -> Text -> Int64 -> m Int64
- cancelJobCascade :: MonadArbiter m => Text -> Text -> Int64 -> m Int64
- cancelJobTree :: MonadArbiter m => Text -> Text -> Int64 -> m Int64
- suspendJob :: MonadArbiter m => Text -> Text -> Int64 -> m Int64
- resumeJob :: MonadArbiter m => Text -> Text -> Int64 -> m Int64
- refreshGroups :: MonadArbiter m => Text -> Text -> Int -> m ()
- getParentStateSnapshot :: MonadArbiter m => Text -> Text -> Int64 -> m (Maybe Value)
- readChildResultsRaw :: MonadArbiter m => Text -> Text -> Int64 -> m (Map Int64 Value, Map Int64 Text, Maybe Value, Map Int64 Text)
- mergeRawChildResults :: Map Int64 Value -> Map Int64 Text -> Maybe Value -> Map Int64 (Either Text Value)
Job Insertion
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => Text | PostgreSQL schema name |
| -> Text | Table name |
| -> JobWrite payload | |
| -> m (Maybe (JobRead payload)) |
Inserts a job into the queue.
Returns the inserted job with database-generated fields populated.
Ordering and concurrency
Jobs are claimed in ID order (lowest ID first within a priority level).
Concurrent inserts to the same group are serialized at the trigger level:
the AFTER INSERT trigger's ON CONFLICT DO UPDATE on the groups table
takes a row-level lock, preventing out-of-order commits within a group.
Deduplication
Nothing: Always insert (dedup_key is NULL)Just (IgnoreDuplicate k): Skip if dedup_key exists, return NothingJust (ReplaceDuplicate k): Replace existing job unless actively in-flight on its first attempt. Returns Nothing only whenattempts > 0,not_visible_until > NOW(), andlast_error IS NULL(i.e., the job is being processed for the first time). Jobs that have previously failed (last_error IS NOT NULL) can always be replaced, even if currently in-flight on a retry attempt — this is by design, so that a fresh replacement takes priority over a failing job.
parentId is validated: if set to a non-existent job ID, returns Nothing.
For building parent-child trees, prefer insertJobTree which handles
parentId, isRollup, and suspended atomically.
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => Text | PostgreSQL schema name |
| -> Text | Table name |
| -> JobWrite payload | |
| -> m (Maybe (JobRead payload)) |
Insert a job without validating that the parent exists.
This is an internal fast path for callers that already guarantee the parent
is present (e.g. insertJobTree). External callers
should use insertJob which validates the parent first.
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => Text | PostgreSQL schema name (e.g., "arbiter") |
| -> Text | Table name (e.g., "email_jobs") |
| -> [JobWrite payload] | Jobs to insert |
| -> m [JobRead payload] |
Insert multiple jobs in a single batch operation.
Supports dedup keys: IgnoreDuplicate jobs are silently skipped on
conflict, ReplaceDuplicate jobs update the existing row (unless
actively in-flight). Only actually inserted or replaced jobs are returned.
If multiple jobs in the batch share the same dedup key, only the last
occurrence is kept (last writer wins), consistent with sequential
insertJob calls.
Does not validate parentId — callers must ensure referenced parents
exist. For parent-child trees, use insertJobTree instead.
insertJobsBatch_ :: forall m payload. (JobPayload payload, MonadArbiter m) => Text -> Text -> [JobWrite payload] -> m Int64 Source #
Arguments
| :: MonadArbiter m | |
| => Text | PostgreSQL schema name |
| -> Text | Table name |
| -> Int64 | Parent job ID |
| -> Int64 | Child job ID |
| -> Value | Encoded result value |
| -> m Int64 |
Insert a child's result into the results table.
Each child gets its own row keyed by (parent_id, child_id).
The FK ON DELETE CASCADE ensures cleanup when the parent is acked.
Returns the number of rows inserted (1 on success).
Arguments
| :: MonadArbiter m | |
| => Text | PostgreSQL schema name |
| -> Text | Table name |
| -> Int64 | Parent job ID |
| -> m (Map Int64 Value) |
Get all child results for a parent from the results table.
Returns a Map from child ID to the result Value.
getDLQChildErrorsByParent Source #
Arguments
| :: MonadArbiter m | |
| => Text | PostgreSQL schema name |
| -> Text | Table name |
| -> Int64 | Parent job ID |
| -> m (Map Int64 Text) |
Get DLQ child errors for a parent.
Returns a Map from child job ID to the last error message.
Arguments
| :: MonadArbiter m | |
| => Text | PostgreSQL schema name |
| -> Text | Table name |
| -> Int64 | Job ID |
| -> Value | The pre-populated parent state to persist |
| -> m Int64 |
Snapshot results into parent_state before DLQ move.
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => Text | PostgreSQL schema name (e.g., "arbiter") |
| -> Text | Table name (e.g., "email_jobs") |
| -> Int | Maximum number of jobs to claim |
| -> NominalDiffTime | Visibility timeout in seconds |
| -> m [JobRead payload] |
Claim up to maxJobs visible jobs, respecting head-of-line blocking
(one job per group). Uses a single-CTE claim with the groups table.
claimNextVisibleJobsBatched Source #
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => Text | PostgreSQL schema name (e.g., "arbiter") |
| -> Text | Table name (e.g., "email_jobs") |
| -> Int | Batch size per group (how many jobs to claim from each group) |
| -> Int | Maximum number of groups/batches to claim |
| -> NominalDiffTime | Visibility timeout in seconds |
| -> m [NonEmpty (JobRead payload)] |
Batched variant of claimNextVisibleJobs — claims up to batchSize jobs
per group, across up to maxBatches groups.
Arguments
| :: MonadArbiter m | |
| => Text | PostgreSQL schema name (e.g., "arbiter") |
| -> Text | Table name (e.g., "email_jobs") |
| -> JobRead payload | |
| -> m Int64 |
Acknowledge a job as completed (smart ack).
Deletes standalone jobs; suspends parents waiting for children; wakes parents when the last sibling completes. Uses an advisory lock for child jobs to serialize with concurrent sibling acks.
Returns 1 on success, 0 if the job was already gone.
Arguments
| :: MonadArbiter m | |
| => Text | PostgreSQL schema name (e.g., "arbiter") |
| -> Text | Table name (e.g., "email_jobs") |
| -> [JobRead payload] | |
| -> m Int64 |
Acknowledge multiple jobs as completed.
Iterates calling ackJob so every job gets smart-ack treatment
(parent suspend/wake logic).
Returns the total number of rows affected.
Arguments
| :: MonadArbiter m | |
| => Text | PostgreSQL schema name (e.g., "arbiter") |
| -> Text | Table name (e.g., "email_jobs") |
| -> [JobRead payload] | |
| -> m Int64 |
Bulk ack for standalone jobs (no parent, no tree logic).
A single DELETE with unnest — one round trip for N jobs.
Only valid for jobs claimed in BatchedJobsMode, which guarantees
parent_id IS NULL AND parent_state IS NULL.
Returns the number of rows deleted.
Arguments
| :: MonadArbiter m | |
| => Text | PostgreSQL schema name (e.g., "arbiter") |
| -> Text | Table name (e.g., "email_jobs") |
| -> NominalDiffTime | Timeout in seconds |
| -> JobRead payload | |
| -> m Int64 | Returns the number of rows updated (0 if job was reclaimed by another worker) |
Set the visibility timeout for a job
setVisibilityTimeoutBatch Source #
Arguments
| :: MonadArbiter m | |
| => Text | PostgreSQL schema name (e.g., "arbiter") |
| -> Text | Table name (e.g., "email_jobs") |
| -> NominalDiffTime | Timeout in seconds |
| -> [JobRead payload] | |
| -> m [VisibilityUpdateInfo] | Returns a list of status records, one for each job that was targeted. |
Batch variant of setVisibilityTimeout. Returns per-job status
(success, acked, or stolen).
data VisibilityUpdateInfo Source #
Detailed information about the result of a visibility update operation for a single job.
Constructors
| VisibilityUpdateInfo | |
Fields
| |
Instances
Arguments
| :: MonadArbiter m | |
| => Text | PostgreSQL schema name (e.g., "arbiter") |
| -> Text | Table name (e.g., "email_jobs") |
| -> NominalDiffTime | Backoff timeout in seconds |
| -> Text | Error message |
| -> JobRead payload | |
| -> m Int64 |
Update a job for retry with backoff and error tracking
Returns the number of rows updated (0 if job was already claimed by another worker).
Arguments
| :: MonadArbiter m | |
| => Text | PostgreSQL schema name (e.g., "arbiter") |
| -> Text | Table name (e.g., "email_jobs") |
| -> Text | Error message (the final error that caused the DLQ move) |
| -> JobRead payload | |
| -> m Int64 |
Move a job to the DLQ. Cascades descendants for rollup parents. Wakes the parent if this was a child job.
Returns 0 if the job was already claimed by another worker.
Arguments
| :: MonadArbiter m | |
| => Text | PostgreSQL schema name (e.g., "arbiter") |
| -> Text | Table name (e.g., "email_jobs") |
| -> [(JobRead payload, Text)] | List of (job, error message) pairs |
| -> m Int64 |
Moves multiple jobs from the main queue to the dead-letter queue.
Each job is moved with its own error message. Jobs that have already been claimed by another worker (attempts mismatch) are silently skipped.
Returns the total number of jobs moved to DLQ.
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => Text | PostgreSQL schema name (e.g., "arbiter") |
| -> Text | Table name (e.g., "email_jobs") |
| -> Int64 | DLQ job ID |
| -> m (Maybe (JobRead payload)) |
Retry a job from the DLQ (re-inserts with attempts reset to 0). The dedup_key is NOT restored — retried jobs won't conflict with new dedup inserts.
dlqJobExists :: MonadArbiter m => Text -> Text -> Int64 -> m Bool Source #
Check whether a DLQ job exists by ID.
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => Text | PostgreSQL schema name (e.g., "arbiter") |
| -> Text | Table name (e.g., "email_jobs") |
| -> Int | Limit |
| -> Int | Offset |
| -> m [DLQJob payload] |
List jobs in the dead letter queue
Returns jobs ordered by failed_at (most recent first).
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => Text | PostgreSQL schema name |
| -> Text | Table name |
| -> Int64 | Parent job ID |
| -> Int | Limit |
| -> Int | Offset |
| -> m [DLQJob payload] |
List DLQ jobs filtered by parent_id.
Returns jobs ordered by failed_at (most recent first).
countDLQJobsByParent :: MonadArbiter m => Text -> Text -> Int64 -> m Int64 Source #
Count DLQ jobs matching a parent_id.
Arguments
| :: MonadArbiter m | |
| => Text | PostgreSQL schema name (e.g., "arbiter") |
| -> Text | Table name (e.g., "email_jobs") |
| -> Int64 | DLQ job ID |
| -> m Int64 |
Delete a job from the dead letter queue.
This permanently removes the job from the DLQ without retrying it. If the deleted job was a child, tries to resume the parent when no siblings remain in the main queue.
Arguments
| :: MonadArbiter m | |
| => Text | PostgreSQL schema name (e.g., "arbiter") |
| -> Text | Table name (e.g., "email_jobs") |
| -> [Int64] | DLQ job IDs |
| -> m Int64 |
Delete multiple jobs from the dead letter queue.
If any deleted jobs were children, tries to resume their parents when no siblings remain. Parent IDs are deduplicated and sorted to prevent deadlocks between concurrent batch deletes.
Returns the total number of DLQ jobs deleted.
Filtered Query Operations
Filter predicates for job listing queries.
Constructors
| FilterGroupKey Text | |
| FilterParentId Int64 | |
| FilterSuspended Bool | |
| FilterInFlight |
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => Text | PostgreSQL schema name |
| -> Text | Table name |
| -> [JobFilter] | Composable filters |
| -> Int | Limit |
| -> Int | Offset |
| -> m [JobRead payload] |
List jobs with composable filters.
Returns jobs ordered by ID (descending, newest first).
Arguments
| :: MonadArbiter m | |
| => Text | PostgreSQL schema name |
| -> Text | Table name |
| -> [JobFilter] | Composable filters |
| -> m Int64 |
Count jobs with composable filters.
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => Text | PostgreSQL schema name |
| -> Text | Table name |
| -> [JobFilter] | Composable filters |
| -> Int | Limit |
| -> Int | Offset |
| -> m [DLQJob payload] |
List DLQ jobs with composable filters.
Returns jobs ordered by failed_at (most recent first).
Arguments
| :: MonadArbiter m | |
| => Text | PostgreSQL schema name |
| -> Text | Table name |
| -> [JobFilter] | Composable filters |
| -> m Int64 |
Count DLQ jobs with composable filters.
Admin Operations
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => Text | PostgreSQL schema name (e.g., "arbiter") |
| -> Text | Table name (e.g., "email_jobs") |
| -> Int | Limit |
| -> Int | Offset |
| -> m [JobRead payload] |
List jobs in the queue with pagination.
Returns jobs ordered by ID (descending).
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => Text | PostgreSQL schema name (e.g., "arbiter") |
| -> Text | Table name (e.g., "email_jobs") |
| -> Int64 | Job ID |
| -> m (Maybe (JobRead payload)) |
Get a single job by its ID
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => Text | PostgreSQL schema name (e.g., "arbiter") |
| -> Text | Table name (e.g., "email_jobs") |
| -> Text | Group key to filter by |
| -> Int | Limit |
| -> Int | Offset |
| -> m [JobRead payload] |
Get all jobs for a specific group key
Useful for debugging or admin UI to see all jobs for a specific entity.
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => Text | PostgreSQL schema name (e.g., "arbiter") |
| -> Text | Table name (e.g., "email_jobs") |
| -> Int | Limit |
| -> Int | Offset |
| -> m [JobRead payload] |
Get all in-flight jobs (currently being processed by workers)
A job is considered in-flight if it has been claimed (attempts > 0) and its visibility timeout hasn't expired yet.
Useful for monitoring active work and detecting stuck jobs.
Arguments
| :: MonadArbiter m | |
| => Text | Schema name |
| -> Text | Table name |
| -> Int64 | Job ID |
| -> m Int64 |
Cancels (deletes) a job by ID.
Returns 0 if the job has children — use cancelJobCascade to delete
a parent and all its descendants.
If the deleted job was a child and no siblings remain, the parent is resumed for its completion round.
Arguments
| :: MonadArbiter m | |
| => Text | Schema name |
| -> Text | Table name |
| -> [Int64] | Job IDs |
| -> m Int64 |
Cancels (deletes) multiple jobs by ID.
Each job gets full wake-parent logic (same as cancelJob).
Wrapped in a transaction so that cancelling multiple children of the
same parent sees a consistent view — the last cancel's CTE correctly
detects no remaining siblings and resumes the parent.
Returns the total number of jobs deleted.
Arguments
| :: MonadArbiter m | |
| => Text | PostgreSQL schema name (e.g., "arbiter") |
| -> Text | Table name (e.g., "email_jobs") |
| -> Int64 | Job ID |
| -> m Int64 | Number of rows updated |
Promote a delayed or retrying job to be immediately visible.
Refuses in-flight jobs (attempts > 0 with no last_error). Returns 1 on success, 0 if not found, already visible, or in-flight.
data QueueStats Source #
Statistics about the job queue
Constructors
| QueueStats | |
Fields
| |
Instances
| FromJSON QueueStats Source # | |||||
Defined in Arbiter.Core.Operations | |||||
| ToJSON QueueStats Source # | |||||
Defined in Arbiter.Core.Operations Methods toJSON :: QueueStats -> Value toEncoding :: QueueStats -> Encoding toJSONList :: [QueueStats] -> Value toEncodingList :: [QueueStats] -> Encoding omitField :: QueueStats -> Bool | |||||
| Generic QueueStats Source # | |||||
Defined in Arbiter.Core.Operations Associated Types
| |||||
| Show QueueStats Source # | |||||
Defined in Arbiter.Core.Operations Methods showsPrec :: Int -> QueueStats -> ShowS # show :: QueueStats -> String # showList :: [QueueStats] -> ShowS # | |||||
| Eq QueueStats Source # | |||||
Defined in Arbiter.Core.Operations | |||||
| type Rep QueueStats Source # | |||||
Defined in Arbiter.Core.Operations type Rep QueueStats = D1 ('MetaData "QueueStats" "Arbiter.Core.Operations" "arbiter-core-0.1.0.0-inplace" 'False) (C1 ('MetaCons "QueueStats" 'PrefixI 'True) ((S1 ('MetaSel ('Just "totalJobs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64) :*: S1 ('MetaSel ('Just "visibleJobs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64)) :*: (S1 ('MetaSel ('Just "invisibleJobs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64) :*: S1 ('MetaSel ('Just "oldestJobAgeSeconds") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Double))))) | |||||
Arguments
| :: MonadArbiter m | |
| => Text | PostgreSQL schema name (e.g., "arbiter") |
| -> Text | Table name (e.g., "email_jobs") |
| -> m QueueStats |
Get statistics about the job queue
Count Operations
countJobsByGroup :: MonadArbiter m => Text -> Text -> Text -> m Int64 Source #
Count jobs matching a group key
countInFlightJobs :: MonadArbiter m => Text -> Text -> m Int64 Source #
Count in-flight jobs
countDLQJobs :: MonadArbiter m => Text -> Text -> m Int64 Source #
Count DLQ jobs
Parent-Child Operations
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => Text | Schema name |
| -> Text | Table name |
| -> Int64 | Parent ID |
| -> Int | Limit |
| -> Int | Offset |
| -> m [JobRead payload] |
List jobs filtered by parent_id with pagination.
countJobsByParent :: MonadArbiter m => Text -> Text -> Int64 -> m Int64 Source #
Count jobs matching a parent_id.
countChildrenBatch :: MonadArbiter m => Text -> Text -> [Int64] -> m (Map Int64 (Int64, Int64)) Source #
Count children for a batch of potential parent IDs.
Returns a Map from parent_id to (total, paused) counts (only non-zero entries).
countDLQChildrenBatch :: MonadArbiter m => Text -> Text -> [Int64] -> m (Map Int64 Int64) Source #
Count children in the DLQ for a batch of potential parent IDs.
Returns a Map from parent_id to DLQ child count (only non-zero entries).
Job Dependency Operations
Arguments
| :: MonadArbiter m | |
| => Text | Schema name |
| -> Text | Table name |
| -> Int64 | Parent job ID |
| -> m Int64 |
Pause all visible children of a parent job.
Sets suspended = TRUE for claimable children, making them unclaimable. In-flight children (currently being processed by workers) are left alone so their visibility timeout can expire normally if the worker crashes. Returns the number of children paused.
Arguments
| :: MonadArbiter m | |
| => Text | Schema name |
| -> Text | Table name |
| -> Int64 | Parent job ID |
| -> m Int64 |
Resume all suspended children of a parent job.
Only affects children whose suspended = TRUE. Returns the number of children resumed.
Arguments
| :: MonadArbiter m | |
| => Text | Schema name |
| -> Text | Table name |
| -> Int64 | Root job ID |
| -> m Int64 |
Cancel a job and all its descendants recursively.
Uses a recursive CTE to find all descendants and deletes them all. If the root job itself is a child, resumes its parent for a completion round. Returns the total number of jobs deleted (parent + all descendants).
Arguments
| :: MonadArbiter m | |
| => Text | Schema name |
| -> Text | Table name |
| -> Int64 | Any job ID within the tree |
| -> m Int64 |
Cancel an entire job tree by walking up from any node to the root, then cascade-deleting everything from the root down.
Unlike cancelJobCascade, this does NOT call tryResumeParent — the root
by definition has no parent. Returns the total number of jobs deleted.
Suspend/Resume Operations
Arguments
| :: MonadArbiter m | |
| => Text | Schema name |
| -> Text | Table name |
| -> Int64 | Job ID |
| -> m Int64 |
Suspend a job, making it unclaimable.
Only suspends non-in-flight jobs (not currently being processed by workers). Returns the number of rows updated (0 if job doesn't exist, is in-flight, or already suspended).
Arguments
| :: MonadArbiter m | |
| => Text | Schema name |
| -> Text | Table name |
| -> Int64 | Job ID |
| -> m Int64 |
Resume a suspended job, making it claimable again.
Returns the number of rows updated (0 if job doesn't exist or isn't suspended).
Groups Table Operations
Arguments
| :: MonadArbiter m | |
| => Text | PostgreSQL schema name |
| -> Text | Table name |
| -> Int | Minimum interval between runs (seconds) |
| -> m () |
Full recompute of the groups table from the main queue.
Corrects any drift in job_count, min_priority, min_id, and in_flight_until. Checks the reaper sequence to skip if a recent run occurred within the given interval. Uses an advisory lock to serialize concurrent attempts, then locks all groups rows to prevent trigger interleaving.
Internal Operations
getParentStateSnapshot Source #
Arguments
| :: MonadArbiter m | |
| => Text | PostgreSQL schema name |
| -> Text | Table name |
| -> Int64 | Job ID |
| -> m (Maybe Value) |
Read the raw parent_state snapshot from the DB.
Internal operation used by the worker for DLQ-retried finalizers that have a persisted snapshot.