-- |Description: Events Combinators
module Polysemy.Conc.Events where

import Polysemy.Conc.Async (withAsync_)
import qualified Polysemy.Conc.Effect.Events as Events
import Polysemy.Conc.Effect.Events (Consume)
import Polysemy.Conc.Effect.Gate (Gate, Gates, gate, signal, withGate)
import Polysemy.Conc.Effect.Race (Race)
import Polysemy.Scoped (Scoped_)
import Polysemy.Conc.Interpreter.Events (EventConsumer)

-- |Create a new scope for 'Polysemy.Conc.Events', causing the nested program to get its own copy of the event stream.
--
-- Calls 'signal' before running the argument to ensure that 'Events.subscribe' has finished creating a channel, for use
-- with asynchronous execution.
subscribeGated ::
   e r .
  Members [EventConsumer e, Gate] r =>
  InterpreterFor (Consume e) r
subscribeGated :: forall e (r :: EffectRow).
Members '[EventConsumer e, Gate] r =>
InterpreterFor (Consume e) r
subscribeGated Sem (Consume e : r) a
action =
  forall e (r :: EffectRow).
Member (Scoped_ (Consume e)) r =>
InterpreterFor (Consume e) r
Events.subscribe @e do
    Sem (Consume e : r) ()
forall (r :: EffectRow). Member Gate r => Sem r ()
signal
    Sem (Consume e : r) a
action

-- |Create a new scope for 'Polysemy.Conc.Events', causing the nested program to get its own copy of the event stream.
--
-- Executes in a new thread, ensuring that the main thread blocks until 'Events.subscribe' has finished creating a
-- channel.
subscribeAsync ::
   e r a .
  Members [EventConsumer e, Scoped_ Gate, Resource, Race, Async] r =>
  Sem (Consume e : r) () ->
  Sem r a ->
  Sem r a
subscribeAsync :: forall e (r :: EffectRow) a.
Members
  '[EventConsumer e, Scoped_ Gate, Resource, Race, Async] r =>
Sem (Consume e : r) () -> Sem r a -> Sem r a
subscribeAsync Sem (Consume e : r) ()
consumer Sem r a
ma =
  Sem (Gate : r) a -> Sem r a
forall (r :: EffectRow).
Member (Scoped_ Gate) r =>
InterpreterFor Gate r
withGate (Sem (Gate : r) a -> Sem r a) -> Sem (Gate : r) a -> Sem r a
forall a b. (a -> b) -> a -> b
$ Sem (Gate : r) () -> Sem (Gate : r) a -> Sem (Gate : r) a
forall (r :: EffectRow) b a.
Members '[Resource, Race, Async] r =>
Sem r b -> Sem r a -> Sem r a
withAsync_ (forall e (r :: EffectRow).
Members '[EventConsumer e, Gate] r =>
InterpreterFor (Consume e) r
subscribeGated @_ (forall (e2 :: (* -> *) -> * -> *) (e1 :: (* -> *) -> * -> *)
       (r :: EffectRow) a.
Sem (e1 : r) a -> Sem (e1 : e2 : r) a
raiseUnder @Gate Sem (Consume e : r) ()
consumer)) do
    Sem (Gate : r) ()
forall (r :: EffectRow). Member Gate r => Sem r ()
gate
    forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
Sem r a -> Sem (e : r) a
raise @Gate Sem r a
ma

-- |Pull repeatedly from 'Polysemy.Conc.Consume', passing the event to the supplied callback.
-- Stop when the action returns @False@.
consumeWhile ::
  Member (Consume e) r =>
  (e -> Sem r Bool) ->
  Sem r ()
consumeWhile :: forall e (r :: EffectRow).
Member (Consume e) r =>
(e -> Sem r Bool) -> Sem r ()
consumeWhile e -> Sem r Bool
action =
  Sem r ()
spin
  where
    spin :: Sem r ()
spin =
      Sem r Bool -> Sem r () -> Sem r ()
forall (m :: * -> *). Monad m => m Bool -> m () -> m ()
whenM (e -> Sem r Bool
action (e -> Sem r Bool) -> Sem r e -> Sem r Bool
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Sem r e
forall e (r :: EffectRow). Member (Consume e) r => Sem r e
Events.consume) Sem r ()
spin

