module DomainDriven.Persistance.Postgres.Migration where

import Data.Aeson
import Data.Int
import Data.String
import Database.PostgreSQL.Simple as PG
import DomainDriven.Persistance.Class
import DomainDriven.Persistance.Postgres.Internal
    ( createEventTable'
    , mkEventQuery
    , mkEventStream
    )
import DomainDriven.Persistance.Postgres.Types
import qualified Streamly.Data.Unfold as Unfold
import qualified Streamly.Prelude as S
import UnliftIO (liftIO)
import Prelude

-- | Migrate events from one table to another by rewriting the raw JSON
-- 'Value' of every event.
--
-- Thin wrapper around 'migrate1to1': the supplied function is lifted with
-- 'fmap' so it is applied to the 'Value' carried inside each 'Stored' wrapper.
migrateValue1to1
    :: Connection
    -> PreviousEventTableName
    -> EventTableName
    -> (Value -> Value)
    -> IO ()
migrateValue1to1 conn prevTName tName f =
    migrate1to1 conn prevTName tName (fmap f)

-- | Migrate events from one table to another where every stored event maps to
-- exactly one new stored event.
--
-- Implemented in terms of 'migrate1toMany' by wrapping each result in a
-- singleton list ('pure' at the list type).
migrate1to1
    :: forall a b
     . (FromJSON a, ToJSON b)
    => Connection
    -> PreviousEventTableName
    -> EventTableName
    -> (Stored a -> Stored b)
    -> IO ()
migrate1to1 conn prevTName tName f =
    migrate1toMany conn prevTName tName (pure . f)

-- | Migrate events from one table to another where each stored event may
-- expand to zero or more new stored events.
--
-- Steps, in order:
--
-- 1. 'createEventTable'' is called for the target table @tName@.
-- 2. The events of @prevTName@ are streamed with 'mkEventStream' (chunk size 1).
-- 3. Each event is passed through @f@; the resulting lists are flattened.
-- 4. Every resulting event is inserted into @tName@ with its UUID, timestamp
--    and JSON-encoded payload.
--
-- NOTE(review): no transaction is opened here; whether the caller is expected
-- to wrap this call in one is not visible from this module -- confirm.
migrate1toMany
    :: forall a b
     . (FromJSON a, ToJSON b)
    => Connection
    -> PreviousEventTableName
    -> EventTableName
    -> (Stored a -> [Stored b])
    -> IO ()
migrate1toMany conn prevTName tName f = do
    _ <- createEventTable' conn tName
    S.mapM_ (liftIO . writeIt)
        . S.unfoldMany Unfold.fromList
        -- The stream yields (event, event number) pairs; only the event is used.
        . S.map (f . fst)
        $ mkEventStream 1 conn (mkEventQuery prevTName)
  where
    -- Insert statement for the target table. The table name is spliced into
    -- the statement text; only the row values are query parameters.
    insertQuery :: Query
    insertQuery =
        "insert into \""
            <> fromString tName
            <> "\" (id, timestamp, event) values (?, ?, ?)"

    -- Write a single migrated event as one row of the target table.
    writeIt :: Stored b -> IO Int64
    writeIt event =
        PG.executeMany
            conn
            insertQuery
            [(storedUUID event, storedTimestamp event, encode (storedEvent event))]

-- | Like 'migrate1toMany', but threads a user-supplied state through the
-- migration.
--
-- For every event read from @prevTName@ the step function @f@ receives the
-- current state and the event, and returns the next state together with the
-- events to write to @tName@. @initialState@ is the state passed to @f@ for
-- the first event.
--
-- 'createEventTable'' is called for @tName@ before any event is read.
--
-- NOTE(review): no transaction is opened here; whether the caller is expected
-- to wrap this call in one is not visible from this module -- confirm.
migrate1toManyWithState
    :: forall a b state
     . (FromJSON a, ToJSON b)
    => Connection
    -> PreviousEventTableName
    -> EventTableName
    -> (state -> Stored a -> (state, [Stored b]))
    -> state
    -> IO ()
migrate1toManyWithState conn prevTName tName f initialState = do
    _ <- createEventTable' conn tName
    S.mapM_ (liftIO . writeIt)
        . S.unfoldMany Unfold.fromList
        -- Keep only the events produced by each step; the state stays in the scan.
        . S.map snd
        -- The scan also emits its seed, whose event list is empty, so the
        -- seed itself contributes no rows.
        . S.scanl' (\acc -> f (fst acc)) (initialState, [])
        -- The stream yields (event, event number) pairs; only the event is used.
        . S.map fst
        $ mkEventStream 1 conn (mkEventQuery prevTName)
  where
    -- Insert statement for the target table. The table name is spliced into
    -- the statement text; only the row values are query parameters.
    insertQuery :: Query
    insertQuery =
        "insert into \""
            <> fromString tName
            <> "\" (id, timestamp, event) values (?, ?, ?)"

    -- Write a single migrated event as one row of the target table.
    writeIt :: Stored b -> IO Int64
    writeIt event =
        PG.executeMany
            conn
            insertQuery
            [(storedUUID event, storedTimestamp event, encode (storedEvent event))]