{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeOperators #-}
module Control.Monad.Schedule.OSThreadPool where
import Control.Concurrent
import Control.Monad (forM, replicateM, void)
import Control.Monad.IO.Class
import Data.Either (partitionEithers)
import Data.List.NonEmpty hiding (cycle, zip)
import Data.Proxy
import GHC.TypeLits
import Prelude hiding (take)
import Control.Concurrent.STM
import Control.Concurrent.STM.TChan
import Control.Monad.Schedule.Class
-- | An 'IO' action tagged with a type-level natural number @n@.
--
-- The tag carries no runtime data. The 'MonadSchedule' instance in this
-- module reads it with 'natVal' to decide how many worker threads to start.
newtype OSThreadPool (n :: Nat) a = OSThreadPool
  { unOSThreadPool :: IO a
  -- ^ The wrapped 'IO' action.
  }
  deriving (Functor, Applicative, Monad, MonadIO)
-- | The pair of channels that connects a caller to one worker thread.
data WorkerLink a = WorkerLink
  { jobTChan :: TChan (Maybe (IO a))
  -- ^ Incoming jobs. A worker created by 'makeWorkerLink' runs every
  -- @Just@ action it reads and stops when it reads @Nothing@.
  , resultTChan :: TChan a
  -- ^ Results of the jobs the worker has finished.
  }
-- | Enqueue the 'IO' action wrapped in an 'OSThreadPool' value on the job
-- channel of the given worker.
putJob :: WorkerLink a -> OSThreadPool n a -> IO ()
putJob WorkerLink {jobTChan = jobs} (OSThreadPool action) =
  atomically . writeTChan jobs $ Just action
-- | Create a fresh job channel and result channel, and fork (with 'forkOS')
-- a worker thread that serves them.
--
-- The worker repeatedly reads from the job channel. For @Just action@ it
-- runs the action, writes the result to the result channel and continues;
-- for @Nothing@ it returns, which ends the thread.
makeWorkerLink :: IO (WorkerLink a)
makeWorkerLink = do
  jobs <- atomically newTChan
  results <- atomically newTChan
  let loop = atomically (readTChan jobs) >>= maybe (return ()) runJob
      runJob job = do
        outcome <- job
        atomically (writeTChan results outcome)
        loop
  void (forkOS loop)
  return WorkerLink {jobTChan = jobs, resultTChan = results}
-- | Produce a 'Proxy' for the pool-size index @n@ of a list of actions.
--
-- The argument is ignored; only its type is used.
proxyForActions :: NonEmpty (OSThreadPool n a) -> Proxy n
proxyForActions = const Proxy
-- | Run the given actions on @n@ worker threads created by 'makeWorkerLink'.
--
-- Each call to 'schedule' starts @n@ fresh workers and distributes the
-- actions over them round-robin. It returns as soon as at least one result
-- is available. Every action that has not finished yet is returned as a
-- continuation that blocks on the result channel of the worker it was
-- assigned to.
instance (KnownNat n, (1 <=? n) ~ True) => MonadSchedule (OSThreadPool n) where
  schedule actions = OSThreadPool $ do
    let n = natVal $ proxyForActions actions
    workerLinks <- replicateM (fromInteger n) makeWorkerLink
    backgroundActions <- forM (zip (cycle workerLinks) (toList actions)) $
      \(link, action) -> do
        putJob link action
        return $ resultTChan link
    -- Every job is queued by now, so append the stop signal to each job
    -- channel. A worker reads its jobs in order, finishes them and then
    -- terminates instead of staying blocked on an empty channel forever.
    -- Results it already wrote stay readable on the result channel.
    mapM_ (\link -> atomically $ writeTChan (jobTChan link) Nothing) workerLinks
    pollPools backgroundActions
    where
      -- Wait until at least one channel holds a result, then take every
      -- result that is currently available. Channels without a result are
      -- turned into actions that block until their result arrives.
      --
      -- This is a single STM transaction: when nothing is available it
      -- calls 'retry', so the caller sleeps until a worker writes a
      -- result, rather than waking up periodically to poll.
      --
      -- A channel shared by several jobs appears several times in the
      -- list; each occurrence takes (or later waits for) one result.
      pollPools :: [TChan a] -> IO (NonEmpty a, [OSThreadPool n a])
      pollPools chans = atomically $ do
        results <- traverse pollPool chans
        case partitionEithers results of
          (_, []) -> retry
          (remainingChans, a : as) ->
            return
              ( a :| as
              , OSThreadPool . atomically . readTChan <$> remainingChans
              )

      -- Non-blocking read: 'Right' with the result if one is available,
      -- otherwise 'Left' with the channel so it can be waited on later.
      pollPool :: TChan a -> STM (Either (TChan a) a)
      pollPool chan = maybe (Left chan) Right <$> tryReadTChan chan