arbiter-core-0.1.0.0: Core types and logic for PostgreSQL-backed job queue
Safe HaskellNone
LanguageGHC2024

Arbiter.Core.Job.Types

Synopsis

Core Job Type

data Job payload key q insertedAt Source #

A job in the queue. Parametrized over payload, primary key, queue name, and inserted-at timestamp. See JobWrite (for insertion) and JobRead (returned from claims/queries).

Constructors

Job 

Fields

  • primaryKey :: key

    () in JobWrite (assigned by DB), Int64 in JobRead.

  • payload :: payload

    User-defined payload, stored as JSONB.

  • queueName :: q

    () in JobWrite, table name (Text) in JobRead. Not serialized.

  • groupKey :: Maybe Text

    Jobs with the same group key are processed serially (head-of-line blocking). Nothing for ungrouped jobs, which can run in parallel.

  • insertedAt :: insertedAt

    () in JobWrite (set by DB), UTCTime in JobRead.

  • updatedAt :: Maybe UTCTime

    The time the job was last updated.

  • attempts :: Int32

    The number of times this job has been attempted.

  • lastError :: Maybe Text

    The error message from the last failed attempt.

  • priority :: Int32

    The job's priority. Lower numbers are higher priority.

  • lastAttemptedAt :: Maybe UTCTime

    The time this job was last claimed by a worker.

  • notVisibleUntil :: Maybe UTCTime

    When this job becomes visible for claiming.

  • dedupKey :: Maybe DedupKey

    The deduplication strategy for this job.

  • maxAttempts :: Maybe Int32

    Override the global maxAttempts config for this specific job. If Nothing, uses the worker config's global maxAttempts value.

  • parentId :: Maybe Int64

    Parent job ID. Set by insertJobTree, not manually. When this child is the last to complete, the parent (if a rollup finalizer) is resumed.

  • parentState :: Maybe Value

    Snapshot of accumulated child results for rollup finalizers. The engine sets this to an empty object on insert when the job is a rollup finalizer, and overwrites it with the final results map before a DLQ move so the snapshot survives the ON DELETE CASCADE on the results table. isRollup is derived from whether this is non-null.

  • suspended :: Bool

    Whether this job is suspended (not claimable). True for finalizers waiting for their children to complete and for operator-paused jobs.

Instances

Instances details
Generic (Job payload key q insertedAt) Source # 
Instance details

Defined in Arbiter.Core.Job.Types

Associated Types

type Rep (Job payload key q insertedAt) 
Instance details

Defined in Arbiter.Core.Job.Types

