hadoop-streaming-0.2.0.1: A simple Hadoop streaming library

Maintainer: Ziyang Liu <free@cofree.io>
Safe Haskell: None
Language: Haskell2010

HadoopStreaming.Text

Description

This module provides utilities for working with Text in Hadoop streaming.

Word count example:

 {-# LANGUAGE OverloadedStrings, TupleSections #-}

 import Data.Conduit (ConduitT)
 import qualified Data.Conduit as C
 import qualified Data.Conduit.Combinators as C
 import Data.Void (Void)
 import HadoopStreaming
 import qualified HadoopStreaming.Text as HT
 import Data.Text (Text)
 import qualified Data.Text as Text

 mapper :: Mapper Text Text Void IO
 mapper = Mapper dec enc trans
   where
     dec :: Text -> Either Void [Text]
     dec = Right . Text.words

     enc :: Text -> Int -> Text
     enc = HT.defaultKeyValueEncoder id (Text.pack . show)

     trans :: ConduitT [Text] (Text, Int) IO ()
     trans = C.concatMap (map (,1))

 reducer :: Reducer Text Text Void IO
 reducer = Reducer dec enc trans
   where
     dec :: Text -> Either Void (Text, Int)
     -- Split at the first tab; `tail` drops the tab before the count is parsed.
     dec i = Right (i1, read . tail . Text.unpack $ i2)
       where (i1, i2) = Text.break (== '\t') i

     enc :: (Text, Int) -> Text
     enc (t, c) = t <> "," <> Text.pack (show c)

     trans :: Text -> Int -> ConduitT Int (Text, Int) IO ()
     trans k v0 = C.foldl (+) v0 >>= C.yield . (k,)

Documentation

sourceHandle

Arguments

:: MonadIO m 
=> (IOException -> m ())

An action to be executed if there is an error reading the input. This is usually caused by the input having an incorrect encoding or containing corrupt data. The recommended action is to log an error message and fail the job.

NB: The stream terminates if an error occurs, regardless of whether this action re-throws the error.

-> Handle 
-> ConduitT i Text m () 

Stream the contents of a Handle one line at a time as Text.

sinkHandle :: MonadIO m => Handle -> ConduitT Text o m ()

Stream data to a Handle, separated by \n.

stdinLn :: (MonadIO m, MonadThrow m) => ConduitT i Text m ()

Stream the contents from stdin one line at a time as Text.

stdinLn = sourceHandle throwM System.IO.stdin

stdoutLn :: MonadIO m => ConduitT Text o m ()

Stream data to stdout, separated by \n.

stdoutLn = sinkHandle System.IO.stdout

defaultKeyValueEncoder

Arguments

:: (k -> Text)

Key encoder

-> (v -> Text)

Value encoder

-> k 
-> v 
-> Text 

Encode a key-value pair by joining the encoded key and the encoded value with a tab, which is the default format for mapper output.
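
The tab-separated encoding described above can be sketched with Data.Text alone. The names encodeKeyValue and wordCountLine are hypothetical and used only for illustration; this is a stand-in that mirrors the documented behaviour, not the library's own implementation.

```haskell
import Data.Text (Text)
import qualified Data.Text as T

-- Hypothetical stand-in for the documented encoder:
-- the encoded key, a single tab, then the encoded value.
encodeKeyValue :: (k -> Text) -> (v -> Text) -> k -> v -> Text
encodeKeyValue encodeKey encodeValue k v =
  encodeKey k <> T.singleton '\t' <> encodeValue v

-- A word-count pair rendered as one line of mapper output.
-- The key is already Text, so its encoder is `id`.
wordCountLine :: Text
wordCountLine = encodeKeyValue id (T.pack . show) (T.pack "hadoop") (3 :: Int)
```

Here wordCountLine is the text "hadoop", a tab, and "3", which matches the shape the word count mapper in the module description produces with HT.defaultKeyValueEncoder id (Text.pack . show).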

defaultKeyValueDecoder

Arguments

:: (Text -> Either e k)

Key decoder

-> (Text -> Either e v)

Value decoder

-> Text 
-> Either e (k, Maybe v) 

Decode a line by treating the prefix up to the first tab as the key and the suffix after the first tab as the value. If the line does not contain a tab, or if the first tab is the last character, the whole line is treated as the key, the value decoder is not used, and the value is Nothing.
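
The decoding rules above can be illustrated with a small Data.Text sketch. The name decodeKeyValue is hypothetical, and the code is not the library's implementation. It also assumes a literal reading of the description: when the first tab is the last character, the key decoder receives the whole line, trailing tab included.

```haskell
import Data.Text (Text)
import qualified Data.Text as T

-- Hypothetical stand-in that follows the documented rules.
decodeKeyValue
  :: (Text -> Either e k)   -- key decoder
  -> (Text -> Either e v)   -- value decoder
  -> Text
  -> Either e (k, Maybe v)
decodeKeyValue decodeKey decodeValue line =
  case T.uncons rest of
    -- A tab was found and something follows it: decode both parts.
    Just (_, value) | not (T.null value) ->
      (,) <$> decodeKey key <*> (Just <$> decodeValue value)
    -- No tab, or the first tab is the last character:
    -- the whole line is the key (assumption: including any trailing tab).
    _ -> (\k -> (k, Nothing)) <$> decodeKey line
  where
    -- `rest` is either empty or starts with the first tab.
    (key, rest) = T.break (== '\t') line
```

With Right as both decoders, a line made of "a", a tab, "b", a tab, and "c" decodes to the key "a" and the value "b", tab, "c", because only the first tab separates key from value. A line with no tab decodes to that line as the key paired with Nothing.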