-- | Client side of the object "select" request: 'selectObjectContent'
-- sends a 'SelectRequest' for an object and returns the response as a
-- conduit of 'EventMessage' values, and 'getPayloadBytes' reduces that
-- stream to the raw record payload bytes.
module Network.Minio.SelectAPI
(
-- Entry point, defined in this module.
selectObjectContent,
-- Request construction. NOTE(review): this and the serialization names
-- below are not defined in the visible part of this file; presumably
-- they are re-exported from the imported Network.Minio.* modules.
SelectRequest,
selectRequest,
-- Input serialization options.
InputSerialization,
defaultCsvInput,
linesJsonInput,
documentJsonInput,
defaultParquetInput,
setInputCSVProps,
CompressionType (..),
setInputCompressionType,
-- CSV properties.
CSVProp,
recordDelimiter,
fieldDelimiter,
quoteCharacter,
quoteEscapeCharacter,
commentCharacter,
allowQuotedRecordDelimiter,
FileHeaderInfo (..),
fileHeaderInfo,
QuoteFields (..),
quoteFields,
-- Output serialization options.
OutputSerialization,
defaultCsvOutput,
defaultJsonOutput,
outputCSVFromProps,
outputJSONFromRecordDelimiter,
setRequestProgressEnabled,
-- Helper conduit defined in this module, and the event types it consumes.
getPayloadBytes,
EventMessage (..),
Progress (..),
Stats,
)
where
import Conduit ((.|))
import qualified Conduit as C
import qualified Data.Binary as Bin
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as LB
import Data.Digest.CRC32 (crc32, crc32Update)
import Lib.Prelude
import qualified Network.HTTP.Conduit as NC
import qualified Network.HTTP.Types as HT
import Network.Minio.API
import Network.Minio.Data
import Network.Minio.Errors
import Network.Minio.Utils
import Network.Minio.XmlGenerator
import Network.Minio.XmlParser
import UnliftIO (MonadUnliftIO)
-- | Errors raised while decoding the select response event stream.
data EventStreamException
  = -- | The CRC of the first 8 prelude bytes did not match the prelude CRC.
    ESEPreludeCRCFailed
  | -- | The CRC accumulated over a whole message did not match its trailer.
    ESEMessageCRCFailed
  | -- | Fewer bytes were available on the stream than were requested.
    ESEUnexpectedEndOfStream
  | -- | A binary field could not be decoded; carries the decoder's message.
    ESEDecodeFail [Char]
  | -- | A header name was not one of the recognised @:...@ names.
    ESEInvalidHeaderType
  | -- | A header carried a value-type tag other than the accepted one.
    ESEInvalidHeaderValueType
  | -- | The message-type / event-type headers were missing or unrecognised.
    ESEInvalidMessageType
  deriving stock (Eq, Show)

instance Exception EventStreamException
-- | Maximum number of bytes read from the stream in one step when
-- forwarding message bodies: 32 KiB.
chunkSize :: Int
chunkSize = 32 * 1024
-- | Decode a value from a strict 'ByteString' using its 'Bin.Binary'
-- instance.
--
-- Any unconsumed input and the consumed-byte offset reported by the
-- decoder are ignored. Throws 'ESEDecodeFail' carrying the decoder's
-- error message when decoding fails.
parseBinary :: Bin.Binary a => ByteString -> IO a
parseBinary b =
  case Bin.decodeOrFail (LB.fromStrict b) of
    Left (_, _, msg) -> throwIO $ ESEDecodeFail msg
    Right (_, _, r) -> return r
-- | Map the textual name of a message header to its 'MsgHeaderName'.
--
-- Throws 'ESEInvalidHeaderType' for any name other than the five
-- listed below.
bytesToHeaderName :: Text -> IO MsgHeaderName
bytesToHeaderName t = case t of
  ":message-type" -> return MessageType
  ":event-type" -> return EventType
  ":content-type" -> return ContentType
  ":error-code" -> return ErrorCode
  ":error-message" -> return ErrorMessage
  _ -> throwIO ESEInvalidHeaderType
-- | Parse a header block that is @hdrLen@ bytes long into a list of
-- (name, value) pairs, consuming exactly those bytes from upstream.
--
-- Each header is laid out as: a 1-byte name length, the name bytes, a
-- 1-byte value-type tag, a 2-byte value length, and the value bytes.
-- Names and values are decoded leniently as UTF-8.
--
-- Throws:
--
-- * 'ESEInvalidHeaderType' (via 'bytesToHeaderName') for an unknown name;
-- * 'ESEInvalidHeaderValueType' when the value-type tag is not 7;
-- * 'ESEDecodeFail' when a header claims more bytes than remain in the
--   header block;
-- * 'ESEUnexpectedEndOfStream' (via 'readNBytes') when the stream ends early.
parseHeaders ::
  MonadUnliftIO m =>
  Word32 ->
  C.ConduitM ByteString a m [MessageHeader]
