module Streamly.Internal.Data.Stream.Exception.Lifted
(
after
, bracket
, bracket3
, finally
, retry
, afterD
, bracket3D
, retryD
)
where
#include "inline.hs"
import Control.Exception (Exception, SomeException, mask_)
import Control.Monad.Catch (MonadCatch)
#ifdef USE_UNLIFTIO
import Control.Monad.IO.Unlift (MonadUnliftIO)
#else
import Control.Monad.IO.Class (MonadIO)
import Control.Monad.Trans.Control (MonadBaseControl)
#endif
import Data.Map.Strict (Map)
import GHC.Exts (inline)
import Streamly.Internal.Control.Concurrent
(MonadRunInIO, MonadAsync, withRunInIO)
import Streamly.Internal.Data.Stream.StreamD.Type (Stream)
import Streamly.Internal.Data.IOFinalizer.Lifted
(newIOFinalizer, runIOFinalizer, clearingIOFinalizer)
import Streamly.Internal.Data.Stream.StreamD (Step(..))
import qualified Control.Monad.Catch as MC
import qualified Data.Map.Strict as Map
import qualified Streamly.Internal.Data.Stream.StreamD as D
data GbracketIOState s1 s2 v wref
= GBracketIOInit
| GBracketIONormal s1 v wref
| GBracketIOException s2
{-# INLINE_NORMAL gbracket #-}
gbracket
:: MonadRunInIO m
=> m c
-> (c -> m d1)
-> (c -> e -> D.Stream m b -> m (D.Stream m b))
-> (c -> m d2)
-> (forall s. m s -> m (Either e s))
-> (c -> D.Stream m b)
-> D.Stream m b
gbracket :: forall (m :: * -> *) c d1 e b d2.
MonadRunInIO m =>
m c
-> (c -> m d1)
-> (c -> e -> Stream m b -> m (Stream m b))
-> (c -> m d2)
-> (forall s. m s -> m (Either e s))
-> (c -> Stream m b)
-> Stream m b
gbracket m c
bef c -> m d1
aft c -> e -> Stream m b -> m (Stream m b)
onExc c -> m d2
onGC forall s. m s -> m (Either e s)
ftry c -> Stream m b
action =
forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> m (Step
(GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
step forall s1 s2 v wref. GbracketIOState s1 s2 v wref
GBracketIOInit
where
{-# INLINE_LATE step #-}
step :: State StreamK m b
-> GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
-> m (Step
(GbracketIOState (Stream m b) (Stream m b) c IOFinalizer) b)
step State StreamK m b
_ GbracketIOState (Stream m b) (Stream m b) c IOFinalizer
GBracketIOInit = do
(c
r, IOFinalizer
ref) <- forall (m :: * -> *) b.
MonadRunInIO m =>
((forall a. m a -> IO (StM m a)) -> IO (StM m b)) -> m b
withRunInIO forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO (StM m a)
run -> forall a. IO a -> IO a
mask_ forall a b. (a -> b) -> a -> b
$ forall a. m a -> IO (StM m a)
run forall a b. (a -> b) -> a -> b
$ do
c
r <- m c
bef
IOFinalizer
ref <- forall (m :: * -> *) a. MonadRunInIO m => m a -> m IOFinalizer
newIOFinalizer (c -> m d2
onGC c
r)
forall (m :: * -> *) a. Monad m => a -> m a
return (c
r, IOFinalizer
ref)
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip forall a b. (a -> b) -> a -> b
$ forall s1 s2 v wref.
s1 -> v -> wref -> GbracketIOState s1 s2 v wref
GBracketIONormal (c -> Stream m b
action c
r) c
r IOFinalizer
ref
step State StreamK m b
gst (GBracketIONormal (D.UnStream State StreamK m b -> s -> m (Step s b)
step1 s
st) c
v IOFinalizer
ref) = do
Either e (Step s b)
res <- forall s. m s -> m (Either e s)
ftry forall a b. (a -> b) -> a -> b
$ State StreamK m b -> s -> m (Step s b)
step1 State StreamK m b
gst s
st
case Either e (Step s b)
res of
Right Step s b
r -> case Step s b
r of
Yield b
x s
s ->
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield b
x (forall s1 s2 v wref.
s1 -> v -> wref -> GbracketIOState s1 s2 v wref
GBracketIONormal (forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m b -> s -> m (Step s b)
step1 s
s) c
v IOFinalizer
ref)
Skip s
s ->
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall s1 s2 v wref.
s1 -> v -> wref -> GbracketIOState s1 s2 v wref
GBracketIONormal (forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m b -> s -> m (Step s b)
step1 s
s) c
v IOFinalizer
ref)
Step s b
Stop ->
forall (m :: * -> *) a. MonadRunInIO m => IOFinalizer -> m a -> m a
clearingIOFinalizer IOFinalizer
ref (c -> m d1
aft c
v) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop
Left e
e -> do
Stream m b
stream <-
forall (m :: * -> *) a. MonadRunInIO m => IOFinalizer -> m a -> m a
clearingIOFinalizer IOFinalizer
ref (c -> e -> Stream m b -> m (Stream m b)
onExc c
v e
e (forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.UnStream State StreamK m b -> s -> m (Step s b)
step1 s
st))
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall s1 s2 v wref. s2 -> GbracketIOState s1 s2 v wref
GBracketIOException Stream m b
stream)
step State StreamK m b
gst (GBracketIOException (D.UnStream State StreamK m b -> s -> m (Step s b)
step1 s
st)) = do
Step s b
res <- State StreamK m b -> s -> m (Step s b)
step1 State StreamK m b
gst s
st
case Step s b
res of
Yield b
x s
s ->
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield b
x (forall s1 s2 v wref. s2 -> GbracketIOState s1 s2 v wref
GBracketIOException (forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m b -> s -> m (Step s b)
step1 s
s))
Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall s1 s2 v wref. s2 -> GbracketIOState s1 s2 v wref
GBracketIOException (forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m b -> s -> m (Step s b)
step1 s
s))
Step s b
Stop -> forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop
{-# INLINE_NORMAL bracket3D #-}
bracket3D :: (MonadAsync m, MonadCatch m) =>
m b
-> (b -> m c)
-> (b -> m d)
-> (b -> m e)
-> (b -> D.Stream m a)
-> D.Stream m a
bracket3D :: forall (m :: * -> *) b c d e a.
(MonadAsync m, MonadCatch m) =>
m b
-> (b -> m c)
-> (b -> m d)
-> (b -> m e)
-> (b -> Stream m a)
-> Stream m a
bracket3D m b
bef b -> m c
aft b -> m d
onExc b -> m e
onGC =
forall (m :: * -> *) c d1 e b d2.
MonadRunInIO m =>
m c
-> (c -> m d1)
-> (c -> e -> Stream m b -> m (Stream m b))
-> (c -> m d2)
-> (forall s. m s -> m (Either e s))
-> (c -> Stream m b)
-> Stream m b
gbracket
m b
bef
b -> m c
aft
(\b
a (SomeException
e :: SomeException) Stream m a
_ -> b -> m d
onExc b
a forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return (forall (m :: * -> *) b a. Applicative m => m b -> Stream m a
D.nilM (forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
MC.throwM SomeException
e)))
b -> m e
onGC
(forall a. a -> a
inline forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try)
{-# INLINE bracket3 #-}
bracket3 :: (MonadAsync m, MonadCatch m)
=> m b
-> (b -> m c)
-> (b -> m d)
-> (b -> m e)
-> (b -> Stream m a)
-> Stream m a
bracket3 :: forall (m :: * -> *) b c d e a.
(MonadAsync m, MonadCatch m) =>
m b
-> (b -> m c)
-> (b -> m d)
-> (b -> m e)
-> (b -> Stream m a)
-> Stream m a
bracket3 = forall (m :: * -> *) b c d e a.
(MonadAsync m, MonadCatch m) =>
m b
-> (b -> m c)
-> (b -> m d)
-> (b -> m e)
-> (b -> Stream m a)
-> Stream m a
bracket3D
{-# INLINE bracket #-}
bracket :: (MonadAsync m, MonadCatch m)
=> m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
bracket :: forall (m :: * -> *) b c a.
(MonadAsync m, MonadCatch m) =>
m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
bracket m b
bef b -> m c
aft = forall (m :: * -> *) b c d e a.
(MonadAsync m, MonadCatch m) =>
m b
-> (b -> m c)
-> (b -> m d)
-> (b -> m e)
-> (b -> Stream m a)
-> Stream m a
bracket3 m b
bef b -> m c
aft b -> m c
aft b -> m c
aft
{-# INLINE finally #-}
finally :: (MonadAsync m, MonadCatch m) =>
m b -> Stream m a -> Stream m a
finally :: forall (m :: * -> *) b a.
(MonadAsync m, MonadCatch m) =>
m b -> Stream m a -> Stream m a
finally m b
action Stream m a
xs = forall (m :: * -> *) b c a.
(MonadAsync m, MonadCatch m) =>
m b -> (b -> m c) -> (b -> Stream m a) -> Stream m a
bracket (forall (m :: * -> *) a. Monad m => a -> m a
return ()) (forall a b. a -> b -> a
const m b
action) (forall a b. a -> b -> a
const Stream m a
xs)
{-# INLINE_NORMAL afterD #-}
afterD :: MonadRunInIO m
=> m b -> D.Stream m a -> D.Stream m a
afterD :: forall (m :: * -> *) b a.
MonadRunInIO m =>
m b -> Stream m a -> Stream m a
afterD m b
action (D.Stream State StreamK m a -> s -> m (Step s a)
step s
state) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a
-> Maybe (s, IOFinalizer) -> m (Step (Maybe (s, IOFinalizer)) a)
step' forall a. Maybe a
Nothing
where
{-# INLINE_LATE step' #-}
step' :: State StreamK m a
-> Maybe (s, IOFinalizer) -> m (Step (Maybe (s, IOFinalizer)) a)
step' State StreamK m a
_ Maybe (s, IOFinalizer)
Nothing = do
IOFinalizer
ref <- forall (m :: * -> *) a. MonadRunInIO m => m a -> m IOFinalizer
newIOFinalizer m b
action
forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just (s
state, IOFinalizer
ref)
step' State StreamK m a
gst (Just (s
st, IOFinalizer
ref)) = do
Step s a
res <- State StreamK m a -> s -> m (Step s a)
step State StreamK m a
gst s
st
case Step s a
res of
Yield a
x s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. a -> s -> Step s a
Yield a
x (forall a. a -> Maybe a
Just (s
s, IOFinalizer
ref))
Skip s
s -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip (forall a. a -> Maybe a
Just (s
s, IOFinalizer
ref))
Step s a
Stop -> do
forall (m :: * -> *). MonadIO m => IOFinalizer -> m ()
runIOFinalizer IOFinalizer
ref
forall (m :: * -> *) a. Monad m => a -> m a
return forall s a. Step s a
Stop
{-# INLINE after #-}
after ::
#ifdef USE_UNLIFTIO
MonadUnliftIO m
#else
(MonadIO m, MonadBaseControl IO m)
#endif
=> m b -> Stream m a -> Stream m a
after :: forall (m :: * -> *) b a.
(MonadIO m, MonadBaseControl IO m) =>
m b -> Stream m a -> Stream m a
after = forall (m :: * -> *) b a.
MonadRunInIO m =>
m b -> Stream m a -> Stream m a
afterD
data RetryState emap s1 s2
= RetryWithMap emap s1
| RetryDefault s2
{-# INLINE_NORMAL retryD #-}
retryD
:: forall e m a. (Exception e, Ord e, MonadCatch m)
=> Map e Int
-> (e -> D.Stream m a)
-> D.Stream m a
-> D.Stream m a
retryD :: forall e (m :: * -> *) a.
(Exception e, Ord e, MonadCatch m) =>
Map e Int -> (e -> Stream m a) -> Stream m a -> Stream m a
retryD Map e Int
emap0 e -> Stream m a
defaultHandler (D.Stream State StreamK m a -> s -> m (Step s a)
step0 s
state0) = forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream forall {a}.
(Ord a, Num a) =>
State StreamK m a
-> RetryState (Map e a) s (Stream m a)
-> m (Step (RetryState (Map e a) s (Stream m a)) a)
step forall {s2}. RetryState (Map e Int) s s2
state
where
state :: RetryState (Map e Int) s s2
state = forall emap s1 s2. emap -> s1 -> RetryState emap s1 s2
RetryWithMap Map e Int
emap0 s
state0
{-# INLINE_LATE step #-}
step :: State StreamK m a
-> RetryState (Map e a) s (Stream m a)
-> m (Step (RetryState (Map e a) s (Stream m a)) a)
step State StreamK m a
gst (RetryWithMap Map e a
emap s
st) = do
Either e (Step s a)
eres <- forall (m :: * -> *) e a.
(MonadCatch m, Exception e) =>
m a -> m (Either e a)
MC.try forall a b. (a -> b) -> a -> b
$ State StreamK m a -> s -> m (Step s a)
step0 State StreamK m a
gst s
st
case Either e (Step s a)
eres of
Left e
e -> forall {m :: * -> *} {a} {s1} {a}.
(Monad m, Ord a, Num a) =>
e
-> Map e a
-> s1
-> m (Step (RetryState (Map e a) s1 (Stream m a)) a)
handler e
e Map e a
emap s
st
Right Step s a
res ->
forall (m :: * -> *) a. Monad m => a -> m a
return
forall a b. (a -> b) -> a -> b
$ case Step s a
res of
Yield a
x s
st1 -> forall s a. a -> s -> Step s a
Yield a
x forall a b. (a -> b) -> a -> b
$ forall emap s1 s2. emap -> s1 -> RetryState emap s1 s2
RetryWithMap Map e a
emap s
st1
Skip s
st1 -> forall s a. s -> Step s a
Skip forall a b. (a -> b) -> a -> b
$ forall emap s1 s2. emap -> s1 -> RetryState emap s1 s2
RetryWithMap Map e a
emap s
st1
Step s a
Stop -> forall s a. Step s a
Stop
step State StreamK m a
gst (RetryDefault (D.UnStream State StreamK m a -> s -> m (Step s a)
step1 s
state1)) = do
Step s a
res <- State StreamK m a -> s -> m (Step s a)
step1 State StreamK m a
gst s
state1
forall (m :: * -> *) a. Monad m => a -> m a
return
forall a b. (a -> b) -> a -> b
$ case Step s a
res of
Yield a
x s
st1 -> forall s a. a -> s -> Step s a
Yield a
x forall a b. (a -> b) -> a -> b
$ forall emap s1 s2. s2 -> RetryState emap s1 s2
RetryDefault (forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a -> s -> m (Step s a)
step1 s
st1)
Skip s
st1 -> forall s a. s -> Step s a
Skip forall a b. (a -> b) -> a -> b
$ forall emap s1 s2. s2 -> RetryState emap s1 s2
RetryDefault (forall (m :: * -> *) a s.
(State StreamK m a -> s -> m (Step s a)) -> s -> Stream m a
D.Stream State StreamK m a -> s -> m (Step s a)
step1 s
st1)
Step s a
Stop -> forall s a. Step s a
Stop
{-# INLINE handler #-}
handler :: e
-> Map e a
-> s1
-> m (Step (RetryState (Map e a) s1 (Stream m a)) a)
handler e
e Map e a
emap s1
st =
forall (m :: * -> *) a. Monad m => a -> m a
return
forall a b. (a -> b) -> a -> b
$ forall s a. s -> Step s a
Skip
forall a b. (a -> b) -> a -> b
$ case forall k a. Ord k => k -> Map k a -> Maybe a
Map.lookup e
e Map e a
emap of
Just a
i
| a
i forall a. Ord a => a -> a -> Bool
> a
0 ->
let emap1 :: Map e a
emap1 = forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert e
e (a
i forall a. Num a => a -> a -> a
- a
1) Map e a
emap
in forall emap s1 s2. emap -> s1 -> RetryState emap s1 s2
RetryWithMap Map e a
emap1 s1
st
| Bool
otherwise -> forall emap s1 s2. s2 -> RetryState emap s1 s2
RetryDefault forall a b. (a -> b) -> a -> b
$ e -> Stream m a
defaultHandler e
e
Maybe a
Nothing -> forall emap s1 s2. s2 -> RetryState emap s1 s2
RetryDefault forall a b. (a -> b) -> a -> b
$ e -> Stream m a
defaultHandler e
e
{-# INLINE retry #-}
retry :: (MonadCatch m, Exception e, Ord e)
=> Map e Int
-> (e -> Stream m a)
-> Stream m a
-> Stream m a
retry :: forall (m :: * -> *) e a.
(MonadCatch m, Exception e, Ord e) =>
Map e Int -> (e -> Stream m a) -> Stream m a -> Stream m a
retry = forall e (m :: * -> *) a.
(Exception e, Ord e, MonadCatch m) =>
Map e Int -> (e -> Stream m a) -> Stream m a -> Stream m a
retryD