{-# LANGUAGE FlexibleInstances          #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase                 #-}
{-# LANGUAGE RecordWildCards            #-}
{-# LANGUAGE TemplateHaskell            #-}
{-# LANGUAGE TypeFamilies               #-}
{-# OPTIONS_GHC -fno-warn-orphans #-}
-- |
-- Module      : Data.Array.Accelerate.LLVM.Native.Execute.Async
-- Copyright   : [2014..2020] The Accelerate Team
-- License     : BSD3
--
-- Maintainer  : Trevor L. McDonell <trevor.mcdonell@gmail.com>
-- Stability   : experimental
-- Portability : non-portable (GHC extensions)
--

module Data.Array.Accelerate.LLVM.Native.Execute.Async (

  Async(..), Future(..), IVar(..), getArrays,
  evalPar, putIO,

) where

-- accelerate
import Data.Array.Accelerate.Error
import Data.Array.Accelerate.LLVM.Execute.Async
import Data.Array.Accelerate.LLVM.Native.Execute.Scheduler
import Data.Array.Accelerate.LLVM.Native.Target
import Data.Array.Accelerate.LLVM.State

-- standard library
import Control.Concurrent
import Control.Monad.Cont
import Control.Monad.State
import Data.IORef
import Data.Sequence                                                ( Seq )
import qualified Data.Sequence                                      as Seq


-- | Evaluate a parallel computation and wait for its result.
--
-- The computation is started on the calling thread, with a final continuation
-- that writes the result into a fresh 'MVar'. If the computation suspends on a
-- future that has not been filled yet (see 'get'), the remainder of the
-- computation, including that final write, is stored in the future and is
-- executed once the future is filled by 'putIO'. In either case the calling
-- thread then blocks in 'takeMVar' until the result is available.
--
{-# INLINEABLE evalPar #-}
evalPar :: Par Native a -> LLVM Native a
evalPar work = do
  -- Hand-off slot for the final result of the computation
  result <- liftIO newEmptyMVar
  -- Run the computation; the last continuation publishes the result
  runContT (runPar work) (liftIO . putMVar result)
  -- Sleep until the result has been published
  liftIO $ takeMVar result

  -- XXX: Running the initial computation on the worker threads can lead to the
  -- workers becoming blocked, possibly waiting for the result MVars to be
  -- filled from previous (lazily evaluated) computations (speculation). This
  -- happened for example with the code from Issue255, when extracting the
  -- result at index > number of worker threads.
  --
  -- liftIO  $ do
  --   schedule (workers native)
  --     Job { jobTasks = Seq.singleton $ evalLLVM native (runContT (runPar work) (liftIO . putMVar result))
  --         , jobDone  = Nothing
  --         }
  --   takeMVar result


-- Implementation
-- --------------

-- | A handle to a value that may not be available yet. It wraps a mutable
-- reference to an 'IVar', which records whether the value has been written
-- and which continuations are waiting for it.
--
data Future a = Future {-# UNPACK #-} !(IORef (IVar a))

-- | The state of a write-once variable stored inside a 'Future'.
--
data IVar a
    = Full    !a
      -- ^ The value has been written.
    | Blocked !(Seq (a -> IO ()))
      -- ^ The value has not been written yet, and these continuations are
      -- waiting to be applied to it once it is.
    | Empty
      -- ^ The value has not been written yet and nothing is waiting for it.

-- | Asynchronous execution for the 'Native' target, in continuation-passing
-- style. A computation that needs the value of a future which has not been
-- filled yet stores its continuation inside that future and gives up control;
-- the continuation is handed to the worker threads when the future is filled.
--
instance Async Native where
  -- Futures are mutable references to a write-once 'IVar'.
  type FutureR Native  = Future

  -- A parallel computation is a continuation over the 'LLVM Native' monad
  -- whose final answer is unit; results are communicated through futures.
  newtype Par Native a = Par { runPar :: ContT () (LLVM Native) a }
    deriving ( Functor, Applicative, Monad, MonadIO, MonadCont, MonadState Native )

  -- new     :: Par Native (FutureR Native a)
  -- newFull :: a -> Par Native (FutureR Native a)
  --
  -- Allocate a future which is initially empty, or which already holds the
  -- given value, respectively.
  --
  {-# INLINE new     #-}
  {-# INLINE newFull #-}
  new       = Future <$> liftIO (newIORef Empty)
  newFull v = Future <$> liftIO (newIORef (Full v))

  -- fork  :: Par Native () -> Par Native ()
  -- spawn :: Par Native a  -> Par Native a
  --
  -- Both are the identity: the given computation is run in place as part of
  -- the current computation.
  --
  {-# INLINE fork  #-}
  {-# INLINE spawn #-}
  fork  = id
  spawn = id

  -- get :: FutureR Native a -> Par Native a
  --
  -- Read the value of a future. The current continuation is captured and the
  -- state of the future is inspected atomically:
  --
  --   * If the value is present, continue with it immediately.
  --
  --   * Otherwise, append the continuation to the queue of waiters stored in
  --     the future and abandon the current computation ('reschedule'). The
  --     waiter is resumed by 'putIO' once the value arrives.
  --
  {-# INLINE get #-}
  get (Future ref) =
    callCC $ \k -> do
      native <- gets llvmTarget
      -- What to run once the value is available: resume the captured
      -- continuation in the context of this target.
      let resume = evalParIO native . k
      next   <- liftIO . atomicModifyIORef' ref $ \case
                  Empty      -> (Blocked (Seq.singleton resume), reschedule)
                  Blocked ks -> (Blocked (ks Seq.|> resume),     reschedule)
                  Full a     -> (Full a,                         return a)
      next

  -- put :: FutureR Native a -> a -> Par Native ()
  --
  -- Fill the future with the given value, passing any waiting continuations
  -- to the worker threads of the current target (see 'putIO').
  --
  {-# INLINE put #-}
  put future value = do
    Native{..} <- gets llvmTarget   -- brings 'workers' into scope
    liftIO (putIO workers future value)

  -- liftPar :: LLVM Native a -> Par Native a
  --
  -- Run an action of the underlying 'LLVM Native' monad as a step of the
  -- parallel computation.
  --
  {-# INLINE liftPar #-}
  liftPar = Par . lift

-- | Evaluate a continuation.
--
-- Runs the given computation to completion (or until it suspends itself on an
-- unfilled future) in 'IO', using the supplied target as the execution
-- context. The computation's unit result is discarded.
--
{-# INLINE evalParIO #-}
evalParIO :: Native -> Par Native () -> IO ()
evalParIO native@Native{} work =
  evalLLVM native (runContT (runPar work) return)

-- | The value represented by a future is now available. Push any blocked
-- continuations to the worker threads.
--
-- The future is switched to 'Full' atomically, and the continuations that were
-- waiting on it (if any) are each applied to the value and submitted to the
-- workers as the tasks of a single job. A job is submitted even when nothing
-- was waiting, in which case it has no tasks.
--
-- Writing to a future that is already 'Full' is an internal error
-- (@"multiple put"@).
--
{-# INLINEABLE putIO #-}
putIO :: HasCallStack => Workers -> Future a -> a -> IO ()
putIO workers (Future ref) v = do
  -- Store the value and take ownership of the queue of waiters in one step
  ks <- atomicModifyIORef' ref $ \case
          Empty      -> (Full v, Seq.empty)
          Blocked ks -> (Full v, ks)
          _          -> internalError "multiple put"
  --
  schedule workers Job { jobTasks = fmap ($ v) ks
                       , jobDone  = Nothing
                       }

-- | The worker threads should search for other work to execute.
--
-- This discards the current continuation and finishes immediately with unit,
-- so control returns to whoever was running the computation. It is used by
-- 'get' after the continuation has been saved in the future it is waiting on.
--
{-# INLINE reschedule #-}
reschedule :: Par Native a
reschedule = Par $ ContT (\_ -> return ())


-- reschedule :: Par Native a
-- reschedule = Par $ ContT (const loop)
--   where
--     loop :: ReaderT Schedule (LLVM Native) ()
--     loop = do
--       queue <- ask
--       mwork <- liftIO $ tryPopR queue
--       case mwork of
--         Just work -> runContT (runPar work) (const loop)
--         Nothing   -> liftIO yield >> loop

-- pushL :: MVar (Seq a) -> a -> IO ()
-- pushL ref a =
--   mask_ $ do
--     ma <- tryTakeMVar ref
--     case ma of
--       Nothing -> putMVar ref (Seq.singleton a)
--       Just as -> putMVar ref (a Seq.<| as)

-- popR :: MVar (Seq a) -> IO a
-- popR ref = do
--   q <- takeMVar ref
--   case Seq.viewr q of
--     Seq.EmptyR  -> popR ref   -- should be impossible
--     as Seq.:> a -> putMVar ref as >> return a