{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ParallelListComp #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Amazonka.S3.StreamingUpload
  ( streamUpload
  , ChunkSize
  , minimumChunkSize
  , NumThreads
  , concurrentUpload
  , UploadLocation(..)
  , abortAllUploads
  , module Amazonka.S3.CreateMultipartUpload
  , module Amazonka.S3.CompleteMultipartUpload
  ) where

import Amazonka        ( HashedBody(..), LogLevel(..), getFileSize, hashedFileRange, send, toBody )
import Amazonka.Crypto ( hash )
import Amazonka.Env    ( Env, logger, manager )

import Amazonka.S3.AbortMultipartUpload    ( AbortMultipartUploadResponse, newAbortMultipartUpload )
import Amazonka.S3.CompleteMultipartUpload
       ( CompleteMultipartUpload(..), CompleteMultipartUploadResponse, newCompleteMultipartUpload )
import Amazonka.S3.CreateMultipartUpload
       ( CreateMultipartUpload(..), CreateMultipartUploadResponse(..) )
import Amazonka.S3.ListMultipartUploads
       ( ListMultipartUploadsResponse(..), newListMultipartUploads, uploads )
import Amazonka.S3.Types
       ( BucketName, CompletedMultipartUpload(..), CompletedPart, MultipartUpload(..),
       newCompletedMultipartUpload, newCompletedPart )
import Amazonka.S3.UploadPart              ( UploadPartResponse(..), newUploadPart )

import Network.HTTP.Client     ( managerConnCount, newManager )
import Network.HTTP.Client.TLS ( tlsManagerSettings )

import Control.Monad.Catch          ( Exception, MonadCatch, onException )
import Control.Monad.IO.Class       ( MonadIO, liftIO )
import Control.Monad.Trans.Class    ( lift )
import Control.Monad.Trans.Resource ( MonadResource, runResourceT )

import Conduit                  ( MonadUnliftIO(..) )
import Data.Conduit             ( ConduitT, Void, await, handleC, yield, (.|) )
import Data.Conduit.Combinators ( sinkList )
import Data.Conduit.Combinators qualified as CC

import Data.ByteString               qualified as BS
import Data.ByteString.Builder       ( stringUtf8 )
import Data.ByteString.Builder.Extra ( byteStringCopy, runBuilder )
import Data.ByteString.Internal      ( ByteString(PS) )

import Data.List          ( unfoldr )
import Data.List.NonEmpty ( fromList, nonEmpty )
import Data.Text          ( Text )

import Control.Concurrent       ( newQSem, signalQSem, waitQSem )
import Control.Concurrent.Async ( forConcurrently )
import Control.Exception.Base   ( SomeException, bracket_ )

import Foreign.ForeignPtr        ( ForeignPtr, mallocForeignPtrBytes, plusForeignPtr )
import Foreign.ForeignPtr.Unsafe ( unsafeForeignPtrToPtr )

import Control.DeepSeq ( rwhnf )
import Data.Foldable   ( for_, traverse_ )
import Data.Typeable   ( Typeable )
import Data.Word       ( Word8 )
import Control.Monad   ((>=>))


type ChunkSize = Int
type NumThreads = Int

-- | Minimum size of data which will be sent in a single part, currently 6MB
minimumChunkSize :: ChunkSize
minimumChunkSize :: Int
minimumChunkSize = Int
6Int -> Int -> Int
forall a. Num a => a -> a -> a
*Int
1024Int -> Int -> Int
forall a. Num a => a -> a -> a
*Int
1024 -- Making this 5MB+1 seemed to cause AWS to complain


data StreamingError
    = UnableToCreateMultipartUpload CreateMultipartUploadResponse
    | FailedToUploadPiece UploadPartResponse
    | Other String
  deriving stock (Int -> StreamingError -> ShowS
[StreamingError] -> ShowS
StreamingError -> String
(Int -> StreamingError -> ShowS)
-> (StreamingError -> String)
-> ([StreamingError] -> ShowS)
-> Show StreamingError
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: Int -> StreamingError -> ShowS
showsPrec :: Int -> StreamingError -> ShowS
$cshow :: StreamingError -> String
show :: StreamingError -> String
$cshowList :: [StreamingError] -> ShowS
showList :: [StreamingError] -> ShowS
Show, StreamingError -> StreamingError -> Bool
(StreamingError -> StreamingError -> Bool)
-> (StreamingError -> StreamingError -> Bool) -> Eq StreamingError
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
$c== :: StreamingError -> StreamingError -> Bool
== :: StreamingError -> StreamingError -> Bool
$c/= :: StreamingError -> StreamingError -> Bool
/= :: StreamingError -> StreamingError -> Bool
Eq, Typeable)

instance Exception StreamingError



{- |
Given a 'CreateMultipartUpload', creates a 'Sink' which will sequentially
upload the data streamed in in chunks of at least 'minimumChunkSize' and return either
the 'CompleteMultipartUploadResponse', or if an exception is thrown,
`AbortMultipartUploadResponse` and the exception as `SomeException`. If aborting
the upload also fails then the exception caused by the call to abort will be thrown.

'Amazonka.S3.ListMultipartUploads' can be used to list any pending
uploads - it is important to abort multipart uploads because you will
be charged for storage of the parts until it is completed or aborted.
See the AWS documentation for more details.

Internally, a single @chunkSize@d buffer will be allocated and reused between
requests to avoid holding onto incoming @ByteString@s.

May throw 'Amazonka.Error'
-}
streamUpload :: forall m. (MonadUnliftIO m, MonadResource m)
             => Env
             -> Maybe ChunkSize -- ^ Optional chunk size
             -> CreateMultipartUpload -- ^ Upload location
             -> ConduitT ByteString Void m (Either (AbortMultipartUploadResponse, SomeException) CompleteMultipartUploadResponse)
streamUpload :: forall (m :: * -> *).
(MonadUnliftIO m, MonadResource m) =>
Env
-> Maybe Int
-> CreateMultipartUpload
-> ConduitT
     ByteString
     Void
     m
     (Either
        (AbortMultipartUploadResponse, SomeException)
        CompleteMultipartUploadResponse)
streamUpload Env
env Maybe Int
mChunkSize multiPartUploadDesc :: CreateMultipartUpload
multiPartUploadDesc@CreateMultipartUpload'{$sel:bucket:CreateMultipartUpload' :: CreateMultipartUpload -> BucketName
bucket = BucketName
buck, $sel:key:CreateMultipartUpload' :: CreateMultipartUpload -> ObjectKey
key = ObjectKey
k} = do
  Buffer
buffer <- IO Buffer -> ConduitT ByteString Void m Buffer
forall a. IO a -> ConduitT ByteString Void m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Buffer -> ConduitT ByteString Void m Buffer)
-> IO Buffer -> ConduitT ByteString Void m Buffer
forall a b. (a -> b) -> a -> b
$ Int -> IO Buffer
allocBuffer Int
chunkSize
  Buffer -> ConduitT ByteString BufferResult m ()
