{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE MultiWayIf #-}
module Std.IO.UV.Manager
( UVManager
, getUVManager
, getBlockMVar
, peekBufferTable
, pokeBufferTable
, withUVManager
, withUVManager_
, getUVSlot
, withUVRequest
, withUVRequest_
, withUVRequest'
, withUVRequestEx
, initUVStream
, UVStream(..)
, forkBa
) where
import Control.Concurrent
import Control.Concurrent.MVar
import Control.Monad
import Control.Monad.IO.Class
import Data.IORef
import Data.Bits (shiftL)
import Data.Primitive.PrimArray
import Data.Word
import Foreign.C
import Foreign.Ptr
import Foreign.Storable
import GHC.Conc.Sync (labelThread)
import Std.Data.Array
import Std.Data.PrimIORef
import Std.IO.Buffered
import Std.IO.Exception
import Std.IO.UV.Errno
import Std.IO.Resource
import Std.IO.UV.FFI
import System.IO.Unsafe
#define IDLE_LIMIT 20
-- | Per-capability state of a libuv event loop manager.
--
-- One value is created by 'initUVManager' for every capability and stored in
-- 'uvManagerArray'.
data UVManager = UVManager
    { uvmBlockTable :: {-# UNPACK #-} !(IORef (UnliftedArray (MVar Int)))
    -- ^ Table of 'MVar's indexed by slot; 'getBlockMVar' reads from it and
    -- 'getUVSlot' replaces it with a larger table when needed.
    , uvmLoop :: {-# UNPACK #-} !(Ptr UVLoop)
    -- ^ The loop pointer obtained from @hs_uv_loop_init@.
    , uvmLoopData :: {-# UNPACK #-} !(Ptr UVLoopData)
    -- ^ The loop data pointer obtained with 'peekUVLoopData' from 'uvmLoop'.
    , uvmRunning :: {-# UNPACK #-} !(MVar Bool)
    -- ^ Lock around the loop; holds 'True' only while 'startUVManager' is
    -- inside its blocking step.
    , uvmCap :: {-# UNPACK #-} !Int
    -- ^ The capability number this manager was created for.
    }
-- | Renders only the capability number of the manager.
instance Show UVManager where
    show = ("UVManager on capability " ++) . show . uvmCap
-- | Two managers are equal when they belong to the same capability.
instance Eq UVManager where
    UVManager _ _ _ _ capA == UVManager _ _ _ _ capB = capA == capB
-- | Global table of managers, one entry per capability.
--
-- On first use a manager thread is forked on every capability. Each thread
-- initialises its manager, stores it into the table, signals readiness and
-- then enters 'startUVManager'. The table is frozen only after all threads
-- have signalled.
uvManagerArray :: IORef (Array UVManager)
{-# NOINLINE uvManagerArray #-}
uvManagerArray = unsafePerformIO $ do
    capCount <- getNumCapabilities
    managers <- newArr capCount
    ready <- newQSemN 0
    forM_ [0 .. capCount - 1] $ \ capIx ->
        forkOn capIx $
            withResource (initUVManager INIT_LOOP_SIZE capIx) $ \ mgr -> do
                tid <- myThreadId
                labelThread tid ("uv manager on " ++ show capIx)
                writeArr managers capIx mgr
                signalQSemN ready 1
                startUVManager mgr
    -- Wait until every manager thread has written its slot.
    waitQSemN ready capCount
    unsafeFreezeArr managers >>= newIORef
-- | Look up the manager for the capability the calling thread is running on.
--
-- The capability number is reduced modulo the size of the manager table.
getUVManager :: IO UVManager
{-# INLINABLE getUVManager #-}
getUVManager = do
    tid <- myThreadId
    (cap, _) <- threadCapability tid
    managers <- readIORef uvManagerArray
    indexArrM managers (cap `rem` sizeofArr managers)
-- | Fetch the 'MVar' stored for the given slot in the manager's block table.
getBlockMVar :: UVManager -> UVSlot -> IO (MVar Int)
{-# INLINABLE getBlockMVar #-}
getBlockMVar uvm slot =
    readIORef (uvmBlockTable uvm) >>= \ table -> indexArrM table slot
-- | Store a buffer pointer and its size into the loop's buffer tables at the
-- given slot.
pokeBufferTable :: UVManager -> UVSlot -> Ptr Word8 -> Int -> IO ()
{-# INLINABLE pokeBufferTable #-}
pokeBufferTable uvm slot buf bufSiz = do
    (ptrTable, sizTable) <- peekUVBufferTable (uvmLoopData uvm)
    pokeElemOff ptrTable slot buf
    pokeElemOff sizTable slot (fromIntegral bufSiz)
-- | Read the entry of the buffer size table at the given slot, converted to
-- 'Int'. The buffer pointer table is not consulted.
peekBufferTable :: UVManager -> UVSlot -> IO Int
{-# INLINABLE peekBufferTable #-}
peekBufferTable uvm slot = do
    (_, sizTable) <- peekUVBufferTable (uvmLoopData uvm)
    fromIntegral <$> peekElemOff sizTable slot
-- | Create a manager as a 'Resource'.
--
-- The first argument is the initial number of slots, the second is the
-- capability number recorded in the manager. The loop is allocated with
-- @hs_uv_loop_init@ and released with @hs_uv_loop_close@; the block table
-- starts out with one empty 'MVar' per slot and the running flag is 'False'.
initUVManager :: HasCallStack => Int -> Int -> Resource UVManager
initUVManager siz cap = do
    uvLoop <- initUVLoop (fromIntegral siz)
    liftIO $ do
        parking <- newArr siz
        forM_ [0 .. siz - 1] $ \ ix ->
            newEmptyMVar >>= writeArr parking ix
        tableRef <- newIORef =<< unsafeFreezeArr parking
        uvLoopData <- peekUVLoopData uvLoop
        runningFlag <- newMVar False
        return (UVManager tableRef uvLoop uvLoopData runningFlag cap)
  where
    -- Allocate the loop, throwing if the allocation yields a null pointer.
    initUVLoop :: HasCallStack => Int -> Resource (Ptr UVLoop)
    initUVLoop n =
        initResource (throwOOMIfNull (hs_uv_loop_init n)) hs_uv_loop_close
-- | Run an action with the loop pointer while holding the manager's lock.
--
-- If the lock shows that the manager is inside its blocking step, the loop is
-- woken up with @hs_uv_wake_up_async@ and the attempt is repeated after a
-- 'yield'; otherwise the action runs with the lock held and its result is
-- returned.
withUVManager :: HasCallStack => UVManager -> (Ptr UVLoop -> IO a) -> IO a
withUVManager (UVManager _ loop loopData running _) f = attempt
  where
    attempt = do
        outcome <- withMVar running $ \ isBlocked ->
            if isBlocked
                then Nothing <$ throwUVIfMinus_ (hs_uv_wake_up_async loopData)
                else Just <$> f loop
        maybe (yield >> attempt) return outcome
-- | Variant of 'withUVManager' for actions that do not need the loop pointer.
withUVManager_ :: HasCallStack => UVManager -> IO a -> IO a
withUVManager_ uvm f = withUVManager uvm (const f)
-- | Run the manager's polling loop; this function does not return normally.
--
-- Each round starts with a non-blocking poll under the lock. If it produced
-- events the thread yields and starts over. Otherwise it yields and polls
-- once more; only when that second poll is also empty does it flag itself as
-- running, perform one blocking poll, clear the flag and yield.
startUVManager :: HasCallStack => UVManager -> IO ()
startUVManager uvm@(UVManager _ _ _ running _) = pollLoop
  where
    -- Non-blocking poll performed while holding the lock.
    quickPoll = withMVar running (const (step uvm False))

    pollLoop = do
        firstCount <- quickPoll
        if firstCount > 0
            then yield >> pollLoop
            else do
                yield
                secondCount <- quickPoll
                if secondCount > 0
                    then yield >> pollLoop
                    else do
                        -- While the flag is True, 'withUVManager' callers
                        -- wake the loop up instead of running their action.
                        _ <- swapMVar running True
                        _ <- step uvm True
                        _ <- swapMVar running False
                        yield
                        pollLoop

    -- Run the loop once and hand results to the waiting 'MVar's.
    -- Returns the number of events reported by the event queue.
    step :: UVManager -> Bool -> IO Int
    step (UVManager tableRef uvLoop uvLoopData _ _) blocking = do
        table <- readIORef tableRef
        clearUVEventCounter uvLoopData
        if not blocking
            then throwUVIfMinus_ (uv_run uvLoop UV_RUN_NOWAIT)
            else if rtsSupportsBoundThreads
                then throwUVIfMinus_ (uv_run_safe uvLoop UV_RUN_ONCE)
                else do
                    -- Without bound thread support the wake-up timer is
                    -- armed before the blocking run.
                    throwUVIfMinus_ (hs_uv_wake_up_timer uvLoopData)
                    throwUVIfMinus_ (uv_run uvLoop UV_RUN_ONCE)
        (eventCount, queue) <- peekUVEventQueue uvLoopData
        forM_ [0 .. eventCount - 1] $ \ ix -> do
            slot <- peekElemOff queue ix
            lock <- indexArrM table slot
            -- The value delivered to the waiter is the size table entry
            -- of the slot at this moment.
            result <- peekBufferTable uvm slot
            tryPutMVar lock result
        return eventCount
-- | Run an action that yields a slot, throwing if the result is negative.
--
-- When the slot equals the current size of the block table, the table is
-- replaced by one four times as large: existing entries are copied and the
-- new entries are filled with empty 'MVar's.
getUVSlot :: HasCallStack => UVManager -> IO UVSlotUnSafe -> IO UVSlot
{-# INLINE getUVSlot #-}
getUVSlot (UVManager tableRef _ _ _ _) f = do
    slot <- throwUVIfMinus (unsafeGetSlot <$> f)
    oldTable <- readIORef tableRef
    let oldLen = sizeofArr oldTable
        newLen = oldLen `shiftL` 2
    when (slot == oldLen) $ do
        grown <- newArr newLen
        copyArr grown 0 oldTable 0 oldLen
        forM_ [oldLen .. newLen - 1] $ \ ix ->
            newEmptyMVar >>= writeArr grown ix
        !frozen <- unsafeFreezeArr grown
        writeIORef tableRef frozen
    return slot
-- | Cancel the request occupying a slot, under the manager's lock.
--
-- If a result has already been delivered to the slot's 'MVar', it is taken
-- and passed to the cleanup callback. Otherwise the slot's buffer entry is
-- reset to a null pointer with size 0 and @hs_uv_cancel@ is called.
cancelUVReq :: UVManager -> UVSlot -> (Int -> IO ()) -> IO ()
cancelUVReq uvm slot extra_cleanup = withUVManager uvm $ \ loop -> do
    lock <- getBlockMVar uvm slot
    finished <- tryTakeMVar lock
    maybe (abort loop) extra_cleanup finished
  where
    abort loop = do
        pokeBufferTable uvm slot nullPtr 0
        hs_uv_cancel loop slot
-- | Issue a request and block until the manager delivers its result.
--
-- The request is started under the manager's lock with asynchronous
-- exceptions masked, and any stale value in the slot's 'MVar' is discarded.
-- A negative result is thrown as an exception. If waiting is interrupted by
-- an exception, the request is cancelled with 'cancelUVReq'.
withUVRequest :: HasCallStack
              => UVManager -> (Ptr UVLoop -> IO UVSlotUnSafe) -> IO Int
withUVRequest uvm f = do
    (slot, lock) <- withUVManager uvm $ \ loop -> mask_ $ do
        slot <- getUVSlot uvm (f loop)
        lock <- getBlockMVar uvm slot
        _ <- tryTakeMVar lock
        return (slot, lock)
    throwUVIfMinus
        (takeMVar lock `onException` cancelUVReq uvm slot (\ _ -> return ()))
-- | Same as 'withUVRequest', with the result discarded.
withUVRequest_ :: HasCallStack
               => UVManager -> (Ptr UVLoop -> IO UVSlotUnSafe) -> IO ()
withUVRequest_ uvm f = () <$ withUVRequest uvm f
-- | Issue a request and pass the delivered result to a handler.
--
-- Unlike 'withUVRequest' the raw result is not checked for being negative;
-- it is handed to the handler as is. If waiting or the handler raises an
-- exception, the request is cancelled with 'cancelUVReq'.
withUVRequest' :: HasCallStack
               => UVManager
               -> (Ptr UVLoop -> IO UVSlotUnSafe)
               -> (Int -> IO b)
               -> IO b
withUVRequest' uvm f g = do
    (slot, lock) <- withUVManager uvm $ \ loop -> mask_ $ do
        slot <- getUVSlot uvm (f loop)
        lock <- getBlockMVar uvm slot
        _ <- tryTakeMVar lock
        return (slot, lock)
    (takeMVar lock >>= g)
        `onException` cancelUVReq uvm slot (\ _ -> return ())
-- | Like 'withUVRequest', with a caller supplied cleanup.
--
-- The cleanup callback is forwarded to 'cancelUVReq', which invokes it with
-- the result when cancellation finds that a result was already delivered.
withUVRequestEx :: HasCallStack
                => UVManager
                -> (Ptr UVLoop -> IO UVSlotUnSafe)
                -> (Int -> IO ())
                -> IO Int
withUVRequestEx uvm f extra_cleanup = do
    (slot, lock) <- withUVManager uvm $ \ loop -> mask_ $ do
        slot <- getUVSlot uvm (f loop)
        lock <- getBlockMVar uvm slot
        _ <- tryTakeMVar lock
        return (slot, lock)
    throwUVIfMinus
        (takeMVar lock `onException` cancelUVReq uvm slot extra_cleanup)
-- | Fork a thread on a capability chosen by a global counter.
--
-- Every call advances the shared counter by one and passes the value returned
-- by 'atomicAddCounter' to 'forkOn', so that successive calls are spread over
-- the capabilities.
forkBa :: IO () -> IO ThreadId
forkBa io = do
    i <- atomicAddCounter forkBaCounter 1
    forkOn i io

-- | Counter shared by all calls to 'forkBa'.
--
-- This has to be a top-level binding. As a local binding underneath the
-- argument of 'forkBa' it is shared between calls only if the optimiser
-- floats it out; otherwise every call gets a fresh counter and all threads
-- are forked on the same capability.
forkBaCounter :: Counter
{-# NOINLINE forkBaCounter #-}
forkBaCounter = unsafePerformIO (newCounter 0)
-- | A stream handle registered with a 'UVManager'.
--
-- Values are created by 'initUVStream'.
data UVStream = UVStream
    { uvsHandle :: {-# UNPACK #-} !(Ptr UVHandle)
    -- ^ The handle allocated with @hs_uv_handle_alloc@.
    , uvsSlot :: {-# UNPACK #-} !UVSlot
    -- ^ The slot read from the handle's data when the stream was created;
    -- reads wait on this slot's 'MVar'.
    , uvsManager :: UVManager
    -- ^ The manager the handle was allocated on.
    , uvsClosed :: {-# UNPACK #-} !(IORef Bool)
    -- ^ Set to 'True' once the handle has been closed; reads and writes
    -- check it first.
    }
-- | Shows the handle, slot and manager; the closed flag is not included.
instance Show UVStream where
    show (UVStream handle slot uvm _) = concat
        [ "UVStream{uvsHandle = ", show handle
        , ",uvsSlot = ", show slot
        , ",uvsManager =", show uvm
        , "}"
        ]
-- | Create a stream on the given manager as a 'Resource'.
--
-- Under the manager's lock a handle is allocated, its slot is registered and
-- the slot's 'MVar' is emptied, then the supplied initialiser runs on the
-- handle. If the initialiser throws, the handle is freed with
-- @hs_uv_handle_free@. The resource is released with 'closeUVStream'.
initUVStream :: HasCallStack
             => (Ptr UVLoop -> Ptr UVHandle -> IO ())
             -> UVManager
             -> Resource UVStream
initUVStream init uvm = initResource acquire closeUVStream
  where
    acquire = withUVManager uvm $ \ loop -> do
        handle <- hs_uv_handle_alloc loop
        slot <- getUVSlot uvm (peekUVHandleData handle)
        _ <- tryTakeMVar =<< getBlockMVar uvm slot
        init loop handle `onException` hs_uv_handle_free handle
        UVStream handle slot uvm <$> newIORef False
-- | Close the stream's handle under the manager's lock.
--
-- The closed flag is checked first, so closing an already closed stream does
-- nothing.
closeUVStream :: UVStream -> IO ()
closeUVStream (UVStream handle _ uvm closed) = withUVManager_ uvm $ do
    alreadyClosed <- readIORef closed
    unless alreadyClosed $ do
        writeIORef closed True
        hs_uv_handle_close handle
-- | Reading from a stream.
--
-- Throws via 'throwECLOSED' when the stream is already closed. Otherwise
-- reading is started and the destination buffer is registered for the
-- stream's slot under the manager's lock, and the caller blocks on the
-- slot's 'MVar'. A positive result is returned as is, @UV_EOF@ is reported
-- as 0 and any other negative result is thrown. An exception while waiting
-- closes the stream.
instance Input UVStream where
    readInput uvs@(UVStream handle slot uvm closed) buf len = mask_ $ do
        isClosed <- readIORef closed
        when isClosed throwECLOSED
        lock <- getBlockMVar uvm slot
        withUVManager_ uvm $ do
            throwUVIfMinus_ (hs_uv_read_start handle)
            pokeBufferTable uvm slot buf len
            tryTakeMVar lock
        nread <- takeMVar lock `onException` closeUVStream uvs
        -- NOTE(review): a result of exactly 0 is not covered by the guards
        -- below; presumably it cannot be delivered here -- confirm.
        if | nread > 0 -> return nread
           | nread == fromIntegral UV_EOF -> return 0
           | nread < 0 -> throwUVIfMinus (return nread)
-- | Writing to a stream.
--
-- Throws via 'throwECLOSED' when the stream is already closed. Otherwise the
-- write is issued under the manager's lock and the caller blocks on the
-- 'MVar' of the slot the write was assigned. A negative result is thrown.
-- An exception while waiting closes the stream.
instance Output UVStream where
    writeOutput uvs@(UVStream handle _ uvm closed) buf len = mask_ $ do
        isClosed <- readIORef closed
        when isClosed throwECLOSED
        lock <- withUVManager_ uvm $ do
            slot <- getUVSlot uvm (hs_uv_write handle buf len)
            lock <- getBlockMVar uvm slot
            _ <- tryTakeMVar lock
            return lock
        throwUVIfMinus_ (takeMVar lock `onException` closeUVStream uvs)