--
-- MinIO Haskell SDK, (C) 2017-2019 MinIO, Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
--     http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

module Network.Minio.SelectAPI
  ( -- | The `selectObjectContent` allows querying CSV, JSON or Parquet
    -- format objects in AWS S3 and in MinIO using SQL Select
    -- statements. This allows significant reduction of data transfer
    -- from object storage for computation-intensive tasks, as relevant
    -- data is filtered close to the storage.
    selectObjectContent,
    SelectRequest,
    selectRequest,

    -- *** Input Serialization
    InputSerialization,
    defaultCsvInput,
    linesJsonInput,
    documentJsonInput,
    defaultParquetInput,
    setInputCSVProps,
    CompressionType (..),
    setInputCompressionType,

    -- *** CSV Format details

    -- | CSV format options such as delimiters and quote characters are
    -- specified using the functions below. Options are combined
    -- monoidally.
    CSVProp,
    recordDelimiter,
    fieldDelimiter,
    quoteCharacter,
    quoteEscapeCharacter,
    commentCharacter,
    allowQuotedRecordDelimiter,
    FileHeaderInfo (..),
    fileHeaderInfo,
    QuoteFields (..),
    quoteFields,

    -- *** Output Serialization
    OutputSerialization,
    defaultCsvOutput,
    defaultJsonOutput,
    outputCSVFromProps,
    outputJSONFromRecordDelimiter,

    -- *** Progress messages
    setRequestProgressEnabled,

    -- *** Interpreting Select output

    -- | The conduit returned by `selectObjectContent` returns values of
    -- the `EventMessage` data type. This returns the query output
    -- messages formatted according to the chosen output serialization,
    -- interleaved with progress messages (if enabled by
    -- `setRequestProgressEnabled`), and at the end a statistics
    -- message.
    --
    -- If the application is interested in only the payload, then
    -- `getPayloadBytes` can be used. For example to simply print the
    -- payload to stdout:
    --
    -- > resultConduit <- selectObjectContent bucket object mySelectRequest
    -- > runConduit $ resultConduit .| getPayloadBytes .| stdoutC
    --
    -- Note that runConduit, the connect operator (.|) and stdoutC are
    -- all from the "conduit" package.
    getPayloadBytes,
    EventMessage (..),
    Progress (..),
    Stats,
  )
where

import Conduit ((.|))
import qualified Conduit as C
import qualified Data.Binary as Bin
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as LB
import Data.Digest.CRC32 (crc32, crc32Update)
import Lib.Prelude
import qualified Network.HTTP.Conduit as NC
import qualified Network.HTTP.Types as HT
import Network.Minio.API
import Network.Minio.Data
import Network.Minio.Errors
import Network.Minio.Utils
import Network.Minio.XmlGenerator
import Network.Minio.XmlParser
import UnliftIO (MonadUnliftIO)

-- | Failures raised while reading and decoding the binary event stream
-- returned by a select request.
data EventStreamException
  = -- | The CRC computed over the first 8 bytes of a message prelude
    -- does not match the prelude CRC carried in the message.
    ESEPreludeCRCFailed
  | -- | The CRC accumulated over a whole message does not match the
    -- trailing message CRC.
    ESEMessageCRCFailed
  | -- | The stream ended before the requested number of bytes could be
    -- read.
    ESEUnexpectedEndOfStream
  | -- | A value could not be decoded; carries the decoder's message.
    ESEDecodeFail [Char]
  | -- | A message header has a name that is not recognised.
    ESEInvalidHeaderType
  | -- | A message header value has a type tag other than the expected
    -- one.
    ESEInvalidHeaderValueType
  | -- | The message type or event type is missing or unknown, or an
    -- error message lacks its error code or error message header.
    ESEInvalidMessageType
  deriving stock (Eq, Show)

instance Exception EventStreamException

