{-# LANGUAGE Rank2Types #-}

module Sarsi.Consumer where

import Codec.Sarsi (Event, getEvent)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import Control.Exception (IOException, bracket, bracketOnError, try)
import Data.Binary.Machine (streamGet)
import Data.Machine (asParts, auto, (<~))
import Network.Socket (close, connect, socketToHandle)
import Sarsi (Broker (..), Topic (..), createSocket, getSockAddr)
import System.FSNotify (eventPath, watchDir, withManager)
import System.IO (IOMode (ReadMode), hClose, hWaitForInput)
import System.IO.Machine (IOSource, byChunkOf, sourceHandle)

-- | Consume a topic, waiting for it to appear when it cannot be consumed yet.
--
-- Runs 'consume' on the topic. When that fails with an 'IOException', the
-- broker directory is watched until an event is reported for the topic path,
-- and the whole operation is then retried from scratch.
--
-- The handler @f@ is passed straight through to 'consume'; its final
-- 'Right' value is the result of this action.
consumeOrWait :: Topic -> (Maybe s -> IOSource Event -> IO (Either s a)) -> IO a
consumeOrWait topic@(Topic (Broker bp) tp) f = do
  res <- consume topic f
  either (const waitAndRetry) return res
  where
    -- The watch manager is only needed while waiting, so its scope is left
    -- before retrying. Otherwise every failed attempt would keep one more
    -- manager alive for the whole duration of the next consumption.
    waitAndRetry = do
      withManager waitForTopic
      consumeOrWait topic f
    -- Block until the watcher reports an event matching the topic path.
    waitForTopic mng = do
      lck <- newEmptyMVar
      stop <- watchDir mng bp isTopicEvent $ const $ putMVar lck ()
      takeMVar lck
      stop
    -- Only events whose path is exactly the topic path are of interest.
    isTopicEvent e = eventPath e == tp

-- | Consume a topic once, reporting I/O failures as a value.
--
-- Delegates to @consume'@ and captures any 'IOException' it raises as a
-- 'Left'; a successful run yields the handler's result as a 'Right'.
consume :: Topic -> (Maybe s -> IOSource Event -> IO (Either s a)) -> IO (Either IOException a)
consume topic f = try (consume' topic f)

-- | Connect to a topic and feed its decoded events to the handler @f@.
--
-- A read-only handle is opened on the topic's socket address and is always
-- closed when this action finishes, normally or by exception. The handler
-- is called with 'Nothing' on the first round. Each time it returns
-- @'Left' s@, it is called again with @'Just' s@ on the same handle; a
-- @'Right' a@ ends the loop and @a@ is returned.
--
-- Exceptions (for instance from connecting) are not caught here; see
-- 'consume' for the variant that captures 'IOException'.
consume' :: Topic -> (Maybe s -> IOSource Event -> IO (Either s a)) -> IO a
consume' topic f = bracket createHandle hClose (process Nothing)
  where
    -- Until the socket has been turned into a handle nothing else owns it,
    -- so it is closed explicitly if 'connect' or 'socketToHandle' throws.
    -- Without this, every failed connection attempt leaks a socket.
    createHandle =
      bracketOnError createSocket close $ \sock -> do
        connect sock (getSockAddr topic)
        socketToHandle sock ReadMode
    -- One round: run the handler over the event stream read from the
    -- handle, then either finish or go for another round.
    process s h = do
      sa <- f s (asParts <~ auto unpack <~ streamGet getEvent <~ sourceHandle (byChunkOf 1) h)
      -- A negative timeout makes 'hWaitForInput' wait indefinitely for
      -- more input before the handler is invoked again.
      _ <- hWaitForInput h (-1)
      either (\s' -> process (Just s') h) return sa
    -- Keep successfully decoded events; drop decoding errors.
    unpack = either (const []) (: [])