forall (m :: * -> *).
MonadIO m =>
Buffer -> ConduitT ByteString BufferResult m ()
unsafeWriteChunksToBuffer Buffer
buffer
    ConduitT ByteString BufferResult m ()
-> ConduitT
     BufferResult
     Void
     m
     (Either
        (AbortMultipartUploadResponse, SomeException)
        CompleteMultipartUploadResponse)
-> ConduitT
     ByteString
     Void
     m
     (Either
        (AbortMultipartUploadResponse, SomeException)
        CompleteMultipartUploadResponse)
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| ConduitT BufferResult (Int, BufferResult) m ()
forall a. ConduitT a (Int, a) m ()
enumerateConduit
    ConduitT BufferResult (Int, BufferResult) m ()
-> ConduitT
     (Int, BufferResult)
     Void
     m
     (Either
        (AbortMultipartUploadResponse, SomeException)
        CompleteMultipartUploadResponse)
-> ConduitT
     BufferResult
     Void
     m
     (Either
        (AbortMultipartUploadResponse, SomeException)
        CompleteMultipartUploadResponse)
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| Buffer
-> ConduitT
     (Int, BufferResult)
     Void
     m
     (Either
        (AbortMultipartUploadResponse, SomeException)
        CompleteMultipartUploadResponse)
startUpload Buffer
buffer
  where
    chunkSize :: ChunkSize
    chunkSize :: Int
chunkSize = Int -> (Int -> Int) -> Maybe Int -> Int
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Int
minimumChunkSize (Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
minimumChunkSize) Maybe Int
mChunkSize

    logStr :: String -> m ()
    logStr :: String -> m ()
logStr String
msg  = do
      IO () -> m ()
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> m ()) -> IO () -> m ()
forall a b. (a -> b) -> a -> b
$ Env -> Logger
forall (withAuth :: * -> *). Env' withAuth -> Logger
logger Env
env LogLevel
Debug (ByteStringBuilder -> IO ()) -> ByteStringBuilder -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> ByteStringBuilder
stringUtf8 String
msg

    startUpload :: Buffer
                -> ConduitT (Int, BufferResult) Void m
                    (Either (AbortMultipartUploadResponse, SomeException)
                    CompleteMultipartUploadResponse)
    startUpload :: Buffer
-> ConduitT
     (Int, BufferResult)
     Void
     m
     (Either
        (AbortMultipartUploadResponse, SomeException)
        CompleteMultipartUploadResponse)
startUpload Buffer
buffer = do
      CreateMultipartUploadResponse'{$sel:uploadId:CreateMultipartUploadResponse' :: CreateMultipartUploadResponse -> Text
uploadId = Text
upId} <- m CreateMultipartUploadResponse
-> ConduitT
     (Int, BufferResult) Void m CreateMultipartUploadResponse
forall (m :: * -> *) a.
Monad m =>
m a -> ConduitT (Int, BufferResult) Void m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m CreateMultipartUploadResponse
 -> ConduitT
      (Int, BufferResult) Void m CreateMultipartUploadResponse)
-> m CreateMultipartUploadResponse
-> ConduitT
     (Int, BufferResult) Void m CreateMultipartUploadResponse
forall a b. (a -> b) -> a -> b
$ Env
-> CreateMultipartUpload -> m (AWSResponse CreateMultipartUpload)
forall (m :: * -> *) a.
(MonadResource m, AWSRequest a, Typeable a,
 Typeable (AWSResponse a)) =>
Env -> a -> m (AWSResponse a)
send Env
env CreateMultipartUpload
multiPartUploadDesc
      m () -> ConduitT (Int, BufferResult) Void m ()
forall (m :: * -> *) a.
Monad m =>
m a -> ConduitT (Int, BufferResult) Void m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m () -> ConduitT (Int, BufferResult) Void m ())
-> m () -> ConduitT (Int, BufferResult) Void m ()
forall a b. (a -> b) -> a -> b
$ String -> m ()
logStr String
"\n**** Created upload\n"

      (SomeException
 -> ConduitT
      (Int, BufferResult)
      Void
      m
      (Either
         (AbortMultipartUploadResponse, SomeException)
         CompleteMultipartUploadResponse))
-> ConduitT
     (Int, BufferResult)
     Void
     m
     (Either
        (AbortMultipartUploadResponse, SomeException)
        CompleteMultipartUploadResponse)
-> ConduitT
     (Int, BufferResult)
     Void
     m
     (Either
        (AbortMultipartUploadResponse, SomeException)
        CompleteMultipartUploadResponse)
forall (m :: * -> *) e i o r.
(MonadUnliftIO m, Exception e) =>
(e -> ConduitT i o m r) -> ConduitT i o m r -> ConduitT i o m r
handleC (Text
-> SomeException
-> ConduitT
     (Int, BufferResult)
     Void
     m
     (Either
        (AbortMultipartUploadResponse, SomeException)
        CompleteMultipartUploadResponse)
forall i.
Text
-> SomeException
-> ConduitT
     i
     Void
     m
     (Either
        (AbortMultipartUploadResponse, SomeException)
        CompleteMultipartUploadResponse)
cancelMultiUploadConduit Text
upId) (ConduitT
   (Int, BufferResult)
   Void
   m
   (Either
      (AbortMultipartUploadResponse, SomeException)
      CompleteMultipartUploadResponse)
 -> ConduitT
      (Int, BufferResult)
      Void
      m
      (Either
         (AbortMultipartUploadResponse, SomeException)
         CompleteMultipartUploadResponse))
-> ConduitT
     (Int, BufferResult)
     Void
     m
     (Either
        (AbortMultipartUploadResponse, SomeException)
        CompleteMultipartUploadResponse)
-> ConduitT
     (Int, BufferResult)
     Void
     m
     (Either
        (AbortMultipartUploadResponse, SomeException)
        CompleteMultipartUploadResponse)
forall a b. (a -> b) -> a -> b
$
        ((Int, BufferResult) -> m (Maybe CompletedPart))
-> ConduitT (Int, BufferResult) (Maybe CompletedPart) m ()
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> ConduitT a b m ()
CC.mapM (Buffer -> Text -> (Int, BufferResult) -> m (Maybe CompletedPart)
multiUpload Buffer
buffer Text
upId)
        ConduitT (Int, BufferResult) (Maybe CompletedPart) m ()
-> ConduitT
     (Maybe CompletedPart)
     Void
     m
     (Either
        (AbortMultipartUploadResponse, SomeException)
        CompleteMultipartUploadResponse)
-> ConduitT
     (Int, BufferResult)
     Void
     m
     (Either
        (AbortMultipartUploadResponse, SomeException)
        CompleteMultipartUploadResponse)
forall (m :: * -> *) a b c r.
Monad m =>
ConduitT a b m () -> ConduitT b c m r -> ConduitT a c m r
.| Text
-> ConduitT
     (Maybe CompletedPart)
     Void
     m
     (Either
        (AbortMultipartUploadResponse, SomeException)
        CompleteMultipartUploadResponse)
