{-# OPTIONS_GHC -fglasgow-exts #-} -- | Synchronous message receiving. module System.Miniplex.Source ( Source, attach, attachWait, read, getMsgs, detach, withSource, withSourceWait ) where import System.Miniplex.Sekrit import Prelude hiding (read, catch) import Control.Concurrent import Control.Exception import Control.Monad import Control.Monad.Fix import Control.Monad.Reader () import Data.Typeable import Network.Socket import System.IO.Lock import System.IO.Unsafe import System.Posix.Files import System.Posix.IO newtype Source = Source { sock :: Socket } deriving (Typeable) -- | @'attach' tag@ returns a message source connected to the sink created by -- a call to @'System.Miniplex.Sink.create' tag@. If no such sink exists, an -- exception is thrown. attach :: String -> IO Source attach what = do (n, _, _) <- pathFromTag "System.Miniplex.Source.attach" what bracketOnError (socket AF_UNIX Stream 0) sClose $ \s -> do closeOnExec s connect s (SockAddrUnix n) shutdown s ShutdownSend return $ Source s -- | Similar to 'attach', but if the specified sink doesn't exist, 'attachWait' -- blocks until it becomes available. attachWait :: String -> IO Source attachWait what = do (com, lck, ret) <- pathFromTag "System.Miniplex.Source.attachWait" what bracketOnError (socket AF_UNIX Stream 0) sClose $ \s -> do closeOnExec s block . fix $ \retry -> do ld <- do bracket (openFd lck ReadOnly (Just mode644) defaultFileFlags) closeFd $ \lf -> do setLockAll lf LockRead x <- tryJust ioErrors (unblock $ connect s (SockAddrUnix com)) `catch` \e -> do unLock ld throwIO e case x of Right () -> unLock ld Left _ -> do wf <- flip finally (unLock ld) $ unblock $ do handleJust eexists (const $ return ()) $ createNamedPipe ret mode644 openFd ret ReadOnly Nothing defaultFileFlags{ nonBlock = True } unblock (threadWaitRead wf) `finally` closeFd wf retry shutdown s ShutdownSend return $ Source s -- | Synchronously reads a message from a source (i.e. it blocks if there is -- currently no message available). read :: Source -> IO String read so = do n <- liftM intFromBytes $ reallyRecv s 4 reallyRecv s n where s = sock so -- | Returns a lazy list of all messages arriving at a source (like -- @'System.IO.hGetContents'@). getMsgs :: Source -> IO [String] getMsgs so = unsafeInterleaveIO . handle (\_ -> detach so >> return []) $ liftM2 (liftM2 (:)) read getMsgs so -- | Disconnects from a message sink. The detached source becomes invalid -- and must not be used again. detach :: Source -> IO () detach so = do sClose (sock so) -- | Helper function to simplify resource handling. @'withSource' tag body@ -- creates a source, calls @body@, then disconnects the source, even if -- @body@ throws an exception. withSource :: String -> (Source -> IO a) -> IO a withSource tag f = block $ do so <- attach tag unblock (f so) `finally` detach so -- | Similar to 'withSource', but calls 'attachWait' instead of 'attach'. withSourceWait :: String -> (Source -> IO a) -> IO a withSourceWait tag f = block $ do so <- unblock $ attachWait tag unblock (f so) `finally` detach so