{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE Rank2Types #-}
module EventSource.Store
( Batch'(..)
, Batch
, Subscription(..)
, SubscriptionId
, ExpectedVersionException(..)
, Store(..)
, SomeStore(..)
, freshSubscriptionId
, startFrom
, appendEvent
, unhandled
) where
import Control.Exception (Exception, throwIO)
import Control.Concurrent.Async.Lifted (Async)
import Control.Monad.Base (MonadBase, liftBase)
import Control.Monad.Except (ExceptT, runExceptT)
import Data.UUID (UUID)
import Data.UUID.V4 (nextRandom)
import Streaming (hoist)
import Streaming.Prelude (Stream, Of)
import EventSource.Types
import EventSource.Store.Internal.Iterator (Batch'(..), startFrom)
-- | 'Batch'' specialised to 'EventNumber'.
type Batch = Batch' EventNumber
-- | Identifier of a 'Subscription', backed by a 'UUID'. The module export
-- list exposes the type without its constructor; 'freshSubscriptionId'
-- produces new values.
newtype SubscriptionId = SubscriptionId UUID deriving (Eq, Ord, Show)
-- | Build a new 'SubscriptionId' from a random 'UUID' obtained with
-- 'nextRandom', lifted from 'IO' into any @m@ that has a 'MonadBase' 'IO'
-- instance.
freshSubscriptionId :: MonadBase IO m => m SubscriptionId
freshSubscriptionId = liftBase (SubscriptionId <$> nextRandom)
-- | A subscription: an identifier paired with a stream of 'SavedEvent's.
data Subscription = Subscription
  { subscriptionId :: SubscriptionId
    -- ^ Identifier of this subscription.
  , subscriptionStream :: forall m. MonadBase IO m => Stream (Of SavedEvent) m ()
    -- ^ Stream of 'SavedEvent's. The field is polymorphic, so the consumer
    -- picks the monad @m@ (any monad with a 'MonadBase' 'IO' instance).
  }
-- | Exception recording a pair of versions: the one that was expected and the
-- one that was actually found. Both fields have type 'ExpectedVersion'.
--
-- NOTE(review): presumably raised by store implementations when the expected
-- version given to an append does not match the stream. No throw site is
-- visible in this module, so confirm against the implementations.
data ExpectedVersionException = ExpectedVersionException
  { versionExceptionExpected :: ExpectedVersion
    -- ^ The version that was expected.
  , versionExceptionActual :: ExpectedVersion
    -- ^ The version that was actually found.
  }
  deriving Show

-- | Relies on the default 'Exception' methods, which makes the type usable
-- with 'throwIO'.
instance Exception ExpectedVersionException
-- | Interface that event store backends implement.
--
-- NOTE(review): the precise semantics of each operation (ordering, failure
-- modes, when the 'Async' completes) are defined by the instances, none of
-- which are visible in this module.
class Store store where
  -- | Append a list of events to a stream. The result is an 'Async' handle
  -- that yields an 'EventNumber'.
  appendEvents
    :: (EncodeEvent a, MonadBase IO m)
    => store            -- ^ Store to write to.
    -> StreamName       -- ^ Target stream.
    -> ExpectedVersion  -- ^ Version expectation supplied by the caller.
    -> [a]              -- ^ Events to append.
    -> m (Async EventNumber)

  -- | Read a stream as a 'Stream' of 'SavedEvent's. Failures surface as a
  -- 'ReadFailure' in the underlying 'ExceptT' layer.
  readStream
    :: MonadBase IO m
    => store       -- ^ Store to read from.
    -> StreamName  -- ^ Stream to read.
    -> Batch       -- ^ Batch description handed to the implementation.
    -> Stream (Of SavedEvent) (ExceptT ReadFailure m) ()

  -- | Obtain a 'Subscription' for a stream.
  subscribe
    :: MonadBase IO m
    => store
    -> StreamName
    -> m Subscription

  -- | Hide the concrete store type behind 'SomeStore'. The default
  -- implementation applies the 'SomeStore' constructor.
  toStore :: store -> SomeStore
  toStore = SomeStore
-- | Existential wrapper that hides the concrete type of any 'Store' instance.
data SomeStore = forall store. Store store => SomeStore store
-- | Every operation unwraps the 'SomeStore' and forwards to the same
-- operation of the store inside it.
instance Store SomeStore where
  appendEvents (SomeStore store) = appendEvents store
  readStream (SomeStore store) = readStream store
  subscribe (SomeStore store) = subscribe store

  -- The value is already a 'SomeStore', so hand it back unchanged. The class
  -- default (@toStore = SomeStore@) would wrap it in a second 'SomeStore',
  -- adding one more level of delegation to every later call.
  toStore = id
-- | Append a single event: wraps it in a one-element list and delegates to
-- 'appendEvents' with the same store, stream name and expected version.
appendEvent
  :: (EncodeEvent a, MonadBase IO m, Store store)
  => store
  -> StreamName
  -> ExpectedVersion
  -> a
  -> m (Async EventNumber)
appendEvent store name expected event =
  appendEvents store name expected singleton
  where
    singleton = [event]
-- | Remove the 'ExceptT' layer beneath a stream. Each underlying action is run
-- with 'runExceptT'; a 'Left' error is rethrown in 'IO' with 'throwIO' (lifted
-- through 'liftBase'), while a 'Right' result is returned unchanged.
unhandled
  :: (MonadBase IO m, Exception e)
  => Stream (Of a) (ExceptT e m) ()
  -> Stream (Of a) m ()
unhandled = hoist rethrow
  where
    -- Written as a function binding so it stays polymorphic in its result
    -- type, which 'hoist' requires of its argument.
    rethrow action =
      runExceptT action >>= either (liftBase . throwIO) pure