{-# LANGUAGE UndecidableInstances #-}
module Streamly.Internal.Data.Stream.Parallel
(
ParallelT(..)
, Parallel
, consM
, parallelK
, parallelFstK
, parallelMinK
, mkParallelD
, mkParallelK
, tapAsyncK
, tapAsyncF
, newCallbackStream
)
where
import Control.Concurrent (myThreadId, takeMVar)
import Control.Monad (when)
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Functor (void)
import Data.IORef (readIORef, writeIORef)
import Data.Maybe (fromJust)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import Prelude hiding (map)
import qualified Data.Set as Set
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Fold.Type (Fold)
import Streamly.Internal.Data.Stream.Serial (SerialT(..))
import Streamly.Internal.Data.Stream.StreamD.Type (Step(..))
import Streamly.Internal.Data.Stream.StreamK.Type (Stream)
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
import qualified Streamly.Internal.Data.Stream.SVar.Generate as SVar
import qualified Streamly.Internal.Data.Stream.SVar.Eliminate as SVar
import Streamly.Internal.Data.SVar
#include "inline.hs"
#include "Instances.hs"
{-# NOINLINE runOne #-}
runOne
:: MonadIO m
=> State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOne :: forall (m :: * -> *) a.
MonadIO m =>
State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOne State Stream m a
st Stream m a
m0 Maybe WorkerInfo
winfo =
case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State Stream m a
st of
Maybe Count
Nothing -> Stream m a -> m ()
go Stream m a
m0
Just Count
_ -> forall (m :: * -> *) a.
MonadIO m =>
State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOneLimited State Stream m a
st Stream m a
m0 Maybe WorkerInfo
winfo
where
go :: Stream m a -> m ()
go Stream m a
m = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
decrementBufferLimit SVar Stream m a
sv
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStreamShared State Stream m a
st a -> Stream m a -> m ()
yieldk forall {m :: * -> *}. MonadIO m => a -> m ()
single m ()
stop Stream m a
m
sv :: SVar Stream m a
sv = forall a. HasCallStack => Maybe a -> a
fromJust forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State Stream m a
st
stop :: m ()
stop = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementBufferLimit SVar Stream m a
sv
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo
sendit :: a -> m ()
sendit a
a = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar Stream m a
sv (forall a. a -> ChildEvent a
ChildYield a
a)
single :: a -> m ()
single a
a = forall {m :: * -> *}. MonadIO m => a -> m ()
sendit a
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo)
yieldk :: a -> Stream m a -> m ()
yieldk a
a Stream m a
r = forall {m :: * -> *}. MonadIO m => a -> m ()
sendit a
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Stream m a -> m ()
go Stream m a
r
runOneLimited
:: MonadIO m
=> State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOneLimited :: forall (m :: * -> *) a.
MonadIO m =>
State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOneLimited State Stream m a
st Stream m a
m0 Maybe WorkerInfo
winfo = Stream m a -> m ()
go Stream m a
m0
where
go :: Stream m a -> m ()
go Stream m a
m = do
Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar Stream m a
sv
if Bool
yieldLimitOk
then do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
decrementBufferLimit SVar Stream m a
sv
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStreamShared State Stream m a
st a -> Stream m a -> m ()
yieldk forall {m :: * -> *}. MonadIO m => a -> m ()
single m ()
stop Stream m a
m
else do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVarFromWorker SVar Stream m a
sv
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo
sv :: SVar Stream m a
sv = forall a. HasCallStack => Maybe a -> a
fromJust forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State Stream m a
st
stop :: m ()
stop = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementBufferLimit SVar Stream m a
sv
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar Stream m a
sv
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo
sendit :: a -> m ()
sendit a
a = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar Stream m a
sv (forall a. a -> ChildEvent a
ChildYield a
a)
single :: a -> m ()
single a
a = forall {m :: * -> *}. MonadIO m => a -> m ()
sendit a
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo)
yieldk :: a -> Stream m a -> m ()
yieldk a
a Stream m a
r = forall {m :: * -> *}. MonadIO m => a -> m ()
sendit a
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Stream m a -> m ()
go Stream m a
r
{-# NOINLINE forkSVarPar #-}
forkSVarPar :: MonadAsync m
=> SVarStopStyle -> Stream m a -> Stream m a -> Stream m a
forkSVarPar :: forall (m :: * -> *) a.
MonadAsync m =>
SVarStopStyle -> Stream m a -> Stream m a -> Stream m a
forkSVarPar SVarStopStyle
ss Stream m a
m Stream m a
r = forall (m :: * -> *) a.
(forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp -> do
SVar Stream m a
sv <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
ss State Stream m a
st
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar SVar Stream m a
sv (forall (m :: * -> *) a.
MonadIO m =>
State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOne State Stream m a
st{streamVar :: Maybe (SVar Stream m a)
streamVar = forall a. a -> Maybe a
Just SVar Stream m a
sv} Stream m a
m)
case SVarStopStyle
ss of
SVarStopStyle
StopBy -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
Set ThreadId
set <- forall a. IORef a -> IO a
readIORef (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef (Set ThreadId)
workerThreads SVar Stream m a
sv)
forall a. IORef a -> a -> IO ()
writeIORef (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ThreadId
svarStopBy SVar Stream m a
sv) forall a b. (a -> b) -> a -> b
$ forall a. Int -> Set a -> a
Set.elemAt Int
0 Set ThreadId
set
SVarStopStyle
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar SVar Stream m a
sv (forall (m :: * -> *) a.
MonadIO m =>
State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOne State Stream m a
st{streamVar :: Maybe (SVar Stream m a)
streamVar = forall a. a -> Maybe a
Just SVar Stream m a
sv} Stream m a
r)
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStream State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. SerialT m a -> Stream m a
getSerialT (forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> SerialT m a
SVar.fromSVar SVar Stream m a
sv)
{-# INLINE joinStreamVarPar #-}
joinStreamVarPar :: MonadAsync m
=> SVarStyle -> SVarStopStyle -> Stream m a -> Stream m a -> Stream m a
joinStreamVarPar :: forall (m :: * -> *) a.
MonadAsync m =>
SVarStyle
-> SVarStopStyle -> Stream m a -> Stream m a -> Stream m a
joinStreamVarPar SVarStyle
style SVarStopStyle
ss Stream m a
m1 Stream m a
m2 = forall (m :: * -> *) a.
(forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp ->
case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State Stream m a
st of
Just SVar Stream m a
sv | forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStyle
svarStyle SVar Stream m a
sv forall a. Eq a => a -> a -> Bool
== SVarStyle
style Bool -> Bool -> Bool
&& forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStopStyle
svarStopStyle SVar Stream m a
sv forall a. Eq a => a -> a -> Bool
== SVarStopStyle
ss -> do
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar SVar Stream m a
sv (forall (m :: * -> *) a.
MonadIO m =>
State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOne State Stream m a
st Stream m a
m1)
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStreamShared State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp Stream m a
m2
Maybe (SVar Stream m a)
_ ->
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStreamShared State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp (forall (m :: * -> *) a.
MonadAsync m =>
SVarStopStyle -> Stream m a -> Stream m a -> Stream m a
forkSVarPar SVarStopStyle
ss Stream m a
m1 Stream m a
m2)
{-# INLINE parallelK #-}
parallelK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
parallelK :: forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
parallelK = forall (m :: * -> *) a.
MonadAsync m =>
SVarStyle
-> SVarStopStyle -> Stream m a -> Stream m a -> Stream m a
joinStreamVarPar SVarStyle
ParallelVar SVarStopStyle
StopNone
{-# INLINE consM #-}
{-# SPECIALIZE consM :: IO a -> ParallelT IO a -> ParallelT IO a #-}
consM :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a
consM :: forall (m :: * -> *) a.
MonadAsync m =>
m a -> ParallelT m a -> ParallelT m a
consM m a
m (ParallelT Stream m a
r) = forall (m :: * -> *) a. Stream m a -> ParallelT m a
ParallelT forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
parallelK (forall (m :: * -> *) a. Monad m => m a -> Stream m a
K.fromEffect m a
m) Stream m a
r
{-# INLINE parallelFstK #-}
parallelFstK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
parallelFstK :: forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
parallelFstK = forall (m :: * -> *) a.
MonadAsync m =>
SVarStyle
-> SVarStopStyle -> Stream m a -> Stream m a -> Stream m a
joinStreamVarPar SVarStyle
ParallelVar SVarStopStyle
StopBy
{-# INLINE parallelMinK #-}
parallelMinK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
parallelMinK :: forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
parallelMinK = forall (m :: * -> *) a.
MonadAsync m =>
SVarStyle
-> SVarStopStyle -> Stream m a -> Stream m a -> Stream m a
joinStreamVarPar SVarStyle
ParallelVar SVarStopStyle
StopAny
mkParallelK :: MonadAsync m => Stream m a -> Stream m a
mkParallelK :: forall (m :: * -> *) a. MonadAsync m => Stream m a -> Stream m a
mkParallelK Stream m a
m = forall (m :: * -> *) a.
(forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp -> do
SVar Stream m a
sv <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopNone (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m a
st)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
State t m a -> SVar t m a -> Stream m a -> m ()
SVar.toSVarParallel State Stream m a
st SVar Stream m a
sv forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Applicative m => Stream m a -> Stream m a
D.fromStreamK Stream m a
m
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStream State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. SerialT m a -> Stream m a
getSerialT forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> SerialT m a
SVar.fromSVar SVar Stream m a
sv
{-# INLINE_NORMAL mkParallelD #-}
mkParallelD :: MonadAsync m => D.Stream m a -> D.Stream m a
mkParallelD :: forall (m :: * -> *) a. MonadAsync m => Stream m a -> Stream m a
mkParallelD Stream m a
m = forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State Stream m a
-> Maybe (Stream m a) -> m (Step (Maybe (Stream m a)) a)
step forall a. Maybe a
Nothing
where
step :: State Stream m a
-> Maybe (Stream m a) -> m (Step (Maybe (Stream m a)) a)
step State Stream m a
gst Maybe (Stream m a)
Nothing = do
SVar Stream m a
sv <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopNone State Stream m a
gst
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
State t m a -> SVar t m a -> Stream m a -> m ()
SVar.toSVarParallel State Stream m a
gst SVar Stream m a
sv Stream m a
m
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> Stream m a
SVar.fromSVarD SVar Stream m a
sv
step State Stream m a
gst (Just (D.UnStream State Stream m a -> s -> m (Step s a)
step1 s
st)) = do
Step s a
r <- State Stream m a -> s -> m (Step s a)
step1 State Stream m a
gst s
st
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Step s a
r of
Yield a
a s
s -> forall s a. a -> s -> Step s a
Yield a
a (forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State Stream m a -> s -> m (Step s a)
step1 s
s)
Skip s
s -> forall s a. s -> Step s a
Skip (forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State Stream m a -> s -> m (Step s a)
step1 s
s)
Step s a
Stop -> forall s a. Step s a
Stop
{-# INLINE tapAsyncK #-}
tapAsyncK :: MonadAsync m => (Stream m a -> m b) -> Stream m a -> Stream m a
tapAsyncK :: forall (m :: * -> *) a b.
MonadAsync m =>
(Stream m a -> m b) -> Stream m a -> Stream m a
tapAsyncK Stream m a -> m b
f Stream m a
m = forall (m :: * -> *) a.
(forall r.
State Stream m a
-> (a -> Stream m a -> m r) -> (a -> m r) -> m r -> m r)
-> Stream m a
K.mkStream forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp -> do
SVar Stream m a
sv <- forall (m :: * -> *) a b.
MonadAsync m =>
State Stream m a -> (SerialT m a -> m b) -> m (SVar Stream m a)
SVar.newFoldSVar State Stream m a
st (Stream m a -> m b
f forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. SerialT m a -> Stream m a
getSerialT)
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStreamShared State Stream m a
st a -> Stream m a -> m r
yld a -> m r
sng m r
stp
forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. SerialT m a -> Stream m a
getSerialT (forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> SerialT m a -> SerialT m a
SVar.teeToSVar SVar Stream m a
sv forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. Stream m a -> SerialT m a
SerialT Stream m a
m)
data TapState fs st a = TapInit | Tapping !fs st | TapDone st
{-# INLINE_NORMAL tapAsyncF #-}
tapAsyncF :: MonadAsync m => Fold m a b -> D.Stream m a -> D.Stream m a
tapAsyncF :: forall (m :: * -> *) a b.
MonadAsync m =>
Fold m a b -> Stream m a -> Stream m a
tapAsyncF Fold m a b
f (D.Stream State Stream m a -> s -> m (Step s a)
step1 s
state1) = forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream forall {a} {a}.
State Stream m a
-> TapState (SVar Stream m a) s a
-> m (Step (TapState (SVar Stream m a) s a) a)
step forall fs st a. TapState fs st a
TapInit
where
drainFold :: SVar Stream m a -> m ()
drainFold SVar Stream m a
svr = do
Bool
done <- forall (m :: * -> *) a. MonadAsync m => SVar Stream m a -> m Bool
SVar.fromConsumer SVar Stream m a
svr
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
done) forall a b. (a -> b) -> a -> b
$ do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO () -> IO ()
withDiagMVar SVar Stream m a
svr String
"teeToSVar: waiting to drain"
forall a b. (a -> b) -> a -> b
$ forall a. MVar a -> IO a
takeMVar (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
outputDoorBellFromConsumer SVar Stream m a
svr)
SVar Stream m a -> m ()
drainFold SVar Stream m a
svr
stopFold :: SVar Stream m a -> m ()
stopFold SVar Stream m a
svr = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
svr forall a. Maybe a
Nothing
forall {m :: * -> *} {a}.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
SVar Stream m a -> m ()
drainFold SVar Stream m a
svr
{-# INLINE_LATE step #-}
step :: State Stream m a
-> TapState (SVar Stream m a) s a
-> m (Step (TapState (SVar Stream m a) s a) a)
step State Stream m a
gst TapState (SVar Stream m a) s a
TapInit = do
SVar Stream m a
sv <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a b.
MonadAsync m =>
State t m a -> Fold m a b -> m (SVar t m a)
SVar.newFoldSVarF State Stream m a
gst Fold m a b
f
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall fs st a. fs -> st -> TapState fs st a
Tapping SVar Stream m a
sv s
state1)
step State Stream m a
gst (Tapping SVar Stream m a
sv s
st) = do
Step s a
r <- State Stream m a -> s -> m (Step s a)
step1 State Stream m a
gst s
st
case Step s a
r of
Yield a
a s
s -> do
Bool
done <- forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> a -> m Bool
SVar.pushToFold SVar Stream m a
sv a
a
if Bool
done
then do
forall {m :: * -> *} {a}.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
SVar Stream m a -> m ()
stopFold SVar Stream m a
sv
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
a (forall fs st a. st -> TapState fs st a
TapDone s
s)
else forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
a (forall fs st a. fs -> st -> TapState fs st a
Tapping SVar Stream m a
sv s
s)
Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall fs st a. fs -> st -> TapState fs st a
Tapping SVar Stream m a
sv s
s)
Step s a
Stop -> do
forall {m :: * -> *} {a}.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
SVar Stream m a -> m ()
stopFold SVar Stream m a
sv
forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop
step State Stream m a
gst (TapDone s
st) = do
Step s a
r <- State Stream m a -> s -> m (Step s a)
step1 State Stream m a
gst s
st
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ case Step s a
r of
Yield a
a s
s -> forall s a. a -> s -> Step s a
Yield a
a (forall fs st a. st -> TapState fs st a
TapDone s
s)
Skip s
s -> forall s a. s -> Step s a
Skip (forall fs st a. st -> TapState fs st a
TapDone s
s)
Step s a
Stop -> forall s a. Step s a
Stop
newtype ParallelT m a = ParallelT {forall (m :: * -> *) a. ParallelT m a -> Stream m a
getParallelT :: Stream m a}
deriving (forall (m :: * -> *) a. Monad m => m a -> ParallelT m a
forall (t :: (* -> *) -> * -> *).
(forall (m :: * -> *) a. Monad m => m a -> t m a) -> MonadTrans t
lift :: forall (m :: * -> *) a. Monad m => m a -> ParallelT m a
$clift :: forall (m :: * -> *) a. Monad m => m a -> ParallelT m a
MonadTrans)
type Parallel = ParallelT IO
{-# INLINE append #-}
{-# SPECIALIZE append :: ParallelT IO a -> ParallelT IO a -> ParallelT IO a #-}
append :: MonadAsync m => ParallelT m a -> ParallelT m a -> ParallelT m a
append :: forall (m :: * -> *) a.
MonadAsync m =>
ParallelT m a -> ParallelT m a -> ParallelT m a
append (ParallelT Stream m a
m1) (ParallelT Stream m a
m2) = forall (m :: * -> *) a. Stream m a -> ParallelT m a
ParallelT forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
parallelK Stream m a
m1 Stream m a
m2
instance MonadAsync m => Semigroup (ParallelT m a) where
<> :: ParallelT m a -> ParallelT m a -> ParallelT m a
(<>) = forall (m :: * -> *) a.
MonadAsync m =>
ParallelT m a -> ParallelT m a -> ParallelT m a
append
instance MonadAsync m => Monoid (ParallelT m a) where
mempty :: ParallelT m a
mempty = forall (m :: * -> *) a. Stream m a -> ParallelT m a
ParallelT forall (m :: * -> *) a. Stream m a
K.nil
mappend :: ParallelT m a -> ParallelT m a -> ParallelT m a
mappend = forall a. Semigroup a => a -> a -> a
(<>)
{-# INLINE apParallel #-}
{-# SPECIALIZE apParallel ::
ParallelT IO (a -> b) -> ParallelT IO a -> ParallelT IO b #-}
apParallel :: MonadAsync m =>
ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
apParallel :: forall (m :: * -> *) a b.
MonadAsync m =>
ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
apParallel (ParallelT Stream m (a -> b)
m1) (ParallelT Stream m a
m2) =
let f :: (a -> b) -> Stream m b
f a -> b
x1 = forall (m :: * -> *) b a.
(Stream m b -> Stream m b -> Stream m b)
-> (a -> Stream m b) -> Stream m a -> Stream m b
K.concatMapWith forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
parallelK (forall (f :: * -> *) a. Applicative f => a -> f a
pure forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
x1) Stream m a
m2
in forall (m :: * -> *) a. Stream m a -> ParallelT m a
ParallelT forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
(Stream m b -> Stream m b -> Stream m b)
-> (a -> Stream m b) -> Stream m a -> Stream m b
K.concatMapWith forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
parallelK forall {b}. (a -> b) -> Stream m b
f Stream m (a -> b)
m1
instance (Monad m, MonadAsync m) => Applicative (ParallelT m) where
{-# INLINE pure #-}
pure :: forall a. a -> ParallelT m a
pure = forall (m :: * -> *) a. Stream m a -> ParallelT m a
ParallelT forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a (m :: * -> *). a -> Stream m a
K.fromPure
{-# INLINE (<*>) #-}
<*> :: forall a b. ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
(<*>) = forall (m :: * -> *) a b.
MonadAsync m =>
ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
apParallel
{-# INLINE bind #-}
{-# SPECIALIZE bind ::
ParallelT IO a -> (a -> ParallelT IO b) -> ParallelT IO b #-}
bind :: MonadAsync m => ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
bind :: forall (m :: * -> *) a b.
MonadAsync m =>
ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
bind (ParallelT Stream m a
m) a -> ParallelT m b
f = forall (m :: * -> *) a. Stream m a -> ParallelT m a
ParallelT forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) b a.
(Stream m b -> Stream m b -> Stream m b)
-> Stream m a -> (a -> Stream m b) -> Stream m b
K.bindWith forall (m :: * -> *) a.
MonadAsync m =>
Stream m a -> Stream m a -> Stream m a
parallelK Stream m a
m (forall (m :: * -> *) a. ParallelT m a -> Stream m a
getParallelT forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> ParallelT m b
f)
instance MonadAsync m => Monad (ParallelT m) where
return :: forall a. a -> ParallelT m a
return = forall (f :: * -> *) a. Applicative f => a -> f a
pure
{-# INLINE (>>=) #-}
>>= :: forall a b. ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
(>>=) = forall (m :: * -> *) a b.
MonadAsync m =>
ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
bind
MONAD_COMMON_INSTANCES(ParallelT, MONADPARALLEL)
{-# INLINE_NORMAL newCallbackStream #-}
newCallbackStream :: MonadAsync m => m (a -> m (), Stream m a)
newCallbackStream :: forall (m :: * -> *) a. MonadAsync m => m (a -> m (), Stream m a)
newCallbackStream = do
SVar Any m a
sv <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopNone forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ThreadId
myThreadId forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
modifyThread SVar Any m a
sv
let callback :: a -> m ()
callback a
a = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar Any m a
sv (forall a. a -> ChildEvent a
ChildYield a
a)
forall (m :: * -> *) a. Monad m => a -> m a
return (forall {m :: * -> *}. MonadIO m => a -> m ()
callback, forall (m :: * -> *) a. Monad m => Stream m a -> Stream m a
D.toStreamK (forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> Stream m a
SVar.fromSVarD SVar Any m a
sv))