-- |Pull repeatedly from the 'Polysemy.Conc.Events' channel, passing the event to the supplied callback.
-- Stop when the action returns @False@.
subscribeWhile ::
   e r .
  Member (EventConsumer e) r =>
  (e -> Sem r Bool) ->
  Sem r ()
subscribeWhile :: forall e (r :: EffectRow).
Member (EventConsumer e) r =>
(e -> Sem r Bool) -> Sem r ()
subscribeWhile e -> Sem r Bool
action =
  forall e (r :: EffectRow).
Member (Scoped_ (Consume e)) r =>
InterpreterFor (Consume e) r
Events.subscribe @e ((e -> Sem (Consume e : r) Bool) -> Sem (Consume e : r) ()
forall e (r :: EffectRow).
Member (Consume e) r =>
(e -> Sem r Bool) -> Sem r ()
consumeWhile (Sem r Bool -> Sem (Consume e : r) Bool
forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
Sem r a -> Sem (e : r) a
raise (Sem r Bool -> Sem (Consume e : r) Bool)
-> (e -> Sem r Bool) -> e -> Sem (Consume e : r) Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. e -> Sem r Bool
action))

-- |Pull repeatedly from the 'Polysemy.Conc.Events' channel, passing the event to the supplied callback.
-- Stop when the action returns @False@.
--
-- Signals the caller that the channel was successfully subscribed to using the 'Gate' effect.
subscribeWhileGated ::
   e r .
  Members [EventConsumer e, Gate] r =>
  (e -> Sem r Bool) ->
  Sem r ()
subscribeWhileGated :: forall e (r :: EffectRow).
Members '[EventConsumer e, Gate] r =>
(e -> Sem r Bool) -> Sem r ()
subscribeWhileGated e -> Sem r Bool
action =
  forall e (r :: EffectRow).
Members '[EventConsumer e, Gate] r =>
InterpreterFor (Consume e) r
subscribeGated @e do
    (e -> Sem (Consume e : r) Bool) -> Sem (Consume e : r) ()
forall e (r :: EffectRow).
Member (Consume e) r =>
(e -> Sem r Bool) -> Sem r ()
consumeWhile (Sem r Bool -> Sem (Consume e : r) Bool
forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
Sem r a -> Sem (e : r) a
raise (Sem r Bool -> Sem (Consume e : r) Bool)
-> (e -> Sem r Bool) -> e -> Sem (Consume e : r) Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. e -> Sem r Bool
action)

-- |Start a new thread that pulls repeatedly from the 'Polysemy.Conc.Events' channel, passing the event to the supplied
-- callback and stops when the action returns @False@.
subscribeWhileAsync ::
   e r a .
  Members [EventConsumer e, Gates, Resource, Race, Async] r =>
  (e -> Sem (Consume e : r) Bool) ->
  Sem r a ->
  Sem r a
subscribeWhileAsync :: forall e (r :: EffectRow) a.
Members
  '[EventConsumer e, Scoped_ Gate, Resource, Race, Async] r =>
(e -> Sem (Consume e : r) Bool) -> Sem r a -> Sem r a
subscribeWhileAsync e -> Sem (Consume e : r) Bool
action =
  forall e (r :: EffectRow) a.
Members
  '[EventConsumer e, Scoped_ Gate, Resource, Race, Async] r =>
Sem (Consume e : r) () -> Sem r a -> Sem r a
subscribeAsync @e ((e -> Sem (Consume e : r) Bool) -> Sem (Consume e : r) ()
forall e (r :: EffectRow).
Member (Consume e) r =>
(e -> Sem r Bool) -> Sem r ()
consumeWhile e -> Sem (Consume e : r) Bool
action)

-- |Pull repeatedly from 'Polysemy.Conc.Consume', passing the event to the supplied callback.
consumeLoop ::
  Member (Consume e) r =>
  (e -> Sem r ()) ->
  Sem r ()
consumeLoop :: forall e (r :: EffectRow).
Member (Consume e) r =>
(e -> Sem r ()) -> Sem r ()
consumeLoop e -> Sem r ()
action =
  Sem r () -> Sem r ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (e -> Sem r ()
action (e -> Sem r ()) -> Sem r e -> Sem r ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Sem r e
forall e (r :: EffectRow). Member (Consume e) r => Sem r e
Events.consume)

