arbiter-core-0.1.0.0: Core types and logic for PostgreSQL-backed job queue
Safe HaskellNone
LanguageGHC2024

Arbiter.Core.Job.Types

Synopsis

Core Job Type

data Job payload key q insertedAt Source #

The core Job type, representing a unit of work in the queue.

Constructors

Job 

Fields

  • primaryKey :: key

    The job's primary key (unit or Int64).

  • payload :: payload

    The user-defined payload, stored as JSONB.

  • queueName :: q

    The name of the queue (table) this job belongs to. This is a read-only virtual field, it does not get serialized.

  • groupKey :: Maybe Text

    An optional key for grouping jobs to be processed serially. Jobs with the same group key are processed in order. Use Nothing for ungrouped jobs that can run in parallel.

  • insertedAt :: insertedAt

    The time the job was inserted (unit or UTCTime).

  • updatedAt :: Maybe UTCTime

    The time the job was last updated.

  • attempts :: Int32

    The number of times this job has been attempted.

  • lastError :: Maybe Text

    The error message from the last failed attempt.

  • priority :: Int32

    The job's priority. Lower numbers are higher priority.

  • lastAttemptedAt :: Maybe UTCTime

    The time this job was last claimed by a worker.

  • notVisibleUntil :: Maybe UTCTime

    When this job becomes visible for claiming.

  • dedupKey :: Maybe DedupKey

    The deduplication strategy for this job.

  • maxAttempts :: Maybe Int32

    Override the global maxAttempts config for this specific job. If Nothing, uses the worker config's global maxAttempts value.

  • parentId :: Maybe Int64

    Optional parent job ID for job dependencies. When set, this job is a child of the specified parent. The parent is automatically suspended when acked if children exist, and resumed for a completion round when the last child finishes.

  • isRollup :: Bool

    Whether this job is a rollup finalizer (has children whose results are collected). Set by rollup / <~~. When True, the worker passes child results as a typed argument to the handler.

  • suspended :: Bool

    Whether this job is suspended (not claimable). TRUE for: finalizers waiting for children to complete, or operator-paused jobs.

Instances

Instances details
Generic (Job payload key q insertedAt) Source # 
Instance details

Defined in Arbiter.Core.Job.Types

Associated Types

type Rep (Job payload key q insertedAt) 
Instance details

Defined in Arbiter.Core.Job.Types

