module Network.Hadoop.Read
( HdfsReadHandle
, openRead
, hdfsMapM_
, hdfsFoldM
, hdfsCat
) where
import Control.Applicative ((<$>), (<$))
import Control.Exception (IOException, throwIO)
import Control.Monad (foldM)
import Control.Monad.Catch (MonadMask, catch)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Loops (iterateUntilM)
import Control.Monad.Trans.State
import Control.Monad.Trans.Class (lift)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as B
import Data.Maybe (fromMaybe)
import Data.ProtocolBuffers
import Data.ProtocolBuffers.Orphans ()
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import Data.Word (Word64)
import qualified Data.Serialize.Get as Get
import qualified Data.Serialize.Put as Put
import Data.Hadoop.Protobuf.ClientNameNode
import Data.Hadoop.Protobuf.DataTransfer
import Data.Hadoop.Protobuf.Hdfs
import Data.Hadoop.Types
import Network.Hadoop.Hdfs
import Network.Hadoop.Rpc
import qualified Network.Hadoop.Socket as S
import qualified Network.Hadoop.Stream as Stream
import Text.Printf (printf)
import Prelude hiding (rem)
-- | Handle for reading a file's contents: an optional SOCKS proxy together
-- with the file's 'LocatedBlocks'. The constructor is not exported from this
-- module; within it, values are built by 'openRead' and consumed by
-- 'hdfsFoldM'.
data HdfsReadHandle = HdfsReadHandle (Maybe SocksProxy) LocatedBlocks
-- | Ask the name node for the block locations of @path@ and wrap them in an
-- 'HdfsReadHandle'.
--
-- The request starts at offset 0 with a length of 'maxBound'. The result is
-- 'Nothing' when the response carries no locations. On success the handle
-- also records the proxy setting taken from the current connection's
-- configuration.
--
-- NOTE(review): the path is converted with 'T.decodeUtf8', which presumably
-- throws on bytes that are not valid UTF-8 -- confirm against the @text@ docs.
openRead :: HdfsPath -> Hdfs (Maybe HdfsReadHandle)
openRead path = do
    response <- hdfsInvoke "getBlockLocations" request
    maybe (return Nothing) mkHandle (getField (locsLocations response))
  where
    request = GetBlockLocationsRequest
        { catSrc    = putField (T.decodeUtf8 path)
        , catOffset = putField 0
        , catLength = putField maxBound
        }

    -- The connection is only consulted once locations are known to exist.
    mkHandle blocks = do
        conn <- getConnection
        return (Just (HdfsReadHandle (hcProxy (cnConfig conn)) blocks))
-- | Write the contents of the file at @path@ to standard output.
--
-- Does nothing when 'openRead' returns 'Nothing' for the path.
hdfsCat :: HdfsPath -> Hdfs ()
hdfsCat path = do
    mHandle <- openRead path
    case mHandle of
        Nothing  -> return ()
        Just hdl -> hdfsMapM_ (liftIO . B.putStr) hdl
-- | Run an action on every chunk of data read through the handle, in order,
-- discarding results. Implemented as 'hdfsFoldM' with a unit accumulator.
hdfsMapM_ :: (Functor m, MonadIO m, MonadMask m) =>
    (ByteString -> m ()) -> HdfsReadHandle -> m ()
hdfsMapM_ f = hdfsFoldM (const f) ()
-- | Monadic left fold over the data of a file, one chunk at a time.
--
-- The handle's blocks are visited in list order. For each block, 'procBlock'
-- runs in 'StateT' seeded with the current accumulator, and the final state
-- becomes the accumulator for the next block. The file length from the
-- 'LocatedBlocks' is handed to 'procBlock' as its size argument.
hdfsFoldM :: (Functor m, MonadIO m, MonadMask m) =>
    (a -> ByteString -> m a) -> a -> HdfsReadHandle -> m a
hdfsFoldM f acc0 (HdfsReadHandle proxy located) =
    foldM step acc0 blocks
  where
    fileLength = getField (lbFileLength located)
    blocks     = getField (lbBlocks located)

    step acc blk = execStateT (procBlock f proxy fileLength blk) acc
-- | Read one located block from a data node and feed its data, packet by
-- packet, to the fold function @f@, threading the accumulator through
-- 'StateT'.
--
-- Arguments:
--
-- * @f@         -- fold step applied to the accumulator and each data chunk.
-- * @proxy@     -- optional SOCKS proxy passed to 'S.bracketSocket'.
-- * @blockSize@ -- number of bytes to read when the block itself does not
--                  carry a byte count ('ebNumBytes' is 'Nothing').
-- * @block@     -- the block to read, including its candidate locations.
--
-- Each location is tried in list order. An 'IOException' from one location
-- moves on to the next; any other exception propagates. Calls 'error' when
-- the block has no locations or when every location failed.
procBlock :: (Functor m, MonadIO m, MonadMask m) =>
    (a -> ByteString -> m a) -> Maybe SocksProxy -> Word64 -> LocatedBlock -> StateT a m ()
