module MessageDb.Projection
( Projection (..),
Functions.BatchSize (..),
UnprocessedMessage (..),
Projected (..),
versionIncludingUnprocessed,
project,
fetch,
)
where
import Control.Exception (Exception)
import Data.Foldable (foldl', toList)
import Data.List.NonEmpty (NonEmpty ((:|)))
import qualified Data.List.NonEmpty as NonEmpty
import qualified MessageDb.Functions as Functions
import MessageDb.Handlers (HandleError (..))
import qualified MessageDb.Handlers as Handlers
import MessageDb.Message (Message)
import qualified MessageDb.Message as Message
import MessageDb.StreamName (StreamName)
-- | Describes how to fold the messages of a stream into a value of type
-- @state@.
data Projection state = Projection
  { initial :: state
  -- ^ The state the fold starts from, before any message is applied.
  , handlers :: Handlers.ProjectionHandlers state
  -- ^ The handlers used to apply each message to the current state.
  }
-- | A message that the projection handlers failed to apply, together with
-- the error that was reported for it.
data UnprocessedMessage = UnprocessedMessage
  { message :: Message
  -- ^ The message that was not applied.
  , reason :: HandleError
  -- ^ The error returned while handling 'message'.
  }
  deriving (Show, Eq)

-- | Lets an 'UnprocessedMessage' be thrown and caught as an exception.
instance Exception UnprocessedMessage
-- | The outcome of running a 'Projection' over some messages.
data Projected state = Projected
  { unprocessed :: [UnprocessedMessage]
  -- ^ Messages whose handler returned an error, with that error.
  , state :: state
  -- ^ The state after applying every message that was handled successfully.
  , version :: Functions.StreamVersion
  -- ^ The stream version of the most recent successfully applied message;
  -- 'Functions.DoesNotExist' when no message has been applied.
  }
  deriving (Show, Eq, Functor)
-- | A 'Projected' value to which nothing has been applied yet: it carries
-- the given state, has no unprocessed messages and a version of
-- 'Functions.DoesNotExist'.
empty :: state -> Projected state
empty seed =
  Projected
    { state = seed
    , version = Functions.DoesNotExist
    , unprocessed = []
    }
-- | The greatest stream version found among the projected 'version' and the
-- stream positions of all 'unprocessed' messages.
versionIncludingUnprocessed :: Projected state -> Functions.StreamVersion
versionIncludingUnprocessed Projected{version = appliedVersion, unprocessed = skipped} =
  -- Seeding the non-empty list with the applied version keeps 'maximum' total
  -- even when there are no unprocessed messages.
  maximum (appliedVersion :| map skippedVersion skipped)
  where
    -- Stream version at which an unprocessed message sits.
    skippedVersion :: UnprocessedMessage -> Functions.StreamVersion
    skippedVersion = Functions.DoesExist . Message.messageStreamPosition . message
-- | Reverses the order of the 'unprocessed' list, leaving the state and
-- version untouched. 'project'' prepends failures as it folds, so this is
-- applied once at the end by 'project' and 'fetch'.
reverseUnprocessed :: Projected state -> Projected state
reverseUnprocessed projected@Projected{unprocessed = newestFirst} =
  projected{unprocessed = reverse newestFirst}
-- | Strict left fold of the projection's handlers over the messages,
-- starting from the projection's 'initial' state.
--
-- A message that is handled successfully replaces the state and sets the
-- version to that message's stream position. A message whose handler fails
-- is prepended to 'unprocessed', so the resulting list has the most recent
-- failure first.
project' :: Projection state -> NonEmpty Message -> Projected state
project' Projection{initial = seed, handlers = messageHandlers} messages =
  foldl' step (empty seed) (toList messages)
  where
    step soFar incoming =
      case Handlers.projectionHandle messageHandlers incoming (state soFar) of
        Left failure ->
          soFar
            { unprocessed = UnprocessedMessage incoming failure : unprocessed soFar
            }
        Right nextState ->
          soFar
            { state = nextState
            , version = Functions.DoesExist (Message.messageStreamPosition incoming)
            }
-- | Runs a projection over a non-empty list of messages.
--
-- Unlike 'project'', the 'unprocessed' messages of the result are reversed
-- back into the order in which they appeared in the input.
project :: Projection state -> NonEmpty Message -> Projected state
project projection messages =
  reverseUnprocessed (project' projection messages)
-- | Reads a stream in batches and folds it through the given projection.
--
-- Messages are requested with 'Functions.getStreamMessages', starting at
-- position 0 and asking for at most @batchSize@ messages per query. With
-- 'Functions.Unlimited' a single query is made; otherwise querying continues
-- from the position after the last message received until a query returns no
-- messages.
--
-- Returns 'Nothing' when the very first query yields no messages, otherwise
-- the accumulated 'Projected' value with its 'unprocessed' messages in the
-- order in which they were read.
fetch :: forall state. Functions.WithConnection -> Functions.BatchSize -> StreamName -> Projection state -> IO (Maybe (Projected state))
fetch withConnection batchSize streamName projection =
  fmap reverseUnprocessed <$> query 0 (empty (initial projection))
  where
    query position projected@Projected{state, unprocessed, version} = do
      messages <- withConnection $ \connection ->
        Functions.getStreamMessages connection streamName (Just position) (Just batchSize) Nothing

      case messages of
        [] ->
          -- An empty result on the first query means nothing was read at all;
          -- on a later query it only means the end of the stream was reached.
          pure $
            if position <= 0
              then Nothing
              else Just projected
        firstMessage : otherMessages -> do
          let nonEmptyMessages = firstMessage :| otherMessages

              -- Resume the fold from the state accumulated so far.
              Projected newErrors updatedState batchVersion =
                project' projection{initial = state} nonEmptyMessages

              -- 'project'' starts every batch at 'Functions.DoesNotExist' and
              -- only moves the version on a successful handler, so a batch in
              -- which every message failed must not erase the version
              -- reached by earlier batches.
              updatedVersion =
                case batchVersion of
                  Functions.DoesNotExist -> version
                  _ -> batchVersion

              updatedProjectedState =
                Projected
                  { -- Both lists are newest-first; the final result is
                    -- reversed once, after the last batch.
                    unprocessed = newErrors <> unprocessed
                  , state = updatedState
                  , version = updatedVersion
                  }

          if batchSize == Functions.Unlimited
            then pure (Just updatedProjectedState)
            else
              let nextPosition =
                    Message.messageStreamPosition (NonEmpty.last nonEmptyMessages) + 1
               in query nextPosition updatedProjectedState