-- | A projection is an aggregation of all messages in a stream.
module MessageDb.Projection
  ( Projection (..),
    Functions.BatchSize (..),
    UnprocessedMessage (..),
    Projected (..),
    versionIncludingUnprocessed,
    project,
    fetch,
  )
where

import Control.Exception (Exception)
import Data.Foldable (foldl', toList)
import Data.List.NonEmpty (NonEmpty ((:|)))
import qualified Data.List.NonEmpty as NonEmpty
import qualified MessageDb.Functions as Functions
import MessageDb.Handlers (HandleError (..))
import qualified MessageDb.Handlers as Handlers
import MessageDb.Message (Message)
import qualified MessageDb.Message as Message
import MessageDb.StreamName (StreamName)


-- | Defines how to perform a projection of a stream: the state to start from
-- and the handlers used to fold each message into that state.
data Projection state = Projection
  { initial :: state
  -- ^ State the fold starts from, before any message has been applied.
  , handlers :: Handlers.ProjectionHandlers state
  -- ^ Handlers given to 'Handlers.projectionHandle' for every message.
  }


-- | A message that could not be processed, paired with the error that the
-- projection handlers returned for it.
data UnprocessedMessage = UnprocessedMessage
  { message :: Message
  -- ^ The message that could not be handled.
  , reason :: HandleError
  -- ^ Why handling the message failed.
  }
  deriving (Show, Eq)


-- | Lets an 'UnprocessedMessage' be thrown and caught as an exception.
-- Only the class defaults are used; no methods are overridden.
instance Exception UnprocessedMessage


-- | The result of running a 'Projection' over the messages of a stream.
--
-- The 'Functor' instance maps over the projected 'state' and leaves the other
-- fields untouched.
data Projected state = Projected
  { unprocessed :: [UnprocessedMessage]
  -- ^ Messages whose handler returned an error. 'project' and 'fetch' return
  -- them in the order in which they were encountered.
  , state :: state
  -- ^ The state after applying every successfully handled message.
  , version :: Functions.StreamVersion
  -- ^ Position of the last successfully handled message, or
  -- 'Functions.DoesNotExist' when no message has been handled.
  }
  deriving (Show, Eq, Functor)


-- | Builds the starting point of a projection: the given state, no
-- unprocessed messages and a version of 'Functions.DoesNotExist'.
empty :: state -> Projected state
empty initialState =
  -- Constructor arguments are: unprocessed messages, state, version.
  Projected [] initialState Functions.DoesNotExist


-- | Version of the projection with unprocessed messages included.
--
-- This is the greatest of the projection's own 'version' and the stream
-- positions of all of its 'unprocessed' messages.
versionIncludingUnprocessed :: Projected state -> Functions.StreamVersion
versionIncludingUnprocessed Projected{version, unprocessed} =
  -- Seeding the non-empty list with 'version' keeps 'maximum' total even when
  -- there are no unprocessed messages.
  maximum (version :| map unprocessedVersion unprocessed)
  where
    -- Stream position of an unprocessed message, expressed as a version.
    unprocessedVersion =
      Functions.DoesExist . Message.messageStreamPosition . message


-- | Reverses the order of the 'unprocessed' messages, leaving the state and
-- version as they are. 'project'' collects failures newest-first, so this is
-- applied before a result is handed back to callers.
reverseUnprocessed :: Projected state -> Projected state
reverseUnprocessed projected@Projected{unprocessed} =
  projected{unprocessed = reverse unprocessed}


-- | Strict left fold of the messages into a 'Projected', starting from
-- 'empty' applied to the projection's 'initial' state.
--
-- A message that is handled successfully replaces the state and moves the
-- version to that message's stream position. A message whose handler fails is
-- prepended to 'unprocessed', so failures come out newest-first.
project' :: Projection state -> NonEmpty Message -> Projected state
project' Projection{initial, handlers} messages =
  foldl' step (empty initial) (toList messages)
  where
    step projected currentMessage =
      case Handlers.projectionHandle handlers currentMessage (state projected) of
        Left handleError ->
          projected
            { unprocessed =
                UnprocessedMessage currentMessage handleError : unprocessed projected
            }
        Right nextState ->
          projected
            { state = nextState
            , version =
                Functions.DoesExist (Message.messageStreamPosition currentMessage)
            }


-- | Project a state of a stream by aggregating messages.
--
-- The messages are folded with the projection's handlers, and the
-- 'unprocessed' messages of the result are put back into the order in which
-- they were encountered.
project :: Projection state -> NonEmpty Message -> Projected state
project projection messages =
  reverseUnprocessed (project' projection messages)


-- | Query a stream and project the messages.
--
-- Messages are read from position 0 in batches of the given size and folded
-- into the projection one batch at a time, until a query returns no messages.
-- With 'Functions.Unlimited' only a single query is made.
--
-- Returns 'Nothing' when the first query returns no messages. Otherwise
-- returns the projection, with 'unprocessed' in the order the messages were
-- encountered.
fetch ::
  forall state.
  Functions.WithConnection ->
  Functions.BatchSize ->
  StreamName ->
  Projection state ->
  IO (Maybe (Projected state))
fetch withConnection batchSize streamName projection =
  let query position projected@Projected{state, unprocessed, version = previousVersion} = do
        messages <- withConnection $ \connection ->
          Functions.getStreamMessages connection streamName (Just position) (Just batchSize) Nothing

        case messages of
          (firstMessage : otherMessages) -> do
            let nonEmptyMessages = firstMessage :| otherMessages

            -- Continue the fold from the state reached by the previous batches.
            let (Projected newErrors updatedState batchVersion) =
                  project' (projection{initial = state}) nonEmptyMessages

                -- 'project'' starts every batch from 'empty', whose version is
                -- 'Functions.DoesNotExist'. A batch in which no message was
                -- handled successfully therefore reports 'DoesNotExist'; keep
                -- the version reached by earlier batches in that case instead
                -- of discarding it.
                updatedVersion =
                  case batchVersion of
                    Functions.DoesNotExist -> previousVersion
                    _ -> batchVersion

                updatedProjectedState =
                  Projected
                    { -- Both lists are newest-first; the final result is
                      -- reversed once after the last batch.
                      unprocessed = newErrors <> unprocessed
                    , state = updatedState
                    , version = updatedVersion
                    }

            if batchSize == Functions.Unlimited
              then pure $ Just updatedProjectedState
              else
                let nextPosition =
                      Message.messageStreamPosition (NonEmpty.last nonEmptyMessages) + 1
                 in query nextPosition updatedProjectedState
          _ ->
            pure $
              -- An empty result on the very first query means there is nothing
              -- to project; on a later query it means the stream is exhausted.
              if position <= 0
                then Nothing
                else Just projected
   in fmap reverseUnprocessed <$> query 0 (empty $ initial projection)