-- |Pull repeatedly from the 'Polysemy.Conc.Events' channel, passing the event to the supplied callback.
subscribeLoop ::
   e r .
  Member (EventConsumer e) r =>
  (e -> Sem r ()) ->
  Sem r ()
subscribeLoop :: forall e (r :: EffectRow).
Member (EventConsumer e) r =>
(e -> Sem r ()) -> Sem r ()
subscribeLoop e -> Sem r ()
action =
  forall e (r :: EffectRow).
Member (Scoped_ (Consume e)) r =>
InterpreterFor (Consume e) r
Events.subscribe @e ((e -> Sem (Consume e : r) ()) -> Sem (Consume e : r) ()
forall e (r :: EffectRow).
Member (Consume e) r =>
(e -> Sem r ()) -> Sem r ()
consumeLoop (Sem r () -> Sem (Consume e : r) ()
forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
Sem r a -> Sem (e : r) a
raise (Sem r () -> Sem (Consume e : r) ())
-> (e -> Sem r ()) -> e -> Sem (Consume e : r) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. e -> Sem r ()
action))

-- |Pull repeatedly from the 'Polysemy.Conc.Events' channel, passing the event to the supplied callback.
--
-- Signals the caller that the channel was successfully subscribed to using the 'Gate' effect.
subscribeLoopGated ::
   e r .
  Members [EventConsumer e, Gate] r =>
  (e -> Sem r ()) ->
  Sem r ()
subscribeLoopGated :: forall e (r :: EffectRow).
Members '[EventConsumer e, Gate] r =>
(e -> Sem r ()) -> Sem r ()
subscribeLoopGated e -> Sem r ()
action =
  forall e (r :: EffectRow).
Members '[EventConsumer e, Gate] r =>
InterpreterFor (Consume e) r
subscribeGated @e do
    (e -> Sem (Consume e : r) ()) -> Sem (Consume e : r) ()
forall e (r :: EffectRow).
Member (Consume e) r =>
(e -> Sem r ()) -> Sem r ()
consumeLoop (Sem r () -> Sem (Consume e : r) ()
forall (e :: (* -> *) -> * -> *) (r :: EffectRow) a.
Sem r a -> Sem (e : r) a
raise (Sem r () -> Sem (Consume e : r) ())
-> (e -> Sem r ()) -> e -> Sem (Consume e : r) ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. e -> Sem r ()
action)

-- |Start a new thread that pulls repeatedly from the 'Polysemy.Conc.Events' channel, passing the event to the supplied
-- callback.
subscribeLoopAsync ::
   e r a .
  Members [EventConsumer e, Gates, Resource, Race, Async] r =>
  (e -> Sem (Consume e : r) ()) ->
  Sem r a ->
  Sem r a
subscribeLoopAsync :: forall e (r :: EffectRow) a.
Members
  '[EventConsumer e, Scoped_ Gate, Resource, Race, Async] r =>
(e -> Sem (Consume e : r) ()) -> Sem r a -> Sem r a
subscribeLoopAsync e -> Sem (Consume e : r) ()
action =
  forall e (r :: EffectRow) a.
Members
  '[EventConsumer e, Scoped_ Gate, Resource, Race, Async] r =>
Sem (Consume e : r) () -> Sem r a -> Sem r a
subscribeAsync @e ((e -> Sem (Consume e : r) ()) -> Sem (Consume e : r) ()
forall e (r :: EffectRow).
Member (Consume e) r =>
(e -> Sem r ()) -> Sem r ()
consumeLoop e -> Sem (Consume e : r) ()
action)

-- |Block until a value matching the predicate has been returned by 'Polysemy.Conc.Consume'.
consumeFind ::
   e r .
  Member (Consume e) r =>
  (e -> Sem r Bool) ->
  Sem r e
consumeFind :: forall e (r :: EffectRow).
Member (Consume e) r =>
(e -> Sem r Bool) -> Sem r e
consumeFind e -> Sem r Bool
f =
  Sem r e
spin
  where
    spin :: Sem r e
spin = do
      e
e <- Sem r e
forall e (r :: EffectRow). Member (Consume e) r => Sem r e
Events.consume
      Sem r Bool -> Sem r e -> Sem r e -> Sem r e
forall (m :: * -> *) a. Monad m => m Bool -> m a -> m a -> m a
ifM (e -> Sem r Bool
f e
e) (e -> Sem r e
forall (f :: * -> *) a. Applicative f => a -> f a
pure e
e) Sem r e
spin

