{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingVia #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}

-- | Read and write values of types that implement 'Binary.Binary'.
module Data.Binary.IO.Lifted
  ( -- * Reader
    ReaderError (..)

  , Reader (..)
  , newReader
  , newReaderWith
  , mapReader

    -- * Writer
  , Writer (..)
  , newWriter
  , newWriterWith
  , mapWriter

    -- * Pipe
  , newPipe

    -- * Duplex
  , Duplex (..)
  , newDuplex
  , newDuplexWith
  , mapDuplex

    -- * Classes
  , CanGet (..)
  , read
  , isEmpty

  , CanPut (..)
  , write
  )
where

import Prelude hiding (read)

import           Control.Arrow ((&&&))
import qualified Control.Concurrent.Classy as Concurrent
import           Control.Monad (join, unless)
import qualified Control.Monad.Catch as Catch
import           Control.Monad.IO.Class (MonadIO (liftIO))
import           Control.Monad.Trans.Class (MonadTrans (lift))
import           Control.Monad.Trans.Except (ExceptT, except, runExceptT)
import qualified Data.Binary as Binary
import qualified Data.Binary.Get as Get
import           Data.Binary.IO.Internal.AwaitNotify (newAwaitNotify, runAwait, runNotify)
import qualified Data.Binary.Put as Put
import           Data.ByteString (ByteString)
import qualified Data.ByteString as ByteString
import           Data.ByteString.Lazy (toStrict)
import           Data.IORef (atomicModifyIORef', mkWeakIORef, newIORef)
import qualified Deque.Strict as Deque
import           System.IO (Handle, hSetBinaryMode)
import           System.Mem.Weak (deRefWeak)

-- * Reader

-- | An error that can occur during reading
--
-- Readers created by this module throw it as an exception, hence the 'Catch.Exception' instance.
--
-- @since 0.4.0
data ReaderError = ReaderGetError -- ^ Error from the 'Binary.Get' operation
  { readerErrorRemaining :: !ByteString
  -- ^ Unconsumed part of the byte stream
  --
  -- @since 0.4.0

  , readerErrorOffset :: !Get.ByteOffset
  -- ^ Error location represented as an offset into the input
  --
  -- @since 0.4.0

  , readerErrorInput :: !ByteString
  -- ^ Input to the 'Binary.Get' operation
  --
  -- @since 0.4.0

  , readerErrorMessage :: !String
  -- ^ Error message
  --
  -- @since 0.4.0
  }
  deriving stock Show
  deriving anyclass Catch.Exception

-- | A reader that hands back the reader to continue with after every successful operation.
--
-- Running a 'Binary.Get' operation either fails with a 'ReaderError' or yields the decoded value
-- together with the 'StationaryReader' to use for the next operation.
newtype StationaryReader m = StationaryReader
  { runStationaryReader
      :: forall a
      .  Binary.Get a
      -> ExceptT ReaderError m (StationaryReader m, a)
  }

-- | Create a 'StationaryReader' on top of a chunk-producing action.
--
-- Every non-empty chunk pulled from the producer is appended to an internal buffer. The buffer is
-- only replaced by the unconsumed remainder once a 'Binary.Get' operation finishes successfully;
-- a failing operation leaves it as is, so the next operation is fed the same bytes again.
--
-- An empty chunk from the producer is passed to the decoder as the end of the input.
newStationaryReaderWith
  :: forall m
  .  Concurrent.MonadConc m
  => m ByteString
  -> m (StationaryReader m)
newStationaryReaderWith getChunk = do
  inputRef <- Concurrent.newIORef ByteString.empty

  let
    -- Each operation starts by feeding the decoder whatever is currently buffered.
    make :: StationaryReader m
    make = StationaryReader $ \get -> do
      input <- lift $ Concurrent.readIORef inputRef
      loop $ Get.pushChunk (Get.runGetIncremental get) input

    loop :: Get.Decoder a -> ExceptT ReaderError m (StationaryReader m, a)
    loop = \case
      Get.Fail remainingBody offset errorMessage -> do
        -- The buffer is deliberately not updated here; it is only reported as part of the error.
        input <- lift $ Concurrent.readIORef inputRef
        except $ Left ReaderGetError
          { readerErrorRemaining = remainingBody
          , readerErrorOffset = offset
          , readerErrorInput = input
          , readerErrorMessage = errorMessage
          }

      -- Commit the unconsumed remainder as the new buffer. Masked so that storing the remainder
      -- and returning the result are not separated by an asynchronous exception.
      Get.Done remainingBody _ value -> Catch.mask_ $ do
        lift $ Concurrent.writeIORef inputRef remainingBody
        pure (make, value)

      Get.Partial continue -> do
        -- Only the chunk producer runs with exceptions unmasked; once a chunk has been obtained
        -- it is appended to the buffer before an asynchronous exception can be delivered.
        chunk <- lift $ Catch.mask $ \restore -> do
          newChunk <- restore getChunk
          if ByteString.null newChunk then
            pure Nothing
          else
            Concurrent.atomicModifyIORef' inputRef $ (<> newChunk) &&& const (Just newChunk)

        loop $ continue chunk

  pure make

-- | Executes 'Binary.Get' operations in @m@.
--
-- @since 0.4.0
newtype Reader m = Reader
  { runReader :: forall a. Binary.Get a -> m a
  -- ^ Run a 'Binary.Get' operation and produce its result in @m@
  }

-- | Transform the underlying functor.
--
-- The given transformation is applied to the result of every operation run by the 'Reader'.
--
-- @since 0.4.0
mapReader :: (forall a. m a -> n a) -> Reader m -> Reader n
mapReader f (Reader run) = Reader (f . run)

-- | Create a new 'Reader' using an action that provides the chunks.
--
-- The chunk producer indicates the end of the stream by returning an empty
-- 'ByteString.ByteString'.
--
-- Reading using the 'Reader' may throw 'ReaderError'.
--
-- The internal position of the 'Reader' is not advanced when it throws an exception during reading.
-- This has the consequence that if you're trying to read with the same faulty 'Binary.Get'
-- operation multiple times, you will always receive an exception.
--
-- The 'Reader' is safe to use concurrently.
--
-- @since 0.4.0
newReaderWith
  :: Concurrent.MonadConc m
  => m ByteString -- ^ Chunk provider
  -> m (Reader m)
newReaderWith getChunk = do
  posReader <- newStationaryReaderWith getChunk
  -- The MVar serialises operations: each one runs against the reader left behind by the previous
  -- successful one.
  mvar <- Concurrent.newMVar posReader
  pure $ Reader $ \get ->
    Concurrent.modifyMVar mvar $ \current -> do
      result <- runExceptT $ runStationaryReader current get
      -- Throwing inside 'Concurrent.modifyMVar' keeps the previous reader in the MVar.
      either Catch.throwM pure result

-- | Create a new reader.
--
-- Inherits properties from 'newReaderWith'.
--
-- Other threads reading from the 'Handle' will interfere with read operations of the 'Reader'.
-- However, the 'Reader' itself is thread-safe and can be utilized concurrently.
--
-- The given 'Handle' will be switched to binary mode via 'hSetBinaryMode'.
--
-- @since 0.4.0
newReader
  :: (Concurrent.MonadConc m, MonadIO m)
  => Handle -- ^ Handle to read from
  -> m (Reader m)
newReader handle = do
  liftIO $ hSetBinaryMode handle True
  newReaderWith $ liftIO $ ByteString.hGetSome handle chunkSize
  where
    -- Maximum number of bytes requested from the handle per chunk
    chunkSize :: Int
    chunkSize = 4096

-- | @r@ can execute 'Binary.Get' operations in @m@
--
-- @since 0.4.0
class CanGet r m where
  -- | Run the given 'Binary.Get' operation against the source @r@, yielding its result in @m@.
  runGet :: r -> Binary.Get a -> m a

-- | A 'Reader' runs the operation through its own 'runReader'.
instance CanGet (Reader m) m where
  -- Written fully applied on purpose: 'runReader' returns a rank-2 function, and the point-free
  -- form @runGet = runReader@ is rejected by compilers that use simplified subsumption.
  runGet reader get = runReader reader get

-- | A 'Duplex' reads through its 'duplexReader'.
instance CanGet (Duplex m) m where
  runGet duplex = runGet (duplexReader duplex)

-- | Read something from @r@. Inherits properties from 'runGet'.
--
-- The value is decoded with the 'Binary.get' method of its 'Binary.Binary' instance.
--
-- @since 0.4.0
read
  :: (CanGet r m, Binary.Binary a)
  => r -- ^ Source to read from
  -> m a
read source = runGet source Binary.get

-- | Check if there is no more input to consume. This function may block. All properties of 'runGet'
-- apply to this function as well.
--
-- Implemented by running 'Get.isEmpty' against the source.
--
-- @since 0.4.0
isEmpty
  :: CanGet r m
  => r -- ^ Source to check for stream depletion
  -> m Bool
isEmpty source = runGet source Get.isEmpty

-- * Writer

-- | Executes 'Put.PutM' operations in @m@.
--
-- @since 0.4.0
newtype Writer m = Writer
  { runWriter :: forall a. Put.PutM a -> m a
  -- ^ Run a 'Put.PutM' operation and produce its result in @m@
  }

-- | Transform the underlying functor.
--
-- The given transformation is applied to the result of every operation run by the 'Writer'.
--
-- @since 0.4.0
mapWriter :: (forall x. m x -> n x) -> Writer m -> Writer n
mapWriter f (Writer run) = Writer (f . run)

-- | Create a writer using a function that handles the output chunks.
--
-- Each operation is run to completion with 'Put.runPutM'; its entire output is converted into a
-- single strict 'ByteString' and passed to the chunk writer in one call. The result of the
-- operation is returned once the chunk writer has run.
--
-- @since 0.4.0
newWriterWith
  :: Functor m
  => (ByteString -> m ()) -- ^ Chunk writer
  -> Writer m
newWriterWith writeChunk = Writer $ \put ->
  let (result, body) = Put.runPutM put
  in result <$ writeChunk (toStrict body)

-- | Create a writer.
--
-- Other threads writing to the same 'Handle' do not interfere with the resulting 'Writer'. The
-- 'Writer' may be used concurrently.
--
-- The output of every operation is written to the 'Handle' with a single 'ByteString.hPut'.
--
-- @since 0.4.0
newWriter
  :: MonadIO m
  => Handle -- ^ Write target
  -> Writer m
newWriter handle = newWriterWith (liftIO . ByteString.hPut handle)

-- | @w@ can execute 'Binary.Put' operations in @m@
--
-- @since 0.4.0
class CanPut w m where
  -- | Run the given 'Put.PutM' operation against the sink @w@, yielding its result in @m@.
  runPut :: w -> Put.PutM a -> m a

-- | A 'Writer' runs the operation through its own 'runWriter'.
instance CanPut (Writer m) m where
  -- Written fully applied on purpose: 'runWriter' returns a rank-2 function, and the point-free
  -- form @runPut = runWriter@ is rejected by compilers that use simplified subsumption.
  runPut writer put = runWriter writer put

-- | A 'Duplex' writes through its 'duplexWriter'.
instance CanPut (Duplex m) m where
  runPut duplex = runPut (duplexWriter duplex)

-- | Write directly to a 'Handle'.
--
-- Behaves like running the operation with the 'Writer' returned by 'newWriter' for that 'Handle':
-- the whole output of the operation is written with a single 'ByteString.hPut'.
instance MonadIO m => CanPut Handle m where
  -- Delegates to 'newWriter' so the two ways of writing to a 'Handle' cannot drift apart.
  runPut handle put = runWriter (newWriter handle) put

-- | Write something to @w@.
--
-- The value is encoded with the 'Binary.put' method of its 'Binary.Binary' instance.
--
-- @since 0.4.0
write
  :: (CanPut w m, Binary.Binary a)
  => w -- ^ Write target
  -> a -- ^ Value to write
  -> m ()
write sink value = runPut sink (Binary.put value)

-- * Pipe

-- | Create a connected pair of 'Reader' and 'Writer'.
--
-- The 'Reader' will automatically end the stream if the 'Writer' goes out of scope.
--
-- Chunks are handed over in the order they were written. Empty chunks are never enqueued.
--
-- @since 0.4.0
newPipe :: (Concurrent.MonadConc m, MonadIO m) => m (Reader m, Writer m)
newPipe = do
  chan <- liftIO $ newIORef mempty
  -- The reading side must only ever see the queue through this weak pointer. The writing side
  -- holds the sole strong reference, so the queue can go away together with the 'Writer'.
  weakChan <- liftIO $ mkWeakIORef chan $ pure ()
  (await, notify) <- liftIO newAwaitNotify

  let
    readChunk :: IO ByteString
    readChunk = do
      mbChan <- deRefWeak weakChan
      case mbChan of
        -- The queue is gone: report the end of the stream with an empty chunk.
        Nothing -> pure ByteString.empty
        -- The atomic update picks the follow-up action, which 'join' runs afterwards.
        Just liveChan -> join $
          atomicModifyIORef' liveChan $ \queue ->
            case Deque.uncons queue of
              Just (chunk, rest) -> (rest, pure chunk)
              -- Nothing queued: wait for a notification, then start over. The result of
              -- 'runAwait' is ignored because the retry re-checks the weak pointer anyway.
              Nothing -> (queue, runAwait await >> readChunk)

    writeChunk :: ByteString -> IO ()
    writeChunk msg =
      -- An empty chunk would be taken for the end of the stream by the reading side.
      unless (ByteString.null msg) $ do
        atomicModifyIORef' chan $ \queue -> (Deque.snoc msg queue, ())
        runNotify notify

  reader <- newReaderWith (liftIO readChunk)
  let writer = newWriterWith (liftIO . writeChunk)

  pure (reader, writer)

-- * Duplex

-- | Pair of 'Reader' and 'Writer'
--
-- @since 0.4.0
data Duplex m = Duplex
  { duplexWriter :: Writer m
  -- ^ Writing half
  , duplexReader :: Reader m
  -- ^ Reading half
  }

-- | Transform the underlying functor.
--
-- Applies the transformation to both halves via 'mapWriter' and 'mapReader'.
--
-- @since 0.4.0
mapDuplex :: (forall a. m a -> n a) -> Duplex m -> Duplex n
mapDuplex f (Duplex writer reader) =
  Duplex (mapWriter f writer) (mapReader f reader)

-- | Create a new duplex. The 'Duplex' inherits all the properties of 'Reader' and 'Writer' when
-- created with 'newReader' and 'newWriter'.
--
-- Both halves operate on the same 'Handle'.
--
-- @since 0.4.0
newDuplex
  :: (Concurrent.MonadConc m, MonadIO m)
  => Handle -- ^ Handle to read from and write to
  -> m (Duplex m)
newDuplex handle = Duplex (newWriter handle) <$> newReader handle

-- | Combines 'newReaderWith' and 'newWriterWith'.
--
-- @since 0.4.0
newDuplexWith
  :: Concurrent.MonadConc m
  => m ByteString -- ^ Input chunk producer for 'Reader'
  -> (ByteString -> m ()) -- ^ Chunk writer for 'Writer'
  -> m (Duplex m)
newDuplexWith getChunk writeChunk =
  Duplex (newWriterWith writeChunk) <$> newReaderWith getChunk