{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingVia #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
module Data.Binary.IO.Lifted
(
ReaderError (..)
, Reader (..)
, newReader
, newReaderWith
, mapReader
, Writer (..)
, newWriter
, newWriterWith
, mapWriter
, newPipe
, Duplex (..)
, newDuplex
, newDuplexWith
, mapDuplex
, CanGet (..)
, read
, isEmpty
, CanPut (..)
, write
)
where
import Prelude hiding (read)
import Control.Arrow ((&&&))
import qualified Control.Concurrent.Classy as Concurrent
import Control.Monad (join, unless)
import qualified Control.Monad.Catch as Catch
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Trans.Class (MonadTrans (lift))
import Control.Monad.Trans.Except (ExceptT, except, runExceptT)
import qualified Data.Binary as Binary
import qualified Data.Binary.Get as Get
import Data.Binary.IO.Internal.AwaitNotify (newAwaitNotify, runAwait, runNotify)
import qualified Data.Binary.Put as Put
import Data.ByteString (ByteString)
import qualified Data.ByteString as ByteString
import Data.ByteString.Lazy (toStrict)
import Data.IORef (atomicModifyIORef', mkWeakIORef, newIORef)
import qualified Deque.Strict as Deque
import System.IO (Handle, hSetBinaryMode)
import System.Mem.Weak (deRefWeak)
-- | Failure raised when a 'Binary.Get' operation run through a reader fails.
data ReaderError = ReaderGetError
  { readerErrorRemaining :: !ByteString
  -- ^ Unconsumed input reported by the failing decoder
  , readerErrorOffset :: !Get.ByteOffset
  -- ^ Offset reported by the failing decoder
  , readerErrorInput :: !ByteString
  -- ^ Contents of the reader's input buffer when the failure was detected
  , readerErrorMessage :: !String
  -- ^ Error message reported by the failing decoder
  }
  deriving stock Show
  deriving anyclass Catch.Exception
-- | A reader whose step function returns, next to the decoded value, the reader to use for the
-- following read. Failures are reported through 'ExceptT' as a 'ReaderError'.
newtype StationaryReader m = StationaryReader
  { runStationaryReader
      :: forall a
      .  Binary.Get a
      -> ExceptT ReaderError m (StationaryReader m, a)
  }
-- | Create a 'StationaryReader' that pulls its input from the given chunk-producing action.
--
-- Input that has been fetched but not yet consumed by a successful read is kept in an internal
-- buffer. The buffer is only replaced when a 'Binary.Get' operation succeeds, so a failed read
-- leaves everything fetched so far available to the next read.
--
-- An empty chunk returned by the action is passed to the decoder as end of input.
newStationaryReaderWith
  :: forall m
  .  Concurrent.MonadConc m
  => m ByteString
  -> m (StationaryReader m)
newStationaryReaderWith getChunk = do
  inputRef <- Concurrent.newIORef ByteString.empty

  let
    make :: StationaryReader m
    make = StationaryReader $ \get -> do
      -- Start every read by feeding the decoder whatever is currently buffered.
      input <- lift $ Concurrent.readIORef inputRef
      loop $ Get.pushChunk (Get.runGetIncremental get) input

    loop :: Get.Decoder a -> ExceptT ReaderError m (StationaryReader m, a)
    loop = \case
      Get.Fail remainingBody offset errorMessage -> do
        -- The buffer is left untouched; it is only reported as part of the error.
        input <- lift $ Concurrent.readIORef inputRef
        except $ Left ReaderGetError
          { readerErrorRemaining = remainingBody
          , readerErrorOffset = offset
          , readerErrorInput = input
          , readerErrorMessage = errorMessage
          }

      -- Commit the unconsumed rest as the new buffer with asynchronous exceptions masked.
      Get.Done remainingBody _ value -> Catch.mask_ $ do
        lift $ Concurrent.writeIORef inputRef remainingBody
        pure (make, value)

      Get.Partial continue -> do
        -- Only fetching the chunk runs in the restored masking state; once a chunk has been
        -- obtained it is appended to the buffer while still masked.
        mbChunk <- lift $ Catch.mask $ \restore -> do
          chunk <- restore getChunk
          if ByteString.null chunk then
            pure Nothing
          else
            Concurrent.atomicModifyIORef' inputRef $ (<> chunk) &&& const (Just chunk)

        loop $ continue mbChunk

  pure make
-- | A source against which 'Binary.Get' operations can be run in @m@.
newtype Reader m = Reader
  { runReader :: forall a. Binary.Get a -> m a
  -- ^ Run a 'Binary.Get' operation against this reader
  }
-- | Transform the monad a 'Reader' runs in by post-composing the given transformation with its
-- run function.
mapReader :: (forall a. m a -> n a) -> Reader m -> Reader n
mapReader f (Reader run) = Reader (f . run)
-- | Create a 'Reader' on top of a chunk-producing action.
--
-- The underlying 'StationaryReader' is kept in an @MVar@ and every read goes through
-- 'Concurrent.modifyMVar'. A 'ReaderError' produced by a read is re-thrown with 'Catch.throwM'.
newReaderWith
  :: Concurrent.MonadConc m
  => m ByteString
  -> m (Reader m)
newReaderWith getChunk = do
  initialReader <- newStationaryReaderWith getChunk
  mvar <- Concurrent.newMVar initialReader

  pure $ Reader $ \get ->
    Concurrent.modifyMVar mvar $ \currentReader -> do
      result <- runExceptT $ runStationaryReader currentReader get
      either Catch.throwM pure result
-- | Create a 'Reader' that reads from the given 'Handle'.
--
-- The handle is switched to binary mode first. Input is then fetched with 'ByteString.hGetSome'
-- in chunks of at most 4096 bytes.
newReader
  :: (Concurrent.MonadConc m, MonadIO m)
  => Handle
  -> m (Reader m)
newReader handle = do
  liftIO $ hSetBinaryMode handle True
  newReaderWith $ liftIO $ ByteString.hGetSome handle chunkSize
  where
    -- Upper bound on the number of bytes requested from the handle per fetch
    chunkSize :: Int
    chunkSize = 4096
-- | Sources @r@ against which a 'Binary.Get' operation can be run in the monad @m@.
class CanGet r m where
  -- | Run the given 'Binary.Get' operation against the source.
  runGet :: r -> Binary.Get a -> m a
-- | A 'Reader' runs 'Binary.Get' operations with its own run function.
instance CanGet (Reader m) m where
  runGet = runReader
-- | A 'Duplex' runs 'Binary.Get' operations against its reader half.
instance CanGet (Duplex m) m where
  runGet = runGet . duplexReader
-- | Read a value from the source using the 'Binary.get' decoder of its 'Binary.Binary' instance.
read
  :: (CanGet r m, Binary.Binary a)
  => r
  -> m a
read source = runGet source Binary.get
-- | Run 'Get.isEmpty' against the source.
isEmpty
  :: CanGet r m
  => r
  -> m Bool
isEmpty source = runGet source Get.isEmpty
-- | A sink against which 'Put.PutM' operations can be run in @m@.
newtype Writer m = Writer
  { runWriter :: forall a. Put.PutM a -> m a
  -- ^ Run a 'Put.PutM' operation against this writer
  }
-- | Transform the monad a 'Writer' runs in by post-composing the given transformation with its
-- run function.
mapWriter :: (forall x. m x -> n x) -> Writer m -> Writer n
mapWriter f (Writer write) = Writer (f . write)
-- | Create a 'Writer' from an action that consumes a chunk of output.
--
-- Each 'Put.PutM' operation is evaluated with 'Put.runPutM'; its output is converted to a strict
-- 'ByteString' and handed to the given action in one piece. The result of the 'Put.PutM'
-- operation is returned.
newWriterWith
  :: Functor m
  => (ByteString -> m ())
  -> Writer m
newWriterWith write = Writer $ \put ->
  let (result, body) = Put.runPutM put
  in result <$ write (toStrict body)
-- | Create a 'Writer' that writes its output to the given 'Handle' using 'ByteString.hPut'.
newWriter
  :: MonadIO m
  => Handle
  -> Writer m
newWriter handle =
  newWriterWith (liftIO . ByteString.hPut handle)
-- | Sinks @w@ against which a 'Put.PutM' operation can be run in the monad @m@.
class CanPut w m where
  -- | Run the given 'Put.PutM' operation against the sink.
  runPut :: w -> Put.PutM a -> m a
-- | A 'Writer' runs 'Put.PutM' operations with its own run function.
instance CanPut (Writer m) m where
  runPut = runWriter
-- | A 'Duplex' runs 'Put.PutM' operations against its writer half.
instance CanPut (Duplex m) m where
  runPut = runPut . duplexWriter
-- | A 'Handle' can be written to directly: the output of the 'Put.PutM' operation is converted
-- to a strict 'ByteString' and written with 'ByteString.hPut'.
instance MonadIO m => CanPut Handle m where
  runPut handle put = do
    let (result, body) = Put.runPutM put
    result <$ liftIO (ByteString.hPut handle (toStrict body))
-- | Write a value to the sink using the 'Binary.put' encoder of its 'Binary.Binary' instance.
write
  :: (CanPut w m, Binary.Binary a)
  => w
  -> a
  -> m ()
write sink value = runPut sink $ Binary.put value
-- | Create a connected 'Reader' and 'Writer' backed by an in-memory queue of chunks.
--
-- The writer appends every non-empty chunk to the queue and then signals the reader. The reader
-- takes chunks from the front of the queue and waits for a signal when the queue is empty.
--
-- The reading side only holds a weak reference to the queue. When that reference can no longer
-- be dereferenced, the reader yields an empty chunk.
newPipe :: (Concurrent.MonadConc m, MonadIO m) => m (Reader m, Writer m)
newPipe = do
  chan <- liftIO $ newIORef mempty
  weakChan <- liftIO $ mkWeakIORef chan $ pure ()
  (await, notify) <- liftIO newAwaitNotify

  let
    readChunk :: IO ByteString
    readChunk = do
      mbChan <- deRefWeak weakChan
      case mbChan of
        Nothing -> pure ByteString.empty
        -- The atomic update yields the action to run next: hand out the dequeued chunk, or
        -- wait for a notification and try again.
        Just liveChan -> join $
          atomicModifyIORef' liveChan $ \queue ->
            case Deque.uncons queue of
              Just (chunk, rest) -> (rest, pure chunk)
              Nothing -> (queue, runAwait await >> readChunk)

    writeChunk :: ByteString -> IO ()
    writeChunk msg =
      -- Empty chunks are never enqueued.
      unless (ByteString.null msg) $ do
        atomicModifyIORef' chan $ \queue ->
          (Deque.snoc msg queue, ())
        runNotify notify

  reader <- newReaderWith (liftIO readChunk)
  let writer = newWriterWith (liftIO . writeChunk)

  pure (reader, writer)
-- | Pairs a 'Writer' and a 'Reader' in a single value.
data Duplex m = Duplex
  { duplexWriter :: Writer m
  -- ^ Writing half
  , duplexReader :: Reader m
  -- ^ Reading half
  }
-- | Transform the monad a 'Duplex' runs in by applying the transformation to both of its halves
-- via 'mapWriter' and 'mapReader'.
mapDuplex :: (forall a. m a -> n a) -> Duplex m -> Duplex n
mapDuplex f (Duplex w r) = Duplex (mapWriter f w) (mapReader f r)
-- | Create a 'Duplex' whose writer and reader both operate on the given 'Handle', built with
-- 'newWriter' and 'newReader' respectively.
newDuplex
  :: (Concurrent.MonadConc m, MonadIO m)
  => Handle
  -> m (Duplex m)
newDuplex handle = Duplex (newWriter handle) <$> newReader handle
-- | Create a 'Duplex' from a chunk-producing action for the reader (see 'newReaderWith') and a
-- chunk-consuming action for the writer (see 'newWriterWith').
newDuplexWith
  :: Concurrent.MonadConc m
  => m ByteString
  -> (ByteString -> m ())
  -> m (Duplex m)
newDuplexWith getChunk writeChunk =
  Duplex (newWriterWith writeChunk) <$> newReaderWith getChunk