-- |Block until a value matching the predicate has been published to the 'Polysemy.Conc.Events' channel.
subscribeFind ::
   e r .
  Member (EventConsumer e) r =>
  (e -> Sem (Consume e : r) Bool) ->
  Sem r e
subscribeFind :: forall e (r :: EffectRow).
Member (EventConsumer e) r =>
(e -> Sem (Consume e : r) Bool) -> Sem r e
subscribeFind e -> Sem (Consume e : r) Bool
f =
  forall e (r :: EffectRow).
Member (Scoped_ (Consume e)) r =>
InterpreterFor (Consume e) r
Events.subscribe @e ((e -> Sem (Consume e : r) Bool) -> Sem (Consume e : r) e
forall e (r :: EffectRow).
Member (Consume e) r =>
(e -> Sem r Bool) -> Sem r e
consumeFind e -> Sem (Consume e : r) Bool
f)

-- |Return the first value returned by 'Polysemy.Conc.Consume' for which the function produces 'Just'.
consumeFirstJust ::
   e a r .
  Member (Consume e) r =>
  (e -> Sem r (Maybe a)) ->
  Sem r a
consumeFirstJust :: forall e a (r :: EffectRow).
Member (Consume e) r =>
(e -> Sem r (Maybe a)) -> Sem r a
consumeFirstJust e -> Sem r (Maybe a)
f =
  Sem r a
spin
  where
    spin :: Sem r a
spin = do
      e
e <- Sem r e
forall e (r :: EffectRow). Member (Consume e) r => Sem r e
Events.consume
      Sem r a -> (a -> Sem r a) -> Maybe a -> Sem r a
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Sem r a
spin a -> Sem r a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe a -> Sem r a) -> Sem r (Maybe a) -> Sem r a
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< e -> Sem r (Maybe a)
f e
e

-- |Return the first value published to the 'Polysemy.Conc.Events' channel for which the function produces 'Just'.
subscribeFirstJust ::
   e a r .
  Member (EventConsumer e) r =>
  (e -> Sem (Consume e : r) (Maybe a)) ->
  Sem r a
subscribeFirstJust :: forall e a (r :: EffectRow).
Member (EventConsumer e) r =>
(e -> Sem (Consume e : r) (Maybe a)) -> Sem r a
subscribeFirstJust e -> Sem (Consume e : r) (Maybe a)
f =
  forall e (r :: EffectRow).
Member (Scoped_ (Consume e)) r =>
InterpreterFor (Consume e) r
Events.subscribe @e ((e -> Sem (Consume e : r) (Maybe a)) -> Sem (Consume e : r) a
forall e a (r :: EffectRow).
Member (Consume e) r =>
(e -> Sem r (Maybe a)) -> Sem r a
consumeFirstJust e -> Sem (Consume e : r) (Maybe a)
f)

-- |Block until the specified value has been returned by 'Polysemy.Conc.Consume'.
consumeElem ::
   e r .
  Eq e =>
  Member (Consume e) r =>
  e ->
  Sem r ()
consumeElem :: forall e (r :: EffectRow).
(Eq e, Member (Consume e) r) =>
e -> Sem r ()
consumeElem e
target =
  Sem r e -> Sem r ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void ((e -> Sem r Bool) -> Sem r e
forall e (r :: EffectRow).
Member (Consume e) r =>
(e -> Sem r Bool) -> Sem r e
consumeFind (Bool -> Sem r Bool
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Bool -> Sem r Bool) -> (e -> Bool) -> e -> Sem r Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (e
target e -> e -> Bool
forall a. Eq a => a -> a -> Bool
==)))

-- |Block until the specified value has been published to the 'Polysemy.Conc.Events' channel.
subscribeElem ::
   e r .
  Eq e =>
  Member (EventConsumer e) r =>
  e ->
  Sem r ()
subscribeElem :: forall e (r :: EffectRow).
(Eq e, Member (EventConsumer e) r) =>
e -> Sem r ()
subscribeElem e
target =
  forall e (r :: EffectRow).
Member (Scoped_ (Consume e)) r =>
InterpreterFor (Consume e) r
Events.subscribe @e (e -> Sem (Consume e : r) ()
forall e (r :: EffectRow).
(Eq e, Member (Consume e) r) =>
e -> Sem r ()
consumeElem e
target)