{-# LANGUAGE BlockArguments #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} ----------------------------------------------------------------------------- -- | -- Module : HadoopStreaming -- Maintainer : Ziyang Liu <free@cofree.io> -- -- Hadoop streaming makes it possible to write Hadoop jobs in Haskell. See the -- <https://hadoop.apache.org/docs/current/hadoop-streaming/HadoopStreaming.html official documentation> for how -- Hadoop streaming works. See "HadoopStreaming.Text" for a word count example. module HadoopStreaming ( Mapper(..) , Reducer(..) , runMapper , runReducer , println , incCounter , incCounterBy ) where import Control.Monad.IO.Class (MonadIO(..)) import Data.Conduit (ConduitT, runConduit, (.|)) import qualified Data.Conduit as C import qualified Data.Conduit.Combinators as C import qualified Data.Conduit.List as CL import Data.Text (Text) import qualified Data.Text as Text import qualified Data.Text.IO as Text import qualified System.IO as IO -- | A @Mapper@ consists of a decoder, an encoder, and a stream transforming -- input into @(key, value)@ pairs. data Mapper i o e m = forall j k v. Mapper (i -> Either e j) -- ^ Decoder for mapper input (k -> v -> o) -- ^ Encoder for mapper output (ConduitT j (k, v) m ()) -- ^ A stream transforming @input@ into @(k, v)@ pairs. -- | A @Reducer@ consists of a decoder, an encoder, and a stream transforming -- each key and all values associated with the key into some result values. data Reducer i o e m = forall k v r. Eq k => Reducer (i -> Either e (k, v)) -- ^ Decoder for reducer input (r -> o) -- ^ Encoder for reducer output (k -> v -> ConduitT v r m ()) -- ^ A stream processing a key and all values associated with the key. The parameter -- @v@ is the first value associated with the key (since a key always has one or more -- values), and the remaining values are processed by the conduit. -- -- Examples: -- -- @ -- import qualified Data.Conduit as C -- import qualified Data.Conduit.Combinators as C -- -- -- Sum up all values associated with the key and emit a (key, sum) pair. -- sumValues :: (Monad m, Num v) => k -> v -> ConduitT v (k, v) m () -- sumValues k v0 = C.foldl (+) v0 >>= C.yield . (k,) -- -- -- Increment a counter for each (key, value) pair, and emit the (key, value) pair. -- incCounterAndEmit :: MonadIO m => k -> v -> ConduitT v (k, v) m () -- incCounterAndEmit k v0 = C.leftover v0 <> C.mapM \\v -> -- incCounter "reducer" "key-value pairs" >> pure (k, v) -- @ -- | Run a 'Mapper'. runMapper :: MonadIO m => ConduitT () i m () -- ^ Mapper source. The source should generally stream from @stdin@, and produce -- a value for each line of the input, as that is how Hadoop streaming is supposed -- to work, but this is not enforced. For example, if you really want to, you can produce a single -- value for the entire input split (which can be done using @Data.Conduit.Combinators.stdin@ -- as the source). Note that regardless of what the source does, the values of Hadoop counters -- "Map input records" and "Reduce input records" are always the number of lines of the mapper and reducer -- input, because these counters are managed by the Hadoop framework. -- -- An example is 'HadoopStreaming.Text.stdinLn', which streams from 'IO.stdin' as 'Text', one line -- at a time. -> ConduitT o C.Void m () -- ^ Mapper sink. It should generally stream to 'IO.stdout'. An example is -- 'HadoopStreaming.Text.stdoutLn'. -> (i -> e -> m ()) -- ^ An action to be executed for each input that cannot be decoded. The first parameter -- is the input and the second parameter is the decoding error. One may choose to, for instance, -- increment a counter and 'println' an error message. -> Mapper i o e m -> m () runMapper source sink f (Mapper dec enc trans) = runConduit $ source .| CL.mapMaybeM g .| trans .| C.map (uncurry enc) .| sink where g input = either ((Nothing <$) . f input) (pure . Just) (dec input) -- | Run a 'Reducer'. runReducer :: MonadIO m => ConduitT () i m () -- ^ Reducer source -> ConduitT o C.Void m () -- ^ Reducer sink -> (i -> e -> m ()) -- ^ An action to be executed for each input that cannot be decoded. -> Reducer i o e m -> m () runReducer source sink f (Reducer dec enc trans) = runConduit $ source .| CL.mapMaybeM g .| CL.groupOn1 fst .| reduceKey trans .| C.map enc .| sink where g input = either ((Nothing <$) . f input) (pure . Just) (dec input) reduceKey :: Monad m => (k -> v -> ConduitT v r m ()) -> ConduitT ((k,v), [(k,v)]) r m () reduceKey f = go where go = C.await >>= maybe (pure ()) \((k, v), kvs) -> C.yieldMany (map snd kvs) .| f k v >> go -- | Like 'Text.putStrLn', but writes to 'IO.stderr'. println :: MonadIO m => Text -> m () println = liftIO . Text.hPutStrLn IO.stderr -- | Increment a counter by 1. incCounter :: MonadIO m => Text -- ^ Group name. Must not contain comma. -> Text -- ^ Counter name. Must not contain comma. -> m () incCounter = incCounterBy 1 -- | Increment a counter by @n@. incCounterBy :: MonadIO m => Int -> Text -- ^ Group name. Must not contain comma. -> Text -- ^ Counter name. Must not contain comma. -> m () incCounterBy n group name = println $ "reporter:counter:" <> Text.intercalate "," [group, name, Text.pack (show n)]