-- | Upper bound, in bytes, on a single read from the stream: 32 KiB.
chunkSize :: Int
chunkSize = kib * 32
  where
    kib = 1024

-- | Decode a value from a strict 'ByteString' using its 'Bin.Binary'
-- instance.
--
-- Throws 'ESEDecodeFail' carrying the decoder's message when decoding
-- fails. Any input left over after a successful decode is ignored.
parseBinary :: Bin.Binary a => ByteString -> IO a
parseBinary bytes =
  either decodeFailed decoded $ Bin.decodeOrFail (LB.fromStrict bytes)
  where
    decodeFailed (_, _, msg) = throwIO (ESEDecodeFail msg)
    decoded (_, _, value) = return value

-- | Map the textual name of an event-stream header to the matching
-- 'MsgHeaderName'.
--
-- Throws 'ESEInvalidHeaderType' for any name that is not one of the
-- five recognised headers.
bytesToHeaderName :: Text -> IO MsgHeaderName
bytesToHeaderName ":message-type" = return MessageType
bytesToHeaderName ":event-type" = return EventType
bytesToHeaderName ":content-type" = return ContentType
bytesToHeaderName ":error-code" = return ErrorCode
bytesToHeaderName ":error-message" = return ErrorMessage
bytesToHeaderName _ = throwIO ESEInvalidHeaderType

-- | Parse the header section of one event-stream message.
--
-- The argument is the number of header bytes still to be consumed; when
-- it reaches zero the accumulated headers are returned. Each header is
-- laid out as: a 1-byte name length, the name bytes, a 1-byte value
-- type, a 2-byte value length and the value bytes. Names and values are
-- decoded leniently as UTF-8.
--
-- Throws:
--
-- * 'ESEInvalidHeaderType' (via 'bytesToHeaderName') for an unknown
--   header name,
-- * 'ESEInvalidHeaderValueType' when the value type tag is not 7 (the
--   only tag this parser accepts),
-- * 'ESEDecodeFail' when a header claims more bytes than remain in the
--   declared header section,
-- * whatever 'readNBytes' throws when the stream is too short.
parseHeaders ::
  MonadUnliftIO m =>
  Word32 ->
  C.ConduitM ByteString a m [MessageHeader]
parseHeaders 0 = return []
parseHeaders hdrLen = do
  nameLen :: Word8 <- liftIO . parseBinary =<< readNBytes 1
  nameBytes <- readNBytes (fromIntegral nameLen)
  headerName <- liftIO $ bytesToHeaderName (decodeUtf8Lenient nameBytes)

  valueType :: Word8 <- liftIO . parseBinary =<< readNBytes 1
  when (valueType /= 7) $ throwIO ESEInvalidHeaderValueType

  valueLen :: Word16 <- liftIO . parseBinary =<< readNBytes 2
  valueBytes <- readNBytes (fromIntegral valueLen)

  let headerValue = decodeUtf8Lenient valueBytes
      -- Bytes taken by this header: name length byte + name + value
      -- type byte + 2-byte value length + value.
      consumed :: Word32
      consumed = 1 + fromIntegral nameLen + 1 + 2 + fromIntegral valueLen

  -- The subtraction below is unsigned: without this check a header that
  -- overruns the declared section would wrap the remaining count around
  -- and the parser would go on to read payload bytes as headers.
  when (consumed > hdrLen) $
    throwIO $
      ESEDecodeFail "header overruns the declared header section length"

  rest <- parseHeaders (hdrLen - consumed)
  return ((headerName, headerValue) : rest)

-- | Read exactly @n@ bytes from the stream and return them as one
-- strict 'ByteString'.
--
-- Throws 'ESEUnexpectedEndOfStream' when the number of bytes obtained
-- differs from @n@.
readNBytes :: MonadUnliftIO m => Int -> C.ConduitM ByteString a m ByteString
readNBytes n = do
  bytes <- fmap LB.toStrict (C.takeCE n .| C.sinkLazy)
  when (B.length bytes /= n) $
    throwIO ESEUnexpectedEndOfStream
  return bytes

