{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ParallelListComp #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Amazonka.S3.StreamingUpload
( streamUpload
, ChunkSize
, minimumChunkSize
, NumThreads
, concurrentUpload
, UploadLocation(..)
, abortAllUploads
, module Amazonka.S3.CreateMultipartUpload
, module Amazonka.S3.CompleteMultipartUpload
) where
import Amazonka ( HashedBody(..), LogLevel(..), getFileSize, hashedFileRange, send, toBody )
import Amazonka.Crypto ( hash )
import Amazonka.Env ( Env, logger, manager )
import Amazonka.S3.AbortMultipartUpload ( AbortMultipartUploadResponse, newAbortMultipartUpload )
import Amazonka.S3.CompleteMultipartUpload
( CompleteMultipartUpload(..), CompleteMultipartUploadResponse, newCompleteMultipartUpload )
import Amazonka.S3.CreateMultipartUpload
( CreateMultipartUpload(..), CreateMultipartUploadResponse(..) )
import Amazonka.S3.ListMultipartUploads
( ListMultipartUploadsResponse(..), newListMultipartUploads, uploads )
import Amazonka.S3.Types
( BucketName, CompletedMultipartUpload(..), CompletedPart, MultipartUpload(..),
newCompletedMultipartUpload, newCompletedPart )
import Amazonka.S3.UploadPart ( UploadPartResponse(..), newUploadPart )
import Network.HTTP.Client ( managerConnCount, newManager )
import Network.HTTP.Client.TLS ( tlsManagerSettings )
import Control.Monad.Catch ( Exception, MonadCatch, onException )
import Control.Monad.IO.Class ( MonadIO, liftIO )
import Control.Monad.Trans.Class ( lift )
import Control.Monad.Trans.Resource ( MonadResource, runResourceT )
import Conduit ( MonadUnliftIO(..) )
import Data.Conduit ( ConduitT, Void, await, handleC, yield, (.|) )
import Data.Conduit.Combinators ( sinkList )
import Data.Conduit.Combinators qualified as CC
import Data.ByteString qualified as BS
import Data.ByteString.Builder ( stringUtf8 )
import Data.ByteString.Builder.Extra ( byteStringCopy, runBuilder )
import Data.ByteString.Internal ( ByteString(PS) )
import Data.List ( unfoldr )
import Data.List.NonEmpty ( fromList, nonEmpty )
import Data.Text ( Text )
import Control.Concurrent ( newQSem, signalQSem, waitQSem )
import Control.Concurrent.Async ( forConcurrently )
import Control.Exception.Base ( SomeException, bracket_ )
import Foreign.ForeignPtr ( ForeignPtr, mallocForeignPtrBytes, plusForeignPtr )
import Foreign.ForeignPtr.Unsafe ( unsafeForeignPtrToPtr )
import Control.DeepSeq ( rwhnf )
import Data.Foldable ( for_, traverse_ )
import Data.Typeable ( Typeable )
import Data.Word ( Word8 )
import Control.Monad ((>=>))
type ChunkSize = Int
type NumThreads = Int
minimumChunkSize :: ChunkSize
minimumChunkSize :: Int
minimumChunkSize = Int
6Int -> Int -> Int
forall a. Num a => a -> a -> a
*Int
1024Int -> Int -> Int
forall a. Num a => a -> a -> a
*Int
1024
data StreamingError
= UnableToCreateMultipartUpload CreateMultipartUploadResponse
| FailedToUploadPiece UploadPartResponse
| Other String
deriving stock (Int -> StreamingError -> ShowS
[StreamingError] -> ShowS
StreamingError -> String
(Int -> StreamingError -> ShowS)
-> (StreamingError -> String)
-> ([StreamingError] -> ShowS)
-> Show StreamingError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> StreamingError -> ShowS
showsPrec :: Int -> StreamingError -> ShowS
$cshow :: StreamingError -> String
show :: StreamingError -> String
$cshowList :: [StreamingError] -> ShowS
showList :: [StreamingError] -> ShowS
Show, StreamingError -> StreamingError -> Bool
(StreamingError -> StreamingError -> Bool)
-> (StreamingError -> StreamingError -> Bool) -> Eq StreamingError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: StreamingError -> StreamingError -> Bool
== :: StreamingError -> StreamingError -> Bool
$c/= :: StreamingError -> StreamingError -> Bool
/= :: StreamingError -> StreamingError -> Bool
Eq, Typeable)
instance Exception StreamingError
streamUpload :: forall m. (MonadUnliftIO m, MonadResource m)
=> Env
-> Maybe ChunkSize
-> CreateMultipartUpload
-> ConduitT ByteString Void m (Either (AbortMultipartUploadResponse, SomeException) CompleteMultipartUploadResponse)
streamUpload :: forall (m :: * -> *).
(MonadUnliftIO m, MonadResource m) =>
Env
-> Maybe Int
-> CreateMultipartUpload
-> ConduitT
ByteString
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
streamUpload Env
env Maybe Int
mChunkSize multiPartUploadDesc :: CreateMultipartUpload
multiPartUploadDesc@CreateMultipartUpload'{$sel:bucket:CreateMultipartUpload' :: CreateMultipartUpload -> BucketName
bucket = BucketName
buck, $sel:key:CreateMultipartUpload' :: CreateMultipartUpload -> ObjectKey
key = ObjectKey
k} = do
Buffer
buffer <- IO Buffer -> ConduitT ByteString Void m Buffer
forall a. IO a -> ConduitT ByteString Void m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Buffer -> ConduitT ByteString Void m Buffer)
-> IO Buffer -> ConduitT ByteString Void m Buffer
forall a b. (a -> b) -> a -> b
$ Int -> IO Buffer
allocBuffer Int
chunkSize
Buffer -> ConduitT ByteString BufferResult m ()
forall (m :: * -> *).
MonadIO m =>
Buffer -> ConduitT ByteString BufferResult m ()
unsafeWriteChunksToBuffer Buffer
buffer
ConduitT ByteString BufferResult m ()
-> ConduitT
BufferResult
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
-> ConduitT
ByteString
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ConduitT BufferResult (Int, BufferResult) m ()
forall a. ConduitT a (Int, a) m ()
enumerateConduit
ConduitT BufferResult (Int, BufferResult) m ()
-> ConduitT
(Int, BufferResult)
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
-> ConduitT
BufferResult
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| Buffer
-> ConduitT
(Int, BufferResult)
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
startUpload Buffer
buffer
where
chunkSize :: ChunkSize
chunkSize :: Int
chunkSize = Int -> (Int -> Int) -> Maybe Int -> Int
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Int
minimumChunkSize (Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
minimumChunkSize) Maybe Int
mChunkSize
logStr :: String -> m ()
logStr :: String -> m ()
logStr String
msg = do
IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Env -> Logger
forall (withAuth :: * -> *). Env' withAuth -> Logger
logger Env
env LogLevel
Debug (ByteStringBuilder -> IO ()) -> ByteStringBuilder -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> ByteStringBuilder
stringUtf8 String
msg
startUpload :: Buffer
-> ConduitT (Int, BufferResult) Void m
(Either (AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
startUpload :: Buffer
-> ConduitT
(Int, BufferResult)
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
startUpload Buffer
buffer = do
CreateMultipartUploadResponse'{$sel:uploadId:CreateMultipartUploadResponse' :: CreateMultipartUploadResponse -> Text
uploadId = Text
upId} <- m CreateMultipartUploadResponse
-> ConduitT
(Int, BufferResult) Void m CreateMultipartUploadResponse
forall (m :: * -> *) a.
Monad m =>
m a -> ConduitT (Int, BufferResult) Void m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m CreateMultipartUploadResponse
-> ConduitT
(Int, BufferResult) Void m CreateMultipartUploadResponse)
-> m CreateMultipartUploadResponse
-> ConduitT
(Int, BufferResult) Void m CreateMultipartUploadResponse
forall a b. (a -> b) -> a -> b
$ Env
-> CreateMultipartUpload -> m (AWSResponse CreateMultipartUpload)
forall (m :: * -> *) a.
(MonadResource m, AWSRequest a, Typeable a,
Typeable (AWSResponse a)) =>
Env -> a -> m (AWSResponse a)
send Env
env CreateMultipartUpload
multiPartUploadDesc
m () -> ConduitT (Int, BufferResult) Void m ()
forall (m :: * -> *) a.
Monad m =>
m a -> ConduitT (Int, BufferResult) Void m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m () -> ConduitT (Int, BufferResult) Void m ())
-> m () -> ConduitT (Int, BufferResult) Void m ()
forall a b. (a -> b) -> a -> b
$ String -> m ()
logStr String
"\n**** Created upload\n"
(SomeException
-> ConduitT
(Int, BufferResult)
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse))
-> ConduitT
(Int, BufferResult)
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
-> ConduitT
(Int, BufferResult)
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
forall (m :: * -> *) e i o r.
(MonadUnliftIO m, Exception e) =>
(e -> ConduitT i o m r) -> ConduitT i o m r -> ConduitT i o m r
handleC (Text
-> SomeException
-> ConduitT
(Int, BufferResult)
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
forall i.
Text
-> SomeException
-> ConduitT
i
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
cancelMultiUploadConduit Text
upId) (ConduitT
(Int, BufferResult)
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
-> ConduitT
(Int, BufferResult)
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse))
-> ConduitT
(Int, BufferResult)
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
-> ConduitT
(Int, BufferResult)
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
forall a b. (a -> b) -> a -> b
$
((Int, BufferResult) -> m (Maybe CompletedPart))
-> ConduitT (Int, BufferResult) (Maybe CompletedPart) m ()
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> ConduitT a b m ()
CC.mapM (Buffer -> Text -> (Int, BufferResult) -> m (Maybe CompletedPart)
multiUpload Buffer
buffer Text
upId)
ConduitT (Int, BufferResult) (Maybe CompletedPart) m ()
-> ConduitT
(Maybe CompletedPart)
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
-> ConduitT
(Int, BufferResult)
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| Text
-> ConduitT
(Maybe CompletedPart)
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
finishMultiUploadConduit Text
upId
multiUpload :: Buffer -> Text -> (Int, BufferResult) -> m (Maybe CompletedPart)
multiUpload :: Buffer -> Text -> (Int, BufferResult) -> m (Maybe CompletedPart)
multiUpload Buffer
buffer Text
upId (Int
partnum, BufferResult
result) = do
let !bs :: ByteString
bs = Buffer -> BufferResult -> ByteString
bufferToByteString Buffer
buffer BufferResult
result
!bsHash :: Digest SHA256
bsHash = ByteString -> Digest SHA256
forall ba a.
(ByteArrayAccess ba, HashAlgorithm a) =>
ba -> Digest a
hash ByteString
bs
UploadPartResponse'{Maybe ETag
eTag :: Maybe ETag
$sel:eTag:UploadPartResponse' :: UploadPartResponse -> Maybe ETag
eTag} <- Env -> UploadPart -> m (AWSResponse UploadPart)
forall (m :: * -> *) a.
(MonadResource m, AWSRequest a, Typeable a,
Typeable (AWSResponse a)) =>
Env -> a -> m (AWSResponse a)
send Env
env (UploadPart -> m UploadPartResponse)
-> UploadPart -> m UploadPartResponse
forall a b. (a -> b) -> a -> b
$! BucketName -> ObjectKey -> Int -> Text -> RequestBody -> UploadPart
newUploadPart BucketName
buck ObjectKey
k Int
partnum Text
upId (RequestBody -> UploadPart) -> RequestBody -> UploadPart
forall a b. (a -> b) -> a -> b
$! HashedBody -> RequestBody
forall a. ToBody a => a -> RequestBody
toBody (HashedBody -> RequestBody) -> HashedBody -> RequestBody
forall a b. (a -> b) -> a -> b
$! Digest SHA256 -> ByteString -> HashedBody
HashedBytes Digest SHA256
bsHash ByteString
bs
let !()
_ = Maybe ETag -> ()
forall a. a -> ()
rwhnf Maybe ETag
eTag
String -> m ()
logStr (String -> m ()) -> String -> m ()
forall a b. (a -> b) -> a -> b
$ String
"\n**** Uploaded part " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show Int
partnum
Maybe CompletedPart -> m (Maybe CompletedPart)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe CompletedPart -> m (Maybe CompletedPart))
-> Maybe CompletedPart -> m (Maybe CompletedPart)
forall a b. (a -> b) -> a -> b
$! Int -> ETag -> CompletedPart
newCompletedPart Int
partnum (ETag -> CompletedPart) -> Maybe ETag -> Maybe CompletedPart
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe ETag
eTag
finishMultiUploadConduit :: Text
-> ConduitT (Maybe CompletedPart) Void m
(Either (AbortMultipartUploadResponse, SomeException) CompleteMultipartUploadResponse)
finishMultiUploadConduit :: Text
-> ConduitT
(Maybe CompletedPart)
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
finishMultiUploadConduit Text
upId = do
[Maybe CompletedPart]
parts <- ConduitT (Maybe CompletedPart) Void m [Maybe CompletedPart]
forall (m :: * -> *) a o. Monad m => ConduitT a o m [a]
sinkList
CompleteMultipartUploadResponse
res <- m CompleteMultipartUploadResponse
-> ConduitT
(Maybe CompletedPart) Void m CompleteMultipartUploadResponse
forall (m :: * -> *) a.
Monad m =>
m a -> ConduitT (Maybe CompletedPart) Void m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m CompleteMultipartUploadResponse
-> ConduitT
(Maybe CompletedPart) Void m CompleteMultipartUploadResponse)
-> m CompleteMultipartUploadResponse
-> ConduitT
(Maybe CompletedPart) Void m CompleteMultipartUploadResponse
forall a b. (a -> b) -> a -> b
$ Env
-> CompleteMultipartUpload
-> m (AWSResponse CompleteMultipartUpload)
forall (m :: * -> *) a.
(MonadResource m, AWSRequest a, Typeable a,
Typeable (AWSResponse a)) =>
Env -> a -> m (AWSResponse a)
send Env
env (CompleteMultipartUpload
-> m (AWSResponse CompleteMultipartUpload))
-> CompleteMultipartUpload
-> m (AWSResponse CompleteMultipartUpload)
forall a b. (a -> b) -> a -> b
$ (BucketName -> ObjectKey -> Text -> CompleteMultipartUpload
newCompleteMultipartUpload BucketName
buck ObjectKey
k Text
upId)
{ multipartUpload =
Just $ newCompletedMultipartUpload {parts = sequenceA $ fromList parts}
}
Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse
-> ConduitT
(Maybe CompletedPart)
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
forall a. a -> ConduitT (Maybe CompletedPart) Void m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse
-> ConduitT
(Maybe CompletedPart)
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse))
-> Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse
-> ConduitT
(Maybe CompletedPart)
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
forall a b. (a -> b) -> a -> b
$ CompleteMultipartUploadResponse
-> Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse
forall a b. b -> Either a b
Right CompleteMultipartUploadResponse
res
cancelMultiUploadConduit :: Text -> SomeException
-> ConduitT i Void m
(Either (AbortMultipartUploadResponse, SomeException) CompleteMultipartUploadResponse)
cancelMultiUploadConduit :: forall i.
Text
-> SomeException
-> ConduitT
i
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
cancelMultiUploadConduit Text
upId SomeException
exc = do
AbortMultipartUploadResponse
res <- m AbortMultipartUploadResponse
-> ConduitT i Void m AbortMultipartUploadResponse
forall (m :: * -> *) a. Monad m => m a -> ConduitT i Void m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m AbortMultipartUploadResponse
-> ConduitT i Void m AbortMultipartUploadResponse)
-> m AbortMultipartUploadResponse
-> ConduitT i Void m AbortMultipartUploadResponse
forall a b. (a -> b) -> a -> b
$ Env -> AbortMultipartUpload -> m (AWSResponse AbortMultipartUpload)
forall (m :: * -> *) a.
(MonadResource m, AWSRequest a, Typeable a,
Typeable (AWSResponse a)) =>
Env -> a -> m (AWSResponse a)
send Env
env (AbortMultipartUpload -> m (AWSResponse AbortMultipartUpload))
-> AbortMultipartUpload -> m (AWSResponse AbortMultipartUpload)
forall a b. (a -> b) -> a -> b
$ BucketName -> ObjectKey -> Text -> AbortMultipartUpload
newAbortMultipartUpload BucketName
buck ObjectKey
k Text
upId
Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse
-> ConduitT
i
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
forall a. a -> ConduitT i Void m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse
-> ConduitT
i
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse))
-> Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse
-> ConduitT
i
Void
m
(Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse)
forall a b. (a -> b) -> a -> b
$ (AbortMultipartUploadResponse, SomeException)
-> Either
(AbortMultipartUploadResponse, SomeException)
CompleteMultipartUploadResponse
forall a b. a -> Either a b
Left (AbortMultipartUploadResponse
res, SomeException
exc)
enumerateConduit :: ConduitT a (Int, a) m ()
enumerateConduit :: forall a. ConduitT a (Int, a) m ()
enumerateConduit = Int -> ConduitT a (Int, a) m ()
forall {m :: * -> *} {a} {b}.
(Monad m, Num a) =>
a -> ConduitT b (a, b) m ()
loop Int
1
where
loop :: a -> ConduitT b (a, b) m ()
loop a
i = ConduitT b (a, b) m (Maybe b)
forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
await ConduitT b (a, b) m (Maybe b)
-> (Maybe b -> ConduitT b (a, b) m ()) -> ConduitT b (a, b) m ()
forall a b.
ConduitT b (a, b) m a
-> (a -> ConduitT b (a, b) m b) -> ConduitT b (a, b) m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ConduitT b (a, b) m ()
-> (b -> ConduitT b (a, b) m ())
-> Maybe b
-> ConduitT b (a, b) m ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (() -> ConduitT b (a, b) m ()
forall a. a -> ConduitT b (a, b) m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()) (a -> b -> ConduitT b (a, b) m ()
go a
i)
go :: a -> b -> ConduitT b (a, b) m ()
go a
i b
x = do
(a, b) -> ConduitT b (a, b) m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (a
i, b
x)
a -> ConduitT b (a, b) m ()
loop (a
i a -> a -> a
forall a. Num a => a -> a -> a
+ a
1)
{-# INLINE enumerateConduit #-}
data Buffer = Buffer {Buffer -> Int
remaining :: !Int, Buffer -> ForeignPtr Word8
_fptr :: !(ForeignPtr Word8)}
data PutResult
= Ok Buffer
| Full ByteString
data BufferResult = FullBuffer | Incomplete Int
unsafeWriteChunksToBuffer :: MonadIO m => Buffer -> ConduitT ByteString BufferResult m ()
unsafeWriteChunksToBuffer :: forall (m :: * -> *).
MonadIO m =>
Buffer -> ConduitT ByteString BufferResult m ()
unsafeWriteChunksToBuffer Buffer
buffer0 = Buffer -> ConduitT ByteString BufferResult m ()
forall (m :: * -> *).
MonadIO m =>
Buffer -> ConduitT ByteString BufferResult m ()
awaitLoop Buffer
buffer0 where
awaitLoop :: Buffer -> ConduitT ByteString BufferResult m ()
awaitLoop Buffer
buf =
ConduitT ByteString BufferResult m (Maybe ByteString)
forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
await ConduitT ByteString BufferResult m (Maybe ByteString)
-> (Maybe ByteString -> ConduitT ByteString BufferResult m ())
-> ConduitT ByteString BufferResult m ()
forall a b.
ConduitT ByteString BufferResult m a
-> (a -> ConduitT ByteString BufferResult m b)
-> ConduitT ByteString BufferResult m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ConduitT ByteString BufferResult m ()
-> (ByteString -> ConduitT ByteString BufferResult m ())
-> Maybe ByteString
-> ConduitT ByteString BufferResult m ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (BufferResult -> ConduitT ByteString BufferResult m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (BufferResult -> ConduitT ByteString BufferResult m ())
-> BufferResult -> ConduitT ByteString BufferResult m ()
forall a b. (a -> b) -> a -> b
$ Int -> BufferResult
Incomplete (Int -> BufferResult) -> Int -> BufferResult
forall a b. (a -> b) -> a -> b
$ Buffer -> Int
remaining Buffer
buf)
(IO PutResult -> ConduitT ByteString BufferResult m PutResult
forall a. IO a -> ConduitT ByteString BufferResult m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO PutResult -> ConduitT ByteString BufferResult m PutResult)
-> (ByteString -> IO PutResult)
-> ByteString
-> ConduitT ByteString BufferResult m PutResult
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Buffer -> ByteString -> IO PutResult
putBuffer Buffer
buf (ByteString -> ConduitT ByteString BufferResult m PutResult)
-> (PutResult -> ConduitT ByteString BufferResult m ())
-> ByteString
-> ConduitT ByteString BufferResult m ()
forall (m :: * -> *) a b c.
Monad m =>
(a -> m b) -> (b -> m c) -> a -> m c
>=> \case
Full ByteString
next -> BufferResult -> ConduitT ByteString BufferResult m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield BufferResult
FullBuffer ConduitT ByteString BufferResult m ()
-> ConduitT ByteString BufferResult m ()
-> ConduitT ByteString BufferResult m ()
forall a b.
ConduitT ByteString BufferResult m a
-> ConduitT ByteString BufferResult m b
-> ConduitT ByteString BufferResult m b
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> Buffer -> ByteString -> ConduitT ByteString BufferResult m ()
chunkLoop Buffer
buffer0 ByteString
next
Ok Buffer
buf' -> Buffer -> ConduitT ByteString BufferResult m ()
awaitLoop Buffer
buf'
)
chunkLoop :: Buffer -> ByteString -> ConduitT ByteString BufferResult m ()
chunkLoop Buffer
buf = IO PutResult -> ConduitT ByteString BufferResult m PutResult
forall a. IO a -> ConduitT ByteString BufferResult m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO PutResult -> ConduitT ByteString BufferResult m PutResult)
-> (ByteString -> IO PutResult)
-> ByteString
-> ConduitT ByteString BufferResult m PutResult
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Buffer -> ByteString -> IO PutResult
putBuffer Buffer
buf (ByteString -> ConduitT ByteString BufferResult m PutResult)
-> (PutResult -> ConduitT ByteString BufferResult m ())
-> ByteString
-> ConduitT ByteString BufferResult m ()
forall (m :: * -> *) a b c.
Monad m =>
(a -> m b) -> (b -> m c) -> a -> m c
>=> \case
Full ByteString
next -> BufferResult -> ConduitT ByteString BufferResult m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield BufferResult
FullBuffer ConduitT ByteString BufferResult m ()
-> ConduitT ByteString BufferResult m ()
-> ConduitT ByteString BufferResult m ()
forall a b.
ConduitT ByteString BufferResult m a
-> ConduitT ByteString BufferResult m b
-> ConduitT ByteString BufferResult m b
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> Buffer -> ByteString -> ConduitT ByteString BufferResult m ()
chunkLoop Buffer
buffer0 ByteString
next
Ok Buffer
buf' -> Buffer -> ConduitT ByteString BufferResult m ()
awaitLoop Buffer
buf'
bufferToByteString :: Buffer -> BufferResult -> ByteString
bufferToByteString :: Buffer -> BufferResult -> ByteString
bufferToByteString (Buffer Int
bufSize ForeignPtr Word8
fptr) BufferResult
FullBuffer = ForeignPtr Word8 -> Int -> Int -> ByteString
PS ForeignPtr Word8
fptr Int
0 Int
bufSize
bufferToByteString (Buffer Int
bufSize ForeignPtr Word8
fptr) (Incomplete Int
remaining) = ForeignPtr Word8 -> Int -> Int -> ByteString
PS ForeignPtr Word8
fptr Int
0 (Int
bufSize Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
remaining)
allocBuffer :: Int -> IO Buffer
allocBuffer :: Int -> IO Buffer
allocBuffer Int
chunkSize = Int -> ForeignPtr Word8 -> Buffer
Buffer Int
chunkSize (ForeignPtr Word8 -> Buffer) -> IO (ForeignPtr Word8) -> IO Buffer
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Int -> IO (ForeignPtr Word8)
forall a. Int -> IO (ForeignPtr a)
mallocForeignPtrBytes Int
chunkSize
putBuffer :: Buffer -> ByteString -> IO PutResult
putBuffer :: Buffer -> ByteString -> IO PutResult
putBuffer Buffer
buffer ByteString
bs
| ByteString -> Int
BS.length ByteString
bs Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Buffer -> Int
remaining Buffer
buffer =
Buffer -> PutResult
Ok (Buffer -> PutResult) -> IO Buffer -> IO PutResult
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Buffer -> ByteString -> IO Buffer
unsafeWriteBuffer Buffer
buffer ByteString
bs
| Bool
otherwise = do
let (ByteString
remainder,ByteString
rest) = Int -> ByteString -> (ByteString, ByteString)
BS.splitAt (Buffer -> Int
remaining Buffer
buffer) ByteString
bs
ByteString -> PutResult
Full ByteString
rest PutResult -> IO Buffer -> IO PutResult
forall a b. a -> IO b -> IO a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Buffer -> ByteString -> IO Buffer
unsafeWriteBuffer Buffer
buffer ByteString
remainder
unsafeWriteBuffer :: Buffer -> ByteString -> IO Buffer
unsafeWriteBuffer :: Buffer -> ByteString -> IO Buffer
unsafeWriteBuffer (Buffer Int
remaining ForeignPtr Word8
fptr) ByteString
bs = do
let ptr :: Ptr Word8
ptr = ForeignPtr Word8 -> Ptr Word8
forall a. ForeignPtr a -> Ptr a
unsafeForeignPtrToPtr ForeignPtr Word8
fptr
len :: Int
len = ByteString -> Int
BS.length ByteString
bs
(Int, Next)
_ <- ByteStringBuilder -> BufferWriter
runBuilder (ByteString -> ByteStringBuilder
byteStringCopy ByteString
bs) Ptr Word8
ptr Int
remaining
Buffer -> IO Buffer
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Buffer -> IO Buffer) -> Buffer -> IO Buffer
forall a b. (a -> b) -> a -> b
$ Int -> ForeignPtr Word8 -> Buffer
Buffer (Int
remaining Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
len) (ForeignPtr Word8 -> Int -> ForeignPtr Word8
forall a b. ForeignPtr a -> Int -> ForeignPtr b
plusForeignPtr ForeignPtr Word8
fptr Int
len)
data UploadLocation
= FP FilePath
| BS ByteString
concurrentUpload :: forall m.
(MonadResource m, MonadCatch m)
=> Env
-> Maybe ChunkSize
-> Maybe NumThreads
-> UploadLocation
-> CreateMultipartUpload
-> m CompleteMultipartUploadResponse
concurrentUpload :: forall (m :: * -> *).
(MonadResource m, MonadCatch m) =>
Env
-> Maybe Int
-> Maybe Int
-> UploadLocation
-> CreateMultipartUpload
-> m CompleteMultipartUploadResponse
concurrentUpload Env
env' Maybe Int
mChunkSize Maybe Int
mNumThreads UploadLocation
uploadLoc
multiPartUploadDesc :: CreateMultipartUpload
multiPartUploadDesc@CreateMultipartUpload'{$sel:bucket:CreateMultipartUpload' :: CreateMultipartUpload -> BucketName
bucket = BucketName
buck, $sel:key:CreateMultipartUpload' :: CreateMultipartUpload -> ObjectKey
key = ObjectKey
k}
= do
CreateMultipartUploadResponse'{$sel:uploadId:CreateMultipartUploadResponse' :: CreateMultipartUploadResponse -> Text
uploadId = Text
upId} <- Env
-> CreateMultipartUpload -> m (AWSResponse CreateMultipartUpload)
forall (m :: * -> *) a.
(MonadResource m, AWSRequest a, Typeable a,
Typeable (AWSResponse a)) =>
Env -> a -> m (AWSResponse a)
send Env
env' CreateMultipartUpload
multiPartUploadDesc
let logStr :: MonadIO n => String -> n ()
logStr :: forall (n :: * -> *). MonadIO n => String -> n ()
logStr = IO () -> n ()
forall a. IO a -> n a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> n ()) -> (String -> IO ()) -> String -> n ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Env -> Logger
forall (withAuth :: * -> *). Env' withAuth -> Logger
logger Env
env' LogLevel
Info (ByteStringBuilder -> IO ())
-> (String -> ByteStringBuilder) -> String -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> ByteStringBuilder
stringUtf8
calculateChunkSize :: Int -> Int
calculateChunkSize :: Int -> Int
calculateChunkSize Int
len =
let chunkSize' :: Int
chunkSize' = Int -> (Int -> Int) -> Maybe Int -> Int
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Int
minimumChunkSize (Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
minimumChunkSize) Maybe Int
mChunkSize
in if Int
len Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` Int
chunkSize' Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
10000 then Int
len Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` Int
9999 else Int
chunkSize'
mConnCount :: Int
mConnCount = ManagerSettings -> Int
managerConnCount ManagerSettings
tlsManagerSettings
nThreads :: Int
nThreads = Int -> (Int -> Int) -> Maybe Int -> Int
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Int
mConnCount (Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
1) Maybe Int
mNumThreads
Env
env <- if Bool -> (Int -> Bool) -> Maybe Int -> Bool
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Bool
False (Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
mConnCount) Maybe Int
mNumThreads
then do
Manager
mgr' <- IO Manager -> m Manager
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Manager -> m Manager) -> IO Manager -> m Manager
forall a b. (a -> b) -> a -> b
$ ManagerSettings -> IO Manager
newManager ManagerSettings
tlsManagerSettings{managerConnCount = nThreads}
Env -> m Env
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Env
env'{manager = mgr'}
else Env -> m Env
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Env
env'
(m CompleteMultipartUploadResponse
-> m AbortMultipartUploadResponse
-> m CompleteMultipartUploadResponse)
-> m AbortMultipartUploadResponse
-> m CompleteMultipartUploadResponse
-> m CompleteMultipartUploadResponse
forall a b c. (a -> b -> c) -> b -> a -> c
flip m CompleteMultipartUploadResponse
-> m AbortMultipartUploadResponse
-> m CompleteMultipartUploadResponse
forall (m :: * -> *) a b.
(HasCallStack, MonadCatch m) =>
m a -> m b -> m a
onException (Env -> AbortMultipartUpload -> m (AWSResponse AbortMultipartUpload)
forall (m :: * -> *) a.
(MonadResource m, AWSRequest a, Typeable a,
Typeable (AWSResponse a)) =>
Env -> a -> m (AWSResponse a)
send Env
env (BucketName -> ObjectKey -> Text -> AbortMultipartUpload
newAbortMultipartUpload BucketName
buck ObjectKey
k Text
upId)) (m CompleteMultipartUploadResponse
-> m CompleteMultipartUploadResponse)
-> m CompleteMultipartUploadResponse
-> m CompleteMultipartUploadResponse
forall a b. (a -> b) -> a -> b
$ do
QSem
sem <- IO QSem -> m QSem
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO QSem -> m QSem) -> IO QSem -> m QSem
forall a b. (a -> b) -> a -> b
$ Int -> IO QSem
newQSem Int
nThreads
[Maybe CompletedPart]
uploadResponses <- case UploadLocation
uploadLoc of
BS ByteString
bytes ->
let chunkSize :: Int
chunkSize = Int -> Int
calculateChunkSize (Int -> Int) -> Int -> Int
forall a b. (a -> b) -> a -> b
$ ByteString -> Int
BS.length ByteString
bytes
in IO [Maybe CompletedPart] -> m [Maybe CompletedPart]
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [Maybe CompletedPart] -> m [Maybe CompletedPart])
-> IO [Maybe CompletedPart] -> m [Maybe CompletedPart]
forall a b. (a -> b) -> a -> b
$ [(Int, ByteString)]
-> ((Int, ByteString) -> IO (Maybe CompletedPart))
-> IO [Maybe CompletedPart]
forall (t :: * -> *) a b.
Traversable t =>
t a -> (a -> IO b) -> IO (t b)
forConcurrently ([Int] -> [ByteString] -> [(Int, ByteString)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Int
1..] ([ByteString] -> [(Int, ByteString)])
-> [ByteString] -> [(Int, ByteString)]
forall a b. (a -> b) -> a -> b
$ Int -> ByteString -> [ByteString]
chunksOf Int
chunkSize ByteString
bytes) (((Int, ByteString) -> IO (Maybe CompletedPart))
-> IO [Maybe CompletedPart])
-> ((Int, ByteString) -> IO (Maybe CompletedPart))
-> IO [Maybe CompletedPart]
forall a b. (a -> b) -> a -> b
$ \(Int
partnum, ByteString
chunk) ->
IO ()
-> IO () -> IO (Maybe CompletedPart) -> IO (Maybe CompletedPart)
forall a b c. IO a -> IO b -> IO c -> IO c
bracket_ (QSem -> IO ()
waitQSem QSem
sem) (QSem -> IO ()
signalQSem QSem
sem) (IO (Maybe CompletedPart) -> IO (Maybe CompletedPart))
-> IO (Maybe CompletedPart) -> IO (Maybe CompletedPart)
forall a b. (a -> b) -> a -> b
$ do
String -> IO ()
forall (n :: * -> *). MonadIO n => String -> n ()
logStr (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Starting part: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
partnum
UploadPartResponse'{Maybe ETag
$sel:eTag:UploadPartResponse' :: UploadPartResponse -> Maybe ETag
eTag :: Maybe ETag
eTag} <- ResourceT IO UploadPartResponse -> IO UploadPartResponse
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
runResourceT (ResourceT IO UploadPartResponse -> IO UploadPartResponse)
-> ResourceT IO UploadPartResponse -> IO UploadPartResponse
forall a b. (a -> b) -> a -> b
$ Env -> UploadPart -> ResourceT IO (AWSResponse UploadPart)
forall (m :: * -> *) a.
(MonadResource m, AWSRequest a, Typeable a,
Typeable (AWSResponse a)) =>
Env -> a -> m (AWSResponse a)
send Env
env (UploadPart -> ResourceT IO UploadPartResponse)
-> (ByteString -> UploadPart)
-> ByteString
-> ResourceT IO UploadPartResponse
forall b c a. (b -> c) -> (a -> b) -> a -> c
. BucketName -> ObjectKey -> Int -> Text -> RequestBody -> UploadPart
newUploadPart BucketName
buck ObjectKey
k Int
partnum Text
upId (RequestBody -> UploadPart)
-> (ByteString -> RequestBody) -> ByteString -> UploadPart
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> RequestBody
forall a. ToBody a => a -> RequestBody
toBody (ByteString -> ResourceT IO UploadPartResponse)
-> ByteString -> ResourceT IO UploadPartResponse
forall a b. (a -> b) -> a -> b
$ ByteString
chunk
String -> IO ()
forall (n :: * -> *). MonadIO n => String -> n ()
logStr (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Finished part: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
partnum
Maybe CompletedPart -> IO (Maybe CompletedPart)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe CompletedPart -> IO (Maybe CompletedPart))
-> Maybe CompletedPart -> IO (Maybe CompletedPart)
forall a b. (a -> b) -> a -> b
$ Int -> ETag -> CompletedPart
newCompletedPart Int
partnum (ETag -> CompletedPart) -> Maybe ETag -> Maybe CompletedPart
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe ETag
eTag
FP String
filePath -> do
Integer
fsize <- IO Integer -> m Integer
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Integer -> m Integer) -> IO Integer -> m Integer
forall a b. (a -> b) -> a -> b
$ String -> IO Integer
forall (m :: * -> *). MonadIO m => String -> m Integer
getFileSize String
filePath
let chunkSize :: Int
chunkSize = Int -> Int
calculateChunkSize (Int -> Int) -> Int -> Int
forall a b. (a -> b) -> a -> b
$ Integer -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Integer
fsize
(Int
count,Int
lst) = Integer -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Integer
fsize Int -> Int -> (Int, Int)
forall a. Integral a => a -> a -> (a, a)
`divMod` Int
chunkSize
params :: [(Int, Int, Int)]
params = [(Int
partnum, Int
chunkSizeInt -> Int -> Int
forall a. Num a => a -> a -> a
*Int
offset, Int
size)
| Int
partnum <- [Int
1..]
| Int
offset <- [Int
0..Int
count]
| Int
size <- (Int
chunkSize Int -> [Int] -> [Int]
forall a b. a -> [b] -> [a]
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ [Int
0..Int
countInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
1]) [Int] -> [Int] -> [Int]
forall a. [a] -> [a] -> [a]
++ [Int
lst]
]
IO [Maybe CompletedPart] -> m [Maybe CompletedPart]
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [Maybe CompletedPart] -> m [Maybe CompletedPart])
-> IO [Maybe CompletedPart] -> m [Maybe CompletedPart]
forall a b. (a -> b) -> a -> b
$ [(Int, Int, Int)]
-> ((Int, Int, Int) -> IO (Maybe CompletedPart))
-> IO [Maybe CompletedPart]
forall (t :: * -> *) a b.
Traversable t =>
t a -> (a -> IO b) -> IO (t b)
forConcurrently [(Int, Int, Int)]
params (((Int, Int, Int) -> IO (Maybe CompletedPart))
-> IO [Maybe CompletedPart])
-> ((Int, Int, Int) -> IO (Maybe CompletedPart))
-> IO [Maybe CompletedPart]
forall a b. (a -> b) -> a -> b
$ \(Int
partnum,Int
off,Int
size) ->
IO ()
-> IO () -> IO (Maybe CompletedPart) -> IO (Maybe CompletedPart)
forall a b c. IO a -> IO b -> IO c -> IO c
bracket_ (QSem -> IO ()
waitQSem QSem
sem) (QSem -> IO ()
signalQSem QSem
sem) (IO (Maybe CompletedPart) -> IO (Maybe CompletedPart))
-> IO (Maybe CompletedPart) -> IO (Maybe CompletedPart)
forall a b. (a -> b) -> a -> b
$ do
String -> IO ()
forall (n :: * -> *). MonadIO n => String -> n ()
logStr (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Starting file part: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
partnum
HashedBody
chunkStream <- String -> Integer -> Integer -> IO HashedBody
forall (m :: * -> *).
MonadIO m =>
String -> Integer -> Integer -> m HashedBody
hashedFileRange String
filePath (Int -> Integer
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
off) (Int -> Integer
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
size)
UploadPartResponse'{Maybe ETag
$sel:eTag:UploadPartResponse' :: UploadPartResponse -> Maybe ETag
eTag :: Maybe ETag
eTag} <- ResourceT IO UploadPartResponse -> IO UploadPartResponse
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
runResourceT (ResourceT IO UploadPartResponse -> IO UploadPartResponse)
-> ResourceT IO UploadPartResponse -> IO UploadPartResponse
forall a b. (a -> b) -> a -> b
$
Env -> UploadPart -> ResourceT IO (AWSResponse UploadPart)
forall (m :: * -> *) a.
(MonadResource m, AWSRequest a, Typeable a,
Typeable (AWSResponse a)) =>
Env -> a -> m (AWSResponse a)
send Env
env (UploadPart -> ResourceT IO UploadPartResponse)
-> (HashedBody -> UploadPart)
-> HashedBody
-> ResourceT IO UploadPartResponse
forall b c a. (b -> c) -> (a -> b) -> a -> c
. BucketName -> ObjectKey -> Int -> Text -> RequestBody -> UploadPart
newUploadPart BucketName
buck ObjectKey
k Int
partnum Text
upId (RequestBody -> UploadPart)
-> (HashedBody -> RequestBody) -> HashedBody -> UploadPart
forall b c a. (b -> c) -> (a -> b) -> a -> c
. HashedBody -> RequestBody
forall a. ToBody a => a -> RequestBody
toBody (HashedBody -> ResourceT IO UploadPartResponse)
-> HashedBody -> ResourceT IO UploadPartResponse
forall a b. (a -> b) -> a -> b
$ HashedBody
chunkStream
String -> IO ()
forall (n :: * -> *). MonadIO n => String -> n ()
logStr (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Finished file part: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
partnum
Maybe CompletedPart -> IO (Maybe CompletedPart)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe CompletedPart -> IO (Maybe CompletedPart))
-> Maybe CompletedPart -> IO (Maybe CompletedPart)
forall a b. (a -> b) -> a -> b
$ Int -> ETag -> CompletedPart
newCompletedPart Int
partnum (ETag -> CompletedPart) -> Maybe ETag -> Maybe CompletedPart
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe ETag
eTag
let parts :: Maybe (NonEmpty CompletedPart)
parts = [CompletedPart] -> Maybe (NonEmpty CompletedPart)
forall a. [a] -> Maybe (NonEmpty a)
nonEmpty ([CompletedPart] -> Maybe (NonEmpty CompletedPart))
-> Maybe [CompletedPart] -> Maybe (NonEmpty CompletedPart)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< [Maybe CompletedPart] -> Maybe [CompletedPart]
forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
forall (m :: * -> *) a. Monad m => [m a] -> m [a]
sequence [Maybe CompletedPart]
uploadResponses
Env
-> CompleteMultipartUpload
-> m (AWSResponse CompleteMultipartUpload)
forall (m :: * -> *) a.
(MonadResource m, AWSRequest a, Typeable a,
Typeable (AWSResponse a)) =>
Env -> a -> m (AWSResponse a)
send Env
env (CompleteMultipartUpload
-> m (AWSResponse CompleteMultipartUpload))
-> CompleteMultipartUpload
-> m (AWSResponse CompleteMultipartUpload)
forall a b. (a -> b) -> a -> b
$ (BucketName -> ObjectKey -> Text -> CompleteMultipartUpload
newCompleteMultipartUpload BucketName
buck ObjectKey
k Text
upId)
{ multipartUpload = Just $ newCompletedMultipartUpload { parts } }
abortAllUploads :: MonadResource m => Env -> BucketName -> m ()
abortAllUploads :: forall (m :: * -> *). MonadResource m => Env -> BucketName -> m ()
abortAllUploads Env
env BucketName
buck = do
ListMultipartUploadsResponse' {Maybe [MultipartUpload]
$sel:uploads:ListMultipartUploadsResponse' :: ListMultipartUploadsResponse -> Maybe [MultipartUpload]
uploads :: Maybe [MultipartUpload]
uploads} <- Env -> ListMultipartUploads -> m (AWSResponse ListMultipartUploads)
forall (m :: * -> *) a.
(MonadResource m, AWSRequest a, Typeable a,
Typeable (AWSResponse a)) =>
Env -> a -> m (AWSResponse a)
send Env
env (ListMultipartUploads -> m (AWSResponse ListMultipartUploads))
-> ListMultipartUploads -> m (AWSResponse ListMultipartUploads)
forall a b. (a -> b) -> a -> b
$ BucketName -> ListMultipartUploads
newListMultipartUploads BucketName
buck
((MultipartUpload -> m ()) -> Maybe [MultipartUpload] -> m ())
-> Maybe [MultipartUpload] -> (MultipartUpload -> m ()) -> m ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip (([MultipartUpload] -> m ()) -> Maybe [MultipartUpload] -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (([MultipartUpload] -> m ()) -> Maybe [MultipartUpload] -> m ())
-> ((MultipartUpload -> m ()) -> [MultipartUpload] -> m ())
-> (MultipartUpload -> m ())
-> Maybe [MultipartUpload]
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (MultipartUpload -> m ()) -> [MultipartUpload] -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_) Maybe [MultipartUpload]
uploads ((MultipartUpload -> m ()) -> m ())
-> (MultipartUpload -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \MultipartUpload'{Maybe ObjectKey
key :: Maybe ObjectKey
$sel:key:MultipartUpload' :: MultipartUpload -> Maybe ObjectKey
key, Maybe Text
uploadId :: Maybe Text
$sel:uploadId:MultipartUpload' :: MultipartUpload -> Maybe Text
uploadId} -> do
let mki :: Maybe (ObjectKey, Text)
mki = (,) (ObjectKey -> Text -> (ObjectKey, Text))
-> Maybe ObjectKey -> Maybe (Text -> (ObjectKey, Text))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe ObjectKey
key Maybe (Text -> (ObjectKey, Text))
-> Maybe Text -> Maybe (ObjectKey, Text)
forall a b. Maybe (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Maybe Text
uploadId
Maybe (ObjectKey, Text)
-> ((ObjectKey, Text) -> m AbortMultipartUploadResponse) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Maybe (ObjectKey, Text)
mki (((ObjectKey, Text) -> m AbortMultipartUploadResponse) -> m ())
-> ((ObjectKey, Text) -> m AbortMultipartUploadResponse) -> m ()
forall a b. (a -> b) -> a -> b
$ \(ObjectKey
key',Text
uid) -> Env -> AbortMultipartUpload -> m (AWSResponse AbortMultipartUpload)
forall (m :: * -> *) a.
(MonadResource m, AWSRequest a, Typeable a,
Typeable (AWSResponse a)) =>
Env -> a -> m (AWSResponse a)
send Env
env (BucketName -> ObjectKey -> Text -> AbortMultipartUpload
newAbortMultipartUpload BucketName
buck ObjectKey
key' Text
uid)
justWhen :: (a -> Bool) -> (a -> b) -> a -> Maybe b
justWhen :: forall a b. (a -> Bool) -> (a -> b) -> a -> Maybe b
justWhen a -> Bool
f a -> b
g a
a = if a -> Bool
f a
a then b -> Maybe b
forall a. a -> Maybe a
Just (a -> b
g a
a) else Maybe b
forall a. Maybe a
Nothing
nothingWhen :: (a -> Bool) -> (a -> b) -> a -> Maybe b
nothingWhen :: forall a b. (a -> Bool) -> (a -> b) -> a -> Maybe b
nothingWhen a -> Bool
f = (a -> Bool) -> (a -> b) -> a -> Maybe b
forall a b. (a -> Bool) -> (a -> b) -> a -> Maybe b
justWhen (Bool -> Bool
not (Bool -> Bool) -> (a -> Bool) -> a -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Bool
f)
chunksOf :: Int -> BS.ByteString -> [BS.ByteString]
chunksOf :: Int -> ByteString -> [ByteString]
chunksOf Int
x = (ByteString -> Maybe (ByteString, ByteString))
-> ByteString -> [ByteString]
forall b a. (b -> Maybe (a, b)) -> b -> [a]
unfoldr ((ByteString -> Bool)
-> (ByteString -> (ByteString, ByteString))
-> ByteString
-> Maybe (ByteString, ByteString)
forall a b. (a -> Bool) -> (a -> b) -> a -> Maybe b
nothingWhen ByteString -> Bool
BS.null (Int -> ByteString -> (ByteString, ByteString)
BS.splitAt Int
x))