{-# LANGUAGE UndecidableInstances #-}
{-# OPTIONS_GHC -fno-warn-deprecations #-}
{-# OPTIONS_GHC -Wno-redundant-constraints #-}
module Streamly.Internal.Data.Stream.Ahead {-# DEPRECATED "Please use \"Streamly.Internal.Data.Stream.Concurrent\" from streamly package instead." #-}
(
AheadT(..)
, Ahead
, aheadK
, consM
)
where
import Control.Concurrent.MVar (putMVar, takeMVar)
import Control.Exception (assert)
import Control.Monad (void, when)
#if !(MIN_VERSION_transformers(0,6,0))
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
#endif
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
#if !(MIN_VERSION_transformers(0,6,0))
import Control.Monad.Trans.Class (MonadTrans(lift))
#endif
import Data.Heap (Heap, Entry(..))
import Data.IORef (IORef, readIORef, atomicModifyIORef, writeIORef)
import Data.Maybe (fromJust)
import GHC.Exts (inline)
import qualified Data.Heap as H
import Streamly.Internal.Control.Concurrent
(MonadRunInIO, MonadAsync, askRunInIO, restoreM)
import Streamly.Internal.Data.StreamK (Stream)
import qualified Streamly.Internal.Data.StreamK as K
(foldStreamShared, cons, mkStream, foldStream, fromEffect
, nil, concatMapWith, fromPure, bindWith)
import qualified Streamly.Internal.Data.Stream as D
(mapM, fromStreamK, toStreamK)
import qualified Streamly.Internal.Data.Stream.Serial as Stream (toStreamK)
import Streamly.Internal.Data.Stream.SVar.Generate
import Streamly.Internal.Data.SVar
import Prelude hiding (map)
#include "Instances.hs"
-- | Run every monadic action of a stream under a locally modified reader
-- environment.
--
-- The stream is rebuilt with 'K.mkStream'; each of the three continuations
-- handed to 'K.foldStream' is wrapped with @'local' f@:
--
-- * the singleton continuation,
-- * the yield continuation, which also re-wraps the remaining stream with
--   'withLocal' so that the modification applies to the whole stream, and
-- * the stop continuation.
{-# INLINABLE withLocal #-}
withLocal :: MonadReader r m => (r -> r) -> Stream m a -> Stream m a
withLocal f m =
    K.mkStream $ \st yld sng stp ->
        let single = local f . sng
            -- The tail is wrapped again so later elements also see @f@.
            yieldk a r = local f $ yld a (withLocal f r)
        in K.foldStream st yieldk single (local f stp) m
-- | Check whether the heap still has room under the SVar's buffer limit.
--
-- The room available to the heap is the configured 'maxBufferLimit' minus
-- the number of items currently in the 'outputQueue', clamped at zero. The
-- result is 'True' when the heap size plus the current 'workerCount' does not
-- exceed that room. With an 'Unlimited' buffer the result is always 'True'.
--
-- The arithmetic is done in 'Integer': the limit is a 'Word', so computing
-- @lim - len@ directly at type 'Word' wraps around to a huge value whenever
-- the output queue is longer than the limit (making the clamp with @max 0@
-- ineffective), and converting a large 'Word' limit to 'Int' could turn it
-- negative.
{-# INLINE underMaxHeap #-}
underMaxHeap ::
       SVar Stream m a
    -> Heap (Entry Int (AheadHeapEntry Stream m a))
    -> IO Bool
underMaxHeap sv hp = do
    (_, len) <- readIORef (outputQueue sv)
    case maxBufferLimit sv of
        Unlimited -> return True
        Limited lim -> do
            active <- readIORef (workerCount sv)
            let -- Buffer slots not already taken by the output queue.
                remaining = max 0 (toInteger lim - toInteger len)
                -- Entries on the heap plus workers counted by 'workerCount'.
                occupied = toInteger (H.size hp) + toInteger active
            return $ occupied <= remaining
-- | Decide whether the calling worker should stop.
--
-- Returns 'True' when the worker should stop and 'False' when it should
-- continue. The decision is made inside 'withIORef' on the heap reference
-- and while holding 'workerStopMVar', which is always put back before
-- returning:
--
-- * if 'underMaxHeap' reports that the heap is over its limit, stop;
-- * otherwise, if the SVar has no 'yieldRateInfo', continue;
-- * otherwise the result of 'isBeyondMaxRate' decides: a 'True' result
--   means continue, a 'False' result means stop.
--
-- NOTE(review): the 'MVar' is taken and put back without exception
-- protection; confirm that nothing called in between can throw.
preStopCheck ::
       SVar Stream m a
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> IO Bool
preStopCheck sv heap =
    withIORef heap $ \(hp, _) -> do
        heapOk <- underMaxHeap sv hp
        takeMVar (workerStopMVar sv)
        let stop = do
                putMVar (workerStopMVar sv) ()
                return True
            continue = do
                putMVar (workerStopMVar sv) ()
                return False
        if heapOk
        then
            case yieldRateInfo sv of
                Nothing -> continue
                Just yinfo -> do
                    rateOk <- isBeyondMaxRate sv yinfo
                    if rateOk then continue else stop
        else stop
-- | Give up on a stream that was dequeued but will not be run by this worker.
--
-- In order: the stream is put back on the work queue with 'reEnqueueAhead',
-- the yield limit is incremented again with 'incrementYieldLimit', and the
-- worker announces that it is stopping with 'sendStop'.
abortExecution ::
       IORef ([Stream m a], Int)
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> Stream m a
    -> IO ()
abortExecution q sv winfo m = do
    reEnqueueAhead sv q m
    incrementYieldLimit sv
    sendStop sv winfo
-- | Process a heap entry and then keep consuming subsequent entries from the
-- heap in sequence order, falling back to the work queue when the next
-- sequence number is not yet on the heap.
--
-- Arguments, in order: the work queue, the heap (with its sequence state),
-- the stream state passed to 'K.foldStreamShared', the SVar, optional worker
-- info, the heap entry to start with, that entry's sequence number, and the
-- @stopping@ flag.
--
-- When @stopping@ is 'True' (this is what 'drainHeap' passes), the worker
-- consults 'preStopCheck' before running a stream entry and before going to
-- the work queue, and stops if the check says so.
processHeap
    :: MonadRunInIO m
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> AheadHeapEntry Stream m a
    -> Int
    -> Bool
    -> m ()
processHeap q heap st sv winfo entry sno stopping = loopHeap sno entry

    where

    -- Used only when @stopping@: either put the entry back on top of the heap
    -- and stop, or run the stream.
    stopIfNeeded ent seqNo r = do
        stopIt <- liftIO $ preStopCheck sv heap
        if stopIt
        then liftIO $ do
            requeueOnHeapTop heap (Entry seqNo ent) seqNo
            sendStop sv winfo
        else runStreamWithYieldLimit True seqNo r

    -- Dispatch on the kind of heap entry.
    loopHeap seqNo ent =
        case ent of
            AheadEntryNull -> nextHeap seqNo
            AheadEntryPure a -> do
                -- Sent with 'send' (no worker info) rather than 'sendYield';
                -- the returned count is ignored and the worker never stops
                -- here.
                void $ liftIO $ send sv (ChildYield a)
                nextHeap seqNo
            AheadEntryStream (RunInIO runin, r) ->
                if stopping
                then stopIfNeeded ent seqNo r
                else do
                    -- Run the stream with the runner stored in the entry and
                    -- restore the resulting monadic state in this worker.
                    res <- liftIO $ runin (runStreamWithYieldLimit True seqNo r)
                    restoreM res

    -- Try to dequeue the entry that follows @prevSeqNo@.
    nextHeap prevSeqNo = do
        res <- liftIO $ dequeueFromHeapSeq heap (prevSeqNo + 1)
        case res of
            Ready (Entry seqNo hent) -> loopHeap seqNo hent
            Clearing -> liftIO $ sendStop sv winfo
            Waiting _ ->
                if stopping
                then do
                    r <- liftIO $ preStopCheck sv heap
                    if r
                    then liftIO $ sendStop sv winfo
                    else processWorkQueue prevSeqNo
                else inline processWorkQueue prevSeqNo

    -- Pick the next stream from the work queue; stop when the queue is empty.
    processWorkQueue prevSeqNo = do
        work <- dequeueAhead q
        case work of
            Nothing -> liftIO $ sendStop sv winfo
            Just (m, seqNo) -> do
                yieldLimitOk <- liftIO $ decrementYieldLimit sv
                if yieldLimitOk
                then
                    -- The directly following sequence number is processed
                    -- with 'processWithToken', anything else with
                    -- 'processWithoutToken'.
                    if seqNo == prevSeqNo + 1
                    then processWithToken q heap st sv winfo m seqNo
                    else processWithoutToken q heap st sv winfo m seqNo
                else liftIO $ abortExecution q sv winfo m

    -- Singleton continuation: the result of 'sendYield' is ignored, so the
    -- worker always proceeds to the next heap entry.
    singleStreamFromHeap seqNo a = do
        void $ liftIO $ sendYield sv winfo (ChildYield a)
        nextHeap seqNo

    -- Run a stream taken from the heap. The yield limit is decremented but
    -- the outcome of that is deliberately not consulted; only @continue@
    -- decides whether to keep going.
    runStreamWithYieldLimit continue seqNo r = do
        _ <- liftIO $ decrementYieldLimit sv
        if continue
        then do
            -- On stream end, undo the decrement above and move on.
            let stop = do
                    liftIO (incrementYieldLimit sv)
                    nextHeap seqNo
            K.foldStreamShared st
                (yieldStreamFromHeap seqNo)
                (singleStreamFromHeap seqNo)
                stop
                r
        else do
            -- Put the remaining stream back on top of the heap together with
            -- the current runner, undo the decrement, and stop.
            runIn <- askRunInIO
            let ent = Entry seqNo (AheadEntryStream (runIn, r))
            liftIO $ do
                requeueOnHeapTop heap ent seqNo
                incrementYieldLimit sv
                sendStop sv winfo

    -- Yield continuation: the result of 'sendYield' decides whether the rest
    -- of the stream is run now or requeued.
    yieldStreamFromHeap seqNo a r = do
        continue <- liftIO $ sendYield sv winfo (ChildYield a)
        runStreamWithYieldLimit continue seqNo r
-- | Dequeue one entry from the heap and process it in stopping mode.
--
-- If 'dequeueFromHeap' returns a 'Ready' entry, it is handed to
-- 'processHeap' with the @stopping@ flag set to 'True'. For any other
-- dequeue result the worker sends a stop event with 'sendStop'.
{-# NOINLINE drainHeap #-}
drainHeap
    :: MonadRunInIO m
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
drainHeap q heap st sv winfo = do
    r <- liftIO $ dequeueFromHeap heap
    case r of
        Ready (Entry seqNo hent) ->
            processHeap q heap st sv winfo hent seqNo True
        _ -> liftIO $ sendStop sv winfo
-- | Two-valued status flag for heap processing.
-- NOTE(review): not referenced in the visible part of this module; confirm
-- where (or whether) it is used.
data HeapStatus = HContinue | HStop
-- | Result of pushing an entry to the heap in 'processWithoutToken':
-- on 'Continue' the worker goes on with 'workLoopAhead', on 'Suspend' it
-- switches to 'drainHeap'.
data WorkerStatus = Continue | Suspend
processWithoutToken
:: MonadRunInIO m
=> IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithoutToken :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithoutToken IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo Stream m a
m Int
seqNo = do
let stop :: m WorkerStatus
stop = do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (SVar StreamK m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar StreamK m a
sv)
AheadHeapEntry StreamK m a -> m WorkerStatus
forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap AheadHeapEntry StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
AheadHeapEntry t m a
AheadEntryNull
mrun :: m b -> IO (StM m b)
mrun = RunInIO m -> forall b. m b -> IO (StM m b)
forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO (RunInIO m -> forall b. m b -> IO (StM m b))
-> RunInIO m -> forall b. m b -> IO (StM m b)
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> RunInIO m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar StreamK m a
sv
StM m WorkerStatus
r <- IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m WorkerStatus) -> m (StM m WorkerStatus))
-> IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$ m WorkerStatus -> IO (StM m WorkerStatus)
forall b. m b -> IO (StM m b)
mrun (m WorkerStatus -> IO (StM m WorkerStatus))
-> m WorkerStatus -> IO (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$
State StreamK m a
-> (a -> Stream m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> Stream m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
st
(\a
a Stream m a
r -> do
RunInIO m
runIn <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
AheadHeapEntry StreamK m a -> m WorkerStatus
forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap (AheadHeapEntry StreamK m a -> m WorkerStatus)
-> AheadHeapEntry StreamK m a -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ (RunInIO m, Stream m a) -> AheadHeapEntry StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(RunInIO m, t m a) -> AheadHeapEntry t m a
AheadEntryStream (RunInIO m
runIn, a -> Stream m a -> Stream m a
forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
K.cons a
a Stream m a
r))
(AheadHeapEntry StreamK m a -> m WorkerStatus
forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap (AheadHeapEntry StreamK m a -> m WorkerStatus)
-> (a -> AheadHeapEntry StreamK m a) -> a -> m WorkerStatus
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> AheadHeapEntry StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
a -> AheadHeapEntry t m a
AheadEntryPure)
m WorkerStatus
stop
Stream m a
m
WorkerStatus
res <- StM m WorkerStatus -> m WorkerStatus
forall a. StM m a -> m a
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
case WorkerStatus
res of
WorkerStatus
Continue -> IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo
WorkerStatus
Suspend -> IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> State StreamK m a
-> SVar StreamK m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap State StreamK m a
st SVar StreamK m a
sv Maybe WorkerInfo
winfo
where
toHeap :: AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap AheadHeapEntry StreamK m a
ent = do
Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp <- IO (Heap (Entry Int (AheadHeapEntry StreamK m a)))
-> m (Heap (Entry Int (AheadHeapEntry StreamK m a)))
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Heap (Entry Int (AheadHeapEntry StreamK m a)))
-> m (Heap (Entry Int (AheadHeapEntry StreamK m a))))
-> IO (Heap (Entry Int (AheadHeapEntry StreamK m a)))
-> m (Heap (Entry Int (AheadHeapEntry StreamK m a)))
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int),
Heap (Entry Int (AheadHeapEntry StreamK m a))))
-> IO (Heap (Entry Int (AheadHeapEntry StreamK m a)))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int),
Heap (Entry Int (AheadHeapEntry StreamK m a))))
-> IO (Heap (Entry Int (AheadHeapEntry StreamK m a))))
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int),
Heap (Entry Int (AheadHeapEntry StreamK m a))))
-> IO (Heap (Entry Int (AheadHeapEntry StreamK m a)))
forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry StreamK m a))
hp, Maybe Int
snum) ->
let hp' :: Heap (Entry Int (AheadHeapEntry StreamK m a))
hp' = Entry Int (AheadHeapEntry StreamK m a)
-> Heap (Entry Int (AheadHeapEntry StreamK m a))
-> Heap (Entry Int (AheadHeapEntry StreamK m a))
forall a. Ord a => a -> Heap a -> Heap a
H.insert (Int
-> AheadHeapEntry StreamK m a
-> Entry Int (AheadHeapEntry StreamK m a)
forall p a. p -> a -> Entry p a
Entry Int
seqNo AheadHeapEntry StreamK m a
ent) Heap (Entry Int (AheadHeapEntry StreamK m a))
hp
in Bool
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int),
Heap (Entry Int (AheadHeapEntry StreamK m a)))
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int),
Heap (Entry Int (AheadHeapEntry StreamK m a)))
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Maybe Int -> Int -> Bool
heapIsSane Maybe Int
snum Int
seqNo) ((Heap (Entry Int (AheadHeapEntry StreamK m a))
hp', Maybe Int
snum), Heap (Entry Int (AheadHeapEntry StreamK m a))
hp')
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (SVar StreamK m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar StreamK m a
sv) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
Int
maxHp <- IORef Int -> IO Int
forall a. IORef a -> IO a
readIORef (SVarStats -> IORef Int
maxHeapSize (SVarStats -> IORef Int) -> SVarStats -> IORef Int
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> SVarStats
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar StreamK m a
sv)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Heap (Entry Int (AheadHeapEntry StreamK m a)) -> Int
forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
maxHp) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
IORef Int -> Int -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef Int
maxHeapSize (SVarStats -> IORef Int) -> SVarStats -> IORef Int
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> SVarStats
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar StreamK m a
sv) (Heap (Entry Int (AheadHeapEntry StreamK m a)) -> Int
forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp)
Bool
heapOk <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
forall (m :: * -> *) a.
SVar StreamK m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
underMaxHeap SVar StreamK m a
sv Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp
HeapStatus
status <-
case SVar StreamK m a -> Maybe YieldRateInfo
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe YieldRateInfo
yieldRateInfo SVar StreamK m a
sv of
Maybe YieldRateInfo
Nothing -> HeapStatus -> m HeapStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue
Just YieldRateInfo
yinfo ->
case Maybe WorkerInfo
winfo of
Just WorkerInfo
info -> do
Bool
rateOk <- IO Bool -> m Bool
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ SVar StreamK m a -> YieldRateInfo -> WorkerInfo -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool
workerRateControl SVar StreamK m a
sv YieldRateInfo
yinfo WorkerInfo
info
if Bool
rateOk
then HeapStatus -> m HeapStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue
else HeapStatus -> m HeapStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HStop
Maybe WorkerInfo
Nothing -> HeapStatus -> m HeapStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue
if Bool
heapOk
then
case HeapStatus
status of
HeapStatus
HContinue -> WorkerStatus -> m WorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue
HeapStatus
HStop -> WorkerStatus -> m WorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
else WorkerStatus -> m WorkerStatus
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
-- | Result of evaluating a stream in 'processWithToken'.
--
-- * 'TokenContinue' carries the sequence number to look for next; it is
--   passed on to the token-holding loop as the expected sequence number.
-- * 'TokenSuspend' makes the worker give up direct output and fall back to
--   @drainHeap@.
data TokenWorkerStatus = TokenContinue Int | TokenSuspend
-- | Evaluate @action@, the work item with sequence number @sno@, sending its
-- elements straight to the SVar with 'sendYield' rather than parking them on
-- the heap.
--
-- The yield limit must already have been decremented for @action@ before this
-- is called ('workLoopAhead' does so); every stop continuation used here gives
-- that unit back with 'incrementYieldLimit'.
--
-- Arguments:
--
-- * @q@      - work queue of pending streams
-- * @heap@   - heap of out-of-order results plus the expected sequence number
-- * @st@     - stream state passed to every fold
-- * @sv@     - the SVar results are sent to
-- * @winfo@  - optional per-worker accounting info
-- * @action@ - the stream to evaluate
-- * @sno@    - sequence number of @action@
processWithToken
    :: MonadRunInIO m
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> Stream m a
    -> Int
    -> m ()
