arbiter-core-0.1.0.0: Core types and logic for PostgreSQL-backed job queue
Safe Haskell: None
Language: GHC2024

Arbiter.Core.Operations

Job Insertion

insertJob Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> SchemaName

Schema name

-> TableName

Table name

-> JobWrite payload 
-> m (Maybe (JobRead payload)) 

Inserts a job into the queue.

Returns the inserted job with database-generated fields populated.

Ordering and concurrency

Standalone (ungrouped) jobs are claimed in (priority ASC, id ASC) order. Grouped jobs are claimed one per group, with the head of each group chosen by (attempts DESC, priority ASC, id ASC). The attempts DESC prefix prioritises a chronically failing job so that it either succeeds or reaches maxAttempts and moves to the DLQ, rather than starving fresh jobs behind it indefinitely.

Concurrent inserts to the same group are serialized at the trigger level: the AFTER INSERT trigger's ON CONFLICT DO UPDATE on the groups table takes a row-level lock, preventing out-of-order commits within a group.
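
The two sort keys above can be modelled in pure Haskell. This is only a sketch: the Job record, standaloneOrder, and groupHead are hypothetical names invented for illustration, and the real ordering is applied by the claim query in SQL, not by library code like this.

```haskell
import Data.List (sortOn)
import Data.Ord (Down (..))

-- Hypothetical, simplified job row; not the library's JobRead type.
data Job = Job
  { jobId :: Int
  , jobGroup :: Maybe String
  , jobPriority :: Int
  , jobAttempts :: Int
  }
  deriving (Show, Eq)

-- Standalone jobs: (priority ASC, id ASC).
standaloneOrder :: [Job] -> [Job]
standaloneOrder = sortOn (\j -> (jobPriority j, jobId j))

-- Head of a single group: (attempts DESC, priority ASC, id ASC).
-- The caller is assumed to pass only jobs that share one group key.
groupHead :: [Job] -> Maybe Job
groupHead jobs =
  case sortOn (\j -> (Down (jobAttempts j), jobPriority j, jobId j)) jobs of
    [] -> Nothing
    (j : _) -> Just j
```

In this model a job with three failed attempts is chosen as the group head ahead of a fresh job, even when the fresh job has a numerically lower priority value.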

Deduplication

  • Nothing: Always insert (dedup_key is NULL)
  • Just (IgnoreDuplicate k): Skip if dedup_key exists, return Nothing
  • Just (ReplaceDuplicate k): Replace the existing job unless it is actively in-flight on its first attempt. Returns Nothing only when attempts > 0, not_visible_until > NOW(), and last_error IS NULL (i.e., the job is being processed for the first time). Jobs that have previously failed (last_error IS NOT NULL) can always be replaced, even if currently in-flight on a retry attempt. This is by design, so that a fresh replacement takes priority over a failing job.
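
The ReplaceDuplicate rule can be read as a single predicate over three columns. The sketch below restates it in Haskell under stated assumptions: the Existing record and both function names are hypothetical, and timestamps are simplified to integer seconds. The actual check runs inside the insert statement.

```haskell
import Data.Maybe (isNothing)

-- Hypothetical view of the columns the rule inspects on the existing row.
data Existing = Existing
  { exAttempts :: Int
  , exNotVisibleUntil :: Int -- seconds since epoch (simplification)
  , exLastError :: Maybe String
  }

-- True when the existing job is in-flight on its first attempt:
-- attempts > 0, not_visible_until > NOW(), and last_error IS NULL.
firstAttemptInFlight :: Int -> Existing -> Bool
firstAttemptInFlight now ex =
  exAttempts ex > 0
    && exNotVisibleUntil ex > now
    && isNothing (exLastError ex)

-- ReplaceDuplicate replaces the row in every other case.
canReplace :: Int -> Existing -> Bool
canReplace now = not . firstAttemptInFlight now
```

Under this reading, a job that is in-flight on a retry (last_error is set) is still replaceable, which matches the documented design intent.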

parentId is validated: if set to a non-existent job ID, returns Nothing. For building parent-child trees, prefer insertJobTree which handles parentId, isRollup, and suspended atomically.

insertJobUnsafe Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> SchemaName

Schema name

-> TableName

Table name

-> JobWrite payload 
-> m (Maybe (JobRead payload)) 

Insert a job without validating that the parent exists.

This is an internal fast path for callers that already guarantee the parent is present (e.g. insertJobTree). External callers should use insertJob which validates the parent first.

insertJobsBatch Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> SchemaName

Schema name

-> TableName

Table name

-> [JobWrite payload]

Jobs to insert

-> m [JobRead payload] 

Insert multiple jobs in a single batch operation.

Supports dedup keys: IgnoreDuplicate jobs are silently skipped on conflict, ReplaceDuplicate jobs update the existing row (unless actively in-flight). Only actually inserted or replaced jobs are returned.

If multiple jobs in the batch share the same dedup key, only the last occurrence is kept (last writer wins), consistent with sequential insertJob calls.
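
The last-writer-wins rule for a batch can be illustrated with a small pure function. This is a sketch only: lastWriterWins is a hypothetical name, jobs are modelled as (dedup key, value) pairs, and preserving the relative order of the surviving jobs is a choice of this sketch rather than documented behaviour.

```haskell
import qualified Data.Map.Strict as Map

-- Keep only the last occurrence of each dedup key.
-- Jobs without a key (Nothing) are always kept.
lastWriterWins :: [(Maybe String, a)] -> [(Maybe String, a)]
lastWriterWins jobs =
  [ job
  | (i, job@(key, _)) <- indexed
  , case key of
      Nothing -> True
      Just k -> Map.lookup k lastIndex == Just i
  ]
  where
    indexed = zip [0 :: Int ..] jobs
    -- Map.fromList retains the last value for a repeated key,
    -- so this maps each dedup key to the index of its last occurrence.
    lastIndex = Map.fromList [(k, i) | (i, (Just k, _)) <- indexed]
```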

Does not validate parentId - callers must ensure referenced parents exist. For parent-child trees, use insertJobTree instead.

insertJobsBatch_ :: forall m payload. (JobPayload payload, MonadArbiter m) => Text -> Text -> [JobWrite payload] -> m Int64 Source #

insertResult Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> TableName

Table name

-> Int64

Parent job ID

-> Int64

Child job ID

-> Value

Encoded result value

-> m Int64 

Insert a child's result into the results table.

Each child gets its own row keyed by (parent_id, child_id). The FK ON DELETE CASCADE ensures cleanup when the parent is acked.

Returns the number of rows inserted (1 on success).

getResultsByParent Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> TableName

Table name

-> Int64

Parent job ID

-> m (Map Int64 Value) 

Get all child results for a parent from the results table.

Returns a Map from child ID to the result Value.

getDLQChildErrorsByParent Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> TableName

Table name

-> Int64

Parent job ID

-> m (Map Int64 Text) 

Get DLQ child errors for a parent.

Returns a Map from child job ID to the last error message.

persistParentState Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> TableName

Table name

-> Int64

Job ID

-> Value

The pre-populated parent state to persist

-> m Int64 

Snapshot results into parent_state before DLQ move.

claimNextVisibleJobs Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> SchemaName

Schema name

-> TableName

Table name

-> Int

Maximum number of jobs to claim

-> NominalDiffTime

Visibility timeout in seconds

-> m [JobRead payload] 

Claim up to maxJobs visible jobs, respecting head-of-line blocking (one job per group). Uses a single-CTE claim with the groups table.

claimNextVisibleJobsBatched Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> SchemaName

Schema name

-> TableName

Table name

-> Int

Batch size per group (how many jobs to claim from each group)

-> Int

Maximum number of groups/batches to claim

-> NominalDiffTime

Visibility timeout in seconds

-> m [NonEmpty (JobRead payload)] 

Batched variant of claimNextVisibleJobs - claims up to batchSize jobs per group, across up to maxBatches groups.

ackJob Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> TableName

Table name

-> JobRead payload 
-> m Int64 

Acknowledge a job as completed (smart ack).

Deletes standalone jobs; suspends parents waiting for children; wakes parents when the last sibling completes. Uses an advisory lock for child jobs to serialize with concurrent sibling acks.

Returns 1 on success, 0 if the job was already gone.

ackJobsBatch Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> TableName

Table name

-> [JobRead payload] 
-> m Int64 

Acknowledge multiple jobs as completed.

Iterates calling ackJob so every job gets smart-ack treatment (parent suspend/wake logic). Returns the total number of rows affected.

ackJobsBulk Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> TableName

Table name

-> [JobRead payload] 
-> m Int64 

Bulk ack for standalone jobs (no parent, no tree logic).

A single DELETE with unnest - one round trip for N jobs. Only valid for jobs claimed in BatchedJobsMode, which guarantees parent_id IS NULL AND parent_state IS NULL.

Returns the number of rows deleted.

setVisibilityTimeout Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> TableName

Table name

-> NominalDiffTime

Timeout in seconds

-> JobRead payload 
-> m Int64

Returns the number of rows updated (0 if the job was reclaimed by another worker).

Set the visibility timeout for a job.

setVisibilityTimeoutBatch Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> TableName

Table name

-> NominalDiffTime

Timeout in seconds

-> [JobRead payload] 
-> m [VisibilityUpdateInfo]

Returns a list of status records, one for each job that was targeted.

Batch variant of setVisibilityTimeout. Returns per-job status (success, acked, or stolen).
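
One way a caller might turn each record into one of the three statuses is sketched below. The record mirrors the field names visible in the Generic instance (vuiJobId, vuiWasUpdated, vuiCurrentDbAttempts), but the mapping is an assumption, not documented behaviour: it supposes that a missing attempts value means the row no longer exists (acked) and that a present value on a failed update means another worker holds the job (stolen). The Status type and classify are hypothetical.

```haskell
import Data.Int (Int32, Int64)

-- Local mirror of the record, based on the field names in its Generic Rep.
data VisibilityUpdateInfo = VisibilityUpdateInfo
  { vuiJobId :: Int64
  , vuiWasUpdated :: Bool
  , vuiCurrentDbAttempts :: Maybe Int32
  }
  deriving (Show, Eq)

-- Hypothetical status type for this sketch.
data Status = Extended | Acked | Stolen
  deriving (Show, Eq)

-- ASSUMPTION: Nothing means the row is gone; Just means it was reclaimed.
classify :: VisibilityUpdateInfo -> Status
classify info
  | vuiWasUpdated info = Extended
  | otherwise =
      case vuiCurrentDbAttempts info of
        Nothing -> Acked
        Just _ -> Stolen
```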

data VisibilityUpdateInfo Source #

Detailed information about the result of a visibility update operation for a single job.

Constructors

VisibilityUpdateInfo 

Fields

  • vuiJobId :: Int64
  • vuiWasUpdated :: Bool
  • vuiCurrentDbAttempts :: Maybe Int32

Instances

Instances details
Generic VisibilityUpdateInfo Source # 
Instance details

Defined in Arbiter.Core.Operations

Associated Types

type Rep VisibilityUpdateInfo 
Instance details

Defined in Arbiter.Core.Operations

type Rep VisibilityUpdateInfo = D1 ('MetaData "VisibilityUpdateInfo" "Arbiter.Core.Operations" "arbiter-core-0.1.0.0-inplace" 'False) (C1 ('MetaCons "VisibilityUpdateInfo" 'PrefixI 'True) (S1 ('MetaSel ('Just "vuiJobId") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64) :*: (S1 ('MetaSel ('Just "vuiWasUpdated") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Bool) :*: S1 ('MetaSel ('Just "vuiCurrentDbAttempts") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Int32)))))
Show VisibilityUpdateInfo Source # 
Instance details

Defined in Arbiter.Core.Operations

Eq VisibilityUpdateInfo Source # 
Instance details

Defined in Arbiter.Core.Operations

updateJobForRetry Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> TableName

Table name

-> NominalDiffTime

Backoff timeout in seconds

-> Text

Error message

-> JobRead payload 
-> m Int64 

Update a job for retry with backoff and error tracking.

Returns the number of rows updated (0 if job was already claimed by another worker).

moveToDLQ Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> TableName

Table name

-> Text

Error message (the final error that caused the DLQ move)

-> JobRead payload 
-> m Int64 

Move a job to the DLQ. Cascades descendants for rollup parents. Wakes the parent if this was a child job.

Returns 0 if the job was already claimed by another worker.

moveToDLQBatch Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> TableName

Table name

-> [(JobRead payload, Text)]

List of (job, error message) pairs

-> m Int64 

Moves multiple jobs from the main queue to the dead-letter queue.

Each job is moved with its own error message. Jobs that have already been claimed by another worker (attempts mismatch) are silently skipped.

Returns the total number of jobs moved to DLQ.

retryFromDLQ Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> SchemaName

Schema name

-> TableName

Table name

-> Int64

DLQ job ID

-> m (Maybe (JobRead payload)) 

Retry a job from the DLQ (re-inserts with attempts reset to 0). The dedup_key is NOT restored - retried jobs won't conflict with new dedup inserts.

dlqJobExists :: MonadArbiter m => Text -> Text -> Int64 -> m Bool Source #

Check whether a DLQ job exists by ID.

listDLQJobs Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> SchemaName

Schema name

-> TableName

Table name

-> Int

Limit

-> Int

Offset

-> m [DLQJob payload] 

List jobs in the dead letter queue.

Returns jobs ordered by failed_at (most recent first).

listDLQJobsByParent Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> SchemaName

Schema name

-> TableName

Table name

-> Int64

Parent job ID

-> Int

Limit

-> Int

Offset

-> m [DLQJob payload] 

List DLQ jobs filtered by parent_id.

Returns jobs ordered by failed_at (most recent first).

countDLQJobsByParent :: MonadArbiter m => SchemaName -> TableName -> Int64 -> m Int64 Source #

Count DLQ jobs matching a parent_id.

deleteDLQJob Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> TableName

Table name

-> Int64

DLQ job ID

-> m Int64 

Delete a job from the DLQ. If the job was a child, tries to resume the parent when no siblings remain.

deleteDLQJobsBatch Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> TableName

Table name

-> [Int64]

DLQ job IDs

-> m Int64 

Delete multiple jobs from the dead letter queue.

If any deleted jobs were children, tries to resume their parents when no siblings remain. Parent IDs are deduplicated and sorted to prevent deadlocks between concurrent batch deletes.
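
Deduplicating and sorting the parent IDs gives every concurrent batch the same lock acquisition order, which is the standard way to avoid deadlocks. A minimal sketch of that step, assuming the deleted rows are available as hypothetical (DLQ job ID, parent ID) pairs:

```haskell
import Data.Int (Int64)
import Data.Maybe (mapMaybe)
import qualified Data.Set as Set

-- Collect the distinct parent IDs of the deleted children in ascending order.
-- Rows without a parent contribute nothing.
parentsToResume :: [(Int64, Maybe Int64)] -> [Int64]
parentsToResume = Set.toAscList . Set.fromList . mapMaybe snd
```

Two batches that both touch parents 4 and 9 would then each attempt parent 4 first, so neither can hold 9 while waiting for 4.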

Returns the total number of DLQ jobs deleted.

Filtered Query Operations

data JobFilter Source #

Filter predicates for job listing queries.

Instances

Instances details
Show JobFilter Source # 
Instance details

Defined in Arbiter.Core.SqlTemplates

Eq JobFilter Source # 
Instance details

Defined in Arbiter.Core.SqlTemplates

listJobsFiltered Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> SchemaName

Schema name

-> TableName

Table name

-> [JobFilter]

Composable filters

-> Int

Limit

-> Int

Offset

-> m [JobRead payload] 

List jobs with composable filters.

Returns jobs ordered by ID (descending, newest first).

listJobsFilteredOrdered Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> SchemaName

Schema name

-> TableName

Table name

-> [JobFilter]

Composable filters

-> Maybe JobSortColumn

Sort column (defaults to JsId)

-> Maybe SortDir

Sort direction (defaults to SortDesc)

-> Int

Limit

-> Int

Offset

-> m [JobRead payload] 

List jobs with composable filters and an explicit sort spec.

Nothing for both sort args yields the default ordering (id DESC).

countJobsFiltered Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> TableName

Table name

-> [JobFilter]

Composable filters

-> m Int64 

Count jobs with composable filters.

listDLQFiltered Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> SchemaName

Schema name

-> TableName

Table name

-> [JobFilter]

Composable filters

-> Int

Limit

-> Int

Offset

-> m [DLQJob payload] 

List DLQ jobs with composable filters.

Returns jobs ordered by failed_at (most recent first).

listDLQFilteredOrdered Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> SchemaName

Schema name

-> TableName

Table name

-> [JobFilter]

Composable filters

-> Maybe DLQSortColumn

Sort column (defaults to DlqFailedAt)

-> Maybe SortDir

Sort direction (defaults to SortDesc)

-> Int

Limit

-> Int

Offset

-> m [DLQJob payload] 

List DLQ jobs with composable filters and an explicit sort spec.

Nothing for both sort args yields the default ordering (failed_at DESC).

countDLQFiltered Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> TableName

Table name

-> [JobFilter]

Composable filters

-> m Int64 

Count DLQ jobs with composable filters.

Admin Operations

listJobs Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> SchemaName

Schema name

-> TableName

Table name

-> Int

Limit

-> Int

Offset

-> m [JobRead payload] 

List jobs in the queue with pagination.

Returns jobs ordered by ID (descending).

getJobById Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> SchemaName

Schema name

-> TableName

Table name

-> Int64

Job ID

-> m (Maybe (JobRead payload)) 

Get a single job by its ID.

getJobByDedupKey :: forall m payload. (JobPayload payload, MonadArbiter m) => SchemaName -> TableName -> Text -> m (Maybe (JobRead payload)) Source #

Get a single job by its dedup key.

getJobsByGroup Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> SchemaName

Schema name

-> TableName

Table name

-> Text

Group key to filter by

-> Int

Limit

-> Int

Offset

-> m [JobRead payload] 

Get all jobs for a specific group key.

cancelJob Source #

Arguments

:: MonadArbiter m 
=> Text

Schema name

-> TableName

Table name

-> Int64

Job ID

-> m Int64 

Cancels (deletes) a job by ID.

Returns 0 if the job has children - use cancelJobCascade to delete a parent and all its descendants.

If the deleted job was a child and no siblings remain, the parent is resumed for its completion round.

cancelJobsBatch Source #

Arguments

:: MonadArbiter m 
=> Text

Schema name

-> TableName

Table name

-> [Int64]

Job IDs

-> m Int64 

Cancels (deletes) multiple jobs by ID.

Each job gets full wake-parent logic (same as cancelJob). Wrapped in a transaction so that cancelling multiple children of the same parent sees a consistent view - the last cancel's CTE correctly detects no remaining siblings and resumes the parent. Returns the total number of jobs deleted.

promoteJob Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> TableName

Table name

-> Int64

Job ID

-> m Int64

Number of rows updated

Promote a delayed or retrying job to be immediately visible.

Refuses in-flight jobs (attempts > 0 with no last_error). Returns 1 on success, 0 if not found, already visible, or in-flight.
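
The return values can be summarised as a small decision function. This is a sketch under assumptions: the Row record and promoteResult are hypothetical, timestamps are simplified to integer seconds, and "already visible" is taken to mean not_visible_until is at or before the current time.

```haskell
import Data.Maybe (isNothing)

-- Hypothetical view of the columns that matter for promotion.
data Row = Row
  { rowAttempts :: Int
  , rowLastError :: Maybe String
  , rowNotVisibleUntil :: Int -- seconds since epoch (simplification)
  }

-- Models the documented result: 1 on success, 0 otherwise.
promoteResult :: Int -> Maybe Row -> Int
promoteResult _ Nothing = 0 -- job not found
promoteResult now (Just row)
  | inFlight = 0
  | alreadyVisible = 0
  | otherwise = 1
  where
    -- In-flight: claimed at least once and no recorded failure.
    inFlight = rowAttempts row > 0 && isNothing (rowLastError row)
    alreadyVisible = rowNotVisibleUntil row <= now
```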

data QueueStats Source #

Statistics about the job queue.

Constructors

QueueStats 

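Fields

  • totalJobs :: Int64
  • visibleJobs :: Int64
  • invisibleJobs :: Int64
  • oldestJobAgeSeconds :: Maybe Double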

Instances

Instances details
FromJSON QueueStats Source # 
Instance details

Defined in Arbiter.Core.Operations

Methods

parseJSON :: Value -> Parser QueueStats

parseJSONList :: Value -> Parser [QueueStats]

omittedField :: Maybe QueueStats

ToJSON QueueStats Source # 
Instance details

Defined in Arbiter.Core.Operations

Methods

toJSON :: QueueStats -> Value

toEncoding :: QueueStats -> Encoding

toJSONList :: [QueueStats] -> Value

toEncodingList :: [QueueStats] -> Encoding

omitField :: QueueStats -> Bool

Generic QueueStats Source # 
Instance details

Defined in Arbiter.Core.Operations

Associated Types

type Rep QueueStats 
Instance details

Defined in Arbiter.Core.Operations

type Rep QueueStats = D1 ('MetaData "QueueStats" "Arbiter.Core.Operations" "arbiter-core-0.1.0.0-inplace" 'False) (C1 ('MetaCons "QueueStats" 'PrefixI 'True) ((S1 ('MetaSel ('Just "totalJobs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64) :*: S1 ('MetaSel ('Just "visibleJobs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64)) :*: (S1 ('MetaSel ('Just "invisibleJobs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64) :*: S1 ('MetaSel ('Just "oldestJobAgeSeconds") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Double)))))
Show QueueStats Source # 
Instance details

Defined in Arbiter.Core.Operations

Eq QueueStats Source # 
Instance details

Defined in Arbiter.Core.Operations

getQueueStats Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> TableName

Table name

-> m QueueStats 

Get statistics about the job queue.

Count Operations

countJobs :: MonadArbiter m => SchemaName -> TableName -> m Int64 Source #

Count all jobs in a table.

countJobsByGroup :: MonadArbiter m => SchemaName -> TableName -> Text -> m Int64 Source #

Count jobs matching a group key.

Parent-Child Operations

getJobsByParent Source #

Arguments

:: forall m payload. (JobPayload payload, MonadArbiter m) 
=> Text

Schema name

-> TableName

Table name

-> Int64

Parent ID

-> Int

Limit

-> Int

Offset

-> m [JobRead payload] 

List jobs filtered by parent_id with pagination.

countJobsByParent :: MonadArbiter m => SchemaName -> TableName -> Int64 -> m Int64 Source #

Count jobs matching a parent_id.

countChildrenBatch :: MonadArbiter m => SchemaName -> TableName -> [Int64] -> m (Map Int64 (Int64, Int64)) Source #

Count children for a batch of potential parent IDs.

Returns a Map from parent_id to (total, paused) counts (only non-zero entries).

countDLQChildrenBatch :: MonadArbiter m => SchemaName -> TableName -> [Int64] -> m (Map Int64 Int64) Source #

Count children in the DLQ for a batch of potential parent IDs.

Returns a Map from parent_id to DLQ child count (only non-zero entries).

Job Dependency Operations

pauseChildren Source #

Arguments

:: MonadArbiter m 
=> Text

Schema name

-> TableName

Table name

-> Int64

Parent job ID

-> m Int64 

Pause all visible children of a parent job.

Sets suspended = TRUE for claimable children, making them unclaimable. In-flight children (currently being processed by workers) are left alone so their visibility timeout can expire normally if the worker crashes. Returns the number of children paused.

resumeChildren Source #

Arguments

:: MonadArbiter m 
=> Text

Schema name

-> TableName

Table name

-> Int64

Parent job ID

-> m Int64 

Resume all suspended children of a parent job.

Only affects children whose suspended = TRUE. Returns the number of children resumed.

cancelJobCascade Source #

Arguments

:: MonadArbiter m 
=> Text

Schema name

-> TableName

Table name

-> Int64

Root job ID

-> m Int64 

Cancel a job and all its descendants recursively.

Uses a recursive CTE to find all descendants and deletes them all. If the root job itself is a child, resumes its parent for a completion round. Returns the total number of jobs deleted (parent + all descendants).

cancelJobTree Source #

Arguments

:: MonadArbiter m 
=> Text

Schema name

-> TableName

Table name

-> Int64

Any job ID within the tree

-> m Int64 

Cancel an entire job tree by walking up from any node to the root, then cascade-deleting everything from the root down.

Unlike cancelJobCascade, this does NOT call tryResumeParent - the root by definition has no parent. Returns the total number of jobs deleted.
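
The walk-up-then-cascade shape can be modelled on an in-memory parent map. This is an illustrative sketch, not the SQL the library runs: the Parents type and all three functions are hypothetical, and the parent links are assumed to be acyclic.

```haskell
import qualified Data.Map.Strict as Map

type JobId = Int

-- Maps each job to its parent (Nothing for a root). Assumed acyclic.
type Parents = Map.Map JobId (Maybe JobId)

-- Walk up from any node until a job with no parent is reached.
findRoot :: Parents -> JobId -> JobId
findRoot parents j =
  case Map.lookup j parents of
    Just (Just p) -> findRoot parents p
    _ -> j

-- The root followed by all of its descendants, depth first.
subtree :: Parents -> JobId -> [JobId]
subtree parents root = root : concatMap (subtree parents) children
  where
    children = [c | (c, Just p) <- Map.toList parents, p == root]

-- Every job that cancelling the tree containing the given job would delete.
cancelTree :: Parents -> JobId -> [JobId]
cancelTree parents = subtree parents . findRoot parents
```

Starting from a grandchild yields the whole tree, so the length of the result corresponds to the returned count of deleted jobs.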

Suspend/Resume Operations

suspendJob Source #

Arguments

:: MonadArbiter m 
=> Text

Schema name

-> TableName

Table name

-> Int64

Job ID

-> m Int64 

Suspend a job, making it unclaimable.

Only suspends non-in-flight jobs (not currently being processed by workers). Returns the number of rows updated (0 if job doesn't exist, is in-flight, or already suspended).

resumeJob Source #

Arguments

:: MonadArbiter m 
=> Text

Schema name

-> TableName

Table name

-> Int64

Job ID

-> m Int64 

Resume a suspended job, making it claimable again.

Returns the number of rows updated (0 if job doesn't exist or isn't suspended).

Groups Table Operations

refreshGroups Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> TableName

Table name

-> Int

Minimum interval between runs (seconds)

-> m () 

Full recompute of the groups table from the main queue.

Corrects any drift in job_count, min_priority, min_id, and in_flight_until. Checks the reaper sequence to skip if a recent run occurred within the given interval. Uses an advisory lock to serialize concurrent attempts, then locks all groups rows to prevent trigger interleaving.

Cron Schedule Operations

upsertCronDefault Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> Text

Schedule name

-> Text

Default cron expression

-> Text

Default overlap policy

-> Maybe Text

Default IANA tz name (Nothing = UTC).

-> m Int64 

Upsert a cron schedule's default expression and overlap policy.

Preserves user overrides and enabled state.

listCronSchedules Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> m [CronScheduleRow] 

List all cron schedules ordered by name.

getCronScheduleByName Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> Text

Schedule name

-> m (Maybe CronScheduleRow) 

Get a single cron schedule by name.

updateCronSchedule Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> Text

Schedule name

-> CronScheduleUpdate 
-> m Int64 

Update a cron schedule (patch semantics).

Returns the number of rows affected (0 = not found, 1 = updated).

deleteStaleCronSchedules Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> [Text]

Schedule names to keep

-> m Int64 

Delete schedules whose names are not in the given list.

Returns the number of rows deleted. Does nothing if the list is empty.

touchCronLastFired Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> Text

Schedule name

-> m Int64 

Update last_fired_at to NOW() for a cron schedule.

touchCronChecked Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> UTCTime

Watermark (the minute the scheduler is advancing to)

-> [Text]

Schedule names

-> m Int64 

Advance last_checked_at to the supplied watermark for the given cron schedule names. The watermark must be the minute boundary the scheduler finished evaluating, not NOW(). A wrapping GREATEST in the SQL keeps the column monotonic when concurrent worker pools race.
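
The effect of the GREATEST wrapper is that a late writer carrying an older watermark cannot move the column backwards. A minimal model, assuming timestamps simplified to integer seconds and a column that may start out unset (both assumptions of this sketch; minuteFloor and advanceWatermark are hypothetical names):

```haskell
-- Round a timestamp (seconds since epoch) down to its minute boundary.
minuteFloor :: Int -> Int
minuteFloor t = t - t `mod` 60

-- Model of the monotonic update: keep whichever watermark is later.
-- An unset column simply takes the new value.
advanceWatermark :: Maybe Int -> Int -> Int
advanceWatermark Nothing new = new
advanceWatermark (Just old) new = max old new
```

If one pool has already advanced the column to minute 120 and a slower pool then writes minute 60, the stored value stays at 120.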

tryFireCronGate Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> Text

Schedule name

-> UTCTime

Minute floor for the tick being attempted

-> m Bool 

Claim a minute floor for a schedule. True = caller proceeds with the insert. False = another pool already fired this minute, skip.

tryAcquireCronLeader Source #

Arguments

:: MonadArbiter m 
=> SchemaName 
-> Text

Schedule name

-> m Bool 

Try to acquire the (schema, name) cron leader lock. Must be inside a transaction.

Internal Operations

getParentStateSnapshot Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> TableName

Table name

-> Int64

Job ID

-> m (Maybe Value) 

Read the raw parent_state snapshot from the DB.

Internal operation used by the worker for DLQ-retried finalizers that have a persisted snapshot.

readChildResultsRaw Source #

Arguments

:: MonadArbiter m 
=> SchemaName

Schema name

-> TableName

Table name

-> Int64

Parent job ID

-> m (Map Int64 Value, Map Int64 Text, Maybe Value, Map Int64 Text) 

Read child results, DLQ errors, parent_state snapshot, and DLQ failures for a rollup finalizer in a single query.

mergeRawChildResults :: Map Int64 Value -> Map Int64 Text -> Maybe Value -> Map Int64 (Either Text Value) Source #

Merge raw child results from three sources.

Precedence (left-biased union): DLQ errors > results > snapshot.
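
The stated precedence falls out of a left-biased union when the sources are listed in priority order. The sketch below shows the idea with simplified types: String stands in for both Value and Text, and the snapshot is assumed to be already decoded into a map, whereas the real function takes a Maybe Value. The name mergeSketch is hypothetical.

```haskell
import Data.Int (Int64)
import qualified Data.Map.Strict as Map

-- Merge three sources keyed by child ID.
-- Map.unions is left-biased, so earlier maps win on duplicate keys:
-- DLQ errors > results > snapshot.
mergeSketch
  :: Map.Map Int64 String -- child results
  -> Map.Map Int64 String -- DLQ errors
  -> Map.Map Int64 (Either String String) -- decoded snapshot (assumption)
  -> Map.Map Int64 (Either String String)
mergeSketch results dlqErrors snapshot =
  Map.unions [Map.map Left dlqErrors, Map.map Right results, snapshot]
```

A child that has both a stored result and a DLQ error therefore appears as Left with the error, and snapshot entries only fill in children that neither of the other sources mentions.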