{-# LANGUAGE UndecidableInstances #-}
module Streamly.Internal.Data.Stream.Ahead
(
AheadT(..)
, Ahead
, aheadK
, consM
)
where
import Control.Concurrent.MVar (putMVar, takeMVar)
import Control.Exception (assert)
import Control.Monad (void, when)
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Control.Monad.Trans.Control (MonadBaseControl (..))
import Data.Heap (Heap, Entry(..))
import Data.IORef (IORef, readIORef, atomicModifyIORef, writeIORef)
import Data.Maybe (fromJust)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import GHC.Exts (inline)
import qualified Data.Heap as H
import Streamly.Internal.Control.Concurrent
(MonadAsync, RunInIO(..), captureMonadState)
import Streamly.Internal.Data.Stream.Serial (SerialT(..))
import Streamly.Internal.Data.Stream.StreamK.Type (Stream)
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
import Streamly.Internal.Data.Stream.SVar.Generate
import Streamly.Internal.Data.SVar
import Prelude hiding (map)
#include "Instances.hs"
-- | Report whether the given heap, together with the currently active
-- workers, still fits in the buffer space left over by the output queue.
--
-- With an 'Unlimited' buffer limit the answer is always 'True'.  With
-- @Limited lim@ the space available to the heap is @lim@ minus the current
-- output queue length, and the result is 'True' when
-- @H.size hp + workerCount <= available@.
--
-- The available space is computed with a saturating subtraction.  The limit
-- is a 'Word', so a plain @lim - len@ wraps around when the output queue is
-- longer than the limit, and wrapping the result in @max 0@ cannot undo
-- that because a 'Word' is never negative.
{-# INLINE underMaxHeap #-}
underMaxHeap ::
       SVar Stream m a
    -> Heap (Entry Int (AheadHeapEntry Stream m a))
    -> IO Bool
underMaxHeap sv hp =
    case maxBufferLimit sv of
        Unlimited -> return True
        Limited lim -> do
            (_, len) <- readIORef (outputQueue sv)
            active <- readIORef (workerCount sv)
            let -- A queue length is not expected to be negative; clamp
                -- anyway so that the conversion to 'Word' cannot wrap.
                queued = fromIntegral (max 0 len) :: Word
                -- Saturating subtraction: never go below zero.
                available = if queued >= lim then 0 else lim - queued
            return $ H.size hp + active <= fromIntegral available
-- | Decide whether the calling worker should stop.
--
-- Returns 'True' to stop and 'False' to continue.
--
-- The heap contents are inspected via 'withIORef'.  The heap-size check runs
-- first; then 'workerStopMVar' is taken, the decision is made, and the MVar
-- is put back just before the result is returned.
preStopCheck ::
       SVar Stream m a
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> IO Bool
preStopCheck sv heap = withIORef heap check

    where

    lock = workerStopMVar sv

    -- Put the MVar back and report the decision.
    finish decision = putMVar lock () >> return decision

    check (hp, _) = do
        heapOk <- underMaxHeap sv hp
        takeMVar lock
        if not heapOk
        then finish True
        else
            case yieldRateInfo sv of
                Nothing -> finish False
                Just yinfo -> do
                    -- NOTE(review): a 'True' result from 'isBeyondMaxRate'
                    -- makes the worker continue and 'False' makes it stop.
                    -- This was previously bound to a name meaning "rate is
                    -- OK", which reads opposite to the function's name;
                    -- confirm the polarity against its definition.
                    beyond <- isBeyondMaxRate sv yinfo
                    finish (not beyond)
-- | Give up on a dequeued piece of work: put the stream back on the work
-- queue with 'reEnqueueAhead', give back one unit of yield limit with
-- 'incrementYieldLimit', and then signal a worker stop with 'sendStop'.
abortExecution ::
       IORef ([Stream m a], Int)
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> Stream m a
    -> IO ()
abortExecution q sv winfo m =
    reEnqueueAhead sv q m
        >> incrementYieldLimit sv
        >> sendStop sv winfo
-- | Process a heap entry that has already been dequeued, then keep going
-- with the entries (or queued work) that follow it in sequence.
--
-- Arguments, in order: the work queue, the heap, the stream state, the SVar,
-- optional worker info, the dequeued entry, its sequence number, and a flag
-- saying whether the worker is in "stopping" mode.  In stopping mode
-- 'preStopCheck' is consulted before running a stream entry and before
-- falling back to the work queue.
processHeap
    :: (MonadIO m, MonadBaseControl IO m)
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> AheadHeapEntry Stream m a
    -> Int
    -> Bool
    -> m ()
processHeap q heap st sv winfo entry sno stopping = loopHeap sno entry

    where

    -- Signal that this worker is stopping.
    stopWorker = liftIO $ sendStop sv winfo

    -- Stopping mode only: either put the entry back on the heap and stop, or
    -- run the entry's stream.
    stopIfNeeded ent seqNo r = do
        shouldStop <- liftIO $ preStopCheck sv heap
        if shouldStop
        then liftIO $ do
            requeueOnHeapTop heap (Entry seqNo ent) seqNo
            sendStop sv winfo
        else runStreamWithYieldLimit True seqNo r

    -- Handle one heap entry according to its constructor.
    loopHeap seqNo ent =
        case ent of
            AheadEntryNull -> nextHeap seqNo
            AheadEntryPure a -> do
                -- Sent with 'send' rather than 'sendYield'; the returned
                -- count is ignored.
                _ <- liftIO $ send sv (ChildYield a)
                nextHeap seqNo
            AheadEntryStream (RunInIO runin, r)
                | stopping -> stopIfNeeded ent seqNo r
                | otherwise ->
                    -- Run the stream using the runner stored with the entry
                    -- and restore the resulting monadic state here.
                    liftIO (runin (runStreamWithYieldLimit True seqNo r))
                        >>= restoreM

    -- Try to pick the entry with the next sequence number from the heap.
    nextHeap prevSeqNo = do
        res <- liftIO $ dequeueFromHeapSeq heap (prevSeqNo + 1)
        case res of
            Ready (Entry seqNo hent) -> loopHeap seqNo hent
            Clearing -> stopWorker
            Waiting _
                | stopping -> do
                    shouldStop <- liftIO $ preStopCheck sv heap
                    if shouldStop
                    then stopWorker
                    else processWorkQueue prevSeqNo
                | otherwise -> inline processWorkQueue prevSeqNo

    -- Take the next item from the work queue.  An item whose sequence number
    -- immediately follows the previous one goes to 'processWithToken'; any
    -- other goes to 'processWithoutToken'.
    processWorkQueue prevSeqNo = do
        work <- dequeueAhead q
        case work of
            Nothing -> stopWorker
            Just (m, seqNo) -> do
                yieldLimitOk <- liftIO $ decrementYieldLimit sv
                if not yieldLimitOk
                then liftIO $ abortExecution q sv winfo m
                else if seqNo == prevSeqNo + 1
                then processWithToken q heap st sv winfo m seqNo
                else processWithoutToken q heap st sv winfo m seqNo

    -- Singleton continuation: yield the element, ignore the "continue"
    -- result, and always move on to the next heap entry.
    singleStreamFromHeap seqNo a = do
        _ <- liftIO $ sendYield sv winfo (ChildYield a)
        nextHeap seqNo

    -- Run a stream taken from the heap.  The yield limit is decremented but
    -- the outcome of the decrement is ignored; only the flag passed by the
    -- caller decides whether to continue.
    runStreamWithYieldLimit continue seqNo r = do
        _ <- liftIO $ decrementYieldLimit sv
        if continue
        then
            K.foldStreamShared
                st
                (yieldStreamFromHeap seqNo)
                (singleStreamFromHeap seqNo)
                -- The stream ended: undo the decrement above and carry on
                -- with the next heap entry.
                (liftIO (incrementYieldLimit sv) >> nextHeap seqNo)
                r
        else do
            -- Not continuing: capture the current monadic state, put the
            -- remaining stream back on the heap, undo the decrement, stop.
            runIn <- captureMonadState
            liftIO $ do
                requeueOnHeapTop
                    heap (Entry seqNo (AheadEntryStream (runIn, r))) seqNo
                incrementYieldLimit sv
                sendStop sv winfo

    -- Yield continuation: send the element, then let the result of
    -- 'sendYield' decide whether the rest of the stream is run.
    yieldStreamFromHeap seqNo a r = do
        continue <- liftIO $ sendYield sv winfo (ChildYield a)
        runStreamWithYieldLimit continue seqNo r
-- | Dequeue from the heap once with 'dequeueFromHeap'.  If an entry is
-- ready, hand it to 'processHeap' with the stopping flag set to 'True';
-- for any other dequeue result just signal a worker stop.
{-# NOINLINE drainHeap #-}
drainHeap
    :: (MonadIO m, MonadBaseControl IO m)
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
drainHeap q heap st sv winfo =
    liftIO (dequeueFromHeap heap) >>= dispatch

    where

    dispatch (Ready (Entry seqNo hent)) =
        processHeap q heap st sv winfo hent seqNo True
    dispatch _ = liftIO $ sendStop sv winfo
-- | Outcome of the rate check made in @toHeap@ (see 'processWithoutToken')
-- after an entry has been inserted into the heap.  'HContinue' is produced
-- when there is no yield rate info, no worker info, or when
-- @workerRateControl@ returns 'True'; 'HStop' when it returns 'False'.
data HeapStatus = HContinue | HStop
-- | What a worker should do after putting its result on the heap in
-- 'processWithoutToken': on 'Continue' it goes back to @workLoopAhead@, on
-- 'Suspend' it goes on to 'drainHeap'.
data WorkerStatus = Continue | Suspend
-- | Run a unit of work whose sequence number is not the next one expected.
-- Its first result (or a null entry if the stream ends immediately) is
-- inserted into the heap under @seqNo@ instead of being yielded directly.
--
-- The stream is evaluated with the SVar's stored runner ('svarMrun') and the
-- resulting monadic state is restored in the caller.  Depending on the
-- 'WorkerStatus' returned by the heap insertion, the worker then either goes
-- back to @workLoopAhead@ or goes on to 'drainHeap'.
processWithoutToken
    :: (MonadIO m, MonadBaseControl IO m)
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> Stream m a
    -> Int
    -> m ()
processWithoutToken q heap st sv winfo m seqNo = do
    r <- liftIO
            $ runInIO (svarMrun sv)
            $ K.foldStreamShared st yieldk (toHeap . AheadEntryPure) stop m
    res <- restoreM r
    case res of
        Continue -> workLoopAhead q heap st sv winfo
        Suspend -> drainHeap q heap st sv winfo

    where

    -- The stream produced nothing: give back one unit of yield limit and
    -- still record a null entry for this sequence number on the heap.
    stop = liftIO (incrementYieldLimit sv) >> toHeap AheadEntryNull

    -- The stream produced an element and a remainder: store the whole stream
    -- (element consed back on) together with the current monadic state.
    yieldk a r = do
        runIn <- captureMonadState
        toHeap $ AheadEntryStream (runIn, K.cons a r)

    -- Insert the entry into the heap under @seqNo@ and decide whether this
    -- worker should continue or suspend.
    toHeap ent = do
        newHp <- liftIO $ atomicModifyIORef heap $ \(hp, snum) ->
            let hp' = H.insert (Entry seqNo ent) hp
            in assert (heapIsSane snum seqNo) ((hp', snum), hp')

        -- In inspect mode, track the largest heap size seen so far.
        when (svarInspectMode sv) $ liftIO $ do
            let maxRef = maxHeapSize $ svarStats sv
                size = H.size newHp
            maxHp <- readIORef maxRef
            when (size > maxHp) $ writeIORef maxRef size

        heapOk <- liftIO $ underMaxHeap sv newHp

        -- Rate control applies only when both rate info and worker info are
        -- present; it is consulted regardless of the heap check result.
        status <-
            case (yieldRateInfo sv, winfo) of
                (Just yinfo, Just info) -> do
                    rateOk <- liftIO $ workerRateControl sv yinfo info
                    return $ if rateOk then HContinue else HStop
                _ -> return HContinue

        return $ case (heapOk, status) of
            (True, HContinue) -> Continue
            _ -> Suspend
-- | Status produced by the stream continuations in 'processWithToken'.
-- The stop continuation there returns @TokenContinue (sno + 1)@, so the
-- 'Int' is presumably the next sequence number to work on (TODO confirm
-- against the rest of 'processWithToken').
data TokenWorkerStatus = TokenContinue Int | TokenSuspend
-- | Run the stream @action@, which has sequence number @sno@, sending its
-- elements straight to the 'SVar' with 'sendYield', and then keep taking work
-- from the queue @q@ for as long as the dequeued item carries the sequence
-- number this worker expects next.
--
-- The yield limit is presumably already decremented for @action@ by the
-- caller ('workLoopAhead' does so before calling this function); the stop
-- continuation used here gives that unit back via 'incrementYieldLimit'.
--
-- Control leaves this function through 'drainHeap' (a stream asked to
-- suspend, or the yield limit could not be decremented) or through
-- 'workLoopAhead' (the queue is empty, or the dequeued item is not the next
-- one in sequence).
processWithToken
    :: (MonadIO m, MonadBaseControl IO m)
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> Stream m a
    -> Int
    -> m ()
processWithToken q heap st sv winfo action sno = runWithToken sno action

    where

    -- Stop continuation for the stream numbered @seqNo@: give back the yield
    -- limit unit and report the following sequence number as the next one.
    stopWith seqNo = do
        liftIO (incrementYieldLimit sv)
        return $ TokenContinue (seqNo + 1)

    -- Fold a stream numbered @seqNo@ using this worker's continuations.
    foldWithToken seqNo =
        K.foldStreamShared
            st (yieldOutput seqNo) (singleOutput seqNo) (stopWith seqNo)

    -- Run a stream through the monad runner stored in the SVar, restore the
    -- monadic state it returns and act on the resulting status.
    runWithToken seqNo strm = do
        r <- liftIO $ runInIO (svarMrun sv) $ foldWithToken seqNo strm
        res <- restoreM r
        case res of
            TokenContinue next -> loopWithToken next
            TokenSuspend -> drainHeap q heap st sv winfo

    -- Last element of a stream: send it; if 'sendYield' says not to
    -- continue, record the next sequence number on the heap and suspend.
    singleOutput seqNo a = do
        continue <- liftIO $ sendYield sv winfo (ChildYield a)
        if continue
        then return $ TokenContinue (seqNo + 1)
        else do
            liftIO $ updateHeapSeq heap (seqNo + 1)
            return TokenSuspend

    -- An element followed by the rest of the stream @r@: send the element,
    -- then either keep folding @r@ or park @r@ on the heap under the same
    -- sequence number and suspend.
    yieldOutput seqNo a r = do
        continue <- liftIO $ sendYield sv winfo (ChildYield a)
        yieldLimitOk <- liftIO $ decrementYieldLimit sv
        if continue && yieldLimitOk
        then foldWithToken seqNo r
        else do
            runIn <- captureMonadState
            let ent = Entry seqNo (AheadEntryStream (runIn, r))
            liftIO $ requeueOnHeapTop heap ent seqNo
            -- The decrement above is undone unconditionally on this path.
            liftIO $ incrementYieldLimit sv
            return TokenSuspend

    -- Pick up the next queued stream; run it here only when it is exactly
    -- the one expected next (@nextSeqNo@).
    loopWithToken nextSeqNo = do
        work <- dequeueAhead q
        case work of
            Nothing -> do
                liftIO $ updateHeapSeq heap nextSeqNo
                workLoopAhead q heap st sv winfo

            Just (m, seqNo) -> do
                yieldLimitOk <- liftIO $ decrementYieldLimit sv
                -- Put everything back as it was before the dequeue: publish
                -- the expected sequence number, requeue the stream and give
                -- back the yield limit unit.
                let undo = liftIO $ do
                        updateHeapSeq heap nextSeqNo
                        reEnqueueAhead sv q m
                        incrementYieldLimit sv
                if yieldLimitOk
                then
                    if seqNo == nextSeqNo
                    then runWithToken seqNo m
                    else undo >> workLoopAhead q heap st sv winfo
                else undo >> drainHeap q heap st sv winfo
-- | Worker loop: first look at the heap, then at the work queue.
--
-- * If 'dequeueFromHeap' returns a 'Ready' entry, it is handed to
--   'processHeap' (with the final flag set to 'False').
-- * If it returns 'Clearing', the worker sends a stop event and returns.
-- * If it returns 'Waiting', the next stream is taken from the queue @q@.
--   An empty queue makes the worker send a stop event. Otherwise the yield
--   limit is decremented; on success the stream is run by 'processWithToken'
--   when its sequence number is 0 and by 'processWithoutToken' otherwise; on
--   failure the stream is passed to 'abortExecution'.
workLoopAhead
    :: (MonadIO m, MonadBaseControl IO m)
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopAhead q heap st sv winfo = do
    r <- liftIO $ dequeueFromHeap heap
    case r of
        Ready (Entry seqNo hent) ->
            processHeap q heap st sv winfo hent seqNo False
        Clearing -> liftIO $ sendStop sv winfo
        Waiting _ -> do
            work <- dequeueAhead q
            case work of
                Nothing -> liftIO $ sendStop sv winfo
                Just (m, seqNo) -> do
                    yieldLimitOk <- liftIO $ decrementYieldLimit sv
                    runQueued yieldLimitOk m seqNo

    where

    -- Dispatch a freshly dequeued stream depending on whether the yield
    -- limit could be decremented and on its sequence number.
    runQueued yieldLimitOk m seqNo
        | not yieldLimitOk = liftIO $ abortExecution q sv winfo m
        | seqNo == 0 = processWithToken q heap st sv winfo m seqNo
        | otherwise = processWithoutToken q heap st sv winfo m seqNo
-- | Create a new ahead-style 'SVar' (via 'newAheadVar') whose workers run
-- 'workLoopAhead' over the composition of @m1@ and @m2@, and return the
-- stream read back from that 'SVar' with 'fromSVar'.
forkSVarAhead :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
forkSVarAhead m1 m2 = K.mkStream $ \st yld sng stp -> do
    sv <- newAheadVar st (concurrently m1 m2) workLoopAhead
    K.foldStream st yld sng stp $ getSerialT (fromSVar sv)

    where

    -- Enqueue @mb@, together with the captured monad state, on the SVar
    -- found in the stream state, then run @ma@ in place.
    --
    -- NOTE(review): 'fromJust' assumes this stream is only ever folded with
    -- a state that carries an SVar (presumably always true for streams run
    -- by the workers of the SVar created above) -- confirm.
    concurrently ma mb = K.mkStream $ \st yld sng stp -> do
        runIn <- captureMonadState
        liftIO $ enqueue (fromJust $ streamVar st) (runIn, mb)
        K.foldStream st yld sng stp ma
-- | Binary composition of two streams in ahead style.
--
-- When the stream state already carries an 'SVar' whose style is 'AheadVar',
-- @m2@ is enqueued on that 'SVar' (paired with the captured monad state) and
-- @m1@ is folded in place. In every other case the pair is handed to
-- 'forkSVarAhead', which sets up a new 'SVar', and the result is folded with
-- 'K.foldStreamShared'.
{-# INLINE aheadK #-}
aheadK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
aheadK m1 m2 = K.mkStream $ \st yld sng stp ->
    case streamVar st of
        Just sv | svarStyle sv == AheadVar -> do
            runIn <- captureMonadState
            liftIO $ enqueue sv (runIn, m2)
            K.foldStream st yld sng stp m1
        _ -> K.foldStreamShared st yld sng stp (forkSVarAhead m1 m2)
-- | Prepend a monadic action to an 'AheadT' stream: the action is turned
-- into a one-element stream with 'K.fromEffect' and composed with the rest of
-- the stream using 'aheadK'.
{-# INLINE consM #-}
{-# SPECIALIZE consM :: IO a -> AheadT IO a -> AheadT IO a #-}
consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a
consM m (AheadT r) = AheadT $ aheadK (K.fromEffect m) r
-- | A newtype wrapper around the underlying 'Stream' type. The instances
-- defined for it in this module combine streams with 'aheadK'.
--
-- 'getAheadT' unwraps the underlying stream.
newtype AheadT m a = AheadT {getAheadT :: Stream m a}
    deriving (MonadTrans)
-- | 'AheadT' specialised to the 'IO' monad.
type Ahead = AheadT IO
-- | Combine two 'AheadT' streams by unwrapping them and composing the
-- underlying streams with 'aheadK'. This is the '<>' of the 'Semigroup'
-- instance.
{-# INLINE append #-}
{-# SPECIALIZE append :: AheadT IO a -> AheadT IO a -> AheadT IO a #-}
append :: MonadAsync m => AheadT m a -> AheadT m a -> AheadT m a
append (AheadT m1) (AheadT m2) = AheadT $ aheadK m1 m2
-- | '<>' is 'append', i.e. composition of the underlying streams with
-- 'aheadK'.
instance MonadAsync m => Semigroup (AheadT m a) where
    (<>) = append
-- | 'mempty' is the empty stream ('K.nil'); 'mappend' is the 'Semigroup'
-- operation.
instance MonadAsync m => Monoid (AheadT m a) where
    mempty = AheadT K.nil
    mappend = (<>)
-- | Applicative application for 'AheadT': for every function produced by the
-- first stream, map it over the second stream, flattening both levels with
-- 'K.concatMapWith' using 'aheadK' as the combining operation.
{-# INLINE apAhead #-}
apAhead :: MonadAsync m => AheadT m (a -> b) -> AheadT m a -> AheadT m b
apAhead (AheadT m1) (AheadT m2) =
    -- @f x1@ is the stream of @x1@ applied to each element of @m2@.
    let f x1 = K.concatMapWith aheadK (pure . x1) m2
    in AheadT $ K.concatMapWith aheadK f m1
-- | 'pure' wraps a single value using 'K.fromPure'; '<*>' is 'apAhead'.
instance (Monad m, MonadAsync m) => Applicative (AheadT m) where
    {-# INLINE pure #-}
    pure = AheadT . K.fromPure

    {-# INLINE (<*>) #-}
    (<*>) = apAhead
-- | Monadic bind for 'AheadT'.
--
-- The continuation @f@ is applied to each element of the unwrapped source
-- stream (its result is unwrapped with 'getAheadT'), and the resulting
-- streams are combined by 'K.bindWith' using 'aheadK'.
{-# INLINE bindAhead #-}
{-# SPECIALIZE bindAhead ::
    AheadT IO a -> (a -> AheadT IO b) -> AheadT IO b #-}
bindAhead :: MonadAsync m => AheadT m a -> (a -> AheadT m b) -> AheadT m b
bindAhead (AheadT m) f = AheadT $ K.bindWith aheadK m (getAheadT . f)
-- | 'return' is 'pure'; '>>=' is 'bindAhead'.
instance MonadAsync m => Monad (AheadT m) where
    return = pure

    {-# INLINE (>>=) #-}
    (>>=) = bindAhead
-- CPP macro invocation for AheadT. The macro is not defined in the visible
-- part of this file; presumably it comes from the Instances.hs file that is
-- included near the top of the module and generates further class instances
-- for AheadT -- TODO confirm the macro definition and the meaning of the
-- MONADPARALLEL argument.
MONAD_COMMON_INSTANCES(AheadT, MONADPARALLEL)