{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DeriveTraversable #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}
module Database.CQRS.Stream
( Stream(..)
, WritableStream(..)
, EventWithContext(..)
, EventWithContext'
, MonadMetadata(..)
, ConsistencyCheck(..)
, writeEvent
, writeEventCc
, optimistically
, StreamBounds(..)
, StreamBounds'
, afterEvent
, untilEvent
) where
import GHC.Generics
import qualified Control.Monad.Except as Exc
import qualified Pipes
import Database.CQRS.Error
-- | A source of events that can be read inside the effect @f@.
--
-- Every @stream@ type fixes three associated types: the event payload
-- ('EventType'), the identifier attached to each event ('EventIdentifier')
-- and the metadata attached to each event ('EventMetadata').
class Stream f stream where
  -- | Type of the events carried by the stream.
  type EventType stream :: *

  -- | Type of the identifiers attached to the events of the stream.
  type EventIdentifier stream :: *

  -- | Type of the metadata attached to the events of the stream.
  type EventMetadata stream :: *

  -- | Produce the events of a stream, given bounds on event identifiers.
  --
  -- The producer yields lists. Each element of a list is either a 'Right'
  -- holding an event together with its identifier and metadata, or a 'Left'
  -- pairing an event identifier with a 'String'.
  --
  -- NOTE(review): the 'String' is presumably an error message for an event
  -- that could not be read, and the bounds are presumably honoured by each
  -- instance; neither is enforced by this class -- confirm against the
  -- instances.
  streamEvents
    :: stream
    -> StreamBounds' stream
    -> Pipes.Producer
         [Either (EventIdentifier stream, String) (EventWithContext' stream)]
         f
         ()
-- | Check requested when writing an event, parameterised by the type of
-- event identifiers.
--
-- NOTE(review): the checks themselves are carried out by the
-- 'WritableStream' instances, which are not defined in this module. The
-- constructor descriptions below are inferred from the names only -- confirm
-- against the instances.
data ConsistencyCheck eventId
  = NoConsistencyCheck
    -- ^ Presumably: write without checking anything.
  | CheckNoEvents
    -- ^ Presumably: require that the stream holds no events yet.
  | CheckLastEvent eventId
    -- ^ Presumably: require that the latest event has this identifier.
-- | A 'Stream' to which new events can be written in the effect @f@.
class Stream f stream => WritableStream f stream where
  -- | Write one event, with explicitly supplied metadata, to the stream.
  --
  -- The result is an event identifier, presumably the one assigned to the
  -- newly written event (the instances decide; confirm there).
  writeEventWithMetadata
    :: stream
       -- ^ Stream to write to.
    -> EventType stream
       -- ^ Event to write.
    -> EventMetadata stream
       -- ^ Metadata to attach to the event.
    -> ConsistencyCheck (EventIdentifier stream)
       -- ^ Check requested for this write.
    -> f (EventIdentifier stream)
-- | An event bundled with its identifier and its metadata.
data EventWithContext ident meta ev = EventWithContext
  { identifier :: ident
    -- ^ Identifier of the event.
  , metadata :: meta
    -- ^ Metadata attached to the event.
  , event :: ev
    -- ^ The event itself.
  }
  deriving (Eq, Show, Generic)
-- | 'EventWithContext' specialised to the identifier, metadata and event
-- types associated with a given @stream@ type.
type EventWithContext' stream =
  EventWithContext
    (EventIdentifier stream)
    (EventMetadata stream)
    (EventType stream)
-- | Effects @m@ that can supply a value of type @metadata@.
--
-- 'writeEvent' and 'writeEventCc' use this class to obtain the metadata
-- they attach to the events they write.
class MonadMetadata metadata m where
  -- | Obtain the metadata in @m@.
  getMetadata :: m metadata
-- | Unit metadata carries no information, so any monad can supply it
-- without performing an effect.
instance Monad m => MonadMetadata () m where
  getMetadata = pure ()
-- | Write an event to a stream without requesting any consistency check.
--
-- The metadata attached to the event is obtained from 'getMetadata', and the
-- write is performed with 'NoConsistencyCheck'. The result is whatever
-- identifier 'writeEventWithMetadata' returns.
writeEvent
  :: (Monad m, MonadMetadata (EventMetadata stream) m, WritableStream m stream)
  => stream
  -> EventType stream
  -> m (EventIdentifier stream)
writeEvent stream ev =
  getMetadata >>= \md ->
    writeEventWithMetadata stream ev md NoConsistencyCheck
-- | Write an event to a stream, requesting the given consistency check.
--
-- The metadata attached to the event is obtained from 'getMetadata'; the
-- stream, event and check are passed unchanged to 'writeEventWithMetadata',
-- whose result is returned.
writeEventCc
  :: (Monad m, MonadMetadata (EventMetadata stream) m, WritableStream m stream)
  => stream
  -> EventType stream
  -> ConsistencyCheck (EventIdentifier stream)
  -> m (EventIdentifier stream)
writeEventCc stream ev cc =
  getMetadata >>= \md ->
    writeEventWithMetadata stream ev md cc
-- | Run an action, running it again each time it fails with a
-- 'ConsistencyCheckError'.
--
-- Any other error is rethrown unchanged. There is no limit on the number of
-- attempts: the action is retried for as long as it keeps failing with a
-- 'ConsistencyCheckError'.
optimistically :: Exc.MonadError Error m => m a -> m a
optimistically action = action `Exc.catchError` retryOnConflict
  where
    -- Only consistency failures trigger a retry; everything else propagates.
    retryOnConflict (ConsistencyCheckError _) = optimistically action
    retryOnConflict other = Exc.throwError other
-- | Optional lower and upper bounds on the events of a stream, expressed as
-- event identifiers. 'Nothing' means that the corresponding bound is absent.
--
-- NOTE(review): whether each bound is inclusive or exclusive is decided by
-- the code that interprets the bounds, which is not in this module.
data StreamBounds ident = StreamBounds
  { _afterEvent :: Maybe ident
    -- ^ Lower bound, if any.
  , _untilEvent :: Maybe ident
    -- ^ Upper bound, if any.
  }
  deriving (Functor, Foldable, Traversable)
-- | 'StreamBounds' over the event identifiers of a given @stream@ type.
type StreamBounds' stream =
  StreamBounds (EventIdentifier stream)
-- | Combining two bounds keeps the larger of the two lower bounds and the
-- smaller of the two upper bounds. A bound that is absent on one side is
-- taken from the other side.
instance Ord identifier => Semigroup (StreamBounds identifier) where
  sb1 <> sb2 =
    StreamBounds
      { _afterEvent = mergeWith max (_afterEvent sb1) (_afterEvent sb2)
      , _untilEvent = mergeWith min (_untilEvent sb1) (_untilEvent sb2)
      }
    where
      -- Merge two optional values, falling back to whichever one is present.
      -- The right-hand value is inspected first.
      mergeWith :: (b -> b -> b) -> Maybe b -> Maybe b -> Maybe b
      mergeWith _ mx Nothing = mx
      mergeWith _ Nothing my = my
      mergeWith pick (Just x) (Just y) = Just (pick x y)
-- | The neutral element has neither a lower nor an upper bound.
instance Ord identifier => Monoid (StreamBounds identifier) where
  mempty = StreamBounds noBound noBound
    where
      noBound = Nothing
-- | Bounds with only the lower bound set, to the given identifier; the upper
-- bound is left absent.
afterEvent :: Ord identifier => identifier -> StreamBounds identifier
afterEvent lowerBound = unbounded { _afterEvent = Just lowerBound }
  where
    unbounded = mempty
-- | Bounds with only the upper bound set, to the given identifier; the lower
-- bound is left absent.
untilEvent :: Ord identifier => identifier -> StreamBounds identifier
untilEvent upperBound = unbounded { _untilEvent = Just upperBound }
  where
    unbounded = mempty