| Safe Haskell | None |
|---|---|
| Language | GHC2024 |
Arbiter.Core.Operations
Synopsis
- insertJob :: forall m payload. (JobPayload payload, MonadArbiter m) => SchemaName -> TableName -> JobWrite payload -> m (Maybe (JobRead payload))
- insertJobUnsafe :: forall m payload. (JobPayload payload, MonadArbiter m) => SchemaName -> TableName -> JobWrite payload -> m (Maybe (JobRead payload))
- insertJobsBatch :: forall m payload. (JobPayload payload, MonadArbiter m) => SchemaName -> TableName -> [JobWrite payload] -> m [JobRead payload]
- insertJobsBatch_ :: forall m payload. (JobPayload payload, MonadArbiter m) => Text -> Text -> [JobWrite payload] -> m Int64
- insertResult :: MonadArbiter m => SchemaName -> TableName -> Int64 -> Int64 -> Value -> m Int64
- getResultsByParent :: MonadArbiter m => SchemaName -> TableName -> Int64 -> m (Map Int64 Value)
- getDLQChildErrorsByParent :: MonadArbiter m => SchemaName -> TableName -> Int64 -> m (Map Int64 Text)
- persistParentState :: MonadArbiter m => SchemaName -> TableName -> Int64 -> Value -> m Int64
- claimNextVisibleJobs :: forall m payload. (JobPayload payload, MonadArbiter m) => SchemaName -> TableName -> Int -> NominalDiffTime -> m [JobRead payload]
- claimNextVisibleJobsBatched :: forall m payload. (JobPayload payload, MonadArbiter m) => SchemaName -> TableName -> Int -> Int -> NominalDiffTime -> m [NonEmpty (JobRead payload)]
- ackJob :: MonadArbiter m => SchemaName -> TableName -> JobRead payload -> m Int64
- ackJobsBatch :: MonadArbiter m => SchemaName -> TableName -> [JobRead payload] -> m Int64
- ackJobsBulk :: MonadArbiter m => SchemaName -> TableName -> [JobRead payload] -> m Int64
- setVisibilityTimeout :: MonadArbiter m => SchemaName -> TableName -> NominalDiffTime -> JobRead payload -> m Int64
- setVisibilityTimeoutBatch :: MonadArbiter m => SchemaName -> TableName -> NominalDiffTime -> [JobRead payload] -> m [VisibilityUpdateInfo]
- data VisibilityUpdateInfo = VisibilityUpdateInfo {}
- updateJobForRetry :: MonadArbiter m => SchemaName -> TableName -> NominalDiffTime -> Text -> JobRead payload -> m Int64
- moveToDLQ :: MonadArbiter m => SchemaName -> TableName -> Text -> JobRead payload -> m Int64
- moveToDLQBatch :: MonadArbiter m => SchemaName -> TableName -> [(JobRead payload, Text)] -> m Int64
- retryFromDLQ :: forall m payload. (JobPayload payload, MonadArbiter m) => SchemaName -> TableName -> Int64 -> m (Maybe (JobRead payload))
- dlqJobExists :: MonadArbiter m => Text -> Text -> Int64 -> m Bool
- listDLQJobs :: forall m payload. (JobPayload payload, MonadArbiter m) => SchemaName -> TableName -> Int -> Int -> m [DLQJob payload]
- listDLQJobsByParent :: forall m payload. (JobPayload payload, MonadArbiter m) => SchemaName -> TableName -> Int64 -> Int -> Int -> m [DLQJob payload]
- countDLQJobsByParent :: MonadArbiter m => SchemaName -> TableName -> Int64 -> m Int64
- deleteDLQJob :: MonadArbiter m => SchemaName -> TableName -> Int64 -> m Int64
- deleteDLQJobsBatch :: MonadArbiter m => SchemaName -> TableName -> [Int64] -> m Int64
- data JobFilter
- listJobsFiltered :: forall m payload. (JobPayload payload, MonadArbiter m) => SchemaName -> TableName -> [JobFilter] -> Int -> Int -> m [JobRead payload]
- listJobsFilteredOrdered :: forall m payload. (JobPayload payload, MonadArbiter m) => SchemaName -> TableName -> [JobFilter] -> Maybe JobSortColumn -> Maybe SortDir -> Int -> Int -> m [JobRead payload]
- countJobsFiltered :: MonadArbiter m => SchemaName -> TableName -> [JobFilter] -> m Int64
- listDLQFiltered :: forall m payload. (JobPayload payload, MonadArbiter m) => SchemaName -> TableName -> [JobFilter] -> Int -> Int -> m [DLQJob payload]
- listDLQFilteredOrdered :: forall m payload. (JobPayload payload, MonadArbiter m) => SchemaName -> TableName -> [JobFilter] -> Maybe DLQSortColumn -> Maybe SortDir -> Int -> Int -> m [DLQJob payload]
- countDLQFiltered :: MonadArbiter m => SchemaName -> TableName -> [JobFilter] -> m Int64
- listJobs :: forall m payload. (JobPayload payload, MonadArbiter m) => SchemaName -> TableName -> Int -> Int -> m [JobRead payload]
- getJobById :: forall m payload. (JobPayload payload, MonadArbiter m) => SchemaName -> TableName -> Int64 -> m (Maybe (JobRead payload))
- getJobByDedupKey :: forall m payload. (JobPayload payload, MonadArbiter m) => SchemaName -> TableName -> Text -> m (Maybe (JobRead payload))
- getJobsByGroup :: forall m payload. (JobPayload payload, MonadArbiter m) => SchemaName -> TableName -> Text -> Int -> Int -> m [JobRead payload]
- cancelJob :: MonadArbiter m => Text -> TableName -> Int64 -> m Int64
- cancelJobsBatch :: MonadArbiter m => Text -> TableName -> [Int64] -> m Int64
- promoteJob :: MonadArbiter m => SchemaName -> TableName -> Int64 -> m Int64
- data QueueStats = QueueStats {}
- getQueueStats :: MonadArbiter m => SchemaName -> TableName -> m QueueStats
- countJobs :: MonadArbiter m => SchemaName -> TableName -> m Int64
- countJobsByGroup :: MonadArbiter m => SchemaName -> TableName -> Text -> m Int64
- countDLQJobs :: MonadArbiter m => SchemaName -> TableName -> m Int64
- getJobsByParent :: forall m payload. (JobPayload payload, MonadArbiter m) => Text -> TableName -> Int64 -> Int -> Int -> m [JobRead payload]
- countJobsByParent :: MonadArbiter m => SchemaName -> TableName -> Int64 -> m Int64
- countChildrenBatch :: MonadArbiter m => SchemaName -> TableName -> [Int64] -> m (Map Int64 (Int64, Int64))
- countDLQChildrenBatch :: MonadArbiter m => SchemaName -> TableName -> [Int64] -> m (Map Int64 Int64)
- pauseChildren :: MonadArbiter m => Text -> TableName -> Int64 -> m Int64
- resumeChildren :: MonadArbiter m => Text -> TableName -> Int64 -> m Int64
- cancelJobCascade :: MonadArbiter m => Text -> TableName -> Int64 -> m Int64
- cancelJobTree :: MonadArbiter m => Text -> TableName -> Int64 -> m Int64
- suspendJob :: MonadArbiter m => Text -> TableName -> Int64 -> m Int64
- resumeJob :: MonadArbiter m => Text -> TableName -> Int64 -> m Int64
- refreshGroups :: MonadArbiter m => SchemaName -> TableName -> Int -> m ()
- upsertCronDefault :: MonadArbiter m => SchemaName -> Text -> Text -> Text -> Maybe Text -> m Int64
- listCronSchedules :: MonadArbiter m => SchemaName -> m [CronScheduleRow]
- getCronScheduleByName :: MonadArbiter m => SchemaName -> Text -> m (Maybe CronScheduleRow)
- updateCronSchedule :: MonadArbiter m => SchemaName -> Text -> CronScheduleUpdate -> m Int64
- deleteStaleCronSchedules :: MonadArbiter m => SchemaName -> [Text] -> m Int64
- touchCronLastFired :: MonadArbiter m => SchemaName -> Text -> m Int64
- touchCronChecked :: MonadArbiter m => SchemaName -> UTCTime -> [Text] -> m Int64
- tryFireCronGate :: MonadArbiter m => SchemaName -> Text -> UTCTime -> m Bool
- tryAcquireCronLeader :: MonadArbiter m => SchemaName -> Text -> m Bool
- getParentStateSnapshot :: MonadArbiter m => SchemaName -> TableName -> Int64 -> m (Maybe Value)
- readChildResultsRaw :: MonadArbiter m => SchemaName -> TableName -> Int64 -> m (Map Int64 Value, Map Int64 Text, Maybe Value, Map Int64 Text)
- mergeRawChildResults :: Map Int64 Value -> Map Int64 Text -> Maybe Value -> Map Int64 (Either Text Value)
Job Insertion
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> JobWrite payload | |
| -> m (Maybe (JobRead payload)) |
Inserts a job into the queue.
Returns the inserted job with database-generated fields populated.
Ordering and concurrency
Standalone (ungrouped) jobs are claimed in (priority ASC, id ASC) order.
Grouped jobs are claimed one-per-group, with the head of each group chosen
by (attempts DESC, priority ASC, id ASC). The attempts DESC prefix
prioritises a chronically-failing job so it either succeeds or reaches
maxAttempts and moves to the DLQ, rather than starving fresh jobs behind
it indefinitely. Concurrent inserts to the same group are serialized at
the trigger level: the AFTER INSERT trigger's ON CONFLICT DO UPDATE on
the groups table takes a row-level lock, preventing out-of-order commits
within a group.
Deduplication
Nothing: Always insert (dedup_key is NULL)Just (IgnoreDuplicate k): Skip if dedup_key exists, return NothingJust (ReplaceDuplicate k): Replace existing job unless actively in-flight on its first attempt. Returns Nothing only whenattempts > 0,not_visible_until > NOW(), andlast_error IS NULL(i.e., the job is being processed for the first time). Jobs that have previously failed (last_error IS NOT NULL) can always be replaced, even if currently in-flight on a retry attempt - this is by design, so that a fresh replacement takes priority over a failing job.
parentId is validated: if set to a non-existent job ID, returns Nothing.
For building parent-child trees, prefer insertJobTree which handles
parentId, isRollup, and suspended atomically.
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> JobWrite payload | |
| -> m (Maybe (JobRead payload)) |
Insert a job without validating that the parent exists.
This is an internal fast path for callers that already guarantee the parent
is present (e.g. insertJobTree). External callers
should use insertJob which validates the parent first.
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> [JobWrite payload] | Jobs to insert |
| -> m [JobRead payload] |
Insert multiple jobs in a single batch operation.
Supports dedup keys: IgnoreDuplicate jobs are silently skipped on
conflict, ReplaceDuplicate jobs update the existing row (unless
actively in-flight). Only actually inserted or replaced jobs are returned.
If multiple jobs in the batch share the same dedup key, only the last
occurrence is kept (last writer wins), consistent with sequential
insertJob calls.
Does not validate parentId - callers must ensure referenced parents
exist. For parent-child trees, use insertJobTree instead.
insertJobsBatch_ :: forall m payload. (JobPayload payload, MonadArbiter m) => Text -> Text -> [JobWrite payload] -> m Int64 Source #
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> Int64 | Parent job ID |
| -> Int64 | Child job ID |
| -> Value | Encoded result value |
| -> m Int64 |
Insert a child's result into the results table.
Each child gets its own row keyed by (parent_id, child_id).
The FK ON DELETE CASCADE ensures cleanup when the parent is acked.
Returns the number of rows inserted (1 on success).
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> Int64 | Parent job ID |
| -> m (Map Int64 Value) |
Get all child results for a parent from the results table.
Returns a Map from child ID to the result Value.
getDLQChildErrorsByParent Source #
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> Int64 | Parent job ID |
| -> m (Map Int64 Text) |
Get DLQ child errors for a parent.
Returns a Map from child job ID to the last error message.
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> Int64 | Job ID |
| -> Value | The pre-populated parent state to persist |
| -> m Int64 |
Snapshot results into parent_state before DLQ move.
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> Int | Maximum number of jobs to claim |
| -> NominalDiffTime | Visibility timeout in seconds |
| -> m [JobRead payload] |
Claim up to maxJobs visible jobs, respecting head-of-line blocking
(one job per group). Uses a single-CTE claim with the groups table.
claimNextVisibleJobsBatched Source #
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> Int | Batch size per group (how many jobs to claim from each group) |
| -> Int | Maximum number of groups/batches to claim |
| -> NominalDiffTime | Visibility timeout in seconds |
| -> m [NonEmpty (JobRead payload)] |
Batched variant of claimNextVisibleJobs - claims up to batchSize jobs
per group, across up to maxBatches groups.
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> JobRead payload | |
| -> m Int64 |
Acknowledge a job as completed (smart ack).
Deletes standalone jobs; suspends parents waiting for children; wakes parents when the last sibling completes. Uses an advisory lock for child jobs to serialize with concurrent sibling acks.
Returns 1 on success, 0 if the job was already gone.
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> [JobRead payload] | |
| -> m Int64 |
Acknowledge multiple jobs as completed.
Iterates calling ackJob so every job gets smart-ack treatment
(parent suspend/wake logic).
Returns the total number of rows affected.
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> [JobRead payload] | |
| -> m Int64 |
Bulk ack for standalone jobs (no parent, no tree logic).
A single DELETE with unnest - one round trip for N jobs.
Only valid for jobs claimed in BatchedJobsMode, which guarantees
parent_id IS NULL AND parent_state IS NULL.
Returns the number of rows deleted.
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> NominalDiffTime | Timeout in seconds |
| -> JobRead payload | |
| -> m Int64 | Returns the number of rows updated (0 if job was reclaimed by another worker) |
Set the visibility timeout for a job
setVisibilityTimeoutBatch Source #
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> NominalDiffTime | Timeout in seconds |
| -> [JobRead payload] | |
| -> m [VisibilityUpdateInfo] | Returns a list of status records, one for each job that was targeted. |
Batch variant of setVisibilityTimeout. Returns per-job status
(success, acked, or stolen).
data VisibilityUpdateInfo Source #
Detailed information about the result of a visibility update operation for a single job.
Constructors
| VisibilityUpdateInfo | |
Fields
| |
Instances
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> NominalDiffTime | Backoff timeout in seconds |
| -> Text | Error message |
| -> JobRead payload | |
| -> m Int64 |
Update a job for retry with backoff and error tracking
Returns the number of rows updated (0 if job was already claimed by another worker).
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> Text | Error message (the final error that caused the DLQ move) |
| -> JobRead payload | |
| -> m Int64 |
Move a job to the DLQ. Cascades descendants for rollup parents. Wakes the parent if this was a child job.
Returns 0 if the job was already claimed by another worker.
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> [(JobRead payload, Text)] | List of (job, error message) pairs |
| -> m Int64 |
Moves multiple jobs from the main queue to the dead-letter queue.
Each job is moved with its own error message. Jobs that have already been claimed by another worker (attempts mismatch) are silently skipped.
Returns the total number of jobs moved to DLQ.
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> Int64 | DLQ job ID |
| -> m (Maybe (JobRead payload)) |
Retry a job from the DLQ (re-inserts with attempts reset to 0). The dedup_key is NOT restored - retried jobs won't conflict with new dedup inserts.
dlqJobExists :: MonadArbiter m => Text -> Text -> Int64 -> m Bool Source #
Check whether a DLQ job exists by ID.
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> Int | Limit |
| -> Int | Offset |
| -> m [DLQJob payload] |
List jobs in the dead letter queue
Returns jobs ordered by failed_at (most recent first).
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> Int64 | Parent job ID |
| -> Int | Limit |
| -> Int | Offset |
| -> m [DLQJob payload] |
List DLQ jobs filtered by parent_id.
Returns jobs ordered by failed_at (most recent first).
countDLQJobsByParent :: MonadArbiter m => SchemaName -> TableName -> Int64 -> m Int64 Source #
Count DLQ jobs matching a parent_id.
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> Int64 | DLQ job ID |
| -> m Int64 |
Delete a job from the DLQ. If the job was a child, tries to resume the parent when no siblings remain.
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> [Int64] | DLQ job IDs |
| -> m Int64 |
Delete multiple jobs from the dead letter queue.
If any deleted jobs were children, tries to resume their parents when no siblings remain. Parent IDs are deduplicated and sorted to prevent deadlocks between concurrent batch deletes.
Returns the total number of DLQ jobs deleted.
Filtered Query Operations
Filter predicates for job listing queries.
Constructors
| FilterGroupKey Text | |
| FilterParentId Int64 | |
| FilterSuspended Bool | |
| FilterInFlight | |
| FilterRootsOnly |
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> [JobFilter] | Composable filters |
| -> Int | Limit |
| -> Int | Offset |
| -> m [JobRead payload] |
List jobs with composable filters.
Returns jobs ordered by ID (descending, newest first).
listJobsFilteredOrdered Source #
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> [JobFilter] | Composable filters |
| -> Maybe JobSortColumn | Sort column (defaults to |
| -> Maybe SortDir | Sort direction (defaults to |
| -> Int | Limit |
| -> Int | Offset |
| -> m [JobRead payload] |
List jobs with composable filters and an explicit sort spec.
Nothing for both sort args yields the default ordering (id DESC).
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> [JobFilter] | Composable filters |
| -> m Int64 |
Count jobs with composable filters.
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> [JobFilter] | Composable filters |
| -> Int | Limit |
| -> Int | Offset |
| -> m [DLQJob payload] |
List DLQ jobs with composable filters.
Returns jobs ordered by failed_at (most recent first).
listDLQFilteredOrdered Source #
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> [JobFilter] | Composable filters |
| -> Maybe DLQSortColumn | Sort column (defaults to |
| -> Maybe SortDir | Sort direction (defaults to |
| -> Int | Limit |
| -> Int | Offset |
| -> m [DLQJob payload] |
List DLQ jobs with composable filters and an explicit sort spec.
Nothing for both sort args yields the default ordering
(failed_at DESC).
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> [JobFilter] | Composable filters |
| -> m Int64 |
Count DLQ jobs with composable filters.
Admin Operations
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> Int | Limit |
| -> Int | Offset |
| -> m [JobRead payload] |
List jobs in the queue with pagination.
Returns jobs ordered by ID (descending).
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> Int64 | Job ID |
| -> m (Maybe (JobRead payload)) |
Get a single job by its ID
getJobByDedupKey :: forall m payload. (JobPayload payload, MonadArbiter m) => SchemaName -> TableName -> Text -> m (Maybe (JobRead payload)) Source #
Get a single job by its dedup key.
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> Text | Group key to filter by |
| -> Int | Limit |
| -> Int | Offset |
| -> m [JobRead payload] |
Get all jobs for a specific group key.
Arguments
| :: MonadArbiter m | |
| => Text | Schema name |
| -> TableName | Table name |
| -> Int64 | Job ID |
| -> m Int64 |
Cancels (deletes) a job by ID.
Returns 0 if the job has children - use cancelJobCascade to delete
a parent and all its descendants.
If the deleted job was a child and no siblings remain, the parent is resumed for its completion round.
Arguments
| :: MonadArbiter m | |
| => Text | Schema name |
| -> TableName | Table name |
| -> [Int64] | Job IDs |
| -> m Int64 |
Cancels (deletes) multiple jobs by ID.
Each job gets full wake-parent logic (same as cancelJob).
Wrapped in a transaction so that cancelling multiple children of the
same parent sees a consistent view - the last cancel's CTE correctly
detects no remaining siblings and resumes the parent.
Returns the total number of jobs deleted.
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> Int64 | Job ID |
| -> m Int64 | Number of rows updated |
Promote a delayed or retrying job to be immediately visible.
Refuses in-flight jobs (attempts > 0 with no last_error). Returns 1 on success, 0 if not found, already visible, or in-flight.
data QueueStats Source #
Statistics about the job queue
Constructors
| QueueStats | |
Fields
| |
Instances
| FromJSON QueueStats Source # | |||||
Defined in Arbiter.Core.Operations | |||||
| ToJSON QueueStats Source # | |||||
Defined in Arbiter.Core.Operations Methods toJSON :: QueueStats -> Value toEncoding :: QueueStats -> Encoding toJSONList :: [QueueStats] -> Value toEncodingList :: [QueueStats] -> Encoding omitField :: QueueStats -> Bool | |||||
| Generic QueueStats Source # | |||||
Defined in Arbiter.Core.Operations Associated Types
| |||||
| Show QueueStats Source # | |||||
Defined in Arbiter.Core.Operations Methods showsPrec :: Int -> QueueStats -> ShowS # show :: QueueStats -> String # showList :: [QueueStats] -> ShowS # | |||||
| Eq QueueStats Source # | |||||
Defined in Arbiter.Core.Operations | |||||
| type Rep QueueStats Source # | |||||
Defined in Arbiter.Core.Operations type Rep QueueStats = D1 ('MetaData "QueueStats" "Arbiter.Core.Operations" "arbiter-core-0.1.0.0-inplace" 'False) (C1 ('MetaCons "QueueStats" 'PrefixI 'True) ((S1 ('MetaSel ('Just "totalJobs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64) :*: S1 ('MetaSel ('Just "visibleJobs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64)) :*: (S1 ('MetaSel ('Just "invisibleJobs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64) :*: S1 ('MetaSel ('Just "oldestJobAgeSeconds") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Double))))) | |||||
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> m QueueStats |
Get statistics about the job queue
Count Operations
countJobs :: MonadArbiter m => SchemaName -> TableName -> m Int64 Source #
Count all jobs in a table
countJobsByGroup :: MonadArbiter m => SchemaName -> TableName -> Text -> m Int64 Source #
Count jobs matching a group key
countDLQJobs :: MonadArbiter m => SchemaName -> TableName -> m Int64 Source #
Count DLQ jobs
Parent-Child Operations
Arguments
| :: forall m payload. (JobPayload payload, MonadArbiter m) | |
| => Text | Schema name |
| -> TableName | Table name |
| -> Int64 | Parent ID |
| -> Int | Limit |
| -> Int | Offset |
| -> m [JobRead payload] |
List jobs filtered by parent_id with pagination.
countJobsByParent :: MonadArbiter m => SchemaName -> TableName -> Int64 -> m Int64 Source #
Count jobs matching a parent_id.
countChildrenBatch :: MonadArbiter m => SchemaName -> TableName -> [Int64] -> m (Map Int64 (Int64, Int64)) Source #
Count children for a batch of potential parent IDs.
Returns a Map from parent_id to (total, paused) counts (only non-zero entries).
countDLQChildrenBatch :: MonadArbiter m => SchemaName -> TableName -> [Int64] -> m (Map Int64 Int64) Source #
Count children in the DLQ for a batch of potential parent IDs.
Returns a Map from parent_id to DLQ child count (only non-zero entries).
Job Dependency Operations
Arguments
| :: MonadArbiter m | |
| => Text | Schema name |
| -> TableName | Table name |
| -> Int64 | Parent job ID |
| -> m Int64 |
Pause all visible children of a parent job.
Sets suspended = TRUE for claimable children, making them unclaimable. In-flight children (currently being processed by workers) are left alone so their visibility timeout can expire normally if the worker crashes. Returns the number of children paused.
Arguments
| :: MonadArbiter m | |
| => Text | Schema name |
| -> TableName | Table name |
| -> Int64 | Parent job ID |
| -> m Int64 |
Resume all suspended children of a parent job.
Only affects children whose suspended = TRUE. Returns the number of children resumed.
Arguments
| :: MonadArbiter m | |
| => Text | Schema name |
| -> TableName | Table name |
| -> Int64 | Root job ID |
| -> m Int64 |
Cancel a job and all its descendants recursively.
Uses a recursive CTE to find all descendants and deletes them all. If the root job itself is a child, resumes its parent for a completion round. Returns the total number of jobs deleted (parent + all descendants).
Arguments
| :: MonadArbiter m | |
| => Text | Schema name |
| -> TableName | Table name |
| -> Int64 | Any job ID within the tree |
| -> m Int64 |
Cancel an entire job tree by walking up from any node to the root, then cascade-deleting everything from the root down.
Unlike cancelJobCascade, this does NOT call tryResumeParent - the root
by definition has no parent. Returns the total number of jobs deleted.
Suspend/Resume Operations
Arguments
| :: MonadArbiter m | |
| => Text | Schema name |
| -> TableName | Table name |
| -> Int64 | Job ID |
| -> m Int64 |
Suspend a job, making it unclaimable.
Only suspends non-in-flight jobs (not currently being processed by workers). Returns the number of rows updated (0 if job doesn't exist, is in-flight, or already suspended).
Arguments
| :: MonadArbiter m | |
| => Text | Schema name |
| -> TableName | Table name |
| -> Int64 | Job ID |
| -> m Int64 |
Resume a suspended job, making it claimable again.
Returns the number of rows updated (0 if job doesn't exist or isn't suspended).
Groups Table Operations
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> Int | Minimum interval between runs (seconds) |
| -> m () |
Full recompute of the groups table from the main queue.
Corrects any drift in job_count, min_priority, min_id, and in_flight_until. Checks the reaper sequence to skip if a recent run occurred within the given interval. Uses an advisory lock to serialize concurrent attempts, then locks all groups rows to prevent trigger interleaving.
Cron Schedule Operations
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> Text | Schedule name |
| -> Text | Default cron expression |
| -> Text | Default overlap policy |
| -> Maybe Text | Default IANA tz name ( |
| -> m Int64 |
Upsert a cron schedule's default expression and overlap policy.
Preserves user overrides and enabled state.
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> m [CronScheduleRow] |
List all cron schedules ordered by name.
getCronScheduleByName Source #
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> Text | Schedule name |
| -> m (Maybe CronScheduleRow) |
Get a single cron schedule by name.
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> Text | Schedule name |
| -> CronScheduleUpdate | |
| -> m Int64 |
Update a cron schedule (patch semantics).
Returns the number of rows affected (0 = not found, 1 = updated).
deleteStaleCronSchedules Source #
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> [Text] | Schedule names to keep |
| -> m Int64 |
Delete schedules whose names are not in the given list.
Returns the number of rows deleted. Does nothing if the list is empty.
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> Text | Schedule name |
| -> m Int64 |
Update last_fired_at to NOW() for a cron schedule.
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> UTCTime | Watermark (the minute the scheduler is advancing to) |
| -> [Text] | Schedule names |
| -> m Int64 |
Advance last_checked_at to the supplied watermark for the given cron
schedule names. The watermark must be the minute boundary the scheduler
finished evaluating, not NOW(). A wrapping GREATEST in the SQL keeps
the column monotonic when concurrent worker pools race.
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> Text | Schedule name |
| -> UTCTime | Minute floor for the tick being attempted |
| -> m Bool |
Arguments
| :: MonadArbiter m | |
| => SchemaName | |
| -> Text | Schedule name |
| -> m Bool |
Try to acquire the (schema, name) cron leader lock. Must be inside a transaction.
Internal Operations
getParentStateSnapshot Source #
Arguments
| :: MonadArbiter m | |
| => SchemaName | Schema name |
| -> TableName | Table name |
| -> Int64 | Job ID |
| -> m (Maybe Value) |
Read the raw parent_state snapshot from the DB.
Internal operation used by the worker for DLQ-retried finalizers that have a persisted snapshot.