module Pipes.Aws.S3
( Bucket(..)
, Object(..)
, fromS3
, fromS3'
, fromS3WithManager
, responseBody
, ChunkSize
, toS3
, toS3'
, toS3WithManager
) where
import Control.Monad (unless)
import Data.String (IsString)
import qualified Data.ByteString as BS
import Data.ByteString (ByteString)
import qualified Data.Text as T
import Pipes
import Pipes.Safe
import qualified Pipes.Prelude as PP
import qualified Pipes.ByteString as PBS
import Control.Monad.Trans.Resource
import Control.Monad.IO.Class
import Network.HTTP.Client
import Network.HTTP.Client.TLS
import qualified Aws
import qualified Aws.Core as Aws
import qualified Aws.S3 as S3
newtype Bucket = Bucket T.Text
deriving (Eq, Ord, Show, Read, IsString)
newtype Object = Object T.Text
deriving (Eq, Ord, Show, Read, IsString)
fromS3 :: MonadSafe m
=> Bucket -> Object
-> (Response (Producer BS.ByteString m ()) -> Producer BS.ByteString m a)
-> Producer BS.ByteString m a
fromS3 bucket object handler = do
cfg <- liftIO Aws.baseConfiguration
fromS3' cfg bucket object handler
fromS3' :: MonadSafe m
=> Aws.Configuration -> Bucket -> Object
-> (Response (Producer BS.ByteString m ()) -> Producer BS.ByteString m a)
-> Producer BS.ByteString m a
fromS3' cfg bucket object handler = do
mgr <- liftIO $ newManager tlsManagerSettings
fromS3WithManager mgr cfg bucket object handler
fromS3WithManager
:: MonadSafe m
=> Manager
-> Aws.Configuration -> Bucket -> Object
-> (Response (Producer BS.ByteString m ()) -> Producer BS.ByteString m a)
-> Producer BS.ByteString m a
fromS3WithManager mgr cfg (Bucket bucket) (Object object) handler = do
let s3cfg = Aws.defServiceConfig :: S3.S3Configuration Aws.NormalQuery
req <- liftIO $ buildRequest cfg s3cfg $ S3.getObject bucket object
Pipes.Safe.bracket (liftIO $ responseOpen req mgr) (liftIO . responseClose) $ \resp ->
handler $ resp { responseBody = from $ brRead $ responseBody resp }
withHTTP :: MonadSafe m
=> Request
-> Manager
-> (Response (Producer ByteString m ()) -> m a)
-> m a
withHTTP req mgr k =
Pipes.Safe.bracket (liftIO $ responseOpen req mgr) (liftIO . responseClose) k'
where
k' resp = do
let p = (from . brRead . responseBody) resp
k (resp { responseBody = p})
from :: MonadIO m => IO ByteString -> Producer ByteString m ()
from io = go
where
go = do
bs <- liftIO io
unless (BS.null bs) $ do
yield bs
go
buildRequest :: (MonadIO m, Aws.Transaction r a)
=> Aws.Configuration
-> Aws.ServiceConfiguration r Aws.NormalQuery
-> r
-> m Request
buildRequest cfg scfg req = do
Just cred <- Aws.loadCredentialsDefault
sigData <- liftIO $ Aws.signatureData Aws.Timestamp cred
let signed = Aws.signQuery req scfg sigData
liftIO $ Aws.queryToHttpRequest signed
type ChunkSize = Int
type ETag = T.Text
type PartN = Integer
toS3 :: forall m a. MonadIO m
=> ChunkSize -> Bucket -> Object
-> Producer BS.ByteString m a
-> m a
toS3 chunkSize bucket object consumer = do
cfg <- Aws.baseConfiguration
toS3' cfg chunkSize bucket object consumer
toS3' :: forall m a. MonadIO m
=> Aws.Configuration -> ChunkSize -> Bucket -> Object
-> Producer BS.ByteString m a
-> m a
toS3' cfg chunkSize bucket object consumer = do
mgr <- liftIO $ newManager tlsManagerSettings
toS3WithManager mgr cfg chunkSize bucket object consumer
toS3WithManager :: forall m a. MonadIO m
=> Manager -> Aws.Configuration -> ChunkSize -> Bucket -> Object
-> Producer BS.ByteString m a
-> m a
toS3WithManager mgr cfg chunkSize (Bucket bucket) (Object object) consumer = do
let s3cfg = Aws.defServiceConfig :: S3.S3Configuration Aws.NormalQuery
resp1 <- liftIO $ runResourceT
$ Aws.pureAws cfg s3cfg mgr
$ S3.postInitiateMultipartUpload bucket object
let uploadId = S3.imurUploadId resp1
let uploadPart :: (PartN, BS.ByteString) -> m (PartN, ETag)
uploadPart (partN, content) = do
resp <- liftIO $ runResourceT
$ Aws.pureAws cfg s3cfg mgr
$ S3.uploadPart bucket object partN uploadId (RequestBodyBS content)
return (partN, S3.uprETag resp)
(parts, res) <- PP.toListM' $ consumer
>-> enumFromP 1
>-> PP.mapM uploadPart
resp2 <- liftIO $ runResourceT
$ Aws.pureAws cfg s3cfg mgr
$ S3.postCompleteMultipartUpload bucket object uploadId parts
return res
enumFromP :: (Monad m, Enum i) => i -> Pipe a (i, a) m r
enumFromP = go
where
go i = await >>= \x -> yield (i, x) >> go (succ i)