{-# LANGUAGE Rank2Types #-}

module Sarsi.Producer where

import Codec.Sarsi (Event (..), Level (..), Message (..), putEvent)
import Control.Concurrent.Async (async, cancel, wait)
import Control.Concurrent.Chan (dupChan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (modifyMVar_, newMVar, readMVar)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TQueue (newTQueue, tryReadTQueue, writeTQueue)
import Control.Exception (IOException, bracket, tryJust)
import Data.Binary.Machine (processPut)
import Data.List (foldl')
import Data.Machine (ProcessT, prepended, runT_, (<~))
import Data.Machine.Process (takingJusts)
import Network.Socket (Socket, accept, bind, close, listen, socketToHandle)
import Sarsi (Topic, createSockAddr, createSocket, removeTopic, title)
import System.Console.ANSI
import System.IO (Handle, IOMode (WriteMode), hClose)
import System.IO.Machine (byChunk, sinkHandle, sinkIO, sourceIO)

-- | Print the end-of-run summary line on standard output.
--
-- The line is the 'title' rendered in a colour derived from the two counts,
-- followed (after resetting the terminal attributes) by @": "@ and the
-- 'show'n 'Finish' event built from the same counts.
--
-- The first argument is the error count and the second the warning count.
finishPrint :: Int -> Int -> IO ()
finishPrint e w = do
  setSGR (sgr e w)
  putStr title
  -- Reset before printing the rest so only the title is coloured.
  setSGR [Reset]
  putStr ": "
  print event
  where
    -- Green when both counts are zero, yellow when only the warning count
    -- is non-zero, red whenever the error count is non-zero.
    sgr :: Int -> Int -> [SGR]
    sgr 0 0 = [SetColor Foreground Dull Green]
    sgr 0 _ = [SetColor Foreground Dull Yellow]
    sgr _ _ = [SetColor Foreground Vivid Red]

    event :: Event
    event = Finish e w

-- | Count the errors and warnings contained in a list of events.
--
-- Returns @(errors, warnings)@. Only 'Notify' events whose 'Message' has
-- level 'Error' or 'Warning' are counted; every other event leaves both
-- counters unchanged.
finishCreate :: [Event] -> (Int, Int)
finishCreate = foldl' step (0, 0)
  where
    -- 'foldl'' only forces the pair constructor, so the incremented counter
    -- is forced explicitly to avoid building a chain of @+ 1@ thunks.
    step :: (Int, Int) -> Event -> (Int, Int)
    step (e, w) (Notify (Message _ Warning _)) =
      let w' = w + 1 in w' `seq` (e, w')
    step (e, w) (Notify (Message _ Error _)) =
      let e' = e + 1 in e' `seq` (e', w)
    step acc _ = acc

-- | Run a producer for topic @t@.
--
-- A background thread binds and listens on the topic's socket and serves
-- every accepted connection. The callback @f@ is run (on its own thread)
-- with a sink; each event pushed into that sink is recorded in the shared
-- history and broadcast to the connected consumers.
--
-- Once @f@ returns, a 'Finish' event carrying the error and warning counts
-- of the recorded history is broadcast, followed by 'Nothing' as the
-- end-of-stream marker. The summary is printed, the connection writers
-- registered so far are awaited, the server thread is cancelled, the topic
-- is removed, and the result of @f@ is returned.
produce :: Topic -> (ProcessT IO Event Event -> IO a) -> IO a
produce t f = do
  conns <- atomically newTQueue
  chan <- newChan
  state <- newMVar []
  server <- async $ bracket bindSock close (serve (process conns chan state))
  feeder <- async $ f (sinkIO $ feed chan state)
  a <- wait feeder
  es <- readMVar state
  let (errs, warns) = finishCreate es
  writeChan chan $ Just $ Finish errs warns
  -- 'Nothing' terminates the 'takingJusts' stage of every connection writer.
  writeChan chan Nothing
  finishPrint errs warns
  waitFinish conns
  cancel server
  removeTopic t
  return a
  where
    -- Create the topic socket, bind it and start listening.
    bindSock = do
      sock <- createSocket
      addr <- createSockAddr t
      bind sock addr
      listen sock 1
      return sock

    -- Handle one accepted connection: spawn a writer thread that streams
    -- the recorded history followed by the live events to @h@, register the
    -- writer in @conns@, and return 'Nothing' so that 'serve' keeps
    -- accepting.
    process conns chan' state h = do
      -- Each connection reads from its own duplicate of the broadcast
      -- channel.
      chan <- dupChan chan'
      es <- readMVar state
      conn <- async $ do
        runT_ $
          sinkHandle byChunk h
            <~ processPut putEvent
            -- The history is stored newest-first (see 'feed'), so it is
            -- reversed before being replayed.
            <~ prepended (reverse es)
            <~ takingJusts
            <~ sourceIO (readChan chan)
        hClose h
      atomically $ writeTQueue conns conn
      return Nothing

    -- Record an event in the history and broadcast it. A 'Start' event
    -- discards the previous history; any other event is consed onto it.
    feed chan state e = do
      modifyMVar_ state $ case e of
        Start _ -> const $ return [e]
        _ -> return . (:) e
      writeChan chan $ Just e

    -- Wait for every connection writer registered so far, ignoring
    -- 'IOException's raised by a writer. The queue is drained in a single
    -- transaction so that all registered writers are awaited, not just the
    -- first one.
    -- NOTE(review): a writer registered after the end-of-stream marker was
    -- written never sees that marker on its duplicated channel and would
    -- presumably block 'wait' here -- confirm whether that window matters.
    waitFinish conns = do
      pending <- atomically drain
      mapM_ (tryJust io . wait) pending
      where
        drain = do
          next <- tryReadTQueue conns
          case next of
            Nothing -> return []
            Just conn -> fmap (conn :) drain

        io :: IOException -> Maybe ()
        io _ = Just ()

-- | Accept connections on @sock@ and hand each one to @f@ as a write-mode
-- 'Handle'.
--
-- When @f@ returns 'Nothing' the next connection is accepted; when it
-- returns @'Just' a@ the loop stops and @a@ is returned.
--
-- The recursion happens inside the 'bracket', so the handle of a connection
-- is only closed by this function once the remainder of the loop has
-- finished (or an exception unwinds it).
serve :: (Handle -> IO (Maybe a)) -> Socket -> IO a
serve f sock = bracket acceptHandle hClose process
  where
    -- Accept one connection and wrap it in a write-only handle.
    acceptHandle = do
      (conn, _) <- accept sock
      socketToHandle conn WriteMode

    -- Run the callback, then either stop with its result or keep serving.
    process h = f h >>= maybe (serve f sock) return