-- |
-- Module      : Streamly.Internal.Data.SVar
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
--
module Streamly.Internal.Data.SVar
    {-# DEPRECATED "SVar is replaced by Channel." #-}
    (
      module Streamly.Internal.Data.SVar.Type
    , module Streamly.Internal.Data.SVar.Worker
    , module Streamly.Internal.Data.SVar.Dispatch
    , module Streamly.Internal.Data.SVar.Pull

    -- * New SVar
    , getYieldRateInfo
    , newSVarStats

    -- ** Parallel
    , newParallelVar

    -- ** Ahead
    , enqueueAhead
    , reEnqueueAhead
    , queueEmptyAhead
    , dequeueAhead
    , HeapDequeueResult(..)
    , dequeueFromHeap
    , dequeueFromHeapSeq
    , requeueOnHeapTop
    , updateHeapSeq
    , withIORef
    , heapIsSane
    , newAheadVar
    )
where

#include "inline.hs"

import Control.Concurrent (myThreadId, takeMVar)
import Control.Concurrent.MVar (newEmptyMVar, tryPutMVar, tryTakeMVar, newMVar)
import Control.Exception (assert)
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.Heap (Heap, Entry(..))
import Data.IORef (newIORef, readIORef)
import Data.IORef (IORef, atomicModifyIORef)
import Streamly.Internal.Control.Concurrent (MonadAsync, askRunInIO)
import Streamly.Internal.Data.Atomics
       (atomicModifyIORefCAS, atomicModifyIORefCAS_, writeBarrier)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Time.Units (NanoSecond64(..))

import qualified Data.Heap as H
import qualified Data.Set as S

import Streamly.Internal.Data.SVar.Dispatch
import Streamly.Internal.Data.SVar.Pull
import Streamly.Internal.Data.SVar.Type
import Streamly.Internal.Data.SVar.Worker

-------------------------------------------------------------------------------
-- Creating an SVar
-------------------------------------------------------------------------------

-- | Build the rate-control bookkeeping for a stream from the settings held in
-- its 'State'.
--
-- Returns 'Nothing' when 'getStreamRate' yields no rate. Otherwise the three
-- bounds of the 'Rate' (given in Hertz) are converted to per-yield latencies
-- in nanoseconds and a 'YieldRateInfo' backed by freshly allocated counters is
-- returned.
getYieldRateInfo :: State t m a -> IO (Maybe YieldRateInfo)
getYieldRateInfo st =
    case getStreamRate st of
        Just (Rate low goal high buf) ->
            let l    = rateToLatency goal
                -- The highest rate maps to the smallest latency and the
                -- lowest rate to the largest one.
                minl = rateToLatency high
                maxl = rateToLatency low
            in mkYieldRateInfo l (LatencyRange minl maxl) buf
        Nothing -> return Nothing

    where

    -- Convert a rate in Hertz to a latency in nanoseconds. A non-positive
    -- rate maps to the largest representable latency. A positive rate that is
    -- so small that the latency would not fit in 'NanoSecond64' is clamped to
    -- 'maxBound' as well, instead of letting 'round' wrap around.
    rateToLatency :: Double -> NanoSecond64
    rateToLatency r
        | r <= 0 = maxBound
        | ns >= latencyCap = maxBound
        | otherwise = round ns

        where

        ns = 1.0e9 / r
        latencyCap = fromIntegral (maxBound :: NanoSecond64) :: Double

    -- Allocate the mutable counters and assemble the record. The all-time
    -- latency counter is stamped with the current monotonic clock reading.
    mkYieldRateInfo
        :: NanoSecond64 -> LatencyRange -> Int -> IO (Maybe YieldRateInfo)
    mkYieldRateInfo latency latRange buf = do
        measured <- newIORef 0
        wcur     <- newIORef (0,0,0)
        wcol     <- newIORef (0,0,0)
        now      <- getTime Monotonic
        wlong    <- newIORef (0,now)
        period   <- newIORef 1
        gainLoss <- newIORef (Count 0)

        return $ Just YieldRateInfo
            { svarLatencyTarget      = latency
            , svarLatencyRange       = latRange
            , svarRateBuffer         = buf
            , svarGainedLostYields   = gainLoss
            , workerBootstrapLatency = getStreamLatency st
            , workerPollingInterval  = period
            , workerMeasuredLatency  = measured
            , workerPendingLatency   = wcur
            , workerCollectedLatency = wcol
            , svarAllTimeLatency     = wlong
            }

-- | Allocate a fresh 'SVarStats' record.
--
-- Every counter and latency reference starts at zero and the stop time starts
-- as 'Nothing'.
newSVarStats :: IO SVarStats
newSVarStats = do
    disp   <- newIORef 0
    maxWrk <- newIORef 0
    maxOq  <- newIORef 0
    maxHs  <- newIORef 0
    maxWq  <- newIORef 0
    avgLat <- newIORef (0, NanoSecond64 0)
    maxLat <- newIORef (NanoSecond64 0)
    minLat <- newIORef (NanoSecond64 0)
    stpTime <- newIORef Nothing

    return SVarStats
        { totalDispatches  = disp
        , maxWorkers       = maxWrk
        , maxOutQSize      = maxOq
        , maxHeapSize      = maxHs
        , maxWorkQSize     = maxWq
        , avgWorkerLatency = avgLat
        , minWorkerLatency = minLat
        , maxWorkerLatency = maxLat
        , svarStopTime     = stpTime
        }

-------------------------------------------------------------------------------
-- Ahead
-------------------------------------------------------------------------------

-- Lookahead streams can execute multiple tasks concurrently, ahead of time,
-- but always serve them in the same order as they appear in the stream. To
-- implement lookahead streams efficiently we assign a sequence number to each
-- task when the task is picked up for execution. When the task finishes, the
-- output is tagged with the same sequence number and we rearrange the outputs
-- in sequence based on that number.
--
-- To explain the mechanism imagine that the current task at the head of the
-- stream has a "token" to yield to the outputQueue. The ownership of the token
-- is determined by the current sequence number, which is maintained in
-- outputHeap. Sequence number is assigned when a task is queued. When a thread
-- dequeues a task it picks up the sequence number as well and when the output
-- is ready it uses the sequence number to queue the output to the outputQueue.
--
-- The thread with current sequence number sends the output directly to the
-- outputQueue. Other threads push the output to the outputHeap. When the task
-- being queued on the heap is a stream of many elements we evaluate only the
-- first element and keep the rest of the unevaluated computation in the heap.
-- When such a task gets the "token" for outputQueue it evaluates and directly
-- yields all the elements to the outputQueue without checking for the
-- "token".
--
-- Note that no two outputs in the heap can have the same sequence numbers and
-- therefore we do not need a stable heap. We have also separated the buffer
-- for the current task (outputQueue) and the pending tasks (outputHeap) so
-- that the pending tasks cannot interfere with the current task. Note that for
-- a single task just the outputQueue is enough and for the case of many
-- threads just a heap is good enough. However we balance between these two
-- cases, so that both are efficient.
--
-- For bigger streams it may make sense to have separate buffers for each
-- stream. However, for singleton streams this may become inefficient. However,
-- if we do not have separate buffers, then the streams that come later in
-- sequence may hog the buffer, hindering the streams that are ahead. For this
-- reason we have a single element buffer limitation for the streams being
-- executed in advance.
--
-- This scheme works pretty efficiently with less than 40% extra overhead
-- compared to the Async streams where we do not have any kind of sequencing of
-- the outputs. It is especially devised so that we are most efficient when we
-- have short tasks and need just a single thread. Also when a thread yields
-- many items it can hold lockfree access to the outputQueue and do it
-- efficiently.
--
-- XXX Maybe we can start the ahead threads at a lower cpu and IO priority so
-- that they do not hog the resources and hinder the progress of the threads in
-- front of them.

-- XXX Left associated ahead expressions are expensive. We start a new SVar for
-- each left associative expression. The queue is used only for right
-- associated expression, we queue the right expression and execute the left.
-- Therefore the queue never has more than one item in it.
--
-- XXX we can fix this. When we queue more than one item on the queue we can
-- mark the previously queued item as not-runnable. The not-runnable item is
-- not dequeued until the already running one has finished and at that time we
-- would also know the exact sequence number of the already queued item.
--
-- we can even run the already queued items but they will have to be sorted in
-- layers in the heap. We can use a list of heaps for that.
-- | Put a task on the ahead work queue and bump the sequence number.
--
-- The queue must be empty: a non-empty queue results in an 'error'. Only the
-- stream component of the supplied pair is stored; the 'RunInIO' component is
-- not used. After the queue is updated 'ringDoorBell' is called on the SVar.
{-# INLINE enqueueAhead #-}
enqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> (RunInIO m, t m a) -> IO ()
enqueueAhead sv q m = do
    atomicModifyIORefCAS_ q $ \ case
        ([], n) -> ([snd m], n + 1)  -- increment sequence
        _ -> error "enqueueAhead: queue is not empty"
    ringDoorBell sv

-- | Put a task back on the ahead work queue without incrementing the sequence
-- number, so the task keeps the number currently stored in the queue.
--
-- The queue must be empty: a non-empty queue results in an 'error'. After the
-- queue is updated 'ringDoorBell' is called on the SVar.
{-# INLINE reEnqueueAhead #-}
reEnqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
reEnqueueAhead sv q m = do
    atomicModifyIORefCAS_ q $ \ case
        ([], n) -> ([m], n)  -- DO NOT increment sequence
        _ -> error "reEnqueueAhead: queue is not empty"
    ringDoorBell sv

-- Normally the thread that has the token should never go away. The token gets
-- handed over to another thread, but someone or the other has the token at any
-- point of time. But if the task that has the token finds that the outputQueue
-- is full, in that case it can go away without even handing over the token to
-- another thread. In that case it sets the nextSequence number in the heap to
-- its own sequence number before going away. To handle this case, any task that
-- does not have the token tries to dequeue from the heap first before
-- dequeuing from the work queue. If it finds that the task at the top of the
-- heap is the one that owns the current sequence number then it grabs the
-- token and starts with that.
--
-- XXX instead of queueing just the head element and the remaining computation
-- on the heap, evaluate as many as we can and place them on the heap. But we
-- need to give higher priority to the lower sequence numbers so that lower
-- priority tasks do not fill up the heap making higher priority tasks block
-- due to full heap. Maybe we can have a weighted space for them in the heap.
-- The weight is inversely proportional to the sequence number.
--
-- XXX review for livelock
--
-- | Report whether the ahead work queue currently holds no task.
--
-- This is a plain read of the queue reference; the sequence number stored
-- alongside the task list is ignored.
{-# INLINE queueEmptyAhead #-}
queueEmptyAhead :: MonadIO m => IORef ([t m a], Int) -> m Bool
queueEmptyAhead q = liftIO $ do
    (xs, _) <- readIORef q
    return $ null xs

-- | Atomically take the task, if any, off the ahead work queue.
--
-- Returns 'Nothing' when the queue is empty. Otherwise returns the single
-- queued task paired with the sequence number stored in the queue, leaving the
-- queue empty and the sequence number unchanged. A queue holding more than one
-- task results in an 'error'.
{-# INLINE dequeueAhead #-}
dequeueAhead :: MonadIO m
    => IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead q = liftIO $
    atomicModifyIORefCAS q $ \case
            ([], n) -> (([], n), Nothing)
            (x : [], n) -> (([], n), Just (x, n))
            _ -> error "more than one item on queue"

-------------------------------------------------------------------------------
-- Heap manipulation
-------------------------------------------------------------------------------

-- | Read the current contents of a reference and pass them to an action.
--
-- The reference itself is only read, never written, by this function.
withIORef :: IORef a -> (a -> IO b) -> IO b
withIORef ref f = readIORef ref >>= f

-- | Atomically replace the contents of a reference with the result of
-- applying a function to them, returning nothing.
--
-- This is a thin wrapper over 'atomicModifyIORef' and is therefore just as
-- lazy: the function application is stored unevaluated.
atomicModifyIORef_ :: IORef a -> (a -> a) -> IO ()
atomicModifyIORef_ ref f =
    atomicModifyIORef ref $ \x -> (f x, ())

-- | Outcome of an attempt to take the next in-sequence entry off the ahead
-- heap, as returned by 'dequeueFromHeap' and 'dequeueFromHeapSeq'.
data HeapDequeueResult t m a =
      Clearing
        -- ^ No sequence number is being awaited (the stored sequence number
        -- is 'Nothing'), i.e. the heap is being cleared.
    | Waiting Int
        -- ^ The entry with this sequence number is awaited, but the heap is
        -- either empty or its top entry carries a different number.
    | Ready (Entry Int (AheadHeapEntry t m a))
        -- ^ The top entry carried the awaited sequence number and has been
        -- removed from the heap.

-- | Atomically try to take the entry with the awaited sequence number off the
-- heap.
--
-- * If no sequence number is awaited ('Nothing') the heap is left untouched
--   and 'Clearing' is returned.
-- * If sequence number @n@ is awaited and the top entry of the heap carries
--   @n@, that entry is removed, the awaited number is reset to 'Nothing' and
--   the entry is returned as 'Ready'.
-- * Otherwise (heap empty, or top entry has another number) the heap is left
--   untouched and @'Waiting' n@ is returned. A top entry with a number lower
--   than @n@ violates an assertion.
{-# INLINE dequeueFromHeap #-}
dequeueFromHeap
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> IO (HeapDequeueResult t m a)
dequeueFromHeap hpVar =
    atomicModifyIORef hpVar $ \pair@(hp, snum) ->
        case snum of
            Nothing -> (pair, Clearing)
            Just n ->
                case H.uncons hp of
                    Just (ent@(Entry seqNo _ev), hp') ->
                        if seqNo == n
                        then ((hp', Nothing), Ready ent)
                        else assert (seqNo >= n) (pair, Waiting n)
                    Nothing -> (pair, Waiting n)

-- | Atomically try to take the entry with sequence number @i@ off the heap.
--
-- This must only be called while no sequence number is being awaited (the
-- stored number is 'Nothing'); otherwise it raises an 'error'.
--
-- * If the top entry of the heap carries @i@, it is removed, the stored
--   number stays 'Nothing' and the entry is returned as 'Ready'.
-- * Otherwise (heap empty, or top entry has another number) the heap contents
--   are kept, the stored number becomes @'Just' i@ and @'Waiting' i@ is
--   returned. A top entry with a number lower than @i@ violates an assertion.
{-# INLINE dequeueFromHeapSeq #-}
dequeueFromHeapSeq
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Int
    -> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq hpVar i =
    atomicModifyIORef hpVar $ \(hp, snum) ->
        case snum of
            Nothing ->
                case H.uncons hp of
                    Just (ent@(Entry seqNo _ev), hp') ->
                        if seqNo == i
                        then ((hp', Nothing), Ready ent)
                        else assert (seqNo >= i) ((hp, Just i), Waiting i)
                    Nothing -> ((hp, Just i), Waiting i)
            Just _ -> error "dequeueFromHeapSeq: unreachable"

-- | Sanity predicate relating the awaited sequence number of the heap (first
-- argument) to a sequence number about to be stored (second argument).
--
-- Holds trivially when no number is awaited ('Nothing'); otherwise the new
-- number must not be lower than the awaited one.
heapIsSane :: Maybe Int -> Int -> Bool
heapIsSane snum seqNo = maybe True (seqNo >=) snum

-- | Atomically insert an entry into the heap and mark the given sequence
-- number as the awaited one.
--
-- An assertion checks, via 'heapIsSane', that the new awaited number is not
-- lower than the one previously stored.
{-# INLINE requeueOnHeapTop #-}
requeueOnHeapTop
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Entry Int (AheadHeapEntry t m a)
    -> Int
    -> IO ()
requeueOnHeapTop hpVar ent seqNo =
    atomicModifyIORef_ hpVar $ \(hp, snum) ->
        assert (heapIsSane snum seqNo) (H.insert ent hp, Just seqNo)

-- | Atomically mark the given sequence number as the awaited one, leaving the
-- heap contents untouched.
--
-- An assertion checks, via 'heapIsSane', that the new awaited number is not
-- lower than the one previously stored.
{-# INLINE updateHeapSeq #-}
updateHeapSeq
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Int
    -> IO ()
updateHeapSeq hpVar seqNo =
    atomicModifyIORef_ hpVar $ \(hp, snum) ->
        assert (heapIsSane snum seqNo) (hp, Just seqNo)

-- XXX remove polymorphism in t, inline f
getAheadSVar :: MonadAsync m
    => State t m a
    -> (   IORef ([t m a], Int)
        -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
        -> State t m a
        -> SVar t m a
        -> Maybe WorkerInfo
        -> m ())
    -> RunInIO m
    -> IO (SVar t m a)
getAheadSVar :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
State t m a
-> (IORef ([t m a], Int)
    -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> State t m a
    -> SVar t m a
    -> Maybe WorkerInfo
    -> m ())
-> RunInIO m
-> IO (SVar t m a)
getAheadSVar State t m a
st IORef ([t m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> State t m a
-> SVar t m a
-> Maybe WorkerInfo
-> m ()
f RunInIO m
mrun = do
    IORef ([ChildEvent a], Int)
outQ    <- ([ChildEvent a], Int) -> IO (IORef ([ChildEvent a], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
    -- the second component of the tuple is "Nothing" when heap is being
    -- cleared, "Just n" when we are expecting sequence number n to arrive
    -- before we can start clearing the heap.
    IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
outH    <- (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO (IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int))
forall a. a -> IO (IORef a)
newIORef (Heap (Entry Int (AheadHeapEntry t m a))
forall a. Heap a
H.empty, Int -> Maybe Int
forall a. a -> Maybe a
Just Int
0)
    MVar ()
outQMv  <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
    IORef Int
active  <- Int -> IO (IORef Int)
forall a. a -> IO (IORef a)
newIORef Int
0
    IORef Bool
wfw     <- Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
False
    IORef (Set ThreadId)
running <- Set ThreadId -> IO (IORef (Set ThreadId))
forall a. a -> IO (IORef a)
newIORef Set ThreadId
forall a. Set a
S.empty
    -- Sequence number is incremented whenever something is queued, therefore,
    -- first sequence number would be 0
    IORef ([t m a], Int)
q <- ([t m a], Int) -> IO (IORef ([t m a], Int))
forall a. a -> IO (IORef a)
newIORef ([], -Int
1)
    MVar ()
stopMVar <- () -> IO (MVar ())
forall a. a -> IO (MVar a)
newMVar ()
    Maybe (IORef Count)
yl <- case State t m a -> Maybe Count
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Count
getYieldLimit State t m a
st of
            Maybe Count
Nothing -> Maybe (IORef Count) -> IO (Maybe (IORef Count))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (IORef Count)
forall a. Maybe a
Nothing
            Just Count
x -> IORef Count -> Maybe (IORef Count)
forall a. a -> Maybe a
Just (IORef Count -> Maybe (IORef Count))
-> IO (IORef Count) -> IO (Maybe (IORef Count))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Count -> IO (IORef Count)
forall a. a -> IO (IORef a)
newIORef Count
x
    Maybe YieldRateInfo
rateInfo <- State t m a -> IO (Maybe YieldRateInfo)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> IO (Maybe YieldRateInfo)
getYieldRateInfo State t m a
st

    SVarStats
stats <- IO SVarStats
newSVarStats
    ThreadId
tid <- IO ThreadId
myThreadId

    let getSVar :: SVar t m a
-> (SVar t m a -> m [ChildEvent a])
-> (SVar t m a -> m Bool)
-> SVar t m a
getSVar SVar t m a
sv SVar t m a -> m [ChildEvent a]
readOutput SVar t m a -> m Bool
postProc = SVar
            { outputQueue :: IORef ([ChildEvent a], Int)
outputQueue      = IORef ([ChildEvent a], Int)
outQ
            , outputQueueFromConsumer :: IORef ([ChildEvent a], Int)
outputQueueFromConsumer = IORef ([ChildEvent a], Int)
forall a. HasCallStack => a
undefined
            , remainingWork :: Maybe (IORef Count)
remainingWork  = Maybe (IORef Count)
yl
            , maxBufferLimit :: Limit
maxBufferLimit   = State t m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxBuffer State t m a
st
            , pushBufferSpace :: IORef Count
pushBufferSpace = IORef Count
forall a. HasCallStack => a
undefined
            , pushBufferPolicy :: PushBufferPolicy
pushBufferPolicy = PushBufferPolicy
forall a. HasCallStack => a
undefined
            , pushBufferMVar :: MVar ()
pushBufferMVar   = MVar ()
forall a. HasCallStack => a
undefined
            , maxWorkerLimit :: Limit
maxWorkerLimit   = Limit -> Limit -> Limit
forall a. Ord a => a -> a -> a
min (State t m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxThreads State t m a
st) (State t m a -> Limit
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Limit
getMaxBuffer State t m a
st)
            , yieldRateInfo :: Maybe YieldRateInfo
yieldRateInfo    = Maybe YieldRateInfo
rateInfo
            , outputDoorBell :: MVar ()
outputDoorBell   = MVar ()
outQMv
            , outputDoorBellFromConsumer :: MVar ()
outputDoorBellFromConsumer = MVar ()
forall a. HasCallStack => a
undefined
            , readOutputQ :: m [ChildEvent a]
readOutputQ      = SVar t m a -> m [ChildEvent a]
readOutput SVar t m a
sv
            , postProcess :: m Bool
postProcess      = SVar t m a -> m Bool
postProc SVar t m a
sv
            , workerThreads :: IORef (Set ThreadId)
workerThreads    = IORef (Set ThreadId)
running
            , workLoop :: Maybe WorkerInfo -> m ()
workLoop         = IORef ([t m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> State t m a
-> SVar t m a
-> Maybe WorkerInfo
-> m ()
f IORef ([t m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
outH State t m a
st{streamVar = Just sv} SVar t m a
sv
            , enqueue :: (RunInIO m, t m a) -> IO ()
enqueue          = SVar t m a -> IORef ([t m a], Int) -> (RunInIO m, t m a) -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ([t m a], Int) -> (RunInIO m, t m a) -> IO ()
enqueueAhead SVar t m a
sv IORef ([t m a], Int)
q
            , isWorkDone :: IO Bool
isWorkDone       = SVar t m a
-> IORef ([t m a], Int)
-> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
-> IO Bool
forall {t :: * -> *} {t :: (* -> *) -> * -> *} {m :: * -> *} {a}
       {a} {b} {a} {b}.
Foldable t =>
SVar t m a -> IORef (t a, b) -> IORef (Heap a, b) -> IO Bool
isWorkDoneAhead SVar t m a
sv IORef ([t m a], Int)
q IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
outH
            , isQueueDone :: IO Bool
isQueueDone      = SVar t m a -> IORef ([t m a], Int) -> IO Bool
forall {t :: * -> *} {t :: (* -> *) -> * -> *} {m :: * -> *} {a}
       {a} {b}.
Foldable t =>
SVar t m a -> IORef (t a, b) -> IO Bool
isQueueDoneAhead SVar t m a
sv IORef ([t m a], Int)
q
            , needDoorBell :: IORef Bool
needDoorBell     = IORef Bool
wfw
            , svarStyle :: SVarStyle
svarStyle        = SVarStyle
AheadVar
            , svarStopStyle :: SVarStopStyle
svarStopStyle    = SVarStopStyle
StopNone
            , svarStopBy :: IORef ThreadId
svarStopBy       = IORef ThreadId
forall a. HasCallStack => a
undefined
            , svarMrun :: RunInIO m
svarMrun         = RunInIO m
mrun
            , workerCount :: IORef Int
workerCount      = IORef Int
active
            , accountThread :: ThreadId -> m ()
accountThread    = SVar t m a -> ThreadId -> m ()
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> ThreadId -> m ()
delThread SVar t m a
sv
            , workerStopMVar :: MVar ()
workerStopMVar   = MVar ()
stopMVar
            , svarRef :: Maybe (IORef ())
svarRef          = Maybe (IORef ())
forall a. Maybe a
Nothing
            , svarInspectMode :: Bool
svarInspectMode  = State t m a -> Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Bool
getInspectMode State t m a
st
            , svarCreator :: ThreadId
svarCreator      = ThreadId
tid
            , aheadWorkQueue :: IORef ([t m a], Int)
aheadWorkQueue   = IORef ([t m a], Int)
q
            , outputHeap :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
outputHeap       = IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
outH
            , svarStats :: SVarStats
svarStats        = SVarStats
stats
            }

    let sv :: SVar t m a
sv =
            case State t m a -> Maybe Rate
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
State t m a -> Maybe Rate
getStreamRate State t m a
st of
                Maybe Rate
Nothing -> SVar t m a
-> (SVar t m a -> m [ChildEvent a])
-> (SVar t m a -> m Bool)
-> SVar t m a
getSVar SVar t m a
sv SVar t m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQBounded SVar t m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessBounded
                Just Rate
_  -> SVar t m a
-> (SVar t m a -> m [ChildEvent a])
-> (SVar t m a -> m Bool)
-> SVar t m a
getSVar SVar t m a
sv SVar t m a -> m [ChildEvent a]
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m [ChildEvent a]
readOutputQPaced SVar t m a -> m Bool
forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
SVar t m a -> m Bool
postProcessPaced
     in SVar t m a -> IO (SVar t m a)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return SVar t m a
sv

    where

    -- Reports whether the work queue should be treated as finished. That is
    -- the case when either of the following holds:
    --
    -- * the SVar carries a yield limit ('remainingWork') and the remaining
    --   count has dropped to zero or below, or
    -- * the queue referenced by @q@ holds no items (see 'checkEmpty').
    --
    -- When the SVar has no yield limit only the queue emptiness matters.
    {-# INLINE isQueueDoneAhead #-}
    isQueueDoneAhead sv q = do
        queueDone <- checkEmpty q
        yieldsDone <-
                case remainingWork sv of
                    Just yref -> do
                        n <- readIORef yref
                        return (n <= 0)
                    Nothing -> return False
        -- XXX note that yieldsDone can be authoritative only when there are
        -- no workers running. If there are active workers they can later
        -- increment the yield count and therefore change the result.
        return $ yieldsDone || queueDone

    -- Reports whether all work is finished: the heap stored in @ref@ must be
    -- empty AND the work queue must be done as defined by 'isQueueDoneAhead'.
    -- Only the heap component of the pair in @ref@ is inspected; the second
    -- component is ignored.
    {-# INLINE isWorkDoneAhead #-}
    isWorkDoneAhead sv q ref = do
        heapDone <- do
                (hp, _) <- readIORef ref
                return (H.size hp <= 0)
        queueDone <- isQueueDoneAhead sv q
        return $ heapDone && queueDone

    -- Reads the pair stored in @q@ and returns 'True' when its first
    -- component (any 'Foldable' container) has no elements. The second
    -- component is ignored.
    checkEmpty q = do
        (xs, _) <- readIORef q
        return $ null xs

-- | Create a new SVar via 'getAheadSVar' and hand it, together with the
-- stream @m@, to 'sendFirstWorker'.
--
-- The arguments are:
--
-- * the 'State' from which 'getAheadSVar' builds the SVar,
-- * the stream passed to 'sendFirstWorker',
-- * the worker loop, which is forwarded unchanged to 'getAheadSVar'.
--
-- The monad's runner is captured with 'askRunInIO' and passed to
-- 'getAheadSVar'. The result is whatever 'sendFirstWorker' returns.
{-# INLINABLE newAheadVar #-}
newAheadVar :: MonadAsync m
    => State t m a
    -> t m a
    -> (   IORef ([t m a], Int)
        -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
        -> State t m a
        -> SVar t m a
        -> Maybe WorkerInfo
        -> m ())
    -> m (SVar t m a)
newAheadVar st m wloop = do
    mrun <- askRunInIO
    sv <- liftIO $ getAheadSVar st wloop mrun
    sendFirstWorker sv m

-------------------------------------------------------------------------------
-- WAhead
-------------------------------------------------------------------------------

-- XXX To be implemented. Use a linked queue like WAsync and put back the
-- remaining computation at the back of the queue instead of the heap, and
-- increment the sequence number.

-------------------------------------------------------------------------------
-- Parallel
-------------------------------------------------------------------------------

-- | Allocate the mutable state for a 'ParallelVar' style SVar and assemble
-- the 'SVar' record.
--
-- The arguments are the stop style (stored in 'svarStopStyle'), the 'State'
-- supplying the yield limit, rate info, max buffer and inspect mode, and the
-- monad runner stored in 'svarMrun'.
--
-- Several record fields are deliberately left as 'undefined' here
-- ('workLoop', 'enqueue', 'isWorkDone', 'isQueueDone', 'needDoorBell',
-- 'workerStopMVar', 'aheadWorkQueue', 'outputHeap'). Forcing any of them on
-- an SVar built by this function will throw.
-- NOTE(review): presumably these fields are never consulted for the parallel
-- style -- confirm against the users of those fields.
getParallelSVar :: MonadIO m
    => SVarStopStyle -> State t m a -> RunInIO m -> IO (SVar t m a)
getParallelSVar ss st mrun = do
    outQ    <- newIORef ([], 0)
    outQRev <- newIORef ([], 0)
    outQMv  <- newEmptyMVar
    outQMvRev <- newEmptyMVar
    active  <- newIORef 0
    running <- newIORef S.empty
    yl <- case getYieldLimit st of
            Nothing -> return Nothing
            Just x -> Just <$> newIORef x
    rateInfo <- getYieldRateInfo st
    -- 'newIORef' does not force its argument, so with an 'Unlimited' buffer
    -- the 'undefined' below is only hit if the contents of 'remBuf' are ever
    -- demanded.
    let bufLim =
            case getMaxBuffer st of
                Unlimited -> undefined
                Limited x -> fromIntegral x
    remBuf <- newIORef bufLim
    pbMVar <- newMVar ()

    stats <- newSVarStats
    tid <- myThreadId

    -- For 'StopBy' a real IORef is allocated (with an undefined initial
    -- value); for every other stop style the reference itself is undefined.
    stopBy <-
        case ss of
            StopBy -> newIORef undefined
            _ -> return undefined

    -- 'sv' is defined recursively: some fields close over the SVar itself.
    let sv =
            SVar { outputQueue      = outQ
                 , outputQueueFromConsumer = outQRev
                 , remainingWork    = yl
                 , maxBufferLimit   = getMaxBuffer st
                 , pushBufferSpace  = remBuf
                 , pushBufferPolicy = PushBufferBlock
                 , pushBufferMVar   = pbMVar
                 , maxWorkerLimit   = Unlimited
                 -- Used only for diagnostics
                 , yieldRateInfo    = rateInfo
                 , outputDoorBell   = outQMv
                 , outputDoorBellFromConsumer = outQMvRev
                 , readOutputQ      = readOutputQPar sv
                 , postProcess      = allThreadsDone sv
                 , workerThreads    = running
                 , workLoop         = undefined
                 , enqueue          = undefined
                 , isWorkDone       = undefined
                 , isQueueDone      = undefined
                 , needDoorBell     = undefined
                 , svarStyle        = ParallelVar
                 , svarStopStyle    = ss
                 , svarStopBy       = stopBy
                 , svarMrun         = mrun
                 , workerCount      = active
                 , accountThread    = modifyThread sv
                 , workerStopMVar   = undefined
                 , svarRef          = Nothing
                 , svarInspectMode  = getInspectMode st
                 , svarCreator      = tid
                 , aheadWorkQueue   = undefined
                 , outputHeap       = undefined
                 , svarStats        = stats
                 }
     in return sv

    where

    -- Blocks on the output doorbell, optionally collects latency stats, and
    -- then drains the output queue, returning the events read.
    readOutputQPar sv = liftIO $ do
        -- Wait until the doorbell MVar is filled.
        withDiagMVar sv "readOutputQPar: doorbell"
            $ takeMVar (outputDoorBell sv)
        -- The latency figures are collected for their side effect only; the
        -- returned tuple is discarded.
        case yieldRateInfo sv of
            Nothing -> return ()
            Just yinfo -> void $ collectLatency sv yinfo False
        r <- fst <$> readOutputQRaw sv
        -- Empty the push-buffer MVar, reset the buffer limit, and fill the
        -- MVar again, with a write barrier between the reset and the put.
        -- NOTE(review): presumably this wakes up producers blocked on buffer
        -- space -- confirm against the code that waits on 'pushBufferMVar'.
        void $ tryTakeMVar (pushBufferMVar sv)
        resetBufferLimit sv
        writeBarrier
        void $ tryPutMVar (pushBufferMVar sv) ()
        return r

-- | Create a new SVar via 'getParallelSVar', lifted into the monad @m@.
--
-- The monad's runner is captured with 'askRunInIO' and passed on to
-- 'getParallelSVar' together with the stop style and the 'State'.
{-# INLINABLE newParallelVar #-}
newParallelVar :: MonadAsync m
    => SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar ss st = do
    mrun <- askRunInIO
    liftIO $ getParallelSVar ss st mrun