type Rep (Job payload key q insertedAt) = D1 ('MetaData "Job" "Arbiter.Core.Job.Types" "arbiter-core-0.1.0.0-inplace" 'False) (C1 ('MetaCons "Job" 'PrefixI 'True) ((((S1 ('MetaSel ('Just "primaryKey") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 key) :*: S1 ('MetaSel ('Just "payload") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 payload)) :*: (S1 ('MetaSel ('Just "queueName") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 q) :*: S1 ('MetaSel ('Just "groupKey") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Text)))) :*: ((S1 ('MetaSel ('Just "insertedAt") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 insertedAt) :*: S1 ('MetaSel ('Just "updatedAt") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe UTCTime))) :*: (S1 ('MetaSel ('Just "attempts") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int32) :*: S1 ('MetaSel ('Just "lastError") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Text))))) :*: (((S1 ('MetaSel ('Just "priority") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int32) :*: S1 ('MetaSel ('Just "lastAttemptedAt") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe UTCTime))) :*: (S1 ('MetaSel ('Just "notVisibleUntil") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe UTCTime)) :*: S1 ('MetaSel ('Just "dedupKey") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe DedupKey)))) :*: ((S1 ('MetaSel ('Just "maxAttempts") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Int32)) :*: S1 ('MetaSel ('Just "parentId") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Int64))) :*: (S1 ('MetaSel ('Just "isRollup") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Bool) :*: S1 ('MetaSel ('Just "suspended") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Bool))))))

Methods

from :: Job payload key q insertedAt -> Rep (Job payload key q insertedAt) x #

to :: Rep (Job payload key q insertedAt) x -> Job payload key q insertedAt #

(Show key, Show payload, Show q, Show insertedAt) => Show (Job payload key q insertedAt) Source # 
Instance details

Defined in Arbiter.Core.Job.Types

Methods

showsPrec :: Int -> Job payload key q insertedAt -> ShowS #

show :: Job payload key q insertedAt -> String #

showList :: [Job payload key q insertedAt] -> ShowS #

(Eq key, Eq payload, Eq q, Eq insertedAt) => Eq (Job payload key q insertedAt) Source # 
Instance details

Defined in Arbiter.Core.Job.Types

Methods

(==) :: Job payload key q insertedAt -> Job payload key q insertedAt -> Bool #

(/=) :: Job payload key q insertedAt -> Job payload key q insertedAt -> Bool #

type Rep (Job payload key q insertedAt) Source # 
Instance details

Defined in Arbiter.Core.Job.Types

type Rep (Job payload key q insertedAt) = D1 ('MetaData "Job" "Arbiter.Core.Job.Types" "arbiter-core-0.1.0.0-inplace" 'False) (C1 ('MetaCons "Job" 'PrefixI 'True) ((((S1 ('MetaSel ('Just "primaryKey") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 key) :*: S1 ('MetaSel ('Just "payload") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 payload)) :*: (S1 ('MetaSel ('Just "queueName") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 q) :*: S1 ('MetaSel ('Just "groupKey") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Text)))) :*: ((S1 ('MetaSel ('Just "insertedAt") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 insertedAt) :*: S1 ('MetaSel ('Just "updatedAt") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe UTCTime))) :*: (S1 ('MetaSel ('Just "attempts") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int32) :*: S1 ('MetaSel ('Just "lastError") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Text))))) :*: (((S1 ('MetaSel ('Just "priority") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int32) :*: S1 ('MetaSel ('Just "lastAttemptedAt") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe UTCTime))) :*: (S1 ('MetaSel ('Just "notVisibleUntil") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe UTCTime)) :*: S1 ('MetaSel ('Just "dedupKey") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe DedupKey)))) :*: ((S1 ('MetaSel ('Just "maxAttempts") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Int32)) :*: S1 ('MetaSel ('Just "parentId") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Int64))) :*: (S1 ('MetaSel ('Just "isRollup") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Bool) :*: S1 ('MetaSel ('Just "suspended") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Bool))))))

type JobRead payload = Job payload Int64 Text UTCTime Source #

A type alias for a job that has been read from the database.

type JobWrite payload = Job payload () () () Source #

A type alias for a job that is ready to be written to the database. It does not yet have an ID or insertion timestamp.

defaultJob :: payload -> JobWrite payload Source #

Creates an ungrouped JobWrite with default values.

Jobs created with this function can be processed in parallel by multiple workers. For serial processing within a group, use defaultGroupedJob.

defaultGroupedJob :: Text -> payload -> JobWrite payload Source #

Creates a grouped JobWrite with default values.

Jobs with the same group key are processed serially (head-of-line blocking), ensuring that only one job per group is processed at a time. Use for operations that must be ordered, like processing events for a specific user.

let job = defaultGroupedJob "user-123" (MyPayload data)

Type Constraints

type JobPayload payload = (FromJSON payload, ToJSON payload) Source #

Constraint for types that can be used as a job payload.

Payloads are serialized to and from JSON for storage in the database. The table name is looked up from the registry via TableForPayload.

Deduplication

data DedupKey Source #

Defines the deduplication strategy for a job upon insertion.

Constructors

IgnoreDuplicate Text

On conflict, keep the existing job. Corresponds to ON CONFLICT DO NOTHING.

ReplaceDuplicate Text

On conflict, replace the existing job with the new one. Corresponds to ON CONFLICT DO UPDATE.

Instances

Instances details
FromJSON DedupKey Source # 
Instance details

Defined in Arbiter.Core.Job.Types

Methods

parseJSON :: Value -> Parser DedupKey

parseJSONList :: Value -> Parser [DedupKey]

omittedField :: Maybe DedupKey

ToJSON DedupKey Source # 
Instance details

Defined in Arbiter.Core.Job.Types

Methods

toJSON :: DedupKey -> Value

toEncoding :: DedupKey -> Encoding

toJSONList :: [DedupKey] -> Value

toEncodingList :: [DedupKey] -> Encoding

omitField :: DedupKey -> Bool

Generic DedupKey Source # 
Instance details

Defined in Arbiter.Core.Job.Types

Associated Types

type Rep DedupKey 
Instance details

Defined in Arbiter.Core.Job.Types

type Rep DedupKey = D1 ('MetaData "DedupKey" "Arbiter.Core.Job.Types" "arbiter-core-0.1.0.0-inplace" 'False) (C1 ('MetaCons "IgnoreDuplicate" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)) :+: C1 ('MetaCons "ReplaceDuplicate" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)))

Methods

from :: DedupKey -> Rep DedupKey x #

to :: Rep DedupKey x -> DedupKey #

Show DedupKey Source # 
Instance details

Defined in Arbiter.Core.Job.Types

Eq DedupKey Source # 
Instance details

Defined in Arbiter.Core.Job.Types

type Rep DedupKey Source # 
Instance details

Defined in Arbiter.Core.Job.Types

type Rep DedupKey = D1 ('MetaData "DedupKey" "Arbiter.Core.Job.Types" "arbiter-core-0.1.0.0-inplace" 'False) (C1 ('MetaCons "IgnoreDuplicate" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)) :+: C1 ('MetaCons "ReplaceDuplicate" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)))

Observability

data ObservabilityHooks (m :: Type -> Type) payload Source #

A set of callbacks invoked at key points in the job lifecycle.

Use these hooks to integrate with metrics, logging, or tracing systems. Hooks are exception-safe; any exception thrown within a hook is caught and ignored to prevent crashing the worker.

Constructors

ObservabilityHooks 

Fields

defaultObservabilityHooks :: forall (m :: Type -> Type) payload. Applicative m => ObservabilityHooks m payload Source #

A default set of ObservabilityHooks that do nothing.

This can be used as a starting point for your own custom hooks.

myHooks = defaultObservabilityHooks
  { onJobSuccess = \job startTime endTime -> do
      let duration = diffUTCTime endTime startTime
      logInfo $ "Job " <> show (primaryKey job) <> " succeeded in " <> show duration
  }