finishMultiUploadConduit Text
upId

    multiUpload :: Buffer -> Text -> (Int, BufferResult) -> m (Maybe CompletedPart)
    multiUpload :: Buffer -> Text -> (Int, BufferResult) -> m (Maybe CompletedPart)
multiUpload Buffer
buffer Text
upId (Int
partnum, BufferResult
result) = do
      let !bs :: ByteString
bs = Buffer -> BufferResult -> ByteString
bufferToByteString Buffer
buffer BufferResult
result
          !bsHash :: Digest SHA256
bsHash = ByteString -> Digest SHA256
forall ba a.
(ByteArrayAccess ba, HashAlgorithm a) =>
ba -> Digest a
hash ByteString
bs
      UploadPartResponse'{Maybe ETag
eTag :: Maybe ETag
$sel:eTag:UploadPartResponse' :: UploadPartResponse -> Maybe ETag
eTag} <- Env -> UploadPart -> m (AWSResponse UploadPart)
forall (m :: * -> *) a.
(MonadResource m, AWSRequest a, Typeable a,
 Typeable (AWSResponse a)) =>
Env -> a -> m (AWSResponse a)
send Env
env (UploadPart -> m UploadPartResponse)
-> UploadPart -> m UploadPartResponse
forall a b. (a -> b) -> a -> b
$! BucketName -> ObjectKey -> Int -> Text -> RequestBody -> UploadPart
newUploadPart BucketName
buck ObjectKey
k Int
partnum Text
upId (RequestBody -> UploadPart) -> RequestBody -> UploadPart
forall a b. (a -> b) -> a -> b
$! HashedBody -> RequestBody
forall a. ToBody a => a -> RequestBody
toBody (HashedBody -> RequestBody) -> HashedBody -> RequestBody
forall a b. (a -> b) -> a -> b
$! Digest SHA256 -> ByteString -> HashedBody
HashedBytes Digest SHA256
bsHash ByteString
bs
      let !()
_ = Maybe ETag -> ()
forall a. a -> ()
rwhnf Maybe ETag
eTag
      String -> m ()
logStr (String -> m ()) -> String -> m ()
forall a b. (a -> b) -> a -> b
$ String
"\n**** Uploaded part " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show Int
partnum
      Maybe CompletedPart -> m (Maybe CompletedPart)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe CompletedPart -> m (Maybe CompletedPart))
-> Maybe CompletedPart -> m (Maybe CompletedPart)
forall a b. (a -> b) -> a -> b
$! Int -> ETag -> CompletedPart
newCompletedPart Int
partnum (ETag -> CompletedPart) -> Maybe ETag -> Maybe CompletedPart
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe ETag
eTag

    -- collect all the parts
    finishMultiUploadConduit :: Text
                             -> ConduitT (Maybe CompletedPart) Void m
                                  (Either (AbortMultipartUploadResponse, SomeException) CompleteMultipartUploadResponse)
    finishMultiUploadConduit :: Text
-> ConduitT
     (Maybe CompletedPart)
     Void
     m
     (Either
        (AbortMultipartUploadResponse, SomeException)
        CompleteMultipartUploadResponse)
finishMultiUploadConduit Text
upId = do
      [Maybe CompletedPart]
parts <- ConduitT (Maybe CompletedPart) Void m [Maybe CompletedPart]
forall (m :: * -> *) a o. Monad m => ConduitT a o m [a]
sinkList
      CompleteMultipartUploadResponse
res <- m CompleteMultipartUploadResponse
-> ConduitT
     (Maybe CompletedPart) Void m CompleteMultipartUploadResponse
forall (m :: * -> *) a.
Monad m =>
m a -> ConduitT (Maybe CompletedPart) Void m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m CompleteMultipartUploadResponse
 -> ConduitT
      (Maybe CompletedPart) Void m CompleteMultipartUploadResponse)
-> m CompleteMultipartUploadResponse
-> ConduitT
     (Maybe CompletedPart) Void m CompleteMultipartUploadResponse
forall a b. (a -> b) -> a -> b
$ Env
-> CompleteMultipartUpload
-> m (AWSResponse CompleteMultipartUpload)
forall (m :: * -> *) a.
(MonadResource m, AWSRequest a, Typeable a,
 Typeable (AWSResponse a)) =>
Env -> a -> m (AWSResponse a)
send Env
env (CompleteMultipartUpload
 -> m (AWSResponse CompleteMultipartUpload))
-> CompleteMultipartUpload
-> m (AWSResponse CompleteMultipartUpload)
forall a b. (a -> b) -> a -> b
$ (BucketName -> ObjectKey -> Text -> CompleteMultipartUpload
newCompleteMultipartUpload BucketName
buck ObjectKey
k Text
upId)
               { multipartUpload =
                  Just $ newCompletedMultipartUpload {parts = sequenceA $ fromList parts}
               }

      Either
  (AbortMultipartUploadResponse, SomeException)
  CompleteMultipartUploadResponse
-> ConduitT
     (Maybe CompletedPart)
     Void
     m
     (Either
        (AbortMultipartUploadResponse, SomeException)
        CompleteMultipartUploadResponse)
forall a. a -> ConduitT (Maybe CompletedPart) Void m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either
   (AbortMultipartUploadResponse, SomeException)
   CompleteMultipartUploadResponse
 -> ConduitT
      (Maybe CompletedPart)
      Void
      m
      (Either
         (AbortMultipartUploadResponse, SomeException)
         CompleteMultipartUploadResponse))
-> Either
     (AbortMultipartUploadResponse, SomeException)
     CompleteMultipartUploadResponse
-> ConduitT
     (Maybe CompletedPart)
     Void
     m
     (Either
        (AbortMultipartUploadResponse, SomeException)
        CompleteMultipartUploadResponse)
forall a b. (a -> b) -> a -> b
$ CompleteMultipartUploadResponse
-> Either
     (AbortMultipartUploadResponse, SomeException)
     CompleteMultipartUploadResponse
forall a b. b -> Either a b
Right CompleteMultipartUploadResponse
res

    -- in case of an exception, return Left
    cancelMultiUploadConduit :: Text -> SomeException
                             -> ConduitT i Void m
                                  (Either (AbortMultipartUploadResponse, SomeException) CompleteMultipartUploadResponse)
    cancelMultiUploadConduit :: forall i.
Text
-> SomeException
-> ConduitT
     i
     Void
     m
     (Either
        (AbortMultipartUploadResponse, SomeException)
        CompleteMultipartUploadResponse)
cancelMultiUploadConduit Text
upId SomeException
exc = do
      AbortMultipartUploadResponse
res <- m AbortMultipartUploadResponse
-> ConduitT i Void m AbortMultipartUploadResponse
forall (m :: * -> *) a. Monad m => m a -> ConduitT i Void m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m AbortMultipartUploadResponse
 -> ConduitT i Void m AbortMultipartUploadResponse)
-> m AbortMultipartUploadResponse
-> ConduitT i Void m AbortMultipartUploadResponse
forall a b. (a -> b) -> a -> b
$ Env -> AbortMultipartUpload -> m (AWSResponse AbortMultipartUpload)
forall (m :: * -> *) a.
(MonadResource m, AWSRequest a, Typeable a,
 Typeable (AWSResponse a)) =>
