-- | Concurrent evaluation of folds through a channel.
--
-- Re-exports everything from
-- "Streamly.Internal.Data.Fold.Concurrent.Channel.Type" and additionally
-- lists 'maxBuffer', 'boundThreads', 'inspect' and 'parEval' in the export
-- list. Only 'parEval' is defined in this module.
module Streamly.Internal.Data.Fold.Concurrent.Channel
(
module Streamly.Internal.Data.Fold.Concurrent.Channel.Type
, maxBuffer
, boundThreads
, inspect
, parEval
)
where
import Control.Concurrent (takeMVar)
import Control.Monad (void, when)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (writeIORef)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Fold (Fold(..), Step (..))
import Streamly.Internal.Data.Channel.Worker (sendWithDoorBell)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.Fold.Concurrent.Channel.Type
import Streamly.Internal.Data.Channel.Types
-- | Wrap a fold so that its input is handed over to a 'Channel' instead of
-- being consumed directly by the fold.
--
-- The first argument modifies the channel 'Config' that is passed to
-- 'newChannel'; the second argument is the fold to be evaluated through the
-- channel.
--
-- The resulting fold:
--
-- * creates the channel when it is initialized,
-- * forwards every input to the channel with 'sendToWorker' and terminates
--   early if a result is reported back,
-- * on finalization sends 'ChildStopChannel' and waits until
--   'checkFoldStatus' yields the final result.
--
-- The extract (scanning) operation is not supported and calls 'error'.
--
{-# INLINABLE parEval #-}
parEval :: MonadAsync m => (Config -> Config) -> Fold m a b -> Fold m a b
parEval modifier f =
    Fold step initial extract final

    where

    -- The fold state is the channel itself, created from the user supplied
    -- config modifier and the original fold.
    initial = Partial <$> newChannel modifier f

    -- Push one input to the channel. 'sendToWorker' returns 'Just' a result
    -- when the fold has already finished, in which case we stop with 'Done';
    -- otherwise we keep going with the same channel.
    step chan a = do
        status <- sendToWorker chan a
        return $ case status of
            Nothing -> Partial chan
            Just b -> Done b

    -- Intermediate results cannot be extracted from this fold.
    extract _ = error "Concurrent folds do not support scanning"

    -- Signal end of input and then poll for the result, blocking on the
    -- consumer door bell between polls.
    final chan = do
        -- The 'Int' returned by 'sendWithDoorBell' is not needed here.
        liftIO
            $ void
            $ sendWithDoorBell
                (outputQueue chan)
                (outputDoorBell chan)
                ChildStopChannel
        status <- checkFoldStatus chan
        case status of
            Nothing -> do
                -- No result yet: block on 'outputDoorBellFromConsumer' and
                -- check again. 'withDiagMVar' is given the inspect flag, a
                -- dump action and a label (presumably used for diagnostics
                -- if the wait blocks -- confirm against its definition).
                liftIO
                    $ withDiagMVar
                        (svarInspectMode chan)
                        (dumpSVar chan)
                        "parEval: waiting to drain"
                    $ takeMVar (outputDoorBellFromConsumer chan)
                final chan
            Just b -> do
                -- In inspect mode, record the stop time in the channel stats
                -- and print a dump of the channel before returning.
                when (svarInspectMode chan) $ liftIO $ do
                    t <- getTime Monotonic
                    writeIORef (svarStopTime (svarStats chan)) (Just t)
                    printSVar (dumpSVar chan) "SVar Done"
                return b