module Sarsi.Tools.Pipe where

import Codec.Sarsi (Event (..), Message (..))
import Codec.Sarsi.Curses (cleanLine, cleaningCurses)
import Data.Attoparsec.Text.Machine (processParser)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Char8 as ByteString
import Data.Machine (MachineT, ProcessT, asParts, auto, autoM, prepended, runT_, (<~))
import Data.Text (Text)
import qualified Data.Text as Text
import Data.Text.Encoding (decodeUtf8)
import Sarsi (getBroker, getTopic)
import Sarsi.Producer (produce)
import System.Exit (ExitCode (ExitSuccess), exitWith)
import System.IO (stdin, stdout)
import System.IO.Machine (byLine, sourceHandle)

-- | Run a message-extracting process over standard input.
--
-- Input is read from 'stdin' using the 'byLine' data mode. Every chunk is
-- written back to 'stdout' with 'ByteString.hPutStrLn' before being handed,
-- unchanged, to 'pipeFrom' together with the label and the process.
pipe :: String -> ProcessT IO Text Message -> IO ()
pipe lbl process =
  pipeFrom lbl process $
    autoM echo <~ sourceHandle byLine stdin
  where
    -- Echo the line to stdout, then forward the very same value downstream
    -- so that wrapping a command with this tool does not hide its output.
    echo :: ByteString -> IO ByteString
    echo xs = ByteString.hPutStrLn stdout xs >> return xs

-- | Run a message-extracting process over an arbitrary 'ByteString' source.
--
-- Obtains the broker with 'getBroker', looks up the topic for @"."@
-- (presumably the current working directory -- confirm against 'getTopic'),
-- and runs 'producer' inside 'produce' for that topic.
--
-- This action does not return normally: once 'produce' completes it
-- terminates the program via @'exitWith' 'ExitSuccess'@.
pipeFrom :: String -> ProcessT IO Text Message -> MachineT IO k ByteString -> IO ()
pipeFrom lbl process source = do
  b <- getBroker
  t <- getTopic b "."
  produce t $ producer lbl process source
  exitWith ExitSuccess

-- | Drive the whole pipeline from a 'ByteString' source down to an event sink.
--
-- The stages, from upstream to downstream, are:
--
-- 1. @source@ yields raw 'ByteString' chunks.
-- 2. Each chunk is decoded with 'decodeUtf8'.
-- 3. @cleaning@ normalises the text (see below).
-- 4. @process@ turns the cleaned text into 'Message's.
-- 5. Each 'Message' is wrapped in 'Notify', preceded by a single
--    @'Start' lbl@ event, and fed to @sink@.
--
-- The machine is run with 'runT_', so whatever @sink@ emits is discarded.
--
-- NOTE(review): 'decodeUtf8' throws on malformed UTF-8 input; if sources may
-- emit other encodings a lenient decoder would be safer -- confirm.
producer :: String -> ProcessT IO Text Message -> MachineT IO k ByteString -> ProcessT IO Event Event -> IO ()
producer lbl process source sink =
  runT_ $ pipeline <~ process <~ cleaning <~ auto decodeUtf8 <~ source
  where
    -- Wrap messages into events, announce the label first, then hand
    -- everything over to the sink.
    pipeline :: ProcessT IO Message Event
    pipeline = sink <~ prepended [Start $ Text.pack lbl] <~ auto Notify

    -- Apply 'cleanLine' to every input, re-append a newline, run the
    -- 'cleaningCurses' parser over the result and emit what it produces.
    cleaning :: ProcessT IO Text Text
    cleaning =
      asParts
        <~ auto unpack
        <~ processParser cleaningCurses
        <~ auto (\txt -> cleanLine txt `Text.snoc` '\n')
      where
        -- Keep the second component of a successful parse; parse failures
        -- are dropped silently.
        unpack :: Either e (a, b) -> [b]
        unpack (Right (_, txt)) = [txt]
        unpack (Left _) = []