-- |
-- Module      : Streamly.Internal.Data.Stream.Lifted
-- Copyright   : (c) 2019 Composewell Technologies
-- License     : BSD-3-Clause
-- Maintainer  : streamly@composewell.com
-- Stability   : experimental
-- Portability : GHC

module Streamly.Internal.Data.Stream.Lifted
    (
      after
    , bracket
    , bracket3
    , finally
    , retry

    -- For IsStream module
    , afterD
    , bracket3D
    , retryD
    )
where

#include "inline.hs"

import Control.Exception (Exception, SomeException, mask_)
import Control.Monad.Catch (MonadCatch)
#ifdef USE_UNLIFTIO
import Control.Monad.IO.Unlift (MonadUnliftIO)
#else
import Control.Monad.IO.Class (MonadIO)
import Control.Monad.Trans.Control (MonadBaseControl)
#endif
import Data.Map.Strict (Map)
import GHC.Exts (inline)
import Streamly.Internal.Control.Concurrent
    (MonadRunInIO, MonadAsync, withRunInIO)
import Streamly.Internal.Data.Stream (Stream)
import Streamly.Internal.Data.IOFinalizer.Lifted
    (newIOFinalizer, runIOFinalizer, clearingIOFinalizer)
import Streamly.Internal.Data.Stream (Step(..))

import qualified Control.Monad.Catch as MC
import qualified Data.Map.Strict as Map
import qualified Streamly.Internal.Data.Stream as D

-- $setup
-- >>> :m
-- >>> import qualified Streamly.Internal.Data.Stream.Exception.Lifted as Stream

-- XXX Implement in terms of the corresponding IO operation (gbracketIO).

data GbracketIOState s1 s2 v wref
    = GBracketIOInit
    | GBracketIONormal s1 v wref
    | GBracketIOException s2

