hadoop-streaming-0.2.0.3: A simple Hadoop streaming library

MaintainerZiyang Liu <free@cofree.io>
Safe HaskellNone
LanguageHaskell2010

HadoopStreaming

Description

Hadoop streaming makes it possible to write Hadoop jobs in Haskell. See the official documentation for how Hadoop streaming works. See HadoopStreaming.Text for a word count example.

Synopsis

Documentation

data Mapper i o e m Source #

A Mapper consists of a decoder, an encoder, and a stream transforming input into (key, value) pairs.

Constructors

Mapper 

Fields

  • (i -> Either e j)

    Decoder for mapper input

  • (k -> v -> o)

    Encoder for mapper output

  • (ConduitT j (k, v) m ())

    A stream transforming input into (k, v) pairs.

data Reducer i o e m Source #

A Reducer consists of a decoder, an encoder, and a stream transforming each key and all values associated with the key into some result values.

Constructors

Eq k => Reducer 

Fields

  • (i -> Either e (k, v))

    Decoder for reducer input

  • (r -> o)

    Encoder for reducer output

  • (k -> v -> ConduitT v r m ())

    A stream processing a key and all values associated with the key. The parameter v is the first value associated with the key (since a key always has one or more values), and the remaining values are processed by the conduit.

    Examples:

    import qualified Data.Conduit as C
    import qualified Data.Conduit.Combinators as C
    
    -- Sum up all values associated with the key and emit a (key, sum) pair.
    sumValues :: (Monad m, Num v) => k -> v -> ConduitT v (k, v) m ()
    sumValues k v0 = C.foldl (+) v0 >>= C.yield . (k,)
    
    -- Increment a counter for each (key, value) pair, and emit the (key, value) pair.
    incCounterAndEmit :: MonadIO m => k -> v -> ConduitT v (k, v) m ()
    incCounterAndEmit k v0 = C.leftover v0 <> C.mapM \v ->
      incCounter "reducer" "key-value pairs" >> pure (k, v)
    

runMapper Source #

Arguments

:: MonadIO m 
=> ConduitT () i m ()

Mapper source. The source should generally stream from stdin, and produce a value for each line of the input, as that is how Hadoop streaming is supposed to work, but this is not enforced. For example, if you really want to, you can produce a single value for the entire input split (which can be done using Data.Conduit.Combinators.stdin as the source). Note that regardless of what the source does, the values of Hadoop counters "Map input records" and "Reduce input records" are always the number of lines of the mapper and reducer input, because these counters are managed by the Hadoop framework.

An example is stdinLn, which streams from stdin as Text, one line at a time.

-> ConduitT o Void m ()

Mapper sink. It should generally stream to stdout. An example is stdoutLn.

-> (i -> e -> m ())

An action to be executed for each input that cannot be decoded. The first parameter is the input and the second parameter is the decoding error. One may choose to, for instance, increment a counter and println an error message.

-> Mapper i o e m 
-> m () 

Run a Mapper.

runReducer Source #

Arguments

:: MonadIO m 
=> ConduitT () i m ()

Reducer source

-> ConduitT o Void m ()

Reducer sink

-> (i -> e -> m ())

An action to be executed for each input that cannot be decoded.

-> Reducer i o e m 
-> m () 

Run a Reducer.

println :: MonadIO m => Text -> m () Source #

Like putStrLn, but writes to stderr.

incCounter Source #

Arguments

:: MonadIO m 
=> Text

Group name. Must not contain comma.

-> Text

Counter name. Must not contain comma.

-> m () 

Increment a counter by 1.

incCounterBy Source #

Arguments

:: MonadIO m 
=> Int 
-> Text

Group name. Must not contain comma.

-> Text

Counter name. Must not contain comma.

-> m () 

Increment a counter by n.