--
-- MinIO Haskell SDK, (C) 2017-2019 MinIO, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
--     http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

module Network.Minio.PutObject
  (
    putObjectInternal
  , ObjectData(..)
  , selectPartSizes
  ) where


import           Conduit                  (takeC)
import qualified Conduit                  as C
import qualified Data.ByteString.Lazy     as LBS
import qualified Data.Conduit.Binary      as CB
import qualified Data.Conduit.Combinators as CC
import qualified Data.Conduit.List        as CL
import qualified Data.List                as List


import           Lib.Prelude

import           Network.Minio.Data
import           Network.Minio.Errors
import           Network.Minio.S3API
import           Network.Minio.Utils


-- | A data-type to represent the source data for an object. A
-- file-path or a producer-conduit may be provided.
--
-- For files, a size may be provided - this is useful in cases when
-- the file size cannot be automatically determined or if only some
-- prefix of the file is desired.
--
-- For streams also, a size may be provided. This is useful to limit
-- the input - if it is not provided, upload will continue until the
-- stream ends or the object reaches `maxObjectSize` size.
data ObjectData m
  = ODFile FilePath (Maybe Int64) -- ^ Takes filepath and optional
                                  -- size.
  | ODStream (C.ConduitM () ByteString m ()) (Maybe Int64) -- ^ Pass
                                                           -- size
                                                           -- (bytes)
                                                           -- if
                                                           -- known.

-- | Put an object from ObjectData. This high-level API handles
-- objects of all sizes, and even if the object size is unknown.
putObjectInternal :: Bucket -> Object -> PutObjectOptions
                  -> ObjectData Minio -> Minio ETag
putObjectInternal b o opts (ODStream src sizeMay) = do
  case sizeMay of
    -- unable to get size, so assume non-seekable file
    Nothing -> sequentialMultipartUpload b o opts Nothing src

    -- got file size, so check for single/multipart upload
    Just size ->
      if | size <= 64 * oneMiB -> do
             bs <- C.runConduit $ src C..| takeC (fromIntegral size) C..| CB.sinkLbs
             putObjectSingle' b o (pooToHeaders opts) $ LBS.toStrict bs
         | size > maxObjectSize -> throwIO $ MErrVPutSizeExceeded size
         | otherwise -> sequentialMultipartUpload b o opts (Just size) src

putObjectInternal b o opts (ODFile fp sizeMay) = do
  hResE <- withNewHandle fp $ \h ->
    liftM2 (,) (isHandleSeekable h) (getFileSize h)

  (isSeekable, handleSizeMay) <- either (const $ return (False, Nothing)) return
                                 hResE

  -- prefer given size to queried size.
  let finalSizeMay = listToMaybe $ catMaybes [sizeMay, handleSizeMay]

  case finalSizeMay of
    -- unable to get size, so assume non-seekable file
    Nothing -> sequentialMultipartUpload b o opts Nothing $ CB.sourceFile fp

    -- got file size, so check for single/multipart upload
    Just size ->
      if | size <= 64 * oneMiB -> either throwIO return =<<
           withNewHandle fp (\h -> putObjectSingle b o (pooToHeaders opts) h 0 size)
         | size > maxObjectSize -> throwIO $ MErrVPutSizeExceeded size
         | isSeekable -> parallelMultipartUpload b o opts fp size
         | otherwise -> sequentialMultipartUpload b o opts (Just size) $
                        CB.sourceFile fp

parallelMultipartUpload :: Bucket -> Object -> PutObjectOptions
                        -> FilePath -> Int64 -> Minio ETag
parallelMultipartUpload b o opts filePath size = do
  -- get a new upload id.
  uploadId <- newMultipartUpload b o (pooToHeaders opts)

  let partSizeInfo = selectPartSizes size

  let threads = fromMaybe 10 $ pooNumThreads opts

  -- perform upload with 'threads' threads
  uploadedPartsE <- limitedMapConcurrently (fromIntegral threads)
                    (uploadPart uploadId) partSizeInfo

  -- if there were any errors, rethrow exception.
  mapM_ throwIO $ lefts uploadedPartsE

  -- if we get here, all parts were successfully uploaded.
  completeMultipartUpload b o uploadId $ rights uploadedPartsE

  where
    uploadPart uploadId (partNum, offset, sz) =
      withNewHandle filePath $ \h -> do
        let payload = PayloadH h offset sz
        putObjectPart b o uploadId partNum [] payload

-- | Upload multipart object from conduit source sequentially
sequentialMultipartUpload :: Bucket -> Object -> PutObjectOptions
                          -> Maybe Int64
                          -> C.ConduitM () ByteString Minio ()
                          -> Minio ETag
sequentialMultipartUpload b o opts sizeMay src = do
  -- get a new upload id.
  uploadId <- newMultipartUpload b o (pooToHeaders opts)

  -- upload parts in loop
  let partSizes = selectPartSizes $ maybe maxObjectSize identity sizeMay
      (pnums, _, sizes) = List.unzip3 partSizes
  uploadedParts <- C.runConduit
                 $ src
              C..| chunkBSConduit (map fromIntegral sizes)
              C..| CL.map PayloadBS
              C..| uploadPart' uploadId pnums
              C..| CC.sinkList

  -- complete multipart upload
  completeMultipartUpload b o uploadId uploadedParts

  where
    uploadPart' _ [] = return ()
    uploadPart' uid (pn:pns) = do
      payloadMay <- C.await
      case payloadMay of
        Nothing -> return ()
        Just payload -> do pinfo <- lift $ putObjectPart b o uid pn [] payload
                           C.yield pinfo
                           uploadPart' uid pns