{-# LANGUAGE Rank2Types #-}

module Sarsi.Consumer where

import Codec.Sarsi (Event, getEvent)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (IOException, bracket, try)
import Data.Binary.Machine (streamGet)
import Data.Machine (asParts, auto, (<~))
import Network.Socket (connect, socketToHandle)
import Sarsi (Broker (..), Topic (..), createSocket, getSockAddr)
import System.FSNotify (eventPath, watchDir, withManager)
import System.IO (IOMode (ReadMode), hClose, hWaitForInput)
import System.IO.Machine (IOSource, byChunkOf, sourceHandle)

-- | Consume a topic, and whenever consumption fails with an 'IOException',
-- wait for a filesystem event on the topic path before trying again.
--
-- The callback has the same contract as in 'consume''. This function only
-- returns once the callback yields a 'Right'; every 'IOException' leads to
-- another wait-and-retry cycle.
consumeOrWait :: Topic -> (Maybe s -> IOSource Event -> IO (Either s a)) -> IO a
consumeOrWait topic@(Topic (Broker bp) tp) f =
  consume topic f >>= either (const waitThenRetry) return
  where
    -- The watch manager is only needed while waiting. Retrying after
    -- 'withManager' has returned releases it, instead of keeping one
    -- manager alive (and nested) per retry for the whole next session.
    waitThenRetry = do
      withManager waitForTopic
      consumeOrWait topic f
    -- Block until the first event in the broker directory whose path is the
    -- topic path, then stop listening.
    waitForTopic mng = do
      lck <- newEmptyMVar
      stop <- watchDir mng bp pred' $ const $ putMVar lck ()
      takeMVar lck
      stop
    pred' e = eventPath e == tp

-- | Same as 'consume'', but an 'IOException' raised while connecting or
-- consuming is returned as a 'Left' instead of being thrown.
consume :: Topic -> (Maybe s -> IOSource Event -> IO (Either s a)) -> IO (Either IOException a)
consume topic f = try $ consume' topic f

-- | Connect to the topic's socket and feed the decoded events to the callback.
--
-- The callback receives 'Nothing' on the first run. When it returns
-- @'Left' s@, it is run again on the same handle with @'Just' s@ once
-- 'hWaitForInput' returns; when it returns @'Right' a@, that value is the
-- result. The handle is closed on exit, including on exceptions.
consume' :: Topic -> (Maybe s -> IOSource Event -> IO (Either s a)) -> IO a
consume' topic f = bracket createHandle hClose (process Nothing)
  where
    createHandle = do
      sock <- createSocket
      connect sock $ getSockAddr topic
      socketToHandle sock ReadMode
    process s h = do
      sa <- f s $ asParts <~ auto unpack <~ streamGet getEvent <~ sourceHandle (byChunkOf 1) h
      -- A negative timeout makes 'hWaitForInput' wait without a time limit.
      _ <- hWaitForInput h (-1)
      either (continue h) return sa
      where
        continue h' s' = process (Just s') h'
    -- Decoding errors are dropped; only successfully decoded events pass.
    unpack (Right e) = [e]
    unpack (Left _) = []