module Sarsi.Tools.Pipe where import Codec.Sarsi (Event (..), Message (..)) import Codec.Sarsi.Curses (cleanLine, cleaningCurses) import Data.Attoparsec.Text.Machine (processParser) import Data.ByteString (ByteString) import qualified Data.ByteString.Char8 as ByteString import Data.Machine (MachineT, ProcessT, asParts, auto, autoM, prepended, runT_, (<~)) import Data.Text (Text) import qualified Data.Text as Text import Data.Text.Encoding (decodeUtf8) import Sarsi (getBroker, getTopic) import Sarsi.Producer (produce) import System.Exit (ExitCode (ExitSuccess), exitWith) import System.IO (stdin, stdout) import System.IO.Machine (byLine, sourceHandle) pipe :: String -> ProcessT IO Text Message -> IO () pipe :: String -> ProcessT IO Text Message -> IO () pipe String lbl ProcessT IO Text Message process = String -> ProcessT IO Text Message -> MachineT IO Any ByteString -> IO () forall (k :: * -> *). String -> ProcessT IO Text Message -> MachineT IO k ByteString -> IO () pipeFrom String lbl ProcessT IO Text Message process (MachineT IO Any ByteString -> IO ()) -> MachineT IO Any ByteString -> IO () forall a b. (a -> b) -> a -> b $ (ByteString -> IO ByteString) -> MachineT IO (Is ByteString) ByteString forall (k :: * -> * -> *) (m :: * -> *) a b. (Category k, Monad m) => (a -> m b) -> MachineT m (k a) b autoM ByteString -> IO ByteString echo MachineT IO (Is ByteString) ByteString -> MachineT IO Any ByteString -> MachineT IO Any ByteString forall (m :: * -> *) b c (k :: * -> *). Monad m => ProcessT m b c -> MachineT m k b -> MachineT m k c <~ (DataModeIO IO ByteString -> Handle -> SourceIO IO ByteString forall (m :: * -> *) a. DataModeIO m a -> Handle -> SourceIO m a sourceHandle DataModeIO IO ByteString forall a (m :: * -> *). IOData a => DataModeIO m a byLine Handle stdin) where echo :: ByteString -> IO ByteString echo ByteString xs = Handle -> ByteString -> IO () ByteString.hPutStrLn Handle stdout ByteString xs IO () -> IO ByteString -> IO ByteString forall (m :: * -> *) a b. Monad m => m a -> m b -> m b >> ByteString -> IO ByteString forall (m :: * -> *) a. Monad m => a -> m a return ByteString xs pipeFrom :: String -> ProcessT IO Text Message -> MachineT IO k ByteString -> IO () pipeFrom :: String -> ProcessT IO Text Message -> MachineT IO k ByteString -> IO () pipeFrom String lbl ProcessT IO Text Message process MachineT IO k ByteString source = do Broker b <- IO Broker getBroker Topic t <- Broker -> String -> IO Topic getTopic Broker b String "." Topic -> (ProcessT IO Event Event -> IO ()) -> IO () forall a. Topic -> (ProcessT IO Event Event -> IO a) -> IO a produce Topic t ((ProcessT IO Event Event -> IO ()) -> IO ()) -> (ProcessT IO Event Event -> IO ()) -> IO () forall a b. (a -> b) -> a -> b $ String -> ProcessT IO Text Message -> MachineT IO k ByteString -> ProcessT IO Event Event -> IO () forall (k :: * -> *). String -> ProcessT IO Text Message -> MachineT IO k ByteString -> ProcessT IO Event Event -> IO () producer String lbl ProcessT IO Text Message process MachineT IO k ByteString source ExitCode -> IO () forall a. ExitCode -> IO a exitWith ExitCode ExitSuccess producer :: String -> ProcessT IO Text Message -> MachineT IO k ByteString -> ProcessT IO Event Event -> IO () producer :: String -> ProcessT IO Text Message -> MachineT IO k ByteString -> ProcessT IO Event Event -> IO () producer String lbl ProcessT IO Text Message process MachineT IO k ByteString source ProcessT IO Event Event sink = do MachineT IO k Event -> IO () forall (m :: * -> *) (k :: * -> *) b. Monad m => MachineT m k b -> m () runT_ (MachineT IO k Event -> IO ()) -> MachineT IO k Event -> IO () forall a b. (a -> b) -> a -> b $ MachineT IO (Is Message) Event pipeline MachineT IO (Is Message) Event -> MachineT IO k Message -> MachineT IO k Event forall (m :: * -> *) b c (k :: * -> *). Monad m => ProcessT m b c -> MachineT m k b -> MachineT m k c <~ ProcessT IO Text Message process ProcessT IO Text Message -> MachineT IO k Text -> MachineT IO k Message forall (m :: * -> *) b c (k :: * -> *). Monad m => ProcessT m b c -> MachineT m k b -> MachineT m k c <~ MachineT IO (Is Text) Text cleaning MachineT IO (Is Text) Text -> MachineT IO k Text -> MachineT IO k Text forall (m :: * -> *) b c (k :: * -> *). Monad m => ProcessT m b c -> MachineT m k b -> MachineT m k c <~ (ByteString -> Text) -> Process ByteString Text forall (k :: * -> * -> *) a b. Automaton k => k a b -> Process a b auto ByteString -> Text decodeUtf8 MachineT IO (Is ByteString) Text -> MachineT IO k ByteString -> MachineT IO k Text forall (m :: * -> *) b c (k :: * -> *). Monad m => ProcessT m b c -> MachineT m k b -> MachineT m k c <~ MachineT IO k ByteString source where pipeline :: MachineT IO (Is Message) Event pipeline = ProcessT IO Event Event sink ProcessT IO Event Event -> MachineT IO (Is Message) Event -> MachineT IO (Is Message) Event forall (m :: * -> *) b c (k :: * -> *). Monad m => ProcessT m b c -> MachineT m k b -> MachineT m k c <~ [Event] -> Process Event Event forall (f :: * -> *) a. Foldable f => f a -> Process a a prepended [Text -> Event Start (Text -> Event) -> Text -> Event forall a b. (a -> b) -> a -> b $ String -> Text Text.pack String lbl] ProcessT IO Event Event -> MachineT IO (Is Message) Event -> MachineT IO (Is Message) Event forall (m :: * -> *) b c (k :: * -> *). Monad m => ProcessT m b c -> MachineT m k b -> MachineT m k c <~ (Message -> Event) -> Process Message Event forall (k :: * -> * -> *) a b. Automaton k => k a b -> Process a b auto Message -> Event Notify cleaning :: MachineT IO (Is Text) Text cleaning = MachineT IO (Is [Text]) Text forall (f :: * -> *) a. Foldable f => Process (f a) a asParts MachineT IO (Is [Text]) Text -> MachineT IO (Is Text) [Text] -> MachineT IO (Is Text) Text forall (m :: * -> *) b c (k :: * -> *). Monad m => ProcessT m b c -> MachineT m k b -> MachineT m k c <~ (Either String (Text, Text) -> [Text]) -> Process (Either String (Text, Text)) [Text] forall (k :: * -> * -> *) a b. Automaton k => k a b -> Process a b auto Either String (Text, Text) -> [Text] forall a a a. Either a (a, a) -> [a] unpack MachineT IO (Is (Either String (Text, Text))) [Text] -> MachineT IO (Is Text) (Either String (Text, Text)) -> MachineT IO (Is Text) [Text] forall (m :: * -> *) b c (k :: * -> *). Monad m => ProcessT m b c -> MachineT m k b -> MachineT m k c <~ Parser Text -> MachineT IO (Is Text) (Either String (Text, Text)) forall (m :: * -> *) a. Monad m => Parser a -> ProcessT m Text (Either String (Text, a)) processParser Parser Text cleaningCurses MachineT IO (Is Text) (Either String (Text, Text)) -> MachineT IO (Is Text) Text -> MachineT IO (Is Text) (Either String (Text, Text)) forall (m :: * -> *) b c (k :: * -> *). Monad m => ProcessT m b c -> MachineT m k b -> MachineT m k c <~ (Text -> Text) -> Process Text Text forall (k :: * -> * -> *) a b. Automaton k => k a b -> Process a b auto (\Text txt -> (Text -> Text cleanLine Text txt) Text -> Char -> Text `Text.snoc` Char '\n') where unpack :: Either a (a, a) -> [a] unpack (Right (a _, a txt)) = [a txt] unpack (Left a _) = []