Maintainer | Ziyang Liu <free@cofree.io> |
---|---|
Safe Haskell | None |
Language | Haskell2010 |
This module has some utilities for working with Text
in Hadoop streaming.
Word count example:
{-# LANGUAGE OverloadedStrings, TupleSections #-} import Data.Conduit (ConduitT) import qualified Data.Conduit as C import qualified Data.Conduit.Combinators as C import Data.Void (Void) import HadoopStreaming import qualified HadoopStreaming.Text as HT import Data.Text (Text) import qualified Data.Text as Text mapper :: Mapper Text Text Void IO mapper = Mapper dec enc trans where dec :: Text -> Either Void [Text] dec = Right . Text.words enc :: Text -> Int -> Text enc = HT.defaultKeyValueEncoder id (Text.pack . show) trans :: ConduitT [Text] (Text, Int) IO () trans = C.concatMap (map (,1)) reducer :: Reducer Text Text Void IO reducer = Reducer dec enc trans where dec :: Text -> Either Void (Text, Int) dec i = Right (i1, read . tail . Text.unpack $ i2) where (i1, i2) = Text.break (== '\\t') i enc :: (Text, Int) -> Text enc (t, c) = t <> "," <> Text.pack (show c) trans :: Text -> Int -> ConduitT Int (Text, Int) IO () trans k v0 = C.foldl (+) v0 >>= C.yield . (k,)
Synopsis
- sourceHandle :: MonadIO m => (IOException -> m ()) -> Handle -> ConduitT i Text m ()
- sinkHandle :: MonadIO m => Handle -> ConduitT Text o m ()
- stdinLn :: (MonadIO m, MonadThrow m) => ConduitT i Text m ()
- stdoutLn :: MonadIO m => ConduitT Text o m ()
- defaultKeyValueEncoder :: (k -> Text) -> (v -> Text) -> k -> v -> Text
- defaultKeyValueDecoder :: (Text -> Either e k) -> (Text -> Either e v) -> Text -> Either e (k, Maybe v)
Documentation
:: MonadIO m | |
=> (IOException -> m ()) | An action to be executed if there is an error reading the input. This is usually caused by the input having an incorrect encoding or containing corrupt data. The recommended action is to log an error message and fail the job. NB: The stream will terminate if an error occurrs, regardless of whether this action re-throws the error or not. |
-> Handle | |
-> ConduitT i Text m () |
sinkHandle :: MonadIO m => Handle -> ConduitT Text o m () Source #
Stream data to a Handle
, separated by \n
.
stdoutLn :: MonadIO m => ConduitT Text o m () Source #
Stream data to stdout
, separated by \n
.
stdoutLn = sinkHandle System.IO.stdout
defaultKeyValueEncoder Source #
Encode a key-value pair by separating them with a tab, which is the default way the mapper output should be formatted.
defaultKeyValueDecoder Source #
:: (Text -> Either e k) | Key decoder |
-> (Text -> Either e v) | Value decoder |
-> Text | |
-> Either e (k, Maybe v) |
Decode a line by treating the prefix up to the first tab as key, and the suffix after the first tab as value. If the line does not contain a tab, or if the first tab is the last character, the whole line is considered as key, and the value decoder is not used.