{-# LANGUAGE DeriveDataTypeable     #-}
{-# LANGUAGE FunctionalDependencies #-}
{-# LANGUAGE ScopedTypeVariables    #-}
--------------------------------------------------------------------------------
-- |
-- Module : Database.EventStore.Internal.Subscription.Api
-- Copyright : (C) 2017 Yorick Laupa
-- License : (see the file LICENSE)
--
-- Maintainer : Yorick Laupa <yo.eight@gmail.com>
-- Stability : provisional
-- Portability : non-portable
--
-- Main Subscription bookkeeping structure.
--------------------------------------------------------------------------------
module Database.EventStore.Internal.Subscription.Api where

--------------------------------------------------------------------------------
import           Streaming
import qualified Streaming.Prelude as Streaming

--------------------------------------------------------------------------------
import Database.EventStore.Internal.Types
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Types

--------------------------------------------------------------------------------
-- | Common operations supported by a subscription.
class Subscription s where
  -- | Asks for the next subscription event. If this function is called after
  --   a SubDropped event, expect it to hang indefinitely.
  nextSubEvent :: s -> IO SubAction

  -- | Asynchronously unsubscribe from a subscription.
  unsubscribe :: s -> IO ()

--------------------------------------------------------------------------------
-- | Subscriptions from which a stream identifier can be obtained. The
--   functional dependency @t -> s@ means the stream type @t@ determines the
--   subscription type @s@.
class SubscriptionStream s t | t -> s where
    -- | Returns the stream of a subscription.
    subscriptionStream :: s -> StreamId t

--------------------------------------------------------------------------------
-- | Streams the events of a subscription. The stream ends when a `Dropped`
--   event is encountered, but that `Dropped` event is still emitted as the
--   last element.
streamSubEvents :: Subscription s => s -> Stream (Of SubAction) IO ()
streamSubEvents sub = do
  -- Emit every action up to (but not including) the first `Dropped`; the
  -- remainder of the stream, starting at that `Dropped`, is returned.
  remainder <- Streaming.span isLive $ Streaming.repeatM (nextSubEvent sub)
  -- Pull the head of the remainder and emit it, discarding whatever follows
  -- so that `nextSubEvent` is never asked for anything past the drop.
  outcome <- lift $ Streaming.uncons remainder
  for_ outcome $ \(dropped, _) -> Streaming.yield dropped
  where
    -- True for every action except `Dropped`.
    isLive :: SubAction -> Bool
    isLive (Dropped _) = False
    isLive _           = True

--------------------------------------------------------------------------------
-- | Like `streamSubEvents` but only emits the `ResolvedEvent` carried by
--   `Submit` actions; every other action is filtered out.
streamSubResolvedEvents :: Subscription s => s -> Stream (Of ResolvedEvent) IO ()
streamSubResolvedEvents = Streaming.mapMaybe resolved . streamSubEvents
  where
    -- Keeps the payload of a `Submit` and drops any other action.
    resolved :: SubAction -> Maybe ResolvedEvent
    resolved (Submit e) = Just e
    resolved _          = Nothing