{-|
Module      : Z.IO.UV.Manager
Description : IO manager based on libuv
Copyright   : (c) Dong Han, 2017-2018
License     : BSD
Maintainer  : winterland1989@gmail.com
Stability   : experimental
Portability : non-portable

This is an internal module which provides an IO manager that bridges libuv's async interface with GHC's lightweight threads.

The main procedures for doing event IO are:

  * Allocate uv_handle in C side, get its slot number with 'getUVSlot', or allocate uv_request with 'withUVRequest'.
  * Prepare your IO buffer with 'pokeBufferTable' (read or write).
  * Call C side IO functions with predefined callbacks.
  * Block your thread with the 'MVar' from 'getBlockMVar'.
  * In predefined callbacks, push slot number to uv_loop's queue.
  * IO polling finishes, IO manager thread will unblock blocking IO threads by filling the 'MVar' with
    current value from buffer size table.
  * Slot is freed on C side, either via callbacks, or when handle is closed.

Usually slots are cached in the IO device so that you don't have to allocate a new one before each IO operation.
Check "Z.IO.Network.TCP" as an example.

-}

module Z.IO.UV.Manager
  ( UVManager(..)
  , getUVManager
  , getBlockMVar
  , peekBufferSizeTable
  , pokeBufferSizeTable
  , pokeBufferTable
  , withUVManager
  , withUVManager'
  , getUVSlot
  -- * request based async function helper
  , withUVRequest
  , withUVRequest_
  , withUVRequest'
  , withUVRequestEx
  -- * concurrent helpers
  , forkBa
  ) where

import           Control.Concurrent
import           Control.Monad
import           Control.Monad.IO.Class
import           Data.IORef
import           Data.Bits (shiftL)
import           Data.Word
import           GHC.Ptr
import           Foreign.Storable
import           GHC.Conc.Sync            (labelThread)
import           System.IO.Unsafe
import           Z.Data.Array
import           Z.Data.PrimRef.PrimIORef
import qualified Z.Data.Text.Print as T
import           Z.IO.Exception
import           Z.IO.Resource
import           Z.IO.UV.FFI

--------------------------------------------------------------------------------

