module Polysemy.Input.Streaming
(
module Polysemy.Input
, yieldInput
, yieldRace
, exhaust
, runInputViaStream
, runInputViaInfiniteStream
) where
import qualified Control.Concurrent.Async as A
import Data.Functor.Of
import Data.Void
import Polysemy
import Polysemy.Final
import Polysemy.Input
import Polysemy.State
import qualified Streaming as S
import qualified Streaming.Prelude as S
-- | Interpret an 'Input' effect by feeding it from a finite 'S.Stream'.
--
-- Each 'input' produces @'Just' i@ for the next element of the stream and
-- 'Nothing' once the stream has ended. The unconsumed remainder of the stream
-- is threaded through a local 'State' effect that is discharged here.
--
-- When the stream reports its end, that fact is stored in the state, so later
-- calls to 'input' answer 'Nothing' immediately instead of inspecting the
-- finished stream again (which would re-run whatever effects sit between its
-- last element and its end on every call).
runInputViaStream
  :: S.Stream (Of i) (Sem r) ()
  -> InterpreterFor (Input (Maybe i)) r
runInputViaStream stream
  = evalState (Just stream)
  . reinterpret ( \Input ->
      get >>= \case
        -- The stream was already seen to end; do not touch it again.
        Nothing -> pure Nothing
        Just s -> do
          -- Run the stream (in the underlying effect row) up to its next
          -- element or its end.
          step <- raise (S.inspect s)
          let (remaining, result) = case step of
                Left ()         -> (Nothing, Nothing)
                Right (i :> s') -> (Just s', Just i)
          put remaining
          pure result
    )
-- | Interpret an 'Input' effect by feeding it from a 'S.Stream' that never
-- ends.
--
-- The stream's return type is 'Void', so the "stream finished" case cannot
-- occur and every 'input' produces an element. The remainder of the stream is
-- kept in a local 'State' effect that is discharged here.
runInputViaInfiniteStream
  :: forall i r
   . S.Stream (Of i) (Sem r) Void
  -> InterpreterFor (Input i) r
runInputViaInfiniteStream source
  = evalState source
  . reinterpret ( \Input -> do
      -- Advance the stored stream by one step in the underlying effect row.
      step <- raise . S.inspect =<< get
      case step of
        Right (x :> rest) -> x <$ put rest
        -- A 'Void' result can never be produced.
        Left impossible   -> absurd impossible
    )
-- | A single-element stream that races two 'Input' effects against each other.
--
-- Both 'input' actions are lowered to 'IO' and handed to 'A.race'; the value
-- of whichever one 'A.race' reports is yielded, tagged 'Left' for the first
-- input and 'Right' for the second.
yieldRace
  :: Members
      '[ Final IO
       , Input i1
       , Input i2
       ] r
  => S.Stream (S.Of (Either i1 i2)) (Sem r) ()
yieldRace =
  S.yield =<< S.lift
    ( withStrategicToFinal $ do
        firstInput  <- runS input
        secondInput <- runS input
        -- 'A.race' returns an 'Either' of the two effectful states;
        -- 'sequenceEither' moves the 'Either' inside that state.
        pure (sequenceEither <$> A.race firstInput secondInput)
    )
-- | Pull a functor out of both sides of an 'Either': the contents of a 'Left'
-- are re-tagged with 'Left', and the contents of a 'Right' with 'Right'.
sequenceEither :: Functor f => Either (f a) (f b) -> f (Either a b)
sequenceEither = either (fmap Left) (fmap Right)
-- | A stream that performs one 'input' and yields the value it produced.
yieldInput :: Member (Input i) r => S.Stream (Of i) (Sem r) ()
yieldInput = do
  x <- S.lift input
  S.yield x
-- | An unending stream of inputs: each step performs 'input', yields its
-- result, and then continues in the same way. The stream never returns, so
-- its result type is left polymorphic.
exhaust :: Member (Input i) r => S.Stream (Of i) (Sem r) a
exhaust = S.lift input >>= \x -> S.yield x >> exhaust