module Streamly.Internal.Data.Stream.SVar.Eliminate
(
toSVarParallel
, newFoldSVar
, newFoldSVarF
, fromConsumer
, pushToFold
, teeToSVar
)
where
#include "inline.hs"
import Control.Concurrent (myThreadId, takeMVar)
import Control.Monad (when, void)
import Control.Monad.Catch (throwM)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (newIORef, readIORef, writeIORef)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Control.ForkLifted (doFork)
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS_)
import Streamly.Internal.Data.Fold.SVar (write, writeLimited)
import Streamly.Internal.Data.Fold.Type (Fold(..))
import Streamly.Internal.Data.Stream.Serial (SerialT(..))
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import Streamly.Internal.Data.SVar
{-# INLINE toSVarParallel #-}
toSVarParallel :: MonadAsync m
=> State t m a -> SVar t m a -> D.Stream m a -> m ()
toSVarParallel :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
State t m a -> SVar t m a -> Stream m a -> m ()
toSVarParallel State t m a
st SVar t m a
sv Stream m a
xs =
if forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
sv
then m ()
forkWithDiag
else do
ThreadId
tid <-
case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State t m a
st of
Maybe Count
Nothing -> forall (m :: * -> *).
MonadRunInIO m =>
m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doFork (Maybe WorkerInfo -> m ()
work forall a. Maybe a
Nothing)
(forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar t m a
sv)
(forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SomeException -> IO ()
handleChildException SVar t m a
sv)
Just Count
_ -> forall (m :: * -> *).
MonadRunInIO m =>
m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doFork (Maybe WorkerInfo -> m ()
workLim forall a. Maybe a
Nothing)
(forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar t m a
sv)
(forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SomeException -> IO ()
handleChildException SVar t m a
sv)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
modifyThread SVar t m a
sv ThreadId
tid
where
{-# NOINLINE work #-}
work :: Maybe WorkerInfo -> m ()
work Maybe WorkerInfo
info = forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m b
D.fold (forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> Maybe WorkerInfo -> Fold m a ()
write SVar t m a
sv Maybe WorkerInfo
info) Stream m a
xs
{-# NOINLINE workLim #-}
workLim :: Maybe WorkerInfo -> m ()
workLim Maybe WorkerInfo
info = forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m b
D.fold (forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> Maybe WorkerInfo -> Fold m a ()
writeLimited SVar t m a
sv Maybe WorkerInfo
info) Stream m a
xs
{-# NOINLINE forkWithDiag #-}
forkWithDiag :: m ()
forkWithDiag = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef Int
workerCount SVar t m a
sv) forall a b. (a -> b) -> a -> b
$ \Int
n -> Int
n forall a. Num a => a -> a -> a
+ Int
1
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> m ()
recordMaxWorkers SVar t m a
sv
Maybe WorkerInfo
winfo <-
case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe YieldRateInfo
yieldRateInfo SVar t m a
sv of
Maybe YieldRateInfo
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
Just YieldRateInfo
_ -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
IORef Count
cntRef <- forall a. a -> IO (IORef a)
newIORef Count
0
AbsTime
t <- Clock -> IO AbsTime
getTime Clock
Monotonic
IORef (Count, AbsTime)
lat <- forall a. a -> IO (IORef a)
newIORef (Count
0, AbsTime
t)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just WorkerInfo
{ workerYieldMax :: Count
workerYieldMax = Count
0
, workerYieldCount :: IORef Count
workerYieldCount = IORef Count
cntRef
, workerLatencyStart :: IORef (Count, AbsTime)
workerLatencyStart = IORef (Count, AbsTime)
lat
}
ThreadId
tid <-
case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State t m a
st of
Maybe Count
Nothing -> forall (m :: * -> *).
MonadRunInIO m =>
m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doFork (Maybe WorkerInfo -> m ()
work Maybe WorkerInfo
winfo)
(forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar t m a
sv)
(forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SomeException -> IO ()
handleChildException SVar t m a
sv)
Just Count
_ -> forall (m :: * -> *).
MonadRunInIO m =>
m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doFork (Maybe WorkerInfo -> m ()
workLim Maybe WorkerInfo
winfo)
(forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar t m a
sv)
(forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SomeException -> IO ()
handleChildException SVar t m a
sv)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
modifyThread SVar t m a
sv ThreadId
tid
{-# NOINLINE fromProducer #-}
fromProducer :: forall m a . MonadAsync m => SVar K.Stream m a -> K.Stream m a
fromProducer :: forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> Stream m a
fromProducer SVar Stream m a
sv = forall (m :: * -> *) a.
(forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp -> do
[ChildEvent a]
list <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> m [ChildEvent a]
readOutputQ SVar Stream m a
sv
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStream State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp forall a b. (a -> b) -> a -> b
$ [ChildEvent a] -> Stream m a
processEvents forall a b. (a -> b) -> a -> b
$ forall a. [a] -> [a]
reverse [ChildEvent a]
list
where
allDone :: m r -> m r
allDone :: forall r. m r -> m r
allDone m r
stp = do
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar Stream m a
sv) forall a b. (a -> b) -> a -> b
$ do
AbsTime
t <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Clock -> IO AbsTime
getTime Clock
Monotonic
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar Stream m a
sv)) (forall a. a -> Maybe a
Just AbsTime
t)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO ()
printSVar SVar Stream m a
sv String
"SVar Done"
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> m ()
sendStopToProducer SVar Stream m a
sv
m r
stp
{-# INLINE processEvents #-}
processEvents :: [ChildEvent a] -> K.Stream m a
processEvents :: [ChildEvent a] -> Stream m a
processEvents [] = forall (m :: * -> *) a.
(forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp -> do
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStream State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> Stream m a
fromProducer SVar Stream m a
sv
processEvents (ChildEvent a
ev : [ChildEvent a]
es) = forall (m :: * -> *) a.
(forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State Stream m a
_ a -> Stream m a -> m r
yld a -> m r
_ m r
stp -> do
let rest :: Stream m a
rest = [ChildEvent a] -> Stream m a
processEvents [ChildEvent a]
es
case ChildEvent a
ev of
ChildYield a
a -> a -> Stream m a -> m r
yld a
a Stream m a
rest
ChildStop ThreadId
tid Maybe SomeException
e -> do
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ThreadId -> m ()
accountThread SVar Stream m a
sv ThreadId
tid
case Maybe SomeException
e of
Maybe SomeException
Nothing -> forall r. m r -> m r
allDone m r
stp
Just SomeException
_ -> forall a. HasCallStack => String -> a
error String
"Bug: fromProducer: received exception"
{-# INLINE newFoldSVar #-}
newFoldSVar :: MonadAsync m
=> State K.Stream m a -> (SerialT m a -> m b) -> m (SVar K.Stream m a)
newFoldSVar :: forall (m :: * -> *) a b.
MonadAsync m =>
State Stream m a -> (SerialT m a -> m b) -> m (SVar Stream m a)
newFoldSVar State Stream m a
stt SerialT m a -> m b
f = do
SVar Stream m a
sv <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopAny (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m a
stt)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ThreadId
myThreadId forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
modifyThread SVar Stream m a
sv
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *).
MonadRunInIO m =>
m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doFork (forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ SerialT m a -> m b
f forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Stream m a -> SerialT m a
SerialT forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> Stream m a
fromProducer SVar Stream m a
sv)
(forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar Stream m a
sv)
(forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SomeException -> IO ()
handleFoldException SVar Stream m a
sv)
forall (m :: * -> *) a. Monad m => a -> m a
return SVar Stream m a
sv
data FromSVarState t m a =
FromSVarInit
| FromSVarRead (SVar t m a)
| FromSVarLoop (SVar t m a) [ChildEvent a]
| FromSVarDone (SVar t m a)
{-# INLINE_NORMAL fromProducerD #-}
fromProducerD :: (MonadAsync m) => SVar t m a -> D.Stream m a
fromProducerD :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> Stream m a
fromProducerD SVar t m a
svar = forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream forall {m :: * -> *} {p} {t :: (* -> *) -> * -> *} {a}.
MonadIO m =>
p -> FromSVarState t m a -> m (Step (FromSVarState t m a) a)
step (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarRead SVar t m a
svar)
where
{-# INLINE_LATE step #-}
step :: p -> FromSVarState t m a -> m (Step (FromSVarState t m a) a)
step p
_ (FromSVarRead SVar t m a
sv) = do
[ChildEvent a]
list <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> m [ChildEvent a]
readOutputQ SVar t m a
sv
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv (forall a. [a] -> [a]
Prelude.reverse [ChildEvent a]
list)
step p
_ (FromSVarLoop SVar t m a
sv []) = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarRead SVar t m a
sv
step p
_ (FromSVarLoop SVar t m a
sv (ChildEvent a
ev : [ChildEvent a]
es)) = do
case ChildEvent a
ev of
ChildYield a
a -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
D.Yield a
a (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv [ChildEvent a]
es)
ChildStop ThreadId
tid Maybe SomeException
e -> do
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ThreadId -> m ()
accountThread SVar t m a
sv ThreadId
tid
case Maybe SomeException
e of
Maybe SomeException
Nothing -> do
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> m ()
sendStopToProducer SVar t m a
sv
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
D.Skip (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarDone SVar t m a
sv)
Just SomeException
_ -> forall a. HasCallStack => String -> a
error String
"Bug: fromProducer: received exception"
step p
_ (FromSVarDone SVar t m a
sv) = do
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
sv) forall a b. (a -> b) -> a -> b
$ do
AbsTime
t <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Clock -> IO AbsTime
getTime Clock
Monotonic
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar t m a
sv)) (forall a. a -> Maybe a
Just AbsTime
t)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO ()
printSVar SVar t m a
sv String
"SVar Done"
forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
D.Stop
step p
_ FromSVarState t m a
FromSVarInit = forall a. HasCallStack => a
undefined
{-# INLINE newFoldSVarF #-}
newFoldSVarF :: MonadAsync m => State t m a -> Fold m a b -> m (SVar t m a)
newFoldSVarF :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a b.
MonadAsync m =>
State t m a -> Fold m a b -> m (SVar t m a)
newFoldSVarF State t m a
stt Fold m a b
f = do
SVar t m a
sv <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopAny (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State t m a
stt)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ThreadId
myThreadId forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
modifyThread SVar t m a
sv
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *).
MonadRunInIO m =>
m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doFork (forall {t :: (* -> *) -> * -> *}. SVar t m a -> m ()
work SVar t m a
sv) (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar t m a
sv) (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SomeException -> IO ()
handleFoldException SVar t m a
sv)
forall (m :: * -> *) a. Monad m => a -> m a
return SVar t m a
sv
where
{-# NOINLINE work #-}
work :: SVar t m a -> m ()
work SVar t m a
sv = forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m b
D.fold Fold m a b
f forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> Stream m a
fromProducerD SVar t m a
sv
{-# NOINLINE fromConsumer #-}
fromConsumer :: MonadAsync m => SVar K.Stream m a -> m Bool
fromConsumer :: forall (m :: * -> *) a. MonadAsync m => SVar Stream m a -> m Bool
fromConsumer SVar Stream m a
sv = do
([ChildEvent a]
list, Int
_) <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
readOutputQBasic (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([ChildEvent a], Int)
outputQueueFromConsumer SVar Stream m a
sv)
forall {m :: * -> *} {a}. MonadThrow m => [ChildEvent a] -> m Bool
processEvents forall a b. (a -> b) -> a -> b
$ forall a. [a] -> [a]
reverse [ChildEvent a]
list
where
{-# INLINE processEvents #-}
processEvents :: [ChildEvent a] -> m Bool
processEvents [] = forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
processEvents (ChildEvent a
ev : [ChildEvent a]
_) = do
case ChildEvent a
ev of
ChildStop ThreadId
_ Maybe SomeException
e -> do
case Maybe SomeException
e of
Maybe SomeException
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
Just SomeException
ex -> forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM SomeException
ex
ChildYield a
_ -> forall a. HasCallStack => String -> a
error String
"Bug: fromConsumer: invalid ChildYield event"
{-# INLINE pushToFold #-}
pushToFold :: MonadAsync m => SVar K.Stream m a -> a -> m Bool
pushToFold :: forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> a -> m Bool
pushToFold SVar Stream m a
sv a
a = do
let qref :: IORef ([ChildEvent a], Int)
qref = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([ChildEvent a], Int)
outputQueueFromConsumer SVar Stream m a
sv
Bool
done <- do
([ChildEvent a]
_, Int
n) <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef IORef ([ChildEvent a], Int)
qref
if Int
n forall a. Ord a => a -> a -> Bool
> Int
0
then forall (m :: * -> *) a. MonadAsync m => SVar Stream m a -> m Bool
fromConsumer SVar Stream m a
sv
else forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
if Bool
done
then forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
else forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
decrementBufferLimit SVar Stream m a
sv
forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar Stream m a
sv (forall a. a -> ChildEvent a
ChildYield a
a)
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
{-# INLINE teeToSVar #-}
teeToSVar :: MonadAsync m =>
SVar K.Stream m a -> SerialT m a -> SerialT m a
teeToSVar :: forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> SerialT m a -> SerialT m a
teeToSVar SVar Stream m a
svr (SerialT Stream m a
m) = forall (m :: * -> *) a. Stream m a -> SerialT m a
SerialT forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
(forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp -> do
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStreamShared State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp (Bool -> Stream m a -> Stream m a
go Bool
False Stream m a
m)
where
go :: Bool -> Stream m a -> Stream m a
go Bool
False Stream m a
m0 = forall (m :: * -> *) a.
(forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> Stream m a -> m r
yld a -> m r
_ m r
stp -> do
let drain :: m ()
drain = do
Bool
done <- forall (m :: * -> *) a. MonadAsync m => SVar Stream m a -> m Bool
fromConsumer SVar Stream m a
svr
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
done) forall a b. (a -> b) -> a -> b
$ do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO () -> IO ()
withDiagMVar SVar Stream m a
svr String
"teeToSVar: waiting to drain"
forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> IO a
takeMVar (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
outputDoorBellFromConsumer SVar Stream m a
svr)
m ()
drain
stopFold :: m ()
stopFold = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
svr forall a. Maybe a
Nothing
m ()
drain
stop :: m r
stop = m ()
stopFold forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> m r
stp
single :: a -> m r
single a
a = do
Bool
done <- forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> a -> m Bool
pushToFold SVar Stream m a
svr a
a
a -> Stream m a -> m r
yld a
a (Bool -> Stream m a -> Stream m a
go Bool
done (forall (m :: * -> *) b a. Applicative m => m b -> Stream m a
K.nilM m ()
stopFold))
yieldk :: a -> Stream m a -> m r
yieldk a
a Stream m a
r = forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> a -> m Bool
pushToFold SVar Stream m a
svr a
a forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Bool
done -> a -> Stream m a -> m r
yld a
a (Bool -> Stream m a -> Stream m a
go Bool
done Stream m a
r)
in forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStreamShared State Stream m a
st a -> Stream m a -> m r
yieldk a -> m r
single m r
stop Stream m a
m0
go Bool
True Stream m a
m0 = Stream m a
m0