module Streamly.Internal.Data.Stream.Lifted
(
after
, bracket
, bracket3
, finally
, retry
, afterD
, bracket3D
, retryD
)
where
#include "inline.hs"
import Control.Exception (Exception, SomeException, mask_)
import Control.Monad.Catch (MonadCatch)
#ifdef USE_UNLIFTIO
import Control.Monad.IO.Unlift (MonadUnliftIO)
#else
import Control.Monad.IO.Class (MonadIO)
import Control.Monad.Trans.Control (MonadBaseControl)
#endif
import Data.Map.Strict (Map)
import GHC.Exts (inline)
import Streamly.Internal.Control.Concurrent
(MonadRunInIO, MonadAsync, withRunInIO)
import Streamly.Internal.Data.Stream (Stream)
import Streamly.Internal.Data.IOFinalizer.Lifted
(newIOFinalizer, runIOFinalizer, clearingIOFinalizer)
import Streamly.Internal.Data.Stream (Step(..))
import qualified Control.Monad.Catch as MC
import qualified Data.Map.Strict as Map
import qualified Streamly.Internal.Data.Stream as D
-- | Internal state of the stream produced by 'gbracket'.
--
-- * 'GBracketIOInit': initial state, nothing has been run yet.
-- * 'GBracketIONormal': carries the stream being evaluated (@s1@), the value
--   returned by the \"before\" action (@v@) and the finalizer reference
--   (@wref@).
-- * 'GBracketIOException': carries the stream (@s2@) returned by the
--   exception handler; it is evaluated in place of the original stream.
data GbracketIOState s1 s2 v wref
    = GBracketIOInit
    | GBracketIONormal s1 v wref
    | GBracketIOException s2
