-- |
-- Module      : Streamly.Internal.Data.SVar.Pull
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
--
module Streamly.Internal.Data.SVar.Pull
    (
    -- * Read Output
      readOutputQBasic
    , readOutputQRaw
    , readOutputQPaced
    , readOutputQBounded

    -- * Postprocess Hook After Reading
    , postProcessPaced
    , postProcessBounded

    -- * Release Resources
    , cleanupSVar
    , cleanupSVarFromWorker
    )
where

#include "inline.hs"

import Control.Concurrent (myThreadId, throwTo)
import Control.Monad (when, void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (readIORef, writeIORef)
import Data.IORef (IORef)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS)

import qualified Data.Set as S

import Streamly.Internal.Data.SVar.Type
import Streamly.Internal.Data.SVar.Dispatch

-------------------------------------------------------------------------------
-- Reading from the workers' output queue/buffer
-------------------------------------------------------------------------------

-- | Swap the contents of the given output queue reference with an empty
-- queue (@([], 0)@) using 'atomicModifyIORefCAS', and return whatever the
-- reference held before the swap.
{-# INLINE readOutputQBasic #-}
readOutputQBasic :: IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
readOutputQBasic q = atomicModifyIORefCAS q $ \x -> (([], 0), x)

-- | Drain the 'outputQueue' of the 'SVar' via 'readOutputQBasic' and return
-- the drained events together with the count stored alongside them.
--
-- When 'svarInspectMode' is enabled, the 'maxOutQSize' statistic is raised to
-- the drained count if that count exceeds the value recorded so far.
{-# INLINE readOutputQRaw #-}
readOutputQRaw :: SVar t m a -> IO ([ChildEvent a], Int)
readOutputQRaw sv = do
    (list, len) <- readOutputQBasic (outputQueue sv)
    when (svarInspectMode sv) $ do
        let ref = maxOutQSize $ svarStats sv
        oqLen <- readIORef ref
        -- Only ever grow the recorded maximum.
        when (len > oqLen) $ writeIORef ref len
    return (list, len)

-- | Read the pending output events of the 'SVar'.
--
-- If the queue is found empty, wait for output via 'sendWorkerWait'
-- (dispatching with @dispatchWorker 0@) and then read the queue again.
-- Otherwise, make sure a worker is running if work remains (see
-- @sendOneWorker@ below) and return the events that were read.
readOutputQBounded :: MonadAsync m => SVar t m a -> m [ChildEvent a]
readOutputQBounded sv = do
    (list, len) <- liftIO $ readOutputQRaw sv
    -- When there is no output seen we dispatch more workers to help
    -- out if there is work pending in the work queue.
    if len <= 0
    then blockingRead
    else do
        -- send a worker proactively, if needed, even before we start
        -- processing the output.  This may degrade single processor
        -- perf but improves multi-processor, because of more
        -- parallelism
        sendOneWorker
        return list

    where

    -- Push a single worker, but only when the worker count is not positive
    -- and 'isWorkDone' reports that work is still pending.
    sendOneWorker = do
        cnt <- liftIO $ readIORef $ workerCount sv
        when (cnt <= 0) $ do
            done <- liftIO $ isWorkDone sv
            when (not done) (pushWorker 0 sv)

    -- Hand control to 'sendWorkerWait' and, once it returns, drain the
    -- output queue, discarding the count.
    {-# INLINE blockingRead #-}
    blockingRead = do
        sendWorkerWait sendWorkerDelay (dispatchWorker 0) sv
        liftIO (fst `fmap` readOutputQRaw sv)

-- | Paced counterpart of 'readOutputQBounded'.
--
-- If the queue is found empty, wait for output via 'sendWorkerWait' (using
-- 'sendWorkerDelayPaced' and 'dispatchWorkerPaced') and then read the queue
-- again. Otherwise call 'dispatchWorkerPaced' once, ignoring its result, and
-- return the events that were read.
readOutputQPaced :: MonadAsync m => SVar t m a -> m [ChildEvent a]
readOutputQPaced sv = do
    (list, len) <- liftIO $ readOutputQRaw sv
    if len <= 0
    then blockingRead
    else do
        -- XXX send a worker proactively, if needed, even before we start
        -- processing the output.
        void $ dispatchWorkerPaced sv
        return list

    where

    -- Hand control to 'sendWorkerWait' and, once it returns, drain the
    -- output queue, discarding the count.
    {-# INLINE blockingRead #-}
    blockingRead = do
        sendWorkerWait sendWorkerDelayPaced dispatchWorkerPaced sv
        liftIO (fst `fmap` readOutputQRaw sv)

-- | Hook to run after reading output from a paced 'SVar'.
--
-- Returns 'False' while 'allThreadsDone' reports that workers are still
-- running. Once all threads are done, returns the result of 'isWorkDone';
-- if work is still pending, a worker is dispatched before returning so that
-- the remaining work gets picked up.
postProcessPaced :: MonadAsync m => SVar t m a -> m Bool
postProcessPaced sv = do
    workersDone <- allThreadsDone sv
    -- XXX If during consumption we figure out we are getting delayed then we
    -- should trigger dispatch there as well.  We should try to check on the
    -- workers after consuming every n item from the buffer?
    if workersDone
    then do
        r <- liftIO $ isWorkDone sv
        when (not r) $ do
            void $ dispatchWorkerPaced sv
            -- Note that we need to guarantee a worker since the work is not
            -- finished, therefore we cannot just rely on dispatchWorkerPaced
            -- which may or may not send a worker.
            noWorker <- allThreadsDone sv
            when noWorker $ pushWorker 0 sv
        return r
    else return False

-- | Hook to run after reading output from a bounded 'SVar'.
--
-- Returns 'False' while 'allThreadsDone' reports that workers are still
-- running. Once all threads are done, returns the result of 'isWorkDone';
-- if work is still pending, one worker is pushed before returning.
postProcessBounded :: MonadAsync m => SVar t m a -> m Bool
postProcessBounded sv = do
    workersDone <- allThreadsDone sv
    -- There may still be work pending even if there are no workers pending
    -- because all the workers may return if the outputQueue becomes full. In
    -- that case send off a worker to kickstart the work again.
    --
    -- Note that isWorkDone can only be safely checked if all workers are done.
    -- When some workers are in progress they may have decremented the yield
    -- limit and may later end up incrementing it again. If we look at the
    -- yield limit in that window we may falsely say that it is 0 and
    -- therefore we are done.
    if workersDone
    then do
        r <- liftIO $ isWorkDone sv
        -- Note that we need to guarantee a worker, therefore we cannot just
        -- use dispatchWorker which may or may not send a worker.
        when (not r) (pushWorker 0 sv)
        -- XXX do we need to dispatch many here?
        -- void $ dispatchWorker sv
        return r
    else return False

-------------------------------------------------------------------------------
-- Cleanup
-------------------------------------------------------------------------------

-- | Throw 'ThreadAbort' to every thread currently recorded in the
-- 'workerThreads' set of the 'SVar'.
cleanupSVar :: SVar t m a -> IO ()
cleanupSVar sv = do
    workers <- readIORef (workerThreads sv)
    Prelude.mapM_ (`throwTo` ThreadAbort) workers

-- | Like 'cleanupSVar', but skips the calling thread: throw 'ThreadAbort' to
-- every thread recorded in the 'workerThreads' set of the 'SVar' except the
-- one returned by 'myThreadId'.
cleanupSVarFromWorker :: SVar t m a -> IO ()
cleanupSVarFromWorker sv = do
    workers <- readIORef (workerThreads sv)
    self <- myThreadId
    -- Removing ourselves from the set and folding over it directly visits
    -- the remaining threads in the same ascending order as filtering the
    -- 'S.toList' output would.
    Prelude.mapM_ (`throwTo` ThreadAbort) (S.delete self workers)