-- | A projection is an aggregation of all messages in a stream.
module MessageDb.Projection
  ( Projection (..)
  , UnprocessedMessage (..)
  , Projected (..)
  , versionIncludingUnprocessed
  , project
  , fetch
  , SnapshotStreamName (..)
  , fetchWithSnapshots
  )
where

import Control.Exception (Exception)
import Control.Exception.Safe (handle)
import Control.Monad (void)
import Control.Monad.Except (liftEither)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Trans.Class (MonadTrans (lift))
import Control.Monad.Trans.Except (runExceptT)
import Control.Monad.Trans.Maybe (MaybeT (MaybeT, runMaybeT))
import qualified Data.Aeson as Aeson
import Data.Bifunctor (Bifunctor (first))
import Data.Coerce (coerce)
import Data.Foldable (foldl', toList)
import Data.Functor ((<&>))
import Data.List.NonEmpty (NonEmpty ((:|)))
import qualified Data.List.NonEmpty as NonEmpty
import Data.String (IsString)
import qualified MessageDb.Functions as Functions
import MessageDb.Handlers (HandleError (..))
import qualified MessageDb.Handlers as Handlers
import MessageDb.Message (Message)
import qualified MessageDb.Message as Message
import MessageDb.StreamName (StreamName)


-- | Defines how to perform a projection of a stream.
data Projection state = Projection
  { initial :: state
  -- ^ State the projection starts from, before any message is applied.
  , handlers :: Handlers.ProjectionHandlers state
  -- ^ Handlers used to apply each message to the current state.
  }


-- | A message that was not able to be processed.
data UnprocessedMessage = UnprocessedMessage
  { message :: Message
  -- ^ The message that could not be handled.
  , reason :: HandleError
  -- ^ The error reported while handling the message.
  }
  deriving (Show, Eq)


-- | Allows an 'UnprocessedMessage' to be thrown and caught as an exception.
instance Exception UnprocessedMessage


-- | A projected state: the result of applying a stream's messages to a
-- 'Projection'.
data Projected state = Projected
  { unprocessed :: [UnprocessedMessage]
  -- ^ Messages whose handler reported an error.
  , state :: state
  -- ^ State after applying every message that was handled successfully.
  , version :: Functions.StreamVersion
  -- ^ Position of the last successfully handled message, or
  -- 'Functions.DoesNotExist' when no message has been handled.
  }
  deriving (Show, Eq, Functor)


-- | Constructs an empty projection: the given state, no unprocessed
-- messages, and a version of 'Functions.DoesNotExist'.
empty :: state -> Projected state
empty initialState =
  Projected
    { unprocessed = []
    , state = initialState
    , version = Functions.DoesNotExist
    }


-- | Version of the projection with unprocessed messages included.
--
-- This is the greatest of the projection's own 'version' and the stream
-- positions of all of its 'unprocessed' messages.
versionIncludingUnprocessed :: Projected state -> Functions.StreamVersion
versionIncludingUnprocessed Projected{version, unprocessed} =
  -- Seeding the non-empty list with 'version' keeps 'maximum' total even
  -- when there are no unprocessed messages.
  maximum (version :| fmap unprocessedVersion unprocessed)
  where
    unprocessedVersion =
      Functions.DoesExist . Message.messageStreamPosition . message


-- | Reverses the list of unprocessed messages.
--
-- 'project'' and 'fetch'' accumulate unprocessed messages by prepending, so
-- the list is reversed once at the end to restore the order in which the
-- messages were read.
reverseUnprocessed :: Projected state -> Projected state
reverseUnprocessed projected =
  projected{unprocessed = reverse (unprocessed projected)}


-- | Folds the messages, in order, over the projection's handlers starting
-- from its 'initial' state.
--
-- A message that is handled successfully replaces the state and advances
-- 'version' to that message's position. A message whose handler fails is
-- prepended to 'unprocessed', so the resulting list is newest-first; see
-- 'reverseUnprocessed'.
project' :: Projection state -> NonEmpty Message -> Projected state
project' Projection{initial, handlers} messages =
  foldl' applyHandler (empty initial) (toList messages)
  where
    applyHandler projected@Projected{state, unprocessed} message =
      case Handlers.projectionHandle handlers message state of
        Right updatedState ->
          projected
            { state = updatedState
            , version = Functions.DoesExist (Message.messageStreamPosition message)
            }
        Left newError ->
          projected
            { unprocessed = UnprocessedMessage message newError : unprocessed
            }


-- | Project a state of a stream by aggregating messages.
--
-- Unprocessed messages are returned in the order they appear in the input.
project :: Projection state -> NonEmpty Message -> Projected state
project projection =
  reverseUnprocessed . project' projection


-- | Query a stream from the given starting position and project the messages.
--
-- Messages are read in batches of the given size. Unless the batch size is
-- 'Functions.Unlimited', reading continues from the position after the last
-- message of each batch until a query returns no messages.
--
-- Returns 'Nothing' when a query at a position @<= 0@ returns no messages.
-- When a query at a later position returns nothing, the projection
-- accumulated so far is returned.
fetch' ::
  forall state.
  Functions.WithConnection ->
  Functions.BatchSize ->
  StreamName ->
  Message.StreamPosition ->
  Projection state ->
  IO (Maybe (Projected state))
fetch' withConnection batchSize streamName startingPosition projection =
  let query position projected@Projected{state, unprocessed, version} = do
        messages <- withConnection $ \connection ->
          Functions.getStreamMessages connection streamName (Just position) (Just batchSize) Nothing

        case messages of
          (firstMessage : otherMessages) -> do
            let nonEmptyMessages = firstMessage :| otherMessages

                -- Continue the fold from the state accumulated so far.
                (Projected newErrors updatedState batchVersion) =
                  project' (projection{initial = state}) nonEmptyMessages

                -- 'project'' starts every batch at 'Functions.DoesNotExist',
                -- so a batch in which no message was handled must not
                -- discard the version reached by earlier batches.
                mergedVersion =
                  case batchVersion of
                    Functions.DoesNotExist -> version
                    _ -> batchVersion

                updatedProjectedState =
                  Projected
                    { -- Both lists are newest-first; the whole list is
                      -- reversed once after the last batch.
                      unprocessed = newErrors <> unprocessed
                    , state = updatedState
                    , version = mergedVersion
                    }

            if batchSize == Functions.Unlimited
              then pure (Just updatedProjectedState)
              else
                let nextPosition =
                      Message.messageStreamPosition (NonEmpty.last nonEmptyMessages) + 1
                 in query nextPosition updatedProjectedState
          [] ->
            pure $
              if position <= 0
                then Nothing
                else Just projected
   in fmap reverseUnprocessed <$> query startingPosition (empty (initial projection))


-- | Query a stream from its beginning (position 0) and project the messages.
--
-- Returns 'Nothing' when the stream has no messages.
fetch ::
  forall state.
  Functions.WithConnection ->
  Functions.BatchSize ->
  StreamName ->
  Projection state ->
  IO (Maybe (Projected state))
fetch withConnection batchSize streamName =
  fetch' withConnection batchSize streamName 0


-- | A previously recorded projection state, as read back from a snapshot
-- stream.
data Snapshot state = Snapshot
  { snapshotState :: state
  -- ^ The state stored in the snapshot message's payload.
  , snapshotSavedPosition :: Message.StreamPosition
  -- ^ The position stored in the snapshot message's metadata;
  -- 'fetchWithSnapshots' resumes reading the projected stream just after it.
  , snapshotVersion :: Message.StreamPosition
  -- ^ The position of the snapshot message itself; used as the expected
  -- version when the next snapshot is written.
  }


-- | Name of the stream that snapshots are read from and written to.
newtype SnapshotStreamName = SnapshotStreamName
  { fromSnapshotStreamName :: StreamName
  }
  deriving (Show, Eq, Ord, IsString, Semigroup)


-- | Reads the last message of the snapshot stream and parses it as a
-- 'Snapshot'.
--
-- Returns 'Nothing' when the stream has no last message, @Just (Left _)@
-- when the last message cannot be parsed, and @Just (Right _)@ otherwise.
retrieveSnapshot ::
  Aeson.FromJSON state =>
  Functions.WithConnection ->
  SnapshotStreamName ->
  IO (Maybe (Either UnprocessedMessage (Snapshot state)))
retrieveSnapshot withConnection streamName =
  runMaybeT . runExceptT $ do
    message <-
      lift . MaybeT . withConnection $ \connection ->
        Functions.getLastStreamMessage connection (coerce streamName)

    -- The payload carries the state; the metadata carries the position that
    -- state was projected up to.
    Message.ParsedMessage{parsedPayload, parsedMetadata} <-
      liftEither
        . first (UnprocessedMessage message . Handlers.HandlerParseFailure)
        $ Message.parseMessage message

    pure $
      Snapshot
        { snapshotState = parsedPayload
        , snapshotSavedPosition = parsedMetadata
        , snapshotVersion = Message.messageStreamPosition message
        }


-- | Writes a @Snapshotted@ message to the snapshot stream, with the state as
-- its payload and the given position as its metadata.
--
-- The write is conditional on the expected version of the snapshot stream.
recordSnapshot ::
  forall state.
  Aeson.ToJSON state =>
  Functions.WithConnection ->
  SnapshotStreamName ->
  state ->
  Message.StreamPosition ->
  Functions.ExpectedVersion ->
  IO ()
recordSnapshot withConnection streamName snapshotState snapshotPosition expectedVersion =
  liftIO . withConnection $ \connection ->
    -- A 'Functions.ExpectedVersionViolation' means that someone else wrote a snapshot
    -- in between us reading the latest snapshot and computing the next snapshot.
    -- In this case, we disregard this snapshot because it's out of date.
    handle (\(_ :: Functions.ExpectedVersionViolation) -> pure ()) . void $
      Functions.writeMessage
        connection
        (coerce streamName)
        "Snapshotted"
        snapshotState
        (Just snapshotPosition)
        (Just expectedVersion)


-- | Query a stream and project the messages, using a snapshot stream to
-- avoid re-reading messages that were already projected.
--
-- The latest snapshot, if one exists and parses, supplies the starting state
-- and position; otherwise the stream is read from position 0 with the
-- projection's own initial state. When at least one message is handled, the
-- resulting state is recorded as a new snapshot.
fetchWithSnapshots ::
  forall state.
  (Aeson.ToJSON state, Aeson.FromJSON state) =>
  Functions.WithConnection ->
  Functions.BatchSize ->
  StreamName ->
  Projection state ->
  SnapshotStreamName ->
  IO (Maybe (Projected state))
fetchWithSnapshots withConnection batchSize streamName projection snapshotStreamName = do
  previousSnapshotResult <- retrieveSnapshot @state withConnection snapshotStreamName

  let (startingPosition, startingProjection) =
        case previousSnapshotResult of
          Just (Right Snapshot{snapshotSavedPosition, snapshotState}) ->
            (snapshotSavedPosition + 1, projection{initial = snapshotState})
          _ ->
            (0, projection)

  fetchResult <-
    fetch' @state withConnection batchSize streamName startingPosition startingProjection

  case fetchResult of
    -- Only record a snapshot when this fetch handled at least one message.
    Just Projected{version = Functions.DoesExist updatedSnapshotVersion, state} ->
      let -- The snapshot stream is expected to still end at the message that
          -- was read above, whether or not that message could be parsed.
          previousVersion =
            case previousSnapshotResult of
              Just (Left UnprocessedMessage{message}) ->
                Functions.DoesExist (Message.messageStreamPosition message)
              Just (Right Snapshot{snapshotVersion}) ->
                Functions.DoesExist snapshotVersion
              Nothing ->
                Functions.DoesNotExist
       in recordSnapshot
            withConnection
            snapshotStreamName
            state
            updatedSnapshotVersion
            (Functions.ExpectedVersion previousVersion)
    _ -> pure ()

  -- When nothing new was handled after the snapshot, report the snapshot's
  -- saved position rather than 'Functions.DoesNotExist'.
  let correctVersion projected =
        case (previousSnapshotResult, version projected) of
          (Just (Right Snapshot{snapshotSavedPosition}), Functions.DoesNotExist) ->
            projected{version = Functions.DoesExist snapshotSavedPosition}
          _ ->
            projected

  pure (fetchResult <&> correctVersion)