-- | Verify the checksums of every event-stream message and forward the
-- message bytes without them.
--
-- For each message this reads the 12-byte prelude, checks the CRC of
-- its first 8 bytes against the prelude CRC held in the last 4, and
-- yields those first 8 bytes. The remainder of the message (total
-- length minus 16 bytes) is then yielded in pieces of at most
-- 'chunkSize' bytes while a running CRC32 over the whole message is
-- kept, and that CRC is compared with the trailing 4-byte message CRC.
-- Neither checksum is yielded downstream.
--
-- Throws 'ESEPreludeCRCFailed' or 'ESEMessageCRCFailed' on a mismatch,
-- plus whatever 'readNBytes' and 'parseBinary' throw.
crcCheck ::
  MonadUnliftIO m =>
  C.ConduitM ByteString ByteString m ()
crcCheck = do
  prelude <- readNBytes 12
  let (lengthFields, preludeCrcBytes) = B.splitAt 8 prelude
  totalLen :: Word32 <- liftIO $ parseBinary (B.take 4 lengthFields)
  preludeCRC :: Word32 <- liftIO $ parseBinary preludeCrcBytes
  when (crc32 lengthFields /= preludeCRC) $
    throwIO ESEPreludeCRCFailed

  -- Forward the two length fields only; the prelude checksum is dropped.
  C.yield lengthFields

  -- The 12 prelude bytes are already consumed and the last 4 bytes of
  -- the message are its CRC, so totalLen - 16 bytes remain to be
  -- forwarded. The running checksum starts from the full prelude.
  computedCrc <- forwardBody (fromIntegral totalLen - 16) (crc32 prelude)

  expectedCrc :: Word32 <- liftIO . parseBinary =<< readNBytes 4
  when (computedCrc /= expectedCrc) $
    throwIO ESEMessageCRCFailed

  -- Always continue with the next message; the downstream consumer
  -- decides when to stop pulling from the stream.
  crcCheck
  where
    -- Yield @remaining@ bytes downstream in bounded pieces, returning
    -- the checksum updated with everything that was forwarded.
    forwardBody remaining runningCrc = do
      piece <- readNBytes (min remaining chunkSize)
      let runningCrc' = crc32Update runningCrc piece
          remaining' = remaining - B.length piece
      C.yield piece
      if remaining' > 0
        then forwardBody remaining' runningCrc'
        else return runningCrc'

