{-# LANGUAGE TupleSections #-}
module General.Pool(
Pool, runPool,
addPool, addPoolWait, PoolPriority(..),
) where
import Control.Concurrent.Extra
import General.Thread
import System.Time.Extra
import Control.Exception.Extra
import Control.Monad.Extra
import qualified Data.Heap as Heap
import qualified Data.HashSet as Set
import Data.IORef.Extra
false = False
data S = S
{alive :: !Bool
,threads :: !(Set.HashSet Thread)
,threadsLimit :: {-# UNPACK #-} !Int
,threadsCount :: {-# UNPACK #-} !Int
,threadsMax :: {-# UNPACK #-} !Int
,threadsSum :: {-# UNPACK #-} !Int
,rand :: IO Int
,todo :: !(Heap.Heap (Heap.Entry (PoolPriority, Int) (IO ())))
}
emptyS :: Int -> Bool -> IO S
emptyS n deterministic = do
rand <- do
ref <- newIORef 0
pure $ do i <- readIORef ref; writeIORef' ref (i+1); pure i
pure $ S True Set.empty n 0 0 0 rand Heap.empty
data Pool = Pool
!(Var S)
!(Barrier (Either SomeException S))
withPool :: Pool -> (S -> IO (S, IO ())) -> IO ()
withPool (Pool var _) f = join $ modifyVar var $ \s ->
if alive s then f s else pure (s, pure ())
withPool_ :: Pool -> (S -> IO S) -> IO ()
withPool_ pool act = withPool pool $ fmap (, pure()) . act
worker :: Pool -> IO ()
worker pool = withPool pool $ \s -> pure $ case Heap.uncons $ todo s of
Nothing -> (s, pure ())
Just (Heap.Entry _ now, todo2) -> (s{todo = todo2}, now >> worker pool)
step :: Pool -> (S -> IO S) -> IO ()
step pool@(Pool _ done) op = uninterruptibleMask_ $ withPool_ pool $ \s -> do
s <- op s
case Heap.uncons $ todo s of
Just (Heap.Entry _ now, todo2) | threadsCount s < threadsLimit s -> do
t <- newThreadFinally (now >> worker pool) $ \t res ->
case res of
Left e | false -> withPool_ pool $ \s -> do
signalBarrier done $ Left e
pure (remThread t s){alive = False}
_ ->
step pool $ pure . remThread t
pure (addThread t s){todo = todo2}
Nothing | false, threadsCount s == 0 -> do
signalBarrier done $ Right s
pure s{alive = False}
_ -> pure s
where
addThread t s = s{threads = Set.insert t $ threads s, threadsCount = threadsCount s + 1
,threadsSum = threadsSum s + 1, threadsMax = threadsMax s `max` (threadsCount s + 1)}
remThread t s = s{threads = Set.delete t $ threads s, threadsCount = threadsCount s - 1}
addPool :: PoolPriority -> Pool -> IO a -> IO ()
addPool priority pool act = step pool $ \s -> do
i <- rand s
pure s{todo = Heap.insert (Heap.Entry (priority, i) $ void act) $ todo s}
addPoolWait :: PoolPriority -> Pool -> IO a -> IO a
addPoolWait priority pool act = do
bar <- newBarrier
addPool priority pool $ uninterruptibleMask $ \unmask ->
signalBarrier bar =<< try_ (unmask act)
res <- waitBarrier bar
either throwIO pure res
data PoolPriority
= PoolRequired
| PoolSpeculate
deriving (Eq,Ord)
runPool :: Bool -> Int -> (Pool -> IO a) -> IO a
runPool deterministic n act = do
s <- newVar =<< emptyS n deterministic
done <- newBarrier
let pool = Pool s done
let cleanup =
join $ modifyVar s $ \s -> pure (s{alive=False}, stopThreads $ Set.toList $ threads s)
let ghc10793 = do
sleep 1
res <- waitBarrierMaybe done
case res of
Just (Left e) -> throwIO e
_ -> throwIO BlockedIndefinitelyOnMVar
flip finally cleanup $ handle (\BlockedIndefinitelyOnMVar -> ghc10793) $
act pool