{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedStrings #-}
module EventSource.Store.GetEventStore
( GetEventStore
, gesConnection
, gesStore
, fromGesEvent
) where
import Control.Monad.Base (MonadBase, liftBase)
import Control.Monad.Except (withExceptT)
import Control.Monad.State.Strict (execState)
import Data.Aeson (encode, decodeStrict)
import Data.String.Conversions (convertString)
import qualified Database.EventStore as GES hiding (ReadError, StreamDeleted, AccessDenied)
import qualified Database.EventStore.Streaming as GES
import EventSource
import Streaming (hoist)
import qualified Streaming.Prelude as Streaming
newtype GetEventStore = GetEventStore { gesConnection :: GES.Connection }
toGesExpVer :: ExpectedVersion -> GES.ExpectedVersion
toGesExpVer AnyVersion = GES.anyVersion
toGesExpVer NoStream = GES.noStreamVersion
toGesExpVer StreamExists = GES.streamExists
toGesExpVer (ExactVersion n) =
let EventNumber i = n in
GES.exactEventVersion i
buildEvent :: (EncodeEvent a, MonadBase IO m) => a -> m Event
buildEvent a = do
eid <- freshEventId
let start = Event { eventType = ""
, eventId = eid
, eventPayload = dataFromBytes ""
, eventMetadata = Nothing
}
return $ execState (encodeEvent a) start
makeEvent :: EncodeEvent a => a -> IO GES.Event
makeEvent a = toGesEvent <$> buildEvent a
toGesEvent :: Event -> GES.Event
toGesEvent e = GES.createEvent (GES.UserDefined typ) (Just eid) eventData
where
EventType typ = eventType e
EventId eid = eventId e
eventData =
case eventMetadata e of
Nothing ->
case eventPayload e of
Data bs -> GES.withBinary bs
DataAsJson v -> GES.withJson v
Just p ->
case eventPayload e of
Data bs -> GES.withBinaryAndMetadata bs (convertString $ encode p)
DataAsJson v -> GES.withJsonAndMetadata v p
fromGesEvent :: GES.ResolvedEvent -> SavedEvent
fromGesEvent e@(GES.ResolvedEvent recEvt lnkEvt _) = saved
where
re =
case recEvt of
Just result -> result
_ -> GES.resolvedEventOriginal e
make evt =
let eid = EventId $ GES.recordedEventId evt
etyp = EventType $ GES.recordedEventType evt
payload = GES.recordedEventData evt
metaBytes = GES.recordedEventMetadata evt in
Event { eventType = etyp
, eventId = eid
, eventPayload = dataFromBytes payload
, eventMetadata = decodeStrict =<< metaBytes
}
saved = SavedEvent { eventNumber = EventNumber $ GES.recordedEventNumber re
, savedEvent = make re
, linkEvent = fmap make lnkEvt
}
toGESStreamName :: StreamName -> GES.StreamName
toGESStreamName (StreamName name) = GES.StreamName name
fromGESStreamName :: GES.StreamId t -> StreamName
fromGESStreamName (GES.StreamName n) = StreamName n
fromGESStreamName GES.All = StreamName "$all"
fromGESError :: GES.ReadError t -> ReadFailure
fromGESError (GES.StreamDeleted name) = StreamNotFound $ fromGESStreamName name
fromGESError (GES.ReadError reason) = ReadError reason
fromGESError (GES.AccessDenied name) = AccessDenied $ fromGESStreamName name
instance Store GetEventStore where
appendEvents (GetEventStore conn) name ver xs = liftBase $ do
events <- traverse makeEvent xs
w <- GES.sendEvents conn (toGESStreamName name) (toGesExpVer ver) events
Nothing
return $ fmap (EventNumber . GES.writeNextExpectedVersion) w
readStream (GetEventStore conn) name b =
let EventNumber n = batchFrom b
from = GES.rawEventNumber n
stream =
GES.readThroughForward conn
(toGESStreamName name) GES.ResolveLink from (Just $ batchSize b) Nothing
in hoist (hoist liftBase . withExceptT fromGESError)
$ Streaming.map fromGesEvent stream
subscribe (GetEventStore conn) name = liftBase $ do
sub <- GES.subscribe conn (toGESStreamName name) GES.ResolveLink Nothing
sid <- freshSubscriptionId
pure $ Subscription sid
$ Streaming.repeatM
$ liftBase
$ fmap fromGesEvent
$ GES.nextEvent sub
gesStore :: GES.Settings -> GES.ConnectionType -> IO GetEventStore
gesStore setts typ = fmap GetEventStore $ GES.connect setts typ