-- |
-- Module      : Streamly.Internal.Data.Unfold.SVar
-- Copyright   : (c) 2019 Composewell Technologies
-- License     : BSD3
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
module Streamly.Internal.Data.Unfold.SVar
    (
      fromSVar
    , fromProducer
    )
where

#include "inline.hs"

import Control.Exception (fromException)
import Control.Monad (when)
import Control.Monad.IO.Class (MonadIO(..))
import Data.IORef (newIORef, readIORef, mkWeakIORef, writeIORef)
import Data.Maybe (isNothing)
import Streamly.Internal.Control.Concurrent (MonadAsync)
import Streamly.Internal.Data.Stream.StreamD.Type (Step(..))
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
import Streamly.Internal.Data.SVar (printSVar, cleanupSVar, sendStopToProducer)
import System.Mem (performMajorGC)

import qualified Control.Monad.Catch as MC

import Streamly.Internal.Data.SVar.Type
import Streamly.Internal.Data.Unfold.Type
import Prelude
       hiding (map, mapM, takeWhile, take, filter, const, zipWith
              , drop, dropWhile, either)

-------------------------------------------------------------------------------
-- Generation from SVar
-------------------------------------------------------------------------------

data FromSVarState t m a =
      FromSVarInit (SVar t m a)
    | FromSVarRead (SVar t m a)
    | FromSVarLoop (SVar t m a) [ChildEvent a]
    | FromSVarDone (SVar t m a)

-- | /Internal/
--
{-# INLINE_NORMAL fromSVar #-}
fromSVar :: MonadAsync m => Unfold m (SVar t m a) a
fromSVar :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
Unfold m (SVar t m a) a
fromSVar = forall (m :: * -> *) a b s.
(s -> m (Step s b)) -> (a -> m s) -> Unfold m a b
Unfold forall {m :: * -> *} {t :: (* -> *) -> * -> *} {a}.
(MonadIO m, MonadThrow m) =>
FromSVarState t m a -> m (Step (FromSVarState t m a) a)
step (forall (m :: * -> *) a. Monad m => a -> m a
return forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarInit)
    where

    {-# INLINE_LATE step #-}
    step :: FromSVarState t m a -> m (Step (FromSVarState t m a) a)
step (FromSVarInit SVar t m a
svar) = do
        IORef ()
ref <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. a -> IO (IORef a)
newIORef ()
        Weak (IORef ())
_ <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO () -> IO (Weak (IORef a))
mkWeakIORef IORef ()
ref IO ()
hook
        -- when this copy of svar gets garbage collected "ref" will get
        -- garbage collected and our GC hook will be called.
        let sv :: SVar t m a
sv = SVar t m a
svar{svarRef :: Maybe (IORef ())
svarRef = forall a. a -> Maybe a
Just IORef ()
ref}
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarRead SVar t m a
sv)

        where

        {-# NOINLINE hook #-}
        hook :: IO ()
hook = do
            forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
svar) forall a b. (a -> b) -> a -> b
$ do
                Maybe AbsTime
r <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar t m a
svar))
                forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall a. Maybe a -> Bool
isNothing Maybe AbsTime
r) forall a b. (a -> b) -> a -> b
$
                    forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO ()
printSVar SVar t m a
svar String
"SVar Garbage Collected"
            forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVar SVar t m a
svar
            -- If there are any SVars referenced by this SVar a GC will prompt
            -- them to be cleaned up quickly.
            forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
svar) IO ()
performMajorGC

    step (FromSVarRead SVar t m a
sv) = do
        [ChildEvent a]
list <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> m [ChildEvent a]
readOutputQ SVar t m a
sv
        -- Reversing the output is important to guarantee that we process the
        -- outputs in the same order as they were generated by the constituent
        -- streams.
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv (forall a. [a] -> [a]
Prelude.reverse [ChildEvent a]
list)

    step (FromSVarLoop SVar t m a
sv []) = do
        Bool
done <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> m Bool
postProcess SVar t m a
sv
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip forall a b. (a -> b) -> a -> b
$ if Bool
done
                      then forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarDone SVar t m a
sv
                      else forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarRead SVar t m a
sv

    step (FromSVarLoop SVar t m a
sv (ChildEvent a
ev : [ChildEvent a]
es)) = do
        case ChildEvent a
ev of
            ChildYield a
a -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
a (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv [ChildEvent a]
es)
            ChildStop ThreadId
tid Maybe SomeException
e -> do
                forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ThreadId -> m ()
accountThread SVar t m a
sv ThreadId
tid
                case Maybe SomeException
e of
                    Maybe SomeException
Nothing -> do
                        Bool
