{-|
Module      : Z.IO.UV.UVStream
Description : IO manager based on libuv
Copyright   : (c) Dong Han, 2017-2018
License     : BSD
Maintainer  : winterland1989@gmail.com
Stability   : experimental
Portability : non-portable

This module provides 'UVStream' handle type.

-}

module Z.IO.UV.UVStream
  ( -- * uv_stream abstraction
    initUVStream
  , UVStream(..)
  , getUVStreamFD
  , closeUVStream
  , shutdownUVStream
  , helloWorld, echo
  ) where

import           Control.Concurrent
import           Control.Monad
import qualified Z.Data.Text.Print          as T
import           Z.IO.UV.Errno
import           Z.IO.UV.FFI
import           Z.IO.UV.Manager
import           Z.IO.Buffered
import           Z.IO.Exception
import           Z.IO.Resource
import           Data.IORef
import           GHC.Ptr

--------------------------------------------------------------------------------
-- UVStream

-- | A Haskell data type wrapping a @uv_stream_t@.
--
-- 'UVStream' DOES NOT provide thread safety! Using a 'UVStream' concurrently from
-- multiple threads will lead to undefined behavior.
data UVStream = UVStream
    { uvsHandle  :: {-# UNPACK #-} !(Ptr UVHandle)
      -- ^ Pointer to the underlying uv handle.
    , uvsSlot    :: {-# UNPACK #-} !UVSlot
      -- ^ Slot used to look up this stream's buffer table entry and blocking
      -- 'MVar' in the manager (see 'pokeBufferTable' and 'getBlockMVar').
    , uvsManager :: UVManager
      -- ^ The manager this stream was initialized on.
    , uvsClosed  :: {-# UNPACK #-} !(IORef Bool)
      -- ^ Closed flag, set to 'True' by 'closeUVStream'. A plain 'IORef' is
      -- used because we have no thread-safe guarantee, so there is no need
      -- for atomic read & write.
    }

-- | Render a 'UVStream' as a 'String' by going through its 'T.Print' instance.
instance Show UVStream where
    show = T.toString

-- | Print a 'UVStream' in record-like syntax. The handle, slot and manager are
-- shown; the closed flag is not part of the output.
instance T.Print UVStream where
    -- The precedence argument is ignored: the output is never parenthesized.
    toUTF8BuilderP _ (UVStream hdl slot uvm _) = do
        "UVStream{uvsHandle="  >> T.toUTF8Builder hdl
        ",uvsSlot="            >> T.toUTF8Builder slot
        ",uvsManager="         >> T.toUTF8Builder uvm
        T.char7 '}'

-- | Safely lock a uv manager and perform uv_handle initialization.
--
-- Initializing a UV stream usually takes two steps:
--
--   * allocate a uv_stream struct with the proper size
--   * lock a particular uv_loop from a uv manager, and perform custom initialization, such as @uv_tcp_init@.
--
-- This is what 'initUVStream' does: all you need to do is provide the manager you want to hook the handle
-- onto (usually the one on the same capability, i.e. the one obtained by 'getUVManager'),
-- and provide a custom initialization function (which should throw an exception if it fails).
--
-- The resulting 'Resource' uses 'closeUVStream' as its release action.
initUVStream :: HasCallStack
             => (Ptr UVLoop -> Ptr UVHandle -> IO ())
             -> UVManager
             -> Resource UVStream
initUVStream f uvm = initResource
    (withUVManager uvm $ \ loop -> do
        hdl <- hs_uv_handle_alloc loop
        slot <- getUVSlot uvm (peekUVHandleData hdl)
        _ <- tryTakeMVar =<< getBlockMVar uvm slot   -- clear the parking spot
        -- this function should be run inside mask, no need to protect
        -- NOTE(review): the cleanup below is commented out, so if @f@ throws, nothing
        -- visible here releases @hdl@ — confirm whether that is intended.
        f loop hdl -- `onException` hs_uv_handle_free hdl
        closed <- newIORef False
        return (UVStream hdl slot uvm closed))
    closeUVStream

-- | Manually close a uv stream.
--
-- Runs under the manager lock ('withUVManager''). If the stream is already marked
-- closed this does nothing; otherwise the closed flag is set and the handle is
-- closed via 'hs_uv_handle_close'.
closeUVStream :: UVStream -> IO ()
closeUVStream (UVStream hdl _ uvm closed) = withUVManager' uvm $ do
    alreadyClosed <- readIORef closed
    -- hs_uv_handle_close won't return error
    unless alreadyClosed $ do
        writeIORef closed True
        hs_uv_handle_close hdl

-- | Shutdown the outgoing (write) side of a duplex stream. It waits for pending write requests to complete.
--
-- Further writing will throw 'ResourceVanished'(EPIPE).
--
-- Calls 'throwECLOSED' if the stream is already marked closed, and throws via
-- 'throwUVIfMinus_' if the shutdown request completes with a negative result.
shutdownUVStream :: HasCallStack => UVStream -> IO ()
shutdownUVStream (UVStream hdl _ uvm closed) = do
    c <- readIORef closed
    when c throwECLOSED
    -- Issue the shutdown request under the manager lock and fetch the MVar
    -- its result will be delivered to.
    m <- withUVManager' uvm $ do
        reqSlot <- getUVSlot uvm (hs_uv_shutdown hdl)
        reqMVar <- getBlockMVar uvm reqSlot
        -- clear any stale value left in the parking spot
        _ <- tryTakeMVar reqMVar
        return reqMVar
    -- The wait is uninterruptible: async exceptions cannot abort it.
    throwUVIfMinus_  (uninterruptibleMask_ $ takeMVar m)

-- | Get the file descriptor of a stream.
--
-- Calls 'throwECLOSED' if the stream is already marked closed; a negative result
-- from 'hs_uv_fileno' is turned into an exception by 'throwUVIfMinus'.
getUVStreamFD :: HasCallStack => UVStream -> IO FD
getUVStreamFD (UVStream hdl _ _ closed) = do
    c <- readIORef closed
    when c throwECLOSED
    throwUVIfMinus (hs_uv_fileno hdl)

-- | Reading from a 'UVStream'.
--
-- 'readInput' registers the caller's buffer in the manager's buffer table, starts
-- reading on the handle and blocks on the slot's 'MVar' until a result arrives.
-- It returns the positive result as-is, returns 0 when the result equals 'UV_EOF',
-- and throws for any other negative result. It calls 'throwECLOSED' if the stream
-- is already marked closed.
instance Input UVStream where
    -- readInput :: HasCallStack => UVStream -> Ptr Word8 ->  Int -> IO Int
    {-# INLINABLE readInput  #-}
    readInput (UVStream hdl slot uvm closed) buf len = mask_ $ do
        c <- readIORef closed
        when c throwECLOSED
        -- set up buffer
        pokeBufferTable uvm slot buf len
        m <- getBlockMVar uvm slot
        -- clean up
        _ <- tryTakeMVar m

        throwUVIfMinus_ $ withUVManager' uvm (hs_uv_read_start hdl)
        -- since we are inside mask, this is the only place
        -- async exceptions could possibly kick in, and we should stop reading
        r <- takeMVar m `onException` (do
                -- normally we call 'uv_read_stop' in C read callback
                -- but when an exception is raised, here's the place to stop
                -- stopping a handle twice will be a libuv error, so we don't check result
                _ <- withUVManager' uvm (uv_read_stop hdl)
                void (tryTakeMVar m))

        if  | r > 0  -> return r
            | r == fromIntegral UV_EOF -> return 0
            | r < 0 ->  throwUVIfMinus (return r)
            -- r == 0 should be impossible, since we guard this situation in c side
            | otherwise -> throwUVError UV_UNKNOWN IOEInfo{
                                  ioeName = "UVStream read error"
                                , ioeDescription = "UVStream read should never return 0 before EOF"
                                , ioeCallStack = callStack
                                }

-- | Writing to a 'UVStream'.
--
-- 'writeOutput' issues a write request under the manager lock and then waits,
-- uninterruptibly, for its result; a negative result is thrown via
-- 'throwUVIfMinus_'. It calls 'throwECLOSED' if the stream is already marked closed.
instance Output UVStream where
    -- writeOutput :: HasCallStack => UVStream -> Ptr Word8 -> Int -> IO ()
    {-# INLINABLE writeOutput  #-}
    writeOutput (UVStream hdl _ uvm closed) buf len = mask_ $ do
        c <- readIORef closed
        when c throwECLOSED
        m <- withUVManager' uvm $ do
            reqSlot <- getUVSlot uvm (hs_uv_write hdl buf len)
            reqMVar <- getBlockMVar uvm reqSlot
            -- clear any stale value left in the parking spot
            _ <- tryTakeMVar reqMVar
            return reqMVar
        -- we can't cancel uv_write_t with current libuv,
        -- otherwise disaster will happen if buffer got collected.
        -- so we have to turn to uninterruptibleMask_'s help.
        -- i.e. writing UVStream is an uninterruptible operation.
        -- OS will guarantee writing TTY and socket will not
        -- hang forever anyway.
        throwUVIfMinus_  (uninterruptibleMask_ $ takeMVar m)
        {- wait for https://github.com/libuv/libuv/pull/2874
        -- attempt blocking write first
        r <- hs_uv_try_write hdl buf len
        if  | r == len -> return ()
            | r < 0 && r /= fromIntegral UV_EAGAIN -> throwUV r
            | otherwise -> do
                m <- withUVManager' uvm $ do
                    reqSlot <- if r > 0
                        then getUVSlot uvm (hs_uv_write hdl (buf `plusPtr` r) (len - r))
                        else getUVSlot uvm (hs_uv_write hdl buf len)
                    m <- getBlockMVar uvm reqSlot
                    _ <- tryTakeMVar m
                    return m
                -- we can't cancel uv_write_t with current libuv,
                -- otherwise disaster will happen if buffer got collected.
                -- so we have to turn to uninterruptibleMask_'s help.
                -- i.e. writing UVStream is an uninterruptible operation.
                -- OS will guarantee writing TTY and socket will not
                -- hang forever anyway.
                throwUVIfMinus_  (uninterruptibleMask_ $ takeMVar m)
        -}

--------------------------------------------------------------------------------

-- | Write "hello world" to a 'UVStream'.
--
-- The bytes come from a primitive string literal wrapped with the 'Ptr'
-- constructor; 11 is the length passed to 'writeOutput', matching the
-- literal's 11 characters.
helloWorld :: UVStream -> IO ()
helloWorld uvs = writeOutput uvs (Ptr "hello world"#) 11

-- | Echo whatever bytes are received.
--
-- Wraps the stream in a buffered input and a buffered output, then loops
-- 'forever': read a chunk, write it back, flush. The loop has no exit
-- condition of its own, so it only ends when one of the actions throws.
echo :: UVStream -> IO ()
echo uvs = do
    i <- newBufferedInput uvs
    o <- newBufferedOutput uvs
    forever $ do
        readBuffer i >>= writeBuffer o
        flushBuffer o