arbiter-core-0.1.0.0: Core types and logic for PostgreSQL-backed job queue
Safe HaskellNone
LanguageGHC2024

Arbiter.Core.HighLevel

Description

High-level API for job queue operations.

Table names are automatically extracted from the payload type using the registry. Compile-time checks ensure payloads are only used with registered tables.

Synopsis

Constraint Aliases

type QueueOperation (m :: Type -> Type) (registry :: JobPayloadRegistry) payload = (HasArbiterSchema m registry, JobPayload payload, KnownSymbol (TableForPayload payload registry), MonadArbiter m) Source #

Constraints for queue operations (requires table name lookup from registry).

type JobOperation (m :: Type -> Type) (registry :: JobPayloadRegistry) payload = (HasArbiterSchema m registry, JobPayload payload, MonadArbiter m) Source #

Constraints for job operations (table name stored in job).

Job Operations

insertJob :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => JobWrite payload -> m (Maybe (JobRead payload)) Source #

Inserts a job into the queue.

Returns Nothing if a job with the same deduplication key already exists (with IgnoreDuplicate strategy).

insertJobsBatch :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [JobWrite payload] -> m [JobRead payload] Source #

Insert multiple jobs in a single batch operation.

Supports dedup keys: within the batch, duplicate keys are resolved (last ReplaceDuplicate wins), and against existing rows via ON CONFLICT.

Does not validate parentId. Use insertJobTree for parent-child relationships.

insertJobsBatch_ :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [JobWrite payload] -> m Int64 Source #

claimNextVisibleJobs Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int

Maximum number of jobs to claim.

-> NominalDiffTime

How long the claimed jobs should remain invisible (in seconds).

-> m [JobRead payload] 

Claims visible jobs from the queue. At most one job per group is claimed to enforce head-of-line blocking.

claimNextVisibleJobsBatched Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int

Batch size: maximum number of jobs to claim per group.

-> Int

Max groups: maximum number of groups/batches to claim.

-> NominalDiffTime

How long the claimed jobs should remain invisible (in seconds).

-> m [NonEmpty (JobRead payload)] 

Claims multiple jobs per group. Unlike claimNextVisibleJobs, this can claim up to batchSize jobs from each group while still respecting head-of-line blocking between batches.

ackJob :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => JobRead payload -> m Int64 Source #

Acknowledges a job as complete, permanently deleting it from the queue.

Returns 1 on success (job deleted or parent suspended), 0 if already gone.

ackJobsBatch :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => [JobRead payload] -> m Int64 Source #

Acknowledges multiple jobs as complete. All jobs must be from the same queue. Returns the total number of rows deleted.

ackJobsBulk :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => [JobRead payload] -> m Int64 Source #

Bulk ack for standalone jobs (no parent, no tree logic).

A single DELETE with unnest — one round trip for N jobs. Only valid for jobs claimed in BatchedJobsMode.

Returns the number of rows deleted.

updateJobForRetry Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload 
=> NominalDiffTime

The delay before this job becomes visible again for retry.

-> Text

An error message to store with the job.

-> JobRead payload 
-> m Int64 

Marks a failed job for retry at a later time.

Returns the number of rows updated (0 if job was already claimed by another worker).

setVisibilityTimeout Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload 
=> NominalDiffTime

The new visibility timeout (in seconds) from the current time.

-> JobRead payload 
-> m Int64 

Manually extends a job's visibility timeout, useful for long-running jobs.

Returns the number of rows updated (0 if job was already reclaimed by another worker).

setVisibilityTimeoutBatch Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload 
=> NominalDiffTime

The new visibility timeout (in seconds) from the current time.

-> [JobRead payload]

Jobs to heartbeat (all must be from the same queue)

-> m [SetVisibilityResult] 

Extends visibility timeout for multiple jobs. All jobs must be from the same queue.

data SetVisibilityResult Source #

Result of setting visibility timeout for a single job in a batch.

Constructors

VisibilityExtended Int64

Visibility timeout was successfully extended. Contains job ID.

JobGone Int64

Job no longer exists (was deleted/acked). Contains job ID.

JobReclaimed Int64 Int32 Int32

Job was reclaimed by another worker (attempts count changed). Contains: job ID, expected attempts, actual attempts.

Filtered Query Operations

data JobFilter Source #

Filter predicates for job listing queries.

Instances

Instances details
Show JobFilter Source # 
Instance details

Defined in Arbiter.Core.SqlTemplates

Eq JobFilter Source # 
Instance details

Defined in Arbiter.Core.SqlTemplates

listJobsFiltered Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> [JobFilter]

Composable filters

-> Int

Maximum number of jobs to return.

-> Int

Number of jobs to skip (for pagination).

-> m [JobRead payload] 

Lists jobs with composable filters.

Returns jobs ordered by ID (descending, newest first).

countJobsFiltered Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> [JobFilter]

Composable filters

-> m Int64 

Counts jobs with composable filters.

listDLQFiltered Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> [JobFilter]

Composable filters

-> Int

