-- |
-- Module      : Streamly.Internal.Data.Fold.Concurrent.Channel.Type
-- Copyright   : (c) 2017, 2022 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Fold.Concurrent.Channel.Type
    ( Channel (..)
    , newChannel
    , Config
    , sendToWorker
    , checkFoldStatus
    , dumpSVar
    )
where

#include "inline.hs"

import Control.Concurrent (ThreadId, myThreadId, tryPutMVar)
import Control.Concurrent.MVar (MVar, newEmptyMVar, takeMVar)
import Control.Exception (SomeException(..))
import Control.Monad (void)
import Control.Monad.Catch (throwM)
import Control.Monad.IO.Class (MonadIO(..))
import Data.IORef (IORef, newIORef, readIORef)
import Data.List (intersperse)
import Streamly.Internal.Control.Concurrent
    (MonadAsync, MonadRunInIO, askRunInIO)
import Streamly.Internal.Control.ForkLifted (doForkWith)
import Streamly.Internal.Data.Fold (Fold(..))
import Streamly.Internal.Data.Channel.Dispatcher (dumpSVarStats)
import Streamly.Internal.Data.Channel.Worker (sendWithDoorBell)

import qualified Streamly.Internal.Data.Fold as Fold
import qualified Streamly.Internal.Data.Stream as D

import Streamly.Internal.Data.Channel.Types

data Channel m a b = Channel
    {
    -- FORWARD FLOW: Flow of data from the driver to the consuming fold

    -- XXX This is inputQueue instead.

    -- Shared output queue (events, length)
    --
    -- [LOCKING] Frequent locked access. This is updated by the driver on each
    -- yield and once in a while read by the consumer fold thread.
    --
    -- XXX Use a different type than ChildEvent. We can do with a simpler type
    -- in folds.
      forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
outputQueue :: IORef ([ChildEvent a], Int)
    -- This is capped to maxBufferLimit if set to more than that. Otherwise
    -- potentially each worker may yield one value to the buffer in the worst
    -- case exceeding the requested buffer size.
    , forall (m :: * -> *) a b. Channel m a b -> Limit
maxBufferLimit :: Limit

    -- [LOCKING] Infrequent MVar. Used when the outputQ transitions from empty
    -- to non-empty.
    , forall (m :: * -> *) a b. Channel m a b -> MVar ()
outputDoorBell :: MVar ()  -- signal the consumer about output
    , forall (m :: * -> *) a b. Channel m a b -> m [ChildEvent a]
readOutputQ :: m [ChildEvent a]

    -- receive async events from the fold consumer to the driver.
    , forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent b], Int)
outputQueueFromConsumer :: IORef ([ChildEvent b], Int)
    , forall (m :: * -> *) a b. Channel m a b -> MVar ()
outputDoorBellFromConsumer :: MVar ()
    , forall (m :: * -> *) a b. Channel m a b -> MVar ()
bufferSpaceDoorBell :: MVar ()

    -- cleanup: to track garbage collection of SVar --
    , forall (m :: * -> *) a b. Channel m a b -> Maybe (IORef ())
svarRef :: Maybe (IORef ())

    -- Stats --
    , forall (m :: * -> *) a b. Channel m a b -> SVarStats
svarStats :: SVarStats

    -- Diagnostics --
    , forall (m :: * -> *) a b. Channel m a b -> Bool
svarInspectMode :: Bool
    , forall (m :: * -> *) a b. Channel m a b -> ThreadId
svarCreator :: ThreadId
    }

