-- | Pipelined request/response messaging over a byte stream.
--
-- Every outgoing message is written with a size prefix (see 'send' and
-- 'call'), and a background listener thread reads size-prefixed replies and
-- hands them, in arrival order, to the callers that registered through
-- 'call'.
module Control.Pipeline (
Pipe, newPipe, send, call,
Size,
Length(..),
Resource(..),
Flush(..),
Stream(..), getN
) where
import Prelude hiding (length)
import Control.Applicative ((<$>))
import Control.Monad (forever)
import Control.Exception (assert)
import System.IO.Error (try, mkIOError, eofErrorType)
import System.IO (Handle, hFlush, hClose, hIsClosed)
import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as L
import Data.Monoid (Monoid(..))
import Control.Concurrent (ThreadId, forkIO, killThread)
import Control.Concurrent.MVar
import Control.Concurrent.Chan
-- | A byte count. It is the result type of 'length' and the type handled by
-- the size encoder/decoder stored in a 'Pipe'.
type Size = Int
-- | Containers whose size can be measured as a 'Size'.
class Length a where
  -- | Number of elements held by the container.
  length :: a -> Size
-- | Strict byte strings report their length directly.
instance Length S.ByteString where
  length bytes = S.length bytes
-- | Lazy byte strings: the length reported by "Data.ByteString.Lazy" is
-- converted to 'Size' with 'fromEnum'.
instance Length L.ByteString where
  length bytes = fromEnum (L.length bytes)
-- | Things that can be closed, and queried for closedness, inside the
-- monad @m@.
class Resource m res where
  -- | Close the resource.
  close :: res -> m ()

  -- | Report whether the resource is closed.
  isClosed :: res -> m Bool
-- | File handles are closed with 'hClose' and queried with 'hIsClosed'.
instance Resource IO Handle where
  close h = hClose h
  isClosed h = hIsClosed h
-- | Handles that support a flush operation.
class Flush h where
  -- | Flush the handle.
  flush :: h -> IO ()
-- | File handles are flushed with 'hFlush'.
instance Flush Handle where
  flush h = hFlush h
-- | A flushable handle @h@ that can write and read chunks of type @b@.
class (Length b, Monoid b, Flush h) => Stream h b where
  -- | Write the given bytes to the handle.
  put :: h -> b -> IO ()

  -- | Read from the handle, asking for the given number of bytes. 'getN'
  -- treats a shorter result as a partial read and an empty result as end of
  -- input.
  get :: h -> Int -> IO b
