module DomainDriven.Persistance.Postgres.Migration where import Data.Aeson import Data.Int import Data.String import Database.PostgreSQL.Simple as PG import DomainDriven.Persistance.Class import DomainDriven.Persistance.Postgres.Internal ( createEventTable' , mkEventQuery , mkEventStream ) import DomainDriven.Persistance.Postgres.Types import qualified Streamly.Data.Unfold as Unfold import qualified Streamly.Prelude as S import UnliftIO (liftIO) import Prelude migrateValue1to1 :: Connection -> PreviousEventTableName -> EventTableName -> (Value -> Value) -> IO () migrateValue1to1 :: Connection -> PreviousEventTableName -> PreviousEventTableName -> (Value -> Value) -> IO () migrateValue1to1 Connection conn PreviousEventTableName prevTName PreviousEventTableName tName Value -> Value f = forall a b. (FromJSON a, ToJSON b) => Connection -> PreviousEventTableName -> PreviousEventTableName -> (Stored a -> Stored b) -> IO () migrate1to1 Connection conn PreviousEventTableName prevTName PreviousEventTableName tName (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b fmap Value -> Value f) migrate1to1 :: forall a b . (FromJSON a, ToJSON b) => Connection -> PreviousEventTableName -> EventTableName -> (Stored a -> Stored b) -> IO () migrate1to1 :: forall a b. (FromJSON a, ToJSON b) => Connection -> PreviousEventTableName -> PreviousEventTableName -> (Stored a -> Stored b) -> IO () migrate1to1 Connection conn PreviousEventTableName prevTName PreviousEventTableName tName Stored a -> Stored b f = forall a b. (FromJSON a, ToJSON b) => Connection -> PreviousEventTableName -> PreviousEventTableName -> (Stored a -> [Stored b]) -> IO () migrate1toMany Connection conn PreviousEventTableName prevTName PreviousEventTableName tName (forall (f :: * -> *) a. Applicative f => a -> f a pure forall b c a. (b -> c) -> (a -> b) -> a -> c . Stored a -> Stored b f) migrate1toMany :: forall a b . (FromJSON a, ToJSON b) => Connection -> PreviousEventTableName -> EventTableName -> (Stored a -> [Stored b]) -> IO () migrate1toMany :: forall a b. (FromJSON a, ToJSON b) => Connection -> PreviousEventTableName -> PreviousEventTableName -> (Stored a -> [Stored b]) -> IO () migrate1toMany Connection conn PreviousEventTableName prevTName PreviousEventTableName tName Stored a -> [Stored b] f = do Int64 _ <- Connection -> PreviousEventTableName -> IO Int64 createEventTable' Connection conn PreviousEventTableName tName forall (m :: * -> *) a b. Monad m => (a -> m b) -> SerialT m a -> m () S.mapM_ (forall (m :: * -> *) a. MonadIO m => IO a -> m a liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c . Stored b -> IO Int64 writeIt) forall b c a. (b -> c) -> (a -> b) -> a -> c . forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b. (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b S.unfoldMany forall (m :: * -> *) a. Monad m => Unfold m [a] a Unfold.fromList forall a b. (a -> b) -> a -> b $ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b. (IsStream t, Monad m) => (a -> b) -> t m a -> t m b S.map (Stored a -> [Stored b] f forall b c a. (b -> c) -> (a -> b) -> a -> c . forall a b. (a, b) -> a fst) forall a b. (a -> b) -> a -> b $ forall event. FromJSON event => ChunkSize -> Connection -> EventQuery -> SerialT IO (Stored event, EventNumber) mkEventStream ChunkSize 1 Connection conn (PreviousEventTableName -> EventQuery mkEventQuery PreviousEventTableName prevTName) where writeIt :: Stored b -> IO Int64 writeIt :: Stored b -> IO Int64 writeIt Stored b event = forall q. ToRow q => Connection -> Query -> [q] -> IO Int64 PG.executeMany Connection conn ( Query "insert into \"" forall a. Semigroup a => a -> a -> a <> forall a. IsString a => PreviousEventTableName -> a fromString PreviousEventTableName tName forall a. Semigroup a => a -> a -> a <> Query "\" (id, timestamp, event) \ \values (?, ?, ?)" ) (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b fmap (\Stored b x -> (forall a. Stored a -> UUID storedUUID Stored b x, forall a. Stored a -> UTCTime storedTimestamp Stored b x, forall a. ToJSON a => a -> ByteString encode forall a b. (a -> b) -> a -> b $ forall a. Stored a -> a storedEvent Stored b x)) [Stored b event]) migrate1toManyWithState :: forall a b state . (FromJSON a, ToJSON b) => Connection -> PreviousEventTableName -> EventTableName -> (state -> Stored a -> (state, [Stored b])) -> state -> IO () migrate1toManyWithState :: forall a b state. (FromJSON a, ToJSON b) => Connection -> PreviousEventTableName -> PreviousEventTableName -> (state -> Stored a -> (state, [Stored b])) -> state -> IO () migrate1toManyWithState Connection conn PreviousEventTableName prevTName PreviousEventTableName tName state -> Stored a -> (state, [Stored b]) f state initialState = do Int64 _ <- Connection -> PreviousEventTableName -> IO Int64 createEventTable' Connection conn PreviousEventTableName tName forall (m :: * -> *) a b. Monad m => (a -> m b) -> SerialT m a -> m () S.mapM_ (forall (m :: * -> *) a. MonadIO m => IO a -> m a liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c . Stored b -> IO Int64 writeIt) forall b c a. (b -> c) -> (a -> b) -> a -> c . forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b. (IsStream t, Monad m) => Unfold m a b -> t m a -> t m b S.unfoldMany forall (m :: * -> *) a. Monad m => Unfold m [a] a Unfold.fromList forall b c a. (b -> c) -> (a -> b) -> a -> c . forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b. (IsStream t, Monad m) => (a -> b) -> t m a -> t m b S.map forall a b. (a, b) -> b snd forall a b. (a -> b) -> a -> b $ forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a. (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> t m b S.scanl' (\(state, [Stored b]) b -> state -> Stored a -> (state, [Stored b]) f (forall a b. (a, b) -> a fst (state, [Stored b]) b)) (state initialState, []) forall a b. (a -> b) -> a -> b $ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b. (IsStream t, Monad m) => (a -> b) -> t m a -> t m b S.map forall a b. (a, b) -> a fst forall a b. (a -> b) -> a -> b $ forall event. FromJSON event => ChunkSize -> Connection -> EventQuery -> SerialT IO (Stored event, EventNumber) mkEventStream ChunkSize 1 Connection conn (PreviousEventTableName -> EventQuery mkEventQuery PreviousEventTableName prevTName) where writeIt :: Stored b -> IO Int64 writeIt :: Stored b -> IO Int64 writeIt Stored b event = forall q. ToRow q => Connection -> Query -> [q] -> IO Int64 PG.executeMany Connection conn ( Query "insert into \"" forall a. Semigroup a => a -> a -> a <> forall a. IsString a => PreviousEventTableName -> a fromString PreviousEventTableName tName forall a. Semigroup a => a -> a -> a <> Query "\" (id, timestamp, event) \ \values (?, ?, ?)" ) (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b fmap (\Stored b x -> (forall a. Stored a -> UUID storedUUID Stored b x, forall a. Stored a -> UTCTime storedTimestamp Stored b x, forall a. ToJSON a => a -> ByteString encode forall a b. (a -> b) -> a -> b $ forall a. Stored a -> a storedEvent Stored b x)) [Stored b event])