stop <- forall {m :: * -> *}. MonadIO m => ThreadId -> m Bool
shouldStop ThreadId
tid
                        if Bool
stop
                        then do
                            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVar SVar t m a
sv)
                            forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarDone SVar t m a
sv)
                        else forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv [ChildEvent a]
es)
                    Just SomeException
ex ->
                        case forall e. Exception e => SomeException -> Maybe e
fromException SomeException
ex of
                            Just ThreadAbort
ThreadAbort ->
                                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv [ChildEvent a]
es)
                            Maybe ThreadAbort
Nothing -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVar SVar t m a
sv) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
MC.throwM SomeException
ex
        where

        shouldStop :: ThreadId -> m Bool
shouldStop ThreadId
tid =
            case forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStopStyle
svarStopStyle SVar t m a
sv of
                SVarStopStyle
StopNone -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
                SVarStopStyle
StopAny -> forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
                SVarStopStyle
StopBy -> do
                    ThreadId
sid <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> IO a
readIORef (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IORef ThreadId
svarStopBy SVar t m a
sv)
                    forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ ThreadId
tid forall a. Eq a => a -> a -> Bool
== ThreadId
sid

    step (FromSVarDone SVar t m a
sv) = do
        forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
sv) forall a b. (a -> b) -> a -> b
$ do
            AbsTime
t <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Clock -> IO AbsTime
getTime Clock
Monotonic
            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar t m a
sv)) (forall a. a -> Maybe a
Just AbsTime
t)
            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO ()
printSVar SVar t m a
sv String
"SVar Done"
        forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

-------------------------------------------------------------------------------
-- Process events received by a fold consumer from a stream producer
-------------------------------------------------------------------------------

-- XXX Have an error for "FromSVarInit" instead of undefined. The error can
-- redirect the user to report the failure to the developers.
-- | /Internal/
--
{-# INLINE_NORMAL fromProducer #-}
fromProducer :: MonadAsync m => Unfold m (SVar t m a) a
fromProducer :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadAsync m =>
Unfold m (SVar t m a) a
fromProducer = forall (m :: * -> *) a b s.
(s -> m (Step s b)) -> (a -> m s) -> Unfold m a b
Unfold forall {m :: * -> *} {t :: (* -> *) -> * -> *} {a}.
MonadIO m =>
FromSVarState t m a -> m (Step (FromSVarState t m a) a)
step (forall (m :: * -> *) a. Monad m => a -> m a
return forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarRead)
    where

    {-# INLINE_LATE step #-}
    step :: FromSVarState t m a -> m (Step (FromSVarState t m a) a)
step (FromSVarRead SVar t m a
sv) = do
        [ChildEvent a]
list <- forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> m [ChildEvent a]
readOutputQ SVar t m a
sv
        -- Reversing the output is important to guarantee that we process the
        -- outputs in the same order as they were generated by the constituent
        -- streams.
        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv (forall a. [a] -> [a]
Prelude.reverse [ChildEvent a]
list)

    step (FromSVarLoop SVar t m a
sv []) = forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarRead SVar t m a
sv
    step (FromSVarLoop SVar t m a
sv (ChildEvent a
ev : [ChildEvent a]
es)) = do
        case ChildEvent a
ev of
            ChildYield a
a -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
a (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> [ChildEvent a] -> FromSVarState t m a
FromSVarLoop SVar t m a
sv [ChildEvent a]
es)
            ChildStop ThreadId
tid Maybe SomeException
e -> do
                forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ThreadId -> m ()
accountThread SVar t m a
sv ThreadId
tid
                case Maybe SomeException
e of
                    Maybe SomeException
Nothing -> do
                        forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> m ()
sendStopToProducer SVar t m a
sv
                        forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> FromSVarState t m a
FromSVarDone SVar t m a
sv)
                    Just SomeException
_ -> forall a. HasCallStack => String -> a
error String
"Bug: fromProducer: received exception"

    step (FromSVarDone SVar t m a
sv) = do
        forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Bool
svarInspectMode SVar t m a
sv) forall a b. (a -> b) -> a -> b
$ do
            AbsTime
t <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Clock -> IO AbsTime
getTime Clock
Monotonic
            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IORef a -> a -> IO ()
writeIORef (SVarStats -> IORef (Maybe AbsTime)
svarStopTime (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> SVarStats
svarStats SVar t m a
sv)) (forall a. a -> Maybe a
Just AbsTime
t)
            forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> String -> IO ()
printSVar SVar t m a
sv String
"SVar Done"
        forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop

    step (FromSVarInit SVar t m a
_) = forall a. HasCallStack => a
undefined