processWithToken q heap st sv winfo action sno = runWithToken sno action

    where

    -- Stop continuation for the stream numbered @seqNo@: return the yield
    -- limit unit reserved for it and ask for the next sequence number.
    stopAt seqNo = do
        liftIO (incrementYieldLimit sv)
        return $ TokenContinue (seqNo + 1)

    -- Fold a stream numbered @seqNo@ with the direct-output continuations.
    foldWithToken seqNo =
        K.foldStreamShared
            st (yieldOutput seqNo) (singleOutput seqNo) (stopAt seqNo)

    -- Evaluate one stream using the SVar's captured runner, restore the
    -- monadic state it produced, then either keep going with the next
    -- sequence number or fall back to draining the heap. Shared by the entry
    -- point and 'loopWithToken'.
    runWithToken seqNo stream = do
        r <- liftIO $ runInIO (svarMrun sv) $ foldWithToken seqNo stream
        res <- restoreM r
        case res of
            TokenContinue next -> loopWithToken next
            TokenSuspend -> drainHeap q heap st sv winfo

    -- Final element of a stream: send it; if 'sendYield' says not to
    -- continue, record the next expected sequence number on the heap and
    -- suspend.
    singleOutput seqNo a = do
        continue <- liftIO $ sendYield sv winfo (ChildYield a)
        if continue
        then return $ TokenContinue (seqNo + 1)
        else do
            liftIO $ updateHeapSeq heap (seqNo + 1)
            return TokenSuspend

    -- Element with a remaining stream @r@: send the element, reserve a yield
    -- limit unit for the rest, and continue folding @r@ only if both succeed.
    -- Otherwise put @r@ back on the heap under the same sequence number,
    -- return the yield limit unit and suspend.
    yieldOutput seqNo a r = do
        continue <- liftIO $ sendYield sv winfo (ChildYield a)
        yieldLimitOk <- liftIO $ decrementYieldLimit sv
        if continue && yieldLimitOk
        then foldWithToken seqNo r
        else do
            runIn <- askRunInIO
            let ent = Entry seqNo (AheadEntryStream (runIn, r))
            liftIO $ requeueOnHeapTop heap ent seqNo
            liftIO $ incrementYieldLimit sv
            return TokenSuspend

    -- Keep pulling work from the queue for as long as the dequeued item is
    -- exactly the one expected next (@nextSeqNo@).
    loopWithToken nextSeqNo = do
        work <- dequeueAhead q
        case work of
            Nothing -> do
                liftIO $ updateHeapSeq heap nextSeqNo
                workLoopAhead q heap st sv winfo

            Just (m, seqNo) -> do
                yieldLimitOk <- liftIO $ decrementYieldLimit sv
                -- Revert the dequeue: publish the expected sequence number,
                -- put the item back on the queue and return the yield limit
                -- unit taken just above.
                let undo = liftIO $ do
                        updateHeapSeq heap nextSeqNo
                        reEnqueueAhead sv q m
                        incrementYieldLimit sv
                if yieldLimitOk
                then
                    if seqNo == nextSeqNo
                    then runWithToken seqNo m
                    -- Not the item we were waiting for: hand control back to
                    -- the general work loop, which looks at the heap first.
                    else undo >> workLoopAhead q heap st sv winfo
                else undo >> drainHeap q heap st sv winfo
