{-# options_haddock prune #-}

-- |Description: Events/Consume Interpreters, Internal
module Polysemy.Conc.Interpreter.Events where

import Control.Concurrent.Chan.Unagi.Bounded (InChan, OutChan, dupChan, newChan, readChan, tryWriteChan)

import Polysemy.Conc.Async (withAsync_)
import qualified Polysemy.Conc.Effect.Events as Events
import Polysemy.Conc.Effect.Events (Consume, Events)
import Polysemy.Conc.Effect.Race (Race)
import Polysemy.Scoped (Scoped_)
import Polysemy.Scoped (runScopedAs)

-- |Convenience alias for the consumer effect.
type EventConsumer e =
  Scoped_ (Consume e)

-- |Interpret 'Consume' by reading from an 'OutChan'.
-- Used internally by 'interpretEventsChan', not safe to use directly.
interpretConsumeChan ::
   e r .
  Member (Embed IO) r =>
  OutChan e ->
  InterpreterFor (Consume e) r
interpretConsumeChan :: forall e (r :: EffectRow).
Member (Embed IO) r =>
OutChan e -> InterpreterFor (Consume e) r
interpretConsumeChan OutChan e
chan =
  (forall (rInitial :: EffectRow) x.
 Consume e (Sem rInitial) x -> Sem r x)
-> Sem (Consume e : r) a -> Sem r a
forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
FirstOrder e "interpret" =>
(forall (rInitial :: EffectRow) x. e (Sem rInitial) x -> Sem r x)
-> Sem (e : r) a -> Sem r a
interpret \case
    Consume e (Sem rInitial) x
Events.Consume ->
      IO e -> Sem r e
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (OutChan e -> IO e
forall a. OutChan a -> IO a
readChan OutChan e
chan)

-- |Interpret 'Events' by writing to an 'InChan'.
-- Used internally by 'interpretEventsChan', not safe to use directly.
-- When the channel queue is full, this silently discards events.
interpretEventsInChan ::
   e r .
  Member (Embed IO) r =>
  InChan e ->
  InterpreterFor (Events e) r
interpretEventsInChan :: forall e (r :: EffectRow).
Member (Embed IO) r =>
InChan e -> InterpreterFor (Events e) r
interpretEventsInChan InChan e
inChan =
  (forall (rInitial :: EffectRow) x.
 Events e (Sem rInitial) x -> Sem r x)
-> Sem (Events e : r) a -> Sem r a
forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
FirstOrder e "interpret" =>
(forall (rInitial :: EffectRow) x. e (Sem rInitial) x -> Sem r x)
-> Sem (e : r) a -> Sem r a
interpret \case
    Events.Publish e
e ->
      Sem r Bool -> Sem r ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> Sem r Bool
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (InChan e -> e -> IO Bool
forall a. InChan a -> a -> IO Bool
tryWriteChan InChan e
inChan e
e))

-- |Interpret 'Events' and 'Consume' together by connecting them to the two ends of an unagi channel.
-- 'Consume' is only interpreted in a 'Polysemy.Conc.Scoped' manner, ensuring that a new duplicate of the channel is
-- created so that all consumers see all events (from the moment they are connected).
--
-- This should be used in conjunction with 'Polysemy.Conc.subscribe':
--
-- @
-- interpretEventsChan do
--   async $ subscribe do
--     putStrLn =<< consume
--   publish "hello"
-- @
--
-- Whenever 'Polysemy.Conc.subscribe' creates a new scope, this interpreter calls 'dupChan' and passes the
-- duplicate to 'interpretConsumeChan'.
interpretEventsChan ::
   e r .
  Members [Resource, Race, Async, Embed IO] r =>
  InterpretersFor [Events e, EventConsumer e] r
interpretEventsChan :: forall e (r :: EffectRow).
Members '[Resource, Race, Async, Embed IO] r =>
InterpretersFor '[Events e, EventConsumer e] r
interpretEventsChan Sem (Append '[Events e, Scoped () (Consume e)] r) a
sem = do
  (InChan e
inChan, OutChan e
outChan) <- IO (InChan e, OutChan e) -> Sem r (InChan e, OutChan e)
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (forall a. Int -> IO (InChan a, OutChan a)
newChan @e Int
64)
  Sem r Any -> Sem r a -> Sem r a
forall (r :: EffectRow) b a.
Members '[Resource, Race, Async] r =>
Sem r b -> Sem r a -> Sem r a
withAsync_ (Sem r e -> Sem r Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO e -> Sem r e
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (OutChan e -> IO e
forall a. OutChan a -> IO a
readChan OutChan e
outChan))) do
    (() -> Sem r (OutChan e))
-> (forall (q :: (* -> *) -> * -> *).
    OutChan e -> InterpreterFor (Consume e) (Opaque q : r))
-> InterpreterFor (Scoped () (Consume e)) r
forall resource param (effect :: (* -> *) -> * -> *)
       (r :: EffectRow).
(param -> Sem r resource)
-> (forall (q :: (* -> *) -> * -> *).
    resource -> InterpreterFor effect (Opaque q : r))
-> InterpreterFor (Scoped param effect) r
runScopedAs (Sem r (OutChan e) -> () -> Sem r (OutChan e)
forall a b. a -> b -> a
const (IO (OutChan e) -> Sem r (OutChan e)
forall (m :: * -> *) (r :: EffectRow) a.
Member (Embed m) r =>
m a -> Sem r a
embed (InChan e -> IO (OutChan e)
forall a. InChan a -> IO (OutChan a)
dupChan InChan e
inChan))) forall e (r :: EffectRow).
Member (Embed IO) r =>
OutChan e -> InterpreterFor (Consume e) r
forall (q :: (* -> *) -> * -> *).
OutChan e -> InterpreterFor (Consume e) (Opaque q : r)
interpretConsumeChan (InChan e -> InterpreterFor (Events e) (Scoped () (Consume e) : r)
forall e (r :: EffectRow).
Member (Embed IO) r =>
InChan e -> InterpreterFor (Events e) r
interpretEventsInChan InChan e
inChan Sem (Events e : Scoped () (Consume e) : r) a
Sem (Append '[Events e, Scoped () (Consume e)] r) a
sem)