Env -> a -> m (AWSResponse a)
send Env
env (AbortMultipartUpload -> m (AWSResponse AbortMultipartUpload))
-> AbortMultipartUpload -> m (AWSResponse AbortMultipartUpload)
forall a b. (a -> b) -> a -> b
$ BucketName -> ObjectKey -> Text -> AbortMultipartUpload
newAbortMultipartUpload BucketName
buck ObjectKey
k Text
upId
      Either
  (AbortMultipartUploadResponse, SomeException)
  CompleteMultipartUploadResponse
-> ConduitT
     i
     Void
     m
     (Either
        (AbortMultipartUploadResponse, SomeException)
        CompleteMultipartUploadResponse)
forall a. a -> ConduitT i Void m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either
   (AbortMultipartUploadResponse, SomeException)
   CompleteMultipartUploadResponse
 -> ConduitT
      i
      Void
      m
      (Either
         (AbortMultipartUploadResponse, SomeException)
         CompleteMultipartUploadResponse))
-> Either
     (AbortMultipartUploadResponse, SomeException)
     CompleteMultipartUploadResponse
-> ConduitT
     i
     Void
     m
     (Either
        (AbortMultipartUploadResponse, SomeException)
        CompleteMultipartUploadResponse)
forall a b. (a -> b) -> a -> b
$ (AbortMultipartUploadResponse, SomeException)
-> Either
     (AbortMultipartUploadResponse, SomeException)
     CompleteMultipartUploadResponse
forall a b. a -> Either a b
Left (AbortMultipartUploadResponse
res, SomeException
exc)

    -- count from 1
    enumerateConduit :: ConduitT a (Int, a) m ()
    enumerateConduit :: forall a. ConduitT a (Int, a) m ()
enumerateConduit = Int -> ConduitT a (Int, a) m ()
forall {m :: * -> *} {a} {b}.
(Monad m, Num a) =>
a -> ConduitT b (a, b) m ()
loop Int
1
      where
        loop :: a -> ConduitT b (a, b) m ()
loop a
i = ConduitT b (a, b) m (Maybe b)
forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
await ConduitT b (a, b) m (Maybe b)
-> (Maybe b -> ConduitT b (a, b) m ()) -> ConduitT b (a, b) m ()
forall a b.
ConduitT b (a, b) m a
-> (a -> ConduitT b (a, b) m b) -> ConduitT b (a, b) m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ConduitT b (a, b) m ()
-> (b -> ConduitT b (a, b) m ())
-> Maybe b
-> ConduitT b (a, b) m ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (() -> ConduitT b (a, b) m ()
forall a. a -> ConduitT b (a, b) m a
forall (m :: * -> *) a. Monad m => a -> m a
return ()) (a -> b -> ConduitT b (a, b) m ()
go a
i)
        go :: a -> b -> ConduitT b (a, b) m ()
go a
i b
x = do
          (a, b) -> ConduitT b (a, b) m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (a
i, b
x)
          a -> ConduitT b (a, b) m ()
loop (a
i a -> a -> a
forall a. Num a => a -> a -> a
+ a
1)
    {-# INLINE enumerateConduit #-}

-- The number of bytes remaining in a buffer, and the pointer that backs it.
data Buffer = Buffer {Buffer -> Int
remaining :: !Int, Buffer -> ForeignPtr Word8
_fptr :: !(ForeignPtr Word8)}

data PutResult
    = Ok Buffer         -- Didn't fill the buffer, updated buffer.
    | Full ByteString   -- Buffer is full, the unwritten remaining string.

data BufferResult = FullBuffer | Incomplete Int

-- Accepts @ByteString@s and writes them into @Buffer@. When the buffer is full,
-- @FullBuffer@ is emitted. If there is no more input, @Incomplete@ is emitted with
-- the number of bytes remaining in the buffer.
unsafeWriteChunksToBuffer :: MonadIO m => Buffer -> ConduitT ByteString BufferResult m ()
unsafeWriteChunksToBuffer :: forall (m :: * -> *).
MonadIO m =>
Buffer -> ConduitT ByteString BufferResult m ()
unsafeWriteChunksToBuffer Buffer
buffer0 = Buffer -> ConduitT ByteString BufferResult m ()
forall (m :: * -> *).
MonadIO m =>
Buffer -> ConduitT ByteString BufferResult m ()
awaitLoop Buffer
buffer0 where
  awaitLoop :: Buffer -> ConduitT ByteString BufferResult m ()
awaitLoop Buffer
buf =
    ConduitT ByteString BufferResult m (Maybe ByteString)
forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
await ConduitT ByteString BufferResult m (Maybe ByteString)
-> (Maybe ByteString -> ConduitT ByteString BufferResult m ())
-> ConduitT ByteString BufferResult m ()
forall a b.
ConduitT ByteString BufferResult m a
-> (a -> ConduitT ByteString BufferResult m b)
-> ConduitT ByteString BufferResult m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= ConduitT ByteString BufferResult m ()
-> (ByteString -> ConduitT ByteString BufferResult m ())
-> Maybe ByteString
-> ConduitT ByteString BufferResult m ()
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (BufferResult -> ConduitT ByteString BufferResult m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield (BufferResult -> ConduitT ByteString BufferResult m ())
-> BufferResult -> ConduitT ByteString BufferResult m ()
forall a b. (a -> b) -> a -> b
$ Int -> BufferResult
Incomplete (Int -> BufferResult) -> Int -> BufferResult
forall a b. (a -> b) -> a -> b
$ Buffer -> Int
remaining Buffer
buf)
      (IO PutResult -> ConduitT ByteString BufferResult m PutResult
forall a. IO a -> ConduitT ByteString BufferResult m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO PutResult -> ConduitT ByteString BufferResult m PutResult)
-> (ByteString -> IO PutResult)
-> ByteString
-> ConduitT ByteString BufferResult m PutResult
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Buffer -> ByteString -> IO PutResult
putBuffer Buffer
buf (ByteString -> ConduitT ByteString BufferResult m PutResult)
-> (PutResult -> ConduitT ByteString BufferResult m ())
-> ByteString
-> ConduitT ByteString BufferResult m ()
forall (m :: * -> *) a b c.
Monad m =>
(a -> m b) -> (b -> m c) -> a -> m c
>=> \case
        Full ByteString
next -> BufferResult -> ConduitT ByteString BufferResult m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield BufferResult
FullBuffer ConduitT ByteString BufferResult m ()
-> ConduitT ByteString BufferResult m ()
-> ConduitT ByteString BufferResult m ()
forall a b.
ConduitT ByteString BufferResult m a
-> ConduitT ByteString BufferResult m b
-> ConduitT ByteString BufferResult m b
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> Buffer -> ByteString -> ConduitT ByteString BufferResult m ()
chunkLoop Buffer
buffer0 ByteString
next
        Ok Buffer
buf'   -> Buffer -> ConduitT ByteString BufferResult m ()
awaitLoop Buffer
buf'
      )
  -- Handle inputs which are larger than the chunkSize
  chunkLoop :: Buffer -> ByteString -> ConduitT ByteString BufferResult m ()
