{-# LANGUAGE UndecidableInstances #-}
#include "inline.hs"
module Streamly.Internal.Data.Stream.Async
(
AsyncT(..)
, Async
, consMAsync
, asyncK
, mkAsyncK
, mkAsyncD
, WAsyncT(..)
, WAsync
, consMWAsync
, wAsyncK
)
where
import Control.Concurrent (myThreadId)
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Monad.Trans.Control (MonadBaseControl (..))
import Control.Concurrent.MVar (newEmptyMVar)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Concurrent.Queue.MichaelScott (LinkedQueue, newQ, nullQ, tryPopR, pushL)
import Data.IORef (IORef, newIORef, readIORef)
import Data.Maybe (fromJust)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import Prelude hiding (map)
import qualified Data.Set as S
import Streamly.Internal.Control.Concurrent
(MonadAsync, RunInIO(..), captureMonadState)
import Streamly.Internal.Data.Atomics
(atomicModifyIORefCAS, atomicModifyIORefCAS_)
import Streamly.Internal.Data.Stream.Serial (SerialT (..))
import Streamly.Internal.Data.Stream.StreamK.Type (Stream)
import Streamly.Internal.Data.Stream.SVar.Generate (fromSVar, fromSVarD)
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
import Streamly.Internal.Data.SVar.Type
import Streamly.Internal.Data.SVar
#include "Instances.hs"
-- | Push one unit of work onto the head of the shared LIFO work list and
-- then call 'ringDoorBell' on the 'SVar'.
--
-- A unit of work is a stream paired with the 'RunInIO' runner under which it
-- is to be evaluated. The list is updated with 'atomicModifyIORefCAS_';
-- because the item is consed onto the front, a consumer that takes from the
-- head of the list sees the most recently enqueued item first.
{-# INLINE enqueueLIFO #-}
enqueueLIFO ::
       SVar t m a
    -> IORef [(RunInIO m, t m a)]
    -> (RunInIO m, t m a)
    -> IO ()
enqueueLIFO sv q m = do
    atomicModifyIORefCAS_ q $ \ms -> m : ms
    -- NOTE(review): 'ringDoorBell' presumably notifies the consumer side of
    -- the SVar that new work is available -- confirm against its definition.
    ringDoorBell sv
-- | Result of evaluating one unit of work inside a worker loop. It tells the
-- loop what to do next.
data WorkerStatus
    = Continue -- ^ The worker loop goes on to fetch and run more work.
    | Suspend  -- ^ The worker loop stops and reports via 'sendStop'.
-- | Worker loop for an 'SVar' whose pending work is kept in a LIFO list.
--
-- The loop repeatedly takes the item at the head of the work list and
-- evaluates it, sending every produced element to the 'SVar' with
-- 'sendYield'. It ends by calling 'sendStop' when either the work list is
-- empty or 'sendYield' returned 'False'.
--
-- Arguments, in order: the shared work list, the stream 'State' passed to
-- 'K.foldStreamShared', the 'SVar' to report to, and the optional
-- 'WorkerInfo' forwarded to 'sendYield' and 'sendStop'.
{-# INLINE workLoopLIFO #-}
workLoopLIFO
    :: (MonadIO m, MonadBaseControl IO m)
    => IORef [(RunInIO m, Stream m a)]
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopLIFO q st sv winfo = run

    where

    -- Tell the SVar that this worker is done.
    stop = liftIO $ sendStop sv winfo

    run = do
        work <- dequeue
        case work of
            Nothing -> stop
            Just (RunInIO runin, m) -> do
                -- Evaluate the stream with the runner that was captured
                -- when the stream was enqueued, then restore the resulting
                -- monadic state into the current context.
                r <- liftIO $ runin $
                        K.foldStreamShared st yieldk single (return Continue) m
                res <- restoreM r
                case res of
                    Continue -> run
                    Suspend -> stop

    -- Last element of a stream: send it; nothing is left to re-queue.
    single a = do
        res <- liftIO $ sendYield sv winfo (ChildYield a)
        return $ if res then Continue else Suspend

    -- An element followed by the rest of the stream. While 'sendYield'
    -- returns True the rest of the same stream is evaluated right away;
    -- otherwise the rest is pushed back (together with the current monadic
    -- state) and the worker suspends.
    yieldk a r = do
        res <- liftIO $ sendYield sv winfo (ChildYield a)
        if res
        then K.foldStreamShared st yieldk single (return Continue) r
        else do
            runInIO <- captureMonadState
            liftIO $ enqueueLIFO sv q (runInIO, r)
            return Suspend

    -- Pop the head of the work list, if there is one.
    dequeue = liftIO $ atomicModifyIORefCAS q $ \case
        [] -> ([], Nothing)
        x : xs -> (xs, Just x)
-- | Variant of the LIFO worker loop that also accounts for a yield limit on
-- the 'SVar'.
--
-- Before a dequeued stream is evaluated, and again after every element that
-- is followed by more stream, 'decrementYieldLimit' is consulted. When it
-- returns 'False' the pending stream is pushed back on the work list,
-- 'incrementYieldLimit' is called and the worker stops. The loop also stops
-- when the work list is empty or when 'sendYield' returns 'False'.
--
-- Arguments, in order: the shared work list, the stream 'State' passed to
-- 'K.foldStreamShared', the 'SVar' to report to, and the optional
-- 'WorkerInfo' forwarded to 'sendYield' and 'sendStop'.
{-# INLINE workLoopLIFOLimited #-}
workLoopLIFOLimited
    :: forall m a. (MonadIO m, MonadBaseControl IO m)
    => IORef [(RunInIO m, Stream m a)]
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopLIFOLimited q st sv winfo = run

    where

    -- Passed to 'K.foldStreamShared' as the continuation that receives no
    -- element: it calls 'incrementYieldLimit' (a decrement was made before
    -- the stream was evaluated further) and keeps the worker going.
    incrContinue = liftIO (incrementYieldLimit sv) >> return Continue

    -- Tell the SVar that this worker is done.
    stop = liftIO $ sendStop sv winfo

    run = do
        work <- dequeue
        case work of
            Nothing -> stop
            Just (RunInIO runin, m) -> do
                yieldLimitOk <- liftIO $ decrementYieldLimit sv
                if yieldLimitOk
                then do
                    -- Evaluate under the runner captured at enqueue time and
                    -- restore the resulting monadic state afterwards.
                    r <- liftIO $ runin $
                            K.foldStreamShared st yieldk single incrContinue m
                    res <- restoreM r
                    case res of
                        Continue -> run
                        Suspend -> stop
                else liftIO $ do
                    -- Put the untouched work back, call incrementYieldLimit
                    -- to pair with the decrement above, and stop.
                    enqueueLIFO sv q (RunInIO runin, m)
                    incrementYieldLimit sv
                    sendStop sv winfo

    -- Last element of a stream: send it; nothing is left to re-queue.
    single a = do
        res <- liftIO $ sendYield sv winfo (ChildYield a)
        return $ if res then Continue else Suspend

    -- An element followed by the rest of the stream. The yield limit is
    -- decremented regardless of what 'sendYield' returned; if either check
    -- fails, 'incrementYieldLimit' is called, the rest is pushed back with
    -- the current monadic state, and the worker suspends.
    yieldk a r = do
        res <- liftIO $ sendYield sv winfo (ChildYield a)
        yieldLimitOk <- liftIO $ decrementYieldLimit sv
        if res && yieldLimitOk
        then K.foldStreamShared st yieldk single incrContinue r
        else do
            runInIO <- captureMonadState
            liftIO $ incrementYieldLimit sv
            liftIO $ enqueueLIFO sv q (runInIO, r)
            return Suspend

    -- Pop the head of the work list, if there is one.
    dequeue = liftIO $ atomicModifyIORefCAS q $ \case
        [] -> ([], Nothing)
        x : xs -> (xs, Just x)
-- | Add one unit of work to the shared FIFO work queue with 'pushL' and then
-- call 'ringDoorBell' on the 'SVar'.
--
-- A unit of work is a stream paired with the 'RunInIO' runner under which it
-- is to be evaluated.
{-# INLINE enqueueFIFO #-}
enqueueFIFO ::
       SVar t m a
    -> LinkedQueue (RunInIO m, t m a)
    -> (RunInIO m, t m a)
    -> IO ()
enqueueFIFO sv q m = do
    pushL q m
    -- NOTE(review): 'ringDoorBell' presumably notifies the consumer side of
    -- the SVar that new work is available -- confirm against its definition.
    ringDoorBell sv
-- | Worker loop for an 'SVar' whose pending work is kept in a FIFO queue.
--
-- The loop repeatedly takes an item from the queue with 'tryPopR' and
-- evaluates it up to its next element. The element is sent to the 'SVar'
-- with 'sendYield' and the remainder of the stream, if any, is always put
-- back with 'enqueueFIFO' before the loop takes the next item. The loop
-- ends by calling 'sendStop' when the queue is empty or when 'sendYield'
-- returned 'False'.
--
-- Arguments, in order: the shared work queue, the stream 'State' passed to
-- 'K.foldStreamShared', the 'SVar' to report to, and the optional
-- 'WorkerInfo' forwarded to 'sendYield' and 'sendStop'.
{-# INLINE workLoopFIFO #-}
workLoopFIFO
    :: (MonadIO m, MonadBaseControl IO m)
    => LinkedQueue (RunInIO m, Stream m a)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopFIFO q st sv winfo = run

    where

    -- Tell the SVar that this worker is done.
    stop = liftIO $ sendStop sv winfo

    run = do
        work <- liftIO $ tryPopR q
        case work of
            Nothing -> stop
            Just (RunInIO runin, m) -> do
                -- Evaluate under the runner captured at enqueue time and
                -- restore the resulting monadic state afterwards.
                r <- liftIO $ runin $
                        K.foldStreamShared st yieldk single (return Continue) m
                res <- restoreM r
                case res of
                    Continue -> run
                    Suspend -> stop

    -- Last element of a stream: send it; nothing is left to re-queue.
    single a = do
        res <- liftIO $ sendYield sv winfo (ChildYield a)
        return $ if res then Continue else Suspend

    -- An element followed by the rest of the stream. The rest is re-queued
    -- (with the current monadic state) whatever 'sendYield' returned; the
    -- result of 'sendYield' only decides whether the worker keeps going.
    yieldk a r = do
        res <- liftIO $ sendYield sv winfo (ChildYield a)
        runInIO <- captureMonadState
        liftIO $ enqueueFIFO sv q (runInIO, r)
        return $ if res then Continue else Suspend
-- | Variant of the FIFO worker loop that also accounts for a yield limit on
-- the 'SVar'.
--
-- 'decrementYieldLimit' is consulted before a popped stream is evaluated and
-- again after every element that is followed by more stream. When it returns
-- 'False' before evaluation, the stream is put back on the queue,
-- 'incrementYieldLimit' is called and the worker stops. The loop also stops
-- when the queue is empty or when 'sendYield' returns 'False'.
--
-- Arguments, in order: the shared work queue, the stream 'State' passed to
-- 'K.foldStreamShared', the 'SVar' to report to, and the optional
-- 'WorkerInfo' forwarded to 'sendYield' and 'sendStop'.
{-# INLINE workLoopFIFOLimited #-}
workLoopFIFOLimited
    :: forall m a. (MonadIO m, MonadBaseControl IO m)
    => LinkedQueue (RunInIO m, Stream m a)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopFIFOLimited q st sv winfo = run

    where

    -- Tell the SVar that this worker is done.
    stop = liftIO $ sendStop sv winfo

    -- Passed to 'K.foldStreamShared' as the continuation that receives no
    -- element: it calls 'incrementYieldLimit' (a decrement was made before
    -- the stream was evaluated) and keeps the worker going.
    incrContinue = liftIO (incrementYieldLimit sv) >> return Continue

    run = do
        work <- liftIO $ tryPopR q
        case work of
            Nothing -> stop
            Just (RunInIO runin, m) -> do
                yieldLimitOk <- liftIO $ decrementYieldLimit sv
                if yieldLimitOk
                then do
                    -- Evaluate under the runner captured at enqueue time and
                    -- restore the resulting monadic state afterwards.
                    r <- liftIO $ runin $
                            K.foldStreamShared st yieldk single incrContinue m
                    res <- restoreM r
                    case res of
                        Continue -> run
                        Suspend -> stop
                else liftIO $ do
                    -- Put the untouched work back, call incrementYieldLimit
                    -- to pair with the decrement above, and stop.
                    enqueueFIFO sv q (RunInIO runin, m)
                    incrementYieldLimit sv
                    sendStop sv winfo

    -- Last element of a stream: send it; nothing is left to re-queue.
    single a = do
        res <- liftIO $ sendYield sv winfo (ChildYield a)
        return $ if res then Continue else Suspend

    -- An element followed by the rest of the stream. The rest is re-queued
    -- first (with the current monadic state), then the yield limit is
    -- decremented. If either 'sendYield' or the limit check failed,
    -- 'incrementYieldLimit' is called and the worker suspends.
    yieldk a r = do
        res <- liftIO $ sendYield sv winfo (ChildYield a)
        runInIO <- captureMonadState
        liftIO $ enqueueFIFO sv q (runInIO, r)
        yieldLimitOk <- liftIO $ decrementYieldLimit sv
        if res && yieldLimitOk
        then return Continue
        else liftIO $ do
            incrementYieldLimit sv
            return Suspend
getLifoSVar :: forall m a. MonadAsync m
=> State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getLifoSVar :: State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getLifoSVar State Stream m a
st RunInIO m
mrun = do
IORef ([ChildEvent a], Int)
outQ <- ([ChildEvent a], Int) -> IO (IORef ([ChildEvent a], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
MVar ()
outQMv <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
IORef Int
active <- Int -> IO (IORef Int)
forall a. a -> IO (IORef a)
newIORef Int
0
IORef Bool
wfw <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
IORef (Set ThreadId)
running <- Set ThreadId -> IO (IORef (Set ThreadId))
forall a. a -> IO (IORef a)
newIORef Set ThreadId
forall a. Set a
S.empty
IORef [(RunInIO m, Stream m a)]
q <- [(RunInIO m, Stream m a)] -> IO (IORef [(RunInIO m, Stream m a)])
forall a. a -> IO (IORef a)
newIORef ([] :: [(RunInIO m, Stream m a)])
Maybe (IORef Count)
yl <- case State Stream m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State Stream m a
st of
Maybe Count
Nothing -> Maybe (IORef Count) -> IO (Maybe (IORef Count))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (IORef Count)
forall a. Maybe a
Nothing
Just Count
x -> IORef Count -> Maybe (IORef Count)
forall a. a -> Maybe a
Just (IORef Count -> Maybe (IORef Count))
-> IO (IORef Count) -> IO (Maybe (IORef Count))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Count -> IO (IORef Count)
forall a. a -> IO (IORef a)
newIORef Count
x
Maybe YieldRateInfo
rateInfo <- State Stream m a -> IO (Maybe YieldRateInfo)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> IO (Maybe YieldRateInfo)
getYieldRateInfo State Stream m a
st
SVarStats
stats <- IO SVarStats
newSVarStats
ThreadId
tid <- IO ThreadId
myThreadId
let isWorkFinished :: p -> IO Bool
isWorkFinished p
_ = [(RunInIO m, Stream m a)] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null ([(RunInIO m, Stream m a)] -> Bool)
-> IO [(RunInIO m, Stream m a)] -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef [(RunInIO m, Stream m a)] -> IO [(RunInIO m, Stream m a)]
forall a. IORef a -> IO a
readIORef IORef [(RunInIO m, Stream m a)]
q
let isWorkFinishedLimited :: SVar t m a -> IO Bool
isWorkFinishedLimited SVar t m a
sv = do
Bool
yieldsDone <-
case SVar t m a -> Maybe (IORef Count)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe (IORef Count)
remainingWork SVar t m a
sv of
Just IORef Count
ref -> do
Count
n <- IORef Count -> IO Count
forall a. IORef a -> IO a
readIORef IORef Count
ref
Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
n Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
<= Count
0)
Maybe (IORef Count)
Nothing -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
Bool
qEmpty <- [(RunInIO m, Stream m a)] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null ([(RunInIO m, Stream m a)] -> Bool)
-> IO [(RunInIO m, Stream m a)] -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef [(RunInIO m, Stream m a)] -> IO [(RunInIO m, Stream m a)]
forall a. IORef a -> IO a
readIORef IORef [(RunInIO m, Stream m a)]
q
Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Bool
qEmpty Bool -> Bool -> Bool
|| Bool
yieldsDone
let getSVar :: SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (IORef [(RunInIO m, Stream m a)]
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m())
-> SVar Stream m a
getSVar :: SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (IORef [(RunInIO m, Stream m a)]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv SVar Stream m a -> m [ChildEvent a]
readOutput SVar Stream m a -> m Bool
postProc SVar Stream m a -> IO Bool
workDone IORef [(RunInIO m, Stream m a)]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
wloop = SVar :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVarStyle
-> RunInIO m
-> SVarStopStyle
-> IORef ThreadId
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> m [ChildEvent a]
-> m Bool
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> Limit
-> Limit
-> IORef Count
-> PushBufferPolicy
-> MVar ()
-> Maybe (IORef Count)
-> Maybe YieldRateInfo
-> ((RunInIO m, t m a) -> IO ())
-> IO Bool
-> IO Bool
-> IORef Bool
-> (Maybe WorkerInfo -> m ())
-> IORef (Set ThreadId)
-> IORef Int
-> (ThreadId -> m ())
-> MVar ()
-> SVarStats
-> Maybe (IORef ())
-> Bool
-> ThreadId
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IORef ([t m a], Int)
-> SVar t m a
SVar
{ outputQueue :: IORef ([ChildEvent a], Int)
outputQueue = IORef ([ChildEvent a], Int)
outQ
, outputQueueFromConsumer :: IORef ([ChildEvent a], Int)
outputQueueFromConsumer = IORef ([ChildEvent a], Int)
forall a. HasCallStack => a
undefined
, remainingWork :: Maybe (IORef Count)
remainingWork = Maybe (IORef Count)
yl
, maxBufferLimit :: Limit
maxBufferLimit = State Stream m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxBuffer State Stream m a
st
, pushBufferSpace :: IORef Count
pushBufferSpace = IORef Count
forall a. HasCallStack => a
undefined
, pushBufferPolicy :: PushBufferPolicy
pushBufferPolicy = PushBufferPolicy
forall a. HasCallStack => a
undefined
, pushBufferMVar :: MVar ()
pushBufferMVar = MVar ()
forall a. HasCallStack => a
undefined
, maxWorkerLimit :: Limit
maxWorkerLimit = Limit -> Limit -> Limit
forall a. Ord a => a -> a -> a
min (State Stream m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxThreads State Stream m a
st) (State Stream m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxBuffer State Stream m a
st)
, yieldRateInfo :: Maybe YieldRateInfo
yieldRateInfo = Maybe YieldRateInfo
rateInfo
, outputDoorBell :: MVar ()
outputDoorBell = MVar ()
outQMv
, outputDoorBellFromConsumer :: MVar ()
outputDoorBellFromConsumer = MVar ()
forall a. HasCallStack => a
undefined
, readOutputQ :: m [ChildEvent a]
readOutputQ = SVar Stream m a -> m [ChildEvent a]
readOutput SVar Stream m a
sv
, postProcess :: m Bool
postProcess = SVar Stream m a -> m Bool
postProc SVar Stream m a
sv
, workerThreads :: IORef (Set ThreadId)
workerThreads = IORef (Set ThreadId)
running
, workLoop :: Maybe WorkerInfo -> m ()
workLoop = IORef [(RunInIO m, Stream m a)]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
wloop IORef [(RunInIO m, Stream m a)]
q State Stream m a
st{streamVar :: Maybe (SVar Stream m a)
streamVar = SVar Stream m a -> Maybe (SVar Stream m a)
forall a. a -> Maybe a
Just SVar Stream m a
sv} SVar Stream m a
sv
, enqueue :: (RunInIO m, Stream m a) -> IO ()
enqueue = SVar Stream m a
-> IORef [(RunInIO m, Stream m a)]
-> (RunInIO m, Stream m a)
-> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a
-> IORef [(RunInIO m, t m a)] -> (RunInIO m, t m a) -> IO ()
enqueueLIFO SVar Stream m a
sv IORef [(RunInIO m, Stream m a)]
q
, isWorkDone :: IO Bool
isWorkDone = SVar Stream m a -> IO Bool
workDone SVar Stream m a
sv
, isQueueDone :: IO Bool
isQueueDone = SVar Stream m a -> IO Bool
workDone SVar Stream m a
sv
, needDoorBell :: IORef Bool
needDoorBell = IORef Bool
wfw
, svarStyle :: SVarStyle
svarStyle = SVarStyle
AsyncVar
, svarStopStyle :: SVarStopStyle
svarStopStyle = SVarStopStyle
StopNone
, svarStopBy :: IORef ThreadId
svarStopBy = IORef ThreadId
forall a. HasCallStack => a
undefined
, svarMrun :: RunInIO m
svarMrun = RunInIO m
mrun
, workerCount :: IORef Int
workerCount = IORef Int
active
, accountThread :: ThreadId -> m ()
accountThread = SVar Stream m a -> ThreadId -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
delThread SVar Stream m a
sv
, workerStopMVar :: MVar ()
workerStopMVar = MVar ()
forall a. HasCallStack => a
undefined
, svarRef :: Maybe (IORef ())
svarRef = Maybe (IORef ())
forall a. Maybe a
Nothing
, svarInspectMode :: Bool
svarInspectMode = State Stream m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Bool
getInspectMode State Stream m a
st
, svarCreator :: ThreadId
svarCreator = ThreadId
tid
, aheadWorkQueue :: IORef ([Stream m a], Int)
aheadWorkQueue = IORef ([Stream m a], Int)
forall a. HasCallStack => a
undefined
, outputHeap :: IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
outputHeap = IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
forall a. HasCallStack => a
undefined
, svarStats :: SVarStats
svarStats = SVarStats
stats
}
let sv :: SVar Stream m a
sv =
case State Stream m a -> Maybe Rate
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Rate
getStreamRate State Stream m a
st of
Maybe Rate
Nothing ->
case State Stream m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State Stream m a
st of
Maybe Count
Nothing -> SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (IORef [(RunInIO m, Stream m a)]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv SVar Stream m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQBounded
SVar Stream m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessBounded
SVar Stream m a -> IO Bool
forall p. p -> IO Bool
isWorkFinished
IORef [(RunInIO m, Stream m a)]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
IORef [(RunInIO m, Stream m a)]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopLIFO
Just Count
_ -> SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (IORef [(RunInIO m, Stream m a)]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv SVar Stream m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQBounded
SVar Stream m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessBounded
SVar Stream m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
isWorkFinishedLimited
IORef [(RunInIO m, Stream m a)]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
IORef [(RunInIO m, Stream m a)]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopLIFOLimited
Just Rate
_ ->
case State Stream m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State Stream m a
st of
Maybe Count
Nothing -> SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (IORef [(RunInIO m, Stream m a)]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv SVar Stream m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQPaced
SVar Stream m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessPaced
SVar Stream m a -> IO Bool
forall p. p -> IO Bool
isWorkFinished
IORef [(RunInIO m, Stream m a)]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
IORef [(RunInIO m, Stream m a)]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopLIFO
Just Count
_ -> SVar Stream m a
-> (SVar Stream m a -> m [ChildEvent a])
-> (SVar Stream m a -> m Bool)
-> (SVar Stream m a -> IO Bool)
-> (IORef [(RunInIO m, Stream m a)]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ())
-> SVar Stream m a
getSVar SVar Stream m a
sv SVar Stream m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQPaced
SVar Stream m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessPaced
SVar Stream m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
isWorkFinishedLimited
IORef [(RunInIO m, Stream m a)]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
(MonadIO m, MonadBaseControl IO m) =>
IORef [(RunInIO m, Stream m a)]
-> State Stream m a -> SVar Stream m a -> Maybe WorkerInfo -> m ()
workLoopLIFOLimited
in SVar Stream m a -> IO (SVar Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return SVar Stream m a
sv
-- | Build an 'SVar' whose pending work is kept in a 'LinkedQueue' and is
-- added with 'enqueueFIFO'. The resulting SVar is tagged 'WAsyncVar'.
--
-- The first argument supplies the configuration read here (yield limit,
-- stream rate, max buffer, max threads, inspect mode); the second is the
-- captured monad runner stored in 'svarMrun'.
--
-- Which callbacks are installed depends on the configuration:
--
-- * no stream rate: 'readOutputQBounded' / 'postProcessBounded';
--   with a stream rate: 'readOutputQPaced' / 'postProcessPaced'.
-- * no yield limit: 'workLoopFIFO' and "queue is empty" as the done test;
--   with a yield limit: 'workLoopFIFOLimited' and "queue is empty or the
--   remaining yield count is <= 0" as the done test.
--
-- NOTE(review): several fields are left as 'undefined' below; presumably
-- they are never read for this SVar style -- confirm before touching them.
getFifoSVar :: forall m a. MonadAsync m
    => State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getFifoSVar st mrun = do
    -- Fresh mutable state owned by the new SVar.
    outputQRef <- newIORef ([], 0)
    doorBell <- newEmptyMVar
    activeWorkers <- newIORef 0
    wantDoorBell <- newIORef False
    workerSet <- newIORef S.empty
    workQ <- newQ
    -- A counter is allocated only when a yield limit is configured.
    yieldsLeft <- traverse newIORef (getYieldLimit st)
    rateInfo <- getYieldRateInfo st
    stats <- newSVarStats
    creator <- myThreadId

    let -- Done test when no yield limit applies: only the queue matters.
        queueDrained _ = nullQ workQ

        -- Done test with a yield limit: the yield counter is read first,
        -- then the queue is checked.
        queueDrainedOrLimitHit sv = do
            limitHit <-
                maybe
                    (return False)
                    (fmap (<= 0) . readIORef)
                    (remainingWork sv)
            emptyQ <- nullQ workQ
            return (emptyQ || limitHit)

    let getSVar :: SVar Stream m a
            -> (SVar Stream m a -> m [ChildEvent a])
            -> (SVar Stream m a -> m Bool)
            -> (SVar Stream m a -> IO Bool)
            -> (LinkedQueue (RunInIO m, Stream m a)
                -> State Stream m a
                -> SVar Stream m a
                -> Maybe WorkerInfo
                -> m ())
            -> SVar Stream m a
        -- 'self' is the very SVar being constructed (see the recursive
        -- binding of 'sv' below); the callbacks close over it.
        getSVar self readOut postProc workDone wloop = SVar
            { -- identity and bookkeeping
              svarStyle = WAsyncVar
            , svarStopStyle = StopNone
            , svarStopBy = undefined
            , svarMrun = mrun
            , svarCreator = creator
            , svarRef = Nothing
            , svarInspectMode = getInspectMode st
            , svarStats = stats
              -- output side
            , outputQueue = outputQRef
            , outputDoorBell = doorBell
            , readOutputQ = readOut self
            , postProcess = postProc self
            , outputQueueFromConsumer = undefined
            , outputDoorBellFromConsumer = undefined
              -- limits
            , maxBufferLimit = getMaxBuffer st
            , maxWorkerLimit = min (getMaxThreads st) (getMaxBuffer st)
            , pushBufferSpace = undefined
            , pushBufferPolicy = undefined
            , pushBufferMVar = undefined
            , remainingWork = yieldsLeft
            , yieldRateInfo = rateInfo
              -- work scheduling
            , enqueue = enqueueFIFO self workQ
            , isWorkDone = workDone self
            , isQueueDone = workDone self
            , needDoorBell = wantDoorBell
            , workLoop = wloop workQ (st {streamVar = Just self}) self
              -- worker accounting
            , workerThreads = workerSet
            , workerCount = activeWorkers
            , accountThread = delThread self
            , workerStopMVar = undefined
              -- fields left undefined in this constructor
            , aheadWorkQueue = undefined
            , outputHeap = undefined
            }

    -- The SVar refers to itself, so it is defined recursively.
    let sv = case (getStreamRate st, getYieldLimit st) of
            (Nothing, Nothing) ->
                getSVar sv
                    readOutputQBounded
                    postProcessBounded
                    queueDrained
                    workLoopFIFO
            (Nothing, Just _) ->
                getSVar sv
                    readOutputQBounded
                    postProcessBounded
                    queueDrainedOrLimitHit
                    workLoopFIFOLimited
            (Just _, Nothing) ->
                getSVar sv
                    readOutputQPaced
                    postProcessPaced
                    queueDrained
                    workLoopFIFO
            (Just _, Just _) ->
                getSVar sv
                    readOutputQPaced
                    postProcessPaced
                    queueDrainedOrLimitHit
                    workLoopFIFOLimited
    return sv
-- | Create an SVar with 'getLifoSVar', using the monad state captured at
-- the call site, and hand the given stream to 'sendFirstWorker' together
-- with that SVar. The result is whatever 'sendFirstWorker' returns.
{-# INLINABLE newAsyncVar #-}
newAsyncVar :: MonadAsync m
    => State Stream m a -> Stream m a -> m (SVar Stream m a)
newAsyncVar st stream =
    captureMonadState
        >>= liftIO . getLifoSVar st
        >>= \svar -> sendFirstWorker svar stream
-- | Wrap a stream so that, when it is consumed, it is first handed to a
-- new SVar created by 'newAsyncVar' (with the consumer's state passed
-- through 'adaptState'), and the consumer then folds the stream read back
-- from that SVar via 'fromSVar'.
{-# INLINABLE mkAsyncK #-}
mkAsyncK :: MonadAsync m => Stream m a -> Stream m a
mkAsyncK stream =
    K.mkStream
        (\st yieldK singleK stopK -> do
            svar <- newAsyncVar (adaptState st) stream
            -- Continue with the consumer's original state and
            -- continuations, reading from the SVar.
            K.foldStream st yieldK singleK stopK
                (getSerialT (fromSVar svar)))
-- | Direct-style ('D.Stream') counterpart of 'mkAsyncK'.
--
-- The stream state is @Maybe (D.Stream m a)@:
--
-- * 'Nothing' is the initial state: the source is converted with
--   'D.toStreamK', passed to 'newAsyncVar', and the stream obtained from
--   'fromSVarD' becomes the new state. This step emits a 'D.Skip'.
-- * @Just s@ steps the stored stream once and re-wraps its next state.
{-# INLINE_NORMAL mkAsyncD #-}
mkAsyncD :: MonadAsync m => D.Stream m a -> D.Stream m a
mkAsyncD source = D.Stream step Nothing

    where

    -- First step: set up the SVar; no element is produced yet.
    step gst Nothing =
        D.Skip . Just . fromSVarD <$> newAsyncVar gst (D.toStreamK source)

    -- Later steps: advance the SVar-backed stream by one step and carry
    -- its step function along with the updated inner state.
    step gst (Just (D.UnStream inner cur)) = do
        res <- inner gst cur
        return
            (case res of
                D.Yield x next -> D.Yield x (Just (D.Stream inner next))
                D.Skip next -> D.Skip (Just (D.Stream inner next))
                D.Stop -> D.Stop)
-- | Same shape as 'newAsyncVar', but the SVar is created with
-- 'getFifoSVar': capture the current monad state, build the SVar in 'IO',
-- then pass it and the given stream to 'sendFirstWorker'.
{-# INLINABLE newWAsyncVar #-}
newWAsyncVar :: MonadAsync m
    => State Stream m a -> Stream m a -> m (SVar Stream m a)
newWAsyncVar st stream =
    captureMonadState
        >>= liftIO . getFifoSVar st
        >>= \svar -> sendFirstWorker svar stream
-- | Combine two streams through a freshly created SVar.
--
-- The SVar is made with 'newAsyncVar' for 'AsyncVar' and with
-- 'newWAsyncVar' for 'WAsyncVar'; any other style is rejected with
-- @error "illegal svar type"@. The consumer then folds the stream read
-- back from the SVar via 'fromSVar'.
--
-- The stream handed to the SVar is @forked@: when it is run it first
-- enqueues the second stream on the SVar found in its state and then
-- continues with the first stream using 'K.foldStreamShared'.
forkSVarAsync :: MonadAsync m
    => SVarStyle -> Stream m a -> Stream m a -> Stream m a
forkSVarAsync style m1 m2 =
    K.mkStream
        (\st yieldK singleK stopK -> do
            svar <-
                case style of
                    AsyncVar -> newAsyncVar st forked
                    WAsyncVar -> newWAsyncVar st forked
                    _ -> error "illegal svar type"
            K.foldStream st yieldK singleK stopK
                (getSerialT (fromSVar svar)))

    where

    forked = concurrently m1 m2

    concurrently ma mb =
        K.mkStream
            (\st yieldK singleK stopK -> do
                runInIO <- captureMonadState
                -- 'fromJust' fails if the state carries no SVar.
                -- NOTE(review): presumably the work loop always sets
                -- 'streamVar' before running this -- confirm.
                let target = fromJust (streamVar st)
                liftIO (enqueue target (runInIO, mb))
                K.foldStreamShared st yieldK singleK stopK ma)
-- | Combine two streams through an SVar of the requested 'SVarStyle'.
--
-- The incoming 'State' is inspected with 'streamVar':
--
-- * If it already carries an SVar whose 'svarStyle' equals @style@, the
--   current monadic state is captured with 'captureMonadState', the second
--   stream @m2@ is enqueued on that SVar paired with the captured state, and
--   the first stream @m1@ is folded with 'K.foldStreamShared' using the same
--   state and continuations.
--
-- * In every other case (no SVar present, or an SVar of a different style)
--   the stream built by 'forkSVarAsync' for @style@, @m1@ and @m2@ is folded
--   with 'K.foldStreamShared' instead.
{-# INLINE joinStreamVarAsync #-}
joinStreamVarAsync :: MonadAsync m
    => SVarStyle -> Stream m a -> Stream m a -> Stream m a
joinStreamVarAsync style m1 m2 = K.mkStream $ \st yld sng stp ->
    case streamVar st of
        -- Reuse the SVar found in the state only when its style matches.
        Just sv | svarStyle sv == style -> do
            runInIO <- captureMonadState
            liftIO $ enqueue sv (runInIO, m2)
            K.foldStreamShared st yld sng stp m1
        _ -> K.foldStreamShared st yld sng stp (forkSVarAsync style m1 m2)
-- | Combine two 'Stream's with 'joinStreamVarAsync' using the 'AsyncVar'
-- SVar style.
{-# INLINE asyncK #-}
asyncK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
asyncK = joinStreamVarAsync AsyncVar
-- | Prepend a monadic action to an 'AsyncT' stream.
--
-- The action is turned into a single-effect stream with 'K.fromEffect' and
-- combined with the rest of the stream using 'asyncK'.
{-# INLINE consMAsync #-}
{-# SPECIALIZE consMAsync :: IO a -> AsyncT IO a -> AsyncT IO a #-}
consMAsync :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a
consMAsync m (AsyncT r) = AsyncT $ asyncK (K.fromEffect m) r
-- | Newtype wrapper around 'Stream'. The 'Semigroup', 'Applicative' and
-- 'Monad' instances defined in this module for 'AsyncT' all combine streams
-- with 'asyncK'.
--
-- 'getAsyncT' unwraps the underlying 'Stream'.
newtype AsyncT m a = AsyncT {getAsyncT :: Stream m a}
    deriving (MonadTrans)

-- | 'AsyncT' specialised to the 'IO' monad.
type Async = AsyncT IO
-- | Combine two 'AsyncT' streams by unwrapping them, joining the underlying
-- streams with 'asyncK' and wrapping the result again.
{-# INLINE append #-}
{-# SPECIALIZE append :: AsyncT IO a -> AsyncT IO a -> AsyncT IO a #-}
append :: MonadAsync m => AsyncT m a -> AsyncT m a -> AsyncT m a
append (AsyncT m1) (AsyncT m2) = AsyncT $ asyncK m1 m2
-- | '<>' on 'AsyncT' is 'append', i.e. the two streams are joined with
-- 'asyncK'.
instance MonadAsync m => Semigroup (AsyncT m a) where
    (<>) = append
-- | The identity element is the empty stream ('K.nil'); 'mappend' is the
-- 'Semigroup' operation.
instance MonadAsync m => Monoid (AsyncT m a) where
    mempty = AsyncT K.nil
    mappend = (<>)
-- | Applicative application for 'AsyncT'.
--
-- Every function produced by the first stream is mapped over the second
-- stream; the per-element singleton streams, as well as the per-function
-- result streams, are merged with 'asyncK' via 'K.concatMapWith'.
{-# INLINE apAsync #-}
{-# SPECIALIZE apAsync :: AsyncT IO (a -> b) -> AsyncT IO a -> AsyncT IO b #-}
apAsync :: MonadAsync m => AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b
apAsync (AsyncT m1) (AsyncT m2) =
    -- f applies one function to every element of the argument stream.
    let f x1 = K.concatMapWith asyncK (pure . x1) m2
    in AsyncT $ K.concatMapWith asyncK f m1
-- | 'pure' yields a single-element stream ('K.fromPure'); '<*>' is
-- 'apAsync'.
instance (Monad m, MonadAsync m) => Applicative (AsyncT m) where
    {-# INLINE pure #-}
    pure = AsyncT . K.fromPure

    {-# INLINE (<*>) #-}
    (<*>) = apAsync
-- | Monadic bind for 'AsyncT'.
--
-- Delegates to 'K.bindWith', passing 'asyncK' as the function used to
-- combine the streams generated by the continuation @f@.
{-# INLINE bindAsync #-}
{-# SPECIALIZE bindAsync ::
    AsyncT IO a -> (a -> AsyncT IO b) -> AsyncT IO b #-}
bindAsync :: MonadAsync m => AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b
bindAsync (AsyncT m) f = AsyncT $ K.bindWith asyncK m (getAsyncT . f)
-- | 'return' is 'pure'; '>>=' is 'bindAsync'.
instance MonadAsync m => Monad (AsyncT m) where
    return = pure
    (>>=) = bindAsync
-- Instantiate the shared instance boilerplate for AsyncT through the CPP
-- macro below. NOTE(review): the macro is presumably defined in the included
-- Instances.hs and presumably provides the remaining common class instances
-- (the module imports MonadIO, MonadThrow, MonadReader, MonadState and
-- MonadBase for this purpose) -- confirm against Instances.hs.
MONAD_COMMON_INSTANCES(AsyncT, MONADPARALLEL)
-- | Combine two 'Stream's with 'joinStreamVarAsync' using the 'WAsyncVar'
-- SVar style.
{-# INLINE wAsyncK #-}
wAsyncK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
wAsyncK = joinStreamVarAsync WAsyncVar
-- | Prepend a monadic action to a 'WAsyncT' stream.
--
-- The action is turned into a single-effect stream with 'K.fromEffect' and
-- combined with the rest of the stream using 'wAsyncK'.
{-# INLINE consMWAsync #-}
{-# SPECIALIZE consMWAsync :: IO a -> WAsyncT IO a -> WAsyncT IO a #-}
consMWAsync :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a
consMWAsync m (WAsyncT r) = WAsyncT $ wAsyncK (K.fromEffect m) r
-- | Newtype wrapper around 'Stream'. The 'Semigroup', 'Applicative' and
-- 'Monad' instances defined in this module for 'WAsyncT' all combine streams
-- with 'wAsyncK'.
--
-- 'getWAsyncT' unwraps the underlying 'Stream'.
newtype WAsyncT m a = WAsyncT {getWAsyncT :: Stream m a}
    deriving (MonadTrans)

-- | 'WAsyncT' specialised to the 'IO' monad.
type WAsync = WAsyncT IO
-- | Combine two 'WAsyncT' streams by unwrapping them, joining the underlying
-- streams with 'wAsyncK' and wrapping the result again.
{-# INLINE wAppend #-}
{-# SPECIALIZE wAppend :: WAsyncT IO a -> WAsyncT IO a -> WAsyncT IO a #-}
wAppend :: MonadAsync m => WAsyncT m a -> WAsyncT m a -> WAsyncT m a
wAppend (WAsyncT m1) (WAsyncT m2) = WAsyncT $ wAsyncK m1 m2
-- | '<>' on 'WAsyncT' is 'wAppend', i.e. the two streams are joined with
-- 'wAsyncK'.
instance MonadAsync m => Semigroup (WAsyncT m a) where
    (<>) = wAppend
-- | The identity element is the empty stream ('K.nil'); 'mappend' is the
-- 'Semigroup' operation.
instance MonadAsync m => Monoid (WAsyncT m a) where
    mempty = WAsyncT K.nil
    mappend = (<>)
-- | Applicative application for 'WAsyncT'.
--
-- Every function produced by the first stream is mapped over the second
-- stream; the per-element singleton streams, as well as the per-function
-- result streams, are merged with 'wAsyncK' via 'K.concatMapWith'.
{-# INLINE apWAsync #-}
{-# SPECIALIZE apWAsync ::
    WAsyncT IO (a -> b) -> WAsyncT IO a -> WAsyncT IO b #-}
apWAsync :: MonadAsync m => WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b
apWAsync (WAsyncT m1) (WAsyncT m2) =
    -- f applies one function to every element of the argument stream.
    let f x1 = K.concatMapWith wAsyncK (pure . x1) m2
    in WAsyncT $ K.concatMapWith wAsyncK f m1
-- | 'pure' yields a single-element stream ('K.fromPure'); '<*>' is
-- 'apWAsync'.
instance (Monad m, MonadAsync m) => Applicative (WAsyncT m) where
    pure = WAsyncT . K.fromPure
    (<*>) = apWAsync
-- | Monadic bind for 'WAsyncT'.
--
-- Delegates to 'K.bindWith', passing 'wAsyncK' as the function used to
-- combine the streams generated by the continuation @f@.
{-# INLINE bindWAsync #-}
{-# SPECIALIZE bindWAsync ::
    WAsyncT IO a -> (a -> WAsyncT IO b) -> WAsyncT IO b #-}
bindWAsync :: MonadAsync m => WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b
bindWAsync (WAsyncT m) f = WAsyncT $ K.bindWith wAsyncK m (getWAsyncT . f)
-- | 'return' is 'pure'; '>>=' is 'bindWAsync'.
instance MonadAsync m => Monad (WAsyncT m) where
    return = pure
    (>>=) = bindWAsync
-- Instantiate the shared instance boilerplate for WAsyncT through the CPP
-- macro below. NOTE(review): the macro is presumably defined in the included
-- Instances.hs and presumably provides the remaining common class instances
-- (the module imports MonadIO, MonadThrow, MonadReader, MonadState and
-- MonadBase for this purpose) -- confirm against Instances.hs.
MONAD_COMMON_INSTANCES(WAsyncT, MONADPARALLEL)