-- | State of a single libuv based IO manager.
data UVManager = UVManager
    { uvmBlockTable :: {-# UNPACK #-} !(IORef (UnliftedArray (MVar Int)))
    -- ^ An array to store threads blocked on async IO.
    , uvmLoop       :: {-# UNPACK #-} !(Ptr UVLoop)
    -- ^ The uv loop reference.
    , uvmLoopData   :: {-# UNPACK #-} !(Ptr UVLoopData)
    -- ^ Cached pointer to uv_loop_t's data field.
    , uvmRunning    :: {-# UNPACK #-} !(MVar Bool)
    -- ^ Only uv manager thread will modify this value,
    -- 'True' during uv_run and 'False' otherwise.
    -- Unlike epoll/ONESHOT, uv loop are NOT thread safe,
    -- one have to wake up the loop before mutating uv_loop's state.
    , uvmCap        :: {-# UNPACK #-} !Int
    -- ^ The capability number which uv manager runs on.
    }

-- | Rendered through the 'T.Print' instance.
instance Show UVManager where
    show = T.toString

-- | Prints @UVManager on capability N@, parenthesised when the surrounding
-- precedence is above 10.
instance T.Print UVManager where
    toUTF8BuilderP p uvm = T.parenWhen (p > 10) $
        "UVManager on capability " >> T.int (uvmCap uvm)

-- | Global table of uv managers, one entry per capability.
--
-- This is a top-level 'unsafePerformIO' value, hence the @NOINLINE@ pragma:
-- the initialisation below must run at most once. For every capability
-- reported by 'getNumCapabilities' a manager thread is forked with 'forkOn';
-- each thread creates its manager, stores it in the table, signals the
-- semaphore and then enters 'startUVManager'. Initialisation only returns
-- after every manager thread has signalled.
uvManagerArray :: IORef (Array UVManager)
{-# NOINLINE uvManagerArray #-}
uvManagerArray = unsafePerformIO $ do
    numCaps <- getNumCapabilities
    uvmArray <- newArr numCaps
    s <- newQSemN 0
    forM_ [0..numCaps-1] $ \ i -> do
        -- fork uv manager thread
        forkOn i . withResource (initUVManager INIT_LOOP_SIZE i) $ \ m -> do
            myThreadId >>= (`labelThread` ("uv manager on " ++ show i))
            writeArr uvmArray i m
            signalQSemN s 1     -- tell the initialising thread this slot is filled
            startUVManager m
    -- wait until all manager threads have written their table entry
    waitQSemN s numCaps
    iuvmArray <- unsafeFreezeArr uvmArray
    newIORef iuvmArray

-- | Get 'UVManager' running on the same capability.
--
-- The calling thread's capability number is taken modulo the size of the
-- manager table before indexing.
getUVManager :: IO UVManager
{-# INLINABLE getUVManager #-}
getUVManager = do
    (cap, _) <- threadCapability =<< myThreadId
    uvmArray <- readIORef uvManagerArray
    indexArrM uvmArray (cap `rem` sizeofArr uvmArray)

-- | Get 'MVar' from blocking table with given slot.
--
-- The block table is read from its 'IORef' on every call.
getBlockMVar :: UVManager -> UVSlot -> IO (MVar Int)
{-# INLINABLE getBlockMVar #-}
getBlockMVar uvm slot = do
    blockTable <- readIORef (uvmBlockTable uvm)
    indexArrM blockTable slot

-- | Poke a prepared buffer and size into loop data under given slot.
--
-- NOTE, this action is not protected with 'withUVManager' for efficiency reasons, you should merge this action
-- with other uv actions and put them together inside a 'withUVManager' or 'withUVManager\''. For example:
--
-- @
--    ...
--    withUVManager' uvm $ do
--        pokeBufferTable uvm slot buf len
--        uvReadStart handle
--    ...
-- @
--
pokeBufferTable :: UVManager    -- ^ uv manager
                -> UVSlot       -- ^ uv slot
                -> Ptr Word8    -- ^ buffer pointer
                -> Int          -- ^ buffer length
                -> IO ()
{-# INLINABLE pokeBufferTable #-}
pokeBufferTable uvm slot buf bufSiz = do
    (bufTable, bufSizTable) <- peekUVBufferTable (uvmLoopData uvm)
    pokeElemOff bufTable slot buf
    pokeElemOff bufSizTable slot (fromIntegral bufSiz)

-- | Peek buffer size table: read the size entry stored under the given slot.
--
-- The notes on 'pokeBufferTable' apply here too.
peekBufferSizeTable :: UVManager -> UVSlot -> IO Int
{-# INLINABLE peekBufferSizeTable #-}
peekBufferSizeTable uvm slot = do
    (_, bufSizTable) <- peekUVBufferTable (uvmLoopData uvm)
    fromIntegral <$> peekElemOff bufSizTable slot

-- | Poke buffer size table: overwrite the size entry stored under the given
-- slot, leaving the buffer pointer entry untouched.
--
-- The notes on 'pokeBufferTable' apply here too.
pokeBufferSizeTable :: UVManager -> UVSlot -> Int -> IO ()
{-# INLINABLE pokeBufferSizeTable #-}
pokeBufferSizeTable uvm slot bufSiz = do
    (_, bufSizTable) <- peekUVBufferTable (uvmLoopData uvm)
    pokeElemOff bufSizTable slot (fromIntegral bufSiz)

-- | Create a 'UVManager' as a 'Resource'.
--
-- The first argument is the initial size of the block table (also passed to
-- @hs_uv_loop_init@), the second is the capability number recorded in 'uvmCap'.
-- The loop is released with @hs_uv_loop_close@ when the resource is closed.
initUVManager :: HasCallStack => Int -> Int -> Resource UVManager
initUVManager siz cap = do
    loop  <- initResource
                (throwOOMIfNull $ hs_uv_loop_init (fromIntegral siz))
                hs_uv_loop_close
    liftIO $ do
        -- every slot gets its own, initially empty, 'MVar'
        mblockTable <- newArr siz
        forM_ [0..siz-1] $ \ i -> writeArr mblockTable i =<< newEmptyMVar
        blockTable <- unsafeFreezeArr mblockTable
        blockTableRef <- newIORef blockTable
        loopData <- peekUVLoopData loop
        -- the manager starts in the "not running" state
        runningLock <- newMVar False
        return (UVManager blockTableRef loop loopData runningLock cap)

-- | Lock an uv manager, so that we can safely mutate its uv_loop's state.
--
-- libuv is not thread safe, use this function to perform any action which will mutate uv_loop's state.
--
-- The action runs while holding the manager's running lock, and only when the
-- lock's value is 'False'. Otherwise the loop is woken up via
-- @hs_uv_wake_up_async@, the lock is released, and the attempt is repeated
-- after a 'yield'. The action's result is forced before the lock is released.
withUVManager :: HasCallStack => UVManager -> (Ptr UVLoop -> IO a) -> IO a
withUVManager (UVManager _ loop loopData runningLock _) f = go
  where
    go = do
        r <- withMVar runningLock $ \ running -> do
            if running
            then do
                -- if uv_run is running, it will stop
                -- if uv_run is not running, next running won't block
                throwUVIfMinus_ (hs_uv_wake_up_async loopData)
                return Nothing
            else do
                !r <- f loop
                return (Just r)
        case r of
            Just r' -> return r'
            _       -> yield >> go -- we yield here, because uv_run is probably not finished yet

-- | Lock an uv manager, so that we can safely mutate its uv_loop's state.
--
-- Some actions do not request the uv_loop pointer explicitly, but will mutate uv_loop under the hood, for example:
-- @uv_read_start@. These actions have to be protected by locking the uv_loop.
--
-- In fact most of the libuv's functions are not thread safe, so watch out!
--
-- This is 'withUVManager' with the loop pointer argument ignored.
withUVManager' :: HasCallStack => UVManager -> IO a -> IO a
withUVManager' uvm f = withUVManager uvm (\ _ -> f)

-- | Start the uv loop.
--
-- This action loops forever: it alternates non-blocking polls (performed
-- while holding the running lock) with a blocking poll (performed with the
-- running lock's value set to 'True'), yielding between polls.
startUVManager :: HasCallStack => UVManager -> IO ()
startUVManager uvm@(UVManager _ _ _ runningLock _) = poll -- use a closure capture uvm in case of stack memory leaking
  where
    -- we borrow mio's non-blocking/blocking poll strategy here
    poll = do
        e <- withMVar runningLock $ \ _ -> step uvm False
        if e > 0                                        -- first we do a non-blocking poll, if we got events
        then yield >> poll                              -- we yield here, to let other threads do actual work
        else do                                         -- otherwise we still yield once
            yield                                       -- in case other threads can still progress
            e' <- withMVar runningLock $ \ _ -> step uvm False   -- now we have done another non-blocking poll
            if e' > 0 then yield >> poll            -- if we got events somehow, we yield and go back
            else do                                 -- if there're still no events,
                                                    -- we directly jump to safe blocking poll
                _ <- swapMVar runningLock True      -- after swap this lock, other threads have to wake up us
                _ <- step uvm True                  -- by send async handler, thus libuv's states are safe
                _ <- swapMVar runningLock False
                                                    -- blocking poll only exits if there're events,
                yield                               -- so we yield here, to let other threads do actual work
                poll

    -- call uv_run, return the event number
    step :: UVManager -> Bool -> IO Int
    step (UVManager blockTableRef loop loopData _ _) block = do
            blockTable <- readIORef blockTableRef
            clearUVEventCounter loopData        -- clean event counter

            if block
            then if rtsSupportsBoundThreads
                then throwUVIfMinus_ $ uv_run_safe loop UV_RUN_ONCE
                else do
                    -- use a 1ms timeout blocking poll on non-threaded rts
                    throwUVIfMinus_ (hs_uv_wake_up_timer loopData)
                    throwUVIfMinus_ (uv_run loop UV_RUN_ONCE)
            else throwUVIfMinus_ (uv_run loop UV_RUN_NOWAIT)

            -- c is the number of queued events, q the queue of slot numbers
            (c, q) <- peekUVEventQueue loopData
            forM_ [0..c-1] $ \ i -> do
                slot <- peekElemOff q i
                lock <- indexArrM blockTable slot
                -- It's important to read the buffer size table inside running lock and
                -- unlock ghc thread with the result, where 'tryPutMVar' will mutate waiting
                -- thread's stack to ensure it will receive the result after get resumed.
                --
                -- After step finished, other threads are free to take the same slot,
                -- thus can overwrite the buffer size table, i.e. the previous result.
                --
                !r <- peekBufferSizeTable uvm slot
                tryPutMVar lock r
            return c

-- | Run a libuv FFI to get a 'UVSlotUnsafe' (which may exceed block table size),
-- resize the block table in that case, so that the returned slot always has an
-- accompanying 'MVar' in block table.
--
-- Always use this function to turn an 'UVSlotUnsafe' into 'UVSlot', so that the block
-- table size synchronize with libuv side's slot table.
--
-- A negative slot is reported through 'throwUVIfMinus'.
getUVSlot :: HasCallStack => UVManager -> IO UVSlotUnsafe -> IO UVSlot
{-# INLINE getUVSlot #-}
getUVSlot (UVManager blockTableRef _ _ _ _) f = do
    slot <- throwUVIfMinus (unsafeGetSlot <$> f)
    blockTable <- readIORef blockTableRef
    let oldSiz = sizeofArr blockTable
    -- NOTE(review): growth is triggered only when the slot equals the current
    -- table size; this presumably relies on the C side handing out slots in
    -- lock-step with this table -- confirm against the C slot allocator.
    when (slot == oldSiz) $ do
        let newSiz = oldSiz `shiftL` 2      -- quadruple the table
        blockTable' <- newArr newSiz
        copyArr blockTable' 0 blockTable 0 oldSiz
        -- fresh empty 'MVar's for the newly added slots
        forM_ [oldSiz..newSiz-1] $ \ i ->
            writeArr blockTable' i =<< newEmptyMVar
        !iBlockTable' <- unsafeFreezeArr blockTable'
        writeIORef blockTableRef iBlockTable'
    return slot

--------------------------------------------------------------------------------

-- | Cancel uv async function (actions which can be cancelled with 'uv_cancel') with
-- best effort, if the action is already performed, run an extra clean up action.
--
-- The extra clean up action receives the result taken from the slot's 'MVar'.
-- The whole operation runs under 'withUVManager'.
cancelUVReq :: UVManager -> UVSlot -> (Int -> IO ()) -> IO ()
cancelUVReq uvm slot extra_cleanup = withUVManager uvm $ \ loop -> do
    m <- getBlockMVar uvm slot
    r <- tryTakeMVar m
    case r of
        Just r' -> extra_cleanup r'             -- It's too late
        _ -> do
            pokeBufferTable uvm slot nullPtr 0  -- doing this let libuv side knows that
                                                -- we won't keep buffer alive in callbacks
            hs_uv_cancel loop slot              -- then we cancel the io with best efforts

-- | Exception safe uv request helper
--
-- This helper will run a libuv's async function, which will return a
-- libuv side's slot, then we will accommodate a 'MVar' in block table and
-- wait on that 'MVar', until the async function finished or an exception
-- is received, in later case we will call 'cancelUVReq' to cancel the on-going
-- async function with best efforts.
--
-- A negative result is reported through 'throwUVIfMinus'.
withUVRequest :: HasCallStack
              => UVManager -> (Ptr UVLoop -> IO UVSlotUnsafe) -> IO Int
withUVRequest uvm f = do
    (slot, m) <- withUVManager uvm $ \ loop -> mask_ $ do
        slot <- getUVSlot uvm (f loop)
        m <- getBlockMVar uvm slot
        -- empty the slot's 'MVar' before we start waiting on it
        _ <- tryTakeMVar m
        return (slot, m)
    throwUVIfMinus (takeMVar m `onException` cancelUVReq uvm slot no_extra_cleanup)
  where no_extra_cleanup = const $ return ()

-- | Same with 'withUVRequest' but discard the result.
withUVRequest_ :: HasCallStack
               => UVManager -> (Ptr UVLoop -> IO UVSlotUnsafe) -> IO ()
withUVRequest_ uvm f = void (withUVRequest uvm f)

-- | Same with 'withUVRequest' but apply a convert function to result.
--
-- The convert function has full access to the returned value including
-- negative ones, it's the convert function's responsibility to throw an exception
-- if appropriate.
withUVRequest' :: HasCallStack
               => UVManager
               -> (Ptr UVLoop -> IO UVSlotUnsafe)
               -> (Int -> IO b)     -- ^ convert function
               -> IO b
withUVRequest' uvm f g = do
    (slot, m) <- withUVManager uvm $ \ loop -> mask_ $ do
        slot <- getUVSlot uvm (f loop)
        m <- getBlockMVar uvm slot
        -- since we locked uv manager here, it won't affect next event
        _ <- tryTakeMVar m
        return (slot, m)
    g =<< (takeMVar m `onException` cancelUVReq uvm slot no_extra_cleanup)
  where no_extra_cleanup = const $ return ()

-- | Same with 'withUVRequest', but will also run an extra cleanup function
-- if async exception hit this thread but the async action is already successfully performed,
-- e.g. release result memory.
withUVRequestEx :: HasCallStack
                => UVManager -> (Ptr UVLoop -> IO UVSlotUnsafe) -> (Int -> IO ()) -> IO Int
withUVRequestEx uvm f extra_cleanup = do
    (slot, m) <- withUVManager uvm $ \ loop -> mask_ $ do
        slot <- getUVSlot uvm (f loop)
        m <- getBlockMVar uvm slot
        -- empty the slot's 'MVar' before we start waiting on it
        _ <- tryTakeMVar m
        return (slot, m)
    throwUVIfMinus (takeMVar m `onException` cancelUVReq uvm slot extra_cleanup)

--------------------------------------------------------------------------------

-- | Fork a new GHC thread with active load-balancing.
--
-- Using libuv based IO solution has a disadvantage that file handlers are bound to certain
-- uv_loop, thus certain uv manager/capability. Worker threads that migrate to other capability
-- will lead contention since various APIs here is protected by manager's lock, this makes GHC's
-- work-stealing strategy unsuitable for certain workload, such as a webserver.
-- we solve this problem with simple round-robin load-balancing: forkBa will automatically
-- distribute new threads to all capabilities in round-robin manner. Thus its name forkBa(lance).
forkBa :: IO () -> IO ThreadId
forkBa io = do
    -- the counter value is handed to 'forkOn' as-is, no explicit wrap-around
    -- is done here
    i <- atomicAddCounter counter 1
    forkOn i io
  where
    -- process-wide counter shared by all callers; NOINLINE keeps the
    -- 'unsafePerformIO' initialisation from being duplicated
    counter :: Counter
    {-# NOINLINE counter #-}
    counter = unsafePerformIO $ newCounter 0