-- | Worker loop for the ahead SVar.
--
-- The heap is consulted first via 'dequeueFromHeap':
--
-- * @Ready@: the dequeued heap entry is handed to @processHeap@.
-- * @Clearing@: the worker stops with 'sendStop'.
-- * @Waiting@: a new item is taken from the work queue @q@. If the queue is
--   empty the worker stops. Otherwise a yield limit unit is reserved with
--   'decrementYieldLimit'; on success the item numbered 0 is run with
--   'processWithToken' and any other item with @processWithoutToken@; on
--   failure the item is passed to @abortExecution@.
workLoopAhead
    :: MonadRunInIO m
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopAhead q heap st sv winfo = do
    r <- liftIO $ dequeueFromHeap heap
    case r of
        Ready (Entry seqNo hent) ->
            processHeap q heap st sv winfo hent seqNo False
        Clearing -> liftIO $ sendStop sv winfo
        Waiting _ -> do
            work <- dequeueAhead q
            case work of
                Nothing -> liftIO $ sendStop sv winfo
                Just (m, seqNo) -> do
                    yieldLimitOk <- liftIO $ decrementYieldLimit sv
                    if yieldLimitOk
                    then
                        if seqNo == 0
                        then processWithToken q heap st sv winfo m seqNo
                        else processWithoutToken q heap st sv winfo m seqNo
                    -- The yield limit could not be reserved for this item.
                    else liftIO $ abortExecution q sv winfo m