chunkLoop Buffer
buf = IO PutResult -> ConduitT ByteString BufferResult m PutResult
forall a. IO a -> ConduitT ByteString BufferResult m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO PutResult -> ConduitT ByteString BufferResult m PutResult)
-> (ByteString -> IO PutResult)
-> ByteString
-> ConduitT ByteString BufferResult m PutResult
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Buffer -> ByteString -> IO PutResult
putBuffer Buffer
buf (ByteString -> ConduitT ByteString BufferResult m PutResult)
-> (PutResult -> ConduitT ByteString BufferResult m ())
-> ByteString
-> ConduitT ByteString BufferResult m ()
forall (m :: * -> *) a b c.
Monad m =>
(a -> m b) -> (b -> m c) -> a -> m c
>=> \case
    Full ByteString
next -> BufferResult -> ConduitT ByteString BufferResult m ()
forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield BufferResult
FullBuffer ConduitT ByteString BufferResult m ()
-> ConduitT ByteString BufferResult m ()
-> ConduitT ByteString BufferResult m ()
forall a b.
ConduitT ByteString BufferResult m a
-> ConduitT ByteString BufferResult m b
-> ConduitT ByteString BufferResult m b
forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> Buffer -> ByteString -> ConduitT ByteString BufferResult m ()
chunkLoop Buffer
buffer0 ByteString
next
    Ok Buffer
buf'   -> Buffer -> ConduitT ByteString BufferResult m ()
awaitLoop Buffer
buf'

bufferToByteString :: Buffer -> BufferResult -> ByteString
bufferToByteString :: Buffer -> BufferResult -> ByteString
bufferToByteString (Buffer Int
bufSize ForeignPtr Word8
fptr) BufferResult
FullBuffer             = ForeignPtr Word8 -> Int -> Int -> ByteString
PS ForeignPtr Word8
fptr Int
0 Int
bufSize
bufferToByteString (Buffer Int
bufSize ForeignPtr Word8
fptr) (Incomplete Int
remaining) = ForeignPtr Word8 -> Int -> Int -> ByteString
PS ForeignPtr Word8
fptr Int
0 (Int
bufSize Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
remaining)

allocBuffer :: Int -> IO Buffer
allocBuffer :: Int -> IO Buffer
allocBuffer Int
chunkSize = Int -> ForeignPtr Word8 -> Buffer
Buffer Int
chunkSize (ForeignPtr Word8 -> Buffer) -> IO (ForeignPtr Word8) -> IO Buffer
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Int -> IO (ForeignPtr Word8)
forall a. Int -> IO (ForeignPtr a)
mallocForeignPtrBytes Int
chunkSize

putBuffer :: Buffer -> ByteString -> IO PutResult
putBuffer :: Buffer -> ByteString -> IO PutResult
putBuffer Buffer
buffer ByteString
bs
  | ByteString -> Int
BS.length ByteString
bs Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Buffer -> Int
remaining Buffer
buffer =
      Buffer -> PutResult
Ok (Buffer -> PutResult) -> IO Buffer -> IO PutResult
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Buffer -> ByteString -> IO Buffer
unsafeWriteBuffer Buffer
buffer ByteString
bs
  | Bool
otherwise = do
      let (ByteString
remainder,ByteString
rest) = Int -> ByteString -> (ByteString, ByteString)
BS.splitAt (Buffer -> Int
remaining Buffer
buffer) ByteString
bs
      ByteString -> PutResult
Full ByteString
rest PutResult -> IO Buffer -> IO PutResult
forall a b. a -> IO b -> IO a
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ Buffer -> ByteString -> IO Buffer
unsafeWriteBuffer Buffer
buffer ByteString
remainder

-- The length of the bytestring must be less than or equal to the number
-- of bytes remaining.
unsafeWriteBuffer :: Buffer -> ByteString -> IO Buffer
unsafeWriteBuffer :: Buffer -> ByteString -> IO Buffer
unsafeWriteBuffer (Buffer Int
remaining ForeignPtr Word8
fptr) ByteString
bs = do
    let ptr :: Ptr Word8
ptr = ForeignPtr Word8 -> Ptr Word8
forall a. ForeignPtr a -> Ptr a
unsafeForeignPtrToPtr ForeignPtr Word8
fptr
        len :: Int
len = ByteString -> Int
BS.length ByteString
bs
    (Int, Next)
_ <- ByteStringBuilder -> BufferWriter
runBuilder (ByteString -> ByteStringBuilder
byteStringCopy ByteString
bs) Ptr Word8
ptr Int
remaining
    Buffer -> IO Buffer
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Buffer -> IO Buffer) -> Buffer -> IO Buffer
forall a b. (a -> b) -> a -> b
$ Int -> ForeignPtr Word8 -> Buffer
Buffer (Int
remaining Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
len) (ForeignPtr Word8 -> Int -> ForeignPtr Word8
forall a b. ForeignPtr a -> Int -> ForeignPtr b
plusForeignPtr ForeignPtr Word8
fptr Int
len)


-- | Specifies whether to upload a file or 'ByteString'.
data UploadLocation
    = FP FilePath -- ^ A file to be uploaded
    | BS ByteString -- ^ A strict 'ByteString'

{-|
Allows a file or 'ByteString' to be uploaded concurrently, using the
async library.  The chunk size may optionally be specified, but will be at least
`minimumChunkSize`, and may be made larger than if the `ByteString` or file
is larger enough to cause more than 10,000 chunks.

Files are mmapped into 'chunkSize' chunks and each chunk is uploaded in parallel.
This considerably reduces the memory necessary compared to reading the contents
into memory as a strict 'ByteString'. The usual caveats about mmaped files apply:
if the file is modified during this operation, the data may become corrupt.

May throw `Amazonka.Error`, or `IOError`; an attempt is made to cancel the
multipart upload on any error, but this may also fail if, for example, the network
connection has been broken. See `abortAllUploads` for a crude cleanup method.
-}
concurrentUpload :: forall m.
  (MonadResource m, MonadCatch m)
  => Env
  -> Maybe ChunkSize -- ^ Optional chunk size
  -> Maybe NumThreads -- ^ Optional number of threads to upload with
  -> UploadLocation -- ^ Whether to upload a file on disk or a `ByteString` that's already in memory.
  -> CreateMultipartUpload -- ^ Description of where to upload.
  -> m CompleteMultipartUploadResponse
concurrentUpload :: forall (m :: * -> *).
(MonadResource m, MonadCatch m) =>
Env
-> Maybe Int
-> Maybe Int
-> UploadLocation
-> CreateMultipartUpload
-> m CompleteMultipartUploadResponse
concurrentUpload Env
env' Maybe Int
mChunkSize Maybe Int
mNumThreads UploadLocation
uploadLoc
                 multiPartUploadDesc :: CreateMultipartUpload
