{-# LANGUAGE DeriveFunctor #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TupleSections #-}
{-# LANGUAGE TypeFamilies #-}
-- | Transformation of event streams and stream families.
--
-- A 'Transformer' maps each input event to a 'Transform' program built from
-- 'pushEvent', 'mergeEvents', 'flushEvents' and 'failTransformer'.
-- 'transformStream' and 'transformStreamFamily' wrap an existing stream or
-- stream family so that its events are passed through such a transformer.
module Database.CQRS.Transformer
  (
    Transformer
  , TransformedStream
  , transformStream
  , TransformedStreamFamily
  , transformStreamFamily
  , Transform
  , pushEvent
  , mergeEvents
  , flushEvents
  , failTransformer
  ) where
import Control.Monad (forM_)
import Control.Monad.Trans (lift)
import Data.Functor ((<&>))
import Data.Hashable (Hashable)
import qualified Data.HashMap.Strict as HM
import qualified Control.Monad.Free as Free
import qualified Control.Monad.State as St
import qualified Pipes
import qualified Pipes.Prelude as Pipes
import qualified Database.CQRS.Stream as CQRS
import qualified Database.CQRS.StreamFamily as CQRS
-- | A function from one input event to a 'Transform' program that manipulates
-- output events of type @event@ and may report failures tagged with an
-- @eventId@.
type Transformer inputEvent eventId event =
  inputEvent -> Transform eventId event ()
-- | A stream whose events are obtained by running a 'Transformer' over the
-- items of an underlying stream. The type of the underlying stream is hidden
-- by the existential quantification; only its 'CQRS.Stream' instance is kept.
data TransformedStream m identifier metadata event =
  forall stream. CQRS.Stream m stream => TransformedStream
    { transformer
        :: Transformer
            (Either
              (CQRS.EventIdentifier stream, String)
              (CQRS.EventWithContext' stream))
            identifier (CQRS.EventWithContext identifier metadata event)
      -- ^ Applied to every item read from the input stream.
    , inputStream :: stream
      -- ^ The underlying stream the items are read from.
    , reverseIdentifier :: identifier -> m (CQRS.EventIdentifier stream)
      -- ^ Maps an identifier of the transformed stream to an identifier of
      -- the input stream; used to translate stream bounds.
    }
-- | Wrap a stream so that its events are passed through a 'Transformer'.
--
-- Arguments, in order: the transformer to run on every item of the input
-- stream, a function mapping identifiers of the resulting stream back to
-- identifiers of the input stream, and the input stream itself.
transformStream
  :: CQRS.Stream m stream
  => Transformer
      (Either
        (CQRS.EventIdentifier stream, String)
        (CQRS.EventWithContext' stream))
      identifier (CQRS.EventWithContext identifier metadata event)
  -> (identifier -> m (CQRS.EventIdentifier stream))
  -> stream
  -> TransformedStream m identifier metadata event
transformStream eventTransformer toInputIdentifier source =
  TransformedStream
    { transformer       = eventTransformer
    , inputStream       = source
    , reverseIdentifier = toInputIdentifier
    }
-- | A 'TransformedStream' is itself a stream. Its event, identifier and
-- metadata types are the ones given by its type parameters, and its events
-- are produced by 'transformedStreamStreamEvents'.
instance
    Monad m
    => CQRS.Stream m (TransformedStream m identifier metadata event) where
  type EventType (TransformedStream m identifier metadata event) = event
  type EventIdentifier (TransformedStream m identifier metadata event) =
    identifier
  type EventMetadata (TransformedStream m identifier metadata event) = metadata

  streamEvents = transformedStreamStreamEvents
-- | Produce the events of a 'TransformedStream' within the given bounds.
--
-- The bounds are first translated to identifiers of the input stream with
-- 'reverseIdentifier'. Every input batch is then run through the transformer
-- with 'runTransform', starting from the events left waiting by the previous
-- batch. Each batch or failure produced by the transformer is yielded
-- downstream. If more than 100 events are still waiting after an input batch,
-- they are yielded as one batch and the waiting list is reset. Whatever is
-- still waiting once the input stream ends is yielded as a final batch (which
-- may be empty).
transformedStreamStreamEvents
  :: Monad m
  => TransformedStream m identifier metadata event
  -> CQRS.StreamBounds identifier
  -> Pipes.Producer
      [ Either
          (identifier, String)
          (CQRS.EventWithContext identifier metadata event)
      ] m ()
transformedStreamStreamEvents TransformedStream{..} bounds = do
  inputBounds <- lift (traverse reverseIdentifier bounds)

  let -- The input producer is lifted so that the fold below can itself yield.
      upstream = Pipes.hoist lift (CQRS.streamEvents inputStream inputBounds)

      -- A failure becomes a singleton batch; a flushed batch is wrapped
      -- event by event.
      toOutput = either (\failure -> [Left failure]) (map Right)

      step pending inputBatch = do
        let (flushed, stillPending) =
              runTransform pending (mapM_ transformer inputBatch)
        Pipes.each (map toOutput flushed)
        -- 'drop 100' is empty exactly when at most 100 events are waiting.
        if null (drop 100 stillPending)
          then pure stillPending
          else do
            Pipes.yield (map Right stillPending)
            pure []

  leftover <- Pipes.foldM step (pure []) pure upstream
  Pipes.yield (map Right leftover)
-- | A stream family whose streams are the streams of an underlying family
-- with a 'Transformer' applied to their events. The type of the underlying
-- family is hidden by the existential quantification; the listed class
-- instances are kept alongside it.
data TransformedStreamFamily m streamId eventId metadata event =
  forall streamFamily.
    ( Hashable (CQRS.StreamIdentifier streamFamily)
    , Ord (CQRS.StreamIdentifier streamFamily)
    , CQRS.Stream m (CQRS.StreamType streamFamily)
    , CQRS.StreamFamily m streamFamily
    )
    => TransformedStreamFamily
      { inputStreamFamily :: streamFamily
        -- ^ The underlying stream family.
      , streamTransformer
          :: Transformer
              (Either
                (CQRS.EventIdentifier (CQRS.StreamType streamFamily), String)
                (CQRS.EventWithContext' (CQRS.StreamType streamFamily)))
              eventId (CQRS.EventWithContext eventId metadata event)
        -- ^ Applied to the items of every stream of the family.
      , makeStreamIdentifier
          :: CQRS.StreamIdentifier streamFamily -> m streamId
        -- ^ Input stream identifier to transformed stream identifier.
      , reverseStreamIdentifier
          :: streamId -> m (CQRS.StreamIdentifier streamFamily)
        -- ^ Transformed stream identifier to input stream identifier.
      , makeEventIdentifier
          :: CQRS.EventIdentifier (CQRS.StreamType streamFamily) -> m eventId
        -- ^ Input event identifier to transformed event identifier.
      , reverseEventIdentifier
          :: eventId -> m (CQRS.EventIdentifier (CQRS.StreamType streamFamily))
        -- ^ Transformed event identifier to input event identifier.
      }
-- | Wrap a stream family so that the events of its streams are passed through
-- a 'Transformer'.
--
-- Arguments, in order: the transformer, the conversions between input and
-- transformed stream identifiers (forward, then reverse), the conversions
-- between input and transformed event identifiers (forward, then reverse),
-- and the input stream family.
transformStreamFamily
  :: forall m streamId eventId metadata event streamFamily.
     ( Hashable (CQRS.StreamIdentifier streamFamily)
     , Ord (CQRS.StreamIdentifier streamFamily)
     , CQRS.Stream m (CQRS.StreamType streamFamily)
     , CQRS.StreamFamily m streamFamily
     )
  => Transformer
      (Either
        (CQRS.EventIdentifier (CQRS.StreamType streamFamily), String)
        (CQRS.EventWithContext' (CQRS.StreamType streamFamily)))
      eventId (CQRS.EventWithContext eventId metadata event)
  -> (CQRS.StreamIdentifier streamFamily -> m streamId)
  -> (streamId -> m (CQRS.StreamIdentifier streamFamily))
  -> (CQRS.EventIdentifier (CQRS.StreamType streamFamily) -> m eventId)
  -> (eventId -> m (CQRS.EventIdentifier (CQRS.StreamType streamFamily)))
  -> streamFamily
  -> TransformedStreamFamily m streamId eventId metadata event
transformStreamFamily
    eventTransformer
    toStreamId fromStreamId
    toEventId fromEventId
    sourceFamily =
  TransformedStreamFamily
    { inputStreamFamily       = sourceFamily
    , streamTransformer       = eventTransformer
    , makeStreamIdentifier    = toStreamId
    , reverseStreamIdentifier = fromStreamId
    , makeEventIdentifier     = toEventId
    , reverseEventIdentifier  = fromEventId
    }
-- | A 'TransformedStreamFamily' is itself a stream family. Its streams are
-- 'TransformedStream's and its stream identifiers are of type @streamId@.
-- Each class method delegates to the corresponding top-level function.
instance
    Monad m
    => CQRS.StreamFamily m
        (TransformedStreamFamily m streamId eventId metadata event) where
  type StreamType (TransformedStreamFamily m streamId eventId metadata event) =
    TransformedStream m eventId metadata event
  type StreamIdentifier
    (TransformedStreamFamily m streamId eventId metadata event) = streamId

  getStream = transformedStreamFamilyGetStream
  allNewEvents = transformedStreamFamilyAllNewEvents
  latestEventIdentifiers = transformedStreamFamilyLatestEventIdentifiers
-- | Get one stream of the transformed family.
--
-- The stream identifier is translated to the input family's identifier with
-- 'reverseStreamIdentifier', the corresponding input stream is fetched, and it
-- is wrapped with 'transformStream' using the family's transformer and
-- 'reverseEventIdentifier'.
transformedStreamFamilyGetStream
  :: Monad m
  => TransformedStreamFamily m streamId eventId metadata event
  -> streamId
  -> m (TransformedStream m eventId metadata event)
transformedStreamFamilyGetStream TransformedStreamFamily{..} streamId =
  transformStream streamTransformer reverseEventIdentifier
    <$> (CQRS.getStream inputStreamFamily =<< reverseStreamIdentifier streamId)
-- | Producer of the new events of all streams of the transformed family.
--
-- Each batch coming from the input family is grouped by input stream. For
-- every stream in the batch, its events are run through the transformer
-- starting with no waiting events, and the resulting batches and failures are
-- yielded tagged with the transformed stream identifier. Events still waiting
-- at the end of the input batch are yielded as a last batch for that stream,
-- so nothing is carried over between input batches.
transformedStreamFamilyAllNewEvents
  :: Monad m
  => TransformedStreamFamily m streamId eventId metadata event
  -> m (Pipes.Producer
        [ ( streamId
          , Either
              (eventId, String)
              (CQRS.EventWithContext eventId metadata event)
          ) ]
        m a)
transformedStreamFamilyAllNewEvents TransformedStreamFamily{..} = do
  inputNewEvents <- CQRS.allNewEvents inputStreamFamily
  pure $ Pipes.for inputNewEvents $ \batch -> do
    let -- 'HM.fromListWith' combines values as @f new old@, so '(++)' builds
        -- each stream's list in reverse order of occurrence. Prepending
        -- singletons is cheap, so keep it and reverse once afterwards: the
        -- transformer must see events in the order they appear in the batch.
        eventsByStream =
          map (fmap reverse)
            . HM.toList
            . HM.fromListWith (++)
            . map (fmap pure)
            $ batch

    forM_ eventsByStream $ \(inputStreamId, events) -> do
      streamId <- lift $ makeStreamIdentifier inputStreamId

      let (batches, waitingEvents) =
            runTransform [] . mapM_ streamTransformer $ events

      -- The waiting events are flushed as a final batch for this stream.
      Pipes.each $ (batches ++ [Right waitingEvents]) <&> \case
        Left err -> [(streamId, Left err)]
        Right b  -> map ((streamId,) . Right) b
-- | Producer of @(stream identifier, event identifier)@ pairs obtained from
-- the input family's 'CQRS.latestEventIdentifiers', with both components
-- converted by 'makeStreamIdentifier' and 'makeEventIdentifier' (in that
-- order).
transformedStreamFamilyLatestEventIdentifiers
  :: Monad m
  => TransformedStreamFamily m streamId eventId metadata event
  -> Pipes.Producer (streamId, eventId) m ()
transformedStreamFamilyLatestEventIdentifiers TransformedStreamFamily{..} =
  Pipes.for (CQRS.latestEventIdentifiers inputStreamFamily) $
    \(inputStreamId, inputEventId) -> do
      converted <- lift $
        (,) <$> makeStreamIdentifier inputStreamId
            <*> makeEventIdentifier inputEventId
      Pipes.yield converted
-- | Instructions of a 'Transform' program, interpreted by 'runTransform'
-- against a list of emitted batches and a list of waiting events.
data TransformF eventId event a
  = PushEvent a event
    -- ^ Append an event at the end of the waiting events.
  | MergeEvents ([event] -> (a, [event]))
    -- ^ Replace the waiting events by the list returned by the function,
    -- which also computes the result of the instruction.
  | FlushEvents a
    -- ^ Emit the waiting events as a batch and start with none waiting.
  | Failure a eventId String
    -- ^ Emit the waiting events as a batch, followed by a failure carrying
    -- the event identifier and a message, and start with none waiting.
  deriving Functor
-- | Free monad over 'TransformF': a program made of 'pushEvent',
-- 'mergeEvents', 'flushEvents' and 'failTransformer' steps.
type Transform eventId event = Free.Free (TransformF eventId event)
-- | Interpret a 'Transform' program.
--
-- The first argument is the list of events already waiting when the program
-- starts. The result is the list of batches and failures emitted by the
-- program, in emission order, together with the events still waiting when it
-- ends.
runTransform
  :: [event] -> Transform eventId event ()
  -> ([Either (eventId, String) [event]], [event])
runTransform lastWaitingEvents =
    restoreOrder
      . flip St.execState ([], lastWaitingEvents)
      . Free.foldFree interpreter
  where
    -- Emitted batches are accumulated most recent first so that emitting is a
    -- cons instead of an append to the end of an ever growing list; the
    -- emission order is restored once, here.
    restoreOrder (reversedBatches, waitingEvents) =
      (reverse reversedBatches, waitingEvents)

    interpreter
      :: TransformF eventId event a
      -> St.State ([Either (eventId, String) [event]], [event]) a
    interpreter = \case
      PushEvent x event -> do
        St.modify $ \(reversedBatches, waitingEvents) ->
          (reversedBatches, waitingEvents ++ [event])
        pure x

      MergeEvents f ->
        St.state $ \(reversedBatches, waitingEvents) ->
          let (x, waitingEvents') = f waitingEvents
          in (x, (reversedBatches, waitingEvents'))

      FlushEvents x -> do
        St.modify $ \(reversedBatches, waitingEvents) ->
          (Right waitingEvents : reversedBatches, [])
        pure x

      Failure x eventId err -> do
        -- The waiting events are emitted before the failure, hence the
        -- failure is consed last onto the reversed accumulator.
        St.modify $ \(reversedBatches, waitingEvents) ->
          ( Left (eventId, err) : Right waitingEvents : reversedBatches
          , []
          )
        pure x
-- | Add an event at the end of the waiting events.
pushEvent :: event -> Transform eventId event ()
pushEvent event = Free.liftF (PushEvent () event)
-- | Apply a function to the waiting events. The function returns the result
-- of the step together with the list that replaces the waiting events.
mergeEvents :: ([event] -> (a, [event])) -> Transform eventId event a
mergeEvents merge = Free.liftF (MergeEvents merge)
-- | Emit the waiting events as a batch, leaving none waiting.
flushEvents :: Transform eventId event ()
flushEvents = Free.liftF (FlushEvents ())
-- | Report a failure for the given event identifier with a message. The
-- waiting events are emitted as a batch before the failure.
failTransformer :: eventId -> String -> Transform eventId event ()
failTransformer eventId = Free.liftF . Failure () eventId