-- |Description: Queue Interpreters for 'TBQueue'
module Polysemy.Conc.Interpreter.Queue.TB where

import Control.Concurrent.STM (
  TBQueue,
  atomically,
  isFullTBQueue,
  newTBQueueIO,
  peekTBQueue,
  readTBQueue,
  tryPeekTBQueue,
  tryReadTBQueue,
  writeTBQueue,
  )

import qualified Polysemy.Conc.Data.QueueResult as QueueResult
import qualified Polysemy.Conc.Effect.Queue as Queue
import Polysemy.Conc.Effect.Queue (Queue)
import Polysemy.Conc.Effect.Race (Race)
import Polysemy.Conc.Queue.Result (naResult)
import Polysemy.Conc.Queue.Timeout (withTimeout)

-- |Interpret 'Queue' with a 'TBQueue'.
--
-- This variant expects an allocated queue as an argument.
interpretQueueTBWith ::
   d r .
  Members [Race, Embed IO] r =>
  TBQueue d ->
  InterpreterFor (Queue d) r
interpretQueueTBWith :: forall d (r :: [(* -> *) -> * -> *]).
Members '[Race, Embed IO] r =>
TBQueue d -> InterpreterFor (Queue d) r
interpretQueueTBWith TBQueue d
queue =
  (forall (rInitial :: [(* -> *) -> * -> *]) x.
 Queue d (Sem rInitial) x -> Sem r x)
-> Sem (Queue d : r) a -> Sem r a
forall (e :: (* -> *) -> * -> *) (r :: [(* -> *) -> * -> *]) a.
FirstOrder e "interpret" =>
(forall (rInitial :: [(* -> *) -> * -> *]) x.
 e (Sem rInitial) x -> Sem r x)
-> Sem (e : r) a -> Sem r a
interpret \case
    Queue d (Sem rInitial) x
Queue.Read ->
      IO (QueueResult d) -> Sem r (QueueResult d)
forall (m :: * -> *) (r :: [(* -> *) -> * -> *]) a.
Member (Embed m) r =>
m a -> Sem r a
embed (STM (QueueResult d) -> IO (QueueResult d)
forall a. STM a -> IO a
atomically (d -> QueueResult d
forall d. d -> QueueResult d
QueueResult.Success (d -> QueueResult d) -> STM d -> STM (QueueResult d)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TBQueue d -> STM d
forall a. TBQueue a -> STM a
readTBQueue TBQueue d
queue))
    Queue d (Sem rInitial) x
Queue.TryRead ->
      IO (QueueResult d) -> Sem r (QueueResult d)
forall (m :: * -> *) (r :: [(* -> *) -> * -> *]) a.
Member (Embed m) r =>
m a -> Sem r a
embed (STM (QueueResult d) -> IO (QueueResult d)
forall a. STM a -> IO a
atomically (Maybe d -> QueueResult d
forall d. Maybe d -> QueueResult d
naResult (Maybe d -> QueueResult d) -> STM (Maybe d) -> STM (QueueResult d)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TBQueue d -> STM (Maybe d)
forall a. TBQueue a -> STM (Maybe a)
tryReadTBQueue TBQueue d
queue))
    Queue.ReadTimeout t
timeout ->
      t -> STM (Maybe d) -> Sem r (QueueResult d)