Maximum number of jobs to return.

-> Int

Number of jobs to skip (for pagination).

-> m [DLQJob payload] 

Lists DLQ jobs with composable filters.

Returns jobs ordered by failed_at (most recent first).

countDLQFiltered Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> [JobFilter]

Composable filters

-> m Int64 

Counts DLQ jobs with composable filters.

Dead Letter Queue Operations

moveToDLQ Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload 
=> Text

Error message (the final error that caused the DLQ move)

-> JobRead payload 
-> m Int64 

Moves a job from the main queue to the dead-letter queue (DLQ).

Returns the number of rows deleted from main queue (0 if job was already claimed by another worker).

moveToDLQBatch Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload 
=> [(JobRead payload, Text)]

List of (job, error message) pairs. All jobs must be from the same queue.

-> m Int64 

Moves multiple jobs to the dead-letter queue.

Each job is moved with its own error message. Jobs that have already been claimed by another worker (attempts mismatch) are silently skipped.

Returns the total number of jobs moved to DLQ.

listDLQJobs Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int

The maximum number of jobs to return.

-> Int

The number of jobs to skip (for pagination).

-> m [DLQJob payload] 

Lists jobs in the dead-letter queue with pagination.

retryFromDLQ Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

DLQ job ID

-> m (Maybe (JobRead payload)) 

Moves a job from the dead-letter queue back into the main queue to be retried.

Returns Nothing if the DLQ job no longer exists.

dlqJobExists :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Bool Source #

Check whether a DLQ job exists by ID.

deleteDLQJob Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

DLQ job ID

-> m Int64 

Permanently deletes a job from the dead-letter queue.

Returns the number of rows deleted (0 if the DLQ job no longer exists).

deleteDLQJobsBatch Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> [Int64]

DLQ job IDs

-> m Int64 

Permanently deletes multiple jobs from the dead-letter queue.

Returns the total number of DLQ jobs deleted.

Admin Operations

listJobs Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int

Maximum number of jobs to return.

-> Int

Number of jobs to skip (for pagination).

-> m [JobRead payload] 

Lists jobs in the queue with pagination.

Returns jobs ordered by ID (descending, newest first).

getJobById Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Job ID

-> m (Maybe (JobRead payload)) 

Gets a single job by its ID.

Returns Nothing if the job doesn't exist.

getJobsByGroup Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Text

Group key to filter by

-> Int

Maximum number of jobs to return.

-> Int

Number of jobs to skip (for pagination).

-> m [JobRead payload] 

Gets all jobs for a specific group key with pagination.

Useful for debugging or admin UI to see all jobs for a specific entity.

getJobsByParent Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Parent ID to filter by

-> Int

Maximum number of jobs to return.

-> Int

Number of jobs to skip (for pagination).

-> m [JobRead payload] 

Gets all jobs for a specific parent ID with pagination.

getInFlightJobs Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int

Maximum number of jobs to return.

-> Int

Number of jobs to skip (for pagination).

-> m [JobRead payload] 

Gets all in-flight jobs (currently being processed by workers).

A job is considered in-flight if it has been claimed (attempts > 0) and its visibility timeout hasn't expired yet.

Useful for monitoring active work and detecting stuck jobs.

cancelJob Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Job ID

-> m Int64 

Cancels (deletes) a job by ID.

Returns 0 if the job has children — use cancelJobCascade to delete a parent and all its descendants.

cancelJobsBatch Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> [Int64]

Job IDs

-> m Int64 

Cancels (deletes) multiple jobs by ID.

Returns the total number of jobs deleted.

promoteJob Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Job ID

-> m Int64 

Promote a delayed or retrying job to be immediately visible.

Refuses in-flight jobs (attempts > 0 with no last_error). Returns 1 on success, 0 if not found, already visible, or in-flight.

data QueueStats Source #

Statistics about the job queue

Constructors

QueueStats 

Fields

Instances

Instances details
FromJSON QueueStats Source # 
Instance details

Defined in Arbiter.Core.Operations

Methods

parseJSON :: Value -> Parser QueueStats

parseJSONList :: Value -> Parser [QueueStats]

omittedField :: Maybe QueueStats

ToJSON QueueStats Source # 
Instance details

Defined in Arbiter.Core.Operations

Methods

toJSON :: QueueStats -> Value

toEncoding :: QueueStats -> Encoding

toJSONList :: [QueueStats] -> Value

toEncodingList :: [QueueStats] -> Encoding

omitField :: QueueStats -> Bool

Generic QueueStats Source # 
Instance details

Defined in Arbiter.Core.Operations

Associated Types

type Rep QueueStats 
Instance details

Defined in Arbiter.Core.Operations

