{-# LANGUAGE FlexibleContexts  #-}
{-# LANGUAGE GADTs             #-}
{-# LANGUAGE OverloadedStrings #-}
--------------------------------------------------------------------------------
-- |
-- Module : EventSource.Store.GetEventStore
-- Copyright : (C) 2016 Yorick Laupa
-- License : (see the file LICENSE)
--
-- Maintainer : Yorick Laupa
-- Stability : provisional
-- Portability : non-portable
--
-- This module exposes a GetEventStore-backed implementation of the 'Store'
-- interface.
--------------------------------------------------------------------------------
module EventSource.Store.GetEventStore
  ( GetEventStore
  , gesConnection
  , gesStore
  , fromGesEvent
  ) where

--------------------------------------------------------------------------------
import Control.Monad.Base (MonadBase, liftBase)
import Control.Monad.Except (withExceptT)
import Control.Monad.State.Strict (execState)
import Data.Aeson (encode, decodeStrict)
import Data.String.Conversions (convertString)
import qualified Database.EventStore as GES hiding (ReadError, StreamDeleted, AccessDenied)
import qualified Database.EventStore.Streaming as GES
import EventSource
import Streaming (hoist)
import qualified Streaming.Prelude as Streaming

--------------------------------------------------------------------------------
-- | Store handle: a thin wrapper around a GetEventStore connection.
newtype GetEventStore =
  GetEventStore
    { gesConnection :: GES.Connection
      -- ^ The wrapped GetEventStore connection.
    }

--------------------------------------------------------------------------------
-- | Translates an 'ExpectedVersion' into the matching GetEventStore value.
toGesExpVer :: ExpectedVersion -> GES.ExpectedVersion
toGesExpVer expected =
  case expected of
    AnyVersion                   -> GES.anyVersion
    NoStream                     -> GES.noStreamVersion
    StreamExists                 -> GES.streamExists
    ExactVersion (EventNumber i) -> GES.exactEventVersion i

--------------------------------------------------------------------------------
-- | Builds an 'Event' out of an encodable value. A blank event (fresh id,
--   empty type, empty payload, no metadata) is used as the initial state and
--   'encodeEvent' is run over it to produce the final event.
buildEvent :: (EncodeEvent a, MonadBase IO m) => a -> m Event
buildEvent a = fill <$> freshEventId
  where
    -- Runs the encoder over the blank event carrying the given id.
    fill eid = execState (encodeEvent a) (blank eid)

    blank eid =
      Event
        { eventType     = ""
        , eventId       = eid
        , eventPayload  = dataFromBytes ""
        , eventMetadata = Nothing
        }

--------------------------------------------------------------------------------
-- | Encodes a value straight into a GetEventStore event.
makeEvent :: EncodeEvent a => a -> IO GES.Event
makeEvent = fmap toGesEvent . buildEvent

--------------------------------------------------------------------------------
-- | Converts an 'Event' into a GetEventStore event. The payload kind (raw
--   bytes or JSON) and the presence of metadata select which GetEventStore
--   event-data builder is used.
toGesEvent :: Event -> GES.Event
toGesEvent evt = GES.createEvent (GES.UserDefined name) (Just uuid) body
  where
    EventType name = eventType evt
    EventId uuid   = eventId evt

    body =
      case (eventPayload evt, eventMetadata evt) of
        (Data bytes, Nothing) ->
          GES.withBinary bytes
        (DataAsJson value, Nothing) ->
          GES.withJson value
        (Data bytes, Just meta) ->
          -- Metadata is JSON-encoded and converted to the byte
          -- representation expected alongside a binary payload.
          GES.withBinaryAndMetadata bytes (convertString (encode meta))
        (DataAsJson value, Just meta) ->
          GES.withJsonAndMetadata value meta

--------------------------------------------------------------------------------
-- | Converts a GetEventStore event to a more abstract 'SavedEvent' event.
--   The saved event is taken from the record event when there is one, and
--   from 'GES.resolvedEventOriginal' otherwise. The link event, if any, is
--   converted as well.
fromGesEvent :: GES.ResolvedEvent -> SavedEvent
fromGesEvent resolved@(GES.ResolvedEvent mRecord mLink _) =
  SavedEvent
    { eventNumber = EventNumber (GES.recordedEventNumber target)
    , savedEvent  = toEvent target
    , linkEvent   = toEvent <$> mLink
    }
  where
    target =
      case mRecord of
        Nothing     -> GES.resolvedEventOriginal resolved
        Just record -> record

    -- Metadata bytes, when present, are decoded with 'decodeStrict'; a
    -- decoding failure yields 'Nothing'.
    toEvent recorded =
      Event
        { eventType     = EventType (GES.recordedEventType recorded)
        , eventId       = EventId (GES.recordedEventId recorded)
        , eventPayload  = dataFromBytes (GES.recordedEventData recorded)
        , eventMetadata = GES.recordedEventMetadata recorded >>= decodeStrict
        }

--------------------------------------------------------------------------------
-- | Rewraps a 'StreamName' as a GetEventStore stream name.
toGESStreamName :: StreamName -> GES.StreamName
toGESStreamName (StreamName txt) = GES.StreamName txt

--------------------------------------------------------------------------------
-- | Maps a GetEventStore stream id back to a 'StreamName'. 'GES.All' is
--   rendered as the name @$all@.
fromGESStreamName :: GES.StreamId t -> StreamName
fromGESStreamName sid =
  case sid of
    GES.StreamName txt -> StreamName txt
    GES.All            -> StreamName "$all"

--------------------------------------------------------------------------------
-- | Maps a GetEventStore read error onto a 'ReadFailure'. A deleted stream
--   is reported as 'StreamNotFound'.
fromGESError :: GES.ReadError t -> ReadFailure
fromGESError err =
  case err of
    GES.StreamDeleted sid -> StreamNotFound (fromGESStreamName sid)
    GES.ReadError reason  -> ReadError reason
    GES.AccessDenied sid  -> AccessDenied (fromGESStreamName sid)

--------------------------------------------------------------------------------
instance Store GetEventStore where
  -- Encodes every value, sends the batch to the stream with the translated
  -- expected version, and maps the write result to the next expected
  -- 'EventNumber'.
  appendEvents (GetEventStore conn) name ver xs =
    liftBase $
      traverse makeEvent xs >>= \events ->
        fmap toNumber <$>
          GES.sendEvents
            conn
            (toGESStreamName name)
            (toGesExpVer ver)
            events
            Nothing
    where
      toNumber = EventNumber . GES.writeNextExpectedVersion

  -- Reads forward from the batch's starting event number with the batch's
  -- size, converts each event with 'fromGesEvent', and translates read
  -- errors through 'fromGESError' while lifting the base monad.
  readStream (GetEventStore conn) name b =
    hoist (hoist liftBase . withExceptT fromGESError)
          (Streaming.map fromGesEvent source)
    where
      EventNumber start = batchFrom b

      source =
        GES.readThroughForward
          conn
          (toGESStreamName name)
          GES.ResolveLink
          (GES.rawEventNumber start)
          (Just (batchSize b))
          Nothing

  -- Opens a subscription on the stream, then allocates a fresh subscription
  -- id. The resulting stream repeatedly asks the subscription for its next
  -- event and converts it with 'fromGesEvent'.
  subscribe (GetEventStore conn) name = liftBase $ do
    sub <- GES.subscribe conn (toGESStreamName name) GES.ResolveLink Nothing
    sid <- freshSubscriptionId
    pure
      (Subscription
         sid
         (Streaming.repeatM (liftBase (fromGesEvent <$> GES.nextEvent sub))))

--------------------------------------------------------------------------------
-- | Returns a GetEventStore based store implementation.
gesStore :: GES.Settings -> GES.ConnectionType -> IO GetEventStore
gesStore setts typ = GetEventStore <$> GES.connect setts typ