{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE BangPatterns #-}
module Pipes.Aws.S3.Download.Retry
( fromS3WithRetries
, RetryPolicy(..)
, retryNTimes
, retryIfException
, warnOnRetry
) where
import Control.Monad (when)
import Data.Semigroup
import Data.IORef
import System.IO
import qualified Data.ByteString as BS
import Pipes
import Pipes.Safe
import qualified Pipes.Prelude as PP
import Pipes.Aws.S3.Types
import Pipes.Aws.S3.Download
data RetryPolicy m = forall s. RetryIf s (Bucket -> Object -> s -> SomeException -> m (s, Bool))
instance Applicative m => Semigroup (RetryPolicy m) where
RetryIf sx0 px <> RetryIf sy0 py =
RetryIf (sx0, sy0) (\bucket object (sx, sy) exc -> merge <$> px bucket object sx exc <*> py bucket object sy exc)
where
merge (sx1, againX) (sy1, againY) = ((sx1, sy1), againX && againY)
instance Applicative m => Monoid (RetryPolicy m) where
mempty = RetryIf () (\_ _ s _ -> pure (s, True))
mappend = (<>)
retryNTimes :: Applicative m => Int -> RetryPolicy m
retryNTimes n0 = RetryIf n0 shouldRetry
where
shouldRetry _ _ !n _ = pure (n-1, n > 0)
retryIfException :: Applicative m => (SomeException -> Bool) -> RetryPolicy m
retryIfException predicate = RetryIf () (\_ _ s exc -> pure (s, predicate exc))
warnOnRetry :: MonadIO m => RetryPolicy m -> RetryPolicy m
warnOnRetry (RetryIf s0 action) =
RetryIf (0::Int, s0) $ \bucket object (!n,s) exc -> do
(s', shouldRetry) <- action bucket object s exc
let msg = show bucket ++ "/" ++ show object ++ ": Retry attempt " ++ show n ++ " failed: " ++ show exc
when shouldRetry $ liftIO $ hPutStrLn stderr msg
return ((n+1, s'), shouldRetry)
fromS3WithRetries :: forall m. (MonadSafe m)
=> RetryPolicy m -> Bucket -> Object
-> Producer BS.ByteString m ()
fromS3WithRetries (RetryIf retryAcc0 shouldRetry) bucket object = do
offsetVar <- liftIO $ newIORef 0
go retryAcc0 offsetVar
where
go retryAcc offsetVar = do
offset <- liftIO $ readIORef offsetVar
handleAll retry $
fromS3 bucket object (Just $ ContentRange offset maxBound) >-> PP.mapM trackOffset
where
retry exc = do
(retryAcc', again) <- lift $ shouldRetry bucket object retryAcc exc
if again
then go retryAcc' offsetVar
else fail $ "Failed to download "++show bucket++" "++show object
trackOffset bs = do
liftIO $ modifyIORef' offsetVar (+ BS.length bs)
return bs