{-# NOINLINE dumpSVar #-}
dumpSVar :: Channel m a b -> IO String
dumpSVar :: forall (m :: * -> *) a b. Channel m a b -> IO String
dumpSVar Channel m a b
sv = do
    [String]
xs <- [IO String] -> IO [String]
forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
sequence ([IO String] -> IO [String]) -> [IO String] -> IO [String]
forall a b. (a -> b) -> a -> b
$ IO String -> [IO String] -> [IO String]
forall a. a -> [a] -> [a]
intersperse (String -> IO String
forall (m :: * -> *) a. Monad m => a -> m a
return String
"\n")
        [ String -> IO String
forall (m :: * -> *) a. Monad m => a -> m a
return (ThreadId -> String
forall a. Show a => a -> String
dumpCreator (Channel m a b -> ThreadId
forall (m :: * -> *) a b. Channel m a b -> ThreadId
svarCreator Channel m a b
sv))
        , String -> IO String
forall (m :: * -> *) a. Monad m => a -> m a
return String
"---------CURRENT STATE-----------"
        , IORef ([ChildEvent a], Int) -> IO String
forall (t :: * -> *) a1 a2.
(Foldable t, Show a1) =>
IORef (t a2, a1) -> IO String
dumpOutputQ (Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
outputQueue Channel m a b
sv)
        -- XXX print the types of events in the outputQueue, first 5
        , MVar () -> IO String
forall a. Show a => MVar a -> IO String
dumpDoorBell (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
outputDoorBell Channel m a b
sv)
        , String -> IO String
forall (m :: * -> *) a. Monad m => a -> m a
return String
"---------STATS-----------\n"
        , Bool -> Maybe YieldRateInfo -> SVarStats -> IO String
dumpSVarStats (Channel m a b -> Bool
forall (m :: * -> *) a b. Channel m a b -> Bool
svarInspectMode Channel m a b
sv) Maybe YieldRateInfo
forall a. Maybe a
Nothing (Channel m a b -> SVarStats
forall (m :: * -> *) a b. Channel m a b -> SVarStats
svarStats Channel m a b
sv)
        ]
    String -> IO String
forall (m :: * -> *) a. Monad m => a -> m a
return (String -> IO String) -> String -> IO String
forall a b. (a -> b) -> a -> b
$ [String] -> String
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [String]
xs

-------------------------------------------------------------------------------
-- Support for running folds concurrently
-------------------------------------------------------------------------------

-- $concurrentFolds
--
-- To run folds concurrently, we need to decouple the fold execution from the
-- stream production. We use the SVar to do that, we have a single worker
-- pushing the stream elements to the SVar and on the consumer side a fold
-- driver pulls the values and folds them.
--
-- @
--
-- Fold worker <------Channel<------Fold driver
--     |  exceptions  |
--     --------------->
--
-- @
--
-- We need a channel for pushing exceptions from the fold worker to the fold
-- driver. The stream may be pushed to multiple folds at the same time. For
-- that we need one Channel per fold:
--
-- @
--
-- Fold worker <------Channel--
--                    |        |
-- Fold worker <------Channel------Driver
--                    |        |
-- Fold worker <------Channel--
--
-- @
--
-- Note: If the stream pusher terminates due to an exception, we do not
-- actively terminate the fold. It gets cleaned up by the GC.

-------------------------------------------------------------------------------
-- Process events received by a fold worker from a fold driver
-------------------------------------------------------------------------------

sendToDriver :: Channel m a b -> ChildEvent b -> IO Int
sendToDriver :: forall (m :: * -> *) a b. Channel m a b -> ChildEvent b -> IO Int
sendToDriver Channel m a b
sv ChildEvent b
msg = do
    -- In case the producer stream is blocked on pushing to the fold buffer
    -- then wake it up so that it can check for the stop event or exception
    -- being sent to it otherwise we will be deadlocked.
    -- void $ tryPutMVar (pushBufferMVar sv) ()
    IORef ([ChildEvent b], Int) -> MVar () -> ChildEvent b -> IO Int
forall a.
IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell (Channel m a b -> IORef ([ChildEvent b], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent b], Int)
outputQueueFromConsumer Channel m a b
sv)
                     (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
outputDoorBellFromConsumer Channel m a b
sv) ChildEvent b
msg

sendYieldToDriver :: MonadIO m => Channel m a b -> b -> m ()
sendYieldToDriver :: forall (m :: * -> *) a b. MonadIO m => Channel m a b -> b -> m ()
sendYieldToDriver Channel m a b
sv b
res = IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ do
    IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Channel m a b -> ChildEvent b -> IO Int
forall (m :: * -> *) a b. Channel m a b -> ChildEvent b -> IO Int
sendToDriver Channel m a b
sv (b -> ChildEvent b
forall a. a -> ChildEvent a
ChildYield b
res)

