module Streamly.Internal.Data.Fold.SVar
(
write
, writeLimited
)
where
#include "inline.hs"
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Streamly.Internal.Data.Fold.Type (Fold(..))
import qualified Streamly.Internal.Data.Fold.Type as FL
import Streamly.Internal.Data.SVar
-- | A fold that forwards every input element to the given 'SVar'.
--
-- The fold carries no state of its own (the accumulator is @()@):
--
-- * For each input @x@ it runs 'decrementBufferLimit' on the SVar and then
--   'send's @ChildYield x@ to it. The 'Int' returned by 'send' is discarded.
-- * It always continues with @FL.Partial ()@, so it never terminates on its
--   own; termination is driven by whoever runs the fold.
-- * When the fold is finalized (extract), it calls 'sendStop' with the
--   supplied worker info.
--
-- Arguments:
--
-- * @svar@  - the SVar that receives the elements.
-- * @winfo@ - optional worker info, passed through unchanged to 'sendStop'.
--
{-# INLINE write #-}
write :: MonadIO m => SVar t m a -> Maybe WorkerInfo -> Fold m a ()
write svar winfo = Fold step initial extract

    where

    -- Start in the only possible state.
    initial = return $ FL.Partial ()

    -- Account for the buffer slot first, then hand the element over.
    step () x =
        liftIO $ do
            decrementBufferLimit svar
            void $ send svar (ChildYield x)
            return $ FL.Partial ()

    -- Signal the end of this producer to the SVar.
    extract () = liftIO $ sendStop svar winfo
-- | Like 'write', but consults 'decrementYieldLimit' before forwarding each
-- element.
--
-- The fold state is a 'Bool', starting at 'True':
--
-- * In state 'True', each input first runs 'decrementYieldLimit' on the SVar.
--
--     * If that returns 'True', the element is forwarded exactly as in
--       'write' ('decrementBufferLimit', then 'send' of @ChildYield x@) and
--       the fold continues with @FL.Partial True@.
--     * If it returns 'False', the element is NOT sent; the fold runs
--       'cleanupSVarFromWorker' and 'sendStop' and finishes with
--       @FL.Done ()@.
--
-- * In state 'False', @step@ finishes immediately with @FL.Done ()@ and
--   @extract@ does nothing. None of the equations below ever produces the
--   'False' state, so these cases exist to keep the functions total.
-- * @extract@ in state 'True' calls 'sendStop' with the supplied worker info.
--
-- Arguments:
--
-- * @svar@  - the SVar that receives the elements.
-- * @winfo@ - optional worker info, passed through unchanged to 'sendStop'.
--
{-# INLINE writeLimited #-}
writeLimited :: MonadIO m
    => SVar t m a -> Maybe WorkerInfo -> Fold m a ()
writeLimited svar winfo = Fold step initial extract

    where

    initial = return $ FL.Partial True

    step True x =
        liftIO $ do
            yieldLimitOk <- decrementYieldLimit svar
            if yieldLimitOk
                then do
                    decrementBufferLimit svar
                    void $ send svar (ChildYield x)
                    return $ FL.Partial True
                else do
                    -- Limit exhausted: clean up and announce the stop here,
                    -- because returning Done ends the fold.
                    cleanupSVarFromWorker svar
                    sendStop svar winfo
                    return $ FL.Done ()
    step False _ = return $ FL.Done ()

    extract True = liftIO $ sendStop svar winfo
    extract False = return ()