type Rep (Job payload key q insertedAt) = D1 ('MetaData "Job" "Arbiter.Core.Job.Types" "arbiter-core-0.1.0.0-inplace" 'False) (C1 ('MetaCons "Job" 'PrefixI 'True) ((((S1 ('MetaSel ('Just "primaryKey") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 key) :*: S1 ('MetaSel ('Just "payload") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 payload)) :*: (S1 ('MetaSel ('Just "queueName") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 q) :*: S1 ('MetaSel ('Just "groupKey") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Text)))) :*: ((S1 ('MetaSel ('Just "insertedAt") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 insertedAt) :*: S1 ('MetaSel ('Just "updatedAt") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe UTCTime))) :*: (S1 ('MetaSel ('Just "attempts") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int32) :*: S1 ('MetaSel ('Just "lastError") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Text))))) :*: (((S1 ('MetaSel ('Just "priority") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int32) :*: S1 ('MetaSel ('Just "lastAttemptedAt") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe UTCTime))) :*: (S1 ('MetaSel ('Just "notVisibleUntil") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe UTCTime)) :*: S1 ('MetaSel ('Just "dedupKey") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe DedupKey)))) :*: ((S1 ('MetaSel ('Just "maxAttempts") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Int32)) :*: S1 ('MetaSel ('Just "parentId") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Int64))) :*: (S1 ('MetaSel ('Just "parentState") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Value)) :*: S1 ('MetaSel ('Just "suspended") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Bool))))))

Methods

from :: Job payload key q insertedAt -> Rep (Job payload key q insertedAt) x #

to :: Rep (Job payload key q insertedAt) x -> Job payload key q insertedAt #

(Show key, Show payload, Show q, Show insertedAt) => Show (Job payload key q insertedAt) Source # 
Instance details

Defined in Arbiter.Core.Job.Types

Methods

showsPrec :: Int -> Job payload key q insertedAt -> ShowS #

show :: Job payload key q insertedAt -> String #

showList :: [Job payload key q insertedAt] -> ShowS #

(Eq key, Eq payload, Eq q, Eq insertedAt) => Eq (Job payload key q insertedAt) Source # 
Instance details

Defined in Arbiter.Core.Job.Types

Methods

(==) :: Job payload key q insertedAt -> Job payload key q insertedAt -> Bool #

(/=) :: Job payload key q insertedAt -> Job payload key q insertedAt -> Bool #

type Rep (Job payload key q insertedAt) Source # 
Instance details

Defined in Arbiter.Core.Job.Types

type Rep (Job payload key q insertedAt) = D1 ('MetaData "Job" "Arbiter.Core.Job.Types" "arbiter-core-0.1.0.0-inplace" 'False) (C1 ('MetaCons "Job" 'PrefixI 'True) ((((S1 ('MetaSel ('Just "primaryKey") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 key) :*: S1 ('MetaSel ('Just "payload") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 payload)) :*: (S1 ('MetaSel ('Just "queueName") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 q) :*: S1 ('MetaSel ('Just "groupKey") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Text)))) :*: ((S1 ('MetaSel ('Just "insertedAt") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 insertedAt) :*: S1 ('MetaSel ('Just "updatedAt") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe UTCTime))) :*: (S1 ('MetaSel ('Just "attempts") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int32) :*: S1 ('MetaSel ('Just "lastError") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Text))))) :*: (((S1 ('MetaSel ('Just "priority") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Int32) :*: S1 ('MetaSel ('Just "lastAttemptedAt") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe UTCTime))) :*: (S1 ('MetaSel ('Just "notVisibleUntil") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe UTCTime)) :*: S1 ('MetaSel ('Just "dedupKey") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe DedupKey)))) :*: ((S1 ('MetaSel ('Just "maxAttempts") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Int32)) :*: S1 ('MetaSel ('Just "parentId") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Int64))) :*: (S1 ('MetaSel ('Just "parentState") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 (Maybe Value)) :*: S1 ('MetaSel ('Just "suspended") 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Bool))))))

type JobRead payload = Job payload Int64 Text UTCTime Source #

A type alias for a job that has been read from the database.

type JobWrite payload = Job payload () () () Source #

A type alias for a job that is ready to be written to the database. It does not yet have an ID, queue name, or insertion timestamp; all three type parameters are ().

defaultJob :: payload -> JobWrite payload Source #

Ungrouped JobWrite with default values. For serial processing within a group, use defaultGroupedJob.

defaultGroupedJob :: Text -> payload -> JobWrite payload Source #

Grouped JobWrite. Jobs sharing a group key are processed serially.

defaultGroupedJob "user-123" (ProcessEvent eventData)

isRollup :: Job p k q t -> Bool Source #

A rollup finalizer is any job whose parentState snapshot is present (an empty object on insert; the merged child results before a DLQ move).

Type Constraints

type JobPayload payload = (FromJSON payload, ToJSON payload) Source #

Payloads must round-trip through JSON for PostgreSQL JSONB storage.

Deduplication

data DedupKey Source #

Deduplication strategy, checked on INSERT via ON CONFLICT on the dedup key.

Constructors

IgnoreDuplicate Text

Skip if a job with this key exists (DO NOTHING).

ReplaceDuplicate Text

Replace the existing job with this key (DO UPDATE), unless it's actively in-flight on its first attempt.

Instances

Instances details
FromJSON DedupKey Source # 
Instance details

Defined in Arbiter.Core.Job.Types

Methods

parseJSON :: Value -> Parser DedupKey

parseJSONList :: Value -> Parser [DedupKey]

omittedField :: Maybe DedupKey

ToJSON DedupKey Source # 
Instance details

Defined in Arbiter.Core.Job.Types

Methods

toJSON :: DedupKey -> Value

toEncoding :: DedupKey -> Encoding

toJSONList :: [DedupKey] -> Value

toEncodingList :: [DedupKey] -> Encoding

omitField :: DedupKey -> Bool

Generic DedupKey Source # 
Instance details

Defined in Arbiter.Core.Job.Types

Associated Types

type Rep DedupKey 
Instance details

Defined in Arbiter.Core.Job.Types

type Rep DedupKey = D1 ('MetaData "DedupKey" "Arbiter.Core.Job.Types" "arbiter-core-0.1.0.0-inplace" 'False) (C1 ('MetaCons "IgnoreDuplicate" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)) :+: C1 ('MetaCons "ReplaceDuplicate" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)))

Methods

from :: DedupKey -> Rep DedupKey x #

to :: Rep DedupKey x -> DedupKey #

Show DedupKey Source # 
Instance details

Defined in Arbiter.Core.Job.Types

Eq DedupKey Source # 
Instance details

Defined in Arbiter.Core.Job.Types

type Rep DedupKey Source # 
Instance details

Defined in Arbiter.Core.Job.Types

type Rep DedupKey = D1 ('MetaData "DedupKey" "Arbiter.Core.Job.Types" "arbiter-core-0.1.0.0-inplace" 'False) (C1 ('MetaCons "IgnoreDuplicate" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)) :+: C1 ('MetaCons "ReplaceDuplicate" 'PrefixI 'False) (S1 ('MetaSel ('Nothing :: Maybe Symbol) 'NoSourceUnpackedness 'NoSourceStrictness 'DecidedLazy) (Rec0 Text)))

Observability

data ObservabilityHooks (m :: Type -> Type) payload Source #

A set of callbacks invoked at key points in the job lifecycle.

Use these hooks to integrate with metrics, logging, or tracing systems. Hooks are exception-safe; any exception thrown within a hook is caught and ignored to prevent crashing the worker.

Constructors

ObservabilityHooks 

Fields

defaultObservabilityHooks :: forall (m :: Type -> Type) payload. Applicative m => ObservabilityHooks m payload Source #

No-op hooks. Override fields to add observability:

myHooks = defaultObservabilityHooks
  { onJobSuccess = \job startTime endTime -> do
      let duration = diffUTCTime endTime startTime
      logInfo $ "Job " <> show (primaryKey job) <> " took " <> show duration
  }