{-# NOINLINE sendExceptionToDriver #-}
sendExceptionToDriver :: Channel m a b -> SomeException -> IO ()
sendExceptionToDriver :: forall (m :: * -> *) a b. Channel m a b -> SomeException -> IO ()
sendExceptionToDriver Channel m a b
sv SomeException
e = do
    ThreadId
tid <- IO ThreadId
myThreadId
    IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Channel m a b -> ChildEvent b -> IO Int
forall (m :: * -> *) a b. Channel m a b -> ChildEvent b -> IO Int
sendToDriver Channel m a b
sv (ThreadId -> Maybe SomeException -> ChildEvent b
forall a. ThreadId -> Maybe SomeException -> ChildEvent a
ChildStop ThreadId
tid (SomeException -> Maybe SomeException
forall a. a -> Maybe a
Just SomeException
e))

data FromSVarState m a b =
      FromSVarRead (Channel m a b)
    | FromSVarLoop (Channel m a b) [ChildEvent a]

{-# INLINE_NORMAL fromProducerD #-}
fromProducerD :: MonadIO m => Channel m a b -> D.Stream m a
fromProducerD :: forall (m :: * -> *) a b. MonadIO m => Channel m a b -> Stream m a
fromProducerD Channel m a b
svar = (State StreamK m a
 -> FromSVarState m a b -> m (Step (FromSVarState m a b) a))
-> FromSVarState m a b -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a
-> FromSVarState m a b -> m (Step (FromSVarState m a b) a)
forall {m :: * -> *} {p} {a} {b}.
Monad m =>
p -> FromSVarState m a b -> m (Step (FromSVarState m a b) a)
step (Channel m a b -> FromSVarState m a b
forall (m :: * -> *) a b. Channel m a b -> FromSVarState m a b
FromSVarRead Channel m a b
svar)

    where

    {-# INLINE_LATE step #-}
    step :: p -> FromSVarState m a b -> m (Step (FromSVarState m a b) a)
step p
_ (FromSVarRead Channel m a b
sv) = do
        [ChildEvent a]
list <- Channel m a b -> m [ChildEvent a]
forall (m :: * -> *) a b. Channel m a b -> m [ChildEvent a]
readOutputQ Channel m a b
sv
        -- Reversing the output is important to guarantee that we process the
        -- outputs in the same order as they were generated by the constituent
        -- streams.
        Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a))
-> Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState m a b -> Step (FromSVarState m a b) a
forall s a. s -> Step s a
D.Skip (FromSVarState m a b -> Step (FromSVarState m a b) a)
-> FromSVarState m a b -> Step (FromSVarState m a b) a
forall a b. (a -> b) -> a -> b
$ Channel m a b -> [ChildEvent a] -> FromSVarState m a b
forall (m :: * -> *) a b.
Channel m a b -> [ChildEvent a] -> FromSVarState m a b
FromSVarLoop Channel m a b
sv ([ChildEvent a] -> [ChildEvent a]
forall a. [a] -> [a]
Prelude.reverse [ChildEvent a]
list)

    step p
_ (FromSVarLoop Channel m a b
sv []) = Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a))
-> Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall a b. (a -> b) -> a -> b
$ FromSVarState m a b -> Step (FromSVarState m a b) a
forall s a. s -> Step s a
D.Skip (FromSVarState m a b -> Step (FromSVarState m a b) a)
-> FromSVarState m a b -> Step (FromSVarState m a b) a
forall a b. (a -> b) -> a -> b
$ Channel m a b -> FromSVarState m a b
forall (m :: * -> *) a b. Channel m a b -> FromSVarState m a b
FromSVarRead Channel m a b
sv
    step p
_ (FromSVarLoop Channel m a b
sv (ChildEvent a
ev : [ChildEvent a]
es)) = do
        case ChildEvent a
ev of
            ChildYield a
a -> Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a))
-> Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall a b. (a -> b) -> a -> b
$ a -> FromSVarState m a b -> Step (FromSVarState m a b) a
forall s a. a -> s -> Step s a
D.Yield a
a (Channel m a b -> [ChildEvent a] -> FromSVarState m a b
forall (m :: * -> *) a b.
Channel m a b -> [ChildEvent a] -> FromSVarState m a b
FromSVarLoop Channel m a b
sv [ChildEvent a]
es)
            ChildEvent a
