{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | A small monad for issuing @ClientProtocol@ RPC calls against an HDFS
-- name node, plus wrappers for a handful of common file-system operations.
module Network.Hadoop.Hdfs
    ( Hdfs(..)
    , hdfsProtocol
    , getConnection
    , runHdfs
    , runHdfs'
    , hdfsInvoke

    , CreateParent
    , Recursive
    , Overwrite

    , getListing
    , getListing'
    , getFileInfo
    , getContentSummary
    , mkdirs
    , delete
    , rename
    , setPermissions
    ) where

import           Control.Applicative (Applicative(..), (<$>))
import           Control.Exception (throw)
import           Control.Monad (ap)
import           Control.Monad.Catch (MonadMask(..), MonadThrow(..), MonadCatch(..))
import           Control.Monad.IO.Class (MonadIO(..))

import           Data.ByteString (ByteString)
import           Data.Maybe (fromMaybe)
import           Data.Text (Text)
import qualified Data.Text.Encoding as T
import qualified Data.Vector as V
import           Data.Word (Word32)

import qualified Data.Hadoop.Protobuf.ClientNameNode as P
import qualified Data.Hadoop.Protobuf.Hdfs as P
import           Data.ProtocolBuffers
import           Data.ProtocolBuffers.Orphans ()

import           Data.Hadoop.Configuration
import           Data.Hadoop.Types
import           Network.Hadoop.Rpc
import qualified Network.Hadoop.Socket as S

------------------------------------------------------------------------

-- | An action that runs in 'IO' with access to an RPC 'Connection'.
newtype Hdfs a = Hdfs { unHdfs :: Connection -> IO a }

instance Functor Hdfs where
    fmap f m = Hdfs $ \c -> fmap f (unHdfs m c)

-- 'pure' is the primary definition and 'return' defers to it (the canonical
-- arrangement), so the instances stay valid whether or not the compiler
-- treats 'return' as a redundant class method.
instance Applicative Hdfs where
    pure x = Hdfs $ \_ -> return x
    (<*>)  = ap

instance Monad Hdfs where
    return  = pure
    m >>= k = Hdfs $ \c -> unHdfs m c >>= \x -> unHdfs (k x) c

instance MonadIO Hdfs where
    liftIO io = Hdfs $ const io

instance MonadThrow Hdfs where
    throwM = liftIO . throwM

instance MonadCatch Hdfs where
    -- The handler runs against the same connection as the guarded action.
    catch m k = Hdfs $ \c -> unHdfs m c `catch` \e -> unHdfs (k e) c

instance MonadMask Hdfs where
    mask a = Hdfs $ \e -> mask $ \u -> unHdfs (a $ q u) e
      where
        -- Lift the 'IO' restore function to one that works on 'Hdfs'.
        q u (Hdfs b) = Hdfs (u . b)

    uninterruptibleMask a = Hdfs $ \e -> uninterruptibleMask $ \u -> unHdfs (a $ q u) e
      where
        q u (Hdfs b) = Hdfs (u . b)

------------------------------------------------------------------------

-- | Flag passed as the create-parent field of a @mkdirs@ request.
type CreateParent = Bool

-- | Flag passed as the recursive field of a @delete@ request.
type Recursive = Bool

-- | Flag passed as the overwrite field of a @rename2@ request.
type Overwrite = Bool

------------------------------------------------------------------------

-- | Retrieve the 'Connection' the current action is running against.
getConnection :: Hdfs Connection
getConnection = Hdfs return

-- | The RPC protocol descriptor handed to 'initConnectionV7': the Java class
-- name of the HDFS client protocol together with the number @1@ (presumably
-- the protocol version -- confirm against the definition of 'Protocol').
hdfsProtocol :: Protocol
hdfsProtocol = Protocol "org.apache.hadoop.hdfs.protocol.ClientProtocol" 1

-- | Run an 'Hdfs' action using the configuration returned by
-- 'getHadoopConfig'.
runHdfs :: Hdfs a -> IO a
runHdfs hdfs = do
    config <- getHadoopConfig
    runHdfs' config hdfs

-- | Run an 'Hdfs' action against the first name node listed in the supplied
-- configuration.
--
-- Throws a @ConfigError@ when the configuration lists no name nodes. The
-- error is raised as an 'IO' exception before any socket work is attempted,
-- rather than being hidden in a lazily evaluated argument.
runHdfs' :: HadoopConfig -> Hdfs a -> IO a
runHdfs' config@HadoopConfig{..} hdfs =
    case hcNameNodes of
        []           -> throwM (ConfigError "Could not find name nodes in Hadoop configuration")
        (nameNode:_) -> S.runTcp hcProxy nameNode session
  where
    -- Set up the RPC connection on the open socket, then run the action.
    session socket = do
        conn <- initConnectionV7 config hdfsProtocol socket
        unHdfs hdfs conn

-- | Invoke the named remote method with the given request message on the
-- current connection and decode the response.
hdfsInvoke :: (Encode a, Decode b) => Text -> a -> Hdfs b
hdfsInvoke method arg = Hdfs $ \c -> invoke c method arg

------------------------------------------------------------------------

-- | List the entries under a path, following the server's pagination until
-- it reports no remaining entries.
--
-- Returns 'Nothing' when the first response carries no directory listing.
-- If a later page carries no listing, the entries gathered so far are
-- returned.
getListing :: HdfsPath -> Hdfs (Maybe (V.Vector FileStatus))
getListing path = do
    mFirst <- getPartialListing path ""
    case mFirst of
        Nothing    -> return Nothing
        Just first -> Just <$> collect [] first
  where
    -- Accumulate pages newest-first (cheap cons) and reverse once at the end,
    -- instead of appending to the end of the list on every page.
    collect :: [V.Vector FileStatus] -> P.DirectoryListing -> Hdfs (V.Vector FileStatus)
    collect acc dirList
        -- Only request another page when there is a last entry to resume
        -- after; an empty page would otherwise restart from the beginning
        -- and never terminate.
        | hasRemainingEntries dirList && not (V.null entries) = do
            mNext <- getPartialListing path (fsPath (V.last entries))
            maybe (return result) (collect acc') mNext
        | otherwise = return result
      where
        entries = partialListing dirList
        acc'    = entries : acc
        result  = V.concat (reverse acc')

    partialListing :: P.DirectoryListing -> V.Vector FileStatus
    partialListing = V.map fromProtoFileStatus . V.fromList . getField . P.dlPartialListing

    hasRemainingEntries :: P.DirectoryListing -> Bool
    hasRemainingEntries = (/= 0) . getField . P.dlRemaingEntries

-- | Like 'getListing', but yields an empty vector where 'getListing' would
-- yield 'Nothing'.
getListing' :: HdfsPath -> Hdfs (V.Vector FileStatus)
getListing' path = fromMaybe V.empty <$> getListing path

------------------------------------------------------------------------

-- | Fetch one page of a listing via the @getListing@ RPC, starting after the
-- given file name (the first page is requested with @""@). Block locations
-- are not requested.
getPartialListing :: HdfsPath -> ByteString -> Hdfs (Maybe P.DirectoryListing)
getPartialListing path startAfter =
    getField . P.lsDirList <$> hdfsInvoke "getListing" request
  where
    request = P.GetListingRequest
        { P.lsSrc          = putField (T.decodeUtf8 path)
        , P.lsStartAfter   = putField startAfter
        , P.lsNeedLocation = putField False
        }

-- | Look up the status of a single path via the @getFileInfo@ RPC. The
-- result is 'Nothing' when the response carries no file status.
getFileInfo :: HdfsPath -> Hdfs (Maybe FileStatus)
getFileInfo path =
    fmap fromProtoFileStatus . getField . P.fiFileStatus <$> hdfsInvoke "getFileInfo" request
  where
    request = P.GetFileInfoRequest
        { P.fiSrc = putField (T.decodeUtf8 path)
        }

-- | Fetch the content summary of a path via the @getContentSummary@ RPC.
getContentSummary :: HdfsPath -> Hdfs ContentSummary
getContentSummary path =
    fromProtoContentSummary . getField . P.csSummary <$> hdfsInvoke "getContentSummary" request
  where
    request = P.GetContentSummaryRequest
        { P.csPath = putField (T.decodeUtf8 path)
        }

-- | Create a directory via the @mkdirs@ RPC and return the result flag from
-- the response. The permission sent with the request is fixed at @0o755@.
mkdirs :: CreateParent -> HdfsPath -> Hdfs Bool
mkdirs createParent path =
    getField . P.mdResult <$> hdfsInvoke "mkdirs" request
  where
    request = P.MkdirsRequest
        { P.mdSrc          = putField (T.decodeUtf8 path)
        , P.mdMasked       = putField (P.FilePermission (putField 0o755))
        , P.mdCreateParent = putField createParent
        }

-- | Delete a path via the @delete@ RPC and return the result flag from the
-- response.
delete :: Recursive -> HdfsPath -> Hdfs Bool
delete recursive path =
    getField . P.dlResult <$> hdfsInvoke "delete" request
  where
    request = P.DeleteRequest
        { P.dlSrc       = putField (T.decodeUtf8 path)
        , P.dlRecursive = putField recursive
        }

-- | Rename a path via the @rename2@ RPC. The response message is decoded and
-- then discarded.
rename :: Overwrite -> HdfsPath -> HdfsPath -> Hdfs ()
rename overwrite src dst = ignore <$> hdfsInvoke "rename2" request
  where
    request = P.Rename2Request
        { P.mvSrc       = putField (T.decodeUtf8 src)
        , P.mvDst       = putField (T.decodeUtf8 dst)
        , P.mvOverwrite = putField overwrite
        }

    -- Pins the response type so that 'hdfsInvoke' knows what to decode.
    ignore :: P.Rename2Response -> ()
    ignore = const ()

-- | Set the permission bits of a path via the @setPermission@ RPC. The
-- response message is decoded and then discarded.
setPermissions :: Word32 -> HdfsPath -> Hdfs ()
setPermissions mode path = ignore <$> hdfsInvoke "setPermission" request
  where
    request = P.SetPermissionRequest
        { P.chmodPath = putField (T.decodeUtf8 path)
        , P.chmodMode = putField $ P.FilePermission { P.fpPerm = putField mode }
        }

    -- Pins the response type so that 'hdfsInvoke' knows what to decode.
    ignore :: P.SetPermissionResponse -> ()
    ignore = const ()

------------------------------------------------------------------------
-- Awesome Protobuf Mapping (╯°□°)╯︵ ┻━┻

-- | Copy every field of the protobuf content summary into the library's own
-- 'ContentSummary' record.
fromProtoContentSummary :: P.ContentSummary -> ContentSummary
fromProtoContentSummary p = ContentSummary
    { csLength         = getField $ P.csLength p
    , csFileCount      = getField $ P.csFileCount p
    , csDirectoryCount = getField $ P.csDirectoryCount p
    , csQuota          = getField $ P.csQuota p
    , csSpaceConsumed  = getField $ P.csSpaceConsumed p
    , csSpaceQuota     = getField $ P.csSpaceQuota p
    }

-- | Convert a protobuf file status into the library's own 'FileStatus'
-- record. Block replication and block size fall back to @0@ when absent
-- from the message.
fromProtoFileStatus :: P.FileStatus -> FileStatus
fromProtoFileStatus p = FileStatus
    { fsFileType         = fromProtoFileType $ getField $ P.fsFileType p
    , fsPath             = getField $ P.fsPath p
    , fsLength           = getField $ P.fsLength p
    , fsPermission       = fromIntegral $ getField $ P.fpPerm $ getField $ P.fsPermission p
    , fsOwner            = getField $ P.fsOwner p
    , fsGroup            = getField $ P.fsGroup p
    , fsModificationTime = getField $ P.fsModificationTime p
    , fsAccessTime       = getField $ P.fsAccessTime p
    , fsSymLink          = getField $ P.fsSymLink p
    , fsBlockReplication = fromIntegral $ fromMaybe 0 $ getField $ P.fsBlockReplication p
    , fsBlockSize        = fromMaybe 0 $ getField $ P.fsBlockSize p
    }

-- | Map the protobuf file-type enumeration onto the library's 'FileType'.
fromProtoFileType :: P.FileType -> FileType
fromProtoFileType p = case p of
    P.IS_DIR     -> Dir
    P.IS_FILE    -> File
    P.IS_SYMLINK -> SymLink