{-# LANGUAGE OverloadedStrings #-}

-- | Versioned, tracked migrations for job queue schemas.
--
-- Uses the @postgresql-migration@ library to:
--
--   * Track which migrations have been run in a database table
--   * Run migrations in order
--   * Prevent re-running completed migrations
--   * Support incremental schema changes
--
-- The migration history is stored in a @<schema_name>.schema_migrations@ table.
-- For example, if you use the schema name @"arbiter"@, the migration tracking
-- table will be created at @arbiter.schema_migrations@.
module Arbiter.Migrations
  ( -- * Configuration
    MigrationConfig (..)
  , defaultMigrationConfig

    -- * Tracked Migrations
  , runMigrationsForRegistry
  , runMigrationsTrackedForTables
  , jobQueueMigrationsForTable

    -- * Re-exports
  , MigrationResult (..)
  ) where

import Arbiter.Core.CronSchedule (createCronSchedulesTableSQL)
import Arbiter.Core.Job.Schema
  ( createDLQFailedAtIndexSQL
  , createDLQGroupKeyIndexSQL
  , createDLQParentIdIndexSQL
  , createDedupKeyIndexSQL
  , createEventStreamingFunctionSQL
  , createEventStreamingTriggersSQL
  , createGroupsIndexSQL
  , createGroupsTableSQL
  , createGroupsTriggerFunctionsSQL
  , createGroupsTriggersSQL
  , createJobQueueDLQTableSQL
  , createJobQueueGroupKeyIndexSQL
  , createJobQueueTableSQL
  , createJobQueueUngroupedRankingIndexSQL
  , createNotifyFunctionSQL
  , createNotifyTriggerSQL
  , createParentIdIndexSQL
  , createReaperSeqSQL
  , createResultsTableSQL
  , createSchemaSQL
  )
import Arbiter.Core.QueueRegistry (RegistryTables (..))
import Control.Exception (bracket, try)
import Control.Monad (when)
import Data.ByteString (ByteString)
import Data.Proxy (Proxy (..))
import Data.Text (Text)
import Data.Text qualified as T
import Data.Text.Encoding (encodeUtf8)
import Database.PostgreSQL.LibPQ qualified as LibPQ
import Database.PostgreSQL.Simple (Only (..), close, connectPostgreSQL, execute_, query)
import Database.PostgreSQL.Simple qualified as PG
import Database.PostgreSQL.Simple.Internal (withConnection)
import Database.PostgreSQL.Simple.Migration
  ( MigrationCommand (..)
  , MigrationOptions (..)
  , MigrationResult (..)
  , Verbosity (..)
  , defaultOptions
  , runMigrations
  )
import Database.PostgreSQL.Simple.Types (Query (..))

-- | Configuration for job queue migrations.
--
-- Controls which optional features are enabled when creating job queue tables.
-- Use 'defaultMigrationConfig' as a starting point and override fields as
-- needed.
data MigrationConfig = MigrationConfig
  { enableNotifications :: Bool
  -- ^ Whether to create LISTEN/NOTIFY triggers for reactive job claiming.
  -- When enabled, workers can subscribe to notifications instead of polling.
  -- Default: 'True'
  , enableEventStreaming :: Bool
  -- ^ Whether to create event streaming triggers for the admin UI.
  -- When enabled, every INSERT\/UPDATE\/DELETE on job tables fires an enriched
  -- JSON event via @pg_notify@ on the @arbiter_job_events@ channel.
  -- This adds overhead to every row operation — disable for maximum throughput.
  -- Default: 'False'
  }
  deriving stock (Eq, Show)

-- | Default migration configuration.
--
-- LISTEN/NOTIFY triggers are enabled ('enableNotifications' is 'True') and
-- event streaming triggers are disabled ('enableEventStreaming' is 'False').
defaultMigrationConfig :: MigrationConfig
defaultMigrationConfig =
  MigrationConfig
    { enableNotifications = True
    , enableEventStreaming = False
    }

-- | Run migrations for all tables in a queue registry
--
-- This function creates all tables defined in the type-level registry
-- within a single PostgreSQL schema. Each payload type gets its own
-- table pair (main + DLQ) within the schema.
--
-- The table names are read from the registry with 'registryTableNames' and
-- the work is delegated to 'runMigrationsTrackedForTables'.
--
-- __Note__: This function creates the schema first (outside of migration tracking)
-- so that the @schema_migrations@ table can be placed inside the same schema.
--
-- Notice reporting on the connection is disabled while the schema is being
-- created and re-enabled before the tracked migrations run.
--
-- Example:
--
-- @
-- type AppRegistry =
--   '[ '("email_jobs", EmailPayload)
--    , '("order_jobs", OrderPayload)
--    ]
--
-- main :: IO ()
-- main = do
--   result <- runMigrationsForRegistry
--               (Proxy @AppRegistry)
--               "host=localhost dbname=mydb"
--               "arbiter"
--               defaultMigrationConfig
--   case result of
--     MigrationSuccess -> putStrLn "migrations applied"
--     MigrationError err -> putStrLn ("migration failed: " <> err)
-- @
runMigrationsForRegistry
  :: forall registry
   . (RegistryTables registry)
  => Proxy registry
  -- ^ Proxy for the job payload registry
  -> ByteString
  -- ^ Database connection string
  -> Text
  -- ^ PostgreSQL schema name (e.g., "arbiter")
  -> MigrationConfig
  -- ^ Migration configuration
  -> IO (MigrationResult String)
  -- ^ Migration results