procBlock f proxy blockSize block = do
    let extended = getField . lbExtended $ block
        token    = getField . lbToken $ block
    case getField . lbLocations $ block of
        [] -> error $ "No locations for block " ++ show extended
        ls -> failover [] ls (getLoc extended token) $ \es ->
            -- Argument order follows the format string: block first, then
            -- the collected (exception, location) failures.
            error $ printf "All locations failed for block %s:\n%s"
                           (show extended) (show es)
  where
    -- Try @fn@ on each candidate in turn, collecting the failures (most
    -- recent first); call @err@ with them once the candidates run out.
    failover es [] _ err = err es
    failover es (x:xs) fn err = catch (fn x) handler
      where handler (e :: IOException) = failover ((e,x):es) xs fn err

    -- Build the data node's transfer endpoint and read the block from its
    -- beginning (offset 0).
    getLoc extended token l = do
        let i        = getField (dnId l)
            host     = getField . dnHostName $ i
            port     = getField . dnXferPort $ i
            endpoint = Endpoint host (fromIntegral port)
        runBlock endpoint 0 extended token

    -- Open a socket to the endpoint for the duration of the read.
    runBlock endpoint offset extended token = do
        let len = fromMaybe blockSize . getField . ebNumBytes $ extended
        S.bracketSocket proxy endpoint $ readBlock offset len extended token

    -- Issue read requests until no bytes remain. The loop state is
    -- (bytes read, next offset, bytes remaining).
    --
    -- NOTE(review): the remaining count is a 'Word64' (it originates from
    -- @blockSize@), so @rem - len'@ wraps around if more bytes arrive than
    -- were outstanding -- confirm the data node never over-delivers.
    readBlock offset0 len extended token sock = () <$ go (0, offset0, len)
      where
        go = iterateUntilM (\(_, _, rem) -> rem <= 0) $ \(nread, offset, rem) -> do
            len' <- readBlockPart offset rem extended token sock
            return (nread + len', offset + len', rem - len')

    -- Send one read request for @rem@ bytes starting at @offset@, decode the
    -- response, then consume the packets that follow. Returns the number of
    -- data bytes read.
    readBlockPart offset rem extended token sock = do
        stream <- liftIO $ Stream.mkSocketStream sock
        b <- liftIO $ do
            Stream.runPut stream putReadRequest
            Stream.runGet stream decodeLengthPrefixedMessage
        readPackets b rem stream
      where
        -- NOTE(review): 28 and 81 are presumably the data-transfer protocol
        -- version and the read-block opcode -- confirm against the HDFS
        -- DataTransferProtocol definition.
        putReadRequest = do
            Put.putWord16be 28
            Put.putWord8 81
            encodeLengthPrefixedMessage opReadBlock

        opReadBlock = OpReadBlock
            { orbHeader          = putField coh
            , orbOffset          = putField offset
            , orbLen             = putField rem
            , orbSendChecksums   = putField Nothing
            , orbCachingStrategy = putField Nothing
            }

        coh = ClientOperationHeader
            { cohBaseHeader = putField bh
            , cohClientName = putField (T.pack "hh")
            }

        bh = BaseHeader
            { bhBlock = putField extended
            , bhToken = putField (Just token)
            }

    -- Read packets until @len@ bytes of data have been consumed; returns the
    -- total number of data bytes read. The loop state is
    -- (bytes read, bytes remaining).
    readPackets BlockOpResponse{..} len stream = fst <$> go (0, len)
      where
        go = iterateUntilM (\(_, rem) -> rem <= 0) $ \(nread, rem) -> do
            len' <- readPacket (fromIntegral <$> b) stream
            return (nread + len', rem - len')

        -- Bytes-per-checksum from the response, when checksum info is present.
        m = getField borReadOpChecksumInfo
        c = getField . rociChecksum <$> m
        b = getField . csBytesPerChecksum <$> c

    -- Read a single packet off the stream: a 32-bit length (ignored), a
    -- 16-bit header size, the packet header, 4 bytes per checksum chunk
    -- (skipped, not verified), then the data. The data is passed to @f@ to
    -- update the accumulator. Returns the packet's data length.
    readPacket bytesPerChecksum stream = do
        (dataLen, d) <- liftIO $ do
            _len <- Stream.runGet stream Get.getWord32be
            sz <- Stream.runGet stream Get.getWord16be
            bs <- Stream.runGet stream $ Get.getByteString (fromIntegral sz)
            ph <- decodeBytes bs
            let numChunks = countChunks ph
                dataLen   = getField $ phDataLen ph
            _ <- Stream.runGet stream (Get.getByteString (4*numChunks))
            (dataLen,) <$>
                Stream.runGet stream (Get.getByteString (fromIntegral dataLen))
        get >>= \x -> lift (f x d) >>= put
        return (fromIntegral dataLen)
      where
        -- Number of checksum chunks covering the packet's data: the data
        -- length divided by the chunk size, rounded up. The chunk size
        -- falls back to 512 when the response gave none.
        countChunks :: PacketHeader -> Int
        countChunks PacketHeader{..} = (dataLen + b - 1) `div` b
          where
            b = fromMaybe 512 bytesPerChecksum
            dataLen = fromIntegral $ getField phDataLen

        -- Decode a message that must consume the whole buffer; anything
        -- else is reported as a 'RemoteError'.
        decodeBytes bs = case Get.runGetState decodeMessage bs 0 of
            Left err      -> throwIO (RemoteError "DecodeError" (T.pack err))
            Right (x, "") -> return x
            Right (_, _)  -> throwIO (RemoteError "DecodeError" "decoded response but did not consume enough bytes")