module Streamly.Internal.Data.Stream.Concurrent.Channel.Append
(
newAppendChannel
)
where
import Control.Concurrent (myThreadId)
import Control.Concurrent.MVar (newEmptyMVar, newMVar, putMVar, takeMVar)
import Control.Exception (assert)
import Control.Monad (when, void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Heap (Heap, Entry(..))
import Data.IORef (IORef, newIORef, readIORef, atomicModifyIORef, writeIORef)
import Data.Kind (Type)
import GHC.Exts (inline)
import Streamly.Internal.Control.Concurrent
(MonadRunInIO, RunInIO(..), askRunInIO, restoreM)
import Streamly.Internal.Data.Atomics
(atomicModifyIORefCAS, atomicModifyIORefCAS_)
import Streamly.Internal.Data.Channel.Dispatcher (modifyThread)
import qualified Data.Heap as H
import qualified Data.Set as Set
import qualified Streamly.Internal.Data.StreamK as K
import Streamly.Internal.Data.Stream.Concurrent.Channel.Consumer
import Streamly.Internal.Data.Stream.Concurrent.Channel.Dispatcher
import Streamly.Internal.Data.Stream.Concurrent.Channel.Type
import Streamly.Internal.Data.Channel.Types
import Streamly.Internal.Data.Channel.Worker
{-# INLINE enqueueLIFO #-}
enqueueLIFO ::
Channel m a
-> IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
-> Bool
-> (RunInIO m, K.StreamK m a)
-> IO ()
enqueueLIFO :: forall (m :: * -> *) a.
Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
q Bool
inner (RunInIO m, StreamK m a)
m = do
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> (([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)]))
-> IO ()
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
q ((([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)]))
-> IO ())
-> (([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)]))
-> IO ()
forall a b. (a -> b) -> a -> b
$ \([(RunInIO m, StreamK m a)]
xs, [(RunInIO m, StreamK m a)]
ys) ->
if Bool
inner then ([(RunInIO m, StreamK m a)]
xs, (RunInIO m, StreamK m a)
m (RunInIO m, StreamK m a)
-> [(RunInIO m, StreamK m a)] -> [(RunInIO m, StreamK m a)]
forall a. a -> [a] -> [a]
: [(RunInIO m, StreamK m a)]
ys) else ((RunInIO m, StreamK m a)
m (RunInIO m, StreamK m a)
-> [(RunInIO m, StreamK m a)] -> [(RunInIO m, StreamK m a)]
forall a. a -> [a] -> [a]
: [(RunInIO m, StreamK m a)]
xs, [(RunInIO m, StreamK m a)]
ys)
IORef Bool -> MVar () -> IO ()
ringDoorBell (Channel m a -> IORef Bool
forall (m :: * -> *) a. Channel m a -> IORef Bool
doorBellOnWorkQ Channel m a
sv) (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv)
data QResult a = QEmpty | QOuter a | QInner a
{-# INLINE dequeue #-}
dequeue :: MonadIO m =>
IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
-> m (QResult (RunInIO m, K.StreamK m a))
dequeue :: forall (m :: * -> *) a.
MonadIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> m (QResult (RunInIO m, StreamK m a))
dequeue IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref =
IO (QResult (RunInIO m, StreamK m a))
-> m (QResult (RunInIO m, StreamK m a))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO (QResult (RunInIO m, StreamK m a))
-> m (QResult (RunInIO m, StreamK m a)))
-> IO (QResult (RunInIO m, StreamK m a))
-> m (QResult (RunInIO m, StreamK m a))
forall a b. (a -> b) -> a -> b
$ IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> (([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> (([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)]),
QResult (RunInIO m, StreamK m a)))
-> IO (QResult (RunInIO m, StreamK m a))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref
((([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> (([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)]),
QResult (RunInIO m, StreamK m a)))
-> IO (QResult (RunInIO m, StreamK m a)))
-> (([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> (([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)]),
QResult (RunInIO m, StreamK m a)))
-> IO (QResult (RunInIO m, StreamK m a))
forall a b. (a -> b) -> a -> b
$ \case
([(RunInIO m, StreamK m a)]
xs, (RunInIO m, StreamK m a)
y : [(RunInIO m, StreamK m a)]
ys) -> (([(RunInIO m, StreamK m a)]
xs, [(RunInIO m, StreamK m a)]
ys), (RunInIO m, StreamK m a) -> QResult (RunInIO m, StreamK m a)
forall a. a -> QResult a
QInner (RunInIO m, StreamK m a)
y)
((RunInIO m, StreamK m a)
x : [(RunInIO m, StreamK m a)]
xs, [(RunInIO m, StreamK m a)]
ys) -> (([(RunInIO m, StreamK m a)]
xs, [(RunInIO m, StreamK m a)]
ys), (RunInIO m, StreamK m a) -> QResult (RunInIO m, StreamK m a)
forall a. a -> QResult a
QOuter (RunInIO m, StreamK m a)
x)
([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
x -> (([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
x, QResult (RunInIO m, StreamK m a)
forall a. QResult a
QEmpty)
data WorkerStatus = Continue | Suspend
{-# INLINE workLoopLIFO #-}
workLoopLIFO
:: MonadRunInIO m
=> IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopLIFO :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFO IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref Channel m a
sv Maybe WorkerInfo
winfo = m ()
run
where
run :: m ()
run = do
QResult (RunInIO m, StreamK m a)
work <- IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> m (QResult (RunInIO m, StreamK m a))
forall (m :: * -> *) a.
MonadIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> m (QResult (RunInIO m, StreamK m a))
dequeue IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref
case QResult (RunInIO m, StreamK m a)
work of
QResult (RunInIO m, StreamK m a)
QEmpty ->
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
QInner (RunInIO forall b. m b -> IO (StM m b)
runin, StreamK m a
m) ->
(m WorkerStatus -> IO (StM m WorkerStatus))
-> StreamK m a -> Bool -> m ()
process m WorkerStatus -> IO (StM m WorkerStatus)
forall b. m b -> IO (StM m b)
runin StreamK m a
m Bool
True
QOuter (RunInIO forall b. m b -> IO (StM m b)
runin, StreamK m a
m) ->
(m WorkerStatus -> IO (StM m WorkerStatus))
-> StreamK m a -> Bool -> m ()
process m WorkerStatus -> IO (StM m WorkerStatus)
forall b. m b -> IO (StM m b)
runin StreamK m a
m Bool
False
process :: (m WorkerStatus -> IO (StM m WorkerStatus))
-> StreamK m a -> Bool -> m ()
process m WorkerStatus -> IO (StM m WorkerStatus)
runin StreamK m a
m Bool
inner = do
StM m WorkerStatus
r <- IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m WorkerStatus) -> m (StM m WorkerStatus))
-> IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$ m WorkerStatus -> IO (StM m WorkerStatus)
runin (m WorkerStatus -> IO (StM m WorkerStatus))
-> m WorkerStatus -> IO (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$
State StreamK m a
-> (a -> StreamK m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> StreamK m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared
State StreamK m a
forall a. HasCallStack => a
undefined
a -> StreamK m a -> m WorkerStatus
yieldk
a -> m WorkerStatus
forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single
(WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue)
StreamK m a
m
WorkerStatus
res <- StM m WorkerStatus -> m WorkerStatus
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
case WorkerStatus
res of
WorkerStatus
Continue -> m ()
run
WorkerStatus
Suspend -> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
where
single :: a -> m WorkerStatus
single a
a = do
Bool
res <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> a -> IO Bool
forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return (WorkerStatus -> m WorkerStatus) -> WorkerStatus -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend
yieldk :: a -> StreamK m a -> m WorkerStatus
yieldk a
a StreamK m a
r = do
Bool
res <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> a -> IO Bool
forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
if Bool
res
then State StreamK m a
-> (a -> StreamK m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> StreamK m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
forall a. HasCallStack => a
undefined a -> StreamK m a -> m WorkerStatus
yieldk a -> m WorkerStatus
forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single (WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue) StreamK m a
r
else do
RunInIO m
runInIO <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
forall (m :: * -> *) a.
Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref Bool
inner (RunInIO m
runInIO, StreamK m a
r)
WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
{-# INLINE workLoopLIFOLimited #-}
workLoopLIFOLimited
:: forall m a. MonadRunInIO m
=> IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopLIFOLimited :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFOLimited IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref Channel m a
sv Maybe WorkerInfo
winfo = m ()
run
where
incrContinue :: m WorkerStatus
incrContinue =
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)) m () -> m WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue
run :: m ()
run = do
QResult (RunInIO m, StreamK m a)
work <- IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> m (QResult (RunInIO m, StreamK m a))
forall (m :: * -> *) a.
MonadIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> m (QResult (RunInIO m, StreamK m a))
dequeue IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref
case QResult (RunInIO m, StreamK m a)
work of
QResult (RunInIO m, StreamK m a)
QEmpty ->
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
QInner (RunInIO m, StreamK m a)
item ->
(RunInIO m, StreamK m a) -> Bool -> m ()
process (RunInIO m, StreamK m a)
item Bool
True
QOuter (RunInIO m, StreamK m a)
item ->
(RunInIO m, StreamK m a) -> Bool -> m ()
process (RunInIO m, StreamK m a)
item Bool
False
process :: (RunInIO m, StreamK m a) -> Bool -> m ()
process item :: (RunInIO m, StreamK m a)
item@(RunInIO forall b. m b -> IO (StM m b)
runin, StreamK m a
m) Bool
inner = do
Bool
yieldLimitOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
if Bool
yieldLimitOk
then do
StM m WorkerStatus
r <- IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m WorkerStatus) -> m (StM m WorkerStatus))
-> IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$ m WorkerStatus -> IO (StM m WorkerStatus)
forall b. m b -> IO (StM m b)
runin (m WorkerStatus -> IO (StM m WorkerStatus))
-> m WorkerStatus -> IO (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$
State StreamK m a
-> (a -> StreamK m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> StreamK m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared
State StreamK m a
forall a. HasCallStack => a
undefined
a -> StreamK m a -> m WorkerStatus
yieldk
a -> m WorkerStatus
forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single
m WorkerStatus
incrContinue
StreamK m a
m
WorkerStatus
res <- StM m WorkerStatus -> m WorkerStatus
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
case WorkerStatus
res of
WorkerStatus
Continue -> m ()
run
WorkerStatus
Suspend -> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
else IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
forall (m :: * -> *) a.
Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref Bool
inner (RunInIO m, StreamK m a)
item
Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
where
single :: a -> m WorkerStatus
single a
a = do
Bool
res <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> a -> IO Bool
forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return (WorkerStatus -> m WorkerStatus) -> WorkerStatus -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ if Bool
res then WorkerStatus
Continue else WorkerStatus
Suspend
yieldk :: a -> StreamK m a -> m WorkerStatus
yieldk a
a StreamK m a
r = do
Bool
res <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> a -> IO Bool
forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
Bool
yieldLimitOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
if Bool
res Bool -> Bool -> Bool
&& Bool
yieldLimitOk
then State StreamK m a
-> (a -> StreamK m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> StreamK m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
forall a. HasCallStack => a
undefined a -> StreamK m a -> m WorkerStatus
yieldk a -> m WorkerStatus
forall {m :: * -> *}. MonadIO m => a -> m WorkerStatus
single m WorkerStatus
incrContinue StreamK m a
r
else do
RunInIO m
runInIO <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
forall (m :: * -> *) a.
Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
qref Bool
inner (RunInIO m
runInIO, StreamK m a
r)
WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
{-# ANN enqueueAhead "HLint: ignore" #-}
{-# INLINE enqueueAhead #-}
enqueueAhead ::
Channel m a
-> IORef ([K.StreamK m a], Int)
-> (RunInIO m, K.StreamK m a)
-> IO ()
enqueueAhead :: forall (m :: * -> *) a.
Channel m a
-> IORef ([StreamK m a], Int) -> (RunInIO m, StreamK m a) -> IO ()
enqueueAhead Channel m a
sv IORef ([StreamK m a], Int)
q (RunInIO m, StreamK m a)
m = do
IORef ([StreamK m a], Int)
-> (([StreamK m a], Int) -> ([StreamK m a], Int)) -> IO ()
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORefCAS_ IORef ([StreamK m a], Int)
q ((([StreamK m a], Int) -> ([StreamK m a], Int)) -> IO ())
-> (([StreamK m a], Int) -> ([StreamK m a], Int)) -> IO ()
forall a b. (a -> b) -> a -> b
$ \([StreamK m a]
xs, Int
n) -> ((RunInIO m, StreamK m a) -> StreamK m a
forall a b. (a, b) -> b
snd (RunInIO m, StreamK m a)
mStreamK m a -> [StreamK m a] -> [StreamK m a]
forall a. a -> [a] -> [a]
:[StreamK m a]
xs, Int
n)
IORef Bool -> MVar () -> IO ()
ringDoorBell (Channel m a -> IORef Bool
forall (m :: * -> *) a. Channel m a -> IORef Bool
doorBellOnWorkQ Channel m a
sv) (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv)
{-# INLINE dequeueAhead #-}
dequeueAhead :: MonadIO m
=> IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead IORef ([t m a], Int)
q = IO (Maybe (t m a, Int)) -> m (Maybe (t m a, Int))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (t m a, Int)) -> m (Maybe (t m a, Int)))
-> IO (Maybe (t m a, Int)) -> m (Maybe (t m a, Int))
forall a b. (a -> b) -> a -> b
$
IORef ([t m a], Int)
-> (([t m a], Int) -> (([t m a], Int), Maybe (t m a, Int)))
-> IO (Maybe (t m a, Int))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef ([t m a], Int)
q ((([t m a], Int) -> (([t m a], Int), Maybe (t m a, Int)))
-> IO (Maybe (t m a, Int)))
-> (([t m a], Int) -> (([t m a], Int), Maybe (t m a, Int)))
-> IO (Maybe (t m a, Int))
forall a b. (a -> b) -> a -> b
$ \case
([], Int
n) -> (([], Int
n), Maybe (t m a, Int)
forall a. Maybe a
Nothing)
(t m a
x : [t m a]
xs, Int
n) -> (([t m a]
xs, Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1), (t m a, Int) -> Maybe (t m a, Int)
forall a. a -> Maybe a
Just (t m a
x, Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1))
{-# INLINE dequeueAheadSeqCheck #-}
dequeueAheadSeqCheck :: MonadIO m
=> IORef ([t m a], Int) -> Int -> m (Maybe (t m a))
dequeueAheadSeqCheck :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> Int -> m (Maybe (t m a))
dequeueAheadSeqCheck IORef ([t m a], Int)
q Int
seqNo = IO (Maybe (t m a)) -> m (Maybe (t m a))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Maybe (t m a)) -> m (Maybe (t m a)))
-> IO (Maybe (t m a)) -> m (Maybe (t m a))
forall a b. (a -> b) -> a -> b
$
IORef ([t m a], Int)
-> (([t m a], Int) -> (([t m a], Int), Maybe (t m a)))
-> IO (Maybe (t m a))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORefCAS IORef ([t m a], Int)
q ((([t m a], Int) -> (([t m a], Int), Maybe (t m a)))
-> IO (Maybe (t m a)))
-> (([t m a], Int) -> (([t m a], Int), Maybe (t m a)))
-> IO (Maybe (t m a))
forall a b. (a -> b) -> a -> b
$ \case
([], Int
n) -> (([], Int
n), Maybe (t m a)
forall a. Maybe a
Nothing)
(t m a
x : [t m a]
xs, Int
n) ->
if Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1 Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
seqNo
then (([t m a]
xs, Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1), t m a -> Maybe (t m a)
forall a. a -> Maybe a
Just t m a
x)
else ((t m a
x t m a -> [t m a] -> [t m a]
forall a. a -> [a] -> [a]
: [t m a]
xs, Int
n), Maybe (t m a)
forall a. Maybe a
Nothing)
withIORef :: IORef a -> (a -> IO b) -> IO b
withIORef :: forall a b. IORef a -> (a -> IO b) -> IO b
withIORef IORef a
ref a -> IO b
f = IORef a -> IO a
forall a. IORef a -> IO a
readIORef IORef a
ref IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= a -> IO b
f
atomicModifyIORef_ :: IORef a -> (a -> a) -> IO ()
atomicModifyIORef_ :: forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORef_ IORef a
ref a -> a
f =
IORef a -> (a -> (a, ())) -> IO ()
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef a
ref ((a -> (a, ())) -> IO ()) -> (a -> (a, ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ \a
x -> (a -> a
f a
x, ())
data AheadHeapEntry (t :: (Type -> Type) -> Type -> Type) m a =
AheadEntryNull
| AheadEntryPure a
| AheadEntryStream (RunInIO m, t m a)
data HeapDequeueResult t m a =
Clearing
| Waiting Int
| Ready (Entry Int (AheadHeapEntry t m a))
{-# INLINE dequeueFromHeap #-}
dequeueFromHeap
:: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (HeapDequeueResult t m a)
dequeueFromHeap :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (HeapDequeueResult t m a)
dequeueFromHeap IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar =
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int),
HeapDequeueResult t m a))
-> IO (HeapDequeueResult t m a)
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar (((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int),
HeapDequeueResult t m a))
-> IO (HeapDequeueResult t m a))
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int),
HeapDequeueResult t m a))
-> IO (HeapDequeueResult t m a)
forall a b. (a -> b) -> a -> b
$ \pair :: (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
pair@(Heap (Entry Int (AheadHeapEntry t m a))
hp, Maybe Int
snum) ->
case Maybe Int
snum of
Maybe Int
Nothing -> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
pair, HeapDequeueResult t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
HeapDequeueResult t m a
Clearing)
Just Int
n -> do
let r :: Maybe
(Entry Int (AheadHeapEntry t m a),
Heap (Entry Int (AheadHeapEntry t m a)))
r = Heap (Entry Int (AheadHeapEntry t m a))
-> Maybe
(Entry Int (AheadHeapEntry t m a),
Heap (Entry Int (AheadHeapEntry t m a)))
forall a. Heap a -> Maybe (a, Heap a)
H.uncons Heap (Entry Int (AheadHeapEntry t m a))
hp
case Maybe
(Entry Int (AheadHeapEntry t m a),
Heap (Entry Int (AheadHeapEntry t m a)))
r of
Just (ent :: Entry Int (AheadHeapEntry t m a)
ent@(Entry Int
seqNo AheadHeapEntry t m a
_ev), Heap (Entry Int (AheadHeapEntry t m a))
hp') ->
if Int
seqNo Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
n
then ((Heap (Entry Int (AheadHeapEntry t m a))
hp', Maybe Int
forall a. Maybe a
Nothing), Entry Int (AheadHeapEntry t m a) -> HeapDequeueResult t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Entry Int (AheadHeapEntry t m a) -> HeapDequeueResult t m a
Ready Entry Int (AheadHeapEntry t m a)
ent)
else Bool
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int),
HeapDequeueResult t m a)
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int),
HeapDequeueResult t m a)
forall a. HasCallStack => Bool -> a -> a
assert (Int
seqNo Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
n) ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
pair, Int -> HeapDequeueResult t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> HeapDequeueResult t m a
Waiting Int
n)
Maybe
(Entry Int (AheadHeapEntry t m a),
Heap (Entry Int (AheadHeapEntry t m a)))
Nothing -> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
pair, Int -> HeapDequeueResult t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> HeapDequeueResult t m a
Waiting Int
n)
{-# INLINE dequeueFromHeapSeq #-}
dequeueFromHeapSeq
:: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int
-> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar Int
i =
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int),
HeapDequeueResult t m a))
-> IO (HeapDequeueResult t m a)
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar (((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int),
HeapDequeueResult t m a))
-> IO (HeapDequeueResult t m a))
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int),
HeapDequeueResult t m a))
-> IO (HeapDequeueResult t m a)
forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry t m a))
hp, Maybe Int
snum) ->
case Maybe Int
snum of
Maybe Int
Nothing -> do
let r :: Maybe
(Entry Int (AheadHeapEntry t m a),
Heap (Entry Int (AheadHeapEntry t m a)))
r = Heap (Entry Int (AheadHeapEntry t m a))
-> Maybe
(Entry Int (AheadHeapEntry t m a),
Heap (Entry Int (AheadHeapEntry t m a)))
forall a. Heap a -> Maybe (a, Heap a)
H.uncons Heap (Entry Int (AheadHeapEntry t m a))
hp
case Maybe
(Entry Int (AheadHeapEntry t m a),
Heap (Entry Int (AheadHeapEntry t m a)))
r of
Just (ent :: Entry Int (AheadHeapEntry t m a)
ent@(Entry Int
seqNo AheadHeapEntry t m a
_ev), Heap (Entry Int (AheadHeapEntry t m a))
hp') ->
if Int
seqNo Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
i
then ((Heap (Entry Int (AheadHeapEntry t m a))
hp', Maybe Int
forall a. Maybe a
Nothing), Entry Int (AheadHeapEntry t m a) -> HeapDequeueResult t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Entry Int (AheadHeapEntry t m a) -> HeapDequeueResult t m a
Ready Entry Int (AheadHeapEntry t m a)
ent)
else Bool
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int),
HeapDequeueResult t m a)
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int),
HeapDequeueResult t m a)
forall a. HasCallStack => Bool -> a -> a
assert (Int
seqNo Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
i) ((Heap (Entry Int (AheadHeapEntry t m a))
hp, Int -> Maybe Int
forall a. a -> Maybe a
Just Int
i), Int -> HeapDequeueResult t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> HeapDequeueResult t m a
Waiting Int
i)
Maybe
(Entry Int (AheadHeapEntry t m a),
Heap (Entry Int (AheadHeapEntry t m a)))
Nothing -> ((Heap (Entry Int (AheadHeapEntry t m a))
hp, Int -> Maybe Int
forall a. a -> Maybe a
Just Int
i), Int -> HeapDequeueResult t m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
Int -> HeapDequeueResult t m a
Waiting Int
i)
Just Int
_ -> [Char]
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int),
HeapDequeueResult t m a)
forall a. HasCallStack => [Char] -> a
error [Char]
"dequeueFromHeapSeq: unreachable"
heapIsSane :: Maybe Int -> Int -> Bool
heapIsSane :: Maybe Int -> Int -> Bool
heapIsSane Maybe Int
snum Int
seqNo =
case Maybe Int
snum of
Maybe Int
Nothing -> Bool
True
Just Int
n -> Int
seqNo Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
n
{-# INLINE requeueOnHeapTop #-}
requeueOnHeapTop
:: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a)
-> Int
-> IO ()
requeueOnHeapTop :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar Entry Int (AheadHeapEntry t m a)
ent Int
seqNo =
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int))
-> IO ()
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORef_ IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar (((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int))
-> IO ())
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int))
-> IO ()
forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry t m a))
hp, Maybe Int
snum) ->
Bool
-> (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
forall a. HasCallStack => Bool -> a -> a
assert (Maybe Int -> Int -> Bool
heapIsSane Maybe Int
snum Int
seqNo) (Entry Int (AheadHeapEntry t m a)
-> Heap (Entry Int (AheadHeapEntry t m a))
-> Heap (Entry Int (AheadHeapEntry t m a))
forall a. Ord a => a -> Heap a -> Heap a
H.insert Entry Int (AheadHeapEntry t m a)
ent Heap (Entry Int (AheadHeapEntry t m a))
hp, Int -> Maybe Int
forall a. a -> Maybe a
Just Int
seqNo)
{-# INLINE updateHeapSeq #-}
updateHeapSeq
:: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int
-> IO ()
updateHeapSeq :: forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar Int
seqNo =
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int))
-> IO ()
forall t. IORef t -> (t -> t) -> IO ()
atomicModifyIORef_ IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
hpVar (((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int))
-> IO ())
-> ((Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int))
-> IO ()
forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry t m a))
hp, Maybe Int
snum) ->
Bool
-> (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
forall a. HasCallStack => Bool -> a -> a
assert (Maybe Int -> Int -> Bool
heapIsSane Maybe Int
snum Int
seqNo) (Heap (Entry Int (AheadHeapEntry t m a))
hp, Int -> Maybe Int
forall a. a -> Maybe a
Just Int
seqNo)
{-# INLINE underMaxHeap #-}
underMaxHeap ::
Channel m a
-> Heap (Entry Int (AheadHeapEntry K.StreamK m a))
-> IO Bool
underMaxHeap :: forall (m :: * -> *) a.
Channel m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
underMaxHeap Channel m a
sv Heap (Entry Int (AheadHeapEntry StreamK m a))
hp = do
([ChildEvent a]
_, Int
len) <- IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. IORef a -> IO a
readIORef (Channel m a -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
sv)
let maxHeap :: Limit
maxHeap = case Channel m a -> Limit
forall (m :: * -> *) a. Channel m a -> Limit
maxBufferLimit Channel m a
sv of
Limited Word
lim -> Word -> Limit
Limited (Word -> Limit) -> Word -> Limit
forall a b. (a -> b) -> a -> b
$
Word -> Word -> Word
forall a. Ord a => a -> a -> a
max Word
0 (Word
lim Word -> Word -> Word
forall a. Num a => a -> a -> a
- Int -> Word
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
len)
Limit
Unlimited -> Limit
Unlimited
case Limit
maxHeap of
Limited Word
lim -> do
Int
active <- IORef Int -> IO Int
forall a. IORef a -> IO a
readIORef (Channel m a -> IORef Int
forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv)
Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Heap (Entry Int (AheadHeapEntry StreamK m a)) -> Int
forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry StreamK m a))
hp Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
active Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Word -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
lim
Limit
Unlimited -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
preStopCheck ::
Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)) , Maybe Int)
-> IO Bool
preStopCheck :: forall (m :: * -> *) a.
Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
preStopCheck Channel m a
sv IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap =
IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool)
-> IO Bool
forall a b. IORef a -> (a -> IO b) -> IO b
withIORef IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool)
-> IO Bool)
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool)
-> IO Bool
forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry StreamK m a))
hp, Maybe Int
_) -> do
Bool
heapOk <- Channel m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
forall (m :: * -> *) a.
Channel m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
underMaxHeap Channel m a
sv Heap (Entry Int (AheadHeapEntry StreamK m a))
hp
MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
workerStopMVar Channel m a
sv)
let stopping :: IO Bool
stopping = do
MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
workerStopMVar Channel m a
sv) ()
Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
continue :: IO Bool
continue = do
MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
workerStopMVar Channel m a
sv) ()
Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
if Bool
heapOk
then
case Channel m a -> Maybe YieldRateInfo
forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo Channel m a
sv of
Maybe YieldRateInfo
Nothing -> IO Bool
continue
Just YieldRateInfo
yinfo -> do
Bool
rateOk <-
Limit -> IORef Int -> YieldRateInfo -> IO Bool
isBeyondMaxRate
(Channel m a -> Limit
forall (m :: * -> *) a. Channel m a -> Limit
maxWorkerLimit Channel m a
sv) (Channel m a -> IORef Int
forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv) YieldRateInfo
yinfo
if Bool
rateOk then IO Bool
continue else IO Bool
stopping
else IO Bool
stopping
abortExecution :: Channel m a -> Maybe WorkerInfo -> IO ()
abortExecution :: forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
abortExecution Channel m a
sv Maybe WorkerInfo
winfo = do
Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
processHeap
:: MonadRunInIO m
=> IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry K.StreamK m a
-> Int
-> Bool
-> m ()
processHeap :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
processHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo AheadHeapEntry StreamK m a
entry Int
sno Bool
stopping = Int -> AheadHeapEntry StreamK m a -> m ()
loopHeap Int
sno AheadHeapEntry StreamK m a
entry
where
stopIfNeeded :: AheadHeapEntry StreamK m a -> Int -> StreamK m a -> m ()
stopIfNeeded AheadHeapEntry StreamK m a
ent Int
seqNo StreamK m a
r = do
Bool
stopIt <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
forall (m :: * -> *) a.
Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
preStopCheck Channel m a
sv IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
if Bool
stopIt
then IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Entry Int (AheadHeapEntry StreamK m a) -> Int -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (Int
-> AheadHeapEntry StreamK m a
-> Entry Int (AheadHeapEntry StreamK m a)
forall p a. p -> a -> Entry p a
Entry Int
seqNo AheadHeapEntry StreamK m a
ent) Int
seqNo
Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
else Bool -> Int -> StreamK m a -> m ()
runStreamWithYieldLimit Bool
True Int
seqNo StreamK m a
r
loopHeap :: Int -> AheadHeapEntry StreamK m a -> m ()
loopHeap Int
seqNo AheadHeapEntry StreamK m a
ent =
case AheadHeapEntry StreamK m a
ent of
AheadHeapEntry StreamK m a
AheadEntryNull -> Int -> m ()
nextHeap Int
seqNo
AheadEntryPure a
a -> do
m Int -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void
(m Int -> m ()) -> m Int -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> m Int
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO Int -> m Int) -> IO Int -> m Int
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
forall a.
IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell
(Channel m a -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a. Channel m a -> IORef ([ChildEvent a], Int)
outputQueue Channel m a
sv) (Channel m a -> MVar ()
forall (m :: * -> *) a. Channel m a -> MVar ()
outputDoorBell Channel m a
sv) (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
Int -> m ()
nextHeap Int
seqNo
AheadEntryStream (RunInIO forall b. m b -> IO (StM m b)
runin, StreamK m a
r) -> do
if Bool
stopping
then AheadHeapEntry StreamK m a -> Int -> StreamK m a -> m ()
stopIfNeeded AheadHeapEntry StreamK m a
ent Int
seqNo StreamK m a
r
else do
StM m ()
res <- IO (StM m ()) -> m (StM m ())
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m ()) -> m (StM m ())) -> IO (StM m ()) -> m (StM m ())
forall a b. (a -> b) -> a -> b
$ m () -> IO (StM m ())
forall b. m b -> IO (StM m b)
runin (Bool -> Int -> StreamK m a -> m ()
runStreamWithYieldLimit Bool
True Int
seqNo StreamK m a
r)
StM m () -> m ()
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m ()
res
nextHeap :: Int -> m ()
nextHeap Int
prevSeqNo = do
HeapDequeueResult StreamK m a
res <- IO (HeapDequeueResult StreamK m a)
-> m (HeapDequeueResult StreamK m a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (HeapDequeueResult StreamK m a)
-> m (HeapDequeueResult StreamK m a))
-> IO (HeapDequeueResult StreamK m a)
-> m (HeapDequeueResult StreamK m a)
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Int -> IO (HeapDequeueResult StreamK m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (Int
prevSeqNo Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
case HeapDequeueResult StreamK m a
res of
Ready (Entry Int
seqNo AheadHeapEntry StreamK m a
hent) -> Int -> AheadHeapEntry StreamK m a -> m ()
loopHeap Int
seqNo AheadHeapEntry StreamK m a
hent
HeapDequeueResult StreamK m a
Clearing -> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
Waiting Int
_ ->
if Bool
stopping
then do
Bool
r <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
forall (m :: * -> *) a.
Channel m a
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
preStopCheck Channel m a
sv IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
if Bool
r
then IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
else Int -> m ()
processWorkQueue Int
prevSeqNo
else (Int -> m ()) -> Int -> m ()
forall a. a -> a
inline Int -> m ()
processWorkQueue Int
prevSeqNo
processWorkQueue :: Int -> m ()
processWorkQueue Int
prevSeqNo = do
Bool
yieldLimitOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
if Bool
yieldLimitOk
then do
Maybe (StreamK m a, Int)
work <- IORef ([StreamK m a], Int) -> m (Maybe (StreamK m a, Int))
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead IORef ([StreamK m a], Int)
q
case Maybe (StreamK m a, Int)
work of
Maybe (StreamK m a, Int)
Nothing -> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
Just (StreamK m a
m, Int
seqNo) -> do
if Int
seqNo Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
prevSeqNo Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
then IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo
else IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithoutToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo
else IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
abortExecution Channel m a
sv Maybe WorkerInfo
winfo
singleStreamFromHeap :: Int -> a -> m ()
singleStreamFromHeap Int
seqNo a
a = do
m Bool -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Bool -> m ()) -> m Bool -> m ()
forall a b. (a -> b) -> a -> b
$ IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> a -> IO Bool
forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
Int -> m ()
nextHeap Int
seqNo
runStreamWithYieldLimit :: Bool -> Int -> StreamK m a -> m ()
runStreamWithYieldLimit Bool
continue Int
seqNo StreamK m a
r = do
Bool
_ <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
if Bool
continue
then do
let stopk :: m ()
stopk = do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
Int -> m ()
nextHeap Int
seqNo
State StreamK m a
-> (a -> StreamK m a -> m ())
-> (a -> m ())
-> m ()
-> StreamK m a
-> m ()
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
forall a. HasCallStack => a
undefined
(Int -> a -> StreamK m a -> m ()
yieldStreamFromHeap Int
seqNo)
(Int -> a -> m ()
singleStreamFromHeap Int
seqNo)
m ()
stopk
StreamK m a
r
else do
RunInIO m
runIn <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
let ent :: Entry Int (AheadHeapEntry StreamK m a)
ent = Int
-> AheadHeapEntry StreamK m a
-> Entry Int (AheadHeapEntry StreamK m a)
forall p a. p -> a -> Entry p a
Entry Int
seqNo ((RunInIO m, StreamK m a) -> AheadHeapEntry StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(RunInIO m, t m a) -> AheadHeapEntry t m a
AheadEntryStream (RunInIO m
runIn, StreamK m a
r))
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Entry Int (AheadHeapEntry StreamK m a) -> Int -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Entry Int (AheadHeapEntry StreamK m a)
ent Int
seqNo
Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
yieldStreamFromHeap :: Int -> a -> StreamK m a -> m ()
yieldStreamFromHeap Int
seqNo a
a StreamK m a
r = do
Bool
continue <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> a -> IO Bool
forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
Bool -> Int -> StreamK m a -> m ()
runStreamWithYieldLimit Bool
continue Int
seqNo StreamK m a
r
{-# NOINLINE drainHeap #-}
drainHeap
:: MonadRunInIO m
=> IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo = do
HeapDequeueResult StreamK m a
r <- IO (HeapDequeueResult StreamK m a)
-> m (HeapDequeueResult StreamK m a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (HeapDequeueResult StreamK m a)
-> m (HeapDequeueResult StreamK m a))
-> IO (HeapDequeueResult StreamK m a)
-> m (HeapDequeueResult StreamK m a)
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO (HeapDequeueResult StreamK m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (HeapDequeueResult t m a)
dequeueFromHeap IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
case HeapDequeueResult StreamK m a
r of
Ready (Entry Int
seqNo AheadHeapEntry StreamK m a
hent) ->
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
processHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo AheadHeapEntry StreamK m a
hent Int
seqNo Bool
True
HeapDequeueResult StreamK m a
_ -> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
data HeapStatus = HContinue | HStop
processWithoutToken
:: MonadRunInIO m
=> IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> K.StreamK m a
-> Int
-> m ()
processWithoutToken :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithoutToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo = do
let stopk :: m WorkerStatus
stopk = do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
AheadHeapEntry StreamK m a -> m WorkerStatus
forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap AheadHeapEntry StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
AheadHeapEntry t m a
AheadEntryNull
mrun :: m b -> IO (StM m b)
mrun = RunInIO m -> m b -> IO (StM m b)
forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO (RunInIO m -> m b -> IO (StM m b))
-> RunInIO m -> m b -> IO (StM m b)
forall a b. (a -> b) -> a -> b
$ Channel m a -> RunInIO m
forall (m :: * -> *) a. Channel m a -> RunInIO m
svarMrun Channel m a
sv
StM m WorkerStatus
r <- IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m WorkerStatus) -> m (StM m WorkerStatus))
-> IO (StM m WorkerStatus) -> m (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$ m WorkerStatus -> IO (StM m WorkerStatus)
forall {b}. m b -> IO (StM m b)
mrun (m WorkerStatus -> IO (StM m WorkerStatus))
-> m WorkerStatus -> IO (StM m WorkerStatus)
forall a b. (a -> b) -> a -> b
$
State StreamK m a
-> (a -> StreamK m a -> m WorkerStatus)
-> (a -> m WorkerStatus)
-> m WorkerStatus
-> StreamK m a
-> m WorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
forall a. HasCallStack => a
undefined
(\a
a StreamK m a
r -> do
RunInIO m
runIn <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
AheadHeapEntry StreamK m a -> m WorkerStatus
forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap (AheadHeapEntry StreamK m a -> m WorkerStatus)
-> AheadHeapEntry StreamK m a -> m WorkerStatus
forall a b. (a -> b) -> a -> b
$ (RunInIO m, StreamK m a) -> AheadHeapEntry StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(RunInIO m, t m a) -> AheadHeapEntry t m a
AheadEntryStream (RunInIO m
runIn, a -> StreamK m a -> StreamK m a
forall a (m :: * -> *). a -> StreamK m a -> StreamK m a
K.cons a
a StreamK m a
r))
(AheadHeapEntry StreamK m a -> m WorkerStatus
forall {m :: * -> *}.
MonadIO m =>
AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap (AheadHeapEntry StreamK m a -> m WorkerStatus)
-> (a -> AheadHeapEntry StreamK m a) -> a -> m WorkerStatus
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> AheadHeapEntry StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
a -> AheadHeapEntry t m a
AheadEntryPure)
m WorkerStatus
stopk
StreamK m a
m
WorkerStatus
res <- StM m WorkerStatus -> m WorkerStatus
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m WorkerStatus
r
case WorkerStatus
res of
WorkerStatus
Continue -> IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo
WorkerStatus
Suspend -> IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo
where
toHeap :: AheadHeapEntry StreamK m a -> m WorkerStatus
toHeap AheadHeapEntry StreamK m a
ent = do
Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp <- IO (Heap (Entry Int (AheadHeapEntry StreamK m a)))
-> m (Heap (Entry Int (AheadHeapEntry StreamK m a)))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Heap (Entry Int (AheadHeapEntry StreamK m a)))
-> m (Heap (Entry Int (AheadHeapEntry StreamK m a))))
-> IO (Heap (Entry Int (AheadHeapEntry StreamK m a)))
-> m (Heap (Entry Int (AheadHeapEntry StreamK m a)))
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int),
Heap (Entry Int (AheadHeapEntry StreamK m a))))
-> IO (Heap (Entry Int (AheadHeapEntry StreamK m a)))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int),
Heap (Entry Int (AheadHeapEntry StreamK m a))))
-> IO (Heap (Entry Int (AheadHeapEntry StreamK m a))))
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int),
Heap (Entry Int (AheadHeapEntry StreamK m a))))
-> IO (Heap (Entry Int (AheadHeapEntry StreamK m a)))
forall a b. (a -> b) -> a -> b
$ \(Heap (Entry Int (AheadHeapEntry StreamK m a))
hp, Maybe Int
snum) ->
let hp' :: Heap (Entry Int (AheadHeapEntry StreamK m a))
hp' = Entry Int (AheadHeapEntry StreamK m a)
-> Heap (Entry Int (AheadHeapEntry StreamK m a))
-> Heap (Entry Int (AheadHeapEntry StreamK m a))
forall a. Ord a => a -> Heap a -> Heap a
H.insert (Int
-> AheadHeapEntry StreamK m a
-> Entry Int (AheadHeapEntry StreamK m a)
forall p a. p -> a -> Entry p a
Entry Int
seqNo AheadHeapEntry StreamK m a
ent) Heap (Entry Int (AheadHeapEntry StreamK m a))
hp
in Bool
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int),
Heap (Entry Int (AheadHeapEntry StreamK m a)))
-> ((Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int),
Heap (Entry Int (AheadHeapEntry StreamK m a)))
forall a. HasCallStack => Bool -> a -> a
assert (Maybe Int -> Int -> Bool
heapIsSane Maybe Int
snum Int
seqNo) ((Heap (Entry Int (AheadHeapEntry StreamK m a))
hp', Maybe Int
snum), Heap (Entry Int (AheadHeapEntry StreamK m a))
hp')
Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Channel m a -> Bool
forall (m :: * -> *) a. Channel m a -> Bool
svarInspectMode Channel m a
sv) (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
Int
maxHp <- IORef Int -> IO Int
forall a. IORef a -> IO a
readIORef (SVarStats -> IORef Int
maxHeapSize (SVarStats -> IORef Int) -> SVarStats -> IORef Int
forall a b. (a -> b) -> a -> b
$ Channel m a -> SVarStats
forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats Channel m a
sv)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Heap (Entry Int (AheadHeapEntry StreamK m a)) -> Int
forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
maxHp) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
IORef Int -> Int -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef Int
maxHeapSize (SVarStats -> IORef Int) -> SVarStats -> IORef Int
forall a b. (a -> b) -> a -> b
$ Channel m a -> SVarStats
forall (m :: * -> *) a. Channel m a -> SVarStats
svarStats Channel m a
sv) (Heap (Entry Int (AheadHeapEntry StreamK m a)) -> Int
forall a. Heap a -> Int
H.size Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp)
Bool
heapOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
forall (m :: * -> *) a.
Channel m a
-> Heap (Entry Int (AheadHeapEntry StreamK m a)) -> IO Bool
underMaxHeap Channel m a
sv Heap (Entry Int (AheadHeapEntry StreamK m a))
newHp
HeapStatus
status <-
case Channel m a -> Maybe YieldRateInfo
forall (m :: * -> *) a. Channel m a -> Maybe YieldRateInfo
yieldRateInfo Channel m a
sv of
Maybe YieldRateInfo
Nothing -> HeapStatus -> m HeapStatus
forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue
Just YieldRateInfo
yinfo ->
case Maybe WorkerInfo
winfo of
Just WorkerInfo
info -> do
Bool
rateOk <-
IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Limit -> IORef Int -> YieldRateInfo -> WorkerInfo -> IO Bool
workerRateControl
(Channel m a -> Limit
forall (m :: * -> *) a. Channel m a -> Limit
maxWorkerLimit Channel m a
sv)
(Channel m a -> IORef Int
forall (m :: * -> *) a. Channel m a -> IORef Int
workerCount Channel m a
sv)
YieldRateInfo
yinfo
WorkerInfo
info
if Bool
rateOk
then HeapStatus -> m HeapStatus
forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue
else HeapStatus -> m HeapStatus
forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HStop
Maybe WorkerInfo
Nothing -> HeapStatus -> m HeapStatus
forall (m :: * -> *) a. Monad m => a -> m a
return HeapStatus
HContinue
if Bool
heapOk
then
case HeapStatus
status of
HeapStatus
HContinue -> WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Continue
HeapStatus
HStop -> WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
else WorkerStatus -> m WorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return WorkerStatus
Suspend
data TokenWorkerStatus = TokenContinue Int | TokenSuspend
processWithToken
:: MonadRunInIO m
=> IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> K.StreamK m a
-> Int
-> m ()
processWithToken :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
action Int
sno = do
let stopk :: m TokenWorkerStatus
stopk = do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
TokenWorkerStatus -> m TokenWorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return (TokenWorkerStatus -> m TokenWorkerStatus)
-> TokenWorkerStatus -> m TokenWorkerStatus
forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
sno Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
mrun :: m b -> IO (StM m b)
mrun = RunInIO m -> m b -> IO (StM m b)
forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO (RunInIO m -> m b -> IO (StM m b))
-> RunInIO m -> m b -> IO (StM m b)
forall a b. (a -> b) -> a -> b
$ Channel m a -> RunInIO m
forall (m :: * -> *) a. Channel m a -> RunInIO m
svarMrun Channel m a
sv
StM m TokenWorkerStatus
r <-
IO (StM m TokenWorkerStatus) -> m (StM m TokenWorkerStatus)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
(IO (StM m TokenWorkerStatus) -> m (StM m TokenWorkerStatus))
-> IO (StM m TokenWorkerStatus) -> m (StM m TokenWorkerStatus)
forall a b. (a -> b) -> a -> b
$ m TokenWorkerStatus -> IO (StM m TokenWorkerStatus)
forall {b}. m b -> IO (StM m b)
mrun
(m TokenWorkerStatus -> IO (StM m TokenWorkerStatus))
-> m TokenWorkerStatus -> IO (StM m TokenWorkerStatus)
forall a b. (a -> b) -> a -> b
$ State StreamK m a
-> (a -> StreamK m a -> m TokenWorkerStatus)
-> (a -> m TokenWorkerStatus)
-> m TokenWorkerStatus
-> StreamK m a
-> m TokenWorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared
State StreamK m a
forall a. HasCallStack => a
undefined (Int -> a -> StreamK m a -> m TokenWorkerStatus
yieldOutput Int
sno) (Int -> a -> m TokenWorkerStatus
forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
sno) m TokenWorkerStatus
stopk StreamK m a
action
TokenWorkerStatus
res <- StM m TokenWorkerStatus -> m TokenWorkerStatus
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m TokenWorkerStatus
r
case TokenWorkerStatus
res of
TokenContinue Int
seqNo -> Int -> m ()
loopWithToken Int
seqNo
TokenWorkerStatus
TokenSuspend -> IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo
where
singleOutput :: Int -> a -> m TokenWorkerStatus
singleOutput Int
seqNo a
a = do
Bool
continue <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> a -> IO Bool
forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
if Bool
continue
then TokenWorkerStatus -> m TokenWorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return (TokenWorkerStatus -> m TokenWorkerStatus)
-> TokenWorkerStatus -> m TokenWorkerStatus
forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
seqNo Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
else do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Int -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap (Int
seqNo Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
TokenWorkerStatus -> m TokenWorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return TokenWorkerStatus
TokenSuspend
yieldOutput :: Int -> a -> StreamK m a -> m TokenWorkerStatus
yieldOutput Int
seqNo a
a StreamK m a
r = do
Bool
continue <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> a -> IO Bool
forall (m :: * -> *) a.
Channel m a -> Maybe WorkerInfo -> a -> IO Bool
yield Channel m a
sv Maybe WorkerInfo
winfo a
a
Bool
yieldLimitOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
if Bool
continue Bool -> Bool -> Bool
&& Bool
yieldLimitOk
then do
let stopk :: m TokenWorkerStatus
stopk = do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
TokenWorkerStatus -> m TokenWorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return (TokenWorkerStatus -> m TokenWorkerStatus)
-> TokenWorkerStatus -> m TokenWorkerStatus
forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
seqNo Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
State StreamK m a
-> (a -> StreamK m a -> m TokenWorkerStatus)
-> (a -> m TokenWorkerStatus)
-> m TokenWorkerStatus
-> StreamK m a
-> m TokenWorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
forall a. HasCallStack => a
undefined
(Int -> a -> StreamK m a -> m TokenWorkerStatus
yieldOutput Int
seqNo)
(Int -> a -> m TokenWorkerStatus
forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
seqNo)
m TokenWorkerStatus
stopk
StreamK m a
r
else do
RunInIO m
runIn <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
let ent :: Entry Int (AheadHeapEntry StreamK m a)
ent = Int
-> AheadHeapEntry StreamK m a
-> Entry Int (AheadHeapEntry StreamK m a)
forall p a. p -> a -> Entry p a
Entry Int
seqNo ((RunInIO m, StreamK m a) -> AheadHeapEntry StreamK m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(RunInIO m, t m a) -> AheadHeapEntry t m a
AheadEntryStream (RunInIO m
runIn, StreamK m a
r))
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Entry Int (AheadHeapEntry StreamK m a) -> Int -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Entry Int (AheadHeapEntry t m a) -> Int -> IO ()
requeueOnHeapTop IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Entry Int (AheadHeapEntry StreamK m a)
ent Int
seqNo
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
TokenWorkerStatus -> m TokenWorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return TokenWorkerStatus
TokenSuspend
loopWithToken :: Int -> m ()
loopWithToken Int
nextSeqNo = do
let preExit :: m ()
preExit = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Int -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> Int -> IO ()
updateHeapSeq IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Int
nextSeqNo
Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
Bool
yieldLimitOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
if Bool
yieldLimitOk
then do
Maybe (StreamK m a)
work <- IORef ([StreamK m a], Int) -> Int -> m (Maybe (StreamK m a))
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> Int -> m (Maybe (t m a))
dequeueAheadSeqCheck IORef ([StreamK m a], Int)
q Int
nextSeqNo
case Maybe (StreamK m a)
work of
Maybe (StreamK m a)
Nothing -> m ()
preExit m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo
Just StreamK m a
m -> do
let stopk :: m TokenWorkerStatus
stopk = do
IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (Maybe (IORef Count) -> IO ()
incrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv))
TokenWorkerStatus -> m TokenWorkerStatus
forall (m :: * -> *) a. Monad m => a -> m a
return (TokenWorkerStatus -> m TokenWorkerStatus)
-> TokenWorkerStatus -> m TokenWorkerStatus
forall a b. (a -> b) -> a -> b
$ Int -> TokenWorkerStatus
TokenContinue (Int
nextSeqNo Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
mrun :: m b -> IO (StM m b)
mrun = RunInIO m -> m b -> IO (StM m b)
forall (m :: * -> *). RunInIO m -> forall b. m b -> IO (StM m b)
runInIO (RunInIO m -> m b -> IO (StM m b))
-> RunInIO m -> m b -> IO (StM m b)
forall a b. (a -> b) -> a -> b
$ Channel m a -> RunInIO m
forall (m :: * -> *) a. Channel m a -> RunInIO m
svarMrun Channel m a
sv
StM m TokenWorkerStatus
r <- IO (StM m TokenWorkerStatus) -> m (StM m TokenWorkerStatus)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (StM m TokenWorkerStatus) -> m (StM m TokenWorkerStatus))
-> IO (StM m TokenWorkerStatus) -> m (StM m TokenWorkerStatus)
forall a b. (a -> b) -> a -> b
$ m TokenWorkerStatus -> IO (StM m TokenWorkerStatus)
forall {b}. m b -> IO (StM m b)
mrun (m TokenWorkerStatus -> IO (StM m TokenWorkerStatus))
-> m TokenWorkerStatus -> IO (StM m TokenWorkerStatus)
forall a b. (a -> b) -> a -> b
$
State StreamK m a
-> (a -> StreamK m a -> m TokenWorkerStatus)
-> (a -> m TokenWorkerStatus)
-> m TokenWorkerStatus
-> StreamK m a
-> m TokenWorkerStatus
forall (m :: * -> *) a r.
State StreamK m a
-> (a -> StreamK m a -> m r)
-> (a -> m r)
-> m r
-> StreamK m a
-> m r
K.foldStreamShared State StreamK m a
forall a. HasCallStack => a
undefined
(Int -> a -> StreamK m a -> m TokenWorkerStatus
yieldOutput Int
nextSeqNo)
(Int -> a -> m TokenWorkerStatus
forall {m :: * -> *}. MonadIO m => Int -> a -> m TokenWorkerStatus
singleOutput Int
nextSeqNo)
m TokenWorkerStatus
stopk
StreamK m a
m
TokenWorkerStatus
res <- StM m TokenWorkerStatus -> m TokenWorkerStatus
forall (b :: * -> *) (m :: * -> *) a.
MonadBaseControl b m =>
StM m a -> m a
restoreM StM m TokenWorkerStatus
r
case TokenWorkerStatus
res of
TokenContinue Int
seqNo -> Int -> m ()
loopWithToken Int
seqNo
TokenWorkerStatus
TokenSuspend -> IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo
else m ()
preExit m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
drainHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo
workLoopAhead
:: MonadRunInIO m
=> IORef ([K.StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry K.StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead :: forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo = do
HeapDequeueResult StreamK m a
r <- IO (HeapDequeueResult StreamK m a)
-> m (HeapDequeueResult StreamK m a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (HeapDequeueResult StreamK m a)
-> m (HeapDequeueResult StreamK m a))
-> IO (HeapDequeueResult StreamK m a)
-> m (HeapDequeueResult StreamK m a)
forall a b. (a -> b) -> a -> b
$ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO (HeapDequeueResult StreamK m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (HeapDequeueResult t m a)
dequeueFromHeap IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap
case HeapDequeueResult StreamK m a
r of
Ready (Entry Int
seqNo AheadHeapEntry StreamK m a
hent) ->
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> AheadHeapEntry StreamK m a
-> Int
-> Bool
-> m ()
processHeap IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo AheadHeapEntry StreamK m a
hent Int
seqNo Bool
False
HeapDequeueResult StreamK m a
Clearing -> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
Waiting Int
_ -> do
Bool
yieldLimitOk <- IO Bool -> m Bool
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Bool -> m Bool) -> IO Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Maybe (IORef Count) -> IO Bool
decrementYieldLimit (Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv)
if Bool
yieldLimitOk
then do
Maybe (StreamK m a, Int)
work <- IORef ([StreamK m a], Int) -> m (Maybe (StreamK m a, Int))
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead IORef ([StreamK m a], Int)
q
case Maybe (StreamK m a, Int)
work of
Maybe (StreamK m a, Int)
Nothing -> IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
stop Channel m a
sv Maybe WorkerInfo
winfo
Just (StreamK m a
m, Int
seqNo) -> do
if Int
seqNo Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
then IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo
else IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> StreamK m a
-> Int
-> m ()
processWithoutToken IORef ([StreamK m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
heap Channel m a
sv Maybe WorkerInfo
winfo StreamK m a
m Int
seqNo
else IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a -> Maybe WorkerInfo -> IO ()
forall (m :: * -> *) a. Channel m a -> Maybe WorkerInfo -> IO ()
abortExecution Channel m a
sv Maybe WorkerInfo
winfo
getLifoSVar :: forall m a. MonadRunInIO m =>
RunInIO m -> Config -> IO (Channel m a)
getLifoSVar :: forall (m :: * -> *) a.
MonadRunInIO m =>
RunInIO m -> Config -> IO (Channel m a)
getLifoSVar RunInIO m
mrun Config
cfg = do
IORef ([ChildEvent a], Int)
outQ <- ([ChildEvent a], Int) -> IO (IORef ([ChildEvent a], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
outH <- (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO
(IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int))
forall a. a -> IO (IORef a)
newIORef (Heap (Entry Int (AheadHeapEntry StreamK m a))
forall a. Heap a
H.empty, Int -> Maybe Int
forall a. a -> Maybe a
Just Int
0)
MVar ()
outQMv <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
IORef Int
active <- Int -> IO (IORef Int)
forall a. a -> IO (IORef a)
newIORef Int
0
IORef Bool
wfw <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
IORef (Set ThreadId)
running <- Set ThreadId -> IO (IORef (Set ThreadId))
forall a. a -> IO (IORef a)
newIORef Set ThreadId
forall a. Set a
Set.empty
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
q <- ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> IO
(IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)]))
forall a. a -> IO (IORef a)
newIORef
( [] :: [(RunInIO m, K.StreamK m a)]
, [] :: [(RunInIO m, K.StreamK m a)]
)
IORef ([StreamK m a], Int)
aheadQ <- ([StreamK m a], Int) -> IO (IORef ([StreamK m a], Int))
forall a. a -> IO (IORef a)
newIORef ([], -Int
1)
MVar ()
stopMVar <- () -> IO (MVar ())
forall a. a -> IO (MVar a)
newMVar ()
Maybe (IORef Count)
yl <-
case Config -> Maybe Count
getYieldLimit Config
cfg of
Maybe Count
Nothing -> Maybe (IORef Count) -> IO (Maybe (IORef Count))
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (IORef Count)
forall a. Maybe a
Nothing
Just Count
x -> IORef Count -> Maybe (IORef Count)
forall a. a -> Maybe a
Just (IORef Count -> Maybe (IORef Count))
-> IO (IORef Count) -> IO (Maybe (IORef Count))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Count -> IO (IORef Count)
forall a. a -> IO (IORef a)
newIORef Count
x
Maybe YieldRateInfo
rateInfo <- Config -> IO (Maybe YieldRateInfo)
newRateInfo Config
cfg
SVarStats
stats <- IO SVarStats
newSVarStats
ThreadId
tid <- IO ThreadId
myThreadId
let isWorkFinished :: p -> IO Bool
isWorkFinished p
_ = do
([(RunInIO m, StreamK m a)]
xs, [(RunInIO m, StreamK m a)]
ys) <- IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> IO ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
forall a. IORef a -> IO a
readIORef IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
q
Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return ([(RunInIO m, StreamK m a)] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(RunInIO m, StreamK m a)]
xs Bool -> Bool -> Bool
&& [(RunInIO m, StreamK m a)] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [(RunInIO m, StreamK m a)]
ys)
let isWorkFinishedLimited :: Channel m a -> IO Bool
isWorkFinishedLimited Channel m a
sv = do
Bool
yieldsDone <-
case Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv of
Just IORef Count
ref -> do
Count
n <- IORef Count -> IO Count
forall a. IORef a -> IO a
readIORef IORef Count
ref
Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
n Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
<= Count
0)
Maybe (IORef Count)
Nothing -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
Bool
qEmpty <- Channel m a -> IO Bool
forall {p}. p -> IO Bool
isWorkFinished Channel m a
sv
Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Bool
qEmpty Bool -> Bool -> Bool
|| Bool
yieldsDone
let eagerEval :: Bool
eagerEval = Config -> Bool
getEagerDispatch Config
cfg
inOrder :: Bool
inOrder = Config -> Bool
getOrdered Config
cfg
let getSVar :: Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef ([(RunInIO m, K.StreamK m a)], [(RunInIO m, K.StreamK m a)])
-> Channel m a
-> Maybe WorkerInfo
-> m())
-> Channel m a
getSVar :: Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv Channel m a -> m [ChildEvent a]
readOutput Channel m a -> m Bool
postProc Channel m a -> IO Bool
workDone IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
wloop = Channel :: forall (m :: * -> *) a.
RunInIO m
-> IORef ([ChildEvent a], Int)
-> MVar ()
-> m [ChildEvent a]
-> m Bool
-> Limit
-> Limit
-> Maybe (IORef Count)
-> Maybe YieldRateInfo
-> (Bool -> (RunInIO m, StreamK m a) -> IO ())
-> m ()
-> IO Bool
-> IO Bool
-> IORef Bool
-> (Maybe WorkerInfo -> m ())
-> IORef (Set ThreadId)
-> IORef Int
-> (ThreadId -> m ())
-> MVar ()
-> Maybe (IORef ())
-> SVarStats
-> Bool
-> ThreadId
-> Channel m a
Channel
{ outputQueue :: IORef ([ChildEvent a], Int)
outputQueue = IORef ([ChildEvent a], Int)
outQ
, remainingWork :: Maybe (IORef Count)
remainingWork = Maybe (IORef Count)
yl
, maxBufferLimit :: Limit
maxBufferLimit = Config -> Limit
getMaxBuffer Config
cfg
, maxWorkerLimit :: Limit
maxWorkerLimit = Limit -> Limit -> Limit
forall a. Ord a => a -> a -> a
min (Config -> Limit
getMaxThreads Config
cfg) (Config -> Limit
getMaxBuffer Config
cfg)
, yieldRateInfo :: Maybe YieldRateInfo
yieldRateInfo = Maybe YieldRateInfo
rateInfo
, outputDoorBell :: MVar ()
outputDoorBell = MVar ()
outQMv
, readOutputQ :: m [ChildEvent a]
readOutputQ = Channel m a -> m [ChildEvent a]
readOutput Channel m a
sv
, postProcess :: m Bool
postProcess = Channel m a -> m Bool
postProc Channel m a
sv
, workerThreads :: IORef (Set ThreadId)
workerThreads = IORef (Set ThreadId)
running
, workLoop :: Maybe WorkerInfo -> m ()
workLoop =
if Bool
inOrder
then IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> Channel m a
-> Maybe WorkerInfo
-> m ()
workLoopAhead IORef ([StreamK m a], Int)
aheadQ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
outH Channel m a
sv
else IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
wloop IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
q Channel m a
sv
, enqueue :: Bool -> (RunInIO m, StreamK m a) -> IO ()
enqueue =
\Bool
inner ->
if Bool
inOrder
then Channel m a
-> IORef ([StreamK m a], Int) -> (RunInIO m, StreamK m a) -> IO ()
forall (m :: * -> *) a.
Channel m a
-> IORef ([StreamK m a], Int) -> (RunInIO m, StreamK m a) -> IO ()
enqueueAhead Channel m a
sv IORef ([StreamK m a], Int)
aheadQ
else Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
forall (m :: * -> *) a.
Channel m a
-> IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Bool
-> (RunInIO m, StreamK m a)
-> IO ()
enqueueLIFO Channel m a
sv IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
q Bool
inner
, eagerDispatch :: m ()
eagerDispatch = Bool -> m () -> m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
eagerEval (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ m Bool -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m Bool -> m ()) -> m Bool -> m ()
forall a b. (a -> b) -> a -> b
$ Count -> Channel m a -> m Bool
forall (m :: * -> *) a.
MonadRunInIO m =>
Count -> Channel m a -> m Bool
dispatchWorker Count
0 Channel m a
sv
, isWorkDone :: IO Bool
isWorkDone =
if Bool
inOrder
then Channel m a
-> IORef ([StreamK m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
-> IO Bool
forall {t :: * -> *} {m :: * -> *} {a} {a} {b} {a} {b}.
Foldable t =>
Channel m a -> IORef (t a, b) -> IORef (Heap a, b) -> IO Bool
isWorkDoneAhead Channel m a
sv IORef ([StreamK m a], Int)
aheadQ IORef (Heap (Entry Int (AheadHeapEntry StreamK m a)), Maybe Int)
outH
else Channel m a -> IO Bool
workDone Channel m a
sv
, isQueueDone :: IO Bool
isQueueDone =
if Bool
inOrder
then Channel m a -> IORef ([StreamK m a], Int) -> IO Bool
forall {t :: * -> *} {m :: * -> *} {a} {a} {b}.
Foldable t =>
Channel m a -> IORef (t a, b) -> IO Bool
isQueueDoneAhead Channel m a
sv IORef ([StreamK m a], Int)
aheadQ
else Channel m a -> IO Bool
workDone Channel m a
sv
, doorBellOnWorkQ :: IORef Bool
doorBellOnWorkQ = IORef Bool
wfw
, svarMrun :: RunInIO m
svarMrun = RunInIO m
mrun
, workerCount :: IORef Int
workerCount = IORef Int
active
, accountThread :: ThreadId -> m ()
accountThread = IORef (Set ThreadId) -> MVar () -> ThreadId -> m ()
forall (m :: * -> *).
MonadIO m =>
IORef (Set ThreadId) -> MVar () -> ThreadId -> m ()
modifyThread IORef (Set ThreadId)
running MVar ()
outQMv
, workerStopMVar :: MVar ()
workerStopMVar = MVar ()
stopMVar
, svarRef :: Maybe (IORef ())
svarRef = Maybe (IORef ())
forall a. Maybe a
Nothing
, svarInspectMode :: Bool
svarInspectMode = Config -> Bool
getInspectMode Config
cfg
, svarCreator :: ThreadId
svarCreator = ThreadId
tid
, svarStats :: SVarStats
svarStats = SVarStats
stats
}
let sv :: Channel m a
sv =
case Config -> Maybe Rate
getStreamRate Config
cfg of
Maybe Rate
Nothing ->
case Config -> Maybe Count
getYieldLimit Config
cfg of
Maybe Count
Nothing -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv (Bool -> Channel m a -> m [ChildEvent a]
forall (m :: * -> *) a.
MonadRunInIO m =>
Bool -> Channel m a -> m [ChildEvent a]
readOutputQBounded Bool
eagerEval)
Channel m a -> m Bool
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessBounded
Channel m a -> IO Bool
forall {p}. p -> IO Bool
isWorkFinished
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFO
Just Count
_ -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv (Bool -> Channel m a -> m [ChildEvent a]
forall (m :: * -> *) a.
MonadRunInIO m =>
Bool -> Channel m a -> m [ChildEvent a]
readOutputQBounded Bool
eagerEval)
Channel m a -> m Bool
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessBounded
Channel m a -> IO Bool
forall {m :: * -> *} {a}. Channel m a -> IO Bool
isWorkFinishedLimited
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFOLimited
Just Rate
_ ->
case Config -> Maybe Count
getYieldLimit Config
cfg of
Maybe Count
Nothing -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv Channel m a -> m [ChildEvent a]
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> m [ChildEvent a]
readOutputQPaced
Channel m a -> m Bool
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessPaced
Channel m a -> IO Bool
forall {p}. p -> IO Bool
isWorkFinished
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFO
Just Count
_ -> Channel m a
-> (Channel m a -> m [ChildEvent a])
-> (Channel m a -> m Bool)
-> (Channel m a -> IO Bool)
-> (IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ())
-> Channel m a
getSVar Channel m a
sv Channel m a -> m [ChildEvent a]
forall (m :: * -> *) a.
MonadRunInIO m =>
Channel m a -> m [ChildEvent a]
readOutputQPaced
Channel m a -> m Bool
forall (m :: * -> *) a. MonadRunInIO m => Channel m a -> m Bool
postProcessPaced
Channel m a -> IO Bool
forall {m :: * -> *} {a}. Channel m a -> IO Bool
isWorkFinishedLimited
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
forall (m :: * -> *) a.
MonadRunInIO m =>
IORef ([(RunInIO m, StreamK m a)], [(RunInIO m, StreamK m a)])
-> Channel m a -> Maybe WorkerInfo -> m ()
workLoopLIFOLimited
in Channel m a -> IO (Channel m a)
forall (m :: * -> *) a. Monad m => a -> m a
return Channel m a
sv
where
{-# INLINE isQueueDoneAhead #-}
isQueueDoneAhead :: Channel m a -> IORef (t a, b) -> IO Bool
isQueueDoneAhead Channel m a
sv IORef (t a, b)
q = do
Bool
queueDone <- IORef (t a, b) -> IO Bool
forall {t :: * -> *} {a} {b}.
Foldable t =>
IORef (t a, b) -> IO Bool
checkEmpty IORef (t a, b)
q
Bool
yieldsDone <-
case Channel m a -> Maybe (IORef Count)
forall (m :: * -> *) a. Channel m a -> Maybe (IORef Count)
remainingWork Channel m a
sv of
Just IORef Count
yref -> do
Count
n <- IORef Count -> IO Count
forall a. IORef a -> IO a
readIORef IORef Count
yref
Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Count
n Count -> Count -> Bool
forall a. Ord a => a -> a -> Bool
<= Count
0)
Maybe (IORef Count)
Nothing -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Bool
yieldsDone Bool -> Bool -> Bool
|| Bool
queueDone
{-# INLINE isWorkDoneAhead #-}
isWorkDoneAhead :: Channel m a -> IORef (t a, b) -> IORef (Heap a, b) -> IO Bool
isWorkDoneAhead Channel m a
sv IORef (t a, b)
q IORef (Heap a, b)
ref = do
Bool
heapDone <- do
(Heap a
hp, b
_) <- IORef (Heap a, b) -> IO (Heap a, b)
forall a. IORef a -> IO a
readIORef IORef (Heap a, b)
ref
Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Heap a -> Int
forall a. Heap a -> Int
H.size Heap a
hp Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0)
Bool
queueDone <- Channel m a -> IORef (t a, b) -> IO Bool
forall {t :: * -> *} {m :: * -> *} {a} {a} {b}.
Foldable t =>
Channel m a -> IORef (t a, b) -> IO Bool
isQueueDoneAhead Channel m a
sv IORef (t a, b)
q
Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ Bool
heapDone Bool -> Bool -> Bool
&& Bool
queueDone
checkEmpty :: IORef (t a, b) -> IO Bool
checkEmpty IORef (t a, b)
q = do
(t a
xs, b
_) <- IORef (t a, b) -> IO (t a, b)
forall a. IORef a -> IO a
readIORef IORef (t a, b)
q
Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ t a -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null t a
xs
{-# INLINABLE newAppendChannel #-}
{-# SPECIALIZE newAppendChannel :: (Config -> Config) -> IO (Channel IO a) #-}
newAppendChannel :: MonadRunInIO m => (Config -> Config) -> m (Channel m a)
newAppendChannel :: forall (m :: * -> *) a.
MonadRunInIO m =>
(Config -> Config) -> m (Channel m a)
newAppendChannel Config -> Config
modifier = do
RunInIO m
mrun <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
IO (Channel m a) -> m (Channel m a)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Channel m a) -> m (Channel m a))
-> IO (Channel m a) -> m (Channel m a)
forall a b. (a -> b) -> a -> b
$ RunInIO m -> Config -> IO (Channel m a)
forall (m :: * -> *) a.
MonadRunInIO m =>
RunInIO m -> Config -> IO (Channel m a)
getLifoSVar RunInIO m
mrun (Config -> Config
modifier Config
defaultConfig)