{-# LANGUAGE CPP #-}
{-# LANGUAGE KindSignatures #-}
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE UnboxedTuples #-}
module Streamly.Internal.Data.SVar
(
MonadAsync
, SVarStyle (..)
, SVarStopStyle (..)
, SVar (..)
, Limit (..)
, State (streamVar)
, defState
, adaptState
, getMaxThreads
, setMaxThreads
, getMaxBuffer
, setMaxBuffer
, getStreamRate
, setStreamRate
, setStreamLatency
, getYieldLimit
, setYieldLimit
, getInspectMode
, setInspectMode
, recordMaxWorkers
, cleanupSVar
, cleanupSVarFromWorker
, newAheadVar
, newParallelVar
, captureMonadState
, RunInIO (..)
, WorkerInfo (..)
, YieldRateInfo (..)
, ThreadAbort (..)
, ChildEvent (..)
, AheadHeapEntry (..)
, send
, sendToProducer
, sendYield
, sendStop
, sendStopToProducer
, enqueueLIFO
, enqueueFIFO
, enqueueAhead
, reEnqueueAhead
, pushWorkerPar
, handleChildException
, handleFoldException
, queueEmptyAhead
, dequeueAhead
, HeapDequeueResult(..)
, dequeueFromHeap
, dequeueFromHeapSeq
, requeueOnHeapTop
, updateHeapSeq
, withIORef
, heapIsSane
, Rate (..)
, getYieldRateInfo
, newSVarStats
, collectLatency
, workerUpdateLatency
, isBeyondMaxRate
, workerRateControl
, updateYieldCount
, decrementYieldLimit
, incrementYieldLimit
, decrementBufferLimit
, incrementBufferLimit
, postProcessBounded
, postProcessPaced
, readOutputQBounded
, readOutputQPaced
, readOutputQBasic
, dispatchWorkerPaced
, sendFirstWorker
, delThread
, modifyThread
, doFork
, fork
, forkManaged
, toStreamVar
, SVarStats (..)
, dumpSVar
, printSVar
, withDiagMVar
)
where
import Control.Concurrent
(ThreadId, myThreadId, threadDelay, throwTo, forkIO, killThread)
import Control.Concurrent.MVar
(MVar, newEmptyMVar, tryPutMVar, takeMVar, tryTakeMVar, newMVar,
tryReadMVar)
import Control.Exception
(SomeException(..), catch, mask, assert, Exception, catches,
throwIO, Handler(..), BlockedIndefinitelyOnMVar(..),
BlockedIndefinitelyOnSTM(..))
import Control.Monad (when)
import Control.Monad.Catch (MonadThrow)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Control
(MonadBaseControl, control, StM, liftBaseDiscard)
import Streamly.Internal.Data.Atomics
(atomicModifyIORefCAS, atomicModifyIORefCAS_, writeBarrier,
storeLoadBarrier)
import Data.Concurrent.Queue.MichaelScott (LinkedQueue, pushL)
import Data.Functor (void)
import Data.Heap (Heap, Entry(..))
import Data.Int (Int64)
#if __GLASGOW_HASKELL__ >= 800
import Data.Kind (Type)
#endif
import Data.IORef
(IORef, modifyIORef, newIORef, readIORef, writeIORef, atomicModifyIORef)
import Data.Maybe (fromJust)
#if __GLASGOW_HASKELL__ < 808
import Data.Semigroup ((<>))
#endif
import Data.Set (Set)
import GHC.Conc (ThreadId(..))
import GHC.Exts
import GHC.IO (IO(..))
import System.IO (hPutStrLn, stderr)
import System.Mem.Weak (addFinalizer)
import Streamly.Internal.Data.Time.Clock (Clock(..), getTime)
import Streamly.Internal.Data.Time.Units
(AbsTime, NanoSecond64(..), MicroSecond64(..), diffAbsTime64,
fromRelTime64, toRelTime64, showNanoSecond64, showRelTime64)
import qualified Data.Heap as H
import qualified Data.Set as S
-- | A counter backed by a signed 64-bit integer. @GeneralizedNewtypeDeriving@
-- is enabled in this module, so the numeric instances ('Num', 'Real',
-- 'Integral', ...) are those of the underlying 'Int64'.
newtype Count = Count Int64
    deriving ( Eq
             , Read
             , Show
             , Enum
             , Bounded
             , Num
             , Real
             , Integral
             , Ord
             )
-- | Exception thrown to the worker threads of an 'SVar' by 'cleanupSVar' and
-- 'cleanupSVarFromWorker'.
data ThreadAbort = ThreadAbort deriving Show

instance Exception ThreadAbort
-- | Messages placed on the output queues of an 'SVar' (see 'send' and
-- 'sendToProducer').
data ChildEvent a =
      ChildYield a
      -- ^ Carries one value of the element type.
    | ChildStop ThreadId (Maybe SomeException)
      -- ^ Carries the id of the sending thread and, when sent from one of the
      -- exception handlers ('handleChildException', 'handleFoldException'),
      -- the exception that was caught.
-- On GHC older than 8.0 "Data.Kind" is not imported (see the guarded import
-- above), so the kind is spelled as a star through a temporary CPP macro that
-- is undefined again right after the declaration.
#if __GLASGOW_HASKELL__ < 800
#define Type *
#endif
-- | Payload of the entries stored in the 'outputHeap' of an 'SVar'.
data AheadHeapEntry (t :: (Type -> Type) -> Type -> Type) m a =
      AheadEntryNull
    | AheadEntryPure a
    | AheadEntryStream (t m a)
#undef Type
-- | The flavour of an 'SVar'. Within this module the style is consulted by
-- the diagnostic dump: heap details are printed only for 'AheadVar', and the
-- 'needDoorBell' flag is not read for 'ParallelVar'.
data SVarStyle =
      AsyncVar
    | WAsyncVar
    | ParallelVar
    | AheadVar
    deriving (Eq, Show)
-- | Per-worker bookkeeping used for yield limiting and latency measurement.
data WorkerInfo = WorkerInfo
    { workerYieldMax :: Count
      -- ^ Maximum number of yields for this worker; 0 means no maximum
      -- (see 'isBeyondMaxYield').
    , workerYieldCount :: IORef Count
      -- ^ Number of yields so far, incremented by 'updateYieldCount'.
    , workerLatencyStart :: IORef (Count, AbsTime)
      -- ^ Yield count and timestamp recorded at the last latency collection
      -- (see 'workerCollectLatency').
    }
-- | Rate specification for a stream.
--
-- NOTE(review): none of these fields is consumed in the visible part of this
-- module; the names suggest a low/goal/high rate band plus a buffer size,
-- but units and exact semantics need to be confirmed against the users.
data Rate = Rate
    { rateLow :: Double
    , rateGoal :: Double
    , rateHigh :: Double
    , rateBuffer :: Int
    }
-- | Lower and upper bounds, in nanoseconds, between which 'estimateWorkers'
-- clamps the latency it computes.
data LatencyRange = LatencyRange
    { minLatency :: NanoSecond64
    , maxLatency :: NanoSecond64
    } deriving Show
-- | State used to pace the yield rate of an 'SVar'.
data YieldRateInfo = YieldRateInfo
    { svarLatencyTarget :: NanoSecond64
      -- ^ Target time per yield; passed to 'estimateWorkers' as @targetLat@.
    , svarLatencyRange :: LatencyRange
      -- ^ Bounds passed to 'estimateWorkers'.
    , svarRateBuffer :: Int
      -- ^ Yield deltas whose magnitude does not exceed this value are not
      -- added to 'svarGainedLostYields' (see 'dispatchWorkerPaced').
    , svarGainedLostYields :: IORef Count
      -- ^ Correction added to the yield count by the callers of
      -- 'estimateWorkers'.
    , svarAllTimeLatency :: IORef (Count, AbsTime)
      -- ^ Long-term yield count together with the base timestamp against
      -- which elapsed time is measured.
    , workerBootstrapLatency :: Maybe NanoSecond64
      -- ^ Latency used by 'dispatchWorkerPaced' while the measured latency is
      -- still 0.
    , workerPollingInterval :: IORef Count
      -- ^ Number of yields between rate checks in 'checkRatePeriodic'; 0
      -- disables the periodic check.
    , workerPendingLatency :: IORef (Count, Count, NanoSecond64)
      -- ^ Accumulated by workers in 'workerUpdateLatency': total yields,
      -- yields that have a measured time, and that measured time.
    , workerCollectedLatency :: IORef (Count, Count, NanoSecond64)
      -- ^ Same triple as 'workerPendingLatency', accumulated by
      -- 'collectLatency' until a batch is consumed.
    , workerMeasuredLatency :: IORef NanoSecond64
      -- ^ Latency last stored by 'collectLatency'.
    }
-- | Statistics of an 'SVar', printed by 'dumpSVarStats'.
data SVarStats = SVarStats {
      totalDispatches :: IORef Int
      -- ^ Incremented on every call of 'recordMaxWorkers'.
    , maxWorkers :: IORef Int
      -- ^ Highest worker count seen by 'recordMaxWorkers'.
    , maxOutQSize :: IORef Int
    , maxHeapSize :: IORef Int
    , maxWorkQSize :: IORef Int
    , avgWorkerLatency :: IORef (Count, NanoSecond64)
      -- ^ Sums of yield counts and times; the average is computed when
      -- printing.
    , minWorkerLatency :: IORef NanoSecond64
      -- ^ Smallest latency recorded; 0 is treated as "not recorded yet".
    , maxWorkerLatency :: IORef NanoSecond64
    , svarStopTime :: IORef (Maybe AbsTime)
      -- ^ When set, used instead of the current time to compute the overall
      -- SVar latency.
    }
-- | A bound that is either absent or a concrete non-negative number.
data Limit = Unlimited | Limited Word deriving Show

-- | Two limits are equal when both are 'Unlimited' or both carry the same
-- number.
instance Eq Limit where
    Limited x == Limited y = x == y
    Unlimited == Unlimited = True
    _ == _ = False

-- | 'Unlimited' is greater than every 'Limited' value; 'Limited' values are
-- ordered by their number.
instance Ord Limit where
    Limited x <= Limited y = x <= y
    _ <= Unlimited = True
    Unlimited <= Limited _ = False
-- | Stop policy stored in the 'svarStopStyle' field of an 'SVar'.
--
-- NOTE(review): the constructors are not interpreted anywhere in the visible
-- part of this module; confirm their meaning against the users.
data SVarStopStyle =
      StopNone
    | StopAny
    | StopBy
    deriving (Eq, Show)
-- | What 'decrementBufferLimit' does when the push buffer has no space left.
data PushBufferPolicy =
      PushBufferDropNew
      -- ^ Remove the event at the head of the output queue; block only if
      -- the queue is empty.
    | PushBufferDropOld
      -- ^ Not implemented in 'decrementBufferLimit'.
    | PushBufferBlock
      -- ^ Block until space becomes available.
-- | Shared state of a concurrent stream evaluation: output queues and their
-- doorbells, limits, work queues, worker bookkeeping and statistics.
data SVar t m a = SVar
    {
      svarStyle :: SVarStyle
    , svarMrun :: RunInIO m
      -- ^ Runner handed to 'doFork' when workers are forked.
    , svarStopStyle :: SVarStopStyle
    , svarStopBy :: IORef ThreadId
    , outputQueue :: IORef ([ChildEvent a], Int)
      -- ^ Events sent by workers, newest first ('sendWithDoorBell' prepends),
      -- together with the maintained queue length.
    , outputDoorBell :: MVar ()
      -- ^ Filled when an event is sent to a queue that was empty, and by
      -- 'ringDoorBell'.
    , readOutputQ :: m [ChildEvent a]
    , postProcess :: m Bool
    , outputQueueFromConsumer :: IORef ([ChildEvent a], Int)
      -- ^ Queue written by 'sendToProducer'.
    , outputDoorBellFromConsumer :: MVar ()
      -- ^ Doorbell paired with 'outputQueueFromConsumer'.
    , maxWorkerLimit :: Limit
    , maxBufferLimit :: Limit
    , pushBufferSpace :: IORef Count
      -- ^ Free buffer slots; see 'decrementBufferLimit',
      -- 'incrementBufferLimit' and 'resetBufferLimit'.
    , pushBufferPolicy :: PushBufferPolicy
    , pushBufferMVar :: MVar ()
      -- ^ 'decrementBufferLimit' blocks on this MVar while waiting for space.
    , remainingWork :: Maybe (IORef Count)
      -- ^ Remaining yield budget; 'Nothing' means there is no budget to
      -- enforce (see 'decrementYieldLimit').
    , yieldRateInfo :: Maybe YieldRateInfo
      -- ^ Rate-control state; 'Nothing' disables rate control.
    , enqueue :: t m a -> IO ()
    , isWorkDone :: IO Bool
    , isQueueDone :: IO Bool
    , needDoorBell :: IORef Bool
      -- ^ When 'True', 'ringDoorBell' resets it and fills 'outputDoorBell'.
    , workLoop :: Maybe WorkerInfo -> m ()
      -- ^ Action run by the threads forked in 'pushWorker'.
    , workerThreads :: IORef (Set ThreadId)
    , workerCount :: IORef Int
      -- ^ Incremented by 'pushWorker', decremented by 'sendStop'.
    , accountThread :: ThreadId -> m ()
    , workerStopMVar :: MVar ()
    , svarStats :: SVarStats
    , svarRef :: Maybe (IORef ())
    , svarInspectMode :: Bool
      -- ^ Enables statistics recording and the diagnostics of 'withDiagMVar'.
    , svarCreator :: ThreadId
    , outputHeap :: IORef ( Heap (Entry Int (AheadHeapEntry t m a))
                          , Maybe Int)
      -- ^ Heap of entries keyed by sequence number, plus the sequence number
      -- currently awaited (see 'dequeueFromHeap').
    , aheadWorkQueue :: IORef ([t m a], Int)
      -- ^ Work queue holding at most one item, plus a sequence number (see
      -- 'enqueueAhead' and 'dequeueAhead').
    }
-- | Configuration carried along with a stream; read and updated through the
-- @get*@ / @set*@ functions of this module.
data State t m a = State
    {
      streamVar :: Maybe (SVar t m a)
    , _yieldLimit :: Maybe Count
      -- ^ 'Nothing' means no yield limit is set.
    , _threadsHigh :: Limit
    , _bufferHigh :: Limit
    , _streamLatency :: Maybe NanoSecond64
    , _maxStreamRate :: Maybe Rate
    , _inspectMode :: Bool
    }
-- | Base constant from which the default thread and buffer limits are
-- derived; also used as a cap on polling intervals elsewhere in this module.
magicMaxBuffer :: Word
magicMaxBuffer = 1500

-- | Default upper bound on the number of worker threads.
defaultMaxThreads :: Limit
defaultMaxThreads = Limited magicMaxBuffer

-- | Default upper bound on the buffer size.
defaultMaxBuffer :: Limit
defaultMaxBuffer = Limited magicMaxBuffer
-- | The default state: no SVar, no yield limit, default thread and buffer
-- limits, no rate or latency setting, inspection off.
defState :: State t m a
defState = State
    { _inspectMode = False
    , _streamLatency = Nothing
    , _maxStreamRate = Nothing
    , _bufferHigh = defaultMaxBuffer
    , _threadsHigh = defaultMaxThreads
    , _yieldLimit = Nothing
    , streamVar = Nothing
    }
-- | Convert a state to one for a different monad and element type. The SVar
-- and the yield limit are dropped; every other setting is carried over.
adaptState :: State t m a -> State t n b
adaptState State
    { _threadsHigh = threads
    , _bufferHigh = buffer
    , _streamLatency = latency
    , _maxStreamRate = rate
    , _inspectMode = inspect
    } =
    State
        { streamVar = Nothing
        , _yieldLimit = Nothing
        , _threadsHigh = threads
        , _bufferHigh = buffer
        , _streamLatency = latency
        , _maxStreamRate = rate
        , _inspectMode = inspect
        }
-- | Set the yield limit. 'Nothing' removes the limit; a non-positive number
-- is stored as 0.
setYieldLimit :: Maybe Int64 -> State t m a -> State t m a
setYieldLimit lim st = st { _yieldLimit = fmap clamp lim }
  where
    clamp n
        | n <= 0 = 0
        | otherwise = fromIntegral n
-- | Read the yield limit stored in the state.
getYieldLimit :: State t m a -> Maybe Count
getYieldLimit st = _yieldLimit st
-- | Set the thread limit: a negative number means unlimited, 0 selects the
-- default, and a positive number is used as given.
setMaxThreads :: Int -> State t m a -> State t m a
setMaxThreads n st = st { _threadsHigh = limit }
  where
    limit
        | n < 0 = Unlimited
        | n == 0 = defaultMaxThreads
        | otherwise = Limited (fromIntegral n)
-- | Read the thread limit stored in the state.
getMaxThreads :: State t m a -> Limit
getMaxThreads st = _threadsHigh st
-- | Set the buffer limit: a negative number means unlimited, 0 selects the
-- default, and a positive number is used as given.
setMaxBuffer :: Int -> State t m a -> State t m a
setMaxBuffer n st = st { _bufferHigh = limit }
  where
    limit
        | n < 0 = Unlimited
        | n == 0 = defaultMaxBuffer
        | otherwise = Limited (fromIntegral n)
-- | Read the buffer limit stored in the state.
getMaxBuffer :: State t m a -> Limit
getMaxBuffer st = _bufferHigh st
-- | Store the given rate specification (or its absence) in the state.
setStreamRate :: Maybe Rate -> State t m a -> State t m a
setStreamRate rate st =
    st { _maxStreamRate = rate }
-- | Read the rate specification stored in the state.
getStreamRate :: State t m a -> Maybe Rate
getStreamRate st = _maxStreamRate st
-- | Set the stream latency. A non-positive number clears the setting.
setStreamLatency :: Int -> State t m a -> State t m a
setStreamLatency n st = st { _streamLatency = latency }
  where
    latency
        | n <= 0 = Nothing
        | otherwise = Just (fromIntegral n)
-- | Read the stream latency stored in the state.
getStreamLatency :: State t m a -> Maybe NanoSecond64
getStreamLatency st = _streamLatency st
-- | Turn inspection mode on.
setInspectMode :: State t m a -> State t m a
setInspectMode st =
    st { _inspectMode = True }
-- | Tell whether inspection mode is on.
getInspectMode :: State t m a -> Bool
getInspectMode st = _inspectMode st
-- | Throw 'ThreadAbort' to every thread currently registered in the
-- 'workerThreads' set of the SVar.
cleanupSVar :: SVar t m a -> IO ()
cleanupSVar sv =
    readIORef (workerThreads sv) >>= Prelude.mapM_ abort
  where
    abort tid = throwTo tid ThreadAbort
-- | Like 'cleanupSVar', but skips the calling thread so that a worker can
-- abort all of its siblings without aborting itself.
cleanupSVarFromWorker :: SVar t m a -> IO ()
cleanupSVarFromWorker sv = do
    registered <- readIORef (workerThreads sv)
    self <- myThreadId
    let others = S.toList (S.delete self registered)
    Prelude.mapM_ (\tid -> throwTo tid ThreadAbort) others
-- | Recompute the number of yields between two rate checks of a worker and
-- store it in 'workerPollingInterval'.
--
-- The interval is the number of yields that fit in 'minThreadDelay' at the
-- given per-yield latency, clamped to the range @[1, magicMaxBuffer]@.
--
-- A latency of 0 selects the upper bound, which is what the quotient tends
-- to as the latency approaches 0. Dividing by 0 would instead store a thunk
-- in the IORef that throws a divide-by-zero exception in whichever thread
-- reads the interval later. A latency of 0 is possible because the caller
-- computes it with truncating integer division.
updateWorkerPollingInterval :: YieldRateInfo -> NanoSecond64 -> IO ()
updateWorkerPollingInterval yinfo latency =
    writeIORef (workerPollingInterval yinfo) (fromIntegral period)
  where
    maxPeriod :: NanoSecond64
    maxPeriod = fromIntegral magicMaxBuffer

    period
        | latency == 0 = maxPeriod
        | otherwise = min (max 1 (minThreadDelay `div` latency)) maxPeriod
{-# INLINE recordMinMaxLatency #-}
-- | Update the minimum and maximum worker latency statistics with a new
-- sample. A stored minimum of 0 counts as "not set yet".
recordMinMaxLatency :: SVar t m a -> NanoSecond64 -> IO ()
recordMinMaxLatency sv new = do
    curMin <- readIORef minRef
    when (curMin == 0 || new < curMin) $ writeIORef minRef new
    curMax <- readIORef maxRef
    when (curMax < new) $ writeIORef maxRef new
  where
    stats = svarStats sv
    minRef = minWorkerLatency stats
    maxRef = maxWorkerLatency stats
-- | Add a (yield count, time) sample to the running sums from which the
-- average worker latency is computed.
recordAvgLatency :: SVar t m a -> (Count, NanoSecond64) -> IO ()
recordAvgLatency sv (count, time) =
    modifyIORef (avgWorkerLatency (svarStats sv)) accumulate
  where
    accumulate (cnt, t) = (cnt + count, t + time)
{-# INLINE collectWorkerPendingLatency #-}
-- | Move the pending latency triple into the collected one.
--
-- The pending reference is atomically reset to zeros and its previous
-- contents are added to the collected reference. Returns the collected total
-- yield count and, when both the collected measured count and time are
-- positive, that (count, time) pair.
collectWorkerPendingLatency
    :: IORef (Count, Count, NanoSecond64)
    -> IORef (Count, Count, NanoSecond64)
    -> IO (Count, Maybe (Count, NanoSecond64))
collectWorkerPendingLatency cur col = do
    (pendTotal, pendCount, pendTime) <- atomicModifyIORefCAS cur takeAll
    (colTotal, colCount, colTime) <- readIORef col
    let total = colTotal + pendTotal
        measured = colCount + pendCount
        elapsed = colTime + pendTime
    writeIORef col (total, measured, elapsed)
    assert (measured == 0 || elapsed /= 0) (return ())
    return
        ( total
        , if measured > 0 && elapsed > 0
          then Just (measured, elapsed)
          else Nothing
        )
  where
    takeAll old = ((0, 0, 0), old)
{-# INLINE shouldUseCollectedBatch #-}
-- | Decide whether the collected batch is significant enough to replace the
-- previously measured latency.
--
-- True when the batch has more than 'magicMaxBuffer' yields, spans more than
-- 'minThreadDelay', there is no previous latency (it is 0), or the new
-- latency differs from a positive previous latency by more than a factor of
-- two in either direction.
shouldUseCollectedBatch
    :: Count
    -> NanoSecond64
    -> NanoSecond64
    -> NanoSecond64
    -> Bool
shouldUseCollectedBatch collectedYields collectedTime newLat prevLat
    | collectedYields > fromIntegral magicMaxBuffer = True
    | collectedTime > minThreadDelay = True
    | prevLat == 0 = True
    | prevLat > 0 = ratio > 2 || ratio < 0.5
    | otherwise = False
  where
    ratio :: Double
    ratio = fromIntegral newLat / fromIntegral prevLat
-- | Collect the latency samples reported by workers and possibly update the
-- measured worker latency.
--
-- Returns the long-term yield count including the newly collected yields,
-- the long-term base timestamp, and a latency: the new batch latency when
-- the batch was used, the previously measured latency otherwise. A batch is
-- used when 'shouldUseCollectedBatch' says so or when the @drain@ flag (third
-- argument) is set.
collectLatency :: SVar t m a
               -> YieldRateInfo
               -> Bool
               -> IO (Count, AbsTime, NanoSecond64)
collectLatency sv yinfo drain = do
    (newCount, newLatPair) <-
        collectWorkerPendingLatency (workerPendingLatency yinfo) collectedRef
    (lcount, ltime) <- readIORef longTermRef
    prevLat <- readIORef measuredRef
    let newLcount = lcount + newCount
        result lat = return (newLcount, ltime, lat)
    case newLatPair of
        Just (count, time) -> do
            let newLat = time `div` fromIntegral count
            when (svarInspectMode sv) $ recordMinMaxLatency sv newLat
            if drain || shouldUseCollectedBatch newCount time newLat prevLat
            then do
                updateWorkerPollingInterval yinfo (max newLat prevLat)
                when (svarInspectMode sv) $
                    recordAvgLatency sv (count, time)
                -- The batch is consumed: start collecting afresh.
                writeIORef collectedRef (0, 0, 0)
                writeIORef measuredRef ((prevLat + newLat) `div` 2)
                modifyIORef longTermRef $ \(_, t) -> (newLcount, t)
                result newLat
            else result prevLat
        Nothing -> result prevLat
  where
    collectedRef = workerCollectedLatency yinfo
    longTermRef = svarAllTimeLatency yinfo
    measuredRef = workerMeasuredLatency yinfo
-- | Render the statistics of an SVar as text. When rate control is active,
-- pending latencies are drained first so that the figures are current.
-- Optional lines are emitted only when their value is positive; the heap
-- size line is emitted only for the 'AheadVar' style.
dumpSVarStats :: SVar t m a -> SVarStats -> SVarStyle -> IO String
dumpSVarStats sv ss style = do
    case yieldRateInfo sv of
        Just yinfo -> void $ collectLatency sv yinfo True
        Nothing -> return ()

    dispatches <- readIORef $ totalDispatches ss
    maxWrk <- readIORef $ maxWorkers ss
    maxOq <- readIORef $ maxOutQSize ss
    maxHp <- readIORef $ maxHeapSize ss
    minLat <- readIORef $ minWorkerLatency ss
    maxLat <- readIORef $ maxWorkerLatency ss
    (avgCnt, avgTime) <- readIORef $ avgWorkerLatency ss

    (svarCnt, svarGainLossCnt, svarLat) <-
        case yieldRateInfo sv of
            Nothing -> return (0, 0, 0)
            Just yinfo -> do
                (cnt, startTime) <- readIORef $ svarAllTimeLatency yinfo
                if cnt <= 0
                then return (0, 0, 0)
                else do
                    stopped <- readIORef (svarStopTime ss)
                    gl <- readIORef (svarGainedLostYields yinfo)
                    -- Use the recorded stop time if there is one, otherwise
                    -- measure up to now.
                    endTime <- maybe (getTime Monotonic) return stopped
                    let interval = diffAbsTime64 endTime startTime
                    return (cnt, gl, interval `div` fromIntegral cnt)

    let optional cond txt = if cond then txt else ""
    return $ unlines
        [ "total dispatches = " <> show dispatches
        , "max workers = " <> show maxWrk
        , "max outQSize = " <> show maxOq
            <> optional (style == AheadVar)
                ("\nheap max size = " <> show maxHp)
            <> optional (minLat > 0)
                ("\nmin worker latency = " <> showNanoSecond64 minLat)
            <> optional (maxLat > 0)
                ("\nmax worker latency = " <> showNanoSecond64 maxLat)
            <> optional (avgCnt > 0)
                ("\navg worker latency = "
                    <> showNanoSecond64 (avgTime `div` fromIntegral avgCnt))
            <> optional (svarLat > 0)
                ("\nSVar latency = " <> showRelTime64 svarLat)
            <> optional (svarCnt > 0)
                ("\nSVar yield count = " <> show svarCnt)
            <> optional (svarGainLossCnt > 0)
                ("\nSVar gain/loss yield count = " <> show svarGainLossCnt)
        ]
{-# NOINLINE dumpSVar #-}
-- | Render the current state and the statistics of an SVar as text, for
-- diagnostics.
--
-- The report contains the creator thread id, the style, the output queue
-- length (both computed and maintained), the doorbell state, heap and work
-- queue details for the 'AheadVar' style, the 'needDoorBell' flag (reported
-- as False for 'ParallelVar' without reading it), the worker thread set and
-- count, and the output of 'dumpSVarStats'.
dumpSVar :: SVar t m a -> IO String
dumpSVar sv = do
    (oqList, oqLen) <- readIORef $ outputQueue sv
    db <- tryReadMVar $ outputDoorBell sv
    aheadDump <-
        if svarStyle sv == AheadVar
        then do
            (oheap, oheapSeq) <- readIORef $ outputHeap sv
            (wq, wqSeq) <- readIORef $ aheadWorkQueue sv
            return $ unlines
                [ "heap length = " <> show (H.size oheap)
                -- The label used to be misspelled as "seqeunce".
                , "heap sequence = " <> show oheapSeq
                , "work queue length = " <> show (length wq)
                , "work queue sequence = " <> show wqSeq
                ]
        else return []

    let style = svarStyle sv
    waiting <-
        if style /= ParallelVar
        then readIORef $ needDoorBell sv
        else return False
    rthread <- readIORef $ workerThreads sv
    workers <- readIORef $ workerCount sv
    stats <- dumpSVarStats sv (svarStats sv) (svarStyle sv)

    return $ unlines
        [
          "Creator tid = " <> show (svarCreator sv),
          "style = " <> show (svarStyle sv)
        , "---------CURRENT STATE-----------"
        , "outputQueue length computed  = " <> show (length oqList)
        , "outputQueue length maintained = " <> show oqLen
        , "outputDoorBell = " <> show db
        ]
        <> aheadDump
        <> unlines
        [ "needDoorBell = " <> show waiting
        , "running threads = " <> show rthread
        , "running thread count = " <> show workers
        ]
        <> "---------STATS-----------\n"
        <> stats
-- | Print the dump of an SVar to stderr, preceded by a caller-supplied
-- heading.
printSVar :: SVar t m a -> String -> IO ()
printSVar sv how =
    dumpSVar sv >>= \svInfo ->
        hPutStrLn stderr ("\n" <> how <> "\n" <> svInfo)
{-# NOINLINE mvarExcHandler #-}
-- | Handler for 'BlockedIndefinitelyOnMVar': print the label and the SVar
-- dump to stderr, then rethrow the exception.
mvarExcHandler :: SVar t m a -> String -> BlockedIndefinitelyOnMVar -> IO ()
mvarExcHandler sv label e@BlockedIndefinitelyOnMVar =
    dumpSVar sv >>= report >> throwIO e
  where
    report svInfo =
        hPutStrLn stderr $
            label <> " " <> "BlockedIndefinitelyOnMVar\n" <> svInfo
{-# NOINLINE stmExcHandler #-}
-- | Handler for 'BlockedIndefinitelyOnSTM': print the label and the SVar
-- dump to stderr, then rethrow the exception.
stmExcHandler :: SVar t m a -> String -> BlockedIndefinitelyOnSTM -> IO ()
stmExcHandler sv label e@BlockedIndefinitelyOnSTM =
    dumpSVar sv >>= report >> throwIO e
  where
    report svInfo =
        hPutStrLn stderr $
            label <> " " <> "BlockedIndefinitelyOnSTM\n" <> svInfo
-- | Run an action. In inspection mode, "blocked indefinitely" exceptions
-- raised by it are reported together with an SVar dump before being
-- rethrown; otherwise the action runs unchanged.
withDiagMVar :: SVar t m a -> String -> IO () -> IO ()
withDiagMVar sv label action
    | svarInspectMode sv = action `catches` diagHandlers
    | otherwise = action
  where
    diagHandlers =
        [ Handler (mvarExcHandler sv label)
        , Handler (stmExcHandler sv label)
        ]
-- | Constraint synonym bundling the capabilities required of the monad by the
-- worker-dispatching functions of this module: lifting 'IO', running monadic
-- actions in 'IO' and back, and throwing exceptions.
type MonadAsync m = (MonadIO m, MonadBaseControl IO m, MonadThrow m)
-- | A function that runs an action of monad @m@ in 'IO', returning the
-- monadic state ('StM') that results.
newtype RunInIO m = RunInIO { runInIO :: forall b. m b -> IO (StM m b) }
-- | Capture the runner that 'control' provides at the point of the call and
-- return it wrapped in 'RunInIO'.
captureMonadState :: MonadBaseControl IO m => m (RunInIO m)
captureMonadState =
    control (\run -> run (return (RunInIO run)))
{-# INLINE rawForkIO #-}
-- | Fork a thread running the given action by invoking the @fork#@ primitive
-- directly and wrapping the resulting raw thread id in 'ThreadId'. No
-- exception handler is installed around the action here.
rawForkIO :: IO () -> IO ThreadId
rawForkIO action = IO $ \ s ->
    case fork# action s of (# s1, tid #) -> (# s1, ThreadId tid #)
{-# INLINE doFork #-}
-- | Fork a thread that runs a monadic action through the supplied runner.
--
-- The thread is created with asynchronous exceptions masked; inside it the
-- action itself runs with the caller's masking state restored, and any
-- exception escaping it is passed to the supplied handler. The id of the new
-- thread is returned in the calling monad.
doFork :: MonadBaseControl IO m
    => m ()                         -- ^ action to run in the new thread
    -> RunInIO m                    -- ^ runner used to execute the action in 'IO'
    -> (SomeException -> IO ())     -- ^ handler for exceptions of the action
    -> m ThreadId
doFork action (RunInIO mrun) exHandler =
    control $ \run ->
        mask $ \restore -> do
            -- The result state of the action is discarded with 'void'.
            tid <- rawForkIO $ catch (restore $ void $ mrun action)
                                     exHandler
            run (return tid)
{-# INLINABLE fork #-}
-- | 'forkIO' lifted to any monad with a 'MonadBaseControl' 'IO' instance; the
-- monadic state produced by the forked action is discarded.
fork :: MonadBaseControl IO m => m () -> m ThreadId
fork action = liftBaseDiscard forkIO action
{-# INLINABLE forkManaged #-}
-- | Fork a thread and attach a finalizer to its 'ThreadId' that kills the
-- thread.
forkManaged :: (MonadIO m, MonadBaseControl IO m) => m () -> m ThreadId
forkManaged action =
    fork action >>= \tid ->
        liftIO (addFinalizer tid (killThread tid)) >> return tid
{-# INLINE decrementYieldLimit #-}
-- | Take one unit from the remaining yield budget. Returns True when there
-- is no budget to enforce or when at least one unit was available before
-- the decrement. The counter is decremented unconditionally.
decrementYieldLimit :: SVar t m a -> IO Bool
decrementYieldLimit sv =
    maybe (return True) consume (remainingWork sv)
  where
    consume ref = do
        before <- atomicModifyIORefCAS ref (\x -> (x - 1, x))
        return (before >= 1)
{-# INLINE incrementYieldLimit #-}
-- | Give one unit back to the remaining yield budget, if there is one.
incrementYieldLimit :: SVar t m a -> IO ()
incrementYieldLimit sv =
    maybe (return ()) giveBack (remainingWork sv)
  where
    giveBack ref = atomicModifyIORefCAS_ ref (+ 1)
{-# INLINE decrementBufferLimit #-}
-- | Reserve one slot of push buffer space before sending an event.
--
-- Nothing is done when the buffer is unlimited. Otherwise a slot is taken
-- from 'pushBufferSpace'; when none was available the behaviour depends on
-- the 'pushBufferPolicy':
--
-- * 'PushBufferBlock': block on 'pushBufferMVar' and retry until a slot can
--   be taken.
-- * 'PushBufferDropNew': remove the event at the head of the output queue,
--   which is the most recently sent one because senders prepend. If the
--   queue is empty, block and retry as above.
-- * 'PushBufferDropOld': not implemented; raises an error that says so.
decrementBufferLimit :: SVar t m a -> IO ()
decrementBufferLimit sv =
    case maxBufferLimit sv of
        Unlimited -> return ()
        Limited _ -> do
            old <- takeSlot
            when (old <= 0) $
                case pushBufferPolicy sv of
                    PushBufferBlock -> blockAndRetry
                    PushBufferDropNew -> do
                        block <- atomicModifyIORefCAS (outputQueue sv) $
                            \(es, n) ->
                                case es of
                                    [] -> (([], n), True)
                                    _ : xs -> ((xs, n - 1), False)
                        when block blockAndRetry
                    PushBufferDropOld ->
                        error $ "decrementBufferLimit: "
                            <> "PushBufferDropOld policy is not implemented"

    where

    -- Take a slot if one is available; returns the count seen before the
    -- attempt, so a result >= 1 means a slot was taken.
    takeSlot =
        atomicModifyIORefCAS (pushBufferSpace sv) $ \x ->
            (if x >= 1 then x - 1 else x, x)

    blockAndRetry = do
        takeMVar (pushBufferMVar sv)
        old <- takeSlot
        if old >= 1
        -- Refill the MVar after a successful take so that another blocked
        -- caller gets a chance to retry.
        then void $ tryPutMVar (pushBufferMVar sv) ()
        else blockAndRetry
{-# INLINE incrementBufferLimit #-}
-- | Release one slot of push buffer space and signal 'pushBufferMVar' so
-- that a blocked 'decrementBufferLimit' can retry. Nothing is done when the
-- buffer is unlimited.
incrementBufferLimit :: SVar t m a -> IO ()
incrementBufferLimit sv =
    case maxBufferLimit sv of
        Limited _ -> release
        Unlimited -> return ()
  where
    release = do
        atomicModifyIORefCAS_ (pushBufferSpace sv) (+ 1)
        writeBarrier
        void $ liftIO $ tryPutMVar (pushBufferMVar sv) ()
{-# INLINE resetBufferLimit #-}
-- | Set the available push buffer space back to the configured limit.
-- Nothing is done when the buffer is unlimited.
resetBufferLimit :: SVar t m a -> IO ()
resetBufferLimit sv =
    case maxBufferLimit sv of
        Limited n ->
            atomicModifyIORefCAS_ (pushBufferSpace sv) (\_ -> fromIntegral n)
        Unlimited -> return ()
{-# INLINE sendWithDoorBell #-}
-- | Prepend an event to a queue and, if the queue was empty before, ring the
-- doorbell. Returns the queue length before the event was added.
sendWithDoorBell ::
    IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell q bell msg = do
    oldlen <- atomicModifyIORefCAS q push
    when (oldlen <= 0) $
        writeBarrier >> void (tryPutMVar bell ())
    return oldlen
  where
    push (es, n) = ((msg : es, n + 1), n)
-- | Send an event on the output queue of the SVar, ringing its doorbell if
-- the queue was empty. Returns the previous queue length.
send :: SVar t m a -> ChildEvent a -> IO Int
send sv =
    sendWithDoorBell (outputQueue sv) (outputDoorBell sv)
-- | Send an event on the consumer-to-producer queue. 'pushBufferMVar' is
-- signalled first, so a producer blocked waiting for buffer space wakes up.
-- Returns the previous length of that queue.
sendToProducer :: SVar t m a -> ChildEvent a -> IO Int
sendToProducer sv msg =
    tryPutMVar (pushBufferMVar sv) ()
        >> sendWithDoorBell
                (outputQueueFromConsumer sv)
                (outputDoorBellFromConsumer sv)
                msg
-- | Send a 'ChildStop' without an exception, tagged with the calling thread
-- id, on the consumer-to-producer queue.
sendStopToProducer :: MonadIO m => SVar t m a -> m ()
sendStopToProducer sv =
    liftIO $
        myThreadId >>= \tid ->
            void $ sendToProducer sv (ChildStop tid Nothing)
-- | Measure what a worker did since its last measurement. When it yielded
-- at least once, return the number of yields and the time they took, and
-- restart the measurement from now; otherwise return 'Nothing' and leave the
-- starting point alone.
workerCollectLatency :: WorkerInfo -> IO (Maybe (Count, NanoSecond64))
workerCollectLatency winfo = do
    (startCount, startTime) <- readIORef startRef
    curCount <- readIORef (workerYieldCount winfo)
    let yields = curCount - startCount
    if yields <= 0
    then return Nothing
    else do
        now <- getTime Monotonic
        let period = fromRelTime64 $ diffAbsTime64 now startTime
        writeIORef startRef (curCount, now)
        return $ Just (yields, period)
  where
    startRef = workerLatencyStart winfo
-- | Add the latest measurement of a worker to the pending latency triple.
-- The yields always count towards the total; they count towards the
-- measured yields and time only when the measured period is positive.
workerUpdateLatency :: YieldRateInfo -> WorkerInfo -> IO ()
workerUpdateLatency yinfo winfo =
    workerCollectLatency winfo >>= maybe (return ()) record
  where
    record (cnt, period) =
        atomicModifyIORefCAS_ (workerPendingLatency yinfo) $
            \(fc, n, t) ->
                let (measuredCnt, measuredTime)
                        | period > 0 = (cnt, period)
                        | otherwise = (0, 0)
                 in (fc + cnt, n + measuredCnt, t + measuredTime)
-- | Increment the yield counter of a worker and return the new value.
updateYieldCount :: WorkerInfo -> IO Count
updateYieldCount winfo = do
    next <- fmap (+ 1) (readIORef counter)
    writeIORef counter next
    return next
  where
    counter = workerYieldCount winfo
-- | Tell whether the given yield count has reached the maximum of the
-- worker. A maximum of 0 means there is none.
isBeyondMaxYield :: Count -> WorkerInfo -> Bool
isBeyondMaxYield cnt winfo = ymax /= 0 && cnt >= ymax
  where
    ymax = workerYieldMax winfo
{-# NOINLINE checkRatePeriodic #-}
-- | Every 'workerPollingInterval' yields, report the latency of the worker
-- and check whether the SVar is running beyond its maximum rate. Returns
-- False on all other yields and when the interval is 0.
checkRatePeriodic :: SVar t m a
                  -> YieldRateInfo
                  -> WorkerInfo
                  -> Count
                  -> IO Bool
checkRatePeriodic sv yinfo winfo ycnt = do
    interval <- readIORef (workerPollingInterval yinfo)
    let due = interval /= 0 && ycnt `mod` interval == 0
    if due
    then workerUpdateLatency yinfo winfo >> isBeyondMaxRate sv yinfo
    else return False
{-# NOINLINE workerRateControl #-}
-- | Account for one yield of a worker and tell whether the worker may
-- continue: True unless it has reached its yield maximum or the periodic
-- rate check found the SVar beyond its maximum rate.
workerRateControl :: SVar t m a -> YieldRateInfo -> WorkerInfo -> IO Bool
workerRateControl sv yinfo winfo = do
    yields <- updateYieldCount winfo
    tooFast <- checkRatePeriodic sv yinfo winfo yields
    return $ not (isBeyondMaxYield yields winfo) && not tooFast
{-# INLINE sendYield #-}
-- | Send an event on the output queue and tell whether the worker may keep
-- going. The answer is False when the buffer is limited and the queue
-- length after this send is not below the limit minus the number of active
-- workers, or when rate control (applied only if both worker info and rate
-- info are present) says to stop.
sendYield :: SVar t m a -> Maybe WorkerInfo -> ChildEvent a -> IO Bool
sendYield sv mwinfo msg = do
    oldlen <- send sv msg
    bufferSpaceOk <- hasBufferSpace oldlen
    rateLimitOk <- withinRate
    return (bufferSpaceOk && rateLimitOk)
  where
    hasBufferSpace oldlen =
        case maxBufferLimit sv of
            Limited lim -> do
                active <- readIORef (workerCount sv)
                return $ (oldlen + 1) < (fromIntegral lim - active)
            Unlimited -> return True

    withinRate =
        case (mwinfo, yieldRateInfo sv) of
            (Just winfo, Just yinfo) -> workerRateControl sv yinfo winfo
            _ -> return True
{-# INLINE workerStopUpdate #-}
-- | Report the final latency measurement of a stopping worker, unless the
-- polling interval is 0.
workerStopUpdate :: WorkerInfo -> YieldRateInfo -> IO ()
workerStopUpdate winfo info =
    readIORef (workerPollingInterval info) >>= \interval ->
        when (interval /= 0) (workerUpdateLatency info winfo)
{-# INLINABLE sendStop #-}
-- | Called by a worker that is stopping: decrement the worker count, report
-- its final latency when both worker info and rate info are present, and
-- send a 'ChildStop' without an exception on the output queue.
sendStop :: SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop sv mwinfo = do
    atomicModifyIORefCAS_ (workerCount sv) (subtract 1)
    case mwinfo of
        Just winfo ->
            maybe (return ()) (workerStopUpdate winfo) (yieldRateInfo sv)
        Nothing -> return ()
    tid <- myThreadId
    void $ send sv (ChildStop tid Nothing)
{-# INLINE ringDoorBell #-}
-- | Ring the output doorbell if the 'needDoorBell' flag is set, resetting
-- the flag first.
ringDoorBell :: SVar t m a -> IO ()
ringDoorBell sv = do
    -- The barrier is issued before the flag is read; the enqueue functions
    -- call this right after writing to their queue.
    storeLoadBarrier
    w <- readIORef $ needDoorBell sv
    when w $ do
        atomicModifyIORefCAS_ (needDoorBell sv) (const False)
        -- A non-blocking put: nothing happens if the doorbell is already full.
        void $ tryPutMVar (outputDoorBell sv) ()
{-# INLINE enqueueLIFO #-}
-- | Push a work item on the front of a list-based queue, then ring the
-- doorbell if it was requested.
enqueueLIFO :: SVar t m a -> IORef [t m a] -> t m a -> IO ()
enqueueLIFO sv q m =
    atomicModifyIORefCAS_ q (m :) >> ringDoorBell sv
{-# INLINE enqueueFIFO #-}
-- | Push a work item on a linked queue, then ring the doorbell if it was
-- requested.
enqueueFIFO :: SVar t m a -> LinkedQueue (t m a) -> t m a -> IO ()
enqueueFIFO sv q m =
    pushL q m >> ringDoorBell sv
{-# INLINE enqueueAhead #-}
-- | Put a work item on the empty ahead queue and advance its sequence
-- number, then ring the doorbell if it was requested. It is an error for
-- the queue to be non-empty.
enqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
enqueueAhead sv q m = do
    atomicModifyIORefCAS_ q place
    ringDoorBell sv
  where
    place ([], n) = ([m], n + 1)
    place _ = error "enqueueAhead: queue is not empty"
{-# INLINE reEnqueueAhead #-}
-- | Put a work item back on the empty ahead queue without changing the
-- sequence number, then ring the doorbell if it was requested. It is an
-- error for the queue to be non-empty.
reEnqueueAhead :: SVar t m a -> IORef ([t m a], Int) -> t m a -> IO ()
reEnqueueAhead sv q m = do
    atomicModifyIORefCAS_ q place
    ringDoorBell sv
  where
    place ([], n) = ([m], n)
    place _ = error "reEnqueueAhead: queue is not empty"
{-# INLINE queueEmptyAhead #-}
-- | Tell whether the ahead work queue currently holds no item.
queueEmptyAhead :: MonadIO m => IORef ([t m a], Int) -> m Bool
queueEmptyAhead q =
    liftIO $ readIORef q >>= \(items, _) -> return (null items)
{-# INLINE dequeueAhead #-}
-- | Atomically take the item off the ahead work queue, returning it with the
-- current sequence number, or 'Nothing' if the queue is empty. It is an
-- error for the queue to hold more than one item.
dequeueAhead :: MonadIO m
    => IORef ([t m a], Int) -> m (Maybe (t m a, Int))
dequeueAhead q = liftIO $ atomicModifyIORefCAS q takeOne
  where
    takeOne ([], n) = (([], n), Nothing)
    takeOne ([x], n) = (([], n), Just (x, n))
    takeOne _ = error "more than one item on queue"
-- | Read an 'IORef' and run an action on its current value.
withIORef :: IORef a -> (a -> IO b) -> IO b
withIORef ref f = do
    current <- readIORef ref
    f current
-- | Atomically apply a function to the contents of an 'IORef', returning
-- nothing.
atomicModifyIORef_ :: IORef a -> (a -> a) -> IO ()
atomicModifyIORef_ ref f = atomicModifyIORef ref update
  where
    update old = (f old, ())
-- | Result of trying to take the next entry off the output heap.
data HeapDequeueResult t m a =
      Clearing
      -- ^ No sequence number is currently being awaited.
    | Waiting Int
      -- ^ The entry with this sequence number is not on top of the heap yet.
    | Ready (Entry Int (AheadHeapEntry t m a))
      -- ^ The awaited entry; it has been removed from the heap.
{-# INLINE dequeueFromHeap #-}
-- | Atomically take the top entry off the heap if it carries the awaited
-- sequence number.
--
-- With no awaited number the result is 'Clearing'. When the top entry
-- matches, it is removed, the awaited number is cleared and the entry is
-- returned as 'Ready'. Otherwise the heap is left alone and the result is
-- 'Waiting'.
dequeueFromHeap
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> IO (HeapDequeueResult t m a)
dequeueFromHeap hpVar = atomicModifyIORef hpVar step
  where
    step pair@(_, Nothing) = (pair, Clearing)
    step pair@(hp, Just n) =
        case H.uncons hp of
            Nothing -> (pair, Waiting n)
            Just (ent@(Entry seqNo _ev), hp')
                | seqNo == n -> ((hp', Nothing), Ready ent)
                | otherwise -> assert (seqNo >= n) (pair, Waiting n)
{-# INLINE dequeueFromHeapSeq #-}
-- | Atomically take the top entry off the heap if it carries the given
-- sequence number. Must be called while no sequence number is being
-- awaited; anything else is an error.
--
-- When the top entry matches, it is removed and returned as 'Ready'.
-- Otherwise the given number is recorded as the awaited one and the result
-- is 'Waiting'.
dequeueFromHeapSeq
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Int
    -> IO (HeapDequeueResult t m a)
dequeueFromHeapSeq hpVar i = atomicModifyIORef hpVar step
  where
    step (_, Just _) = error "dequeueFromHeapSeq: unreachable"
    step (hp, Nothing) =
        case H.uncons hp of
            Nothing -> ((hp, Just i), Waiting i)
            Just (ent@(Entry seqNo _ev), hp')
                | seqNo == i -> ((hp', Nothing), Ready ent)
                | otherwise ->
                    assert (seqNo >= i) ((hp, Just i), Waiting i)
-- | Sanity condition on heap sequence numbers: a sequence number must not be
-- lower than the awaited one, if there is an awaited one.
heapIsSane :: Maybe Int -> Int -> Bool
heapIsSane snum seqNo = maybe True (seqNo >=) snum
{-# INLINE requeueOnHeapTop #-}
-- | Atomically insert an entry into the heap and set the awaited sequence
-- number to the given one.
requeueOnHeapTop
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Entry Int (AheadHeapEntry t m a)
    -> Int
    -> IO ()
requeueOnHeapTop hpVar ent seqNo = atomicModifyIORef_ hpVar putBack
  where
    putBack (hp, snum) =
        assert (heapIsSane snum seqNo) (H.insert ent hp, Just seqNo)
{-# INLINE updateHeapSeq #-}
-- | Atomically set the awaited sequence number of the heap, leaving the heap
-- contents alone.
updateHeapSeq
    :: IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
    -> Int
    -> IO ()
updateHeapSeq hpVar seqNo = atomicModifyIORef_ hpVar setSeq
  where
    setSeq (hp, snum) =
        assert (heapIsSane snum seqNo) (hp, Just seqNo)
{-# NOINLINE addThread #-}
-- | Register a thread id in the worker thread set of the SVar. The update is
-- a plain, non-atomic modification of the reference.
addThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
addThread sv tid = liftIO (modifyIORef threads (S.insert tid))
  where
    threads = workerThreads sv
{-# INLINE delThread #-}
-- | Remove a thread id from the worker thread set of the SVar. The update is
-- a plain, non-atomic modification of the reference.
delThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
delThread sv tid = liftIO (modifyIORef threads (S.delete tid))
  where
    threads = workerThreads sv
{-# INLINE modifyThread #-}
-- | Atomically toggle the membership of a thread id in the worker thread
-- set: remove it if present, insert it otherwise.
--
-- The output doorbell is rung when the set inspected afterwards is empty:
-- that is the new set after a removal, and the old set after an insertion.
modifyThread :: MonadIO m => SVar t m a -> ThreadId -> m ()
modifyThread sv tid = do
    changed <- liftIO $ atomicModifyIORefCAS (workerThreads sv) toggle
    when (null changed) $
        liftIO $ writeBarrier >> void (tryPutMVar (outputDoorBell sv) ())
  where
    toggle old
        | S.member tid old =
            let new = S.delete tid old
             in (new, new)
        | otherwise = (S.insert tid old, old)
{-# INLINE allThreadsDone #-}
-- | Tell whether the worker thread set of the SVar is empty.
allThreadsDone :: MonadIO m => SVar t m a -> m Bool
allThreadsDone sv =
    liftIO $ fmap S.null (readIORef (workerThreads sv))
{-# NOINLINE handleChildException #-}
-- | Exception handler for worker threads: report the exception on the output
-- queue as a 'ChildStop' tagged with the calling thread id.
handleChildException :: SVar t m a -> SomeException -> IO ()
handleChildException sv e =
    myThreadId >>= \tid ->
        void $ send sv (ChildStop tid (Just e))
{-# NOINLINE handleFoldException #-}
-- | Exception handler for the consumer side: report the exception on the
-- consumer-to-producer queue as a 'ChildStop' tagged with the calling thread
-- id.
handleFoldException :: SVar t m a -> SomeException -> IO ()
handleFoldException sv e =
    myThreadId >>= \tid ->
        void $ sendToProducer sv (ChildStop tid (Just e))
{-# NOINLINE recordMaxWorkers #-}
-- | Update the dispatch statistics: raise the recorded maximum worker count
-- if the current count exceeds it, and count one more dispatch.
recordMaxWorkers :: MonadIO m => SVar t m a -> m ()
recordMaxWorkers sv = liftIO $ do
    active <- readIORef (workerCount sv)
    highest <- readIORef maxRef
    when (highest < active) $ writeIORef maxRef active
    modifyIORef (totalDispatches stats) (+1)
  where
    stats = svarStats sv
    maxRef = maxWorkers stats
{-# NOINLINE pushWorker #-}
-- | Start one more worker thread running the work loop of the SVar.
--
-- The worker count is incremented first and statistics are recorded in
-- inspection mode. When rate control is active the worker gets a fresh
-- 'WorkerInfo' carrying the given yield maximum. The new thread is
-- registered in the worker thread set.
pushWorker :: MonadAsync m => Count -> SVar t m a -> m ()
pushWorker yieldMax sv = do
    liftIO $ atomicModifyIORefCAS_ (workerCount sv) (+ 1)
    when (svarInspectMode sv) $ recordMaxWorkers sv
    winfo <- liftIO newWorkerInfo
    tid <- doFork (workLoop sv winfo) (svarMrun sv) (handleChildException sv)
    addThread sv tid
  where
    newWorkerInfo =
        case yieldRateInfo sv of
            Nothing -> return Nothing
            Just _ -> do
                cntRef <- newIORef 0
                t <- getTime Monotonic
                lat <- newIORef (0, t)
                return $ Just WorkerInfo
                    { workerYieldMax = yieldMax
                    , workerYieldCount = cntRef
                    , workerLatencyStart = lat
                    }
{-# INLINE pushWorkerPar #-}
-- | Start a worker thread running the given loop and toggle its id in the
-- worker thread set with 'modifyThread'.
--
-- Outside inspection mode the loop runs without worker info and no counters
-- are touched. In inspection mode the worker count is incremented,
-- statistics are recorded and, when rate control is active, the loop gets a
-- 'WorkerInfo' without a yield maximum.
pushWorkerPar
    :: MonadAsync m
    => SVar t m a -> (Maybe WorkerInfo -> m ()) -> m ()
pushWorkerPar sv wloop
    | svarInspectMode sv = forkWithDiag
    | otherwise = forkLoop Nothing
  where
    forkLoop winfo =
        doFork (wloop winfo) (svarMrun sv) (handleChildException sv)
            >>= modifyThread sv

    {-# NOINLINE forkWithDiag #-}
    forkWithDiag = do
        liftIO $ atomicModifyIORefCAS_ (workerCount sv) (+ 1)
        recordMaxWorkers sv
        winfo <-
            case yieldRateInfo sv of
                Nothing -> return Nothing
                Just _ -> liftIO $ do
                    cntRef <- newIORef 0
                    t <- getTime Monotonic
                    lat <- newIORef (0, t)
                    return $ Just WorkerInfo
                        { workerYieldMax = 0
                        , workerYieldCount = cntRef
                        , workerLatencyStart = lat
                        }
        forkLoop winfo
-- | Try to dispatch one worker. Returns True only when a worker was started
-- under the worker limit.
--
-- Nothing happens when the work is done. When only the queue is done, a
-- worker without a yield maximum is started if none is active, and the
-- result is still False. Otherwise a worker with the given yield maximum is
-- started if the active count is below the effective limit. The effective
-- limit is the worker limit, further capped by the remaining yield budget
-- when there is a budget and no rate control.
dispatchWorker :: MonadAsync m => Count -> SVar t m a -> m Bool
dispatchWorker yieldCount sv = do
    done <- liftIO $ isWorkDone sv
    if done
    then return False
    else do
        qDone <- liftIO $ isQueueDone sv
        active <- liftIO $ readIORef $ workerCount sv
        if qDone
        then do
            when (active <= 0) $ pushWorker 0 sv
            return False
        else do
            limit <- effectiveLimit
            case limit of
                Unlimited -> dispatch
                Limited lim | lim > fromIntegral active -> dispatch
                _ -> return False
  where
    workerLimit = maxWorkerLimit sv

    dispatch = pushWorker yieldCount sv >> return True

    effectiveLimit =
        case remainingWork sv of
            Nothing -> return workerLimit
            Just ref -> do
                n <- liftIO $ readIORef ref
                return $
                    case (yieldRateInfo sv, workerLimit) of
                        (Just _, _) -> workerLimit
                        (Nothing, Unlimited) -> Limited (fromIntegral n)
                        (Nothing, Limited lim) ->
                            Limited $ min lim (fromIntegral n)
-- | 1,000,000 nanoseconds, i.e. one millisecond. Used in this module as the
-- time scale for sizing the worker polling interval and the dispatch batch,
-- and as a threshold on the collected latency time.
minThreadDelay :: NanoSecond64
minThreadDelay = 1000000

-- | 1,000,000 nanoseconds, i.e. one millisecond. 'estimateWorkers' spreads a
-- yield deficit over this period when computing the required yield
-- frequency.
rateRecoveryTime :: NanoSecond64
rateRecoveryTime = 1000000
-- | Verdict of 'estimateWorkers'.
data Work
    = BlockWait NanoSecond64
      -- ^ The stream is ahead of its target; carries a sleep time.
    | PartialWorker Count
      -- ^ A single worker is enough; carries the yield deficit.
    | ManyWorkers Int Count
      -- ^ Carries a worker count and the yield deficit.
    deriving Show
-- | Estimate how much work has to be dispatched for the stream to meet its
-- target latency per yield.
--
-- When fewer yields than targeted have been produced, the result is
-- 'PartialWorker' if the worker latency does not exceed the adjusted
-- per-yield latency, and 'ManyWorkers' otherwise. When the stream is on or
-- ahead of target, the result is 'BlockWait' with a positive sleep time, or
-- @ManyWorkers 1 0@ if no positive sleep time is available.
estimateWorkers
    :: Limit            -- ^ worker limit
    -> Count            -- ^ yields produced so far
    -> Count            -- ^ gained/lost yields correction
    -> NanoSecond64     -- ^ time elapsed so far
    -> NanoSecond64     -- ^ worker latency
    -> NanoSecond64     -- ^ target latency per yield
    -> LatencyRange     -- ^ bounds for the adjusted latency
    -> Work
estimateWorkers workerLimit svarYields gainLossYields
                svarElapsed wLatency targetLat range =
    let
        -- Number of yields targeted for the elapsed time plus one worker
        -- latency, rounded up.
        targetYields = (svarElapsed + wLatency + targetLat - 1) `div` targetLat
        effectiveYields = svarYields + gainLossYields
        deltaYields = fromIntegral targetYields - effectiveYields
    in  if deltaYields > 0
        then
            -- Behind target: add the frequency needed to make up the deficit
            -- within 'rateRecoveryTime' to the target frequency, convert
            -- back to a latency and clamp it to the allowed range.
            let deltaYieldsFreq :: Double
                deltaYieldsFreq =
                    fromIntegral deltaYields /
                        fromIntegral rateRecoveryTime
                yieldsFreq = 1.0 / fromIntegral targetLat
                totalYieldsFreq = yieldsFreq + deltaYieldsFreq
                requiredLat = NanoSecond64 $ round $ 1.0 / totalYieldsFreq
                adjustedLat = min (max requiredLat (minLatency range))
                                  (maxLatency range)
            in  assert (adjustedLat > 0) $
                if wLatency <= adjustedLat
                then PartialWorker deltaYields
                -- The worker count is capped by the worker limit and by the
                -- yield deficit.
                else let workers = withLimit $ wLatency `div` adjustedLat
                         limited = min workers (fromIntegral deltaYields)
                     in ManyWorkers (fromIntegral limited) deltaYields
        else
            -- On or ahead of target: sleep for the time gained, but no
            -- longer than the maximum latency minus the worker latency.
            let expectedDuration = fromIntegral effectiveYields * targetLat
                sleepTime = expectedDuration - svarElapsed
                maxSleepTime = maxLatency range - wLatency
                s = min sleepTime maxSleepTime
            in assert (sleepTime >= 0) $
                if s > 0 then BlockWait s else ManyWorkers 1 (Count 0)
    where
        withLimit n =
            case workerLimit of
                Unlimited -> n
                Limited x -> min n (fromIntegral x)
-- | Read-only estimate of the current worker latency; no reference is
-- modified.
--
-- Returns the long-term yield count including pending and collected yields,
-- the long-term base timestamp, and a latency: the mean of the previously
-- measured latency and the latency of the pending plus collected samples,
-- or just the previous latency when there are no usable samples.
getWorkerLatency :: YieldRateInfo -> IO (Count, AbsTime, NanoSecond64)
getWorkerLatency yinfo = do
    (curTotalCount, curCount, curTime) <-
        readIORef (workerPendingLatency yinfo)
    (colTotalCount, colCount, colTime) <-
        readIORef (workerCollectedLatency yinfo)
    (lcount, ltime) <- readIORef (svarAllTimeLatency yinfo)
    prevLat <- readIORef (workerMeasuredLatency yinfo)

    let yields = colCount + curCount
        elapsed = colTime + curTime
        estimate
            | yields > 0 && elapsed > 0 =
                (elapsed `div` fromIntegral yields + prevLat) `div` 2
            | otherwise = prevLat
    return (lcount + (colTotalCount + curTotalCount), ltime, estimate)
-- | Tell whether more workers are running than the current work estimate
-- calls for: more than one for 'PartialWorker', more than the estimated
-- number for 'ManyWorkers', and always for 'BlockWait'.
isBeyondMaxRate :: SVar t m a -> YieldRateInfo -> IO Bool
isBeyondMaxRate sv yinfo = do
    (count, tstamp, wLatency) <- getWorkerLatency yinfo
    now <- getTime Monotonic
    gainLoss <- readIORef (svarGainedLostYields yinfo)
    let duration = fromRelTime64 $ diffAbsTime64 now tstamp
        work =
            estimateWorkers
                (maxWorkerLimit sv)
                count
                gainLoss
                duration
                wLatency
                (svarLatencyTarget yinfo)
                (svarLatencyRange yinfo)
    cnt <- readIORef $ workerCount sv
    return $ tooMany cnt work
  where
    tooMany cnt (PartialWorker _yields) = cnt > 1
    tooMany cnt (ManyWorkers n _) = cnt > n
    tooMany _ (BlockWait _) = True
-- | Rate-controlled worker dispatch. Collects the current yield count and
-- worker latency via 'collectLatency', asks 'estimateWorkers' what to do, and
-- then either sleeps, dispatches a single worker, or dispatches a batch of
-- workers.
--
-- Returns 'True' only from the 'ManyWorkers' branch, and only when every
-- 'dispatchWorker' call of the batch returned 'True'. All other paths return
-- 'False'.
dispatchWorkerPaced :: MonadAsync m => SVar t m a -> m Bool
dispatchWorkerPaced sv = do
    -- NOTE(review): 'fromJust' is partial. Within this file the paced
    -- read/post-process functions that call this are installed by
    -- getAheadSVar only when a stream rate is set; confirm no other caller
    -- can reach this with 'yieldRateInfo' being 'Nothing'.
    let yinfo = fromJust $ yieldRateInfo sv
    (svarYields, svarElapsed, wLatency) <- do
        now <- liftIO $ getTime Monotonic
        (yieldCount, baseTime, lat) <-
            liftIO $ collectLatency sv yinfo False
        let elapsed = fromRelTime64 $ diffAbsTime64 now baseTime
        -- When no latency has been measured yet (0), fall back to the
        -- bootstrap latency if one is configured.
        let latency =
                if lat == 0
                then
                    case workerBootstrapLatency yinfo of
                        Nothing -> lat
                        Just t -> t
                else lat
        return (yieldCount, elapsed, latency)
    -- Without any latency figure (measured or bootstrap) no estimate is
    -- made and nothing is dispatched.
    if wLatency == 0
    then return False
    else do
        let workerLimit = maxWorkerLimit sv
        let targetLat = svarLatencyTarget yinfo
        let range = svarLatencyRange yinfo
        gainLoss <- liftIO $ readIORef (svarGainedLostYields yinfo)
        let work = estimateWorkers workerLimit svarYields gainLoss svarElapsed
                                   wLatency targetLat range
        case work of
            BlockWait s -> do
                assert (s >= 0) (return ())
                -- Sleep for the suggested time and then dispatch a worker,
                -- but only when 'allThreadsDone' reports True.
                done <- allThreadsDone sv
                when done $ void $ do
                    let us = fromRelTime64 (toRelTime64 s) :: MicroSecond64
                    liftIO $ threadDelay (fromIntegral us)
                    -- The first argument is presumably a yield budget for
                    -- the worker -- TODO confirm against dispatchWorker.
                    dispatchWorker 1 sv
                return False
            PartialWorker yields -> do
                assert (yields > 0) (return ())
                updateGainedLostYields yinfo yields
                done <- allThreadsDone sv
                when done $ void $ dispatchWorker yields sv
                return False
            ManyWorkers netWorkers yields -> do
                assert (netWorkers >= 1) (return ())
                assert (yields >= 0) (return ())
                updateGainedLostYields yinfo yields
                -- Lower the stored polling interval when the per-worker
                -- yield share (capped by magicMaxBuffer) is smaller than the
                -- current value; it is never raised here.
                let periodRef = workerPollingInterval yinfo
                    ycnt = max 1 $ yields `div` fromIntegral netWorkers
                    period = min ycnt (fromIntegral magicMaxBuffer)
                old <- liftIO $ readIORef periodRef
                when (period < old) $
                    liftIO $ writeIORef periodRef period
                cnt <- liftIO $ readIORef $ workerCount sv
                if cnt < netWorkers
                then do
                    -- Dispatch the missing workers, at most 'batch' of them
                    -- in this call. Note that this divides by targetLat.
                    let total = netWorkers - cnt
                        batch = max 1 $ fromIntegral $
                                    minThreadDelay `div` targetLat
                    dispatchN (min total batch)
                else return False

    where

    -- Accumulate the yield surplus/deficit into svarGainedLostYields, but
    -- only the part whose magnitude exceeds svarRateBuffer.
    updateGainedLostYields yinfo yields = do
        let buf = fromIntegral $ svarRateBuffer yinfo
        when (yields /= 0 && abs yields > buf) $ do
            let delta =
                   if yields > 0
                   then yields - buf
                   else yields + buf
            liftIO $ modifyIORef (svarGainedLostYields yinfo) (+ delta)

    -- Dispatch up to n workers one at a time, stopping with False at the
    -- first dispatchWorker call that returns False.
    dispatchN n =
        if n == 0
        then return True
        else do
            r <- dispatchWorker 0 sv
            if r
            then dispatchN (n - 1)
            else return False
-- | Delay hook passed to 'sendWorkerWait' by the paced reader. It ignores its
-- argument and performs no action.
sendWorkerDelayPaced :: SVar t m a -> IO ()
sendWorkerDelayPaced = const (return ())
-- | Delay hook passed to 'sendWorkerWait' by the bounded reader. It ignores
-- its argument and performs no action.
sendWorkerDelay :: SVar t m a -> IO ()
sendWorkerDelay _ = return ()
-- | Keep dispatching workers, or block on the output doorbell, until the
-- output queue is observed to be non-empty.
--
-- * @delay@ is run first on every iteration.
-- * @dispatch@ is the action used to send workers; a 'True' result makes this
--   function loop immediately, a 'False' result makes it block on
--   'outputDoorBell'.
--
-- Returns as soon as the length recorded in 'outputQueue' is positive.
{-# NOINLINE sendWorkerWait #-}
sendWorkerWait
    :: MonadAsync m
    => (SVar t m a -> IO ())
    -> (SVar t m a -> m Bool)
    -> SVar t m a
    -> m ()
sendWorkerWait delay dispatch sv = do
    liftIO $ delay sv
    (_, n) <- liftIO $ readIORef (outputQueue sv)
    when (n <= 0) $ do
        -- Set the needDoorBell flag before dispatching, with a store-load
        -- barrier in between; presumably so that a producer enqueueing
        -- after this point sees the flag and rings the doorbell -- confirm
        -- against the enqueue side.
        liftIO $ atomicModifyIORefCAS_ (needDoorBell sv) $ const True
        liftIO storeLoadBarrier
        canDoMore <- dispatch sv
        if canDoMore
        then sendWorkerWait delay dispatch sv
        else do
            -- Nothing more to dispatch: block until the doorbell is rung.
            liftIO $ withDiagMVar sv "sendWorkerWait: nothing to do"
                             $ takeMVar (outputDoorBell sv)
            -- The wake-up does not guarantee output; retry if still empty.
            (_, len) <- liftIO $ readIORef (outputQueue sv)
            when (len <= 0) $ sendWorkerWait delay dispatch sv
-- | Atomically replace the contents of the queue reference with an empty
-- list and a zero length, returning the previous contents.
{-# INLINE readOutputQBasic #-}
readOutputQBasic :: IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
readOutputQBasic q = atomicModifyIORefCAS q swapWithEmpty

    where

    swapWithEmpty contents = (([], 0), contents)
-- | Drain the SVar's output queue using 'readOutputQBasic' and return the
-- drained events together with their count.
--
-- When 'svarInspectMode' is on, the 'maxOutQSize' statistic is raised to the
-- drained count if that count is larger than the stored value.
{-# INLINE readOutputQRaw #-}
readOutputQRaw :: SVar t m a -> IO ([ChildEvent a], Int)
readOutputQRaw sv = do
    drained@(_, count) <- readOutputQBasic (outputQueue sv)
    when (svarInspectMode sv) $ recordHighWater count
    return drained

    where

    recordHighWater count = do
        let peakRef = maxOutQSize (svarStats sv)
        peak <- readIORef peakRef
        when (count > peak) $ writeIORef peakRef count
-- | Read the output queue of an SVar that has no rate control.
--
-- If the drained queue holds events they are returned, after pushing one
-- worker when 'workerCount' is not positive and 'isWorkDone' reports that
-- work remains. If the queue was empty, 'sendWorkerWait' is used (with
-- @dispatchWorker 0@ as the dispatcher) to wait for output, and the queue is
-- drained again.
readOutputQBounded :: MonadAsync m => SVar t m a -> m [ChildEvent a]
readOutputQBounded sv = do
    (events, count) <- liftIO $ readOutputQRaw sv
    if count > 0
    then do
        ensureWorker
        return events
    else waitForOutput

    where

    ensureWorker = do
        active <- liftIO $ readIORef (workerCount sv)
        when (active <= 0) $ do
            finished <- liftIO $ isWorkDone sv
            when (not finished) $ pushWorker 0 sv

    {-# INLINE waitForOutput #-}
    waitForOutput = do
        sendWorkerWait sendWorkerDelay (dispatchWorker 0) sv
        liftIO $ fmap fst (readOutputQRaw sv)
-- | Read the output queue of a rate-controlled SVar.
--
-- If the drained queue holds events, 'dispatchWorkerPaced' is run once (its
-- result is discarded) and the events are returned. If the queue was empty,
-- 'sendWorkerWait' is used with the paced delay and dispatcher to wait for
-- output, and the queue is drained again.
readOutputQPaced :: MonadAsync m => SVar t m a -> m [ChildEvent a]
readOutputQPaced sv = do
    (events, count) <- liftIO $ readOutputQRaw sv
    if count > 0
    then do
        void $ dispatchWorkerPaced sv
        return events
    else do
        sendWorkerWait sendWorkerDelayPaced dispatchWorkerPaced sv
        liftIO $ fmap fst (readOutputQRaw sv)
-- | Post-processing step for an SVar that has no rate control.
--
-- Returns 'False' while 'allThreadsDone' is 'False'. Otherwise returns the
-- result of 'isWorkDone', and calls @pushWorker 0@ first when that result is
-- 'False'.
postProcessBounded :: MonadAsync m => SVar t m a -> m Bool
postProcessBounded sv = do
    idle <- allThreadsDone sv
    if not idle
    then return False
    else do
        finished <- liftIO $ isWorkDone sv
        when (not finished) $ pushWorker 0 sv
        return finished
-- | Post-processing step for a rate-controlled SVar.
--
-- Returns 'False' while 'allThreadsDone' is 'False'. Otherwise returns the
-- result of 'isWorkDone'. When that result is 'False', it first runs
-- 'dispatchWorkerPaced' and, if 'allThreadsDone' is still 'True' afterwards,
-- calls @pushWorker 0@.
postProcessPaced :: MonadAsync m => SVar t m a -> m Bool
postProcessPaced sv = do
    idle <- allThreadsDone sv
    if not idle
    then return False
    else do
        finished <- liftIO $ isWorkDone sv
        when (not finished) $ do
            void $ dispatchWorkerPaced sv
            -- The paced dispatch may not have started anything.
            stillIdle <- allThreadsDone sv
            when stillIdle $ pushWorker 0 sv
        return finished
-- | Build the rate-control bookkeeping for a stream from the rate stored in
-- the given 'State'.
--
-- Returns 'Nothing' when no stream rate is configured. Otherwise returns a
-- 'YieldRateInfo' whose target latency and latency range are derived from the
-- goal, high and low rates, with all latency accumulators freshly allocated
-- and the all-time base timestamp set to the current monotonic time.
--
-- A rate @r@ is converted to a latency of @1.0e9 / r@ nanoseconds. The
-- conversion is clamped so that it always yields a usable value:
--
-- * a rate that is zero, negative or NaN maps to 'maxBound';
-- * a positive rate so small that the latency does not fit in 'NanoSecond64'
--   also maps to 'maxBound' instead of wrapping around;
-- * a rate so high that the latency would round to zero maps to 1, because
--   the target latency is later used as a divisor.
getYieldRateInfo :: State t m a -> IO (Maybe YieldRateInfo)
getYieldRateInfo st =
    case getStreamRate st of
        Nothing -> return Nothing
        Just (Rate low goal high buf) ->
            -- The highest rate gives the minimum latency and the lowest rate
            -- gives the maximum latency.
            mkYieldRateInfo
                (rateToLatency goal)
                (LatencyRange (rateToLatency high) (rateToLatency low))
                buf

    where

    latencyCeiling :: NanoSecond64
    latencyCeiling = maxBound

    -- The @r > 0@ test is False for NaN as well, so NaN takes the
    -- 'otherwise' branch. The range test is done before rounding so that the
    -- rounded value is always representable.
    rateToLatency r
        | r > 0 && nanos < fromIntegral latencyCeiling = max 1 (round nanos)
        | otherwise = latencyCeiling

        where

        nanos = 1.0e9 / r

    mkYieldRateInfo latency latRange buf = do
        measured <- newIORef 0
        wcur <- newIORef (0, 0, 0)
        wcol <- newIORef (0, 0, 0)
        now <- getTime Monotonic
        wlong <- newIORef (0, now)
        period <- newIORef 1
        gainLoss <- newIORef (Count 0)
        return $ Just YieldRateInfo
            { svarLatencyTarget = latency
            , svarLatencyRange = latRange
            , svarRateBuffer = buf
            , svarGainedLostYields = gainLoss
            , workerBootstrapLatency = getStreamLatency st
            , workerPollingInterval = period
            , workerMeasuredLatency = measured
            , workerPendingLatency = wcur
            , workerCollectedLatency = wcol
            , svarAllTimeLatency = wlong
            }
-- | Allocate a fresh 'SVarStats' record: every counter and latency figure
-- starts at zero and the stop time starts as 'Nothing'.
newSVarStats :: IO SVarStats
newSVarStats = do
    let zeroLatency = NanoSecond64 0

    dispatchCount <- newIORef 0
    workerPeak <- newIORef 0
    outQPeak <- newIORef 0
    heapPeak <- newIORef 0
    workQPeak <- newIORef 0
    latencyAvg <- newIORef (0, zeroLatency)
    latencyMax <- newIORef zeroLatency
    latencyMin <- newIORef zeroLatency
    stoppedAt <- newIORef Nothing

    return SVarStats
        { totalDispatches = dispatchCount
        , maxWorkers = workerPeak
        , maxOutQSize = outQPeak
        , maxHeapSize = heapPeak
        , maxWorkQSize = workQPeak
        , avgWorkerLatency = latencyAvg
        , minWorkerLatency = latencyMin
        , maxWorkerLatency = latencyMax
        , svarStopTime = stoppedAt
        }
-- | Allocate and assemble an 'SVar' of style 'AheadVar'.
--
-- Arguments: the stream 'State' to read limits and rate settings from, the
-- worker loop function (which receives the work queue, the output heap, the
-- state, the SVar and optional worker info), and the captured 'RunInIO'.
--
-- The read and post-process functions are the paced variants when a stream
-- rate is configured and the bounded variants otherwise. No worker is started
-- here.
getAheadSVar :: MonadAsync m
    => State t m a
    -> (   IORef ([t m a], Int)
        -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
        -> State t m a
        -> SVar t m a
        -> Maybe WorkerInfo
        -> m ())
    -> RunInIO m
    -> IO (SVar t m a)
getAheadSVar st f mrun = do
    outQ <- newIORef ([], 0)
    -- Output heap, initially empty and paired with 'Just 0'; presumably the
    -- next expected sequence number -- TODO confirm against the heap users.
    outH <- newIORef (H.empty, Just 0)
    outQMv <- newEmptyMVar
    active <- newIORef 0
    wfw <- newIORef False
    running <- newIORef S.empty
    -- Work queue, initially empty and paired with -1; presumably a sequence
    -- counter advanced on enqueue -- TODO confirm against enqueueAhead.
    q <- newIORef ([], -1)
    stopMVar <- newMVar ()
    -- A yield-limit reference is allocated only when a limit is configured.
    yl <- case getYieldLimit st of
            Nothing -> return Nothing
            Just x -> Just <$> newIORef x
    rateInfo <- getYieldRateInfo st
    stats <- newSVarStats
    tid <- myThreadId
    -- Fields set to 'undefined' are not initialised for this style; forcing
    -- any of them throws.
    let getSVar sv readOutput postProc = SVar
            { outputQueue = outQ
            , outputQueueFromConsumer = undefined
            , remainingWork = yl
            , maxBufferLimit = getMaxBuffer st
            , pushBufferSpace = undefined
            , pushBufferPolicy = undefined
            , pushBufferMVar = undefined
            , maxWorkerLimit = min (getMaxThreads st) (getMaxBuffer st)
            , yieldRateInfo = rateInfo
            , outputDoorBell = outQMv
            , outputDoorBellFromConsumer = undefined
            , readOutputQ = readOutput sv
            , postProcess = postProc sv
            , workerThreads = running
            , workLoop = f q outH st{streamVar = Just sv} sv
            , enqueue = enqueueAhead sv q
            , isWorkDone = isWorkDoneAhead sv q outH
            , isQueueDone = isQueueDoneAhead sv q
            , needDoorBell = wfw
            , svarStyle = AheadVar
            , svarStopStyle = StopNone
            , svarStopBy = undefined
            , svarMrun = mrun
            , workerCount = active
            , accountThread = delThread sv
            , workerStopMVar = stopMVar
            , svarRef = Nothing
            , svarInspectMode = getInspectMode st
            , svarCreator = tid
            , aheadWorkQueue = q
            , outputHeap = outH
            , svarStats = stats
            }
    -- 'sv' is defined in terms of itself: several fields close over the very
    -- SVar being built, which works because the record is constructed lazily.
    let sv =
            case getStreamRate st of
                Nothing -> getSVar sv readOutputQBounded postProcessBounded
                Just _ -> getSVar sv readOutputQPaced postProcessPaced
     in return sv

    where

    -- The queue counts as done when it is empty or when a yield limit exists
    -- and has dropped to zero or below.
    {-# INLINE isQueueDoneAhead #-}
    isQueueDoneAhead sv q = do
        queueDone <- checkEmpty q
        yieldsDone <-
                case remainingWork sv of
                    Just yref -> do
                        n <- readIORef yref
                        return (n <= 0)
                    Nothing -> return False
        return $ yieldsDone || queueDone

    -- All work is done when the output heap is empty and the queue is done.
    {-# INLINE isWorkDoneAhead #-}
    isWorkDoneAhead sv q ref = do
        heapDone <- do
                (hp, _) <- readIORef ref
                return (H.size hp <= 0)
        queueDone <- isQueueDoneAhead sv q
        return $ heapDone && queueDone

    checkEmpty q = do
        (xs, _) <- readIORef q
        return $ null xs
-- | Allocate and assemble an 'SVar' of style 'ParallelVar' with the given
-- stop style, reading limits and rate settings from the given 'State'.
--
-- The worker limit is 'Unlimited' and the push-buffer policy is
-- 'PushBufferBlock'. No worker is started here.
getParallelSVar :: MonadIO m
    => SVarStopStyle -> State t m a -> RunInIO m -> IO (SVar t m a)
getParallelSVar ss st mrun = do
    outQ <- newIORef ([], 0)
    outQRev <- newIORef ([], 0)
    outQMv <- newEmptyMVar
    outQMvRev <- newEmptyMVar
    active <- newIORef 0
    running <- newIORef S.empty
    -- A yield-limit reference is allocated only when a limit is configured.
    yl <- case getYieldLimit st of
            Nothing -> return Nothing
            Just x -> Just <$> newIORef x
    rateInfo <- getYieldRateInfo st
    -- With an unlimited buffer the initial buffer space is 'undefined'. It
    -- is stored lazily in the reference below, so it throws only if the
    -- stored value is ever forced.
    let bufLim =
            case getMaxBuffer st of
                Unlimited -> undefined
                Limited x -> (fromIntegral x)
    remBuf <- newIORef bufLim
    pbMVar <- newMVar ()
    stats <- newSVarStats
    tid <- myThreadId
    -- A stop-by reference (holding 'undefined' initially) is allocated only
    -- for the 'StopBy' style. This block already runs in IO, so the 'liftIO'
    -- here is a no-op.
    stopBy <-
        case ss of
            StopBy -> liftIO $ newIORef undefined
            _ -> return undefined
    -- 'sv' refers to itself in a few fields; the record is built lazily.
    -- Fields set to 'undefined' are not initialised for this style; forcing
    -- any of them throws.
    let sv =
            SVar { outputQueue = outQ
                 , outputQueueFromConsumer = outQRev
                 , remainingWork = yl
                 , maxBufferLimit = getMaxBuffer st
                 , pushBufferSpace = remBuf
                 , pushBufferPolicy = PushBufferBlock
                 , pushBufferMVar = pbMVar
                 , maxWorkerLimit = Unlimited
                 , yieldRateInfo = rateInfo
                 , outputDoorBell = outQMv
                 , outputDoorBellFromConsumer = outQMvRev
                 , readOutputQ = readOutputQPar sv
                 , postProcess = allThreadsDone sv
                 , workerThreads = running
                 , workLoop = undefined
                 , enqueue = undefined
                 , isWorkDone = undefined
                 , isQueueDone = undefined
                 , needDoorBell = undefined
                 , svarStyle = ParallelVar
                 , svarStopStyle = ss
                 , svarStopBy = stopBy
                 , svarMrun = mrun
                 , workerCount = active
                 , accountThread = modifyThread sv
                 , workerStopMVar = undefined
                 , svarRef = Nothing
                 , svarInspectMode = getInspectMode st
                 , svarCreator = tid
                 , aheadWorkQueue = undefined
                 , outputHeap = undefined
                 , svarStats = stats
                 }
     in return sv

    where

    -- Block on the output doorbell, collect latency when rate info is
    -- present, drain the output queue, and then reset the buffer limit
    -- between a tryTakeMVar and a tryPutMVar on the push-buffer MVar.
    readOutputQPar sv = liftIO $ do
        withDiagMVar sv "readOutputQPar: doorbell"
            $ takeMVar (outputDoorBell sv)
        case yieldRateInfo sv of
            Nothing -> return ()
            Just yinfo -> void $ collectLatency sv yinfo False
        r <- fst `fmap` readOutputQRaw sv
        -- The enclosing block is already IO, so this 'liftIO' is a no-op.
        liftIO $ do
            void $ tryTakeMVar (pushBufferMVar sv)
            resetBufferLimit sv
            writeBarrier
            void $ tryPutMVar (pushBufferMVar sv) ()
        return r
-- | Enqueue the given stream on the SVar and then start the first worker,
-- returning the same SVar.
--
-- The stream is enqueued before any worker is pushed. Without rate
-- information the worker is pushed with argument 0. With rate information it
-- is pushed with argument 1, except when the target latency equals
-- 'maxBound': in that case no worker is pushed and the calling thread sleeps
-- for @threadDelay maxBound@ instead.
sendFirstWorker :: MonadAsync m => SVar t m a -> t m a -> m (SVar t m a)
sendFirstWorker sv m = do
    liftIO $ enqueue sv m
    case yieldRateInfo sv of
        Just yinfo
            | svarLatencyTarget yinfo == maxBound ->
                liftIO $ threadDelay maxBound
            | otherwise -> pushWorker 1 sv
        Nothing -> pushWorker 0 sv
    return sv
-- | Create an ahead-style SVar for the given state and worker loop, enqueue
-- the given stream on it and start its first worker via 'sendFirstWorker'.
{-# INLINABLE newAheadVar #-}
newAheadVar :: MonadAsync m
    => State t m a
    -> t m a
    -> (   IORef ([t m a], Int)
        -> IORef (Heap (Entry Int (AheadHeapEntry t m a)), Maybe Int)
        -> State t m a
        -> SVar t m a
        -> Maybe WorkerInfo
        -> m ())
    -> m (SVar t m a)
newAheadVar st m wloop =
    captureMonadState
        >>= liftIO . getAheadSVar st wloop
        >>= \sv -> sendFirstWorker sv m
-- | Create a parallel-style SVar with the given stop style and state, using
-- the monad state captured at the call site. No stream is enqueued and no
-- worker is started.
{-# INLINABLE newParallelVar #-}
newParallelVar :: MonadAsync m
    => SVarStopStyle -> State t m a -> m (SVar t m a)
newParallelVar ss st =
    captureMonadState >>= liftIO . getParallelSVar ss st
-- | Enqueue a stream on an existing SVar. If 'allThreadsDone' is 'True'
-- afterwards, a worker is pushed: with argument 1 when the SVar carries rate
-- information and with argument 0 otherwise.
toStreamVar :: MonadAsync m => SVar t m a -> t m a -> m ()
toStreamVar sv m = do
    liftIO $ enqueue sv m
    idle <- allThreadsDone sv
    when idle $ pushWorker firstArg sv

    where

    firstArg = maybe 0 (const 1) (yieldRateInfo sv)