runMigrationsForRegistry proxy connStr schemaName config =
  runMigrationsTrackedForTables connStr schemaName (registryTableNames proxy) config

-- | Run migrations for multiple tables within a single schema.
--
-- Steps, in order:
--
--   1. Open a connection from the connection string. It is closed again when
--      the function exits, including when an exception is thrown.
--   2. Run the @CREATE SCHEMA@ statement outside of migration tracking, with
--      notice reporting disabled on the connection.
--   3. Initialize the tracking table @<schema_name>.schema_migrations@.
--   4. Run the schema-level migrations followed by the per-table migrations
--      from 'jobQueueMigrationsForTable', in the order of the given table list.
--
-- If the initialization step reports a 'MigrationError', that error is
-- returned and the remaining migrations are not attempted.
--
-- Throws an 'IOError' (via 'userError') when @CREATE SCHEMA@ fails with a
-- 'PG.SqlError' and the schema does not already exist.
runMigrationsTrackedForTables
  :: ByteString
  -- ^ Database connection string
  -> Text
  -- ^ Schema name
  -> [Text]
  -- ^ List of table names to create
  -> MigrationConfig
  -- ^ Migration configuration
  -> IO (MigrationResult String)
runMigrationsTrackedForTables connStr schemaName tableNames config =
  bracket (connectPostgreSQL connStr) close $ \conn -> do
    -- Disable NOTICE messages on the underlying LibPQ connection
    withConnection conn LibPQ.disableNoticeReporting

    -- Create the schema. If CREATE SCHEMA fails (e.g. insufficient privileges),
    -- check whether the schema already exists (manual creation) and proceed.
    created <- try (execute_ conn schemaSQL)
    case created of
      Right _ -> pure ()
      Left (e :: PG.SqlError) -> do
        -- Check if the schema exists despite the CREATE failure
        exists <- schemaExists conn schemaName
        when (not exists) $
          ioError (userError (schemaCreationFailure e))

    -- Re-enable notice reporting for the migrations
    withConnection conn LibPQ.enableNoticeReporting

    -- Initialize the migration system; surface a reported failure instead of
    -- discarding it and running migrations against an uninitialized tracker.
    initResult <- runMigrations conn options [MigrationInitialization]
    case initResult of
      MigrationError err -> pure (MigrationError err)
      -- Run the actual migrations
      MigrationSuccess -> runMigrations conn options migrations
  where
    schemaSQL :: Query
    schemaSQL = Query (encodeUtf8 (createSchemaSQL schemaName))

    -- Message for the case where the schema could neither be created nor found.
    schemaCreationFailure :: PG.SqlError -> String
    schemaCreationFailure e =
      "Failed to create schema "
        <> T.unpack schemaName
        <> " and it does not exist. Either grant CREATE privilege on the database"
        <> " or create the schema manually: CREATE SCHEMA "
        <> T.unpack schemaName
        <> ";"
        <> "\nOriginal error: "
        <> show e

    -- Schema-level migrations run once, before any per-table migration.
    -- The event streaming function is only included when the feature is on.
    schemaMigrations :: [MigrationCommand]
    schemaMigrations =
      [ MigrationScript
          "create-cron-schedules"
          (encodeUtf8 (createCronSchedulesTableSQL schemaName))
      ]
        <> [ MigrationScript
               "create-event-streaming-function"
               (encodeUtf8 (createEventStreamingFunctionSQL schemaName))
           | enableEventStreaming config
           ]

    tableMigrations :: [MigrationCommand]
    tableMigrations =
      concatMap
        (\tableName -> jobQueueMigrationsForTable schemaName tableName config)
        tableNames

    migrations :: [MigrationCommand]
    migrations = schemaMigrations <> tableMigrations

    -- Keep the tracking table inside the same schema as the job tables.
    options :: MigrationOptions
    options =
      defaultOptions
        { optVerbose = Quiet
        , optTableName = encodeUtf8 (schemaName <> ".schema_migrations")
        }

