arbiter-core-0.1.0.0: Core types and logic for PostgreSQL-backed job queue
Safe HaskellNone
LanguageGHC2024

Arbiter.Core.HighLevel

Description

High-level API for job queue operations.

Table names are automatically extracted from the payload type using the registry. Compile-time checks ensure payloads are only used with registered tables.

Synopsis

Constraint Aliases

type QueueOperation (m :: Type -> Type) (registry :: JobPayloadRegistry) payload = (HasArbiterSchema m registry, JobPayload payload, KnownSymbol (TableForPayload payload registry), MonadArbiter m) Source #

Constraints for queue operations (requires table name lookup from registry).

type JobOperation (m :: Type -> Type) (registry :: JobPayloadRegistry) payload = (HasArbiterSchema m registry, JobPayload payload, MonadArbiter m) Source #

Constraints for job operations (table name stored in job).

Job Operations

insertJob :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => JobWrite payload -> m (Maybe (JobRead payload)) Source #

Insert a job. Returns the inserted job, or Nothing if skipped by dedup (IgnoreDuplicate) or if parentId references a non-existent job.

insertJobsBatch :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [JobWrite payload] -> m [JobRead payload] Source #

Insert multiple jobs in one round-trip. Returns only the jobs that were actually inserted (dedup'd jobs are excluded). Does not validate parentId - use insertJobTree for parent-child relationships.

insertJobsBatch_ :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [JobWrite payload] -> m Int64 Source #

Like insertJobsBatch but returns only the count of inserted rows.

claimNextVisibleJobs Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int

Maximum number of jobs to claim.

-> NominalDiffTime

How long the claimed jobs should remain invisible (in seconds).

-> m [JobRead payload] 

Claim visible jobs (at most one per group). May return fewer than the limit if groups are exhausted.

claimNextVisibleJobsBatched Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int

Batch size: maximum number of jobs to claim per group.

-> Int

Max groups: maximum number of groups/batches to claim.

-> NominalDiffTime

How long the claimed jobs should remain invisible (in seconds).

-> m [NonEmpty (JobRead payload)] 

Claims multiple jobs per group. Unlike claimNextVisibleJobs, this can claim up to batchSize jobs from each group while still respecting head-of-line blocking between batches.

ackJob :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => JobRead payload -> m Int64 Source #

Acknowledge a job as complete. Deletes it from the queue, or suspends it if it's a parent with unfinished children. Returns 1 on success, 0 if gone.

ackJobsBatch :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => [JobRead payload] -> m Int64 Source #

Acknowledges multiple jobs as complete. All jobs must be from the same queue. Returns the total number of rows deleted.

ackJobsBulk :: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload => [JobRead payload] -> m Int64 Source #

Bulk ack for standalone jobs (no parent, no tree logic).

A single DELETE with unnest - one round trip for N jobs. Only valid for jobs claimed in BatchedJobsMode.

Returns the number of rows deleted.

updateJobForRetry Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload 
=> NominalDiffTime

The delay before this job becomes visible again for retry.

-> Text

An error message to store with the job.

-> JobRead payload 
-> m Int64 

Marks a failed job for retry at a later time.

Returns the number of rows updated (0 if job was already claimed by another worker).

setVisibilityTimeout Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload 
=> NominalDiffTime

The new visibility timeout (in seconds) from the current time.

-> JobRead payload 
-> m Int64 

Manually extends a job's visibility timeout, useful for long-running jobs.

Returns the number of rows updated (0 if job was already reclaimed by another worker).

setVisibilityTimeoutBatch Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload 
=> NominalDiffTime

The new visibility timeout (in seconds) from the current time.

-> [JobRead payload]

Jobs to heartbeat (all must be from the same queue)

-> m [SetVisibilityResult] 

Extends visibility timeout for multiple jobs. All jobs must be from the same queue.

data SetVisibilityResult Source #

Result of setting visibility timeout for a single job in a batch.

Constructors

VisibilityExtended Int64

Visibility timeout was successfully extended. Contains job ID.

JobGone Int64

Job no longer exists (was deleted/acked). Contains job ID.

JobReclaimed Int64 Int32 Int32

Job was reclaimed by another worker (attempts count changed). Contains: job ID, expected attempts, actual attempts.

Filtered Query Operations

data JobFilter Source #

Filter predicates for job listing queries.

Instances

Instances details
Show JobFilter Source # 
Instance details

Defined in Arbiter.Core.SqlTemplates

Eq JobFilter Source # 
Instance details

Defined in Arbiter.Core.SqlTemplates

listJobsFiltered Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> [JobFilter]

Composable filters

-> Int

Maximum number of jobs to return.

-> Int

Number of jobs to skip (for pagination).

-> m [JobRead payload] 

Lists jobs with composable filters.

Returns jobs ordered by ID (descending, newest first).

countJobsFiltered Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> [JobFilter]

Composable filters

-> m Int64 

Counts jobs with composable filters.

listDLQFiltered Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> [JobFilter]

Composable filters

-> Int

Maximum number of jobs to return.

-> Int

Number of jobs to skip (for pagination).

-> m [DLQJob payload] 

Lists DLQ jobs with composable filters.

Returns jobs ordered by failed_at (most recent first).

countDLQFiltered Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> [JobFilter]

Composable filters

-> m Int64 

Counts DLQ jobs with composable filters.

Dead Letter Queue Operations

moveToDLQ Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload 
=> Text

Error message (the final error that caused the DLQ move)

-> JobRead payload 
-> m Int64 

Move a job to the DLQ. Returns 0 if already claimed by another worker.

moveToDLQBatch Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. JobOperation m registry payload 
=> [(JobRead payload, Text)]

List of (job, error message) pairs. All jobs must be from the same queue.

-> m Int64 

Move multiple jobs to the DLQ. Jobs already claimed by another worker are skipped. Returns the count of jobs moved.

listDLQJobs Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int

The maximum number of jobs to return.

-> Int

The number of jobs to skip (for pagination).

-> m [DLQJob payload] 

Lists jobs in the dead-letter queue with pagination.

retryFromDLQ Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

DLQ job ID

-> m (Maybe (JobRead payload)) 

Retry a DLQ job (re-insert into main queue with attempts reset). Returns Nothing if the DLQ job no longer exists.

dlqJobExists :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m Bool Source #

Check whether a DLQ job exists by ID.

deleteDLQJob Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

DLQ job ID

-> m Int64 

Permanently deletes a job from the dead-letter queue.

Returns the number of rows deleted (0 if the DLQ job no longer exists).

deleteDLQJobsBatch Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> [Int64]

DLQ job IDs

-> m Int64 

Permanently deletes multiple jobs from the dead-letter queue.

Returns the total number of DLQ jobs deleted.

Admin Operations

listJobs Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int

Maximum number of jobs to return.

-> Int

Number of jobs to skip (for pagination).

-> m [JobRead payload] 

Lists jobs in the queue with pagination.

Returns jobs ordered by ID (descending, newest first).

getJobById Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Job ID

-> m (Maybe (JobRead payload)) 

Gets a single job by its ID.

Returns Nothing if the job doesn't exist.

getJobsByGroup Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Text

Group key to filter by

-> Int

Maximum number of jobs to return.

-> Int

Number of jobs to skip (for pagination).

-> m [JobRead payload] 

Gets all jobs for a specific group key with pagination.

getJobsByParent Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Parent ID to filter by

-> Int

Maximum number of jobs to return.

-> Int

Number of jobs to skip (for pagination).

-> m [JobRead payload] 

Gets all jobs for a specific parent ID with pagination.

getInFlightJobs Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int

Maximum number of jobs to return.

-> Int

Number of jobs to skip (for pagination).

-> m [JobRead payload] 

Gets all in-flight jobs (claimed, visibility timeout not expired).

cancelJob Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Job ID

-> m Int64 

Cancels (deletes) a job by ID.

Returns 0 if the job has children - use cancelJobCascade to delete a parent and all its descendants.

cancelJobsBatch Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> [Int64]

Job IDs

-> m Int64 

Cancels (deletes) multiple jobs by ID.

Returns the total number of jobs deleted.

promoteJob Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Job ID

-> m Int64 

Promote a delayed or retrying job to be immediately visible.

Refuses in-flight jobs (attempts > 0 with no last_error). Returns 1 on success, 0 if not found, already visible, or in-flight.

data QueueStats Source #

Statistics about the job queue

Constructors

QueueStats 

Fields

Instances

Instances details
FromJSON QueueStats Source # 
Instance details

Defined in Arbiter.Core.Operations

Methods

parseJSON :: Value -> Parser QueueStats

parseJSONList :: Value -> Parser [QueueStats]

omittedField :: Maybe QueueStats

ToJSON QueueStats Source # 
Instance details

Defined in Arbiter.Core.Operations

Methods

toJSON :: QueueStats -> Value

toEncoding :: QueueStats -> Encoding

toJSONList :: [QueueStats] -> Value

toEncodingList :: [QueueStats] -> Encoding

omitField :: QueueStats -> Bool

Generic QueueStats Source # 
Instance details

Defined in Arbiter.Core.Operations

Associated Types

type Rep QueueStats 
Instance details

Defined in Arbiter.Core.Operations

type Rep QueueStats = D1 ('MetaData "QueueStats" "Arbiter.Core.Operations" "arbiter-core-0.1.0.0-inplace" 'False) (C1 ('MetaCons "QueueStats" 'PrefixI 'True) ((S1 ('MetaSel ('Just "totalJobs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64) :*: S1 ('MetaSel ('Just "visibleJobs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64)) :*: (S1 ('MetaSel ('Just "invisibleJobs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64) :*: S1 ('MetaSel ('Just "oldestJobAgeSeconds") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Double)))))
Show QueueStats Source # 
Instance details

Defined in Arbiter.Core.Operations

Eq QueueStats Source # 
Instance details

Defined in Arbiter.Core.Operations

type Rep QueueStats Source # 
Instance details

Defined in Arbiter.Core.Operations

type Rep QueueStats = D1 ('MetaData "QueueStats" "Arbiter.Core.Operations" "arbiter-core-0.1.0.0-inplace" 'False) (C1 ('MetaCons "QueueStats" 'PrefixI 'True) ((S1 ('MetaSel ('Just "totalJobs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64) :*: S1 ('MetaSel ('Just "visibleJobs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64)) :*: (S1 ('MetaSel ('Just "invisibleJobs") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int64) :*: S1 ('MetaSel ('Just "oldestJobAgeSeconds") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Double)))))

getQueueStats :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => m QueueStats Source #

Gets statistics about the job queue.

Count Operations

countJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => m Int64 Source #

Counts all jobs in the queue.

countJobsByGroup Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Text

Group key to count

-> m Int64 

Counts jobs matching a group key.

countJobsByParent Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Parent ID to count children of

-> m Int64 

Counts jobs matching a parent ID.

countInFlightJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => m Int64 Source #

Counts in-flight jobs (currently being processed by workers).

countDLQJobs :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => m Int64 Source #

Counts jobs in the dead-letter queue.

countChildrenBatch :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [Int64] -> m (Map Int64 (Int64, Int64)) Source #

Counts children for a batch of potential parent IDs.

Returns a Map from parent_id to (total, paused) counts (only non-zero entries).

countDLQChildren Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Parent job ID

-> m Int64 

Count how many children of a parent are in the DLQ. Useful inside finalizer handlers to detect failed children.

countDLQChildrenBatch :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => [Int64] -> m (Map Int64 Int64) Source #

Counts children in the DLQ for a batch of potential parent IDs.

Returns a Map from parent_id to DLQ child count (only non-zero entries).

Job Dependency Operations

pauseChildren Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Parent job ID

-> m Int64 

Pause all visible children of a parent job, making them unclaimable.

Only affects children that are currently claimable. In-flight children are left alone so their visibility timeout can expire normally if the worker crashes.

Returns the number of children paused.

resumeChildren Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Parent job ID

-> m Int64 

Resume all suspended children of a parent job.

Returns the number of children resumed.

cancelJobCascade Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Root job ID

-> m Int64 

Cancel a job and all its descendants recursively.

Returns the total number of jobs deleted (parent + all descendants).

Suspend/Resume Operations

suspendJob Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Job ID

-> m Int64 

Suspend a job, making it unclaimable.

Only suspends non-in-flight jobs (not currently being processed by workers). Returns the number of rows updated (0 if job doesn't exist, is in-flight, or already suspended).

resumeJob Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Job ID

-> m Int64 

Resume a suspended job, making it claimable again.

Returns the number of rows updated (0 if job doesn't exist or isn't suspended).

Results Table Operations

insertResult Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Parent job ID

-> Int64

Child job ID

-> Value

Encoded result value

-> m Int64 

Insert a child's result into the results table.

Each child gets its own row keyed by (parent_id, child_id). The FK ON DELETE CASCADE ensures cleanup when the parent is acked.

Returns the number of rows inserted (1 on success).

getResultsByParent Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Parent job ID

-> m (Map Int64 Value) 

Get all child results for a parent from the results table.

getDLQChildErrorsByParent Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Parent job ID

-> m (Map Int64 Text) 

Get DLQ child errors for a parent.

Returns a Map from child job ID to the last error message.

readChildResultsRaw Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int64

Parent job ID

-> m (Map Int64 Value, Map Int64 Text, Maybe Value, Map Int64 Text) 

Read all child data for a rollup finalizer. Returns (childId->result, childId->error, parentStateSnapshot, dlqPK->error).

mergeRawChildResults :: Map Int64 Value -> Map Int64 Text -> Maybe Value -> Map Int64 (Either Text Value) Source #

Merge raw child results from three sources.

Precedence (left-biased union): DLQ errors > results > snapshot.

persistParentState :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> Value -> m Int64 Source #

Snapshot results into parent_state before DLQ move.

getParentStateSnapshot :: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload => Int64 -> m (Maybe Value) Source #

Read raw parent_state snapshot from the DB.

Groups Table Operations

refreshGroups Source #

Arguments

:: forall m (registry :: JobPayloadRegistry) payload. QueueOperation m registry payload 
=> Int

Minimum interval between runs (seconds)

-> m () 

Full recompute of the groups table from the main queue.

Corrects any drift in job_count, min_priority, min_id, and in_flight_until.

Job Tree DSL

insertJobTree :: forall m (registry :: JobPayloadRegistry) payload. (MonadUnliftIO m, QueueOperation m registry payload) => JobTree payload -> m (Either Text (NonEmpty (JobRead payload))) Source #

Insert a JobTree atomically. Returns all inserted jobs (pre-order), or Left if the root has a dedup conflict. Rolls back on any failure.

Re-exports

getSchema :: HasArbiterSchema m registry => m SchemaName Source #

The schema name for this monad's Arbiter tables.