{-# LANGUAGE GADTs               #-}
{-# LANGUAGE FlexibleInstances   #-}
{-# LANGUAGE OverloadedStrings   #-}
{-# LANGUAGE RecordWildCards     #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# OPTIONS -Wno-orphans #-}
--------------------------------------------------------------------------------
-- |
-- Module : Database.EventStore.Internal.Subscription.Catchup
-- Copyright : (C) 2017 Yorick Laupa
-- License : (see the file LICENSE)
--
-- Maintainer : Yorick Laupa <yo.eight@gmail.com>
-- Stability : provisional
-- Portability : non-portable
--
--------------------------------------------------------------------------------
module Database.EventStore.Internal.Subscription.Catchup where

--------------------------------------------------------------------------------
import Safe (fromJustNote)

--------------------------------------------------------------------------------
import Database.EventStore.Internal.Communication
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.Exec
import Database.EventStore.Internal.Operation.Catchup
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stream
import Database.EventStore.Internal.Subscription.Api
import Database.EventStore.Internal.Subscription.Types
import Database.EventStore.Internal.Subscription.Packages
import Database.EventStore.Internal.Types

--------------------------------------------------------------------------------
-- | Tells whether a resolved event lies strictly before the given checkpoint.
--
--   * For a named stream, the event's original event number (wrapped in
--     'EventNumber') is compared with the checkpoint.
--   * For the @$all@ stream, the event's 'Position' is compared with the
--     checkpoint.
--
--   The comparison is strict ('<'): an event equal to the checkpoint is not
--   reported as already received.
--
--   Partial in the @$all@ case: 'fromJustNote' raises if the event carries no
--   position.
receivedAlready :: StreamId t -> t -> ResolvedEvent -> Bool
receivedAlready StreamName{} old e =
    EventNumber (resolvedEventOriginalEventNumber e) < old
receivedAlready All old e =
    let pos =
            fromJustNote
                "Position is always defined when reading events from $all stream"
                $ resolvedEventPosition e in
    pos < old

--------------------------------------------------------------------------------
-- | Extracts from a resolved event the checkpoint value matching the kind of
--   stream being read.
--
--   * For a named stream, the result is the event's original event number
--     wrapped in 'EventNumber'.
--   * For the @$all@ stream, the result is the event's 'Position'.
--
--   Partial in the @$all@ case: 'fromJustNote' raises if the event carries no
--   position.
nextTarget :: StreamId t -> ResolvedEvent -> t
nextTarget StreamName{} e =
    EventNumber (resolvedEventOriginalEventNumber e)
nextTarget All e =
    fromJustNote
        "Position is always defined when reading events from $all stream"
        $ resolvedEventPosition e

--------------------------------------------------------------------------------
-- | This kind of subscription specifies a starting point, in the form of an
--   event number or transaction file position. The given function will be
--   called for events from the starting point until the end of the stream, and
--   then for subsequently written events.
--
--   For example, if a starting point of 50 is specified when a stream has 100
--   events in it, the subscriber can expect to see events 51 through 100, and
--   then any events subsequently written until such time as the subscription is
--   dropped or closed.
data CatchupSubscription t =
  CatchupSubscription
    { _catchupExec :: Exec
      -- ^ Handle the unsubscribe package is published through.
    , _catchupStream :: StreamId t
      -- ^ Stream this subscription is attached to.
    , _catchupSub :: TVar (Maybe UUID)
      -- ^ Subscription id; 'unsubscribe' blocks while this is 'Nothing'.
    , _catchupChan :: Chan SubAction
      -- ^ Channel 'nextSubEvent' reads subscription actions from.
    }

--------------------------------------------------------------------------------
instance Subscription (CatchupSubscription s) where
  -- Blocks until the next action is available on the subscription channel.
  nextSubEvent s = readChan (_catchupChan s)

  -- Waits for the subscription id to be set, then publishes an unsubscribe
  -- package for that id through the subscription's 'Exec'.
  unsubscribe s
    = do subId <- atomically $
           do idMay <- readTVar (_catchupSub s)
              case idMay of
                -- No id yet: retry the transaction until the TVar changes.
                Nothing -> retrySTM
                Just sid -> pure sid

         let pkg = createUnsubscribePackage subId
         publishWith (_catchupExec s) (SendPackage pkg)

--------------------------------------------------------------------------------
instance SubscriptionStream (CatchupSubscription t) t where
    -- The stream is stored directly on the subscription record.
    subscriptionStream = _catchupStream

--------------------------------------------------------------------------------
-- | Starts a catch-up operation and wraps its handles in a
--   'CatchupSubscription'.
--
--   The arguments are forwarded to 'catchup' together with the settings taken
--   from the 'Exec'. The resulting subscription-id variable and action channel
--   are stored on the returned record along with the 'Exec' and the stream.
--
--   NOTE(review): @tos@ presumably toggles link resolution and @batch@ an
--   optional read batch size; confirm against 'catchup'.
newCatchupSubscription
  :: Exec
  -> Bool
  -> Maybe Int32
  -> Maybe Credentials
  -> StreamId t
  -> t -- ^ Starting checkpoint, handed to 'catchup' unchanged.
  -> IO (CatchupSubscription t)
newCatchupSubscription exec tos batch cred streamId seed
  = do (var, chan) <-
         catchup (execSettings exec) exec streamId seed tos batch cred
       let sub =
             CatchupSubscription
             { _catchupExec = exec
             , _catchupStream = streamId
             , _catchupSub = var
             , _catchupChan = chan
             }

       pure sub