module Hans.Layer.Tcp.WaitBuffer (
Wakeup
, tryAgain
, abort
, Incoming, Outgoing
, Buffer
, emptyBuffer
, shutdownWaiting
, isFull
, isEmpty
, flushWaiting
, writeBytes
, readBytes
, takeBytes
, putBytes
) where
import Control.Monad (guard)
import Data.Int (Int64)
import qualified Data.ByteString.Lazy as L
import qualified Data.Foldable as F
import qualified Data.Sequence as Seq
type Wakeup = Bool -> IO ()
tryAgain :: Wakeup -> IO ()
tryAgain f = f True
abort :: Wakeup -> IO ()
abort f = f False
data Incoming
data Outgoing
data Buffer d = Buffer
{ bufBytes :: L.ByteString
, bufWaiting :: Seq.Seq Wakeup
, bufSize :: !Int64
, bufAvailable :: !Int64
}
emptyBuffer :: Int64 -> Buffer d
emptyBuffer size = Buffer
{ bufBytes = L.empty
, bufWaiting = Seq.empty
, bufSize = size
, bufAvailable = size
}
isEmpty :: Buffer d -> Bool
isEmpty buf = bufAvailable buf == bufSize buf
isFull :: Buffer d -> Bool
isFull buf = bufAvailable buf == 0
flushWaiting :: Buffer d -> (IO (), Buffer d)
flushWaiting buf = (F.traverse_ abort (bufWaiting buf), buf { bufWaiting = Seq.empty })
queueWaiting :: Wakeup -> Buffer d -> Buffer d
queueWaiting wakeup buf = buf { bufWaiting = bufWaiting buf Seq.|> wakeup }
queueBytes :: L.ByteString -> Buffer d -> (Maybe Int64, Buffer d)
queueBytes bytes buf
| bufAvailable buf <= 0 = (Nothing,buf)
| otherwise = (Just qlen, buf')
where
queued = L.take (bufAvailable buf) bytes
qlen = L.length queued
buf' = buf
{ bufBytes = bufBytes buf `L.append` queued
, bufAvailable = bufAvailable buf qlen
}
removeBytes :: Int64 -> Buffer d -> Maybe (L.ByteString, Buffer d)
removeBytes len buf = do
guard (not (L.null (bufBytes buf)))
let (bytes,rest) = L.splitAt len (bufBytes buf)
buf' = buf
{ bufBytes = rest
, bufAvailable = bufAvailable buf + L.length bytes
}
return (bytes,buf')
shutdownWaiting :: Buffer d -> (IO (), Buffer d)
shutdownWaiting buf = (m,buf { bufWaiting = Seq.empty })
where
m = F.mapM_ abort (bufWaiting buf)
writeBytes :: L.ByteString -> Wakeup -> Buffer Outgoing
-> (Maybe Int64,Buffer Outgoing)
writeBytes bytes wakeup buf = case queueBytes bytes buf of
(Nothing,buf') -> (Nothing,queueWaiting wakeup buf')
res -> res
takeBytes :: Int64 -> Buffer Outgoing
-> Maybe (Maybe Wakeup,L.ByteString,Buffer Outgoing)
takeBytes len buf = do
(bytes,buf') <- removeBytes len buf
case Seq.viewl (bufWaiting buf') of
Seq.EmptyL -> return (Nothing, bytes, buf')
w Seq.:< ws -> return (Just w, bytes, buf' { bufWaiting = ws })
readBytes :: Int64 -> Wakeup -> Buffer Incoming
-> (Maybe L.ByteString, Buffer Incoming)
readBytes len wakeup buf =
case removeBytes len buf of
Just (bytes,buf') -> (Just bytes, buf')
Nothing -> (Nothing, queueWaiting wakeup buf)
putBytes :: L.ByteString -> Buffer Incoming
-> Maybe (Maybe Wakeup,Buffer Incoming)
putBytes bytes buf = do
let needed = L.length bytes + L.length (bufBytes buf)
guard (needed < bufSize buf)
let buf' = buf { bufBytes = bufBytes buf `L.append` bytes }
case Seq.viewl (bufWaiting buf') of
Seq.EmptyL -> return (Nothing, buf')
w Seq.:< ws -> return (Just w, buf' { bufWaiting = ws })