parseHeaders 0 = return []
parseHeaders hdrLen = do
  -- Header name: one length byte followed by that many bytes.
  nameLenBytes <- readNBytes 1
  nameLen :: Word8 <- liftIO $ parseBinary nameLenBytes
  nameBytes <- readNBytes $ fromIntegral nameLen
  headerName <- liftIO $ bytesToHeaderName $ decodeUtf8Lenient nameBytes

  -- Value-type tag: 7 is the only tag this parser accepts.
  typeBytes <- readNBytes 1
  headerValueType :: Word8 <- liftIO $ parseBinary typeBytes
  when (headerValueType /= 7) $ throwIO ESEInvalidHeaderValueType

  -- Header value: two length bytes followed by that many bytes.
  valueLenBytes <- readNBytes 2
  valueLen :: Word16 <- liftIO $ parseBinary valueLenBytes

  -- Total size of this header on the wire.
  let consumed :: Word32
      consumed = 1 + fromIntegral nameLen + 1 + 2 + fromIntegral valueLen

  -- The remaining length is an unsigned Word32: subtracting a larger
  -- value would wrap around to a huge number and make the parser run on
  -- into the payload, so reject inconsistent lengths explicitly.
  when (consumed > hdrLen) $
    throwIO $
      ESEDecodeFail "header exceeds the declared header block length"

  valueBytes <- readNBytes $ fromIntegral valueLen
  rest <- parseHeaders (hdrLen - consumed)
  return ((headerName, decodeUtf8Lenient valueBytes) : rest)
-- | Read exactly @n@ bytes from upstream and return them as a single
-- strict 'ByteString'.
--
-- Throws 'ESEUnexpectedEndOfStream' when the number of bytes obtained
-- differs from @n@.
readNBytes :: MonadUnliftIO m => Int -> C.ConduitM ByteString a m ByteString
readNBytes n = do
  b <- LB.toStrict <$> (C.takeCE n .| C.sinkLazy)
  when (B.length b /= n) $ throwIO ESEUnexpectedEndOfStream
  return b
-- | Verify the checksums of every message on the stream and forward
-- the message bytes with both checksums stripped.
--
-- For each message this conduit:
--
-- 1. reads the 12-byte prelude: a 4-byte total message length, 4 more
--    bytes, and a 4-byte CRC over the first 8 bytes;
-- 2. throws 'ESEPreludeCRCFailed' if that CRC does not match;
-- 3. yields the first 8 prelude bytes (the prelude CRC is not forwarded);
-- 4. reads and yields the next @total length - 16@ bytes in chunks of
--    at most 'chunkSize', extending a CRC that was started over all 12
--    prelude bytes;
-- 5. reads the trailing 4-byte CRC (not forwarded) and throws
--    'ESEMessageCRCFailed' if it differs from the accumulated one.
--
-- It then recurses unconditionally; it is left to the downstream
-- consumer to stop pulling once it has seen what it needs.
crcCheck ::
  MonadUnliftIO m =>
  C.ConduitM ByteString ByteString m ()
crcCheck = do
  prelude <- readNBytes 12
  let (lengths, preludeCrcBytes) = B.splitAt 8 prelude
  msgLen :: Word32 <- liftIO $ parseBinary $ B.take 4 lengths
  preludeCRC :: Word32 <- liftIO $ parseBinary preludeCrcBytes
  when (crc32 lengths /= preludeCRC) $
    throwIO ESEPreludeCRCFailed

  -- Forward the two length fields only; the prelude checksum is dropped.
  C.yield lengths

  -- 12 bytes of this message have been read and 4 checksum bytes trail
  -- it, so msgLen - 16 bytes of headers and payload remain. The message
  -- checksum also covers the whole prelude, hence the seed.
  finalCrc <- accumulateYield (fromIntegral msgLen - 16) (crc32 prelude)

  crcBytes <- readNBytes 4
  expectedCrc :: Word32 <- liftIO $ parseBinary crcBytes
  when (finalCrc /= expectedCrc) $
    throwIO ESEMessageCRCFailed

  crcCheck
  where
    -- Forward @remaining@ bytes downstream in chunks, folding each chunk
    -- into the running checksum, and return the final checksum.
    accumulateYield remaining acc = do
      chunk <- readNBytes (min remaining chunkSize)
      let acc' = crc32Update acc chunk
          remaining' = remaining - B.length chunk
      C.yield chunk
      if remaining' > 0
        then accumulateYield remaining' acc'
        else return acc'