-- | Decode checksum-stripped event-stream messages into 'EventMessage's.
--
-- Each message starts with two 4-byte fields (total message length and
-- header section length), followed by the headers and the payload. The
-- @MessageType@ header selects the handling:
--
-- * @"event"@: dispatch on the @EventType@ header. @"Records"@ payloads
--   are yielded as 'RecordPayloadEventMessage' in pieces of at most
--   'chunkSize' bytes; @"Progress"@ and @"Stats"@ payloads are parsed
--   with 'parseSelectProgress' and yielded; @"Cont"@ yields nothing;
--   @"End"@ yields nothing and stops the conduit. Every event other
--   than @"End"@ is followed by the next message.
-- * @"error"@: yield a 'RequestLevelErrorMessage' built from the error
--   code and error message headers, then stop.
--
-- Throws 'ESEInvalidMessageType' for an unknown or missing message or
-- event type, or when an error message lacks either error header, and
-- 'ESEDecodeFail' when the total length is too small to hold the
-- declared header section.
handleMessage :: MonadUnliftIO m => C.ConduitT ByteString EventMessage m ()
handleMessage = do
  msgLen :: Word32 <- liftIO . parseBinary =<< readNBytes 4
  hdrLen :: Word32 <- liftIO . parseBinary =<< readNBytes 4

  -- A message is 16 framing bytes (12-byte prelude + 4-byte CRC) plus
  -- headers plus payload. The payload length is computed with unsigned
  -- arithmetic, so inconsistent lengths would wrap around and bytes of
  -- later messages would be handed out as payload; reject them here.
  when (msgLen < 16 || hdrLen > msgLen - 16) $
    throwIO $
      ESEDecodeFail "message length is inconsistent with header length"

  hs <- parseHeaders hdrLen

  let payloadLen :: Int
      payloadLen = fromIntegral (msgLen - hdrLen - 16)
      -- Value of the first header with the given name, if any.
      hdrVal name = snd <$> find ((name ==) . fst) hs
      eventType = hdrVal EventType

  case hdrVal MessageType of
    Just "event" -> do
      case eventType of
        Just "Records" -> passThrough payloadLen
        Just "Cont" -> return ()
        Just "Progress" -> do
          progress <- parseSelectProgress =<< readNBytes payloadLen
          C.yield $ ProgressEventMessage progress
        Just "Stats" -> do
          stats <- parseSelectProgress =<< readNBytes payloadLen
          C.yield $ StatsEventMessage stats
        Just "End" -> return ()
        _ -> throwIO ESEInvalidMessageType
      -- Keep decoding until the End event has been seen.
      when (eventType /= Just "End") handleMessage
    Just "error" ->
      maybe (throwIO ESEInvalidMessageType) C.yield $
        RequestLevelErrorMessage <$> hdrVal ErrorCode <*> hdrVal ErrorMessage
    _ -> throwIO ESEInvalidMessageType
  where
    -- Forward @n@ payload bytes downstream as record messages, reading
    -- at most 'chunkSize' bytes at a time.
    passThrough 0 = return ()
    passThrough n = do
      piece <- readNBytes (min n chunkSize)
      C.yield $ RecordPayloadEventMessage piece
      passThrough (n - B.length piece)

-- | The full select response decoder: raw response bytes go through
-- 'crcCheck', and its output is decoded into 'EventMessage's by
-- 'handleMessage'.
selectProtoConduit ::
  MonadUnliftIO m =>
  C.ConduitT ByteString EventMessage m ()
selectProtoConduit =
  crcCheck -- stage 1: verify and strip the checksums
    .| handleMessage -- stage 2: decode messages into events

-- | Run the given 'SelectRequest' against an object and return a conduit
-- producing the resulting event messages for the client to consume.
--
-- The request is a POST on the object with the serialized select
-- request as body and the query parameters @select@ and
-- @select-type=2@; the streamed response body is decoded with
-- 'selectProtoConduit'.
selectObjectContent ::
  Bucket ->
  Object ->
  SelectRequest ->
  Minio (C.ConduitT () EventMessage Minio ())
selectObjectContent bucket object req = do
  resp <- mkStreamRequest selectReqInfo
  pure (NC.responseBody resp .| selectProtoConduit)
  where
    selectReqInfo =
      defaultS3ReqInfo
        { riMethod = HT.methodPost,
          riBucket = Just bucket,
          riObject = Just object,
          riPayload = PayloadBS (mkSelectRequest req),
          riNeedsLocation = False,
          riQueryParams = [("select", Nothing), ("select-type", Just "2")]
        }

-- | A helper conduit that keeps only the record payload bytes.
--
-- 'RecordPayloadEventMessage' contents are yielded downstream, a
-- 'RequestLevelErrorMessage' is rethrown as a 'SelectErr' exception, and
-- every other event is dropped. The conduit finishes when upstream is
-- exhausted.
getPayloadBytes :: MonadIO m => C.ConduitT EventMessage ByteString m ()
getPayloadBytes = C.awaitForever forward
  where
    forward (RecordPayloadEventMessage bytes) = C.yield bytes
    forward (RequestLevelErrorMessage code msg) =
      liftIO $ throwIO $ SelectErr code msg
    forward _ = return ()