{-# LANGUAGE OverloadedStrings #-}
module Arbiter.Migrations
(
MigrationConfig (..)
, defaultMigrationConfig
, runMigrationsForRegistry
, runMigrationsTrackedForTables
, jobQueueMigrationsForTable
, MigrationResult (..)
) where
import Arbiter.Core.CronSchedule (createCronSchedulesTableSQL)
import Arbiter.Core.Job.Schema
( createDLQFailedAtIndexSQL
, createDLQGroupKeyIndexSQL
, createDLQParentIdIndexSQL
, createDedupKeyIndexSQL
, createEventStreamingFunctionSQL
, createEventStreamingTriggersSQL
, createGroupsIndexSQL
, createGroupsTableSQL
, createGroupsTriggerFunctionsSQL
, createGroupsTriggersSQL
, createJobQueueDLQTableSQL
, createJobQueueGroupKeyIndexSQL
, createJobQueueTableSQL
, createJobQueueUngroupedRankingIndexSQL
, createNotifyFunctionSQL
, createNotifyTriggerSQL
, createParentIdIndexSQL
, createReaperSeqSQL
, createResultsTableSQL
, createSchemaSQL
)
import Arbiter.Core.QueueRegistry (RegistryTables (..))
import Control.Exception (bracket, try)
import Control.Monad (when)
import Data.ByteString (ByteString)
import Data.Proxy (Proxy (..))
import Data.Text (Text)
import Data.Text qualified as T
import Data.Text.Encoding (encodeUtf8)
import Database.PostgreSQL.LibPQ qualified as LibPQ
import Database.PostgreSQL.Simple (Only (..), close, connectPostgreSQL, execute_, query)
import Database.PostgreSQL.Simple qualified as PG
import Database.PostgreSQL.Simple.Internal (withConnection)
import Database.PostgreSQL.Simple.Migration
( MigrationCommand (..)
, MigrationOptions (..)
, MigrationResult (..)
, Verbosity (..)
, defaultOptions
, runMigrations
)
import Database.PostgreSQL.Simple.Types (Query (..))
-- | Switches controlling which optional database objects the migrations
-- create.
data MigrationConfig = MigrationConfig
  { enableNotifications :: Bool
  -- ^ When 'True', the per-table migrations include the
  -- @create-notify-function@ and @create-notify-trigger@ scripts.
  , enableEventStreaming :: Bool
  -- ^ When 'True', the schema-level @create-event-streaming-function@
  -- script and the per-table @create-event-streaming-triggers@ script
  -- are included.
  }
  deriving stock (Eq, Show)
-- | Default configuration: notifications enabled, event streaming disabled.
defaultMigrationConfig :: MigrationConfig
defaultMigrationConfig =
  MigrationConfig
    { enableNotifications = True
    , enableEventStreaming = False
    }
-- | Run the tracked migrations for every table named by a registry.
--
-- The table names come from 'registryTableNames' applied to the proxy; the
-- remaining arguments are passed unchanged to
-- 'runMigrationsTrackedForTables', whose result is returned.
runMigrationsForRegistry
  :: forall registry
   . (RegistryTables registry)
  => Proxy registry
  -- ^ Selects the registry whose table names are migrated
  -> ByteString
  -- ^ Connection string
  -> Text
  -- ^ Schema name
  -> MigrationConfig
  -- ^ Optional-feature switches
  -> IO (MigrationResult String)
runMigrationsForRegistry proxy connStr schemaName config =
  runMigrationsTrackedForTables
    connStr
    schemaName
    (registryTableNames proxy)
    config
-- | Open a connection, ensure the schema exists, and run the schema-level
-- and per-table migration scripts, tracking them in the table
-- @<schemaName>.schema_migrations@.
--
-- The connection is acquired with 'connectPostgreSQL' and released with
-- 'close' through 'bracket', so it is closed whether the body returns or
-- throws.
--
-- Steps, in order:
--
-- 1. Run the statement from 'createSchemaSQL' with libpq notice reporting
--    disabled. If it raises a 'PG.SqlError', 'schemaExists' is consulted
--    and the failure is ignored when the schema is already present.
-- 2. Re-enable notice reporting.
-- 3. Run @MigrationInitialization@ (its result is discarded), then the
--    combined migration list, whose result is returned.
--
-- Throws an 'IOError' built with 'userError' when schema creation fails and
-- the schema does not exist. Other exceptions from the database calls are
-- not caught here.
runMigrationsTrackedForTables
  :: ByteString
  -- ^ Connection string
  -> Text
  -- ^ Schema name
  -> [Text]
  -- ^ Job table names; each contributes 'jobQueueMigrationsForTable'
  -> MigrationConfig
  -- ^ Optional-feature switches
  -> IO (MigrationResult String)
runMigrationsTrackedForTables connStr schemaName tableNames config =
  bracket (connectPostgreSQL connStr) close $ \conn -> do
    -- Notices are silenced only around the schema-creation statement.
    -- NOTE(review): presumably to hide the "already exists" notice — confirm
    -- against what 'createSchemaSQL' emits.
    withConnection conn $ \libpqConn ->
      LibPQ.disableNoticeReporting libpqConn

    let schemaSQL = Query (encodeUtf8 $ createSchemaSQL schemaName)
    result <- try $ execute_ conn schemaSQL
    case result of
      Right _ -> pure ()
      Left (e :: PG.SqlError) -> do
        -- A failed CREATE is tolerated as long as the schema is present.
        exists <- schemaExists conn schemaName
        when (not exists) $
          ioError
            ( userError $
                "Failed to create schema "
                  <> T.unpack schemaName
                  <> " and it does not exist. Either grant CREATE privilege on the database"
                  <> " or create the schema manually: CREATE SCHEMA "
                  <> T.unpack schemaName
                  <> ";"
                  <> "\nOriginal error: "
                  <> show e
            )

    withConnection conn $ \libpqConn ->
      LibPQ.enableNoticeReporting libpqConn

    let -- Schema-wide scripts: always the cron schedules table, plus the
        -- event streaming function when that feature is enabled.
        schemaMigrations =
          [ MigrationScript
              "create-cron-schedules"
              (encodeUtf8 $ createCronSchedulesTableSQL schemaName)
          ]
            <> [ MigrationScript
                   "create-event-streaming-function"
                   (encodeUtf8 $ createEventStreamingFunctionSQL schemaName)
               | enableEventStreaming config
               ]
        tableMigrations =
          concatMap
            (\tableName -> jobQueueMigrationsForTable schemaName tableName config)
            tableNames
        -- Schema-level scripts run before any per-table script.
        migrations = schemaMigrations <> tableMigrations
        migrationTableName = encodeUtf8 $ schemaName <> ".schema_migrations"
        options =
          defaultOptions
            { optVerbose = Quiet
            , optTableName = migrationTableName
            }

    _ <- runMigrations conn options [MigrationInitialization]
    runMigrations conn options migrations
-- | Build the ordered list of migration scripts for one job queue table.
--
-- Every script is named @<tableName>-<script-name>@ and its body is the
-- UTF-8 encoding of the corresponding SQL generator applied to the schema
-- and table names.
--
-- The result is the core scripts, followed by the notify function and
-- trigger when 'enableNotifications' is set, followed by the event
-- streaming triggers when 'enableEventStreaming' is set.
jobQueueMigrationsForTable
  :: Text
  -- ^ Schema name
  -> Text
  -- ^ Table name
  -> MigrationConfig
  -- ^ Optional-feature switches
  -> [MigrationCommand]
jobQueueMigrationsForTable schemaName tableName config =
  coreMigrations <> notifyTriggers <> eventStreamingTriggers
  where
    -- Name a script after this table and render its SQL for this
    -- schema/table pair.
    script :: String -> (Text -> Text -> Text) -> MigrationCommand
    script name mkSQL =
      MigrationScript
        (T.unpack tableName <> "-" <> name)
        (encodeUtf8 (mkSQL schemaName tableName))

    -- Always-present scripts; the list order is the execution order.
    coreMigrations :: [MigrationCommand]
    coreMigrations =
      [ script "create-table" createJobQueueTableSQL
      , script "create-dlq-table" createJobQueueDLQTableSQL
      , script "create-dlq-group-key-index" createDLQGroupKeyIndexSQL
      , script "create-dlq-failed-at-index" createDLQFailedAtIndexSQL
      , script "create-dedup-key-index" createDedupKeyIndexSQL
      , script "create-group-key-index" createJobQueueGroupKeyIndexSQL
      , script "create-ungrouped-ranking-index" createJobQueueUngroupedRankingIndexSQL
      , script "create-parent-id-index" createParentIdIndexSQL
      , script "create-dlq-parent-id-index" createDLQParentIdIndexSQL
      , script "create-results-table" createResultsTableSQL
      , script "create-groups-table" createGroupsTableSQL
      , script "create-groups-index" createGroupsIndexSQL
      , script "create-groups-trigger-functions" createGroupsTriggerFunctionsSQL
      , script "create-groups-triggers" createGroupsTriggersSQL
      , script "create-reaper-seq" createReaperSeqSQL
      ]

    notifyTriggers :: [MigrationCommand]
    notifyTriggers
      | enableNotifications config =
          [ script "create-notify-function" createNotifyFunctionSQL
          , script "create-notify-trigger" createNotifyTriggerSQL
          ]
      | otherwise = []

    eventStreamingTriggers :: [MigrationCommand]
    eventStreamingTriggers
      | enableEventStreaming config =
          [ script "create-event-streaming-triggers" createEventStreamingTriggersSQL
          ]
      | otherwise = []
-- | Report whether @pg_namespace@ contains a row whose @nspname@ equals the
-- given schema name. The name is passed as a query parameter rather than
-- spliced into the SQL text.
schemaExists :: PG.Connection -> Text -> IO Bool
schemaExists conn schemaName = do
  rows <-
    query
      conn
      "SELECT 1 FROM pg_namespace WHERE nspname = ?"
      (Only schemaName)
      :: IO [Only Int]
  pure (not (null rows))