ChildStopChannel -> Step (FromSVarState m a b) a -> m (Step (FromSVarState m a b) a)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (FromSVarState m a b) a
forall s a. Step s a
D.Stop
            ChildEvent a
_ -> m (Step (FromSVarState m a b) a)
forall a. HasCallStack => a
undefined

{-# INLINE readOutputQChan #-}
readOutputQChan :: Channel m a b -> IO ([ChildEvent a], Int)
readOutputQChan :: forall (m :: * -> *) a b. Channel m a b -> IO ([ChildEvent a], Int)
readOutputQChan Channel m a b
chan = do
    let ss :: Maybe SVarStats
ss = if Channel m a b -> Bool
forall (m :: * -> *) a b. Channel m a b -> Bool
svarInspectMode Channel m a b
chan then SVarStats -> Maybe SVarStats
forall a. a -> Maybe a
Just (Channel m a b -> SVarStats
forall (m :: * -> *) a b. Channel m a b -> SVarStats
svarStats Channel m a b
chan) else Maybe SVarStats
forall a. Maybe a
Nothing
    r :: ([ChildEvent a], Int)
r@([ChildEvent a]
_, Int
n) <- IORef ([ChildEvent a], Int)
-> Maybe SVarStats -> IO ([ChildEvent a], Int)
forall a.
IORef ([ChildEvent a], Int)
-> Maybe SVarStats -> IO ([ChildEvent a], Int)
readOutputQRaw (Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
outputQueue Channel m a b
chan) Maybe SVarStats
ss
    if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0
    then do
        IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
            (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Bool -> IO String -> String -> IO () -> IO ()
withDiagMVar
                (Channel m a b -> Bool
forall (m :: * -> *) a b. Channel m a b -> Bool
svarInspectMode Channel m a b
chan)
                (Channel m a b -> IO String
forall (m :: * -> *) a b. Channel m a b -> IO String
dumpSVar Channel m a b
chan)
                String
"readOutputQChan: nothing to do"
            (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
outputDoorBell Channel m a b
chan)
        IORef ([ChildEvent a], Int)
-> Maybe SVarStats -> IO ([ChildEvent a], Int)
forall a.
IORef ([ChildEvent a], Int)
-> Maybe SVarStats -> IO ([ChildEvent a], Int)
readOutputQRaw (Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
outputQueue Channel m a b
chan) Maybe SVarStats
ss
    else ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall (m :: * -> *) a. Monad m => a -> m a
return ([ChildEvent a], Int)
r

{-# INLINE readOutputQDB #-}
readOutputQDB :: Channel m a b -> IO ([ChildEvent a], Int)
readOutputQDB :: forall (m :: * -> *) a b. Channel m a b -> IO ([ChildEvent a], Int)
readOutputQDB Channel m a b
chan = do
    ([ChildEvent a], Int)
r <- Channel m a b -> IO ([ChildEvent a], Int)
forall (m :: * -> *) a b. Channel m a b -> IO ([ChildEvent a], Int)
readOutputQChan Channel m a b
chan
    -- XXX We can do this only if needed, if someone sleeps because of buffer
    -- then they can set a flag and we ring the doorbell only if the flag is
    -- set. Like we do in sendWorkerWait for streams.
    Bool
_ <- MVar () -> () -> IO Bool
forall a. MVar a -> a -> IO Bool
tryPutMVar (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
bufferSpaceDoorBell Channel m a b
chan) ()
    ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall (m :: * -> *) a. Monad m => a -> m a
return ([ChildEvent a], Int)
r

mkNewChannel :: forall m a b. MonadIO m => Config -> IO (Channel m a b)
mkNewChannel :: forall (m :: * -> *) a b. MonadIO m => Config -> IO (Channel m a b)
mkNewChannel Config
cfg = do
    IORef ([ChildEvent a], Int)
outQ <- ([ChildEvent a], Int) -> IO (IORef ([ChildEvent a], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
    MVar ()
outQMv <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
    IORef ([ChildEvent b], Int)
outQRev <- ([ChildEvent b], Int) -> IO (IORef ([ChildEvent b], Int))
forall a. a -> IO (IORef a)
newIORef ([], Int
0)
    MVar ()
outQMvRev <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
    MVar ()
bufferMv <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar

    SVarStats
stats <- IO SVarStats
newSVarStats
    ThreadId
tid <- IO ThreadId
myThreadId

    let getSVar :: Channel m a b -> Channel m a b
        getSVar :: Channel m a b -> Channel m a b
getSVar Channel m a b
sv = Channel :: forall (m :: * -> *) a b.
IORef ([ChildEvent a], Int)
-> Limit
-> MVar ()
-> m [ChildEvent a]
-> IORef ([ChildEvent b], Int)
-> MVar ()
-> MVar ()
-> Maybe (IORef ())
-> SVarStats
-> Bool
-> ThreadId
-> Channel m a b
Channel
            { outputQueue :: IORef ([ChildEvent a], Int)
outputQueue      = IORef ([ChildEvent a], Int)
outQ
            , outputDoorBell :: MVar ()
outputDoorBell   = MVar ()
outQMv
            , outputQueueFromConsumer :: IORef ([ChildEvent b], Int)
outputQueueFromConsumer = IORef ([ChildEvent b], Int)
outQRev
            , outputDoorBellFromConsumer :: MVar ()
outputDoorBellFromConsumer = MVar ()
outQMvRev
            , bufferSpaceDoorBell :: MVar ()
bufferSpaceDoorBell = MVar ()
bufferMv
            , maxBufferLimit :: Limit
maxBufferLimit   = Config -> Limit
getMaxBuffer Config
cfg
            , readOutputQ :: m [ChildEvent a]
readOutputQ      = IO [ChildEvent a] -> m [ChildEvent a]
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [ChildEvent a] -> m [ChildEvent a])
-> IO [ChildEvent a] -> m [ChildEvent a]
forall a b. (a -> b) -> a -> b
$ (([ChildEvent a], Int) -> [ChildEvent a])
-> IO ([ChildEvent a], Int) -> IO [ChildEvent a]
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap ([ChildEvent a], Int) -> [ChildEvent a]
forall a b. (a, b) -> a
fst (Channel m a b -> IO ([ChildEvent a], Int)
forall (m :: * -> *) a b. Channel m a b -> IO ([ChildEvent a], Int)
readOutputQDB Channel m a b
sv)
            , svarRef :: Maybe (IORef ())
svarRef          = Maybe (IORef ())
forall a. Maybe a
Nothing
            , svarInspectMode :: Bool
svarInspectMode  = Config -> Bool
getInspectMode Config
cfg
            , svarCreator :: ThreadId
svarCreator      = ThreadId
tid
            , svarStats :: SVarStats
svarStats        = SVarStats
stats
            }

    let sv :: Channel m a b
sv = Channel m a b -> Channel m a b
getSVar Channel m a b
sv in Channel m a b -> IO (Channel m a b)
forall (m :: * -> *) a. Monad m => a -> m a
return Channel m a b
sv

{-# INLINABLE newChannel #-}
{-# SPECIALIZE newChannel ::
    (Config -> Config) -> Fold IO a b -> IO (Channel IO a b) #-}
newChannel :: (MonadRunInIO m) =>
    (Config -> Config) -> Fold m a b -> m (Channel m a b)
newChannel :: forall (m :: * -> *) a b.
MonadRunInIO m =>
(Config -> Config) -> Fold m a b -> m (Channel m a b)
newChannel Config -> Config
modifier Fold m a b
f = do
    let config :: Config
config = Config -> Config
modifier Config
defaultConfig
    Channel m a b
sv <- IO (Channel m a b) -> m (Channel m a b)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Channel m a b) -> m (Channel m a b))
-> IO (Channel m a b) -> m (Channel m a b)
forall a b. (a -> b) -> a -> b
$ Config -> IO (Channel m a b)
forall (m :: * -> *) a b. MonadIO m => Config -> IO (Channel m a b)
mkNewChannel Config
config
    RunInIO m
mrun <- m (RunInIO m)
forall (m :: * -> *). MonadRunInIO m => m (RunInIO m)
askRunInIO
    m ThreadId -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m ThreadId -> m ()) -> m ThreadId -> m ()
forall a b. (a -> b) -> a -> b
$ Bool -> m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
forall (m :: * -> *).
MonadRunInIO m =>
Bool -> m () -> RunInIO m -> (SomeException -> IO ()) -> m ThreadId
doForkWith (Config -> Bool
getBound Config
config) (Channel m a b -> m ()
work Channel m a b
sv) RunInIO m
mrun (Channel m a b -> SomeException -> IO ()
forall (m :: * -> *) a b. Channel m a b -> SomeException -> IO ()
sendExceptionToDriver Channel m a b
sv)
    Channel m a b -> m (Channel m a b)
forall (m :: * -> *) a. Monad m => a -> m a
return Channel m a b
sv

    where

    {-# NOINLINE work #-}
    work :: Channel m a b -> m ()
work Channel m a b
chan =
        let f1 :: Fold m a ()
f1 = (b -> m ()) -> Fold m a b -> Fold m a ()
forall (m :: * -> *) b c a.
Monad m =>
(b -> m c) -> Fold m a b -> Fold m a c
Fold.rmapM (m () -> m ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (m () -> m ()) -> (b -> m ()) -> b -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Channel m a b -> b -> m ()
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> b -> m ()
sendYieldToDriver Channel m a b
chan) Fold m a b
f
         in Fold m a () -> Stream m a -> m ()
forall (m :: * -> *) a b.
Monad m =>
Fold m a b -> Stream m a -> m b
D.fold Fold m a ()
f1 (Stream m a -> m ()) -> Stream m a -> m ()
forall a b. (a -> b) -> a -> b
$ Channel m a b -> Stream m a
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> Stream m a
fromProducerD Channel m a b
chan

-------------------------------------------------------------------------------
-- Process events received by the driver thread from the fold worker side
-------------------------------------------------------------------------------

-- XXX currently only one event is sent by a fold consumer to the stream
-- producer. But we can potentially have multiple events e.g. the fold step can
-- generate exception more than once and the producer can ignore those
-- exceptions or handle them and still keep driving the fold.

-- XXX In case of scan this could be a stream.

-- | Poll for events sent by the fold worker to the fold driver. The fold
-- consumer can send a "Stop" event or an exception. When a "Stop" is received
-- this function returns 'True'. If an exception is recieved then it throws the
-- exception.
--
{-# NOINLINE checkFoldStatus #-}
checkFoldStatus :: MonadAsync m => Channel m a b -> m (Maybe b)
checkFoldStatus :: forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> m (Maybe b)
checkFoldStatus Channel m a b
sv = do
    ([ChildEvent b]
list, Int
_) <- IO ([ChildEvent b], Int) -> m ([ChildEvent b], Int)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([ChildEvent b], Int) -> m ([ChildEvent b], Int))
-> IO ([ChildEvent b], Int) -> m ([ChildEvent b], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent b], Int) -> IO ([ChildEvent b], Int)
forall a. IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
readOutputQBasic (Channel m a b -> IORef ([ChildEvent b], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent b], Int)
outputQueueFromConsumer Channel m a b
sv)
    -- Reversing the output is important to guarantee that we process the
    -- outputs in the same order as they were generated by the constituent
    -- streams.
    [ChildEvent b] -> m (Maybe b)
forall {m :: * -> *} {a}.
MonadThrow m =>
[ChildEvent a] -> m (Maybe a)
processEvents ([ChildEvent b] -> m (Maybe b)) -> [ChildEvent b] -> m (Maybe b)
forall a b. (a -> b) -> a -> b
$ [ChildEvent b] -> [ChildEvent b]
forall a. [a] -> [a]
reverse [ChildEvent b]
list

    where

    {-# INLINE processEvents #-}
    processEvents :: [ChildEvent a] -> m (Maybe a)
processEvents [] = Maybe a -> m (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
    processEvents (ChildEvent a
ev : [ChildEvent a]
_) = do
        case ChildEvent a
ev of
            ChildStop ThreadId
_ Maybe SomeException
e -> m (Maybe a)
-> (SomeException -> m (Maybe a))
-> Maybe SomeException
-> m (Maybe a)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe m (Maybe a)
forall a. HasCallStack => a
undefined SomeException -> m (Maybe a)
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwM Maybe SomeException
e
            ChildEvent a
ChildStopChannel -> m (Maybe a)
forall a. HasCallStack => a
undefined
            ChildYield a
b -> Maybe a -> m (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Maybe a
forall a. a -> Maybe a
Just a
b)

{-# INLINE isBufferAvailable #-}
isBufferAvailable :: MonadIO m => Channel m a b -> m Bool
isBufferAvailable :: forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m Bool
isBufferAvailable Channel m a b
sv = do
    let limit :: Limit
limit = Channel m a b -> Limit
forall (m :: * -> *) a b. Channel m a b -> Limit
maxBufferLimit Channel m a b
sv
    case Limit
limit of
        Limit
Unlimited -> Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
        Limited Word
lim -> do
            ([ChildEvent a]
_, Int
n) <- IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int))
-> IO ([ChildEvent a], Int) -> m ([ChildEvent a], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> IO ([ChildEvent a], Int)
forall a. IORef a -> IO a
readIORef (Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
outputQueue Channel m a b
sv)
            Bool -> m Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> m Bool) -> Bool -> m Bool
forall a b. (a -> b) -> a -> b
$ Word -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Word
lim Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
n

-- | Push values from a driver to a fold worker via a Channel. Before pushing a
-- value to the Channel it polls for events received from the fold worker.  If a
-- stop event is received then it returns 'True' otherwise false.  Propagates
-- exceptions received from the fold wroker.
--
{-# INLINE sendToWorker #-}
sendToWorker :: MonadAsync m => Channel m a b -> a -> m (Maybe b)
sendToWorker :: forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> a -> m (Maybe b)
sendToWorker Channel m a b
chan a
a = m (Maybe b)
go

    where

    -- Recursive function, should we use SPEC?
    go :: m (Maybe b)
go = do
        let qref :: IORef ([ChildEvent b], Int)
qref = Channel m a b -> IORef ([ChildEvent b], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent b], Int)
outputQueueFromConsumer Channel m a b
chan
        Maybe b
status <- do
            ([ChildEvent b]
_, Int
n) <- IO ([ChildEvent b], Int) -> m ([ChildEvent b], Int)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ([ChildEvent b], Int) -> m ([ChildEvent b], Int))
-> IO ([ChildEvent b], Int) -> m ([ChildEvent b], Int)
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent b], Int) -> IO ([ChildEvent b], Int)
forall a. IORef a -> IO a
readIORef IORef ([ChildEvent b], Int)
qref
            if Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0
            then Channel m a b -> m (Maybe b)
