{-# LANGUAGE UndecidableInstances #-}
module Streamly.Internal.Data.Stream.Ahead
(
AheadT(..)
, Ahead
, aheadK
, consM
)
where
import Control.Concurrent.MVar (putMVar, takeMVar)
import Control.Exception (assert)
import Control.Monad (void, when)
import Control.Monad.Base (MonadBase(..), liftBaseDefault)
import Control.Monad.Catch (MonadThrow, throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(lift))
import Data.Heap (Heap, Entry(..))
import Data.IORef (IORef, readIORef, atomicModifyIORef, writeIORef)
import Data.Maybe (fromJust)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup (Semigroup(..))
#endif
import GHC.Exts (inline)
import qualified Data.Heap as H
import Streamly.Internal.Control.Concurrent
(MonadRunInIO, MonadAsync, RunInIO(..), askRunInIO, restoreM)
import Streamly.Internal.Data.Stream.Serial (SerialT(..))
import Streamly.Internal.Data.Stream.StreamK.Type (Stream)
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
import Streamly.Internal.Data.Stream.SVar.Generate
import Streamly.Internal.Data.SVar
import Prelude hiding (map)
#include "Instances.hs"
{-# INLINE underMaxHeap #-}
-- | Check whether the heap is still within the buffer budget of the 'SVar'.
--
-- The budget is the 'maxBufferLimit' of the 'SVar' minus the number of items
-- currently sitting in its 'outputQueue'. The heap is considered under the
-- limit when its size plus the current 'workerCount' does not exceed that
-- remaining budget. With an 'Unlimited' buffer the answer is always 'True'
-- and the worker count is not read.
underMaxHeap ::
       SVar Stream m a
    -> Heap (Entry Int (AheadHeapEntry Stream m a))
    -> IO Bool
underMaxHeap sv hp = do
    (_, len) <- readIORef (outputQueue sv)
    case maxBufferLimit sv of
        Unlimited -> return True
        Limited lim -> do
            active <- readIORef (workerCount sv)
            -- 'lim' is an unsigned 'Word': subtracting a larger queue length
            -- wraps around instead of going negative, so a plain @max 0@ on
            -- the difference can never fire. Saturate at zero explicitly.
            let queued = fromIntegral (max 0 len) :: Word
                room
                    | queued >= lim = 0
                    | otherwise = lim - queued
            return $ H.size hp + active <= fromIntegral room
-- | Decide, while holding the 'workerStopMVar' of the 'SVar', whether the
-- calling worker should stop. Returns 'True' to stop and 'False' to carry on.
--
-- The worker stops when the heap is over its limit ('underMaxHeap' is
-- 'False'), or when yield rate information is present and 'isBeyondMaxRate'
-- returns 'False'. The heap check runs before the MVar is taken; the rate
-- check runs while it is held.
--
-- NOTE(review): continuing when 'isBeyondMaxRate' is 'True' reads inverted
-- relative to the helper's name; the original behavior is kept as is --
-- confirm against the helper's definition.
preStopCheck ::
       SVar Stream m a
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)) , Maybe Int)
    -> IO Bool
preStopCheck sv heap =
    withIORef heap $ \(hp, _) -> do
        heapOk <- underMaxHeap sv hp
        takeMVar stopLock
        shouldStop <-
            if heapOk
            then
                maybe
                    (return False)
                    (fmap not . isBeyondMaxRate sv)
                    (yieldRateInfo sv)
            else return True
        putMVar stopLock ()
        return shouldStop

    where

    stopLock = workerStopMVar sv
-- | Give up on a stream that was dequeued but will not be run: put it back on
-- the work queue with 'reEnqueueAhead', return the yield-limit credit with
-- 'incrementYieldLimit', and then report the worker as stopped via 'sendStop'.
abortExecution ::
       IORef ([Stream m a], Int)
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> Stream m a
    -> IO ()
abortExecution q sv winfo m =
    reEnqueueAhead sv q m
        >> incrementYieldLimit sv
        >> sendStop sv winfo
