module Network.Minio.PutObject
(
putObjectInternal
, ObjectData(..)
, selectPartSizes
) where
import qualified Data.ByteString.Lazy as LBS
import qualified Data.Conduit as C
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Combinators as CC
import qualified Data.Conduit.List as CL
import qualified Data.List as List
import Lib.Prelude
import Network.Minio.Data
import Network.Minio.Errors
import Network.Minio.S3API
import Network.Minio.Utils
data ObjectData m
= ODFile FilePath (Maybe Int64)
| ODStream (C.ConduitM () ByteString m ()) (Maybe Int64)
putObjectInternal :: Bucket -> Object -> PutObjectOptions
-> ObjectData Minio -> Minio ETag
putObjectInternal b o opts (ODStream src sizeMay) = do
case sizeMay of
Nothing -> sequentialMultipartUpload b o opts (Just maxObjectSize) src
Just size ->
if | size <= 64 * oneMiB -> do
bs <- C.runConduit $ src C..| CB.sinkLbs
putObjectSingle' b o (pooToHeaders opts) $ LBS.toStrict bs
| size > maxObjectSize -> throwM $ MErrVPutSizeExceeded size
| otherwise -> sequentialMultipartUpload b o opts (Just size) src
putObjectInternal b o opts (ODFile fp sizeMay) = do
hResE <- withNewHandle fp $ \h ->
liftM2 (,) (isHandleSeekable h) (getFileSize h)
(isSeekable, handleSizeMay) <- either (const $ return (False, Nothing)) return
hResE
let finalSizeMay = listToMaybe $ catMaybes [sizeMay, handleSizeMay]
case finalSizeMay of
Nothing -> sequentialMultipartUpload b o opts (Just maxObjectSize) $
CB.sourceFile fp
Just size ->
if | size <= 64 * oneMiB -> either throwM return =<<
withNewHandle fp (\h -> putObjectSingle b o (pooToHeaders opts) h 0 size)
| size > maxObjectSize -> throwM $ MErrVPutSizeExceeded size
| isSeekable -> parallelMultipartUpload b o opts fp size
| otherwise -> sequentialMultipartUpload b o opts (Just size) $
CB.sourceFile fp
parallelMultipartUpload :: Bucket -> Object -> PutObjectOptions
-> FilePath -> Int64 -> Minio ETag
parallelMultipartUpload b o opts filePath size = do
uploadId <- newMultipartUpload b o (pooToHeaders opts)
let partSizeInfo = selectPartSizes size
let threads = fromMaybe 10 $ pooNumThreads opts
uploadedPartsE <- limitedMapConcurrently (fromIntegral threads)
(uploadPart uploadId) partSizeInfo
mapM_ throwM $ lefts uploadedPartsE
completeMultipartUpload b o uploadId $ rights uploadedPartsE
where
uploadPart uploadId (partNum, offset, sz) =
withNewHandle filePath $ \h -> do
let payload = PayloadH h offset sz
putObjectPart b o uploadId partNum [] payload
sequentialMultipartUpload :: Bucket -> Object -> PutObjectOptions
-> Maybe Int64
-> C.ConduitM () ByteString Minio ()
-> Minio ETag
sequentialMultipartUpload b o opts sizeMay src = do
uploadId <- newMultipartUpload b o (pooToHeaders opts)
let partSizes = selectPartSizes $ maybe maxObjectSize identity sizeMay
(pnums, _, sizes) = List.unzip3 partSizes
uploadedParts <- C.runConduit
$ src
C..| chunkBSConduit sizes
C..| CL.map PayloadBS
C..| uploadPart' uploadId pnums
C..| CC.sinkList
completeMultipartUpload b o uploadId uploadedParts
where
uploadPart' _ [] = return ()
uploadPart' uid (pn:pns) = do
payloadMay <- C.await
case payloadMay of
Nothing -> return ()
Just payload -> do pinfo <- lift $ putObjectPart b o uid pn [] payload
C.yield pinfo
uploadPart' uid pns