forall t (r :: [(* -> *) -> * -> *]) d.
(TimeUnit t, Members '[Race, Embed IO] r) =>
t -> STM (Maybe d) -> Sem r (QueueResult d)
withTimeout t
timeout (d -> Maybe d
forall a. a -> Maybe a
Just (d -> Maybe d) -> STM d -> STM (Maybe d)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TBQueue d -> STM d
forall a. TBQueue a -> STM a
readTBQueue TBQueue d
queue)
    Queue d (Sem rInitial) x
Queue.Peek ->
      IO (QueueResult d) -> Sem r (QueueResult d)
forall (m :: * -> *) (r :: [(* -> *) -> * -> *]) a.
Member (Embed m) r =>
m a -> Sem r a
embed (STM (QueueResult d) -> IO (QueueResult d)
forall a. STM a -> IO a
atomically (d -> QueueResult d
forall d. d -> QueueResult d
QueueResult.Success (d -> QueueResult d) -> STM d -> STM (QueueResult d)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TBQueue d -> STM d
forall a. TBQueue a -> STM a
peekTBQueue TBQueue d
queue))
    Queue d (Sem rInitial) x
Queue.TryPeek ->
      IO (QueueResult d) -> Sem r (QueueResult d)
forall (m :: * -> *) (r :: [(* -> *) -> * -> *]) a.
Member (Embed m) r =>
m a -> Sem r a
embed (STM (QueueResult d) -> IO (QueueResult d)
forall a. STM a -> IO a
atomically (Maybe d -> QueueResult d
forall d. Maybe d -> QueueResult d
naResult (Maybe d -> QueueResult d) -> STM (Maybe d) -> STM (QueueResult d)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TBQueue d -> STM (Maybe d)
forall a. TBQueue a -> STM (Maybe a)
tryPeekTBQueue TBQueue d
queue))
    Queue.Write d
d ->
      IO () -> Sem r ()
forall (m :: * -> *) (r :: [(* -> *) -> * -> *]) a.
Member (Embed m) r =>
m a -> Sem r a
embed (STM () -> IO ()
forall a. STM a -> IO a
atomically (TBQueue d -> d -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue d
queue d
d))
    Queue.TryWrite d
d ->
      IO (QueueResult ()) -> Sem r (QueueResult ())
forall (m :: * -> *) (r :: [(* -> *) -> * -> *]) a.
Member (Embed m) r =>
m a -> Sem r a
embed (IO (QueueResult ()) -> Sem r (QueueResult ()))
-> IO (QueueResult ()) -> Sem r (QueueResult ())
forall a b. (a -> b) -> a -> b
$ STM (QueueResult ()) -> IO (QueueResult ())
forall a. STM a -> IO a
atomically do
        STM Bool
-> STM (QueueResult ())
-> STM (QueueResult ())
-> STM (QueueResult ())
forall (m :: * -> *) a. Monad m => m Bool -> m a -> m a -> m a
ifM (TBQueue d -> STM Bool
forall a. TBQueue a -> STM Bool
isFullTBQueue TBQueue d
queue) (QueueResult () -> STM (QueueResult ())
forall (f :: * -> *) a. Applicative f => a -> f a
pure QueueResult ()
forall d. QueueResult d
QueueResult.NotAvailable) (() -> QueueResult ()
forall d. d -> QueueResult d
QueueResult.Success (() -> QueueResult ()) -> STM () -> STM (QueueResult ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TBQueue d -> d -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue d
queue d
d)
    Queue.WriteTimeout t
timeout d
d ->
      t -> STM (Maybe ()) -> Sem r (QueueResult ())
forall t (r :: [(* -> *) -> * -> *]) d.
(TimeUnit t, Members '[Race, Embed IO] r) =>
t -> STM (Maybe d) -> Sem r (QueueResult d)
withTimeout t
timeout (() -> Maybe ()
forall a. a -> Maybe a
Just (() -> Maybe ()) -> STM () -> STM (Maybe ())
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TBQueue d -> d -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue d
queue d
d)
    Queue d (Sem rInitial) x
Queue.Closed ->
      Bool -> Sem r Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure Bool
False
    Queue d (Sem rInitial) x
Queue.Close ->
      Sem r x
forall (f :: * -> *). Applicative f => f ()
unit
{-# inline interpretQueueTBWith #-}

-- |Interpret 'Queue' with a 'TBQueue'.
interpretQueueTB ::
   d r .
  Members [Race, Embed IO] r =>
  -- |Buffer size
  Natural ->
  InterpreterFor (Queue d) r
interpretQueueTB :: forall d (r :: [(* -> *) -> * -> *]).
Members '[Race, Embed IO] r =>
Natural -> InterpreterFor (Queue d) r
interpretQueueTB Natural
maxQueued Sem (Queue d : r) a
sem = do
  TBQueue d
queue <- IO (TBQueue d) -> Sem r (TBQueue d)
forall (m :: * -> *) (r :: [(* -> *) -> * -> *]) a.
Member (Embed m) r =>
m a -> Sem r a
embed (forall a. Natural -> IO (TBQueue a)
newTBQueueIO @d Natural
maxQueued)
  TBQueue d -> InterpreterFor (Queue d) r
forall d (r :: [(* -> *) -> * -> *]).
Members '[Race, Embed IO] r =>
TBQueue d -> InterpreterFor (Queue d) r
interpretQueueTBWith TBQueue d
queue Sem (Queue d : r) a
sem
{-# inline interpretQueueTB #-}