-- | All job queue migrations for a single table
--
-- This creates migrations for one table and its DLQ within a schema.
-- Each migration is a 'MigrationScript' named @<table_name>-<step>@, so every
-- table gets its own set of migrations with unique version identifiers.
--
-- The result is the core migrations, followed by the notify function and
-- trigger when 'enableNotifications' is set, followed by the event streaming
-- triggers when 'enableEventStreaming' is set.
jobQueueMigrationsForTable
  :: Text
  -- ^ Schema name
  -> Text
  -- ^ Table name
  -> MigrationConfig
  -- ^ Migration configuration
  -> [MigrationCommand]
  -- ^ List of migration commands
jobQueueMigrationsForTable schemaName tableName config =
  coreMigrations <> notifyTriggers <> eventStreamingTriggers
  where
    -- Build one tracked script: the name is prefixed with the table name and
    -- the SQL is produced by applying the generator to this schema and table.
    script :: String -> (Text -> Text -> Text) -> MigrationCommand
    script name mkSQL =
      MigrationScript
        (T.unpack tableName <> "-" <> name)
        (encodeUtf8 (mkSQL schemaName tableName))

    -- Always created, in this order.
    coreMigrations :: [MigrationCommand]
    coreMigrations =
      [ script "create-table" createJobQueueTableSQL
      , script "create-dlq-table" createJobQueueDLQTableSQL
      , script "create-dlq-group-key-index" createDLQGroupKeyIndexSQL
      , script "create-dlq-failed-at-index" createDLQFailedAtIndexSQL
      , script "create-dedup-key-index" createDedupKeyIndexSQL
      , script "create-group-key-index" createJobQueueGroupKeyIndexSQL
      , script "create-ungrouped-ranking-index" createJobQueueUngroupedRankingIndexSQL
      , script "create-parent-id-index" createParentIdIndexSQL
      , script "create-dlq-parent-id-index" createDLQParentIdIndexSQL
      , script "create-results-table" createResultsTableSQL
      , script "create-groups-table" createGroupsTableSQL
      , script "create-groups-index" createGroupsIndexSQL
      , script "create-groups-trigger-functions" createGroupsTriggerFunctionsSQL
      , script "create-groups-triggers" createGroupsTriggersSQL
      , script "create-reaper-seq" createReaperSeqSQL
      ]

    -- Only present when notifications are enabled in the configuration.
    notifyTriggers :: [MigrationCommand]
    notifyTriggers
      | enableNotifications config =
          [ script "create-notify-function" createNotifyFunctionSQL
          , script "create-notify-trigger" createNotifyTriggerSQL
          ]
      | otherwise = []

    -- Only present when event streaming is enabled in the configuration.
    eventStreamingTriggers :: [MigrationCommand]
    eventStreamingTriggers
      | enableEventStreaming config =
          [ script "create-event-streaming-triggers" createEventStreamingTriggersSQL
          ]
      | otherwise = []

-- | Check whether a schema exists in the database.
--
-- Looks the name up in @pg_namespace@, passing it as a query parameter, and
-- returns 'True' when at least one row matches.
schemaExists :: PG.Connection -> Text -> IO Bool
schemaExists conn schemaName =
  not . null <$> matchingRows
  where
    matchingRows :: IO [Only Int]
    matchingRows =
      query conn "SELECT 1 FROM pg_namespace WHERE nspname = ?" (Only schemaName)