-- | Combine two streams on a freshly created ahead-style SVar.
--
-- A new SVar is created with 'newAheadVar', using 'workLoopAhead' as the
-- worker loop and @concurrently m1 m2@ as its initial stream; the resulting
-- stream is whatever 'fromSVar' reads back from that SVar.
forkSVarAhead :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
forkSVarAhead m1 m2 = K.mkStream $ \st yld sng stp -> do
    sv <- newAheadVar st (concurrently m1 m2) workLoopAhead
    K.foldStream st yld sng stp $ Stream.toStreamK (fromSVar sv)

    where

    -- Enqueue @mb@ on the SVar found in the state, then evaluate @ma@ in
    -- place.
    -- NOTE(review): 'fromJust' assumes the state this runs with always has
    -- its 'streamVar' set (presumably by 'newAheadVar') - confirm.
    concurrently ma mb = K.mkStream $ \st yld sng stp -> do
        runInIO <- askRunInIO
        liftIO $ enqueue (fromJust $ streamVar st) (runInIO, mb)
        K.foldStream st yld sng stp ma
-- | Binary "ahead" composition of two streams.
--
-- If the state already carries an SVar whose style is 'AheadVar', @m2@ is
-- enqueued on that SVar and @m1@ is evaluated in place. In every other case
-- the two streams are combined on a new SVar via 'forkSVarAhead'.
{-# INLINE aheadK #-}
aheadK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
aheadK m1 m2 = K.mkStream $ \st yld sng stp ->
    case streamVar st of
        Just sv | svarStyle sv == AheadVar -> do
            runInIO <- askRunInIO
            liftIO $ enqueue sv (runInIO, m2)
            K.foldStream st yld sng stp m1
        _ -> K.foldStreamShared st yld sng stp (forkSVarAhead m1 m2)
-- | Prepend a monadic action to an 'AheadT' stream: the action is turned into
-- a one-element stream with 'K.fromEffect' and combined with the rest of the
-- stream using 'aheadK'.
{-# INLINE consM #-}
{-# SPECIALIZE consM :: IO a -> AheadT IO a -> AheadT IO a #-}
consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a
consM m (AheadT r) = AheadT $ aheadK (K.fromEffect m) r
-- | Newtype wrapper over the underlying 'Stream' type. Its 'Semigroup'
-- instance combines streams with 'aheadK' rather than serially.
newtype AheadT m a = AheadT {getAheadT :: Stream m a}
#if !(MIN_VERSION_transformers(0,6,0))
-- | Lift an action of the underlying monad into @AheadT@ by converting it
-- to a stream with @K.fromEffect@ and wrapping the result.
--
-- Only compiled when the transformers package is older than 0.6.0.
instance MonadTrans AheadT where
    {-# INLINE lift #-}
    lift = AheadT . K.fromEffect
#endif
-- | Convenience alias: an @AheadT@ stream whose underlying monad is @IO@.
type Ahead = AheadT IO
{-# INLINE append #-}
{-# SPECIALIZE append :: AheadT IO a -> AheadT IO a -> AheadT IO a #-}
-- | Combine two @AheadT@ streams.
--
-- Both arguments are unwrapped, the underlying streams are joined with
-- @aheadK@, and the result is wrapped again. This is the implementation of
-- @(<>)@ for @AheadT@.
append :: MonadAsync m => AheadT m a -> AheadT m a -> AheadT m a
append (AheadT m1) (AheadT m2) = AheadT $ aheadK m1 m2
-- | @(<>)@ is @append@, i.e. the two streams are combined with @aheadK@.
instance MonadAsync m => Semigroup (AheadT m a) where
    (<>) = append
-- | @mempty@ is @K.nil@ wrapped in @AheadT@ (presumably the empty stream,
-- going by its name); @mappend@ is the @Semigroup@ operation @(<>)@.
instance MonadAsync m => Monoid (AheadT m a) where
    mempty = AheadT K.nil
    mappend = (<>)
{-# INLINE apAhead #-}
-- | Applicative application for @AheadT@.
--
-- For every function produced by the first stream, the second stream is
-- mapped through that function; the resulting streams are combined with
-- @aheadK@ at both levels via @K.concatMapWith@.
apAhead :: MonadAsync m => AheadT m (a -> b) -> AheadT m a -> AheadT m b
apAhead (AheadT m1) (AheadT m2) =
    -- f applies one function x1 to every element of the argument stream.
    let f x1 = K.concatMapWith aheadK (K.fromPure . x1) m2
    in AheadT $ K.concatMapWith aheadK f m1
-- | @pure@ wraps a single value using @K.fromPure@; @(<*>)@ is @apAhead@.
instance (Monad m, MonadAsync m) => Applicative (AheadT m) where
    {-# INLINE pure #-}
    pure = AheadT . K.fromPure

    {-# INLINE (<*>) #-}
    (<*>) = apAhead
{-# INLINE bindAhead #-}
{-# SPECIALIZE bindAhead ::
    AheadT IO a -> (a -> AheadT IO b) -> AheadT IO b #-}
-- | Monadic bind for @AheadT@.
--
-- Delegates to @K.bindWith@, passing @aheadK@ as the operation used to
-- combine the streams generated by the continuation. The continuation's
-- result is unwrapped with @getAheadT@ before being handed over.
bindAhead :: MonadAsync m => AheadT m a -> (a -> AheadT m b) -> AheadT m b
bindAhead (AheadT m) f = AheadT $ K.bindWith aheadK m (getAheadT . f)
-- | @return@ is @pure@; @(>>=)@ is @bindAhead@.
instance MonadAsync m => Monad (AheadT m) where
    return = pure

    {-# INLINE (>>=) #-}
    (>>=) = bindAhead
#if !(MIN_VERSION_transformers(0,6,0))
-- | Lift an action of the base monad into @AheadT@ using
-- @liftBaseDefault@.
--
-- Only compiled when the transformers package is older than 0.6.0, the same
-- condition under which the @MonadTrans@ instance and the @MonadBase@
-- import are available in this module.
instance (MonadBase b m, Monad m, MonadAsync m) => MonadBase b (AheadT m) where
    liftBase = liftBaseDefault
#endif
-- The line below is a CPP macro invocation, presumably defined in the
-- included "Instances.hs"; it is expected to generate further instances for
-- this stream type. NOTE(review): the expansion is not visible from this
-- file - confirm the exact set of instances against the macro definition.
MONAD_COMMON_INSTANCES(AheadT, MONADPARALLEL)