type Rep QueueStats = D1 ('MetaData "QueueStats" "Arbiter.Core.Operations" "arbiter-core-0.1.0.0-inplace" 'False) (C1 ('MetaCons "QueueStats" 'PrefixI 'True) ((S1 ('MetaSel ('Just "totalJobs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64) :*: S1 ('MetaSel ('Just "visibleJobs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64)) :*: (S1 ('MetaSel ('Just "invisibleJobs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64) :*: S1 ('MetaSel ('Just "oldestJobAgeSeconds") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Double)))))
Show QueueStats Source # 
Instance details

Defined in Arbiter.Core.Operations

Eq QueueStats Source # 
Instance details

Defined in Arbiter.Core.Operations

type Rep QueueStats Source # 
Instance details

Defined in Arbiter.Core.Operations

type Rep QueueStats = D1 ('MetaData "QueueStats" "Arbiter.Core.Operations" "arbiter-core-0.1.0.0-inplace" 'False) (C1 ('MetaCons "QueueStats" 'PrefixI 'True) ((S1 ('MetaSel ('Just "totalJobs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64) :*: S1 ('MetaSel ('Just "visibleJobs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64)) :*: (S1 ('MetaSel ('Just "invisibleJobs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64) :*: S1 ('MetaSel ('Just "oldestJobAgeSeconds") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Double)))))

getQueueStats :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => m QueueStats Source #

Gets statistics about the job queue.

Count Operations

countJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => m Int64 Source #

Counts all jobs in the queue.

countJobsByGroup Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Text

Group key to count

-> m Int64 

Counts jobs matching a group key.

countJobsByParent Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Parent ID to count children of

-> m Int64 

Counts jobs matching a parent ID.

countInFlightJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => m Int64 Source #

Counts in-flight jobs (currently being processed by workers).

countDLQJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => m Int64 Source #

Counts jobs in the dead-letter queue.

countChildrenBatch :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [Int64] -> m (Map Int64 (Int64, Int64)) Source #

Counts children for a batch of potential parent IDs.

Returns a Map from parent_id to (total, paused) counts (only non-zero entries).

countDLQChildren Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Parent job ID

-> m Int64 

Count how many children of a parent are in the DLQ. Useful inside finalizer handlers to detect failed children.

countDLQChildrenBatch :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [Int64] -> m (Map Int64 Int64) Source #

Counts children in the DLQ for a batch of potential parent IDs.

Returns a Map from parent_id to DLQ child count (only non-zero entries).

Job Dependency Operations

pauseChildren Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Parent job ID

-> m Int64 

Pause all visible children of a parent job, making them unclaimable.

Only affects children that are currently claimable. In-flight children are left alone so their visibility timeout can expire normally if the worker crashes.

Returns the number of children paused.

resumeChildren Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Parent job ID

-> m Int64 

Resume all suspended children of a parent job.

Returns the number of children resumed.

cancelJobCascade Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Root job ID

-> m Int64 

Cancel a job and all its descendants recursively.

Returns the total number of jobs deleted (parent + all descendants).

Suspend/Resume Operations

suspendJob Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Job ID

-> m Int64 

Suspend a job, making it unclaimable.

Only suspends non-in-flight jobs (not currently being processed by workers). Returns the number of rows updated (0 if job doesn't exist, is in-flight, or already suspended).

resumeJob Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Job ID

-> m Int64 

Resume a suspended job, making it claimable again.

Returns the number of rows updated (0 if job doesn't exist or isn't suspended).

Results Table Operations

insertResult Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Parent job ID

-> Int64

Child job ID

-> Value

Encoded result value

-> m Int64 

Insert a child's result into the results table.

Each child gets its own row keyed by (parent_id, child_id). The FK ON DELETE CASCADE ensures cleanup when the parent is acked.

Returns the number of rows inserted (1 on success).

getResultsByParent Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Parent job ID

-> m (Map Int64 Value) 

Get all child results for a parent from the results table.

getDLQChildErrorsByParent Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Parent job ID

-> m (Map Int64 Text) 

Get DLQ child errors for a parent.

Returns a Map from child job ID to the last error message.

readChildResultsRaw Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Parent job ID

-> m (Map Int64 Value, Map Int64 Text, Maybe Value, Map Int64 Text) 

Read child results, DLQ errors, parent_state snapshot, and DLQ failures for a rollup finalizer in a single query.

mergeRawChildResults :: Map Int64 Value -> Map Int64 Text -> Maybe Value -> Map Int64 (Either Text Value) Source #

Merge raw child results from three sources.

Precedence (left-biased union): DLQ errors > results > snapshot.

persistParentState :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> Value -> m Int64 Source #

Snapshot results into parent_state before DLQ move.

getParentStateSnapshot :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m (Maybe Value) Source #

Read raw parent_state snapshot from the DB.

Groups Table Operations

refreshGroups Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int

Minimum interval between runs (seconds)

-> m () 

Full recompute of the groups table from the main queue.

Corrects any drift in job_count, min_priority, min_id, and in_flight_until.

Job Tree DSL

insertJobTree :: forall m (registry :: JobPayloadRegistry) payload. (MonadUnliftIO m, QueueOperation m registry payload) => JobTree payload -> m (Either Text (NonEmpty (JobRead payload))) Source #

Insert a JobTree atomically in a single transaction.

HighLevel wrapper that resolves schema/table from the registry. See insertJobTree for details.

Re-exports