{-# LANGUAGE UndecidableInstances #-}
#include "inline.hs"
module Streamly.Internal.Data.Stream.Parallel
(
ParallelT
, Parallel
, fromParallel
, parallel
, parallelFst
, parallelMin
, mkParallel
, mkParallelD
, mkParallelK
, tapAsync
, tapAsyncF
, distributeAsync_
, newCallbackStream
)
where
import Control.Concurrent (myThreadId, takeMVar)
import Control.Monad (when)
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Functor (void)
import Data.IORef (readIORef, writeIORef)
import Data.Maybe (fromJust)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import Prelude hiding (map)
import qualified Data.Set as Set
import Streamly.Internal.Data.Fold.Type (Fold)
import Streamly.Internal.Data.Stream.StreamD.Type (Step(..))
import Streamly.Internal.Data.Stream.StreamK
(IsStream(..), Stream, mkStream, foldStream, foldStreamShared, adapt)
import Streamly.Internal.Data.SVar
import qualified Streamly.Internal.Data.Stream.StreamK as K (withLocal)
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
import qualified Streamly.Internal.Data.Stream.SVar as SVar
#include "Instances.hs"
{-# NOINLINE runOne #-}
runOne
:: MonadIO m
=> State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOne :: State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOne State Stream m a
st Stream m a
m0 Maybe WorkerInfo
winfo =
case State Stream m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State Stream m a
st of
Maybe Count
Nothing -> Stream m a -> m ()
forall (t :: (* -> *) -> * -> *). IsStream t => t m a -> m ()
go Stream m a
m0
Just Count
_ -> State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadIO m =>
State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOneLimited State Stream m a
st Stream m a
m0 Maybe WorkerInfo
winfo
where
go :: t m a -> m ()
go t m a
m = do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
decrementBufferLimit SVar Stream m a
sv
State Stream m a
-> (a -> t m a -> m ()) -> (a -> m ()) -> m () -> t m a -> m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m a
st a -> t m a -> m ()
yieldk a -> m ()
forall (m :: * -> *). MonadIO m => a -> m ()
single m ()
stop t m a
m
sv :: SVar Stream m a
sv = Maybe (SVar Stream m a) -> SVar Stream m a
forall a. HasCallStack => Maybe a -> a
fromJust (Maybe (SVar Stream m a) -> SVar Stream m a)
-> Maybe (SVar Stream m a) -> SVar Stream m a
forall a b. (a -> b) -> a -> b
$ State Stream m a -> Maybe (SVar Stream m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State Stream m a
st
stop :: m ()
stop = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
SVar Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementBufferLimit SVar Stream m a
sv
SVar Stream m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo
sendit :: a -> m ()
sendit a
a = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> ChildEvent a -> IO Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar Stream m a
sv (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
single :: a -> m ()
single a
a = a -> m ()
forall (m :: * -> *). MonadIO m => a -> m ()
sendit a
a m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> (IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo)
yieldk :: a -> t m a -> m ()
yieldk a
a t m a
r = a -> m ()
forall (m :: * -> *). MonadIO m => a -> m ()
sendit a
a m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> t m a -> m ()
go t m a
r
runOneLimited
:: MonadIO m
=> State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOneLimited :: State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOneLimited State Stream m a
st Stream m a
m0 Maybe WorkerInfo
winfo = Stream m a -> m ()
forall (t :: (* -> *) -> * -> *). IsStream t => t m a -> m ()
go Stream m a
m0
where
go :: t m a -> m ()
go t m a
m = do
Bool
yieldLimitOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar Stream m a
sv
if Bool
yieldLimitOk
then do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
decrementBufferLimit SVar Stream m a
sv
State Stream m a
-> (a -> t m a -> m ()) -> (a -> m ()) -> m () -> t m a -> m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m a
st a -> t m a -> m ()
yieldk a -> m ()
forall (m :: * -> *). MonadIO m => a -> m ()
single m ()
stop t m a
m
else do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVarFromWorker SVar Stream m a
sv
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo
sv :: SVar Stream m a
sv = Maybe (SVar Stream m a) -> SVar Stream m a
forall a. HasCallStack => Maybe a -> a
fromJust (Maybe (SVar Stream m a) -> SVar Stream m a)
-> Maybe (SVar Stream m a) -> SVar Stream m a
forall a b. (a -> b) -> a -> b
$ State Stream m a -> Maybe (SVar Stream m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State Stream m a
st
stop :: m ()
stop = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
SVar Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementBufferLimit SVar Stream m a
sv
SVar Stream m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar Stream m a
sv
SVar Stream m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo
sendit :: a -> m ()
sendit a
a = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> ChildEvent a -> IO Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar Stream m a
sv (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
single :: a -> m ()
single a
a = a -> m ()
forall (m :: * -> *). MonadIO m => a -> m ()
sendit a
a m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> (IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
sv Maybe WorkerInfo
winfo)
yieldk :: a -> t m a -> m ()
yieldk a
a t m a
r = a -> m ()
forall (m :: * -> *). MonadIO m => a -> m ()
sendit a
a m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> t m a -> m ()
go t m a
r
{-# NOINLINE forkSVarPar #-}
forkSVarPar :: (IsStream t, MonadAsync m)
=> SVarStopStyle -> t m a -> t m a -> t m a
forkSVarPar :: SVarStopStyle -> t m a -> t m a -> t m a
forkSVarPar SVarStopStyle
ss t m a
m t m a
r = (forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream ((forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a)
-> (forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp -> do
SVar Stream m a
sv <- SVarStopStyle -> State Stream m a -> m (SVar Stream m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
ss State Stream m a
st
SVar Stream m a -> (Maybe WorkerInfo -> m ()) -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar SVar Stream m a
sv (State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadIO m =>
State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOne State Stream m a
st{streamVar :: Maybe (SVar Stream m a)
streamVar = SVar Stream m a -> Maybe (SVar Stream m a)
forall a. a -> Maybe a
Just SVar Stream m a
sv} (Stream m a -> Maybe WorkerInfo -> m ())
-> Stream m a -> Maybe WorkerInfo -> m ()
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
m)
case SVarStopStyle
ss of
SVarStopStyle
StopBy -> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
Set ThreadId
set <- IORef (Set ThreadId) -> IO (Set ThreadId)
forall a. IORef a -> IO a
readIORef (SVar Stream m a -> IORef (Set ThreadId)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef (Set ThreadId)
workerThreads SVar Stream m a
sv)
IORef ThreadId -> ThreadId -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVar Stream m a -> IORef ThreadId
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ThreadId
svarStopBy SVar Stream m a
sv) (ThreadId -> IO ()) -> ThreadId -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> Set ThreadId -> ThreadId
forall a. Int -> Set a -> a
Set.elemAt Int
0 Set ThreadId
set
SVarStopStyle
_ -> () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
SVar Stream m a -> (Maybe WorkerInfo -> m ()) -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar SVar Stream m a
sv (State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadIO m =>
State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOne State Stream m a
st{streamVar :: Maybe (SVar Stream m a)
streamVar = SVar Stream m a -> Maybe (SVar Stream m a)
forall a. a -> Maybe a
Just SVar Stream m a
sv} (Stream m a -> Maybe WorkerInfo -> m ())
-> Stream m a -> Maybe WorkerInfo -> m ()
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
r)
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStream State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp (SVar Stream m a -> t m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(MonadAsync m, IsStream t) =>
SVar Stream m a -> t m a
SVar.fromSVar SVar Stream m a
sv)
{-# INLINE joinStreamVarPar #-}
joinStreamVarPar :: (IsStream t, MonadAsync m)
=> SVarStyle -> SVarStopStyle -> t m a -> t m a -> t m a
joinStreamVarPar :: SVarStyle -> SVarStopStyle -> t m a -> t m a -> t m a
joinStreamVarPar SVarStyle
style SVarStopStyle
ss t m a
m1 t m a
m2 = (forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream ((forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a)
-> (forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp ->
case State Stream m a -> Maybe (SVar Stream m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe (SVar t m a)
streamVar State Stream m a
st of
Just SVar Stream m a
sv | SVar Stream m a -> SVarStyle
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStyle
svarStyle SVar Stream m a
sv SVarStyle -> SVarStyle -> Bool
forall a. Eq a => a -> a -> Bool
== SVarStyle
style Bool -> Bool -> Bool
&& SVar Stream m a -> SVarStopStyle
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStopStyle
svarStopStyle SVar Stream m a
sv SVarStopStyle -> SVarStopStyle -> Bool
forall a. Eq a => a -> a -> Bool
== SVarStopStyle
ss -> do
SVar Stream m a -> (Maybe WorkerInfo -> m ()) -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar SVar Stream m a
sv (State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadIO m =>
State Stream m a -> Stream m a -> Maybe WorkerInfo -> m ()
runOne State Stream m a
st (Stream m a -> Maybe WorkerInfo -> m ())
-> Stream m a -> Maybe WorkerInfo -> m ()
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream t m a
m1)
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp t m a
m2
Maybe (SVar Stream m a)
_ ->
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp (SVarStopStyle -> t m a -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
SVarStopStyle -> t m a -> t m a -> t m a
forkSVarPar SVarStopStyle
ss t m a
m1 t m a
m2)
{-# INLINE consMParallel #-}
{-# SPECIALIZE consMParallel :: IO a -> ParallelT IO a -> ParallelT IO a #-}
consMParallel :: MonadAsync m => m a -> ParallelT m a -> ParallelT m a
consMParallel :: m a -> ParallelT m a -> ParallelT m a
consMParallel m a
m ParallelT m a
r = Stream m a -> ParallelT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream (Stream m a -> ParallelT m a) -> Stream m a -> ParallelT m a
forall a b. (a -> b) -> a -> b
$ m a -> Stream m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(Monad m, IsStream t) =>
m a -> t m a
K.fromEffect m a
m Stream m a -> Stream m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
`parallel` (ParallelT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream ParallelT m a
r)
infixr 6 `parallel`
{-# INLINE parallel #-}
parallel :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
parallel :: t m a -> t m a -> t m a
parallel = SVarStyle -> SVarStopStyle -> t m a -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
SVarStyle -> SVarStopStyle -> t m a -> t m a -> t m a
joinStreamVarPar SVarStyle
ParallelVar SVarStopStyle
StopNone
{-# INLINE parallelFst #-}
parallelFst :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
parallelFst :: t m a -> t m a -> t m a
parallelFst = SVarStyle -> SVarStopStyle -> t m a -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
SVarStyle -> SVarStopStyle -> t m a -> t m a -> t m a
joinStreamVarPar SVarStyle
ParallelVar SVarStopStyle
StopBy
{-# INLINE parallelMin #-}
parallelMin :: (IsStream t, MonadAsync m) => t m a -> t m a -> t m a
parallelMin :: t m a -> t m a -> t m a
parallelMin = SVarStyle -> SVarStopStyle -> t m a -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
SVarStyle -> SVarStopStyle -> t m a -> t m a -> t m a
joinStreamVarPar SVarStyle
ParallelVar SVarStopStyle
StopAny
mkParallelK :: (IsStream t, MonadAsync m) => t m a -> t m a
mkParallelK :: t m a -> t m a
mkParallelK t m a
m = (forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream ((forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a)
-> (forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp -> do
SVar Stream m a
sv <- SVarStopStyle -> State Stream m a -> m (SVar Stream m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopNone (State Stream m a -> State Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a (n :: * -> *) b.
State t m a -> State t n b
adaptState State Stream m a
st)
State Stream m a -> SVar Stream m a -> Stream m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
State t m a -> SVar t m a -> Stream m a -> m ()
SVar.toSVarParallel State Stream m a
st SVar Stream m a
sv (Stream m a -> m ()) -> Stream m a -> m ()
forall a b. (a -> b) -> a -> b
$ t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD t m a
m
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStream State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp (t m a -> m r) -> t m a -> m r
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> t m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
(MonadAsync m, IsStream t) =>
SVar Stream m a -> t m a
SVar.fromSVar SVar Stream m a
sv
{-# INLINE_NORMAL mkParallelD #-}
mkParallelD :: MonadAsync m => D.Stream m a -> D.Stream m a
mkParallelD :: Stream m a -> Stream m a
mkParallelD Stream m a
m = (State Stream m a
-> Maybe (Stream m a) -> m (Step (Maybe (Stream m a)) a))
-> Maybe (Stream m a) -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State Stream m a
-> Maybe (Stream m a) -> m (Step (Maybe (Stream m a)) a)
step Maybe (Stream m a)
forall a. Maybe a
Nothing
where
step :: State Stream m a
-> Maybe (Stream m a) -> m (Step (Maybe (Stream m a)) a)
step State Stream m a
gst Maybe (Stream m a)
Nothing = do
SVar Stream m a
sv <- SVarStopStyle -> State Stream m a -> m (SVar Stream m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopNone State Stream m a
gst
State Stream m a -> SVar Stream m a -> Stream m a -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
State t m a -> SVar t m a -> Stream m a -> m ()
SVar.toSVarParallel State Stream m a
gst SVar Stream m a
sv Stream m a
m
Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a))
-> Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ Maybe (Stream m a) -> Step (Maybe (Stream m a)) a
forall s a. s -> Step s a
Skip (Maybe (Stream m a) -> Step (Maybe (Stream m a)) a)
-> Maybe (Stream m a) -> Step (Maybe (Stream m a)) a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Maybe (Stream m a)
forall a. a -> Maybe a
Just (Stream m a -> Maybe (Stream m a))
-> Stream m a -> Maybe (Stream m a)
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Stream m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> Stream m a
SVar.fromSVarD SVar Stream m a
sv
step State Stream m a
gst (Just (D.UnStream State Stream m a -> s -> m (Step s a)
step1 s
st)) = do
Step s a
r <- State Stream m a -> s -> m (Step s a)
step1 State Stream m a
gst s
st
Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a))
-> Step (Maybe (Stream m a)) a -> m (Step (Maybe (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ case Step s a
r of
Yield a
a s
s -> a -> Maybe (Stream m a) -> Step (Maybe (Stream m a)) a
forall s a. a -> s -> Step s a
Yield a
a (Stream m a -> Maybe (Stream m a)
forall a. a -> Maybe a
Just (Stream m a -> Maybe (Stream m a))
-> Stream m a -> Maybe (Stream m a)
forall a b. (a -> b) -> a -> b
$ (State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State Stream m a -> s -> m (Step s a)
step1 s
s)
Skip s
s -> Maybe (Stream m a) -> Step (Maybe (Stream m a)) a
forall s a. s -> Step s a
Skip (Stream m a -> Maybe (Stream m a)
forall a. a -> Maybe a
Just (Stream m a -> Maybe (Stream m a))
-> Stream m a -> Maybe (Stream m a)
forall a b. (a -> b) -> a -> b
$ (State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State Stream m a -> s -> m (Step s a)
step1 s
s)
Step s a
Stop -> Step (Maybe (Stream m a)) a
forall s a. Step s a
Stop
{-# INLINE_NORMAL mkParallel #-}
mkParallel :: (K.IsStream t, MonadAsync m) => t m a -> t m a
mkParallel :: t m a -> t m a
mkParallel = Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (Stream m a -> t m a) -> (t m a -> Stream m a) -> t m a -> t m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m a -> Stream m a
forall (m :: * -> *) a. MonadAsync m => Stream m a -> Stream m a
mkParallelD (Stream m a -> Stream m a)
-> (t m a -> Stream m a) -> t m a -> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. t m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
t m a -> Stream m a
D.toStreamD
{-# INLINE tapAsync #-}
tapAsync :: (IsStream t, MonadAsync m) => (t m a -> m b) -> t m a -> t m a
tapAsync :: (t m a -> m b) -> t m a -> t m a
tapAsync t m a -> m b
f t m a
m = (forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
(forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
mkStream ((forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a)
-> (forall r.
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> m r)
-> t m a
forall a b. (a -> b) -> a -> b
$ \State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp -> do
SVar Stream m a
sv <- State Stream m a -> (t m a -> m b) -> m (SVar Stream m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
State Stream m a -> (t m a -> m b) -> m (SVar Stream m a)
SVar.newFoldSVar State Stream m a
st t m a -> m b
f
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a r.
IsStream t =>
State Stream m a
-> (a -> t m a -> m r) -> (a -> m r) -> m r -> t m a -> m r
foldStreamShared State Stream m a
st a -> t m a -> m r
yld a -> m r
sng m r
stp (SVar Stream m a -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
SVar Stream m a -> t m a -> t m a
SVar.teeToSVar SVar Stream m a
sv t m a
m)
data TapState fs st a = TapInit | Tapping !fs st | TapDone st
{-# INLINE_NORMAL tapAsyncF #-}
tapAsyncF :: MonadAsync m => Fold m a b -> D.Stream m a -> D.Stream m a
tapAsyncF :: Fold m a b -> Stream m a -> Stream m a
tapAsyncF Fold m a b
f (D.Stream State Stream m a -> s -> m (Step s a)
step1 s
state1) = (State Stream m a
-> TapState (SVar Stream m a) s Any
-> m (Step (TapState (SVar Stream m a) s Any) a))
-> TapState (SVar Stream m a) s Any -> Stream m a
forall (m :: * -> *) a s.
(State Stream m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State Stream m a
-> TapState (SVar Stream m a) s Any
-> m (Step (TapState (SVar Stream m a) s Any) a)
forall a a.
State Stream m a
-> TapState (SVar Stream m a) s a
-> m (Step (TapState (SVar Stream m a) s a) a)
step TapState (SVar Stream m a) s Any
forall fs st a. TapState fs st a
TapInit
where
drainFold :: SVar Stream m a -> m ()
drainFold SVar Stream m a
svr = do
Bool
done <- SVar Stream m a -> m Bool
forall (m :: * -> *) a. MonadAsync m => SVar Stream m a -> m Bool
SVar.fromConsumer SVar Stream m a
svr
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
done) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> String -> IO () -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO () -> IO ()
withDiagMVar SVar Stream m a
svr String
"teeToSVar: waiting to drain"
(IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (SVar Stream m a -> MVar ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> MVar ()
outputDoorBellFromConsumer SVar Stream m a
svr)
SVar Stream m a -> m ()
drainFold SVar Stream m a
svr
stopFold :: SVar Stream m a -> m ()
stopFold SVar Stream m a
svr = do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar Stream m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar Stream m a
svr Maybe WorkerInfo
forall a. Maybe a
Nothing
SVar Stream m a -> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
SVar Stream m a -> m ()
drainFold SVar Stream m a
svr
{-# INLINE_LATE step #-}
step :: State Stream m a
-> TapState (SVar Stream m a) s a
-> m (Step (TapState (SVar Stream m a) s a) a)
step State Stream m a
gst TapState (SVar Stream m a) s a
TapInit = do
SVar Stream m a
sv <- State Stream m a -> Fold m a b -> m (SVar Stream m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a b.
MonadAsync m =>
State t m a -> Fold m a b -> m (SVar t m a)
SVar.newFoldSVarF State Stream m a
gst Fold m a b
f
Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a))
-> Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a)
forall a b. (a -> b) -> a -> b
$ TapState (SVar Stream m a) s a
-> Step (TapState (SVar Stream m a) s a) a
forall s a. s -> Step s a
Skip (SVar Stream m a -> s -> TapState (SVar Stream m a) s a
forall fs st a. fs -> st -> TapState fs st a
Tapping SVar Stream m a
sv s
state1)
step State Stream m a
gst (Tapping SVar Stream m a
sv s
st) = do
Step s a
r <- State Stream m a -> s -> m (Step s a)
step1 State Stream m a
gst s
st
case Step s a
r of
Yield a
a s
s -> do
Bool
done <- SVar Stream m a -> a -> m Bool
forall (m :: * -> *) a.
MonadAsync m =>
SVar Stream m a -> a -> m Bool
SVar.pushToFold SVar Stream m a
sv a
a
if Bool
done
then do
SVar Stream m a -> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
SVar Stream m a -> m ()
stopFold SVar Stream m a
sv
Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a))
-> Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a)
forall a b. (a -> b) -> a -> b
$ a
-> TapState (SVar Stream m a) s a
-> Step (TapState (SVar Stream m a) s a) a
forall s a. a -> s -> Step s a
Yield a
a (s -> TapState (SVar Stream m a) s a
forall fs st a. st -> TapState fs st a
TapDone s
s)
else Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a))
-> Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a)
forall a b. (a -> b) -> a -> b
$ a
-> TapState (SVar Stream m a) s a
-> Step (TapState (SVar Stream m a) s a) a
forall s a. a -> s -> Step s a
Yield a
a (SVar Stream m a -> s -> TapState (SVar Stream m a) s a
forall fs st a. fs -> st -> TapState fs st a
Tapping SVar Stream m a
sv s
s)
Skip s
s -> Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a))
-> Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a)
forall a b. (a -> b) -> a -> b
$ TapState (SVar Stream m a) s a
-> Step (TapState (SVar Stream m a) s a) a
forall s a. s -> Step s a
Skip (SVar Stream m a -> s -> TapState (SVar Stream m a) s a
forall fs st a. fs -> st -> TapState fs st a
Tapping SVar Stream m a
sv s
s)
Step s a
Stop -> do
SVar Stream m a -> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m, MonadThrow m) =>
SVar Stream m a -> m ()
stopFold SVar Stream m a
sv
Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (TapState (SVar Stream m a) s a) a
forall s a. Step s a
Stop
step State Stream m a
gst (TapDone s
st) = do
Step s a
r <- State Stream m a -> s -> m (Step s a)
step1 State Stream m a
gst s
st
Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a))
-> Step (TapState (SVar Stream m a) s a) a
-> m (Step (TapState (SVar Stream m a) s a) a)
forall a b. (a -> b) -> a -> b
$ case Step s a
r of
Yield a
a s
s -> a
-> TapState (SVar Stream m a) s a
-> Step (TapState (SVar Stream m a) s a) a
forall s a. a -> s -> Step s a
Yield a
a (s -> TapState (SVar Stream m a) s a
forall fs st a. st -> TapState fs st a
TapDone s
s)
Skip s
s -> TapState (SVar Stream m a) s a
-> Step (TapState (SVar Stream m a) s a) a
forall s a. s -> Step s a
Skip (s -> TapState (SVar Stream m a) s a
forall fs st a. st -> TapState fs st a
TapDone s
s)
Step s a
Stop -> Step (TapState (SVar Stream m a) s a) a
forall s a. Step s a
Stop
{-# INLINE distributeAsync_ #-}
distributeAsync_ :: (Foldable f, IsStream t, MonadAsync m)
=> f (t m a -> m b) -> t m a -> t m a
distributeAsync_ :: f (t m a -> m b) -> t m a -> t m a
distributeAsync_ = (t m a -> f (t m a -> m b) -> t m a)
-> f (t m a -> m b) -> t m a -> t m a
forall a b c. (a -> b -> c) -> b -> a -> c
flip (((t m a -> m b) -> t m a -> t m a)
-> t m a -> f (t m a -> m b) -> t m a
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr (t m a -> m b) -> t m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a b.
(IsStream t, MonadAsync m) =>
(t m a -> m b) -> t m a -> t m a
tapAsync)
newtype ParallelT m a = ParallelT {ParallelT m a -> Stream m a
getParallelT :: Stream m a}
deriving (m a -> ParallelT m a
(forall (m :: * -> *) a. Monad m => m a -> ParallelT m a)
-> MonadTrans ParallelT
forall (m :: * -> *) a. Monad m => m a -> ParallelT m a
forall (t :: (* -> *) -> * -> *).
(forall (m :: * -> *) a. Monad m => m a -> t m a) -> MonadTrans t
lift :: m a -> ParallelT m a
$clift :: forall (m :: * -> *) a. Monad m => m a -> ParallelT m a
MonadTrans)
type Parallel = ParallelT IO
fromParallel :: IsStream t => ParallelT m a -> t m a
fromParallel :: ParallelT m a -> t m a
fromParallel = ParallelT m a -> t m a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
(m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
adapt
instance IsStream ParallelT where
toStream :: ParallelT m a -> Stream m a
toStream = ParallelT m a -> Stream m a
forall (m :: * -> *) a. ParallelT m a -> Stream m a
getParallelT
fromStream :: Stream m a -> ParallelT m a
fromStream = Stream m a -> ParallelT m a
forall (m :: * -> *) a. Stream m a -> ParallelT m a
ParallelT
{-# INLINE consM #-}
{-# SPECIALIZE consM :: IO a -> ParallelT IO a -> ParallelT IO a #-}
consM :: m a -> ParallelT m a -> ParallelT m a
consM = m a -> ParallelT m a -> ParallelT m a
forall (m :: * -> *) a.
MonadAsync m =>
m a -> ParallelT m a -> ParallelT m a
consMParallel
{-# INLINE (|:) #-}
{-# SPECIALIZE (|:) :: IO a -> ParallelT IO a -> ParallelT IO a #-}
|: :: m a -> ParallelT m a -> ParallelT m a
(|:) = m a -> ParallelT m a -> ParallelT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
m a -> t m a -> t m a
consM
{-# INLINE mappendParallel #-}
{-# SPECIALIZE mappendParallel :: ParallelT IO a -> ParallelT IO a -> ParallelT IO a #-}
mappendParallel :: MonadAsync m => ParallelT m a -> ParallelT m a -> ParallelT m a
mappendParallel :: ParallelT m a -> ParallelT m a -> ParallelT m a
mappendParallel ParallelT m a
m1 ParallelT m a
m2 = Stream m a -> ParallelT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream (Stream m a -> ParallelT m a) -> Stream m a -> ParallelT m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Stream m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
parallel (ParallelT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream ParallelT m a
m1) (ParallelT m a -> Stream m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a -> Stream m a
toStream ParallelT m a
m2)
instance MonadAsync m => Semigroup (ParallelT m a) where
<> :: ParallelT m a -> ParallelT m a -> ParallelT m a
(<>) = ParallelT m a -> ParallelT m a -> ParallelT m a
forall (m :: * -> *) a.
MonadAsync m =>
ParallelT m a -> ParallelT m a -> ParallelT m a
mappendParallel
instance MonadAsync m => Monoid (ParallelT m a) where
mempty :: ParallelT m a
mempty = ParallelT m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
t m a
K.nil
mappend :: ParallelT m a -> ParallelT m a -> ParallelT m a
mappend = ParallelT m a -> ParallelT m a -> ParallelT m a
forall a. Semigroup a => a -> a -> a
(<>)
{-# INLINE apParallel #-}
{-# SPECIALIZE apParallel :: ParallelT IO (a -> b) -> ParallelT IO a -> ParallelT IO b #-}
apParallel :: MonadAsync m => ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
apParallel :: ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
apParallel (ParallelT Stream m (a -> b)
m1) (ParallelT Stream m a
m2) =
let f :: (a -> b) -> Stream m b
f a -> b
x1 = (Stream m b -> Stream m b -> Stream m b)
-> (a -> Stream m b) -> Stream m a -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
IsStream t =>
(t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b
K.concatMapBy Stream m b -> Stream m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
parallel (b -> Stream m b
forall (f :: * -> *) a. Applicative f => a -> f a
pure (b -> Stream m b) -> (a -> b) -> a -> Stream m b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> b
x1) Stream m a
m2
in Stream m b -> ParallelT m b
forall (m :: * -> *) a. Stream m a -> ParallelT m a
ParallelT (Stream m b -> ParallelT m b) -> Stream m b -> ParallelT m b
forall a b. (a -> b) -> a -> b
$ (Stream m b -> Stream m b -> Stream m b)
-> ((a -> b) -> Stream m b) -> Stream m (a -> b) -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
IsStream t =>
(t m b -> t m b -> t m b) -> (a -> t m b) -> t m a -> t m b
K.concatMapBy Stream m b -> Stream m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
parallel (a -> b) -> Stream m b
forall b. (a -> b) -> Stream m b
f Stream m (a -> b)
m1
instance (Monad m, MonadAsync m) => Applicative (ParallelT m) where
{-# INLINE pure #-}
pure :: a -> ParallelT m a
pure = Stream m a -> ParallelT m a
forall (m :: * -> *) a. Stream m a -> ParallelT m a
ParallelT (Stream m a -> ParallelT m a)
-> (a -> Stream m a) -> a -> ParallelT m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Stream m a
forall (t :: (* -> *) -> * -> *) a (m :: * -> *).
IsStream t =>
a -> t m a
K.fromPure
{-# INLINE (<*>) #-}
<*> :: ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
(<*>) = ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
forall (m :: * -> *) a b.
MonadAsync m =>
ParallelT m (a -> b) -> ParallelT m a -> ParallelT m b
apParallel
{-# INLINE bindParallel #-}
{-# SPECIALIZE bindParallel :: ParallelT IO a -> (a -> ParallelT IO b) -> ParallelT IO b #-}
bindParallel :: MonadAsync m => ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
bindParallel :: ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
bindParallel ParallelT m a
m a -> ParallelT m b
f = Stream m b -> ParallelT m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IsStream t =>
Stream m a -> t m a
fromStream (Stream m b -> ParallelT m b) -> Stream m b -> ParallelT m b
forall a b. (a -> b) -> a -> b
$ (Stream m b -> Stream m b -> Stream m b)
-> Stream m a -> (a -> Stream m b) -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) b a.
IsStream t =>
(t m b -> t m b -> t m b) -> t m a -> (a -> t m b) -> t m b
K.bindWith Stream m b -> Stream m b -> Stream m b
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, MonadAsync m) =>
t m a -> t m a -> t m a
parallel (ParallelT m a -> Stream m a
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
(m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
K.adapt ParallelT m a
m) (\a
a -> ParallelT m b -> Stream m b
forall (t1 :: (* -> *) -> * -> *) (t2 :: (* -> *) -> * -> *)
(m :: * -> *) a.
(IsStream t1, IsStream t2) =>
t1 m a -> t2 m a
K.adapt (ParallelT m b -> Stream m b) -> ParallelT m b -> Stream m b
forall a b. (a -> b) -> a -> b
$ a -> ParallelT m b
f a
a)
instance MonadAsync m => Monad (ParallelT m) where
return :: a -> ParallelT m a
return = a -> ParallelT m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure
>>= :: ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
(>>=) = ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
forall (m :: * -> *) a b.
MonadAsync m =>
ParallelT m a -> (a -> ParallelT m b) -> ParallelT m b
bindParallel
MONAD_COMMON_INSTANCES(ParallelT, MONADPARALLEL)
{-# INLINE_NORMAL newCallbackStream #-}
newCallbackStream :: (K.IsStream t, MonadAsync m) => m (a -> m (), t m a)
newCallbackStream :: m (a -> m (), t m a)
newCallbackStream = do
SVar Any m a
sv <- SVarStopStyle -> State Any m a -> m (SVar Any m a)
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar SVarStopStyle
StopNone State Any m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a. State t m a
defState
IO ThreadId -> m ThreadId
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO ThreadId
myThreadId m ThreadId -> (ThreadId -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= SVar Any m a -> ThreadId -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
modifyThread SVar Any m a
sv
let callback :: a -> m ()
callback a
a = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ SVar Any m a -> ChildEvent a -> IO Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar Any m a
sv (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
(a -> m (), t m a) -> m (a -> m (), t m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> m ()
forall (m :: * -> *). MonadIO m => a -> m ()
callback, Stream m a -> t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(IsStream t, Monad m) =>
Stream m a -> t m a
D.fromStreamD (SVar Any m a -> Stream m a
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> Stream m a
SVar.fromSVarD SVar Any m a
sv))