#ifdef __HADDOCK_VERSION__
#undef INSPECTION
#endif

#ifdef INSPECTION
{-# LANGUAGE TemplateHaskell #-}
{-# OPTIONS_GHC -fplugin Test.Inspection.Plugin #-}
#endif

-- |
-- Module      : Streamly.Internal.Data.Stream.Concurrent.Channel.Operations
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Stream.Concurrent.Channel.Operations
    (
      toChannel
    , toChannelK
    , fromChannel
    , fromChannelK
    )
where

#include "inline.hs"

import Control.Exception (fromException)
import Control.Monad (when)
import Control.Monad.Catch (throwM, MonadThrow)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Data.IORef (newIORef, readIORef, mkWeakIORef, writeIORef)
import Data.Maybe (isNothing)
import Streamly.Internal.Control.Concurrent
    (MonadAsync, MonadRunInIO, askRunInIO)
import Streamly.Internal.Data.Stream.StreamD.Type (Stream)
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import System.Mem (performMajorGC)

import qualified Streamly.Internal.Data.Stream.StreamD as Stream
import qualified Streamly.Internal.Data.Stream.StreamD.Type as D
import qualified Streamly.Internal.Data.Stream.StreamK.Type as K

import Streamly.Internal.Data.Stream.Concurrent.Channel.Dispatcher
import Streamly.Internal.Data.Stream.Concurrent.Channel.Type
import Streamly.Internal.Data.Stream.Channel.Types hiding (inspect)

import Prelude hiding (map, concat, concatMap)

#ifdef INSPECTION
import Control.Exception (Exception)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Typeable (Typeable)
import Test.Inspection (inspect, hasNoTypeClassesExcept)
#endif

------------------------------------------------------------------------------
-- Generating streams from a channel
------------------------------------------------------------------------------

-- $concurrentEval
--
-- Usually a channel is used to concurrently evaluate multiple actions in a
-- stream using many worker threads that push the results to the channel, and
-- a single puller that pulls them from the channel, generating the evaluated
-- stream.
--
-- @
--                  input stream
--                       |
--     <-----------------|<--------worker
--     |  exceptions     |
-- output stream <---Channel<------worker
--                       |
--                       |<--------worker
--
-- @
--
-- The puller itself schedules the worker threads based on demand.
-- Exceptions are propagated from the worker threads to the puller.

-------------------------------------------------------------------------------
-- Write a stream to a channel
-------------------------------------------------------------------------------

-- XXX Should be a Fold, singleton API could be called joinChannel, or the fold
-- can be called joinChannel.

-- | Write a stream to a 'Channel' in a non-blocking manner. The stream can
-- then be read back from the channel using 'fromChannelK' or 'fromChannel'.
--
-- The monadic run function obtained from 'askRunInIO' is captured and queued
-- on the channel together with the stream.
--
-- NOTE(review): the 'False' passed to 'enqueue' is an opaque flag from the
-- point of view of this module; confirm its meaning against the definition of
-- 'enqueue'.
{-# INLINE toChannelK #-}
toChannelK :: MonadRunInIO m => Channel m a -> K.StreamK m a -> m ()
toChannelK sv m = do
    runIn <- askRunInIO
    liftIO $ enqueue sv False (runIn, m)

-- INLINE for fromStreamK/toStreamK fusion

-- | Send a stream to a given channel for concurrent evaluation.
--
-- The stream is converted with 'Stream.toStreamK' and then handed to
-- 'toChannelK'.
{-# INLINE toChannel #-}
toChannel :: MonadRunInIO m => Channel m a -> Stream m a -> m ()
toChannel chan = toChannelK chan . Stream.toStreamK

{-
-- | Send a stream of streams to a concurrent channel for evaluation.
{-# INLINE joinChannel #-}
joinChannel :: Channel m a -> Fold m (Stream m a) ()
joinChannel = undefined
-}

-------------------------------------------------------------------------------
-- Read a stream from a channel
-------------------------------------------------------------------------------

-- | Pull a stream from a channel.
--
-- Each step reads the current batch of events from the output queue of the
-- channel using 'readOutputQ' and processes them in order:
--
-- * 'ChildYield' yields the value downstream.
-- * 'ChildStopChannel' cleans up the worker threads and stops the stream.
-- * 'ChildStop' accounts for the finished worker thread. If the worker
--   carried an exception, the worker threads are cleaned up and the exception
--   is rethrown using 'throwM'.
--
-- When a batch is exhausted 'postProcess' decides whether the stream is done
-- or whether the output queue should be read again.
{-# NOINLINE fromChannelRaw #-}
fromChannelRaw :: (MonadIO m, MonadThrow m) => Channel m a -> K.StreamK m a
fromChannelRaw sv = K.MkStream $ \st yld sng stp -> do
    list <- readOutputQ sv
    -- Reversing the output is important to guarantee that we process the
    -- outputs in the same order as they were generated by the constituent
    -- streams.
    K.foldStream st yld sng stp $ processEvents $ reverse list

    where

    -- In inspect mode, record the stop time in the channel stats and print a
    -- dump of the channel. Does nothing otherwise.
    cleanup = do
        when (svarInspectMode sv) $ liftIO $ do
            t <- getTime Monotonic
            writeIORef (svarStopTime (svarStats sv)) (Just t)
            printSVar (dumpSVar sv) "SVar Done"

    {-# INLINE processEvents #-}
    -- The current batch is exhausted: either finish the stream or go back to
    -- reading the output queue.
    processEvents [] = K.MkStream $ \st yld sng stp -> do
        done <- postProcess sv
        if done
        then cleanup >> stp
        else K.foldStream st yld sng stp $ fromChannelRaw sv

    processEvents (ev : es) = K.MkStream $ \st yld sng stp -> do
        let rest = processEvents es
        case ev of
            ChildYield a -> yld a rest
            ChildStopChannel -> do
                liftIO (cleanupSVar (workerThreads sv))
                cleanup >> stp
            ChildStop tid e -> do
                accountThread sv tid
                case e of
                    -- The worker finished without an exception, continue
                    -- with the remaining events.
                    Nothing -> K.foldStream st yld sng stp rest
                    Just ex ->
                        case fromException ex of
                            Just ThreadAbort ->
                                -- We terminate the loop after sending
                                -- ThreadAbort to workers so we should never
                                -- get it unless it is thrown from inside a
                                -- worker thread or by someone else to our
                                -- thread.
                                error "processEvents: got ThreadAbort"
                                -- K.foldStream st yld sng stp rest
                            Nothing -> do
                                liftIO (cleanupSVar (workerThreads sv))
                                cleanup >> throwM ex

#ifdef INSPECTION
-- Use of GHC constraint tuple (GHC.Classes.(%,,%)) in fromChannelRaw leads to
-- space leak because the tuple gets allocated in every recursive call and each
-- allocation holds on to the previous allocation. This test is to make sure
-- that we do not use the constraint tuple type class: the inspection fails if
-- fromChannelRaw mentions any type class other than the ones listed below.
--
inspect $ hasNoTypeClassesExcept 'fromChannelRaw
    [ ''Monad
    , ''Applicative
    , ''MonadThrow
    , ''Exception
    , ''MonadIO
    , ''MonadBaseControl
    , ''Typeable
    , ''Functor
    ]
#endif

-- XXX fromChannel should not be called multiple times; we can add a
-- safeguard for that. Or we can replicate the stream so that we can distribute
-- it to multiple consumers. Or should we use an explicit dupChannel for that?

-- | Generate a 'K.StreamK' of results from a channel.
--
-- When the stream is first evaluated this attaches a GC hook to the channel
-- through a weak 'IORef', starts the channel using 'startChannel' and then
-- reads the results using 'fromChannelRaw'.
{-# INLINE fromChannelK #-}
fromChannelK :: MonadAsync m => Channel m a -> K.StreamK m a
fromChannelK sv =
    K.mkStream $ \st yld sng stp -> do
        ref <- liftIO $ newIORef ()
        _ <- liftIO $ mkWeakIORef ref hook

        startChannel sv
        -- We pass a copy of sv to fromChannelRaw, so that we know that it has
        -- no other references, when that copy gets garbage collected "ref"
        -- will get garbage collected and our hook will be called.
        K.foldStreamShared st yld sng stp $
            fromChannelRaw sv{svarRef = Just ref}
    where

    -- Finalizer registered on "ref". In inspect mode it prints a dump of the
    -- channel if no stop time has been recorded. It always cleans up the
    -- worker threads.
    hook :: IO ()
    hook = do
        when (svarInspectMode sv) $ do
            r <- liftIO $ readIORef (svarStopTime (svarStats sv))
            when (isNothing r) $
                printSVar (dumpSVar sv) "SVar Garbage Collected"
        cleanupSVar (workerThreads sv)
        -- If there are any SVars referenced by this SVar a GC will prompt
        -- them to be cleaned up quickly.
        when (svarInspectMode sv) performMajorGC

-- | Generate a stream of results from concurrent evaluations from a channel.
-- Evaluation of the channel does not start until this API is called. This API
-- must not be called more than once on a channel. It kicks off evaluation of
-- the channel by dispatching concurrent workers and ensures that as long as
-- there is work queued on the channel workers are dispatched proportional to
-- the demand by the consumer.
--
-- This is 'fromChannelK' converted to a 'Stream' using 'Stream.fromStreamK'.
--
{-# INLINE fromChannel #-}
fromChannel :: MonadAsync m => Channel m a -> Stream m a
fromChannel = Stream.fromStreamK . fromChannelK

-- State of the direct style channel reader, as used by the step function of
-- _fromChannelD:
--
-- * FromSVarInit: initial state, the GC hook has not been installed yet.
-- * FromSVarRead: read the next batch of events from the output queue.
-- * FromSVarLoop: process the remaining events of the current batch.
-- * FromSVarDone: the channel is finished, stop the stream.
--
-- NOTE(review): the type parameter "t" is not used by any constructor.
data FromSVarState t m a =
      FromSVarInit
    | FromSVarRead (Channel m a)
    | FromSVarLoop (Channel m a) [ChildEvent a]
    | FromSVarDone (Channel m a)

-- | Like 'fromChannelK' but generates a StreamD style stream instead of CPS.
--
-- The stream is a state machine over 'FromSVarState': install the GC hook,
-- read a batch of events from the output queue, process the batch, and
-- finally stop.
--
-- NOTE(review): unlike 'fromChannelK' this does not call 'startChannel', and
-- unlike 'fromChannelRaw' a 'ThreadAbort' from a worker is skipped instead of
-- calling 'error'. Confirm that both differences are intended.
--
{-# INLINE_NORMAL _fromChannelD #-}
_fromChannelD :: (MonadIO m, MonadThrow m) => Channel m a -> D.Stream m a
_fromChannelD svar = D.Stream step FromSVarInit
    where

    {-# INLINE_LATE step #-}
    step _ FromSVarInit = do
        ref <- liftIO $ newIORef ()
        _ <- liftIO $ mkWeakIORef ref hook
        -- when this copy of svar gets garbage collected "ref" will get
        -- garbage collected and our GC hook will be called.
        let sv = svar{svarRef = Just ref}
        return $ D.Skip (FromSVarRead sv)

        where

        -- Finalizer registered on "ref". In inspect mode it prints a dump of
        -- the channel if no stop time has been recorded. It always cleans up
        -- the worker threads.
        {-# NOINLINE hook #-}
        hook :: IO ()
        hook = do
            when (svarInspectMode svar) $ do
                r <- liftIO $ readIORef (svarStopTime (svarStats svar))
                when (isNothing r) $
                    printSVar (dumpSVar svar) "SVar Garbage Collected"
            cleanupSVar (workerThreads svar)
            -- If there are any SVars referenced by this SVar a GC will prompt
            -- them to be cleaned up quickly.
            when (svarInspectMode svar) performMajorGC

    step _ (FromSVarRead sv) = do
        list <- readOutputQ sv
        -- Reversing the output is important to guarantee that we process the
        -- outputs in the same order as they were generated by the constituent
        -- streams.
        return $ D.Skip $ FromSVarLoop sv (Prelude.reverse list)

    -- The current batch is exhausted: either finish or read the queue again.
    step _ (FromSVarLoop sv []) = do
        done <- postProcess sv
        return $ D.Skip $ if done
                      then FromSVarDone sv
                      else FromSVarRead sv

    step _ (FromSVarLoop sv (ev : es)) = do
        case ev of
            ChildYield a -> return $ D.Yield a (FromSVarLoop sv es)
            ChildStopChannel -> do
                liftIO (cleanupSVar (workerThreads sv))
                return $ D.Skip (FromSVarDone sv)
            ChildStop tid e -> do
                accountThread sv tid
                case e of
                    Nothing -> return $ D.Skip (FromSVarLoop sv es)
                    Just ex ->
                        case fromException ex of
                            -- A ThreadAbort is ignored here and processing
                            -- continues with the remaining events.
                            Just ThreadAbort ->
                                return $ D.Skip (FromSVarLoop sv es)
                            Nothing -> do
                                liftIO (cleanupSVar (workerThreads sv))
                                throwM ex

    -- In inspect mode, record the stop time and print a dump of the channel
    -- before stopping.
    step _ (FromSVarDone sv) = do
        when (svarInspectMode sv) $ do
            t <- liftIO $ getTime Monotonic
            liftIO $ writeIORef (svarStopTime (svarStats sv)) (Just t)
            liftIO $ printSVar (dumpSVar sv) "SVar Done"
        return D.Stop