{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -fno-warn-deprecations #-}
{-# OPTIONS_GHC -Wno-redundant-constraints #-}
#include "inline.hs"
module Streamly.Internal.Data.Stream.Async {-# DEPRECATED "Please use \"Streamly.Internal.Data.Stream.Concurrent\" from streamly package instead." #-}
(
AsyncT(..)
, Async
, consMAsync
, asyncK
, mkAsyncK
, mkAsyncD
, WAsyncT(..)
, WAsync
, consMWAsync
, wAsyncK
)
where
import Control.Concurrent (myThreadId)
#if !(MIN_VERSION_transformers(0,6,0))
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
#endif
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Concurrent.MVar (newEmptyMVar)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
#if !(MIN_VERSION_transformers(0,6,0))
import Control.Monad.Trans.Class (MonadTrans(lift))
#endif
import Data.Concurrent.Queue.MichaelScott (LinkedQueue, newQ, nullQ, tryPopR, pushL)
import Data.IORef (IORef, newIORef, readIORef)
import Data.Maybe (fromJust)
import Prelude hiding (map)
import qualified Data.Set as S
import Streamly.Internal.Control.Concurrent
(MonadRunInIO, MonadAsync, askRunInIO, restoreM)
import Streamly.Internal.Data.Atomics
(atomicModifyIORefCAS, atomicModifyIORefCAS_)
import Streamly.Internal.Data.StreamK (Stream)
import Streamly.Internal.Data.Stream.SVar.Generate (fromSVar, fromSVarD)
import qualified Streamly.Internal.Data.StreamK as K
(foldStreamShared, mkStream, foldStream, fromEffect
, nil, concatMapWith, fromPure, bindWith)
import qualified Streamly.Internal.Data.Stream as D
(Stream(..), Step(..), mapM, toStreamK, fromStreamK)
import qualified Streamly.Internal.Data.Stream.Serial as Stream (toStreamK)
import Streamly.Internal.Data.SVar
#include "Instances.hs"
-- | Run every continuation of a stream under a modified reader environment.
--
-- The stream @m@ is folded with 'K.foldStream', and each continuation handed
-- to it is wrapped in @'local' f@:
--
-- * the singleton continuation becomes @local f . sng@,
-- * the stop continuation becomes @local f stp@,
-- * the yield continuation is run under @local f@ and receives the rest of
--   the stream wrapped again with 'withLocal', so the modification is applied
--   recursively to the whole stream.
{-# INLINABLE withLocal #-}
withLocal :: MonadReader r m => (r -> r) -> Stream m a -> Stream m a
withLocal f m =
    K.mkStream $ \st yld sng stp ->
        let single = local f . sng
            -- The tail @r@ is re-wrapped so that later elements also see the
            -- modified environment.
            yieldk a r = local f $ yld a (withLocal f r)
        in K.foldStream st yieldk single (local f stp) m
-- | Push a work item onto the front of the list stored in @q@ and then call
-- 'ringDoorBell' on the 'SVar'.
--
-- The list is updated with 'atomicModifyIORefCAS_'; the new item becomes the
-- head of the list.
{-# INLINE enqueueLIFO #-}
enqueueLIFO ::
   SVar t m a -> IORef [(RunInIO m, t m a)] -> (RunInIO m, t m a) -> IO ()
enqueueLIFO sv q m = do
    atomicModifyIORefCAS_ q $ \ms -> m : ms
    ringDoorBell sv
-- | Result of running one work item in a worker loop. In the loops of this
-- module 'Continue' makes the loop pick up the next queued item, while
-- 'Suspend' makes the loop call its @stop@ action and return.
data WorkerStatus = Continue | Suspend
-- | Worker loop over the LIFO work list stored in @q@.
--
-- The loop repeatedly removes the head of the list and evaluates that stream
-- with 'K.foldStreamShared', using the 'RunInIO' function that was queued
-- together with it. Every element produced is handed to the 'SVar' with
-- 'sendYield'. The loop finishes by calling 'sendStop' when either the list
-- is empty or 'sendYield' has returned 'False'.
{-# INLINE workLoopLIFO #-}
workLoopLIFO
    :: MonadRunInIO m
    => IORef [(RunInIO m, Stream m a)]
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopLIFO q st sv winfo = run

    where

    stop = liftIO $ sendStop sv winfo

    run = do
        work <- dequeue
        case work of
            Nothing -> stop
            Just (RunInIO runin, m) -> do
                -- Evaluate the stream with the runner stored alongside it,
                -- then restore the resulting monadic state in this thread.
                -- A stream that ends normally reports 'Continue'.
                r <- liftIO $ runin $
                        K.foldStreamShared st yieldk single (return Continue) m
                res <- restoreM r
                case res of
                    Continue -> run
                    Suspend -> stop

    -- Last element of a stream: send it and report whether to go on.
    single a = do
        res <- liftIO $ sendYield sv winfo (ChildYield a)
        return $ if res then Continue else Suspend

    -- Element followed by more stream: keep evaluating the remainder in
    -- place while 'sendYield' returns 'True'; otherwise put the remainder
    -- back on the work list (with the current runner) and suspend.
    yieldk a r = do
        res <- liftIO $ sendYield sv winfo (ChildYield a)
        if res
        then K.foldStreamShared st yieldk single (return Continue) r
        else do
            runInIO <- askRunInIO
            liftIO $ enqueueLIFO sv q (runInIO, r)
            return Suspend

    -- Atomically pop the head of the work list, if any.
    dequeue = liftIO $ atomicModifyIORefCAS q $ \case
                [] -> ([], Nothing)
                x : xs -> (xs, Just x)
-- | Variant of the LIFO worker loop that also consults the yield limit of the
-- 'SVar'.
--
-- Before a dequeued stream is evaluated, and again after every element that
-- is followed by more stream, 'decrementYieldLimit' is called. When it
-- returns 'False' the pending work is put back on the list in @q@,
-- 'incrementYieldLimit' is called, and the worker stops. The loop also stops
-- when the list is empty or when 'sendYield' returns 'False'.
{-# INLINE workLoopLIFOLimited #-}
workLoopLIFOLimited
    :: forall m a. MonadRunInIO m
    => IORef [(RunInIO m, Stream m a)]
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopLIFOLimited q st sv winfo = run

    where

    -- Used as the stop continuation: calls 'incrementYieldLimit' and reports
    -- 'Continue'. NOTE(review): presumably this returns the limit unit taken
    -- before evaluation that was not used by a yield - confirm against the
    -- SVar implementation.
    incrContinue = liftIO (incrementYieldLimit sv) >> return Continue

    stop = liftIO $ sendStop sv winfo

    run = do
        work <- dequeue
        case work of
            Nothing -> stop
            Just (RunInIO runin, m) -> do
                yieldLimitOk <- liftIO $ decrementYieldLimit sv
                if yieldLimitOk
                then do
                    -- Evaluate with the queued runner, then restore the
                    -- resulting monadic state in this thread.
                    r <- liftIO $ runin $
                            K.foldStreamShared st yieldk single incrContinue m
                    res <- restoreM r
                    case res of
                        Continue -> run
                        Suspend -> stop
                -- Limit check failed: requeue the untouched work item, undo
                -- the decrement and stop this worker.
                else liftIO $ do
                    enqueueLIFO sv q (RunInIO runin, m)
                    incrementYieldLimit sv
                    sendStop sv winfo

    -- Last element of a stream: send it and report whether to go on.
    single a = do
        res <- liftIO $ sendYield sv winfo (ChildYield a)
        return $ if res then Continue else Suspend

    -- Element followed by more stream: continue with the remainder only if
    -- both 'sendYield' and 'decrementYieldLimit' returned 'True'; otherwise
    -- undo the decrement, requeue the remainder and suspend.
    yieldk a r = do
        res <- liftIO $ sendYield sv winfo (ChildYield a)
        yieldLimitOk <- liftIO $ decrementYieldLimit sv
        if res && yieldLimitOk
        then K.foldStreamShared st yieldk single incrContinue r
        else do
            runInIO <- askRunInIO
            liftIO $ incrementYieldLimit sv
            liftIO $ enqueueLIFO sv q (runInIO, r)
            return Suspend

    -- Atomically pop the head of the work list, if any.
    dequeue = liftIO $ atomicModifyIORefCAS q $ \case
                [] -> ([], Nothing)
                x : xs -> (xs, Just x)
-- | Add a work item to the 'LinkedQueue' @q@ with 'pushL' and then call
-- 'ringDoorBell' on the 'SVar'.
{-# INLINE enqueueFIFO #-}
enqueueFIFO ::
       SVar t m a
    -> LinkedQueue (RunInIO m, t m a)
    -> (RunInIO m, t m a)
    -> IO ()
enqueueFIFO sv q m = do
    pushL q m
    ringDoorBell sv
-- | Worker loop over the FIFO work queue @q@.
--
-- The loop repeatedly takes an item from the queue with 'tryPopR' and
-- evaluates it with 'K.foldStreamShared', using the 'RunInIO' function queued
-- together with it. After each element is sent with 'sendYield', the rest of
-- that stream is always pushed back onto the queue instead of being evaluated
-- in place. The loop finishes by calling 'sendStop' when the queue is empty
-- or when 'sendYield' has returned 'False'.
{-# INLINE workLoopFIFO #-}
workLoopFIFO
    :: MonadRunInIO m
    => LinkedQueue (RunInIO m, Stream m a)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopFIFO q st sv winfo = run

    where

    stop = liftIO $ sendStop sv winfo

    run = do
        work <- liftIO $ tryPopR q
        case work of
            Nothing -> stop
            Just (RunInIO runin, m) -> do
                -- Evaluate with the queued runner, then restore the
                -- resulting monadic state in this thread. A stream that ends
                -- normally reports 'Continue'.
                r <- liftIO $ runin $
                        K.foldStreamShared st yieldk single (return Continue) m
                res <- restoreM r
                case res of
                    Continue -> run
                    Suspend -> stop

    -- Last element of a stream: send it and report whether to go on.
    single a = do
        res <- liftIO $ sendYield sv winfo (ChildYield a)
        return $ if res then Continue else Suspend

    -- Element followed by more stream: send the element, requeue the
    -- remainder unconditionally, then report whether to go on.
    yieldk a r = do
        res <- liftIO $ sendYield sv winfo (ChildYield a)
        runInIO <- askRunInIO
        liftIO $ enqueueFIFO sv q (runInIO, r)
        return $ if res then Continue else Suspend
-- | Variant of the FIFO worker loop that also consults the yield limit of the
-- 'SVar'.
--
-- 'decrementYieldLimit' is called before a dequeued stream is evaluated and
-- again after every element that is followed by more stream. When the check
-- before evaluation fails, the item is put back on the queue,
-- 'incrementYieldLimit' is called and the worker stops. The loop also stops
-- when the queue is empty or when a work item reports 'Suspend'.
{-# INLINE workLoopFIFOLimited #-}
workLoopFIFOLimited
    :: forall m a. MonadRunInIO m
    => LinkedQueue (RunInIO m, Stream m a)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopFIFOLimited q st sv winfo = run

    where

    stop = liftIO $ sendStop sv winfo

    -- Used as the stop continuation: calls 'incrementYieldLimit' and reports
    -- 'Continue'. NOTE(review): presumably this returns the limit unit taken
    -- before evaluation that was not used by a yield - confirm against the
    -- SVar implementation.
    incrContinue = liftIO (incrementYieldLimit sv) >> return Continue

    run = do
        work <- liftIO $ tryPopR q
        case work of
            Nothing -> stop
            Just (RunInIO runin, m) -> do
                yieldLimitOk <- liftIO $ decrementYieldLimit sv
                if yieldLimitOk
                then do
                    -- Evaluate with the queued runner, then restore the
                    -- resulting monadic state in this thread.
                    r <- liftIO $ runin $
                            K.foldStreamShared st yieldk single incrContinue m
                    res <- restoreM r
                    case res of
                        Continue -> run
                        Suspend -> stop
                -- Limit check failed: requeue the untouched work item, undo
                -- the decrement and stop this worker.
                else liftIO $ do
                    enqueueFIFO sv q (RunInIO runin, m)
                    incrementYieldLimit sv
                    sendStop sv winfo

    -- Last element of a stream: send it and report whether to go on.
    single a = do
        res <- liftIO $ sendYield sv winfo (ChildYield a)
        return $ if res then Continue else Suspend

    -- Element followed by more stream: send the element and requeue the
    -- remainder unconditionally. The worker continues only if both
    -- 'sendYield' and 'decrementYieldLimit' returned 'True'; otherwise the
    -- decrement is undone and the worker suspends.
    yieldk a r = do
        res <- liftIO $ sendYield sv winfo (ChildYield a)
        runInIO <- askRunInIO
        liftIO $ enqueueFIFO sv q (runInIO, r)
        yieldLimitOk <- liftIO $ decrementYieldLimit sv
        if res && yieldLimitOk
        then return Continue
        else liftIO $ do
            incrementYieldLimit sv
            return Suspend
-- | Build a new 'SVar' of style 'AsyncVar' whose pending work is kept in an
-- 'IORef' holding a list and is enqueued with 'enqueueLIFO'.
--
-- The configuration (yield limit, rate info, buffer and thread limits,
-- inspect mode) is read from the supplied 'State'. The 'RunInIO' argument is
-- stored in the 'svarMrun' field, and the calling thread's id is stored in
-- 'svarCreator'.
--
-- Two independent choices are made from the 'State':
--
-- * whether a stream rate is set selects the paced or the bounded variants
--   of the output reader and post-processing action;
-- * whether a yield limit is set selects the limited or the unlimited
--   variants of the "work finished" check and of the worker loop.
getLifoSVar :: forall m a. MonadAsync m
    => State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getLifoSVar st mrun = do
    outQ    <- newIORef ([], 0)
    outQMv  <- newEmptyMVar
    active  <- newIORef 0
    wfw     <- newIORef False
    running <- newIORef S.empty
    q       <- newIORef ([] :: [(RunInIO m, Stream m a)])
    -- A counter ref exists only when the state carries a yield limit.
    yl      <- traverse newIORef (getYieldLimit st)
    rateInfo <- getYieldRateInfo st

    stats <- newSVarStats
    tid <- myThreadId

    -- Without a yield limit, work is finished exactly when the queue is
    -- empty; the SVar argument is ignored.
    let isWorkFinished _ = null <$> readIORef q

    -- With a yield limit, work is finished when either the queue is empty or
    -- the remaining-yields counter has dropped to zero or below. The counter
    -- is read before the queue.
    let isWorkFinishedLimited sv = do
            yieldsDone <-
                case remainingWork sv of
                    Just ref -> (<= 0) <$> readIORef ref
                    Nothing -> return False
            qEmpty <- null <$> readIORef q
            return $ qEmpty || yieldsDone

    -- Assemble the record. The first argument is the SVar itself, so that the
    -- per-SVar actions stored in the record can refer back to it.
    let getSVar :: SVar Stream m a
            -> (SVar Stream m a -> m [ChildEvent a])
            -> (SVar Stream m a -> m Bool)
            -> (SVar Stream m a -> IO Bool)
            -> (IORef [(RunInIO m, Stream m a)]
                -> State Stream m a
                -> SVar Stream m a
                -> Maybe WorkerInfo
                -> m ())
            -> SVar Stream m a
        getSVar sv readOutput postProc workDone wloop = SVar
            { outputQueue      = outQ
            -- The fields set to 'undefined' below throw if forced.
            -- NOTE(review): presumably unused by this SVar style - confirm.
            , outputQueueFromConsumer = undefined
            , remainingWork    = yl
            , maxBufferLimit   = getMaxBuffer st
            , pushBufferSpace  = undefined
            , pushBufferPolicy = undefined
            , pushBufferMVar   = undefined
            -- Worker count is capped by both the thread and buffer limits.
            , maxWorkerLimit   = min (getMaxThreads st) (getMaxBuffer st)
            , yieldRateInfo    = rateInfo
            , outputDoorBell   = outQMv
            , outputDoorBellFromConsumer = undefined
            , readOutputQ      = readOutput sv
            , postProcess      = postProc sv
            , workerThreads    = running
            -- Workers run with a state whose 'streamVar' is this SVar.
            , workLoop         = wloop q st{streamVar = Just sv} sv
            , enqueue          = enqueueLIFO sv q
            -- Both completion checks use the same predicate.
            , isWorkDone       = workDone sv
            , isQueueDone      = workDone sv
            , needDoorBell     = wfw
            , svarStyle        = AsyncVar
            , svarStopStyle    = StopNone
            , svarStopBy       = undefined
            , svarMrun         = mrun
            , workerCount      = active
            , accountThread    = delThread sv
            , workerStopMVar   = undefined
            , svarRef          = Nothing
            , svarInspectMode  = getInspectMode st
            , svarCreator      = tid
            , aheadWorkQueue   = undefined
            , outputHeap       = undefined
            , svarStats        = stats
            }

    -- The record is defined in terms of itself (lazily) so that its own
    -- fields can close over the finished SVar.
    let sv =
            case getStreamRate st of
                Nothing ->
                    case getYieldLimit st of
                        Nothing -> getSVar sv readOutputQBounded
                                              postProcessBounded
                                              isWorkFinished
                                              workLoopLIFO
                        Just _  -> getSVar sv readOutputQBounded
                                              postProcessBounded
                                              isWorkFinishedLimited
                                              workLoopLIFOLimited
                Just _  ->
                    case getYieldLimit st of
                        Nothing -> getSVar sv readOutputQPaced
                                              postProcessPaced
                                              isWorkFinished
                                              workLoopLIFO
                        Just _  -> getSVar sv readOutputQPaced
                                              postProcessPaced
                                              isWorkFinishedLimited
                                              workLoopLIFOLimited
     in return sv
-- | Build a new 'SVar' of style 'WAsyncVar' whose pending work is kept in a
-- 'LinkedQueue' (created with 'newQ') and is enqueued with 'enqueueFIFO'.
--
-- The configuration (yield limit, rate info, buffer and thread limits,
-- inspect mode) is read from the supplied 'State'. The 'RunInIO' argument is
-- stored in the 'svarMrun' field, and the calling thread's id is stored in
-- 'svarCreator'.
--
-- Two independent choices are made from the 'State':
--
-- * whether a stream rate is set selects the paced or the bounded variants
--   of the output reader and post-processing action;
-- * whether a yield limit is set selects the limited or the unlimited
--   variants of the "work finished" check and of the worker loop.
getFifoSVar :: forall m a. MonadAsync m
    => State Stream m a -> RunInIO m -> IO (SVar Stream m a)
getFifoSVar st mrun = do
    outQ    <- newIORef ([], 0)
    outQMv  <- newEmptyMVar
    active  <- newIORef 0
    wfw     <- newIORef False
    running <- newIORef S.empty
    q       <- newQ
    -- A counter ref exists only when the state carries a yield limit.
    yl      <- traverse newIORef (getYieldLimit st)
    rateInfo <- getYieldRateInfo st

    stats <- newSVarStats
    tid <- myThreadId

    -- Without a yield limit, work is finished exactly when the queue is
    -- empty; the SVar argument is ignored.
    let isWorkFinished _ = nullQ q

    -- With a yield limit, work is finished when either the queue is empty or
    -- the remaining-yields counter has dropped to zero or below. The counter
    -- is read before the queue.
    let isWorkFinishedLimited sv = do
            yieldsDone <-
                case remainingWork sv of
                    Just ref -> (<= 0) <$> readIORef ref
                    Nothing -> return False
            qEmpty <- nullQ q
            return $ qEmpty || yieldsDone

    -- Assemble the record. The first argument is the SVar itself, so that the
    -- per-SVar actions stored in the record can refer back to it.
    let getSVar :: SVar Stream m a
            -> (SVar Stream m a -> m [ChildEvent a])
            -> (SVar Stream m a -> m Bool)
            -> (SVar Stream m a -> IO Bool)
            -> (LinkedQueue (RunInIO m, Stream m a)
                -> State Stream m a
                -> SVar Stream m a
                -> Maybe WorkerInfo
                -> m ())
            -> SVar Stream m a
        getSVar sv readOutput postProc workDone wloop = SVar
            { outputQueue      = outQ
            -- The fields set to 'undefined' below throw if forced.
            -- NOTE(review): presumably unused by this SVar style - confirm.
            , outputQueueFromConsumer = undefined
            , remainingWork    = yl
            , maxBufferLimit   = getMaxBuffer st
            , pushBufferSpace  = undefined
            , pushBufferPolicy = undefined
            , pushBufferMVar   = undefined
            -- Worker count is capped by both the thread and buffer limits.
            , maxWorkerLimit   = min (getMaxThreads st) (getMaxBuffer st)
            , yieldRateInfo    = rateInfo
            , outputDoorBell   = outQMv
            , outputDoorBellFromConsumer = undefined
            , readOutputQ      = readOutput sv
            , postProcess      = postProc sv
            , workerThreads    = running
            -- Workers run with a state whose 'streamVar' is this SVar.
            , workLoop         = wloop q st{streamVar = Just sv} sv
            , enqueue          = enqueueFIFO sv q
            -- Both completion checks use the same predicate.
            , isWorkDone       = workDone sv
            , isQueueDone      = workDone sv
            , needDoorBell     = wfw
            , svarStyle        = WAsyncVar
            , svarStopStyle    = StopNone
            , svarStopBy       = undefined
            , svarMrun         = mrun
            , workerCount      = active
            , accountThread    = delThread sv
            , workerStopMVar   = undefined
            , svarRef          = Nothing
            , svarInspectMode  = getInspectMode st
            , svarCreator      = tid
            , aheadWorkQueue   = undefined
            , outputHeap       = undefined
            , svarStats        = stats
            }

    -- The record is defined in terms of itself (lazily) so that its own
    -- fields can close over the finished SVar.
    let sv =
            case getStreamRate st of
                Nothing ->
                    case getYieldLimit st of
                        Nothing -> getSVar sv readOutputQBounded
                                              postProcessBounded
                                              isWorkFinished
                                              workLoopFIFO
                        Just _  -> getSVar sv readOutputQBounded
                                              postProcessBounded
                                              isWorkFinishedLimited
                                              workLoopFIFOLimited
                Just _  ->
                    case getYieldLimit st of
                        Nothing -> getSVar sv readOutputQPaced
                                              postProcessPaced
                                              isWorkFinished
                                              workLoopFIFO
                        Just _  -> getSVar sv readOutputQPaced
                                              postProcessPaced
                                              isWorkFinishedLimited
                                              workLoopFIFOLimited
     in return sv
-- | Create an SVar with 'getLifoSVar' and pass it, together with the given
-- stream, to 'sendFirstWorker'.
--
-- The 'RunInIO' handle stored in the SVar is the one obtained from
-- 'askRunInIO' in the calling monad. The result is whatever
-- 'sendFirstWorker' returns.
{-# INLINABLE newAsyncVar #-}
newAsyncVar :: MonadAsync m
    => State Stream m a -> Stream m a -> m (SVar Stream m a)
newAsyncVar st m = do
    mrun <- askRunInIO
    sv <- liftIO $ getLifoSVar st mrun
    sendFirstWorker sv m
-- | Wrap a stream so that, when the result is consumed, the input stream is
-- first handed to a new SVar (via 'newAsyncVar', using the consumer's state
-- passed through 'adaptState') and the output is then read back from that
-- SVar with 'fromSVar'.
--
-- The stream read from the SVar is folded with the consumer's original state
-- and continuations.
{-# INLINABLE mkAsyncK #-}
mkAsyncK :: MonadAsync m => Stream m a -> Stream m a
mkAsyncK m = K.mkStream $ \st yld sng stp -> do
    sv <- newAsyncVar (adaptState st) m
    K.foldStream st yld sng stp $ Stream.toStreamK $ fromSVar sv
-- | Direct-style ('D.Stream') counterpart of wrapping a stream in an SVar
-- created by 'newAsyncVar'.
--
-- The step state is a 'Maybe' holding the stream read back from the SVar:
--
-- * 'Nothing': no SVar has been created yet. The first step converts the
--   input with 'D.toStreamK', creates the SVar, and skips to a state holding
--   the stream produced by 'fromSVarD'.
-- * 'Just': each step runs one step of the held stream and re-wraps its new
--   inner state, forwarding 'D.Yield', 'D.Skip' and 'D.Stop' unchanged.
{-# INLINE_NORMAL mkAsyncD #-}
mkAsyncD :: MonadAsync m => D.Stream m a -> D.Stream m a
mkAsyncD m = D.Stream step Nothing

    where

    -- First step: set up the SVar; no element is produced yet.
    step gst Nothing = do
        sv <- newAsyncVar gst (D.toStreamK m)
        return $ D.Skip $ Just $ fromSVarD sv

    -- Later steps: drive the stream that reads from the SVar.
    step gst (Just (D.UnStream step1 st)) = do
        r <- step1 gst st
        return $ case r of
            D.Yield a s -> D.Yield a (Just $ D.Stream step1 s)
            D.Skip s    -> D.Skip (Just $ D.Stream step1 s)
            D.Stop      -> D.Stop
-- | Create an SVar with 'getFifoSVar' and pass it, together with the given
-- stream, to 'sendFirstWorker'.
--
-- The 'RunInIO' handle stored in the SVar is the one obtained from
-- 'askRunInIO' in the calling monad. The result is whatever
-- 'sendFirstWorker' returns.
{-# INLINABLE newWAsyncVar #-}
newWAsyncVar :: MonadAsync m
    => State Stream m a -> Stream m a -> m (SVar Stream m a)
newWAsyncVar st m = do
    mrun <- askRunInIO
    sv <- liftIO $ getFifoSVar st mrun
    sendFirstWorker sv m
-- | Evaluate two streams through a freshly created SVar of the given style.
--
-- A new SVar is created (via @newAsyncVar@ for @AsyncVar@, via
-- @newWAsyncVar@ for @WAsyncVar@) and seeded with a stream that, when it is
-- run, enqueues the second stream on the SVar found in its stream state and
-- then continues with the first stream. The resulting stream is read back
-- from the SVar with @fromSVar@.
--
-- Any other style is rejected with a call to @error@.
forkSVarAsync :: MonadAsync m
    => SVarStyle -> Stream m a -> Stream m a -> Stream m a
forkSVarAsync style m1 m2 = K.mkStream $ \st yld sng stp -> do
    let seed = enqueueSecond m1 m2
    sv <- case style of
        AsyncVar -> newAsyncVar st seed
        WAsyncVar -> newWAsyncVar st seed
        _ -> error "illegal svar type"
    K.foldStream st yld sng stp (Stream.toStreamK (fromSVar sv))

    where

    -- Queue the second stream on the SVar carried by the state, then run the
    -- first stream sharing that same state.
    -- NOTE(review): fromJust assumes the state always carries an SVar when
    -- this stream is run -- confirm against how the first worker is set up.
    enqueueSecond ma mb = K.mkStream $ \wst wyld wsng wstp -> do
        runInIO <- askRunInIO
        liftIO $ enqueue (fromJust (streamVar wst)) (runInIO, mb)
        K.foldStreamShared wst wyld wsng wstp ma
-- | Join two streams using an SVar of the requested style.
--
-- When the stream state already carries an SVar whose style equals the
-- requested one, the second stream is enqueued on that SVar and evaluation
-- continues with the first stream, sharing the state. In every other case
-- (no SVar in the state, or an SVar of a different style) the two streams
-- are evaluated through a new SVar created by @forkSVarAsync@.
{-# INLINE joinStreamVarAsync #-}
joinStreamVarAsync :: MonadAsync m
    => SVarStyle -> Stream m a -> Stream m a -> Stream m a
joinStreamVarAsync style m1 m2 = K.mkStream $ \st yld sng stp ->
    case streamVar st of
        Just sv
            | svarStyle sv == style -> do
                runInIO <- askRunInIO
                liftIO (enqueue sv (runInIO, m2))
                K.foldStreamShared st yld sng stp m1
        _ ->
            K.foldStreamShared st yld sng stp $ forkSVarAsync style m1 m2
-- | Binary stream combinator that joins its two arguments through
-- @joinStreamVarAsync@ using the @AsyncVar@ SVar style.
{-# INLINE asyncK #-}
asyncK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
asyncK = joinStreamVarAsync AsyncVar
-- | Put a monadic action in front of an @AsyncT@ stream.
--
-- The action is wrapped as a single-effect stream with @K.fromEffect@ and
-- combined with the rest of the stream using @asyncK@.
{-# INLINE consMAsync #-}
{-# SPECIALIZE consMAsync :: IO a -> AsyncT IO a -> AsyncT IO a #-}
consMAsync :: MonadAsync m => m a -> AsyncT m a -> AsyncT m a
consMAsync action (AsyncT rest) = AsyncT (asyncK (K.fromEffect action) rest)
-- | Newtype wrapper around the underlying stream type. The instances
-- defined for it in this module (Semigroup, Applicative, Monad) combine
-- streams with @asyncK@.
newtype AsyncT m a = AsyncT
    { getAsyncT :: Stream m a
    }
#if !(MIN_VERSION_transformers(0,6,0))
-- | Lift an action of the underlying monad into a single-effect @AsyncT@
-- stream. Only compiled when transformers is older than 0.6.0.
instance MonadTrans AsyncT where
    {-# INLINE lift #-}
    lift = AsyncT . K.fromEffect
#endif
-- | @AsyncT@ specialised to the @IO@ monad.
type Async = AsyncT IO
-- | Combine two @AsyncT@ streams by unwrapping them, joining the underlying
-- streams with @asyncK@ and wrapping the result again.
{-# INLINE append #-}
{-# SPECIALIZE append :: AsyncT IO a -> AsyncT IO a -> AsyncT IO a #-}
append :: MonadAsync m => AsyncT m a -> AsyncT m a -> AsyncT m a
append (AsyncT xs) (AsyncT ys) = AsyncT (asyncK xs ys)
-- | @(<>)@ on @AsyncT@ is @append@, i.e. the two streams are joined with
-- @asyncK@.
instance MonadAsync m => Semigroup (AsyncT m a) where
    (<>) = append
-- | The empty @AsyncT@ stream is the wrapped @K.nil@; @mappend@ is the
-- Semigroup operation.
instance MonadAsync m => Monoid (AsyncT m a) where
    mempty = AsyncT K.nil
    mappend = (<>)
-- | Applicative application for @AsyncT@.
--
-- Every function produced by the first stream is mapped over the second
-- stream. Both the per-function result streams and the individual results
-- within them are merged using @asyncK@.
{-# INLINE apAsync #-}
{-# SPECIALIZE apAsync :: AsyncT IO (a -> b) -> AsyncT IO a -> AsyncT IO b #-}
apAsync :: MonadAsync m => AsyncT m (a -> b) -> AsyncT m a -> AsyncT m b
apAsync (AsyncT fs) (AsyncT xs) =
    AsyncT (K.concatMapWith asyncK applyEach fs)

    where

    -- Stream of results of applying one function to every element of xs.
    applyEach g = K.concatMapWith asyncK (K.fromPure . g) xs
-- | @pure@ yields a single-element stream via @K.fromPure@; @(<*>)@ is
-- @apAsync@.
instance (Monad m, MonadAsync m) => Applicative (AsyncT m) where
    {-# INLINE pure #-}
    pure = AsyncT . K.fromPure

    {-# INLINE (<*>) #-}
    (<*>) = apAsync
-- | Monadic bind for @AsyncT@.
--
-- The continuation is applied to each element of the source stream through
-- @K.bindWith@, and the streams it produces are merged using @asyncK@.
{-# INLINE bindAsync #-}
{-# SPECIALIZE bindAsync ::
    AsyncT IO a -> (a -> AsyncT IO b) -> AsyncT IO b #-}
bindAsync :: MonadAsync m => AsyncT m a -> (a -> AsyncT m b) -> AsyncT m b
bindAsync (AsyncT source) k =
    AsyncT (K.bindWith asyncK source (\x -> getAsyncT (k x)))
-- | @return@ is @pure@; @(>>=)@ is @bindAsync@.
instance MonadAsync m => Monad (AsyncT m) where
    return = pure
    (>>=) = bindAsync
#if !(MIN_VERSION_transformers(0,6,0))
-- | Base-monad lifting for @AsyncT@, implemented with @liftBaseDefault@.
-- Only compiled when transformers is older than 0.6.0.
instance (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AsyncT m) where
    liftBase = liftBaseDefault
#endif
-- Further instances for the AsyncT type are generated by the CPP macro
-- invoked below. NOTE(review): the macro is presumably defined in the
-- included Instances.hs file -- confirm there which instances it expands to.
MONAD_COMMON_INSTANCES(AsyncT, MONADPARALLEL)
-- | Binary stream combinator that joins its two arguments through
-- @joinStreamVarAsync@ using the @WAsyncVar@ SVar style.
{-# INLINE wAsyncK #-}
wAsyncK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
wAsyncK = joinStreamVarAsync WAsyncVar
-- | Put a monadic action in front of a @WAsyncT@ stream.
--
-- The action is wrapped as a single-effect stream with @K.fromEffect@ and
-- combined with the rest of the stream using @wAsyncK@.
{-# INLINE consMWAsync #-}
{-# SPECIALIZE consMWAsync :: IO a -> WAsyncT IO a -> WAsyncT IO a #-}
consMWAsync :: MonadAsync m => m a -> WAsyncT m a -> WAsyncT m a
consMWAsync action (WAsyncT rest) =
    WAsyncT (wAsyncK (K.fromEffect action) rest)
-- | Newtype wrapper around the underlying stream type. The instances
-- defined for it in this module (Semigroup, Applicative, Monad) combine
-- streams with @wAsyncK@.
newtype WAsyncT m a = WAsyncT
    { getWAsyncT :: Stream m a
    }
#if !(MIN_VERSION_transformers(0,6,0))
-- | Lift an action of the underlying monad into a single-effect @WAsyncT@
-- stream. Only compiled when transformers is older than 0.6.0.
instance MonadTrans WAsyncT where
    {-# INLINE lift #-}
    lift = WAsyncT . K.fromEffect
#endif
-- | @WAsyncT@ specialised to the @IO@ monad.
type WAsync = WAsyncT IO
-- | Combine two @WAsyncT@ streams by unwrapping them, joining the
-- underlying streams with @wAsyncK@ and wrapping the result again.
{-# INLINE wAppend #-}
{-# SPECIALIZE wAppend :: WAsyncT IO a -> WAsyncT IO a -> WAsyncT IO a #-}
wAppend :: MonadAsync m => WAsyncT m a -> WAsyncT m a -> WAsyncT m a
wAppend (WAsyncT xs) (WAsyncT ys) = WAsyncT (wAsyncK xs ys)
-- | @(<>)@ on @WAsyncT@ is @wAppend@, i.e. the two streams are joined with
-- @wAsyncK@.
instance MonadAsync m => Semigroup (WAsyncT m a) where
    (<>) = wAppend
-- | The empty @WAsyncT@ stream is the wrapped @K.nil@; @mappend@ is the
-- Semigroup operation.
instance MonadAsync m => Monoid (WAsyncT m a) where
    mempty = WAsyncT K.nil
    mappend = (<>)
-- | Applicative application for @WAsyncT@.
--
-- Every function produced by the first stream is mapped over the second
-- stream. Both the per-function result streams and the individual results
-- within them are merged using @wAsyncK@.
{-# INLINE apWAsync #-}
{-# SPECIALIZE apWAsync ::
    WAsyncT IO (a -> b) -> WAsyncT IO a -> WAsyncT IO b #-}
apWAsync :: MonadAsync m => WAsyncT m (a -> b) -> WAsyncT m a -> WAsyncT m b
apWAsync (WAsyncT fs) (WAsyncT xs) =
    WAsyncT (K.concatMapWith wAsyncK applyEach fs)

    where

    -- Stream of results of applying one function to every element of xs.
    applyEach g = K.concatMapWith wAsyncK (K.fromPure . g) xs
-- | @pure@ yields a single-element stream via @K.fromPure@; @(<*>)@ is
-- @apWAsync@.
--
-- Both methods carry INLINE pragmas so that this instance matches the
-- corresponding Applicative instance of @AsyncT@ in this module.
instance (Monad m, MonadAsync m) => Applicative (WAsyncT m) where
    {-# INLINE pure #-}
    pure = WAsyncT . K.fromPure

    {-# INLINE (<*>) #-}
    (<*>) = apWAsync
-- | Monadic bind for @WAsyncT@.
--
-- The continuation is applied to each element of the source stream through
-- @K.bindWith@, and the streams it produces are merged using @wAsyncK@.
{-# INLINE bindWAsync #-}
{-# SPECIALIZE bindWAsync ::
    WAsyncT IO a -> (a -> WAsyncT IO b) -> WAsyncT IO b #-}
bindWAsync :: MonadAsync m => WAsyncT m a -> (a -> WAsyncT m b) -> WAsyncT m b
bindWAsync (WAsyncT source) k =
    WAsyncT (K.bindWith wAsyncK source (\x -> getWAsyncT (k x)))
-- | @return@ is @pure@; @(>>=)@ is @bindWAsync@.
instance MonadAsync m => Monad (WAsyncT m) where
    return = pure
    (>>=) = bindWAsync
#if !(MIN_VERSION_transformers(0,6,0))
-- | Base-monad lifting for @WAsyncT@, implemented with @liftBaseDefault@.
-- Only compiled when transformers is older than 0.6.0.
instance (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (WAsyncT m) where
    liftBase = liftBaseDefault
#endif
-- Further instances for the WAsyncT type are generated by the CPP macro
-- invoked below. NOTE(review): the macro is presumably defined in the
-- included Instances.hs file -- confirm there which instances it expands to.
MONAD_COMMON_INSTANCES(WAsyncT, MONADPARALLEL)