{-# LANGUAGE BlockArguments #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TupleSections #-}
module HadoopStreaming.ByteString
( sourceHandle
, sinkHandle
, stdinLn
, stdoutLn
, defaultKeyValueEncoder
, defaultKeyValueDecoder
) where
import Conduit (MonadThrow(..), lift)
import Control.Exception (IOException, try)
import Control.Monad.Extra (unlessM, (>=>))
import Control.Monad.IO.Class (MonadIO(..))
import Data.Conduit (ConduitT, (.|))
import qualified Data.Conduit as C
import qualified Data.Conduit.Combinators as C
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as BC
import qualified System.IO as IO
-- | Stream the contents of a handle line by line.
--
-- Lines are read with 'BC.hGetLine' until 'IO.hIsEOF' reports end of input.
-- Any line made up solely of space characters (which includes the empty
-- line) is dropped before it reaches downstream. The handle is not closed
-- by this conduit.
sourceHandle
  :: MonadIO m
  => IO.Handle -- ^ handle to read lines from
  -> ConduitT i ByteString m ()
sourceHandle h = readLines .| C.filter (BC.any (/= ' '))
  where
    -- Check for EOF before every read so that hGetLine is never invoked on
    -- an exhausted handle.
    readLines = do
      atEnd <- liftIO (IO.hIsEOF h)
      if atEnd
        then pure ()
        else do
          line <- liftIO (BC.hGetLine h)
          C.yield line
          readLines
-- | Consume every incoming 'ByteString' and write it to the handle as its
-- own line using 'BC.hPutStrLn'.
--
-- The conduit finishes when upstream is exhausted. It neither flushes nor
-- closes the handle itself.
sinkHandle
  :: MonadIO m
  => IO.Handle -- ^ handle to write lines to
  -> ConduitT ByteString o m ()
sinkHandle h = C.awaitForever $ \line -> liftIO (BC.hPutStrLn h line)
-- | Stream lines from standard input.
--
-- This is 'sourceHandle' applied to 'IO.stdin', so the same filtering
-- applies: lines consisting only of space characters are skipped.
--
-- Only 'MonadIO' is required. The former @MonadThrow m@ constraint was
-- redundant (nothing here throws through 'MonadThrow') and has been dropped,
-- which also brings the signature in line with 'stdoutLn'. Existing callers
-- are unaffected because they now need strictly fewer instances.
stdinLn :: MonadIO m => ConduitT i ByteString m ()
stdinLn = sourceHandle IO.stdin
-- | Write every incoming 'ByteString' to standard output, one per line.
--
-- This is 'sinkHandle' applied to 'IO.stdout'.
stdoutLn
  :: MonadIO m
  => ConduitT ByteString o m ()
stdoutLn = sinkHandle IO.stdout
-- | Render a key/value pair as a single tab-separated record.
--
-- The key and the value are encoded with the supplied functions and joined
-- by exactly one tab character. Neither encoded part is escaped, so a tab
-- inside the encoded key will not survive a round trip through
-- 'defaultKeyValueDecoder', which splits on the first tab.
defaultKeyValueEncoder
  :: (k -> ByteString) -- ^ key encoder
  -> (v -> ByteString) -- ^ value encoder
  -> k
  -> v
  -> ByteString
defaultKeyValueEncoder encKey encValue k v =
  mconcat [encKey k, "\t", encValue v]
-- | Split a record at its first tab character and decode both halves.
--
-- Everything before the first tab is handed to the key decoder. The value
-- is whatever follows that tab:
--
-- * no tab at all, or nothing after the tab (e.g. @"key\\t"@): the value is
--   reported as 'Nothing' and the value decoder is not run;
-- * otherwise the remainder (which may itself contain further tabs) is
--   handed to the value decoder and returned in 'Just'.
--
-- The key is decoded first, so when both decoders would fail the key's
-- error is the one returned.
defaultKeyValueDecoder
  :: (ByteString -> Either e k) -- ^ key decoder
  -> (ByteString -> Either e v) -- ^ value decoder
  -> ByteString                 -- ^ raw record
  -> Either e (k, Maybe v)
defaultKeyValueDecoder decKey decValue i =
  case BC.uncons afterKey of
    -- A separator is present and is followed by at least one byte.
    Just (_, rawValue)
      | not (BC.null rawValue) ->
          (\k v -> (k, Just v)) <$> decKey rawKey <*> decValue rawValue
    -- Either no separator was found or nothing follows it.
    _ -> (\k -> (k, Nothing)) <$> decKey rawKey
  where
    -- afterKey is empty when there is no tab; otherwise it starts with it.
    (rawKey, afterKey) = BC.break (== '\t') i