-- | A small process to capture and _share_ observation's by enqueueing them into an STM 'TBQeueu'.
module Control.Eff.Concurrent.Api.Observer.Queue
  ( ObservationQueue()
  , ObservationQueueReader
  , readObservationQueue
  , tryReadObservationQueue
  , flushObservationQueue
  , spawnLinkObserverationQueue
  )
where

import           Control.Concurrent.STM
import           Control.Eff
import           Control.Eff.Extend
import           Control.Eff.ExceptionExtra     ( )
import           Control.Eff.Lift
import           Control.Eff.Concurrent.Process
import           Control.Eff.Log
import           Control.Eff.Concurrent.Api
import           Control.Eff.Concurrent.Api.Client
import           Control.Eff.Concurrent.Api.Observer
import           Control.Eff.Concurrent.Api.Server
import           Control.Eff.Reader.Strict
import           Control.Exception.Safe        as Safe
import           Control.Monad.IO.Class
import           Control.Monad                  ( unless )
import           Data.Typeable
import           Text.Printf
import           GHC.Stack

-- | Contains a 'TBQueue' capturing observations received by 'enqueueObservationsRegistered'
-- or 'spawnLinkObserverationQueue'.
newtype ObservationQueue a = ObservationQueue (TBQueue a)

-- | A 'Reader' for an 'ObservationQueue'.
type ObservationQueueReader a = Reader (ObservationQueue a)

logPrefix :: forall o proxy . (HasCallStack, Typeable o) => proxy o -> String
logPrefix px = "observation queue: " ++ show (typeRep px)

-- | Read queued observations captured and enqueued in the shared 'TBQueue' by 'spawnLinkObserverationQueue'.
-- This blocks until something was captured or an interrupt or exceptions was thrown. For a non-blocking
-- variant use 'tryReadObservationQueue' or 'flushObservationQueue'.
readObservationQueue
  :: forall o r
   . ( Member (ObservationQueueReader o) r
     , HasCallStack
     , MonadIO (Eff r)
     , Typeable o
     , HasLogging IO r
     )
  => Eff r o
readObservationQueue = do
  ObservationQueue q <- ask @(ObservationQueue o)
  liftIO (atomically (readTBQueue q))

-- | Read queued observations captured and enqueued in the shared 'TBQueue' by 'spawnLinkObserverationQueue'.
-- Return the oldest enqueued observation immediately or 'Nothing' if the queue is empty.
-- Use 'readObservationQueue' to block until an observation is observed.
tryReadObservationQueue
  :: forall o r
   . ( Member (ObservationQueueReader o) r
     , HasCallStack
     , MonadIO (Eff r)
     , Typeable o
     , HasLogging IO r
     )
  => Eff r (Maybe o)
tryReadObservationQueue = do
  ObservationQueue q <- ask @(ObservationQueue o)
  liftIO (atomically (tryReadTBQueue q))

-- | Read at once all currently queued observations captured and enqueued in the shared 'TBQueue' by 'spawnLinkObserverationQueue'.
-- This returns immediately all currently enqueued 'Observation's. For a blocking
-- variant use 'readObservationQueue'.
flushObservationQueue
  :: forall o r
   . ( Member (ObservationQueueReader o) r
     , HasCallStack
     , MonadIO (Eff r)
     , Typeable o
     , HasLogging IO r
     )
  => Eff r [o]
flushObservationQueue = do
  ObservationQueue q <- ask @(ObservationQueue o)
  liftIO (atomically (flushTBQueue q))


-- | Capture an observation.
data instance Api (ObservationQueue a) r where
  EnqueueObservation :: a -> Api (ObservationQueue a) 'Asynchronous
  StopObservationQueue :: Api (ObservationQueue a) ('Synchronous ())

-- | Observe a 'Server' that implements an 'Observable' 'Api'. The observations
-- can be obtained by 'readObservationQueue'. All observations are captured up to
-- the queue size limit, such that the first message received will be first message
-- returned by 'readObservationQueue'.
spawnLinkObserverationQueue
  :: forall o q a
   . (Typeable o, Show o, HasLogging IO q, Lifted IO q, HasCallStack)
  => Server (ObserverRegistry o)
  -> Int
  -> Eff (ObservationQueueReader o ': InterruptableProcess q) a
  -> Eff (InterruptableProcess q) a
spawnLinkObserverationQueue oSvr queueLimit k = withQueue
  queueLimit
  (do
    ObservationQueue q <- ask @(ObservationQueue o)
    do
      logDebug
        (printf "%s starting with queue limit: %d"
                (logPrefix (Proxy @o))
                queueLimit
        )
      cbo <- raise
        (spawnLinkApiServer
          (handleCasts
            (\case
              EnqueueObservation o -> do
                lift (atomically (writeTBQueue q o))
                pure AwaitNext
            )
          )
          stopServerOnInterrupt
        )
      let thisObserver = toObserverFor EnqueueObservation cbo
      registerObserver thisObserver oSvr
      res <- k
      forgetObserver thisObserver oSvr
      call cbo StopObservationQueue
      logDebug (printf "%s stopped observer process" (logPrefix (Proxy @o)))
      return res
  )

withQueue
  :: forall o b e len
   . ( HasCallStack
     , Typeable o
     , Show o
     , HasLogging IO e
     , Integral len
     , Member Interrupts e
     )
  => len
  -> Eff (ObservationQueueReader o ': e) b
  -> Eff e b
withQueue queueLimit e = do
  q   <- liftIO (newTBQueueIO (fromIntegral queueLimit))
  res <- handleInterrupts (return . Left)
                          (Right <$> runReader (ObservationQueue q) e)
  rest <- liftIO (atomically (flushTBQueue q))
  unless
    (null rest)
    (logNotice (logPrefix (Proxy @o) ++ " unread observations: " ++ show rest))
  either (\em -> logError (show em) >> liftIO (throwIO em)) return res