forall (m :: * -> *) a b.
MonadAsync m =>
Channel m a b -> m (Maybe b)
checkFoldStatus Channel m a b
chan
            else Maybe b -> m (Maybe b)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing
        case Maybe b
status of
            Just b
_ -> Maybe b -> m (Maybe b)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
status
            Maybe b
Nothing -> do
                    Bool
r <- Channel m a b -> m Bool
forall (m :: * -> *) a b. MonadIO m => Channel m a b -> m Bool
isBufferAvailable Channel m a b
chan
                    if Bool
r
                    then do
                        IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO
                            (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void
                            (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
forall a.
IORef ([ChildEvent a], Int) -> MVar () -> ChildEvent a -> IO Int
sendWithDoorBell
                                (Channel m a b -> IORef ([ChildEvent a], Int)
forall (m :: * -> *) a b.
Channel m a b -> IORef ([ChildEvent a], Int)
outputQueue Channel m a b
chan)
                                (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
outputDoorBell Channel m a b
chan)
                                (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
a)
                        Maybe b -> m (Maybe b)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing
                    else do
                        () <- IO () -> m ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar (Channel m a b -> MVar ()
forall (m :: * -> *) a b. Channel m a b -> MVar ()
bufferSpaceDoorBell Channel m a b
chan)
                        m (Maybe b)
go