multiPartUploadDesc@CreateMultipartUpload'{$sel:bucket:CreateMultipartUpload' :: CreateMultipartUpload -> BucketName
bucket = BucketName
buck, $sel:key:CreateMultipartUpload' :: CreateMultipartUpload -> ObjectKey
key = ObjectKey
k}
  = do
  CreateMultipartUploadResponse'{$sel:uploadId:CreateMultipartUploadResponse' :: CreateMultipartUploadResponse -> Text
uploadId = Text
upId} <- Env
-> CreateMultipartUpload -> m (AWSResponse CreateMultipartUpload)
forall (m :: * -> *) a.
(MonadResource m, AWSRequest a, Typeable a,
 Typeable (AWSResponse a)) =>
Env -> a -> m (AWSResponse a)
send Env
env' CreateMultipartUpload
multiPartUploadDesc

  let logStr :: MonadIO n => String -> n ()
      logStr :: forall (n :: * -> *). MonadIO n => String -> n ()
logStr = IO () -> n ()
forall a. IO a -> n a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> n ()) -> (String -> IO ()) -> String -> n ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Env -> Logger
forall (withAuth :: * -> *). Env' withAuth -> Logger
logger Env
env' LogLevel
Info (ByteStringBuilder -> IO ())
-> (String -> ByteStringBuilder) -> String -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> ByteStringBuilder
stringUtf8

      calculateChunkSize :: Int -> Int
      calculateChunkSize :: Int -> Int
calculateChunkSize Int
len =
          let chunkSize' :: Int
chunkSize' = Int -> (Int -> Int) -> Maybe Int -> Int
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Int
minimumChunkSize (Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
minimumChunkSize) Maybe Int
mChunkSize
          in if Int
len Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` Int
chunkSize' Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
10000 then Int
len Int -> Int -> Int
forall a. Integral a => a -> a -> a
`div` Int
9999 else Int
chunkSize'

      mConnCount :: Int
mConnCount = ManagerSettings -> Int
managerConnCount ManagerSettings
tlsManagerSettings
      nThreads :: Int
nThreads   = Int -> (Int -> Int) -> Maybe Int -> Int
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Int
mConnCount (Int -> Int -> Int
forall a. Ord a => a -> a -> a
max Int
1) Maybe Int
mNumThreads

  Env
env <- if Bool -> (Int -> Bool) -> Maybe Int -> Bool
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Bool
False (Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
mConnCount) Maybe Int
mNumThreads
              then do
                  Manager
mgr' <- IO Manager -> m Manager
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Manager -> m Manager) -> IO Manager -> m Manager
forall a b. (a -> b) -> a -> b
$ ManagerSettings -> IO Manager
newManager ManagerSettings
tlsManagerSettings{managerConnCount = nThreads}
                  Env -> m Env
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Env
env'{manager = mgr'}
              else Env -> m Env
forall a. a -> m a
forall (f :: * -> *) a. Applicative f => a -> f a
pure Env
env'
  (m CompleteMultipartUploadResponse
 -> m AbortMultipartUploadResponse
 -> m CompleteMultipartUploadResponse)
-> m AbortMultipartUploadResponse
-> m CompleteMultipartUploadResponse
-> m CompleteMultipartUploadResponse
forall a b c. (a -> b -> c) -> b -> a -> c
flip m CompleteMultipartUploadResponse
-> m AbortMultipartUploadResponse
-> m CompleteMultipartUploadResponse
forall (m :: * -> *) a b.
(HasCallStack, MonadCatch m) =>
m a -> m b -> m a
onException (Env -> AbortMultipartUpload -> m (AWSResponse AbortMultipartUpload)
forall (m :: * -> *) a.
(MonadResource m, AWSRequest a, Typeable a,
 Typeable (AWSResponse a)) =>
Env -> a -> m (AWSResponse a)
send Env
env (BucketName -> ObjectKey -> Text -> AbortMultipartUpload
newAbortMultipartUpload BucketName
buck ObjectKey
k Text
upId)) (m CompleteMultipartUploadResponse
 -> m CompleteMultipartUploadResponse)
-> m CompleteMultipartUploadResponse
-> m CompleteMultipartUploadResponse
forall a b. (a -> b) -> a -> b
$ do
      QSem
sem <- IO QSem -> m QSem
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO QSem -> m QSem) -> IO QSem -> m QSem
forall a b. (a -> b) -> a -> b
$ Int -> IO QSem
newQSem Int
nThreads
      [Maybe CompletedPart]
uploadResponses <- case UploadLocation
uploadLoc of
          BS ByteString
bytes ->
            let chunkSize :: Int
chunkSize = Int -> Int
calculateChunkSize (Int -> Int) -> Int -> Int
forall a b. (a -> b) -> a -> b
$ ByteString -> Int
BS.length ByteString
bytes
            in IO [Maybe CompletedPart] -> m [Maybe CompletedPart]
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [Maybe CompletedPart] -> m [Maybe CompletedPart])
-> IO [Maybe CompletedPart] -> m [Maybe CompletedPart]
forall a b. (a -> b) -> a -> b
$ [(Int, ByteString)]
-> ((Int, ByteString) -> IO (Maybe CompletedPart))
-> IO [Maybe CompletedPart]
forall (t :: * -> *) a b.
Traversable t =>
t a -> (a -> IO b) -> IO (t b)
forConcurrently ([Int] -> [ByteString] -> [(Int, ByteString)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Int
1..] ([ByteString] -> [(Int, ByteString)])
-> [ByteString] -> [(Int, ByteString)]
forall a b. (a -> b) -> a -> b
$ Int -> ByteString -> [ByteString]
chunksOf Int
chunkSize ByteString
bytes) (((Int, ByteString) -> IO (Maybe CompletedPart))
 -> IO [Maybe CompletedPart])
-> ((Int, ByteString) -> IO (Maybe CompletedPart))
-> IO [Maybe CompletedPart]
forall a b. (a -> b) -> a -> b
$ \(Int
partnum, ByteString
chunk) ->
                IO ()
-> IO () -> IO (Maybe CompletedPart) -> IO (Maybe CompletedPart)
forall a b c. IO a -> IO b -> IO c -> IO c
bracket_ (QSem -> IO ()
waitQSem QSem
sem) (QSem -> IO ()
signalQSem QSem
sem) (IO (Maybe CompletedPart) -> IO (Maybe CompletedPart))
-> IO (Maybe CompletedPart) -> IO (Maybe CompletedPart)
forall a b. (a -> b) -> a -> b
$ do
                  String -> IO ()
forall (n :: * -> *). MonadIO n => String -> n ()
logStr (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Starting part: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
partnum
                  UploadPartResponse'{Maybe ETag
$sel:eTag:UploadPartResponse' :: UploadPartResponse -> Maybe ETag
eTag :: Maybe ETag
eTag} <- ResourceT IO UploadPartResponse -> IO UploadPartResponse
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
runResourceT (ResourceT IO UploadPartResponse -> IO UploadPartResponse)
-> ResourceT IO UploadPartResponse -> IO UploadPartResponse
forall a b. (a -> b) -> a -> b
$ Env -> UploadPart -> ResourceT IO (AWSResponse UploadPart)
forall (m :: * -> *) a.
(MonadResource m, AWSRequest a, Typeable a,
 Typeable (AWSResponse a)) =>
Env -> a -> m (AWSResponse a)
send Env
env (UploadPart -> ResourceT IO UploadPartResponse)
-> (ByteString -> UploadPart)
-> ByteString
-> ResourceT IO UploadPartResponse
forall b c a. (b -> c) -> (a -> b) -> a -> c
. BucketName -> ObjectKey -> Int -> Text -> RequestBody -> UploadPart
newUploadPart BucketName
buck ObjectKey
k Int
partnum Text
upId (RequestBody -> UploadPart)
-> (ByteString -> RequestBody) -> ByteString -> UploadPart
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> RequestBody
forall a. ToBody a => a -> RequestBody
toBody (ByteString -> ResourceT IO UploadPartResponse)
-> ByteString -> ResourceT IO UploadPartResponse
forall a b. (a -> b) -> a -> b
$ ByteString
chunk
                  String -> IO ()
forall (n :: * -> *). MonadIO n => String -> n ()
logStr (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Finished part: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
partnum
                  Maybe CompletedPart -> IO (Maybe CompletedPart)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe CompletedPart -> IO (Maybe CompletedPart))
-> Maybe CompletedPart -> IO (Maybe CompletedPart)
forall a b. (a -> b) -> a -> b
$ Int -> ETag -> CompletedPart
newCompletedPart Int
partnum (ETag -> CompletedPart) -> Maybe ETag -> Maybe CompletedPart
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe ETag
eTag

          FP String
filePath -> do
            Integer
fsize <- IO Integer -> m Integer
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Integer -> m Integer) -> IO Integer -> m Integer
forall a b. (a -> b) -> a -> b
$ String -> IO Integer
forall (m :: * -> *). MonadIO m => String -> m Integer
getFileSize String
filePath
            let chunkSize :: Int
chunkSize = Int -> Int
calculateChunkSize (Int -> Int) -> Int -> Int
forall a b. (a -> b) -> a -> b
$ Integer -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Integer
fsize
                (Int
count,Int
lst) = Integer -> Int
forall a b. (Integral a, Num b) => a -> b
fromIntegral Integer
fsize Int -> Int -> (Int, Int)
forall a. Integral a => a -> a -> (a, a)
`divMod` Int
chunkSize
                params :: [(Int, Int, Int)]
params = [(Int
partnum, Int
chunkSizeInt -> Int -> Int
forall a. Num a => a -> a -> a
*Int
offset, Int
size)
                          | Int
partnum <- [Int
1..]
                          | Int
offset  <- [Int
0..Int
count]
                          | Int
size    <- (Int
chunkSize Int -> [Int] -> [Int]
forall a b. a -> [b] -> [a]
forall (f :: * -> *) a b. Functor f => a -> f b -> f a
<$ [Int
0..Int
countInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
1]) [Int] -> [Int] -> [Int]
forall a. [a] -> [a] -> [a]
++ [Int
lst]
                          ]

            IO [Maybe CompletedPart] -> m [Maybe CompletedPart]
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO [Maybe CompletedPart] -> m [Maybe CompletedPart])
-> IO [Maybe CompletedPart] -> m [Maybe CompletedPart]
forall a b. (a -> b) -> a -> b
$ [(Int, Int, Int)]
-> ((Int, Int, Int) -> IO (Maybe CompletedPart))
-> IO [Maybe CompletedPart]
forall (t :: * -> *) a b.
Traversable t =>
t a -> (a -> IO b) -> IO (t b)
forConcurrently [(Int, Int, Int)]
params (((Int, Int, Int) -> IO (Maybe CompletedPart))
 -> IO [Maybe CompletedPart])
-> ((Int, Int, Int) -> IO (Maybe CompletedPart))
-> IO [Maybe CompletedPart]
forall a b. (a -> b) -> a -> b
$ \(Int
partnum,Int
off,Int
size) ->
              IO ()
-> IO () -> IO (Maybe CompletedPart) -> IO (Maybe CompletedPart)
forall a b c. IO a -> IO b -> IO c -> IO c
bracket_ (QSem -> IO ()
waitQSem QSem
sem) (QSem -> IO ()
signalQSem QSem
sem) (IO (Maybe CompletedPart) -> IO (Maybe CompletedPart))
-> IO (Maybe CompletedPart) -> IO (Maybe CompletedPart)
forall a b. (a -> b) -> a -> b
$ do
                String -> IO ()
forall (n :: * -> *). MonadIO n => String -> n ()
logStr (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Starting file part: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
partnum
                HashedBody
chunkStream <- String -> Integer -> Integer -> IO HashedBody
forall (m :: * -> *).
MonadIO m =>
String -> Integer -> Integer -> m HashedBody
hashedFileRange String
filePath (Int -> Integer
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
off) (Int -> Integer
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
size)
                UploadPartResponse'{Maybe ETag
$sel:eTag:UploadPartResponse' :: UploadPartResponse -> Maybe ETag
eTag :: Maybe ETag
eTag} <- ResourceT IO UploadPartResponse -> IO UploadPartResponse
forall (m :: * -> *) a. MonadUnliftIO m => ResourceT m a -> m a
runResourceT (ResourceT IO UploadPartResponse -> IO UploadPartResponse)
-> ResourceT IO UploadPartResponse -> IO UploadPartResponse
forall a b. (a -> b) -> a -> b
$
                  Env -> UploadPart -> ResourceT IO (AWSResponse UploadPart)
forall (m :: * -> *) a.
(MonadResource m, AWSRequest a, Typeable a,
 Typeable (AWSResponse a)) =>
Env -> a -> m (AWSResponse a)
send Env
env (UploadPart -> ResourceT IO UploadPartResponse)
-> (HashedBody -> UploadPart)
-> HashedBody
-> ResourceT IO UploadPartResponse
forall b c a. (b -> c) -> (a -> b) -> a -> c
. BucketName -> ObjectKey -> Int -> Text -> RequestBody -> UploadPart
newUploadPart BucketName
buck ObjectKey
k Int
partnum Text
upId (RequestBody -> UploadPart)
-> (HashedBody -> RequestBody) -> HashedBody -> UploadPart
forall b c a. (b -> c) -> (a -> b) -> a -> c
. HashedBody -> RequestBody
forall a. ToBody a => a -> RequestBody
toBody (HashedBody -> ResourceT IO UploadPartResponse)
-> HashedBody -> ResourceT IO UploadPartResponse
forall a b. (a -> b) -> a -> b
$ HashedBody
chunkStream
                String -> IO ()
forall (n :: * -> *). MonadIO n => String -> n ()
logStr (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Finished file part: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ Int -> String
forall a. Show a => a -> String
show Int
partnum
                Maybe CompletedPart -> IO (Maybe CompletedPart)
forall a. a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure (Maybe CompletedPart -> IO (Maybe CompletedPart))
-> Maybe CompletedPart -> IO (Maybe CompletedPart)
forall a b. (a -> b) -> a -> b
$ Int -> ETag -> CompletedPart
newCompletedPart Int
partnum (ETag -> CompletedPart) -> Maybe ETag -> Maybe CompletedPart
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe ETag
eTag

      let parts :: Maybe (NonEmpty CompletedPart)
parts = [CompletedPart] -> Maybe (NonEmpty CompletedPart)
forall a. [a] -> Maybe (NonEmpty a)
nonEmpty ([CompletedPart] -> Maybe (NonEmpty CompletedPart))
-> Maybe [CompletedPart] -> Maybe (NonEmpty CompletedPart)
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< [Maybe CompletedPart] -> Maybe [CompletedPart]
forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
forall (m :: * -> *) a. Monad m => [m a] -> m [a]
sequence [Maybe CompletedPart]
uploadResponses
      Env
-> CompleteMultipartUpload
-> m (AWSResponse CompleteMultipartUpload)
forall (m :: * -> *) a.
(MonadResource m, AWSRequest a, Typeable a,
 Typeable (AWSResponse a)) =>
Env -> a -> m (AWSResponse a)
send Env
env (CompleteMultipartUpload
 -> m (AWSResponse CompleteMultipartUpload))
-> CompleteMultipartUpload
-> m (AWSResponse CompleteMultipartUpload)
forall a b. (a -> b) -> a -> b
$ (BucketName -> ObjectKey -> Text -> CompleteMultipartUpload
newCompleteMultipartUpload BucketName
buck ObjectKey
k Text
upId)
                  { multipartUpload = Just $ newCompletedMultipartUpload { parts } }

-- | Aborts all uploads in a given bucket - useful for cleaning up.
abortAllUploads :: MonadResource m => Env -> BucketName -> m ()
abortAllUploads :: forall (m :: * -> *). MonadResource m => Env -> BucketName -> m ()
abortAllUploads Env
env BucketName
buck = do
  ListMultipartUploadsResponse' {Maybe [MultipartUpload]
$sel:uploads:ListMultipartUploadsResponse' :: ListMultipartUploadsResponse -> Maybe [MultipartUpload]
uploads :: Maybe [MultipartUpload]
uploads} <- Env -> ListMultipartUploads -> m (AWSResponse ListMultipartUploads)
forall (m :: * -> *) a.
(MonadResource m, AWSRequest a, Typeable a,
 Typeable (AWSResponse a)) =>
Env -> a -> m (AWSResponse a)
send Env
env (ListMultipartUploads -> m (AWSResponse ListMultipartUploads))
-> ListMultipartUploads -> m (AWSResponse ListMultipartUploads)
forall a b. (a -> b) -> a -> b
$ BucketName -> ListMultipartUploads
newListMultipartUploads BucketName
buck
  ((MultipartUpload -> m ()) -> Maybe [MultipartUpload] -> m ())
-> Maybe [MultipartUpload] -> (MultipartUpload -> m ()) -> m ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip (([MultipartUpload] -> m ()) -> Maybe [MultipartUpload] -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_ (([MultipartUpload] -> m ()) -> Maybe [MultipartUpload] -> m ())
-> ((MultipartUpload -> m ()) -> [MultipartUpload] -> m ())
-> (MultipartUpload -> m ())
-> Maybe [MultipartUpload]
-> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (MultipartUpload -> m ()) -> [MultipartUpload] -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_) Maybe [MultipartUpload]
uploads ((MultipartUpload -> m ()) -> m ())
-> (MultipartUpload -> m ()) -> m ()
forall a b. (a -> b) -> a -> b
$ \MultipartUpload'{Maybe ObjectKey
key :: Maybe ObjectKey
$sel:key:MultipartUpload' :: MultipartUpload -> Maybe ObjectKey
key, Maybe Text
uploadId :: Maybe Text
$sel:uploadId:MultipartUpload' :: MultipartUpload -> Maybe Text
uploadId} -> do
    let mki :: Maybe (ObjectKey, Text)
mki = (,) (ObjectKey -> Text -> (ObjectKey, Text))
-> Maybe ObjectKey -> Maybe (Text -> (ObjectKey, Text))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe ObjectKey
key Maybe (Text -> (ObjectKey, Text))
-> Maybe Text -> Maybe (ObjectKey, Text)
forall a b. Maybe (a -> b) -> Maybe a -> Maybe b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Maybe Text
uploadId
    Maybe (ObjectKey, Text)
-> ((ObjectKey, Text) -> m AbortMultipartUploadResponse) -> m ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ Maybe (ObjectKey, Text)
mki (((ObjectKey, Text) -> m AbortMultipartUploadResponse) -> m ())
-> ((ObjectKey, Text) -> m AbortMultipartUploadResponse) -> m ()
forall a b. (a -> b) -> a -> b
$ \(ObjectKey
key',Text
uid) -> Env -> AbortMultipartUpload -> m (AWSResponse AbortMultipartUpload)
forall (m :: * -> *) a.
(MonadResource m, AWSRequest a, Typeable a,
 Typeable (AWSResponse a)) =>
Env -> a -> m (AWSResponse a)
send Env
env (BucketName -> ObjectKey -> Text -> AbortMultipartUpload
newAbortMultipartUpload BucketName
buck ObjectKey
key' Text
uid)



-- http://stackoverflow.com/questions/32826539/chunksof-analog-for-bytestring
justWhen :: (a -> Bool) -> (a -> b) -> a -> Maybe b
justWhen :: forall a b. (a -> Bool) -> (a -> b) -> a -> Maybe b
justWhen a -> Bool
f a -> b
g a
a = if a -> Bool
f a
a then b -> Maybe b
forall a. a -> Maybe a
Just (a -> b
g a
a) else Maybe b
forall a. Maybe a
Nothing

nothingWhen :: (a -> Bool) -> (a -> b) -> a -> Maybe b
nothingWhen :: forall a b. (a -> Bool) -> (a -> b) -> a -> Maybe b
nothingWhen a -> Bool
f = (a -> Bool) -> (a -> b) -> a -> Maybe b
forall a b. (a -> Bool) -> (a -> b) -> a -> Maybe b
justWhen (Bool -> Bool
not (Bool -> Bool) -> (a -> Bool) -> a -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> Bool
f)

chunksOf :: Int -> BS.ByteString -> [BS.ByteString]
chunksOf :: Int -> ByteString -> [ByteString]
chunksOf Int
x = (ByteString -> Maybe (ByteString, ByteString))
-> ByteString -> [ByteString]
forall b a. (b -> Maybe (a, b)) -> b -> [a]
unfoldr ((ByteString -> Bool)
-> (ByteString -> (ByteString, ByteString))
-> ByteString
-> Maybe (ByteString, ByteString)
forall a b. (a -> Bool) -> (a -> b) -> a -> Maybe b
nothingWhen ByteString -> Bool
BS.null (Int -> ByteString -> (ByteString, ByteString)
BS.splitAt Int
x))