{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
module HadoopStreaming
( Mapper(..)
, Reducer(..)
, runMapper
, runMapperWith
, runReducer
, runReducerWith
, println
, incCounter
, incCounterBy
, sourceHandle
, sinkHandle
) where
import Control.Monad.Extra (unlessM)
import Control.Monad.IO.Class (MonadIO(..))
import Data.Conduit (ConduitT, runConduit, (.|))
import qualified Data.Conduit as C
import qualified Data.Conduit.Combinators as C
import qualified Data.Conduit.List as CL
import Data.Text (Text)
import qualified Data.Text as Text
import qualified Data.Text.IO as Text
import qualified System.IO as IO
-- | A streaming mapper. Its record type @input@ and its output key/value
-- types @k@ and @v@ are existentially hidden, so mappers with different
-- internal types all share the type @'Mapper' e m@.
--
-- Constructor arguments, in order:
--
-- 1. a decoder from one input 'Text' to a record, or an error @e@;
-- 2. an encoder rendering one output key/value pair as a 'Text';
-- 3. a conduit turning decoded records into key/value pairs.
data Mapper e m where
  Mapper
    :: (Text -> Either e input)
    -> (k -> v -> Text)
    -> ConduitT input (k, v) m ()
    -> Mapper e m
-- | A streaming reducer. The key type @k@, value type @v@ and result type
-- @res@ are existentially hidden. Keys only need 'Eq', which is what the
-- runner uses to tell where one key's run of records ends.
--
-- Constructor arguments, in order:
--
-- 1. a decoder from one input 'Text' to a key/value pair, or an error @e@;
-- 2. an encoder rendering one result as a 'Text';
-- 3. the per-key reduction: given a key and that key's first value, a
--    conduit that consumes the key's remaining values and yields results.
data Reducer e m where
  Reducer
    :: Eq k
    => (Text -> Either e (k, v))
    -> (res -> Text)
    -> (k -> v -> ConduitT v res m ())
    -> Reducer e m
-- | Run a 'Mapper' that reads lines from standard input and writes lines to
-- standard output. Inputs the mapper cannot decode are passed, together with
-- the decode error, to the supplied handler.
runMapper
  :: MonadIO m
  => (Text -> e -> m ())
  -> Mapper e m -> m ()
runMapper onDecodeError mapper = runMapperWith stdin stdout onDecodeError mapper
-- | Run a 'Mapper' between an arbitrary 'Text' source and sink.
--
-- Every input is decoded first. When decoding fails, the handler receives
-- the raw input and the error, and that input is dropped. Decoded records
-- flow through the mapper's conduit, and each resulting key/value pair is
-- encoded and sent to the sink.
runMapperWith
  :: MonadIO m
  => ConduitT () Text m ()
  -> ConduitT Text C.Void m ()
  -> (Text -> e -> m ())
  -> Mapper e m -> m ()
runMapperWith source sink onDecodeError (Mapper decode encode transform) =
  runConduit (source .| decoded .| transform .| encoded .| sink)
  where
    -- Keep successfully decoded records; report and skip the rest.
    decoded = CL.mapMaybeM \line -> case decode line of
      Left err -> Nothing <$ onDecodeError line err
      Right record -> pure (Just record)
    encoded = C.map \(key, value) -> encode key value
-- | Run a 'Reducer' that reads lines from standard input and writes lines to
-- standard output. Inputs the reducer cannot decode are passed, together
-- with the decode error, to the supplied handler.
runReducer
  :: MonadIO m
  => (Text -> e -> m ())
  -> Reducer e m -> m ()
runReducer onDecodeError reducer = runReducerWith stdin stdout onDecodeError reducer
-- | Run a 'Reducer' between an arbitrary 'Text' source and sink.
--
-- Each input is decoded into a key/value pair. An input that fails to
-- decode is passed, with its error, to the handler and then skipped.
--
-- Consecutive pairs with equal keys form one group. For each group, the
-- reducer's conduit receives the group's first value as an argument and the
-- remaining values as its input stream. Only /consecutive/ equal keys are
-- grouped, so the input is assumed to arrive grouped (e.g. sorted) by key.
runReducerWith
  :: MonadIO m
  => ConduitT () Text m ()
  -> ConduitT Text C.Void m ()
  -> (Text -> e -> m ())
  -> Reducer e m -> m ()
runReducerWith source sink f (Reducer dec enc trans) = runConduit $
  source .| CL.mapMaybeM g .| reduceKey trans .| C.map enc .| sink
  where
    -- Decode one input; on failure report it and drop the record.
    g input = either ((Nothing <$) . f input) (pure . Just) (dec input)

-- | Feed each run of equal keys to the per-key reducer as a stream.
--
-- The values of a group are streamed straight from upstream. Previously they
-- were collected into a list first, so memory no longer grows with the size
-- of a key's group.
--
-- Values the reducer leaves unconsumed are drained with 'C.sinkNull'. The
-- first record of the next key is pushed back upstream as the leftover from
-- 'C.takeWhile', so the next iteration starts exactly on it.
reduceKey :: forall m k v res. (Monad m, Eq k)
  => (k -> v -> ConduitT v res m ())
  -> ConduitT (k, v) res m ()
reduceKey f = go
  where
    go = C.await >>= maybe (pure ()) \(k, v) -> do
      C.takeWhile ((k ==) . fst) .| C.map snd .| (f k v >> C.sinkNull)
      go
-- | Line source over the process's standard input (via 'sourceHandle').
stdin :: MonadIO m => ConduitT i Text m ()
stdin = sourceHandle IO.stdin

-- | Line sink over the process's standard output (via 'sinkHandle').
stdout :: MonadIO m => ConduitT Text o m ()
stdout = sinkHandle IO.stdout
-- | Stream the lines of a handle until end of file.
--
-- Lines are read with 'Text.hGetLine', so they are decoded with the handle's
-- current encoding. Lines made up only of space characters are skipped;
-- empty lines count as such lines.
sourceHandle :: MonadIO m => IO.Handle -> ConduitT i Text m ()
sourceHandle h = loop
  where
    loop = unlessM (liftIO (IO.hIsEOF h)) do
      line <- liftIO (Text.hGetLine h)
      if Text.all (== ' ') line
        then loop
        else C.yield line >> loop
-- | Write every incoming 'Text' to the handle, each followed by a newline.
sinkHandle :: MonadIO m => IO.Handle -> ConduitT Text o m ()
sinkHandle h = loop
  where
    loop = C.await >>= maybe (pure ()) \line -> do
      liftIO (Text.hPutStrLn h line)
      loop
-- | Write one line to standard error. The default 'runMapper' and
-- 'runReducer' send job output to standard output, so diagnostics go here.
println :: MonadIO m => Text -> m ()
println message = liftIO (Text.hPutStrLn IO.stderr message)
-- | Increment the counter identified by a group and a name by one
-- (see 'incCounterBy').
incCounter
  :: MonadIO m
  => Text  -- ^ counter group
  -> Text  -- ^ counter name
  -> m ()
incCounter counterGroup counterName = incCounterBy 1 counterGroup counterName
-- | Increment a counter by the given amount. It writes the line
-- @reporter:counter:\<group\>,\<name\>,\<amount\>@ to standard error via
-- 'println'.
--
-- The fields are joined with commas without any escaping. A comma inside the
-- group or name would therefore presumably be misread by whatever parses
-- this line, so avoid commas in those fields.
incCounterBy
  :: MonadIO m
  => Int   -- ^ amount to add
  -> Text  -- ^ counter group
  -> Text  -- ^ counter name
  -> m ()
incCounterBy amount counterGroup counterName =
  println $ Text.concat
    [ "reporter:counter:"
    , counterGroup
    , ","
    , counterName
    , ","
    , Text.pack (show amount)
    ]