module Network.Minio.PutObject
(
putObject
, ObjectData(..)
, selectPartSizes
) where
import qualified Data.ByteString as B
import qualified Data.Conduit as C
import Data.Conduit.Binary (sourceHandleRange)
import qualified Data.Conduit.Binary as CB
import qualified Data.Conduit.Combinators as CC
import qualified Data.Conduit.List as CL
import qualified Data.List as List
import qualified Data.Map.Strict as Map
import Lib.Prelude
import Network.Minio.Data
import Network.Minio.Data.Crypto
import Network.Minio.ListOps
import Network.Minio.S3API
import Network.Minio.Utils
maxObjectSize :: Int64
maxObjectSize = 5 * 1024 * 1024 * oneMiB
oneMiB :: Int64
oneMiB = 1024 * 1024
maxMultipartParts :: Int64
maxMultipartParts = 10000
data ObjectData m = ODFile FilePath (Maybe Int64)
| ODStream (C.Producer m ByteString) (Maybe Int64)
putObject :: Bucket -> Object -> ObjectData Minio -> Minio ETag
putObject b o (ODStream src sizeMay) = sequentialMultipartUpload b o sizeMay src
putObject b o (ODFile fp sizeMay) = do
hResE <- withNewHandle fp $ \h ->
liftM2 (,) (isHandleSeekable h) (getFileSize h)
(isSeekable, handleSizeMay) <- either (const $ return (False, Nothing)) return
hResE
let finalSizeMay = listToMaybe $ catMaybes [sizeMay, handleSizeMay]
case finalSizeMay of
Nothing -> sequentialMultipartUpload b o (Just maxObjectSize) $
CB.sourceFile fp
Just size ->
if | size <= 64 * oneMiB -> either throwM return =<<
withNewHandle fp (\h -> putObjectSingle b o [] h 0 size)
| size > maxObjectSize -> throwM $ ValidationError $
MErrVPutSizeExceeded size
| isSeekable -> parallelMultipartUpload b o fp size
| otherwise -> sequentialMultipartUpload b o (Just size) $
CB.sourceFile fp
selectPartSizes :: Int64 -> [(PartNumber, Int64, Int64)]
selectPartSizes size = List.zip3 [1..] partOffsets partSizes
where
ceil :: Double -> Int64
ceil = ceiling
partSize = max (64 * oneMiB) (ceil $ fromIntegral size /
fromIntegral maxMultipartParts)
(numParts, lastPartSize) = size `divMod` partSize
lastPart = filter (> 0) [lastPartSize]
partSizes = replicate (fromIntegral numParts) partSize ++ lastPart
partOffsets = List.scanl' (+) 0 partSizes
checkUploadNeeded :: Payload -> PartNumber
-> Map.Map PartNumber ListPartInfo
-> Minio (Maybe PartInfo)
checkUploadNeeded payload n pmap = do
(md5hash, pSize) <- case payload of
PayloadBS bs -> return (hashMD5 bs, fromIntegral $ B.length bs)
PayloadH h off size -> liftM (, size) $
hashMD5FromSource $ sourceHandleRange h (Just $ fromIntegral off)
(Just $ fromIntegral size)
case Map.lookup n pmap of
Nothing -> return Nothing
Just (ListPartInfo _ etag size _) -> return $
bool Nothing (Just (PartInfo n etag)) $
md5hash == encodeUtf8 etag && size == pSize
parallelMultipartUpload :: Bucket -> Object -> FilePath -> Int64
-> Minio ETag
parallelMultipartUpload b o filePath size = do
(uidMay, pmap) <- getExistingUpload b o
uploadId <- maybe (newMultipartUpload b o []) return uidMay
let partSizeInfo = selectPartSizes size
uploadedPartsE <- limitedMapConcurrently 10
(uploadPart pmap uploadId) partSizeInfo
mapM_ throwM $ lefts uploadedPartsE
completeMultipartUpload b o uploadId $ rights uploadedPartsE
where
uploadPart pmap uploadId (partNum, offset, sz) =
withNewHandle filePath $ \h -> do
let payload = PayloadH h offset sz
pInfoMay <- checkUploadNeeded payload partNum pmap
maybe
(putObjectPart b o uploadId partNum [] payload)
return pInfoMay
sequentialMultipartUpload :: Bucket -> Object -> Maybe Int64
-> C.Producer Minio ByteString -> Minio ETag
sequentialMultipartUpload b o sizeMay src = do
(uidMay, pmap) <- getExistingUpload b o
uploadId <- maybe (newMultipartUpload b o []) return uidMay
let partSizes = selectPartSizes $ maybe maxObjectSize identity sizeMay
(pnums, _, sizes) = List.unzip3 partSizes
uploadedParts <- src
C..| chunkBSConduit sizes
C..| CL.map PayloadBS
C..| checkAndUpload uploadId pmap pnums
C.$$ CC.sinkList
completeMultipartUpload b o uploadId uploadedParts
where
checkAndUpload _ _ [] = return ()
checkAndUpload uid pmap (pn:pns) = do
payloadMay <- C.await
case payloadMay of
Nothing -> return ()
Just payload -> do partMay <- lift $ checkUploadNeeded payload pn pmap
pinfo <- maybe
(lift $ putObjectPart b o uid pn [] payload)
return partMay
C.yield pinfo
checkAndUpload uid pmap pns
getExistingUpload :: Bucket -> Object
-> Minio (Maybe UploadId, Map.Map PartNumber ListPartInfo)
getExistingUpload b o = do
uidMay <- (fmap . fmap) uiUploadId $
listIncompleteUploads b (Just o) False C.$$ CC.head
parts <- maybe (return [])
(\uid -> listIncompleteParts b o uid C.$$ CC.sinkList) uidMay
return (uidMay, Map.fromList $ map (\p -> (piNumber p, p)) parts)