module Streamly.Internal.Data.Fold.SVar
(
write
, writeLimited
)
where
#include "inline.hs"
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Streamly.Internal.Data.Fold.Type (Fold(..))
import qualified Streamly.Internal.Data.Fold.Type as FL
import Streamly.Internal.Data.SVar
{-# INLINE write #-}
write :: MonadIO m => SVar t m a -> Maybe WorkerInfo -> Fold m a ()
write :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> Maybe WorkerInfo -> Fold m a ()
write SVar t m a
svar Maybe WorkerInfo
winfo = forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> Fold m a b
Fold forall {m :: * -> *} {b}. MonadIO m => () -> a -> m (Step () b)
step forall {b}. m (Step () b)
initial forall {m :: * -> *}. MonadIO m => () -> m ()
extract
where
initial :: m (Step () b)
initial = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. s -> Step s b
FL.Partial ()
step :: () -> a -> m (Step () b)
step () a
x =
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
decrementBufferLimit SVar t m a
svar
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar t m a
svar (forall a. a -> ChildEvent a
ChildYield a
x)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. s -> Step s b
FL.Partial ()
extract :: () -> m ()
extract () = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar t m a
svar Maybe WorkerInfo
winfo
{-# INLINE writeLimited #-}
writeLimited :: MonadIO m
=> SVar t m a -> Maybe WorkerInfo -> Fold m a ()
writeLimited :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> Maybe WorkerInfo -> Fold m a ()
writeLimited SVar t m a
svar Maybe WorkerInfo
winfo = forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> Fold m a b
Fold forall {m :: * -> *}. MonadIO m => Bool -> a -> m (Step Bool ())
step forall {b}. m (Step Bool b)
initial forall {m :: * -> *}. MonadIO m => Bool -> m ()
extract
where
initial :: m (Step Bool b)
initial = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. s -> Step s b
FL.Partial Bool
True
step :: Bool -> a -> m (Step Bool ())
step Bool
True a
x =
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
Bool
yieldLimitOk <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar t m a
svar
if Bool
yieldLimitOk
then do
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
decrementBufferLimit SVar t m a
svar
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar t m a
svar (forall a. a -> ChildEvent a
ChildYield a
x)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. s -> Step s b
FL.Partial Bool
True
else do
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVarFromWorker SVar t m a
svar
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar t m a
svar Maybe WorkerInfo
winfo
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. b -> Step s b
FL.Done ()
step Bool
False a
_ = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s b. b -> Step s b
FL.Done ()
extract :: Bool -> m ()
extract Bool
True = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar t m a
svar Maybe WorkerInfo
winfo
extract Bool
False = forall (m :: * -> *) a. Monad m => a -> m a
return ()