-- | Read exactly @n@ bytes from the handle, calling 'get' repeatedly until
-- enough data has been collected.
--
-- Asserts that @n@ is non-negative. Raises an end-of-file 'IOError' (with
-- location @\"Control.Pipeline\"@) when 'get' yields no bytes while some are
-- still required.
getN :: (Stream h b) => h -> Int -> IO b
getN h n = assert (n >= 0) $ do
  bytes <- get h n
  case length bytes of
    got
      -- The request is satisfied (this also covers @n == 0@).
      | got >= n -> return bytes
      -- Nothing was read although bytes are still needed: end of input.
      | got == 0 ->
          ioError (mkIOError eofErrorType "Control.Pipeline" Nothing Nothing)
      -- Short read: keep what we have and fetch only the REMAINING bytes.
      | otherwise -> mappend bytes <$> getN h (n - got)
-- | Strict byte strings over a file handle, via 'S.hPut' and 'S.hGet'.
instance Stream Handle S.ByteString where
  put h bytes = S.hPut h bytes
  get h count = S.hGet h count
-- | Lazy byte strings over a file handle, via 'L.hPut' and 'L.hGet'.
instance Stream Handle L.ByteString where
  put h bytes = L.hPut h bytes
  get h count = L.hGet h count
-- | A pipelined connection over @handle@ carrying messages of type @bytes@.
--
-- The fields are deliberately lazy: 'newPipe' builds the record inside a
-- @rec@ block, before the listener's 'ThreadId' is known.
data Pipe handle bytes = Pipe
  { encodeSize :: Size -> bytes
    -- ^ Encodes a message size as the prefix written before each message.
  , decodeSize :: bytes -> Size
    -- ^ Decodes a size prefix read from the handle.
  , vHandle :: MVar handle
    -- ^ The underlying handle; writers take this 'MVar' while sending.
  , responseQueue :: Chan (MVar (Either IOError bytes))
    -- ^ One slot per pending 'call', filled by the listener in queue order.
  , listenThread :: ThreadId
    -- ^ The thread running 'listen' for this pipe.
  }
-- | Create a pipe over the given handle and start its listener thread.
--
-- A finalizer is attached to the handle's 'MVar' that kills the listener
-- thread and closes the handle.
newPipe
  :: (Stream h b, Resource IO h)
  => (Size -> b)  -- ^ encodes a message size as its prefix
  -> (b -> Size)  -- ^ decodes a size prefix
  -> h            -- ^ handle to read from and write to
  -> IO (Pipe h b)
newPipe encSize decSize h = do
  handleVar <- newMVar h
  queue <- newChan
  -- The pipe needs the listener's ThreadId and the listener needs the pipe,
  -- so the two are defined recursively.
  rec
    let pipe =
          Pipe
            { encodeSize = encSize
            , decodeSize = decSize
            , vHandle = handleVar
            , responseQueue = queue
            , listenThread = listener
            }
    listener <- forkIO (listen pipe)
  addMVarFinalizer handleVar $ do
    killThread listener
    close h
  return pipe
-- | Closing a pipe kills its listener thread and then closes the underlying
-- handle; a pipe counts as closed when its handle is closed.
instance (Resource IO h) => Resource IO (Pipe h b) where
  close pipe = do
    killThread (listenThread pipe)
    readMVar (vHandle pipe) >>= close

  isClosed pipe = readMVar (vHandle pipe) >>= isClosed
-- | Listener loop: forever read one size-prefixed message from the handle
-- and deliver it to the next waiting slot in the response queue.
--
-- An 'IOError' raised while reading is captured and delivered in place of
-- the message, so the waiting caller sees the failure.
listen :: (Stream h b) => Pipe h b -> IO ()
listen pipe = do
  h <- readMVar (vHandle pipe)
  forever $ do
    -- Read the fixed-width size prefix, decode it, then read the payload.
    result <- try (getN h prefixLen >>= getN h . decodeSize pipe)
    slot <- readChan (responseQueue pipe)
    putMVar slot result
  where
    -- The prefix width is taken from the length of an encoded size.
    prefixLen = length (encodeSize pipe 0)
-- | Write the messages to the pipe, each with its size prefix, and flush.
-- No response is awaited. The handle's 'MVar' is held for the whole write.
send :: (Stream h b) => Pipe h b -> [b] -> IO ()
send pipe messages =
  withMVar (vHandle pipe) $ \h ->
    mapM_ (write (encodeSize pipe) h) messages >> flush h
-- | Write the messages to the pipe like 'send', then register for one
-- response.
--
-- The returned action blocks until the listener has filled this call's slot;
-- it yields the response or re-raises the 'IOError' the listener captured.
-- The slot is enqueued while the handle's 'MVar' is still held, so slots are
-- queued in the same order as the writes.
call :: (Stream h b) => Pipe h b -> [b] -> IO (IO b)
call pipe messages =
  withMVar (vHandle pipe) $ \h -> do
    mapM_ (write (encodeSize pipe) h) messages
    flush h
    slot <- newEmptyMVar
    writeChan (responseQueue pipe) slot
    return (readMVar slot >>= either ioError return)
-- | Write one message to the handle, preceded by its encoded length. Prefix
-- and payload are joined and written with a single 'put'.
write :: (Stream h b, Monoid b, Length b) => (Size -> b) -> h -> b -> IO ()
write encode h payload =
  put h (encode (length payload) `mappend` payload)