{-# INLINE_NORMAL gbracket #-}
gbracket
    :: MonadRunInIO m
    => m c -- ^ before
    -> (c -> m d1) -- ^ on normal stop
    -> (c -> e -> D.Stream m b -> m (D.Stream m b)) -- ^ on exception
    -> (c -> m d2) -- ^ on GC without normal stop or exception
    -> (forall s. m s -> m (Either e s)) -- ^ try (exception handling)
    -> (c -> D.Stream m b) -- ^ stream generator
    -> D.Stream m b
gbracket :: forall (m :: * -> *) c d1 e b d2.
MonadRunInIO m =>
m c
-> (c -> m d1)
-> (c -> e -> Stream m b -> m (Stream m b))
-> (c -> m d2)
-> (forall s. m s -> m (Either e s))
-> (c -> Stream m b)
-> Stream m b
gbracket m c
bef c -> m d1
aft c -> e -> Stream m b -> m (Stream m b)
onExc c -> m d2
onGC forall s. m s -> m (Either e s)
ftry c -> Stream m b
action =
    (State StreamK m b
 -> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
 -> m (Step
         (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b))
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> Stream m b
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
step GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
forall s1 s2 v wref. GbracketIOState s1 s2 v wref
GBracketIOInit

    where

    -- If the stream is never evaluated the "aft" action will never be
    -- called. For that to occur we will need the user of this API to pass a
    -- weak pointer to us.
    {-# INLINE_LATE step #-}
    step :: State StreamK m b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
step State StreamK m b
_ GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
GBracketIOInit = do
        -- We mask asynchronous exceptions to make the execution
        -- of 'bef' and the registration of 'aft' atomic.
        -- A similar thing is done in the resourcet package: https://git.io/JvKV3
        -- Tutorial: https://markkarpov.com/tutorial/exceptions.html
        (c
r, IOFinalizer
ref) <- ((forall a. m a -> IO (StM m a)) -> IO (StM m (c, IOFinalizer)))
-> m (c, IOFinalizer)
forall (m :: * -> *) b.
MonadRunInIO m =>
((forall a. m a -> IO (StM m a)) -> IO (StM m b)) -> m b
withRunInIO (((forall a. m a -> IO (StM m a)) -> IO (StM m (c, IOFinalizer)))
 -> m (c, IOFinalizer))
-> ((forall a. m a -> IO (StM m a)) -> IO (StM m (c, IOFinalizer)))
-> m (c, IOFinalizer)
forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO (StM m a)
run -> IO (StM m (c, IOFinalizer)) -> IO (StM m (c, IOFinalizer))
forall a. IO a -> IO a
mask_ (IO (StM m (c, IOFinalizer)) -> IO (StM m (c, IOFinalizer)))
-> IO (StM m (c, IOFinalizer)) -> IO (StM m (c, IOFinalizer))
forall a b. (a -> b) -> a -> b
$ m (c, IOFinalizer) -> IO (StM m (c, IOFinalizer))
forall a. m a -> IO (StM m a)
run (m (c, IOFinalizer) -> IO (StM m (c, IOFinalizer)))
-> m (c, IOFinalizer) -> IO (StM m (c, IOFinalizer))
forall a b. (a -> b) -> a -> b
$ do
            c
r <- m c
bef
            IOFinalizer
ref <- m d2 -> m IOFinalizer
forall (m :: * -> *) a. MonadRunInIO m => m a -> m IOFinalizer
newIOFinalizer (c -> m d2
onGC c
r)
            (c, IOFinalizer) -> m (c, IOFinalizer)
forall (m :: * -> *) a. Monad m => a -> m a
return (c
r, IOFinalizer
ref)
        Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
 -> m (Step
         (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b))
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a b. (a -> b) -> a -> b
$ GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall s a. s -> Step s a
Skip (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
 -> Step
      (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall a b. (a -> b) -> a -> b
$ Stream m b
-> c
-> IOFinalizer
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
forall s1 s2 v wref.
s1 -> v -> wref -> GbracketIOState s1 s2 v wref
GBracketIONormal (c -> Stream m b
action c
r) c
r IOFinalizer
ref

    step State StreamK m b
gst (GBracketIONormal (D.UnStream State StreamK m b -> s -> m (Step s b)
step1 s
st) c
v IOFinalizer
ref) = do
        Either e (Step s b)
res <- m (Step s b) -> m (Either e (Step s b))
forall s. m s -> m (Either e s)
ftry (m (Step s b) -> m (Either e (Step s b)))
-> m (Step s b) -> m (Either e (Step s b))
forall a b. (a -> b) -> a -> b
$ State StreamK m b -> s -> m (Step s b)
step1 State StreamK m b
gst s
st
        case Either e (Step s b)
res of
            Right Step s b
r -> case Step s b
r of
                Yield b
x s
s ->
                    Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
 -> m (Step
         (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b))
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a b. (a -> b) -> a -> b
$ b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall s a. a -> s -> Step s a
Yield b
x (Stream m b
-> c
-> IOFinalizer
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
forall s1 s2 v wref.
s1 -> v -> wref -> GbracketIOState s1 s2 v wref
GBracketIONormal ((State StreamK m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m b -> s -> m (Step s b)
step1 s
s) c
v IOFinalizer
ref)
                Skip s
s ->
                    Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
 -> m (Step
         (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b))
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a b. (a -> b) -> a -> b
$ GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall s a. s -> Step s a
Skip (Stream m b
-> c
-> IOFinalizer
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
forall s1 s2 v wref.
s1 -> v -> wref -> GbracketIOState s1 s2 v wref
GBracketIONormal ((State StreamK m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m b -> s -> m (Step s b)
step1 s
s) c
v IOFinalizer
ref)
                Step s b
Stop ->
                    IOFinalizer -> m d1 -> m d1
forall (m :: * -> *) a. MonadRunInIO m => IOFinalizer -> m a -> m a
clearingIOFinalizer IOFinalizer
ref (c -> m d1
aft c
v) m d1
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall s a. Step s a
Stop
            -- XXX Do not handle async exceptions, just rethrow them.
            Left e
e -> do
                -- Clearing of finalizer and running of exception handler must
                -- be atomic wrt async exceptions. Otherwise if we have cleared
                -- the finalizer and have not run the exception handler then we
                -- may leak the resource.
                Stream m b
stream <-
                    IOFinalizer -> m (Stream m b) -> m (Stream m b)
forall (m :: * -> *) a. MonadRunInIO m => IOFinalizer -> m a -> m a
clearingIOFinalizer IOFinalizer
ref (c -> e -> Stream m b -> m (Stream m b)
onExc c
v e
e ((State StreamK m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.UnStream State StreamK m b -> s -> m (Step s b)
step1 s
st))
                Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
 -> m (Step
         (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b))
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a b. (a -> b) -> a -> b
$ GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall s a. s -> Step s a
Skip (Stream m b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
forall s1 s2 v wref. s2 -> GbracketIOState s1 s2 v wref
GBracketIOException Stream m b
stream)
    step State StreamK m b
gst (GBracketIOException (D.UnStream State StreamK m b -> s -> m (Step s b)
step1 s
st)) = do
        Step s b
res <- State StreamK m b -> s -> m (Step s b)
step1 State StreamK m b
gst s
st
        case Step s b
res of
            Yield b
x s
s ->
                Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
 -> m (Step
         (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b))
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a b. (a -> b) -> a -> b
$ b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall s a. a -> s -> Step s a
Yield b
x (Stream m b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
forall s1 s2 v wref. s2 -> GbracketIOState s1 s2 v wref
GBracketIOException ((State StreamK m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m b -> s -> m (Step s b)
step1 s
s))
            Skip s
s    -> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
 -> m (Step
         (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b))
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall a b. (a -> b) -> a -> b
$ GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall s a. s -> Step s a
Skip (Stream m b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
forall s1 s2 v wref. s2 -> GbracketIOState s1 s2 v wref
GBracketIOException ((State StreamK m b -> s -> m (Step s b)) -> s -> Stream m b
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m b -> s -> m (Step s b)
step1 s
s))
            Step s b
Stop      -> Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
-> m (Step
        (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b
forall s a. Step s a
Stop

{-# INLINE_NORMAL bracket3D #-}
bracket3D :: (MonadAsync m, MonadCatch m) =>
       m b
    -> (b -> m c)
    -> (b -> m d)
    -> (b -> m e)
    -> (b -> D.Stream m a)
    -> D.Stream m a
bracket3D :: forall (m :: * -> *) b c d e a.
(MonadAsync m, MonadCatch m) =>
m b
-> (b -> m c)
-> (b -> m d)
-> (b -> m e)
-> (b -> Stream m a)
-> Stream m a
bracket3D m b
bef b -> m c
aft b -> m d
onExc b -> m e
onGC =
    m b
-> (b -> m c)
-> (b -> SomeException -> Stream m a -> m (Stream m a))
-> (b -> m e)
-> (forall s. m s -> m (Either SomeException s))
-> (b -> Stream m a)
-> Stream m a
forall (m :: * -> *) c d1 e b d2.
MonadRunInIO m =>
m c
-> (c -> m d1)
-> (c -> e -> Stream m b -> m (Stream m b))
-> (c -> m d2)
-> (forall s. m s -> m (Either e s))
-> (c -> Stream m b)
-> Stream m b
gbracket
        m b
bef
        b -> m c
aft
        (\b
a (SomeException
e :: SomeException) Stream m a
_ -> b -> m d
onExc b
a m d -> m (Stream m a) -> m (Stream m a)
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> SomeException -> m (Stream m a)
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
MC.throwM SomeException
e)
        b -> m e
onGC
        ((m s -> m (Either SomeException s))
-> m s -> m (Either SomeException s)
forall a. a -> a
inline m s -> m (Either SomeException s)
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try)

-- For a use case of this see the "streamly-process" package. It needs to kill
-- the process in case of exception or garbage collection, but waits for the
-- process to terminate in normal cases.

-- | Like 'bracket' but can use 3 separate cleanup actions depending on the
-- mode of termination:
--
-- 1. When the stream stops normally
-- 2. When the stream is garbage collected
-- 3. When the stream encounters an exception
--
-- @bracket3 before onStop onGC onException action@ runs @action@ using the
-- result of @before@. If the stream stops, @onStop@ action is executed, if the
-- stream is abandoned @onGC@ is executed, if the stream encounters an
-- exception @onException@ is executed.
--
-- The exception is not caught, it is rethrown.
--
-- /Pre-release/
{-# INLINE bracket3 #-}
bracket3 :: (MonadAsync m, MonadCatch m)
    => m b
    -> (b -> m c)
    -> (b -> m d)
    -> (b -> m e)
    -> (b -> Stream m a)
    -> Stream m a
bracket3 :: forall (m :: * -> *) b c d e a.
(MonadAsync m, MonadCatch m) =>
m b
-> (b -> m c)
-> (b -> m d)
-> (b -> m e)
-> (b -> Stream m a)
-> Stream m a
bracket3 = m b
-> (b -> m c)
-> (b -> m d)
-> (b -> m e)
-> (b -> Stream m a)
-> Stream m a
forall (m :: * -> *) b c d e a.
(MonadAsync m, MonadCatch m) =>
m b
-> (b -> m c)
-> (b -> m d)
-> (b -> m e)
-> (b -> Stream m a)
-> Stream m a
bracket3D

-- | Run the alloc action @IO b@ with async exceptions disabled but keeping
-- blocking operations interruptible (see 'Control.Exception.mask').  Use the
-- output @b@ of the IO action as input to the function @b -> Stream m a@ to
-- generate an output stream.
--
-- @b@ is usually a resource under the IO monad, e.g. a file handle, that
-- requires a cleanup after use. The cleanup action @b -> m c@, runs whenever
-- (1) the stream ends normally, (2) due to a sync or async exception or, (3)
-- if it gets garbage collected after a partial lazy evaluation. The exception
-- is not caught, it is rethrown.
--
-- 'bracket' only guarantees that the cleanup action runs, and it runs with
-- async exceptions enabled. The action must ensure that it can successfully
-- cleanup the resource in the face of sync or async exceptions.
--
-- When the stream ends normally or on a sync exception, cleanup action runs
-- immediately in the current thread context, whereas in other cases it runs in
-- the GC context, therefore, cleanup may be delayed until the GC gets to run.
--
-- /See also: 'bracket_'/
--
-- /Inhibits stream fusion/
--
{-# INLINE bracket #-}
bracket :: (MonadAsync m, MonadCatch m)
    => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
bracket :: forall (m :: * -> *) b c a.
(MonadAsync m, MonadCatch m) =>
m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
bracket m b
bef b -> m c
aft = m b
-> (b -> m c)
-> (b -> m c)
-> (b -> m c)
-> (b -> Stream m a)
-> Stream m a
forall (m :: * -> *) b c d e a.
(MonadAsync m, MonadCatch m) =>
m b
-> (b -> m c)
-> (b -> m d)
-> (b -> m e)
-> (b -> Stream m a)
-> Stream m a
bracket3 m b
bef b -> m c
aft b -> m c
aft b -> m c
aft

-- | Run the action @m b@ whenever the stream @Stream m a@ stops normally,
-- aborts due to an exception or if it is garbage collected after a partial
-- lazy evaluation.
--
-- The semantics of running the action @m b@ are similar to the cleanup action
-- semantics described in 'bracket'.
--
-- >>> finally action xs = Stream.bracket (return ()) (const action) (const xs)
--
-- /See also 'finally_'/
--
-- /Inhibits stream fusion/
--
{-# INLINE finally #-}
finally :: (MonadAsync m, MonadCatch m) =>
    m b -> Stream m a -> Stream m a
finally :: forall (m :: * -> *) b a.
(MonadAsync m, MonadCatch m) =>
m b -> Stream m a -> Stream m a
finally m b
action Stream m a
xs = m () -> (() -> m b) -> (() -> Stream m a) -> Stream m a
forall (m :: * -> *) b c a.
(MonadAsync m, MonadCatch m) =>
m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
bracket (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()) (m b -> () -> m b
forall a b. a -> b -> a
const m b
action) (Stream m a -> () -> Stream m a
forall a b. a -> b -> a
const Stream m a
xs)

{-# INLINE_NORMAL afterD #-}
afterD :: MonadRunInIO m
    => m b -> D.Stream m a -> D.Stream m a
afterD :: forall (m :: * -> *) b a.
MonadRunInIO m =>
m b -> Stream m a -> Stream m a
afterD m b
action (D.Stream State StreamK m a -> s -> m (Step s a)
step s
state) = (State StreamK m a
 -> Maybe (s, IOFinalizer) -> m (Step (Maybe (s, IOFinalizer)) a))
-> Maybe (s, IOFinalizer) -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a
-> Maybe (s, IOFinalizer) -> m (Step (Maybe (s, IOFinalizer)) a)
step' Maybe (s, IOFinalizer)
forall a. Maybe a
Nothing

    where

    {-# INLINE_LATE step' #-}
    step' :: State StreamK m a
-> Maybe (s, IOFinalizer) -> m (Step (Maybe (s, IOFinalizer)) a)
step' State StreamK m a
_ Maybe (s, IOFinalizer)
Nothing = do
        IOFinalizer
ref <- m b -> m IOFinalizer
forall (m :: * -> *) a. MonadRunInIO m => m a -> m IOFinalizer
newIOFinalizer m b
action
        Step (Maybe (s, IOFinalizer)) a
-> m (Step (Maybe (s, IOFinalizer)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (s, IOFinalizer)) a
 -> m (Step (Maybe (s, IOFinalizer)) a))
-> Step (Maybe (s, IOFinalizer)) a
-> m (Step (Maybe (s, IOFinalizer)) a)
forall a b. (a -> b) -> a -> b
$ Maybe (s, IOFinalizer) -> Step (Maybe (s, IOFinalizer)) a
forall s a. s -> Step s a
Skip (Maybe (s, IOFinalizer) -> Step (Maybe (s, IOFinalizer)) a)
-> Maybe (s, IOFinalizer) -> Step (Maybe (s, IOFinalizer)) a
forall a b. (a -> b) -> a -> b
$ (s, IOFinalizer) -> Maybe (s, IOFinalizer)
forall a. a -> Maybe a
Just (s
state, IOFinalizer
ref)
    step' State StreamK m a
gst (Just (s
st, IOFinalizer
ref)) = do
        Step s a
res <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
        case Step s a
res of
            Yield a
x s
s -> Step (Maybe (s, IOFinalizer)) a
-> m (Step (Maybe (s, IOFinalizer)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (s, IOFinalizer)) a
 -> m (Step (Maybe (s, IOFinalizer)) a))
-> Step (Maybe (s, IOFinalizer)) a
-> m (Step (Maybe (s, IOFinalizer)) a)
forall a b. (a -> b) -> a -> b
$ a -> Maybe (s, IOFinalizer) -> Step (Maybe (s, IOFinalizer)) a
forall s a. a -> s -> Step s a
Yield a
x ((s, IOFinalizer) -> Maybe (s, IOFinalizer)
forall a. a -> Maybe a
Just (s
s, IOFinalizer
ref))
            Skip s
s    -> Step (Maybe (s, IOFinalizer)) a
-> m (Step (Maybe (s, IOFinalizer)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Step (Maybe (s, IOFinalizer)) a
 -> m (Step (Maybe (s, IOFinalizer)) a))
-> Step (Maybe (s, IOFinalizer)) a
-> m (Step (Maybe (s, IOFinalizer)) a)
forall a b. (a -> b) -> a -> b
$ Maybe (s, IOFinalizer) -> Step (Maybe (s, IOFinalizer)) a
forall s a. s -> Step s a
Skip ((s, IOFinalizer) -> Maybe (s, IOFinalizer)
forall a. a -> Maybe a
Just (s
s, IOFinalizer
ref))
            Step s a
Stop      -> do
                IOFinalizer -> m ()
forall (m :: * -> *). MonadIO m => IOFinalizer -> m ()
runIOFinalizer IOFinalizer
ref
                Step (Maybe (s, IOFinalizer)) a
-> m (Step (Maybe (s, IOFinalizer)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return Step (Maybe (s, IOFinalizer)) a
forall s a. Step s a
Stop

-- | Run the action @m b@ whenever the stream @Stream m a@ stops normally, or
-- if it is garbage collected after a partial lazy evaluation.
--
-- The semantics of the action @m b@ are similar to the semantics of cleanup
-- action in 'bracket'.
--
-- /See also 'after_'/
--
{-# INLINE after #-}
after ::
#ifdef USE_UNLIFTIO
    MonadUnliftIO m
#else
    (MonadIO m, MonadBaseControl IO m)
#endif
    => m b -> Stream m a -> Stream m a
after :: forall (m :: * -> *) b a.
(MonadIO m, MonadBaseControl IO m) =>
m b -> Stream m a -> Stream m a
after = m b -> Stream m a -> Stream m a
forall (m :: * -> *) b a.
MonadRunInIO m =>
m b -> Stream m a -> Stream m a
afterD

data RetryState emap s1 s2
    = RetryWithMap emap s1
    | RetryDefault s2

-- | See 'Streamly.Internal.Data.Stream.retry'
--
{-# INLINE_NORMAL retryD #-}
retryD
    :: forall e m a. (Exception e, Ord e, MonadCatch m)
    => Map e Int
       -- ^ map from exception to retry count
    -> (e -> D.Stream m a)
       -- ^ default handler for those exceptions that are not in the map
    -> D.Stream m a
    -> D.Stream m a
retryD :: forall e (m :: * -> *) a.
(Exception e, Ord e, MonadCatch m) =>
Map e Int -> (e -> Stream m a) -> Stream m a -> Stream m a
retryD Map e Int
emap0 e -> Stream m a
defaultHandler (D.Stream State StreamK m a -> s -> m (Step s a)
step0 s
state0) = (State StreamK m a
 -> RetryState (Map e Int) s (Stream m a)
 -> m (Step (RetryState (Map e Int) s (Stream m a)) a))
-> RetryState (Map e Int) s (Stream m a) -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a
-> RetryState (Map e Int) s (Stream m a)
-> m (Step (RetryState (Map e Int) s (Stream m a)) a)
forall {a}.
(Ord a, Num a) =>
State StreamK m a
-> RetryState (Map e a) s (Stream m a)
-> m (Step (RetryState (Map e a) s (Stream m a)) a)
step RetryState (Map e Int) s (Stream m a)
forall {s2}. RetryState (Map e Int) s s2
state

    where

    state :: RetryState (Map e Int) s s2
state = Map e Int -> s -> RetryState (Map e Int) s s2
forall emap s1 s2. emap -> s1 -> RetryState emap s1 s2
RetryWithMap Map e Int
emap0 s
state0

    {-# INLINE_LATE step #-}
    step :: State StreamK m a
-> RetryState (Map e a) s (Stream m a)
-> m (Step (RetryState (Map e a) s (Stream m a)) a)
step State StreamK m a
gst (RetryWithMap Map e a
emap s
st) = do
        Either e (Step s a)
eres <- m (Step s a) -> m (Either e (Step s a))
forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try (m (Step s a) -> m (Either e (Step s a)))
-> m (Step s a) -> m (Either e (Step s a))
forall a b. (a -> b) -> a -> b
$ State StreamK m a -> s -> m (Step s a)
step0 State StreamK m a
gst s
st
        case Either e (Step s a)
eres of
            Left e
e -> e
-> Map e a -> s -> m (Step (RetryState (Map e a) s (Stream m a)) a)
forall {m :: * -> *} {a} {s1} {a}.
(Monad m, Ord a, Num a) =>
e
-> Map e a
-> s1
-> m (Step (RetryState (Map e a) s1 (Stream m a)) a)
handler e
e Map e a
emap s
st
            Right Step s a
res ->
                Step (RetryState (Map e a) s (Stream m a)) a
-> m (Step (RetryState (Map e a) s (Stream m a)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return
                    (Step (RetryState (Map e a) s (Stream m a)) a
 -> m (Step (RetryState (Map e a) s (Stream m a)) a))
-> Step (RetryState (Map e a) s (Stream m a)) a
-> m (Step (RetryState (Map e a) s (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ case Step s a
res of
                          Yield a
x s
st1 -> a
-> RetryState (Map e a) s (Stream m a)
-> Step (RetryState (Map e a) s (Stream m a)) a
forall s a. a -> s -> Step s a
Yield a
x (RetryState (Map e a) s (Stream m a)
 -> Step (RetryState (Map e a) s (Stream m a)) a)
-> RetryState (Map e a) s (Stream m a)
-> Step (RetryState (Map e a) s (Stream m a)) a
forall a b. (a -> b) -> a -> b
$ Map e a -> s -> RetryState (Map e a) s (Stream m a)
forall emap s1 s2. emap -> s1 -> RetryState emap s1 s2
RetryWithMap Map e a
emap s
st1
                          Skip s
st1 -> RetryState (Map e a) s (Stream m a)
-> Step (RetryState (Map e a) s (Stream m a)) a
forall s a. s -> Step s a
Skip (RetryState (Map e a) s (Stream m a)
 -> Step (RetryState (Map e a) s (Stream m a)) a)
-> RetryState (Map e a) s (Stream m a)
-> Step (RetryState (Map e a) s (Stream m a)) a
forall a b. (a -> b) -> a -> b
$ Map e a -> s -> RetryState (Map e a) s (Stream m a)
forall emap s1 s2. emap -> s1 -> RetryState emap s1 s2
RetryWithMap Map e a
emap s
st1
                          Step s a
Stop -> Step (RetryState (Map e a) s (Stream m a)) a
forall s a. Step s a
Stop
    step State StreamK m a
gst (RetryDefault (D.UnStream State StreamK m a -> s -> m (Step s a)
step1 s
state1)) = do
        Step s a
res <- State StreamK m a -> s -> m (Step s a)
step1 State StreamK m a
gst s
state1
        Step (RetryState (Map e a) s (Stream m a)) a
-> m (Step (RetryState (Map e a) s (Stream m a)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return
            (Step (RetryState (Map e a) s (Stream m a)) a
 -> m (Step (RetryState (Map e a) s (Stream m a)) a))
-> Step (RetryState (Map e a) s (Stream m a)) a
-> m (Step (RetryState (Map e a) s (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ case Step s a
res of
                  Yield a
x s
st1 -> a
-> RetryState (Map e a) s (Stream m a)
-> Step (RetryState (Map e a) s (Stream m a)) a
forall s a. a -> s -> Step s a
Yield a
x (RetryState (Map e a) s (Stream m a)
 -> Step (RetryState (Map e a) s (Stream m a)) a)
-> RetryState (Map e a) s (Stream m a)
-> Step (RetryState (Map e a) s (Stream m a)) a
forall a b. (a -> b) -> a -> b
$ Stream m a -> RetryState (Map e a) s (Stream m a)
forall emap s1 s2. s2 -> RetryState emap s1 s2
RetryDefault ((State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a -> s -> m (Step s a)
step1 s
st1)
                  Skip s
st1 -> RetryState (Map e a) s (Stream m a)
-> Step (RetryState (Map e a) s (Stream m a)) a
forall s a. s -> Step s a
Skip (RetryState (Map e a) s (Stream m a)
 -> Step (RetryState (Map e a) s (Stream m a)) a)
-> RetryState (Map e a) s (Stream m a)
-> Step (RetryState (Map e a) s (Stream m a)) a
forall a b. (a -> b) -> a -> b
$ Stream m a -> RetryState (Map e a) s (Stream m a)
forall emap s1 s2. s2 -> RetryState emap s1 s2
RetryDefault ((State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a -> s -> m (Step s a)
step1 s
st1)
                  Step s a
Stop -> Step (RetryState (Map e a) s (Stream m a)) a
forall s a. Step s a
Stop

    {-# INLINE handler #-}
    handler :: e
-> Map e a
-> s1
-> m (Step (RetryState (Map e a) s1 (Stream m a)) a)
handler e
e Map e a
emap s1
st =
        Step (RetryState (Map e a) s1 (Stream m a)) a
-> m (Step (RetryState (Map e a) s1 (Stream m a)) a)
forall (m :: * -> *) a. Monad m => a -> m a
return
            (Step (RetryState (Map e a) s1 (Stream m a)) a
 -> m (Step (RetryState (Map e a) s1 (Stream m a)) a))
-> Step (RetryState (Map e a) s1 (Stream m a)) a
-> m (Step (RetryState (Map e a) s1 (Stream m a)) a)
forall a b. (a -> b) -> a -> b
$ RetryState (Map e a) s1 (Stream m a)
-> Step (RetryState (Map e a) s1 (Stream m a)) a
forall s a. s -> Step s a
Skip
            (RetryState (Map e a) s1 (Stream m a)
 -> Step (RetryState (Map e a) s1 (Stream m a)) a)
-> RetryState (Map e a) s1 (Stream m a)
-> Step (RetryState (Map e a) s1 (Stream m a)) a
forall a b. (a -> b) -> a -> b
$ case e -> Map e a -> Maybe a
forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup e
e Map e a
emap of
                  Just a
i
                      | a
i a -> a -> Bool
forall a. Ord a => a -> a -> Bool
> a
0 ->
                          let emap1 :: Map e a
emap1 = e -> a -> Map e a -> Map e a
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert e
e (a
i a -> a -> a
forall a. Num a => a -> a -> a
- a
1) Map e a
emap
                           in Map e a -> s1 -> RetryState (Map e a) s1 (Stream m a)
forall emap s1 s2. emap -> s1 -> RetryState emap s1 s2
RetryWithMap Map e a
emap1 s1
st
                      | Bool
otherwise -> Stream m a -> RetryState (Map e a) s1 (Stream m a)
forall emap s1 s2. s2 -> RetryState emap s1 s2
RetryDefault (Stream m a -> RetryState (Map e a) s1 (Stream m a))
-> Stream m a -> RetryState (Map e a) s1 (Stream m a)
forall a b. (a -> b) -> a -> b
$ e -> Stream m a
defaultHandler e
e
                  Maybe a
Nothing -> Stream m a -> RetryState (Map e a) s1 (Stream m a)
forall emap s1 s2. s2 -> RetryState emap s1 s2
RetryDefault (Stream m a -> RetryState (Map e a) s1 (Stream m a))
-> Stream m a -> RetryState (Map e a) s1 (Stream m a)
forall a b. (a -> b) -> a -> b
$ e -> Stream m a
defaultHandler e
e

-- | @retry@ takes 3 arguments
--
-- 1. A map @m@ whose keys are exceptions and values are the number of times to
-- retry the action given that the exception occurs.
--
-- 2. A handler @han@ that decides how to handle an exception when the exception
-- cannot be retried.
--
-- 3. The stream itself that we want to run this mechanism on.
--
-- When evaluating a stream if an exception occurs,
--
-- 1. The stream evaluation aborts
--
-- 2. The exception is looked up in @m@
--
--    a. If the exception exists and the mapped value is > 0 then,
--
--       i. The value is decreased by 1.
--
--       ii. The stream is resumed from where the exception was called, retrying
--       the action.
--
--    b. If the exception exists and the mapped value is == 0 then the stream
--    evaluation stops.
--
--    c. If the exception does not exist then we handle the exception using
--    @han@.
--
-- /Internal/
--
{-# INLINE retry #-}
retry :: (MonadCatch m, Exception e, Ord e)
    => Map e Int
       -- ^ map from exception to retry count
    -> (e -> Stream m a)
       -- ^ default handler for those exceptions that are not in the map
    -> Stream m a
    -> Stream m a
retry :: forall (m :: * -> *) e a.
(MonadCatch m, Exception e, Ord e) =>
Map e Int -> (e -> Stream m a) -> Stream m a -> Stream m a
retry = Map e Int -> (e -> Stream m a) -> Stream m a -> Stream m a
forall e (m :: * -> *) a.
(Exception e, Ord e, MonadCatch m) =>
Map e Int -> (e -> Stream m a) -> Stream m a -> Stream m a
retryD