-- | Decode checksum-stripped messages (as produced by 'crcCheck') into
-- 'EventMessage' values.
--
-- Each message is read as a 4-byte total length, a 4-byte header-block
-- length, the headers, and then the payload. The payload length is the
-- total length minus the header length minus 16, the 16 bytes being the
-- 12-byte prelude and 4-byte trailing checksum handled by 'crcCheck'.
--
-- Dispatch is on the message-type header:
--
-- * @"event"@: the event-type header selects the action. @"Records"@
--   payloads are yielded as 'RecordPayloadEventMessage' chunks of at most
--   'chunkSize' bytes; @"Progress"@ and @"Stats"@ payloads are parsed with
--   'parseSelectProgress' and yielded; @"Cont"@ and @"End"@ yield nothing.
--   Processing continues with the next message unless the event type
--   was @"End"@.
-- * @"error"@: yields a 'RequestLevelErrorMessage' built from the
--   error-code and error-message headers, then stops.
--
-- Throws 'ESEInvalidMessageType' for any other message or event type
-- and for an error message lacking either error header, and
-- 'ESEDecodeFail' when the two length fields are inconsistent.
handleMessage :: MonadUnliftIO m => C.ConduitT ByteString EventMessage m ()
handleMessage = do
  msgLenBytes <- readNBytes 4
  msgLen :: Word32 <- liftIO $ parseBinary msgLenBytes

  hdrLenBytes <- readNBytes 4
  hdrLen :: Word32 <- liftIO $ parseBinary hdrLenBytes

  -- Both lengths are unsigned: without this check an inconsistent pair
  -- would wrap the payload length around to a huge value.
  when (msgLen < 16 || hdrLen > msgLen - 16) $
    throwIO $
      ESEDecodeFail "message length is smaller than header length plus framing"

  hs <- parseHeaders hdrLen

  let payloadLen :: Int
      payloadLen = fromIntegral (msgLen - hdrLen - 16)
      -- Value of the first header with the given name, if any.
      headerValue name = snd <$> find ((name ==) . fst) hs
      eventHdrValue = headerValue EventType
      msgHdrValue = headerValue MessageType
      errCode = headerValue ErrorCode
      errMsg = headerValue ErrorMessage

  case msgHdrValue of
    Just "event" -> do
      case eventHdrValue of
        Just "Records" -> passThrough payloadLen
        Just "Cont" -> return ()
        Just "Progress" -> do
          bs <- readNBytes payloadLen
          progress <- parseSelectProgress bs
          C.yield $ ProgressEventMessage progress
        Just "Stats" -> do
          bs <- readNBytes payloadLen
          stats <- parseSelectProgress bs
          C.yield $ StatsEventMessage stats
        Just "End" -> return ()
        _ -> throwIO ESEInvalidMessageType
      when (eventHdrValue /= Just "End") handleMessage
    Just "error" -> do
      let reqMsgMay = RequestLevelErrorMessage <$> errCode <*> errMsg
      maybe (throwIO ESEInvalidMessageType) C.yield reqMsgMay
    _ -> throwIO ESEInvalidMessageType
  where
    -- Yield the next @n@ bytes downstream as record payload chunks.
    passThrough 0 = return ()
    passThrough n = do
      chunk <- readNBytes (min n chunkSize)
      C.yield $ RecordPayloadEventMessage chunk
      passThrough $ n - B.length chunk
-- | Full decoder for the select response body: raw bytes are first
-- checksum-verified and stripped by 'crcCheck', then decoded into
-- 'EventMessage' values by 'handleMessage'.
selectProtoConduit ::
  MonadUnliftIO m =>
  C.ConduitT ByteString EventMessage m ()
selectProtoConduit = crcCheck .| handleMessage
-- | Run a 'SelectRequest' against the given object.
--
-- Issues a POST on the bucket/object with the query parameters
-- @select@ and @select-type=2@ and the serialized request (from
-- 'mkSelectRequest') as the body, then returns the streaming response
-- body decoded through 'selectProtoConduit' as a source of
-- 'EventMessage' values for the caller to consume.
selectObjectContent ::
  Bucket ->
  Object ->
  SelectRequest ->
  Minio (C.ConduitT () EventMessage Minio ())
selectObjectContent b o r = do
  let reqInfo =
        defaultS3ReqInfo
          { riMethod = HT.methodPost,
            riBucket = Just b,
            riObject = Just o,
            riPayload = PayloadBS $ mkSelectRequest r,
            riNeedsLocation = False,
            riQueryParams = [("select", Nothing), ("select-type", Just "2")]
          }
  resp <- mkStreamRequest reqInfo
  return $ NC.responseBody resp .| selectProtoConduit
-- | Helper conduit that reduces a stream of 'EventMessage' values to
-- the record payload bytes only.
--
-- 'RecordPayloadEventMessage' payloads are yielded downstream; a
-- 'RequestLevelErrorMessage' is rethrown as a 'SelectErr' carrying its
-- code and message; every other event is dropped. Runs until upstream
-- is exhausted.
getPayloadBytes :: MonadIO m => C.ConduitT EventMessage ByteString m ()
getPayloadBytes = C.awaitForever $ \ev ->
  case ev of
    RecordPayloadEventMessage b -> C.yield b
    RequestLevelErrorMessage c m -> throwIO $ SelectErr c m
    _ -> return ()