-- |
-- Module:     Data.Enumerator.NetLines.IO
-- Copyright:  (c) 2011 Ertugrul Soeylemez
-- License:    BSD3
-- Maintainer: Ertugrul Soeylemez
--
-- Efficient input and output with timeouts.

{-# LANGUAGE ScopedTypeVariables #-}

module Data.Enumerator.NetLines.IO
    ( -- * Enumerators (input)
      enumHandleSession,
      enumHandleTimeout,

      -- * Iteratees (output)
      iterHandleTimeout
    )
    where

import Control.ContStuff as Monad
import Data.Enumerator as E
import Data.Enumerator.NetLines.Class as NL
import Data.Enumerator.NetLines.Error
import Data.Time.Clock
import System.IO
import System.IO.Error as IO
import System.Timeout


-- | Feed an iteratee from a handle, bounded by two timeouts.
--
-- Arguments, in order: the buffer size handed to 'NL.hGetNonBlocking',
-- the per-read timeout in milliseconds, the timeout for the whole
-- session in milliseconds, and the handle to read from.
--
-- Before every read the time left in the session is computed from the
-- moment the enumerator was started.  The enumerator then waits for
-- input for at most the smaller of that remainder and the read timeout.
-- If that bound is not positive, or no input shows up within it, a
-- 'TimeoutError' carrying the message @\"Read timeout\"@ is raised with
-- 'throwError'.  Consequently a non-positive read timeout fails at once.
--
-- An end-of-file 'IOError' from the wait, as well as an empty chunk
-- from the read, ends the enumeration and hands back the iteratee
-- without feeding it anything further.  Any other 'IOError' from the
-- wait is passed on via 'throwError'.

enumHandleSession ::
    forall b m str. (MonadIO m, Splittable str) =>
    Int -> Int -> Int -> Handle -> Enumerator str m b
enumHandleSession bufSize readTime sessionTime h step0 =
    liftIO getCurrentTime >>= \began -> go began step0

    where
    readTimeout = TimeoutError "Read timeout"

    go :: UTCTime -> Enumerator str m b
    go began (Continue feed) = do
        now <- liftIO getCurrentTime
        let sessionLeft = sessionTime - round (1000 * diffUTCTime now began)
            waitMs      = min sessionLeft readTime
        if waitMs <= 0
            then throwError readTimeout
            else awaitInput waitMs

        where
        -- Block until the handle is readable or the wait expires.
        awaitInput waitMs = do
            ready <- liftIO (IO.try (hWaitForInput h waitMs))
            case ready of
                Right True  -> tryIO (NL.hGetNonBlocking h bufSize) >>= deliver
                Right False -> throwError readTimeout
                Left err
                    | isEOFError err -> continue feed
                    | otherwise      -> throwError err

        -- An empty chunk is treated like end of input; anything else is
        -- fed to the iteratee and the loop carries on with its answer.
        deliver chunk
            | NL.null chunk = continue feed
            | otherwise     = feed (Chunks [chunk]) >>== go began

    -- The iteratee is not asking for input, so there is nothing to do.
    go _ step = returnI step


-- | Feed an iteratee from a handle with a timeout on each read.
--
-- Arguments, in order: the buffer size handed to 'NL.hGetNonBlocking',
-- the read timeout in milliseconds (passed unchanged to
-- 'hWaitForInput'), and the handle to read from.  If no input arrives
-- within the timeout, a 'TimeoutError' carrying the message
-- @\"Read timeout\"@ is raised with 'throwError'.
--
-- The timeout applies to every read separately; there is no bound on
-- the enumeration as a whole.  It therefore guards against peers that
-- have gone silent, but not against peers that keep sending slowly
-- (perhaps on purpose).  Use 'enumHandleSession' for an overall bound.
--
-- An end-of-file 'IOError' from the wait, as well as an empty chunk
-- from the read, ends the enumeration and hands back the iteratee
-- without feeding it anything further.  Any other 'IOError' from the
-- wait is passed on via 'throwError'.

enumHandleTimeout ::
    forall b m str. (MonadIO m, Splittable str) =>
    Int -> Int -> Handle -> Enumerator str m b
enumHandleTimeout bufSize readTime h = go
    where
    go :: Enumerator str m b
    go (Continue feed) = do
        ready <- liftIO (IO.try (hWaitForInput h readTime))
        case ready of
            Right True  -> tryIO (NL.hGetNonBlocking h bufSize) >>= deliver
            Right False -> throwError (TimeoutError "Read timeout")
            Left err
                | isEOFError err -> continue feed
                | otherwise      -> throwError err

        where
        -- An empty chunk is treated like end of input; anything else is
        -- fed to the iteratee and the loop carries on with its answer.
        deliver chunk
            | NL.null chunk = continue feed
            | otherwise     = feed (Chunks [chunk]) >>== go

    -- The iteratee is not asking for input, so there is nothing to do.
    go step = returnI step


-- | Write everything received to the given handle, giving up with a
-- 'TimeoutError' (message @\"Write timeout\"@) once the given number of
-- milliseconds has been used up.  The handle should be unbuffered and
-- in binary mode.  See 'hSetBuffering' and 'hSetBinaryMode'.
--
-- The time budget is counted from the moment this iteratee is started.
-- It is only looked at when a non-empty list of chunks arrives: if
-- nothing is left of it the iteratee fails right away, otherwise the
-- write of those chunks is limited to the remainder.  The data source
-- itself is never interrupted, so the failure may be reported later
-- than the given margin, but never earlier.
--
-- Empty chunk lists are skipped, and end of input makes the iteratee
-- yield @()@.

iterHandleTimeout ::
    forall m str. (MonadIO m, Splittable str) =>
    Int -> Handle -> Iteratee str m ()
iterHandleTimeout maxTime h =
    tryIO getCurrentTime >>= \began -> continue (sink began)

    where
    writeTimeout = TimeoutError "Write timeout"

    sink :: UTCTime -> Stream str -> Iteratee str m ()
    sink _     EOF           = yield () EOF
    sink began (Chunks [])   = continue (sink began)
    sink began (Chunks strs) = do
        now <- tryIO getCurrentTime
        -- 'timeout' counts in microseconds, while 'maxTime' is given in
        -- milliseconds and 'diffUTCTime' yields seconds.
        let remainingMicros =
                1000 * maxTime - round (1000000 * diffUTCTime now began)
        when (remainingMicros <= 0) (throwError writeTimeout)
        written <- tryIO (timeout remainingMicros (mapM_ (NL.hPutStr h) strs))
        case written of
            Nothing -> throwError writeTimeout
            Just _  -> continue (sink began)