-- | General bracketing combinator on which 'bracket3D' is built.
--
-- The resulting stream first runs the \"before\" action, then evaluates the
-- stream generated from its result. Every step of that stream is run through
-- the supplied @try@ function:
--
-- * When the stream stops, the \"after\" action is run via
--   'clearingIOFinalizer' and the resulting stream stops.
-- * When a step returns @Left e@, the exception handler is run via
--   'clearingIOFinalizer'; the stream it returns replaces the original one
--   and is evaluated without going through @try@ again.
--
-- NOTE(review): the exact semantics of 'newIOFinalizer' and
-- 'clearingIOFinalizer' are defined in
-- "Streamly.Internal.Data.IOFinalizer.Lifted"; presumably the registered
-- action runs when the stream is garbage collected before completing --
-- confirm there.
{-# INLINE_NORMAL gbracket #-}
gbracket
    :: MonadRunInIO m
    => m c -- ^ before
    -> (c -> m d1) -- ^ run when the stream stops normally
    -> (c -> e -> D.Stream m b -> m (D.Stream m b)) -- ^ run on exception
    -> (c -> m d2) -- ^ action registered as the finalizer
    -> (forall s. m s -> m (Either e s)) -- ^ try (exception catching)
    -> (c -> D.Stream m b) -- ^ stream generator
    -> D.Stream m b
gbracket bef aft onExc onGC ftry action =
    D.Stream step GBracketIOInit

    where

    {-# INLINE_LATE step #-}
    step _ GBracketIOInit = do
        -- Asynchronous exceptions are masked so that running 'bef' and
        -- registering the finalizer happen as one uninterrupted unit.
        (r, ref) <- withRunInIO $ \run -> mask_ $ run $ do
            r <- bef
            ref <- newIOFinalizer (onGC r)
            return (r, ref)
        return $ Skip $ GBracketIONormal (action r) r ref

    step gst (GBracketIONormal (D.UnStream step1 st) v ref) = do
        res <- ftry $ step1 gst st
        case res of
            Right r -> case r of
                Yield x s ->
                    return $ Yield x (GBracketIONormal (D.Stream step1 s) v ref)
                Skip s ->
                    return $ Skip (GBracketIONormal (D.Stream step1 s) v ref)
                Stop ->
                    clearingIOFinalizer ref (aft v) >> return Stop
            Left e -> do
                -- The handler receives the stream at the state whose step
                -- failed ('st'), not at a later state.
                stream <-
                    clearingIOFinalizer ref (onExc v e (D.UnStream step1 st))
                return $ Skip (GBracketIOException stream)

    -- The stream returned by the exception handler is passed through as is.
    step gst (GBracketIOException (D.UnStream step1 st)) = do
        res <- step1 gst st
        case res of
            Yield x s ->
                return $ Yield x (GBracketIOException (D.Stream step1 s))
            Skip s -> return $ Skip (GBracketIOException (D.Stream step1 s))
            Stop -> return Stop
-- | Bracket a stream with separate actions for normal completion, exception
-- and finalization.
--
-- Implemented with 'gbracket': exceptions are caught as 'SomeException'
-- using 'MC.try'; on exception the @onExc@ action is run on the acquired
-- value and the exception is then rethrown with 'MC.throwM'.
{-# INLINE_NORMAL bracket3D #-}
bracket3D :: (MonadAsync m, MonadCatch m) =>
       m b -- ^ before
    -> (b -> m c) -- ^ run when the stream stops normally
    -> (b -> m d) -- ^ run when a step throws an exception
    -> (b -> m e) -- ^ passed to 'gbracket' as the finalizer action
    -> (b -> D.Stream m a) -- ^ stream generator
    -> D.Stream m a
bracket3D bef aft onExc onGC =
    gbracket
        bef
        aft
        (\a (e :: SomeException) _ -> onExc a >> MC.throwM e)
        onGC
        (inline MC.try)
-- | Same as 'bracket3D'; this is the name exported without the @D@ suffix.
{-# INLINE bracket3 #-}
bracket3 :: (MonadAsync m, MonadCatch m)
    => m b
    -> (b -> m c)
    -> (b -> m d)
    -> (b -> m e)
    -> (b -> Stream m a)
    -> Stream m a
bracket3 = bracket3D
-- | Run the first action, generate a stream from its result, and use the
-- same cleanup action for all three cleanup arguments of 'bracket3'.
{-# INLINE bracket #-}
bracket :: (MonadAsync m, MonadCatch m)
    => m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
bracket bef aft = bracket3 bef aft aft aft
-- | Attach a cleanup action to a stream. Defined as a 'bracket' whose
-- \"before\" action is @return ()@ and whose cleanup ignores that unit value.
{-# INLINE finally #-}
finally :: (MonadAsync m, MonadCatch m) =>
    m b -> Stream m a -> Stream m a
finally action xs = bracket (return ()) (const action) (const xs)
-- | Run the given action when the stream stops.
--
-- On the first step the action is wrapped in an 'IOFinalizer' (via
-- 'newIOFinalizer'); elements of the underlying stream are passed through
-- unchanged, and when it stops the finalizer is run with 'runIOFinalizer'.
-- Exceptions thrown by the underlying stream are not caught here.
--
-- NOTE(review): presumably the finalizer also runs if the stream is garbage
-- collected before stopping -- confirm in
-- "Streamly.Internal.Data.IOFinalizer.Lifted".
{-# INLINE_NORMAL afterD #-}
afterD :: MonadRunInIO m
    => m b -> D.Stream m a -> D.Stream m a
afterD action (D.Stream step state) = D.Stream step' Nothing

    where

    {-# INLINE_LATE step' #-}
    -- 'Nothing' is the initial state: create the finalizer before the first
    -- step of the underlying stream.
    step' _ Nothing = do
        ref <- newIOFinalizer action
        return $ Skip $ Just (state, ref)
    step' gst (Just (st, ref)) = do
        res <- step gst st
        case res of
            Yield x s -> return $ Yield x (Just (s, ref))
            Skip s -> return $ Skip (Just (s, ref))
            Stop -> do
                runIOFinalizer ref
                return Stop
-- | Same as 'afterD'. The constraint is spelled out per build flavour:
-- 'MonadUnliftIO' when @USE_UNLIFTIO@ is defined, otherwise
-- @('MonadIO' m, 'MonadBaseControl' IO m)@.
{-# INLINE after #-}
after ::
#ifdef USE_UNLIFTIO
    MonadUnliftIO m
#else
    (MonadIO m, MonadBaseControl IO m)
#endif
    => m b -> Stream m a -> Stream m a
after = afterD
-- | Internal state of the stream produced by 'retryD'.
--
-- * 'RetryWithMap': still retrying; carries the map of remaining retry
--   counts (@emap@) and the state of the original stream (@s1@).
-- * 'RetryDefault': retries are over; carries the stream returned by the
--   default handler (@s2@).
data RetryState emap s1 s2
    = RetryWithMap emap s1
    | RetryDefault s2
-- | Retry a failing stream step a bounded number of times per exception
-- value.
--
-- Each step of the input stream is run under 'MC.try'. When an exception @e@
-- is caught it is looked up in the map:
--
-- * If it maps to a count greater than zero, the count is decremented and
--   the same step (same stream state) is attempted again.
-- * Otherwise (count not positive, or no entry for @e@) evaluation switches
--   to the stream returned by the default handler for @e@. From then on
--   steps are no longer wrapped in 'MC.try'.
{-# INLINE_NORMAL retryD #-}
retryD
    :: forall e m a. (Exception e, Ord e, MonadCatch m)
    => Map e Int -- ^ number of retries allowed for each exception value
    -> (e -> D.Stream m a) -- ^ default handler, used once retries run out
    -> D.Stream m a -- ^ stream to evaluate
    -> D.Stream m a
retryD emap0 defaultHandler (D.Stream step0 state0) = D.Stream step state

    where

    state = RetryWithMap emap0 state0

    {-# INLINE_LATE step #-}
    step gst (RetryWithMap emap st) = do
        eres <- MC.try $ step0 gst st
        case eres of
            Left e -> handler e emap st
            Right res ->
                return
                    $ case res of
                          Yield x st1 -> Yield x $ RetryWithMap emap st1
                          Skip st1 -> Skip $ RetryWithMap emap st1
                          Stop -> Stop
    step gst (RetryDefault (D.UnStream step1 state1)) = do
        res <- step1 gst state1
        return
            $ case res of
                  Yield x st1 -> Yield x $ RetryDefault (D.Stream step1 st1)
                  Skip st1 -> Skip $ RetryDefault (D.Stream step1 st1)
                  Stop -> Stop

    {-# INLINE handler #-}
    -- Decide the next state after exception 'e' was caught at state 'st'.
    handler e emap st =
        return
            $ Skip
            $ case Map.lookup e emap of
                  Just i
                      | i > 0 ->
                          let emap1 = Map.insert e (i - 1) emap
                           in RetryWithMap emap1 st
                      | otherwise -> RetryDefault $ defaultHandler e
                  Nothing -> RetryDefault $ defaultHandler e
-- | Same as 'retryD'; this is the name exported without the @D@ suffix.
{-# INLINE retry #-}
retry :: (MonadCatch m, Exception e, Ord e)
    => Map e Int
    -> (e -> Stream m a)
    -> Stream m a
    -> Stream m a
retry = retryD