{-# OPTIONS_GHC -Wno-deprecations #-}
-- |
-- Module      : Streamly.Internal.Data.Fold.SVar
-- Copyright   : (c) 2017 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC
--
--
module Streamly.Internal.Data.Fold.SVar
    (
      write
    , writeLimited
    )
where

#include "inline.hs"

import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(liftIO))
import Streamly.Internal.Data.Fold (Fold(..))

import qualified Streamly.Internal.Data.Fold as FL (Step(Done, Partial))

import Streamly.Internal.Data.SVar

-- | A fold to write a stream to an SVar. Unlike 'toSVar' this does not allow
-- for concurrent evaluation of the stream, as the fold receives the input one
-- element at a time, it just forwards the elements to the SVar. However, we
-- can safely execute the fold in an independent thread, the SVar can act as a
-- buffer decoupling the sender from the receiver. Also, we can have multiple
-- folds running concurrently pusing the streams to the SVar.
--
{-# INLINE write #-}
write :: MonadIO m => SVar t m a -> Maybe WorkerInfo -> Fold m a ()
write :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> Maybe WorkerInfo -> Fold m a ()
write SVar t m a
svar Maybe WorkerInfo
winfo = (() -> a -> m (Step () ()))
-> m (Step () ()) -> (() -> m ()) -> (() -> m ()) -> Fold m a ()
forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> (s -> m b) -> Fold m a b
Fold () -> a -> m (Step () ())
forall {m :: * -> *} {b}. MonadIO m => () -> a -> m (Step () b)
step m (Step () ())
forall {b}. m (Step () b)
initial () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return () -> m ()
forall {m :: * -> *}. MonadIO m => () -> m ()
final

    where

    initial :: m (Step () b)
initial = Step () b -> m (Step () b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step () b -> m (Step () b)) -> Step () b -> m (Step () b)
forall a b. (a -> b) -> a -> b
$ () -> Step () b
forall s b. s -> Step s b
FL.Partial ()

    -- XXX we can have a separate fold for unlimited buffer case to avoid a
    -- branch in the step here.
    step :: () -> a -> m (Step () b)
step () a
x =
        IO (Step () b) -> m (Step () b)
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Step () b) -> m (Step () b))
-> IO (Step () b) -> m (Step () b)
forall a b. (a -> b) -> a -> b
$ do
            SVar t m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
decrementBufferLimit SVar t m a
svar
            IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> ChildEvent a -> IO Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar t m a
svar (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
x)
            Step () b -> IO (Step () b)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step () b -> IO (Step () b)) -> Step () b -> IO (Step () b)
forall a b. (a -> b) -> a -> b
$ () -> Step () b
forall s b. s -> Step s b
FL.Partial ()

    final :: () -> m ()
final () = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar t m a
svar Maybe WorkerInfo
winfo

-- | Like write, but applies a yield limit.
--
{-# INLINE writeLimited #-}
writeLimited :: MonadIO m
    => SVar t m a -> Maybe WorkerInfo -> Fold m a ()
writeLimited :: forall (m :: * -> *) (t :: (* -> *) -> * -> *) a.
MonadIO m =>
SVar t m a -> Maybe WorkerInfo -> Fold m a ()
writeLimited SVar t m a
svar Maybe WorkerInfo
winfo = (Bool -> a -> m (Step Bool ()))
-> m (Step Bool ())
-> (Bool -> m ())
-> (Bool -> m ())
-> Fold m a ()
forall (m :: * -> *) a b s.
(s -> a -> m (Step s b))
-> m (Step s b) -> (s -> m b) -> (s -> m b) -> Fold m a b
Fold Bool -> a -> m (Step Bool ())
forall {m :: * -> *}. MonadIO m => Bool -> a -> m (Step Bool ())
step m (Step Bool ())
forall {b}. m (Step Bool b)
initial (m () -> Bool -> m ()
forall a b. a -> b -> a
const (() -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ())) Bool -> m ()
forall {m :: * -> *}. MonadIO m => Bool -> m ()
final

    where

    initial :: m (Step Bool b)
initial = Step Bool b -> m (Step Bool b)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step Bool b -> m (Step Bool b)) -> Step Bool b -> m (Step Bool b)
forall a b. (a -> b) -> a -> b
$ Bool -> Step Bool b
forall s b. s -> Step s b
FL.Partial Bool
True

    step :: Bool -> a -> m (Step Bool ())
step Bool
True a
x =
        IO (Step Bool ()) -> m (Step Bool ())
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Step Bool ()) -> m (Step Bool ()))
-> IO (Step Bool ()) -> m (Step Bool ())
forall a b. (a -> b) -> a -> b
$ do
            Bool
yieldLimitOk <- SVar t m a -> IO Bool
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO Bool
decrementYieldLimit SVar t m a
svar
            if Bool
yieldLimitOk
            then do
                SVar t m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
decrementBufferLimit SVar t m a
svar
                IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> ChildEvent a -> IO Int
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> ChildEvent a -> IO Int
send SVar t m a
svar (a -> ChildEvent a
forall a. a -> ChildEvent a
ChildYield a
x)
                Step Bool () -> IO (Step Bool ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step Bool () -> IO (Step Bool ()))
-> Step Bool () -> IO (Step Bool ())
forall a b. (a -> b) -> a -> b
$ Bool -> Step Bool ()
forall s b. s -> Step s b
FL.Partial Bool
True
            else do
                SVar t m a -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> IO ()
cleanupSVarFromWorker SVar t m a
svar
                SVar t m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar t m a
svar Maybe WorkerInfo
winfo
                Step Bool () -> IO (Step Bool ())
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step Bool () -> IO (Step Bool ()))
-> Step Bool () -> IO (Step Bool ())
forall a b. (a -> b) -> a -> b
$ () -> Step Bool ()
forall s b. b -> Step s b
FL.Done ()
    step Bool
False a
_ = Step Bool () -> m (Step Bool ())
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Step Bool () -> m (Step Bool ()))
-> Step Bool () -> m (Step Bool ())
forall a b. (a -> b) -> a -> b
$ () -> Step Bool ()
forall s b. b -> Step s b
FL.Done ()

    final :: Bool -> m ()
final Bool
True = IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ SVar t m a -> Maybe WorkerInfo -> IO ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SVar t m a -> Maybe WorkerInfo -> IO ()
sendStop SVar t m a
svar Maybe WorkerInfo
winfo
    final Bool
False = () -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()