-- | Process a heap entry and keep going with the entries that follow it in
-- sequence.
--
-- Starting from @entry@ (sequence number @sno@), each entry is handled by its
-- kind: a null entry is skipped, a pure value is passed to 'send', and a
-- stream entry is run. After each one, the next sequence number is requested
-- from the heap with 'dequeueFromHeapSeq'. When the heap reports that it is
-- waiting for a sequence number, work is pulled from the queue @q@ instead
-- and handed to 'processWithToken' or 'processWithoutToken'.
--
-- The final 'Bool' argument (@stopping@) is 'True' when called from
-- 'drainHeap'. In that mode 'preStopCheck' is consulted before running a
-- stream entry and before pulling new work from the queue.
processHeap
    :: MonadRunInIO m
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> AheadHeapEntry Stream m a
    -> Int
    -> Bool
    -> m ()
processHeap q heap st sv winfo entry sno stopping = loopHeap sno entry

    where

    -- Report this worker as stopped.
    stopWorker = liftIO $ sendStop sv winfo

    -- Dispatch on the kind of heap entry.
    loopHeap seqNo ent =
        case ent of
            AheadEntryNull -> nextHeap seqNo
            AheadEntryPure x -> do
                -- Uses 'send' rather than 'sendYield', so the result of the
                -- send is ignored and no worker info is passed along.
                void $ liftIO $ send sv (ChildYield x)
                nextHeap seqNo
            AheadEntryStream (RunInIO runin, strm)
                | stopping -> stopIfNeeded ent seqNo strm
                | otherwise ->
                    -- Run under the runner stored with the entry and restore
                    -- the resulting monadic state here.
                    liftIO (runin (runStreamWithYieldLimit True seqNo strm))
                        >>= restoreM

    -- While draining: either put the entry back on the heap and stop, or run
    -- the stream.
    stopIfNeeded ent seqNo strm = do
        mustStop <- liftIO $ preStopCheck sv heap
        if mustStop
        then liftIO $ do
            requeueOnHeapTop heap (Entry seqNo ent) seqNo
            sendStop sv winfo
        else runStreamWithYieldLimit True seqNo strm

    -- Ask the heap for the entry following @prevSeqNo@.
    nextHeap prevSeqNo = do
        res <- liftIO $ dequeueFromHeapSeq heap (prevSeqNo + 1)
        case res of
            Ready (Entry seqNo hent) -> loopHeap seqNo hent
            Clearing -> stopWorker
            Waiting _
                | stopping -> do
                    mustStop <- liftIO $ preStopCheck sv heap
                    if mustStop
                    then stopWorker
                    else processWorkQueue prevSeqNo
                | otherwise -> inline processWorkQueue prevSeqNo

    -- Pull the next stream from the work queue and run it, with the token if
    -- it is the immediate successor of @prevSeqNo@, without it otherwise.
    processWorkQueue prevSeqNo = do
        work <- dequeueAhead q
        case work of
            Nothing -> stopWorker
            Just (m, seqNo) -> do
                yieldLimitOk <- liftIO $ decrementYieldLimit sv
                if yieldLimitOk
                then
                    if seqNo == prevSeqNo + 1
                    then processWithToken q heap st sv winfo m seqNo
                    else processWithoutToken q heap st sv winfo m seqNo
                else liftIO $ abortExecution q sv winfo m

    -- Last element of a stream: yield it and always move on to the next heap
    -- entry; the result of 'sendYield' is ignored here.
    singleStreamFromHeap seqNo x = do
        void $ liftIO $ sendYield sv winfo (ChildYield x)
        nextHeap seqNo

    -- Run one step of a stream taken from the heap. The yield limit is
    -- decremented but its result is deliberately ignored; only @continue@
    -- decides whether the stream is run or put back on the heap.
    runStreamWithYieldLimit continue seqNo strm = do
        _ <- liftIO $ decrementYieldLimit sv
        if continue
        then
            K.foldStreamShared
                st
                (yieldStreamFromHeap seqNo)
                (singleStreamFromHeap seqNo)
                -- Stream ended without a value: give the yield credit back
                -- and move on to the next heap entry.
                (liftIO (incrementYieldLimit sv) >> nextHeap seqNo)
                strm
        else do
            runIn <- askRunInIO
            let requeued = Entry seqNo (AheadEntryStream (runIn, strm))
            liftIO $ do
                requeueOnHeapTop heap requeued seqNo
                incrementYieldLimit sv
                sendStop sv winfo

    -- An element followed by more stream: yield it, and let the result of
    -- 'sendYield' decide whether the remainder is run now.
    yieldStreamFromHeap seqNo x strm =
        liftIO (sendYield sv winfo (ChildYield x))
            >>= \continue -> runStreamWithYieldLimit continue seqNo strm
{-# NOINLINE drainHeap #-}
-- | Take whatever 'dequeueFromHeap' makes available and process it with
-- 'processHeap' in stopping mode (last argument 'True'). If the dequeue does
-- not return a ready entry, the worker is reported as stopped via 'sendStop'.
drainHeap
    :: MonadRunInIO m
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
drainHeap q heap st sv winfo =
    liftIO (dequeueFromHeap heap) >>= dispatch

    where

    dispatch (Ready (Entry seqNo hent)) =
        processHeap q heap st sv winfo hent seqNo True
    dispatch _ = liftIO $ sendStop sv winfo
-- | Result of the rate check made after inserting an entry into the heap in
-- 'processWithoutToken'. 'HStop' is produced only when 'workerRateControl'
-- returns 'False'; otherwise (including when there is no yield rate info or
-- no worker info) the result is 'HContinue'.
data HeapStatus = HContinue | HStop
-- | What 'processWithoutToken' does after its stream step: on 'Continue' it
-- goes back to 'workLoopAhead', on 'Suspend' it calls 'drainHeap'.
data WorkerStatus = Continue | Suspend
-- | Run one step of a queued stream whose results cannot be sent to the
-- output directly; the result is stored in the heap under @seqNo@ instead.
-- (Within 'processHeap' this is chosen when the dequeued sequence number is
-- not the immediate successor of the previous one.)
--
-- The stream is stepped once under the 'svarMrun' runner of the 'SVar'.
-- Whatever the step produces is inserted into the heap: the element consed
-- back onto the remaining stream, a single pure value, or a null entry when
-- the stream is empty. Depending on the resulting 'WorkerStatus' the worker
-- then returns to 'workLoopAhead' or calls 'drainHeap'.
processWithoutToken
    :: MonadRunInIO m
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> Stream m a
    -> Int
    -> m ()
processWithoutToken q heap st sv winfo m seqNo = do
    saved <- liftIO $ runInIO (svarMrun sv) $
        K.foldStreamShared st yieldK singleK stopK m
    outcome <- restoreM saved
    case outcome of
        Continue -> workLoopAhead q heap st sv winfo
        Suspend -> drainHeap q heap st sv winfo

    where

    -- Element plus remaining stream: store both, with the current runner, as
    -- one stream entry.
    yieldK x rest = do
        runIn <- askRunInIO
        toHeap $ AheadEntryStream (runIn, K.cons x rest)

    -- Final element of the stream.
    singleK = toHeap . AheadEntryPure

    -- Empty stream: give the yield credit back, but still insert a null
    -- entry under this sequence number.
    stopK = do
        liftIO (incrementYieldLimit sv)
        toHeap AheadEntryNull

    -- Insert an entry under @seqNo@ and decide whether the worker may go on.
    toHeap ent = do
        newHp <- liftIO $ atomicModifyIORef heap $ \(hp, snum) ->
            let hp' = H.insert (Entry seqNo ent) hp
            in assert (heapIsSane snum seqNo) ((hp', snum), hp')

        -- In inspect mode, keep the recorded maximum heap size up to date.
        when (svarInspectMode sv) $ liftIO $ do
            let sizeRef = maxHeapSize $ svarStats sv
                size = H.size newHp
            maxSoFar <- readIORef sizeRef
            when (size > maxSoFar) $ writeIORef sizeRef size

        heapOk <- liftIO $ underMaxHeap sv newHp

        -- The rate check runs whenever both pieces of information are
        -- available, even if the heap check above already failed.
        status <-
            case (yieldRateInfo sv, winfo) of
                (Just yinfo, Just info) -> do
                    rateOk <- liftIO $ workerRateControl sv yinfo info
                    return $ if rateOk then HContinue else HStop
                _ -> return HContinue

        return $
            case (heapOk, status) of
                (True, HContinue) -> Continue
                _ -> Suspend
-- | Outcome of a stream step in 'processWithToken'. 'TokenContinue' carries
-- the sequence number to continue with (the callers build it as the current
-- sequence number plus one) and leads to @loopWithToken@; 'TokenSuspend'
-- leads to 'drainHeap'.
data TokenWorkerStatus = TokenContinue Int | TokenSuspend
processWithToken
:: MonadRunInIO m
=> IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithToken :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> Stream m a
-> Int
-> m ()
processWithToken IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
heap State Stream m a
st SVar Stream m a
sv Maybe WorkerInfo
winfo Stream m a
action Int
sno = do
let stop :: m TokenWorkerStatus
stop = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar Stream m a
sv)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
sno forall a. Num a => a -> a -> a
+ Int
1)
mrun :: m b -> IO (StM m b)
mrun = forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar Stream m a
sv
StM m TokenWorkerStatus
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
mrun forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStreamShared State Stream m a
st (Int -> a -> Stream m a -> m TokenWorkerStatus
yieldOutput Int
sno) (forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
sno) m TokenWorkerStatus
stop Stream m a
action
TokenWorkerStatus
res <- forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m TokenWorkerStatus
r
case TokenWorkerStatus
res of
TokenContinue Int
seqNo -> Int -> m ()
loopWithToken Int
seqNo
TokenWorkerStatus
TokenSuspend -> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
heap State Stream m a
st SVar Stream m a
sv Maybe WorkerInfo
winfo
where
singleOutput :: Int -> a -> m TokenWorkerStatus
singleOutput Int
seqNo a
a = do
Bool
continue <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar Stream m a
sv Maybe WorkerInfo
winfo (forall a. a -> ChildEvent a
ChildYield a
a)
if Bool
continue
then forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
seqNo forall a. Num a => a -> a -> a
+ Int
1)
else do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
heap (Int
seqNo forall a. Num a => a -> a -> a
+ Int
1)
forall (m :: * -> *) a. Monad m => a -> m a
return TokenWorkerStatus
TokenSuspend
yieldOutput :: Int -> a -> Stream m a -> m TokenWorkerStatus
yieldOutput Int
seqNo a
a Stream m a
r = do
Bool
continue <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield SVar Stream m a
sv Maybe WorkerInfo
winfo (forall a. a -> ChildEvent a
ChildYield a
a)
Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar Stream m a
sv
if Bool
continue Bool -> Bool -> Bool
&& Bool
yieldLimitOk
then do
let stop :: m TokenWorkerStatus
stop = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar Stream m a
sv)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
seqNo forall a. Num a => a -> a -> a
+ Int
1)
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStreamShared State Stream m a
st
(Int -> a -> Stream m a -> m TokenWorkerStatus
yieldOutput Int
seqNo)
(forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
seqNo)
m TokenWorkerStatus
stop
Stream m a
r
else do
RunInIO m
runIn <- forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
let ent :: Entry Int (AheadHeapEntry Stream m a)
ent = forall p a. p -> a -> Entry p a
Entry Int
seqNo (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(RunInIO m, t m a) -> AheadHeapEntry t m a
AheadEntryStream (RunInIO m
runIn, Stream m a
r))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
heap Entry Int (AheadHeapEntry Stream m a)
ent Int
seqNo
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar Stream m a
sv
forall (m :: * -> *) a. Monad m => a -> m a
return TokenWorkerStatus
TokenSuspend
loopWithToken :: Int -> m ()
loopWithToken Int
nextSeqNo = do
Maybe (Stream m a, Int)
work <- forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead IORef ([Stream m a], Int)
q
case Maybe (Stream m a, Int)
work of
Maybe (Stream m a, Int)
Nothing -> do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
heap Int
nextSeqNo
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
heap State Stream m a
st SVar Stream m a
sv Maybe WorkerInfo
winfo
Just (Stream m a
m, Int
seqNo) -> do
Bool
yieldLimitOk <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar Stream m a
sv
let undo :: m ()
undo = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ do
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
heap Int
nextSeqNo
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
reEnqueueAhead SVar Stream m a
sv IORef ([Stream m a], Int)
q Stream m a
m
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar Stream m a
sv
if Bool
yieldLimitOk
then
if Int
seqNo forall a. Eq a => a -> a -> Bool
== Int
nextSeqNo
then do
let stop :: m TokenWorkerStatus
stop = do
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
incrementYieldLimit SVar Stream m a
sv)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
seqNo forall a. Num a => a -> a -> a
+ Int
1)
mrun :: m b -> IO (StM m b)
mrun = forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> RunInIO m
svarMrun SVar Stream m a
sv
StM m TokenWorkerStatus
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall b. m b -> IO (StM m b)
mrun forall a b. (a -> b) -> a -> b
$
forall (m :: * -> *) a r.
State Stream m a
-> (a -> Stream m a -> m r)
-> (a -> m r)
-> m r
-> Stream m a
-> m r
K.foldStreamShared State Stream m a
st
(Int -> a -> Stream m a -> m TokenWorkerStatus
yieldOutput Int
seqNo)
(forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
seqNo)
m TokenWorkerStatus
stop
Stream m a
m
TokenWorkerStatus
res <- forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m TokenWorkerStatus
r
case TokenWorkerStatus
res of
TokenContinue Int
seqNo1 -> Int -> m ()
loopWithToken Int
seqNo1
TokenWorkerStatus
TokenSuspend -> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
heap State Stream m a
st SVar Stream m a
sv Maybe WorkerInfo
winfo
else
m ()
undo forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
heap State Stream m a
st SVar Stream m a
sv Maybe WorkerInfo
winfo
else m ()
undo forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([Stream m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
-> State Stream m a
-> SVar Stream m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([Stream m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
heap State Stream m a
st SVar Stream m a
sv Maybe WorkerInfo
winfo
-- | Worker loop for an ahead-style SVar.
--
-- The loop first asks the heap for an entry via 'dequeueFromHeap':
--
-- * @Ready@: the entry is handed to 'processHeap' (final flag @False@).
-- * @Clearing@: the worker reports a stop with 'sendStop'.
-- * @Waiting@: the worker tries to take the next stream from the work queue
--   with 'dequeueAhead'. An empty queue results in 'sendStop'. Otherwise the
--   yield limit is decremented first; if that fails the dequeued stream is
--   passed to 'abortExecution', else the stream is run by 'processWithToken'
--   when its sequence number is 0 and by 'processWithoutToken' for any other
--   sequence number.
workLoopAhead
    :: MonadRunInIO m
    => IORef ([Stream m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry Stream m a)), Maybe Int)
    -> State Stream m a
    -> SVar Stream m a
    -> Maybe WorkerInfo
    -> m ()
workLoopAhead q heap st sv winfo =
    liftIO (dequeueFromHeap heap) >>= dispatch

    where

    -- Nothing left for this worker: report the stop on the SVar.
    finish = liftIO $ sendStop sv winfo

    dispatch (Ready (Entry seqNo hent)) =
        processHeap q heap st sv winfo hent seqNo False
    dispatch Clearing = finish
    dispatch (Waiting _) = dequeueAhead q >>= maybe finish runQueued

    -- Run one stream taken from the work queue. The yield limit is
    -- decremented before deciding anything else, exactly once per dequeued
    -- stream.
    runQueued (stream, seqNo) = do
        underLimit <- liftIO $ decrementYieldLimit sv
        let next
                -- NOTE(review): abortExecution presumably puts the stream
                -- back on the queue before stopping; confirm at its
                -- definition.
                | not underLimit =
                    liftIO $ abortExecution q sv winfo stream
                -- Sequence number 0 is the only one started with the token.
                | seqNo == 0 =
                    processWithToken q heap st sv winfo stream seqNo
                | otherwise =
                    processWithoutToken q heap st sv winfo stream seqNo
        next
-- | Build a stream that evaluates @m1@ and @m2@ through a freshly created
-- ahead-style SVar.
--
-- A new SVar is created with 'newAheadVar', seeded with a stream that queues
-- @m2@ and then runs @m1@, and driven by 'workLoopAhead'. The resulting
-- stream is whatever 'fromSVar' reads back from that SVar.
forkSVarAhead :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
forkSVarAhead m1 m2 = K.mkStream $ \st yld sng stp ->
    newAheadVar st (queueRightRunLeft m1 m2) workLoopAhead
        >>= K.foldStream st yld sng stp . getSerialT . fromSVar

    where

    -- Enqueue the right stream (paired with the current RunInIO) on the SVar
    -- found in the state, then fold the left stream in place.
    --
    -- 'fromJust' is partial: this errors if the state carries no SVar.
    queueRightRunLeft left right = K.mkStream $ \st yld sng stp -> do
        mrun <- askRunInIO
        let svar = fromJust (streamVar st)
        liftIO $ enqueue svar (mrun, right)
        K.foldStream st yld sng stp left
-- | Combine two streams in ahead style.
--
-- If the state already carries an SVar whose style is 'AheadVar', @m2@ is
-- enqueued on that SVar (paired with the current RunInIO) and @m1@ is folded
-- directly with 'K.foldStream'. In every other case the pair is handed to
-- 'forkSVarAhead' and the result is folded with 'K.foldStreamShared'.
{-# INLINE aheadK #-}
aheadK :: MonadAsync m => Stream m a -> Stream m a -> Stream m a
aheadK m1 m2 = K.mkStream $ \st yld sng stp ->
    let -- Fold the left stream with the continuations we were given.
        runLeft = K.foldStream st yld sng stp m1
        -- Schedule the right stream on an existing SVar.
        queueRight sv = do
            mrun <- askRunInIO
            liftIO $ enqueue sv (mrun, m2)
     in case streamVar st of
            Just sv
                | svarStyle sv == AheadVar -> queueRight sv >> runLeft
            _ -> K.foldStreamShared st yld sng stp (forkSVarAhead m1 m2)
-- | Prepend a monadic action to an 'AheadT' stream.
--
-- The action is turned into a stream with 'K.fromEffect' and combined with
-- the rest of the stream using 'aheadK'.
{-# INLINE consM #-}
{-# SPECIALIZE consM :: IO a -> AheadT IO a -> AheadT IO a #-}
consM :: MonadAsync m => m a -> AheadT m a -> AheadT m a
consM action rest =
    AheadT (K.fromEffect action `aheadK` getAheadT rest)
-- | A newtype wrapper around 'Stream'. The 'Semigroup' and 'Monad' instances
-- defined for it in this module combine streams with 'aheadK'.
newtype AheadT m a = AheadT
    { getAheadT :: Stream m a -- ^ Unwrap to the underlying 'Stream'.
    }
    deriving (MonadTrans)
-- | 'AheadT' specialised to the 'IO' monad.
type Ahead = AheadT IO
-- | Combine two 'AheadT' streams by unwrapping both and joining the
-- underlying streams with 'aheadK'.
{-# INLINE append #-}
{-# SPECIALIZE append :: AheadT IO a -> AheadT IO a -> AheadT IO a #-}
append :: MonadAsync m => AheadT m a -> AheadT m a -> AheadT m a
append left right = AheadT (aheadK (getAheadT left) (getAheadT right))
-- | '<>' on 'AheadT' is 'append'.
instance MonadAsync m => Semigroup (AheadT m a) where
    (<>) = append
-- | The identity is the empty stream ('K.nil'); 'mappend' defers to the
-- 'Semigroup' operation.
instance MonadAsync m => Monoid (AheadT m a) where
    mempty = AheadT K.nil
    mappend = (<>)
-- | Applicative-style application for 'AheadT'.
--
-- For every function produced by the first stream, the second stream is
-- mapped to singleton streams of applied results; both the inner and the
-- outer levels are flattened with 'K.concatMapWith' using 'aheadK'.
{-# INLINE apAhead #-}
apAhead :: MonadAsync m => AheadT m (a -> b) -> AheadT m a -> AheadT m b
apAhead (AheadT fs) (AheadT xs) =
    AheadT (K.concatMapWith aheadK applyToAll fs)

    where

    -- Apply one function to every element of the argument stream.
    applyToAll g = K.concatMapWith aheadK (\x -> pure (g x)) xs
-- | 'pure' wraps a single-element stream built with 'K.fromPure'; '<*>' is
-- 'apAhead'.
instance (Monad m, MonadAsync m) => Applicative (AheadT m) where
    {-# INLINE pure #-}
    pure = AheadT . K.fromPure

    {-# INLINE (<*>) #-}
    (<*>) = apAhead
-- | Monadic bind for 'AheadT': delegates to 'K.bindWith' with 'aheadK' as
-- the combining function, unwrapping the streams produced by the
-- continuation.
{-# INLINE bindAhead #-}
{-# SPECIALIZE bindAhead ::
    AheadT IO a -> (a -> AheadT IO b) -> AheadT IO b #-}
bindAhead :: MonadAsync m => AheadT m a -> (a -> AheadT m b) -> AheadT m b
bindAhead (AheadT source) k =
    AheadT (K.bindWith aheadK source (\x -> getAheadT (k x)))
-- | 'return' is 'pure'; '>>=' is 'bindAhead'.
instance MonadAsync m => Monad (AheadT m) where
    return = pure

    {-# INLINE (>>=) #-}
    (>>=) = bindAhead
-- CPP macro invocation for AheadT. NOTE(review): the macro is presumably
-- defined in the included Instances.hs and presumably generates the remaining
-- common instances for the type; confirm against that file.
MONAD_COMMON_INSTANCES(AheadT, MONADPARALLEL)