{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Experimenter.ConcurrentIO
( doFork
, collectForkResult
, mapConurrentIO
) where
import Control.Concurrent (forkIO, yield)
import Control.Concurrent.STM
import Control.DeepSeq
import Control.Monad (void)
import Data.IORef
import Data.Maybe (fromJust)
-- | Apply an 'IO' action to every element of a list, running the actions in
-- forked threads while limiting how many of them run at the same time.
--
-- The results are returned in the order of the input list.
--
-- The first argument is the maximum number of simultaneously running
-- actions. A limit below 1 could never admit any action (the worker loop
-- would wait forever for a free slot), so such values are treated as 1.
mapConurrentIO :: (NFData b) => Int -> (a -> IO b) -> [a] -> IO [b]
mapConurrentIO maxNr f xs = do
  -- Shared counter holding the number of currently running actions.
  running <- newTMVarIO 0
  mapConurrentIO' running (max 1 maxNr) f xs
-- | Worker loop behind 'mapConurrentIO'.
--
-- The 'TMVar' holds the number of actions that are currently running and
-- @maxNr@ is the upper bound for that number. For every list element a slot
-- is acquired, the action is forked with 'doFork', the remainder of the list
-- is processed, and only then is the result of the forked action collected.
-- Results are therefore returned in input order.
--
-- Waiting for a free slot is done inside STM with 'check', so the calling
-- thread blocks until a running action releases its slot instead of
-- repeatedly re-reading the counter.
--
-- NOTE: if @maxNr@ is not positive no slot can ever be acquired and the call
-- blocks in STM on the first element.
mapConurrentIO' :: (NFData b) => TMVar Int -> Int -> (a -> IO b) -> [a] -> IO [b]
mapConurrentIO' _ _ _ [] = return []
mapConurrentIO' tmVar maxNr f (x:xs) = do
  acquireSlot
  -- The slot is released by the forked thread as soon as the action returns.
  xThread <- doFork $ f x >>= \v -> releaseSlot >> return v
  xs' <- mapConurrentIO' tmVar maxNr f xs
  x' <- collectForkResult xThread
  return (x' : xs')
  where
    -- Wait until fewer than @maxNr@ actions are running, then claim a slot.
    -- The test and the increment happen in one transaction.
    acquireSlot :: IO ()
    acquireSlot =
      atomically $ do
        nr <- takeTMVar tmVar
        check (nr < maxNr) -- retries the transaction until a slot is free
        putTMVar tmVar $! nr + 1
    -- Give the slot back; this wakes up a dispatcher blocked in 'acquireSlot'.
    releaseSlot :: IO ()
    releaseSlot =
      atomically $ do
        nr <- takeTMVar tmVar
        putTMVar tmVar $! nr - 1
-- | Run an 'IO' action in a new thread (via 'forkIO') and return a reference
-- through which its result can be obtained with 'collectForkResult'.
--
-- The reference initially contains 'NotReady'. Once the action has returned,
-- its result is fully evaluated with 'force' /inside the forked thread/ and
-- then published as 'Ready'. Forcing before the write matters: storing the
-- unevaluated @Ready (force v)@ would defer all of the evaluation to whichever
-- thread later inspects the reference.
--
-- NOTE: if the action throws an exception the reference is never set to
-- 'Ready', so a subsequent 'collectForkResult' will not return.
doFork :: NFData a => IO a -> IO (IORef (ThreadState a))
doFork f = do
  ref <- newIORef NotReady
  void $
    forkIO $ do
      v <- f
      -- '$!' evaluates the 'Ready' constructor, whose strict field in turn
      -- triggers the deep evaluation requested by 'force'.
      atomicWriteIORef ref $! Ready (force v)
  return ref
-- | Wait for the result behind a reference created by 'doFork'.
--
-- The reference is polled: while it still holds 'NotReady' the calling thread
-- yields to the scheduler and checks again; as soon as it holds 'Ready' the
-- contained value is returned.
collectForkResult :: IORef (ThreadState a) -> IO a
collectForkResult ref = readIORef ref >>= dispatch
  where
    -- Not finished yet: let other threads run, then poll once more.
    dispatch NotReady = yield >> collectForkResult ref
    dispatch (Ready result) = return result
-- | State of a computation started with 'doFork'.
--
-- 'NotReady' is the initial content of the reference created by 'doFork';
-- 'Ready' carries the result once the forked action has finished. The field
-- of 'Ready' is strict, so evaluating a 'Ready' value also evaluates its
-- payload to weak head normal form.
data ThreadState a = NotReady | Ready !a