-- | A projection is an aggregation of all messages in a stream.
module MessageDb.Projection
  ( Projection (..),
    Functions.BatchSize (..),
    UnprocessedMessage (..),
    Projected (..),
    versionIncludingUnprocessed,
    project,
    fetch,
  )
where

import Control.Exception (Exception)
import Data.Foldable (foldl', toList)
import Data.List.NonEmpty (NonEmpty ((:|)))
import qualified Data.List.NonEmpty as NonEmpty
import qualified MessageDb.Functions as Functions
import MessageDb.Handlers (HandleError (..))
import qualified MessageDb.Handlers as Handlers
import MessageDb.Message (Message)
import qualified MessageDb.Message as Message
import MessageDb.StreamName (StreamName)


-- | Defines how to perform a projection a stream.
data Projection state = Projection
  { initial :: state
  , handlers :: Handlers.ProjectionHandlers state
  }


-- | A message that was not able to be processed.
data UnprocessedMessage = UnprocessedMessage
  { message :: Message
  , reason :: HandleError
  }
  deriving (Show, Eq)


instance Exception UnprocessedMessage


-- | A projected state
data Projected state = Projected
  { unprocessed :: [UnprocessedMessage]
  , state :: state
  , version :: Functions.StreamVersion
  }
  deriving (Show, Eq, Functor)


-- | Constructs an empty projection.
empty :: state -> Projected state
empty initialState =
  Projected
    { unprocessed = []
    , state = initialState
    , version = Functions.DoesNotExist
    }


-- | Version of the projection with unprocessed messages included.
versionIncludingUnprocessed :: Projected state -> Functions.StreamVersion
versionIncludingUnprocessed Projected{..} =
  let unprocessedPositions =
        Functions.DoesExist . Message.messageStreamPosition . message <$> unprocessed

      allPositions =
        version :| unprocessedPositions
   in maximum allPositions


reverseUnprocessed :: Projected state -> Projected state
reverseUnprocessed projected =
  projected
    { unprocessed = reverse (unprocessed projected)
    }


project' :: Projection state -> NonEmpty Message -> Projected state
project' Projection{initial, handlers} messages =
  let applyHandler message projected@Projected{state, unprocessed} =
        case Handlers.projectionHandle handlers message state of
          Right updatedState ->
            projected
              { state = updatedState
              , version = Functions.DoesExist $ Message.messageStreamPosition message
              }
          Left newError ->
            projected
              { unprocessed = UnprocessedMessage message newError : unprocessed
              }
   in foldl' (flip applyHandler) (empty initial) (toList messages)


-- | Project a state of a stream by aggregating messages.
project :: Projection state -> NonEmpty Message -> Projected state
project messages =
  reverseUnprocessed . project' messages


-- | Query a stream and project the messages.
fetch :: forall state. Functions.WithConnection -> Functions.BatchSize -> StreamName -> Projection state -> IO (Maybe (Projected state))
fetch withConnection batchSize streamName projection =
  let query position projected@Projected{state, unprocessed} = do
        messages <- withConnection $ \connection ->
          Functions.getStreamMessages connection streamName (Just position) (Just batchSize) Nothing

        case messages of
          (firstMessage : otherMessages) -> do
            let nonEmptyMessages = firstMessage :| otherMessages

            let (Projected newErrors updatedState updatedVersion) =
                  project' (projection{initial = state}) nonEmptyMessages

                updatedProjectedState =
                  Projected
                    { unprocessed = newErrors <> unprocessed
                    , state = updatedState
                    , version = updatedVersion
                    }

            if batchSize == Functions.Unlimited
              then pure $ Just updatedProjectedState
              else
                let nextPosition = Message.messageStreamPosition (NonEmpty.last nonEmptyMessages) + 1
                 in query nextPosition updatedProjectedState
          _ ->
            pure $
              if position <= 0
                then Nothing
                else Just projected
   in fmap reverseUnprocessed <$> query 0 (empty $ initial projection)