{-# LANGUAGE DeriveGeneric #-}
module Control.Distributed.Process.Tests.Mx (tests) where

import Control.Distributed.Process.Tests.Internal.Utils
import Network.Transport.Test (TestTransport(..))

import Control.Concurrent (threadDelay)
import Control.Distributed.Process
import Control.Distributed.Process.Node
import Control.Distributed.Process.Management
  ( MxEvent(..)
  , MxAgentId(..)
  , mxAgent
  , mxSink
  , mxReady
  , mxReceive
  , mxDeactivate
  , liftMX
  , mxGetLocal
  , mxSetLocal
  , mxUpdateLocal
  , mxNotify
  , mxBroadcast
  , mxGetId
  , mxGet
  , mxSet
  , mxClear
  , mxPurgeTable
  , mxDropTable
  )
import Control.Monad (void)
import Data.Binary
import Data.List (find, sort)
import Data.Maybe (isJust)
import Data.Typeable
import GHC.Generics
#if ! MIN_VERSION_base(4,6,0)
import Prelude hiding (catch, log)
#endif

import Test.Framework
  ( Test
  , testGroup
  )
import Test.Framework.Providers.HUnit (testCase)

-- | Marker value with a single constructor. In this module it is broadcast
-- between agents and stored as a value in an agent's data table.
data Publish = Publish
  deriving (Eq, Generic, Typeable)

-- No methods are defined here, so the 'Binary' class defaults are used.
instance Binary Publish

-- | Broadcast between two agents: the publisher turns a @()@ notification
-- into a broadcast of 'Publish', and the consumer relays each 'Publish' it
-- receives onto a typed channel that this test reads from.
testAgentBroadcast :: TestResult () -> Process ()
testAgentBroadcast result = do
  (ackSend, ackRecv) <- newChan :: Process (SendPort (), ReceivePort ())

  let onTrigger () = do
        mxBroadcast Publish
        mxReady
      onPublish Publish = do
        liftMX (sendChan ackSend ())
        mxReady

  publisher <- mxAgent (MxAgentId "publisher-agent") () [mxSink onTrigger]
  consumer  <- mxAgent (MxAgentId "consumer-agent")  () [mxSink onPublish]

  -- The notification reaches the publisher, whose broadcast reaches the
  -- consumer, which in turn writes to our channel.
  mxNotify ()
  receiveChan ackRecv >>= stash result

  kill publisher "finished"
  kill consumer  "finished"

-- | Feeds one agent from two sources: 'mxNotify' and 'nsend' to the agent's
-- name. The agent keeps a running sum of the 'Int's it receives; once the
-- sum is exactly 15 it reports the sum on a channel and deactivates. The
-- test stashes whatever arrives on that channel within the timeout.
testAgentDualInput :: TestResult (Maybe Int) -> Process ()
testAgentDualInput result = do
  (totalSend, totalRecv) <- newChan
  void $ mxAgent (MxAgentId "sum-agent") (0 :: Int)
    [ mxSink $ \(n :: Int) -> do
        mxUpdateLocal (+ n)
        total <- mxGetLocal
        if total /= 15
          then mxReady
          else do
            liftMX $ sendChan totalSend total
            mxDeactivate "finished"
    ]

  -- 1 + 3 + 2 + 4 + 5 == 15, alternating between the two input sources.
  mxNotify          (1 :: Int)
  nsend "sum-agent" (3 :: Int)
  mxNotify          (2 :: Int)
  nsend "sum-agent" (4 :: Int)
  mxNotify          (5 :: Int)

  outcome <- receiveChanTimeout 10000000 totalRecv
  stash result outcome

-- | Sends strings to one agent via both 'nsend' and 'mxNotify'. The agent
-- prepends each string to its state (which starts as @["first"]@) and, once
-- it holds five entries, reports them on a channel and deactivates. The
-- test stashes the reported list in sorted order.
testAgentPrioritisation :: TestResult [String] -> Process ()
testAgentPrioritisation result = do

  -- TODO: this does not really test how one input source is /prioritised/
  -- over another. No reliable way to do that has been found yet, because
  -- the relative timing of nsend versus mxNotify is up to the scheduler.

  let agentName = "prioritising-agent"
  (doneSend, doneRecv) <- newChan
  void $ mxAgent (MxAgentId agentName) ["first"]
    [ mxSink $ \(msg :: String) -> do
        mxUpdateLocal (msg :)
        seen <- mxGetLocal
        if length seen == 5
          then do
            liftMX (sendChan doneSend seen)
            mxDeactivate "finished"
          else mxReceive  -- go to the mailbox
    ]

  nsend agentName "second"
  mxNotify "third"
  mxNotify "fourth"
  nsend agentName "fifth"

  collected <- receiveChan doneRecv
  stash result (sort collected)

-- | Sends @()@ to an agent by its registered name using 'nsend' and checks
-- that the agent's sink handles it. The sink acknowledges on a channel; the
-- test stashes what it reads from that channel within the timeout.
testAgentMailboxHandling :: TestResult (Maybe ()) -> Process ()
testAgentMailboxHandling result = do
  (ackSend, ackRecv) <- newChan
  let acknowledge () = do
        liftMX (sendChan ackSend ())
        mxReady
  agent <- mxAgent (MxAgentId "listener-agent") () [mxSink acknowledge]

  nsend "listener-agent" ()

  outcome <- receiveChanTimeout 1000000 ackRecv
  stash result outcome
  kill agent "finished"

-- | Checks that a management agent is told about process lifecycle events.
--
-- The agent has two sinks. The first records every 'MxSpawned' and
-- 'MxProcessDied' event it receives in its local state. The second answers
-- @(event, replyPort)@ queries by sending back whether an equivalent event
-- has been recorded. The test spawns a short-lived child, waits for it to
-- die, queries the agent for both events and stashes 'True' only if both
-- queries are answered with 'True'.
testAgentEventHandling :: TestResult Bool -> Process ()
testAgentEventHandling result = do
  let initState = [] :: [MxEvent]
  agentPid <- mxAgent (MxAgentId "lifecycle-listener-agent") initState [
      -- Recorder: remember spawn/death events, ignore all other events.
      (mxSink $ \ev -> do
         case ev of
           (MxSpawned _)       -> mxUpdateLocal (ev:)
           (MxProcessDied _ _) -> mxUpdateLocal (ev:)
           _                   -> return ()
         mxReady),
      -- Query handler: report whether a matching event was recorded.
      (mxSink $ \(ev, sp :: SendPort Bool) -> do
          st <- mxGetLocal
          liftMX $ sendChan sp (any (sameEvent ev) st)
          mxReady)
    ]

  _ <- monitor agentPid
  (startedSP, startedRP) <- newChan
  pid <- spawnLocal $ sendChan startedSP ()
  () <- receiveChan startedRP

  -- Waiting for the child's monitor notification gives a higher
  -- probability (not a guarantee) that the agent has already seen the
  -- child's events. Match on the monitor reference so that a notification
  -- for the agent, which is monitored above, cannot be mistaken for it.
  childRef <- monitor pid
  receiveWait [
      matchIf (\(ProcessMonitorNotification ref _ _) -> ref == childRef)
              (\_ -> return ())
    ]

  (replyTo, reply) <- newChan :: Process (SendPort Bool, ReceivePort Bool)
  mxNotify (MxSpawned pid, replyTo)
  mxNotify (MxProcessDied pid DiedNormal, replyTo)

  seenAlive <- receiveChan reply
  seenDead  <- receiveChan reply

  stash result $ seenAlive && seenDead
  where
    -- A query matches a recorded event when both are spawn events for the
    -- same process, or both are death events for the same process with the
    -- same reason. Any other combination does not match.
    sameEvent :: MxEvent -> MxEvent -> Bool
    sameEvent (MxSpawned p)       (MxSpawned p')        = p' == p
    sameEvent (MxProcessDied p r) (MxProcessDied p' r') = p' == p && r == r'
    sameEvent _                   _                     = False

-- | An agent stores 'Publish' under the key @"publish"@ in the table named
-- by its own agent id, then signals the test over a channel. The test reads
-- the same key back with 'mxGet', stashes the result, and finally kills the
-- agent and waits for its monitor notification.
testAgentPublication :: TestResult (Maybe Publish) -> Process ()
testAgentPublication result = do
  (syncSend, syncRecv) <- newChan
  let agentId = MxAgentId "publication-agent"
  agentPid <- mxAgent agentId ()
    [ mxSink $ \() -> do
        self <- mxGetId
        liftMX $ do
          mxSet self "publish" Publish
          sendChan syncSend ()
        mxReady
    ]

  mxNotify ()
  () <- receiveChan syncRecv

  mxGet agentId "publish" >>= stash result

  -- Only the notification carrying our own monitor reference ends the wait.
  ref <- monitor agentPid
  kill agentPid "finished"
  receiveWait
    [ matchIf (\(ProcessMonitorNotification r _ _) -> r == ref)
              (const (return ()))
    ]

-- | Stores @val@ under a key, reads it back, clears the key and reads it
-- again. The pair of reads (before and after 'mxClear') is stashed.
testAgentTableClear :: Int -> TestResult (Maybe Int, Maybe Int) -> Process ()
testAgentTableClear val result = do
  let tableId = MxAgentId "agent-1"
      key     = "key-1"
  mxSet tableId key val
  beforeClear <- mxGet tableId key
  mxClear tableId key
  afterClear <- mxGet tableId key
  stash result (beforeClear, afterClear)

-- | Stores a value under a key, purges the whole table with 'mxPurgeTable',
-- and stashes what a subsequent 'mxGet' for that key returns.
testAgentTablePurge :: TestResult (Maybe Int) -> Process ()
testAgentTablePurge result = do
  let tableId = MxAgentId "agent-2"
      key     = "key-2"
  mxSet tableId key (12345 :: Int)
  mxPurgeTable tableId
  mxGet tableId key >>= stash result

-- | Stores @val@ and reads it, drops the table with 'mxDropTable' and reads
-- again, then stores @val@ once more and reads a third time. The three
-- reads are stashed in that order.
testAgentTableDelete :: Int
                     -> TestResult (Maybe Int, Maybe Int, Maybe Int)
                     -> Process ()
testAgentTableDelete val result = do
  let tableId = MxAgentId "agent-3"
      key     = "key-3"
  mxSet tableId key val
  initial <- mxGet tableId key
  mxDropTable tableId
  afterDrop <- mxGet tableId key
  mxSet tableId key val
  afterReset <- mxGet tableId key
  stash result (initial, afterDrop, afterReset)

-- | Builds the Mx test groups. All cases share one local node created from
-- the supplied transport; each case pairs a failure message and an expected
-- value with one of the test processes defined in this module.
tests :: TestTransport -> IO [Test]
tests TestTransport{..} = do
  node1 <- newLocalNode testTransport initRemoteTable
  let agentTests = testGroup "Mx Agents"
        [ testCase "Event Handling" $
            delayedAssertion
              "expected True, but events where not as expected"
              node1 True testAgentEventHandling
        , testCase "Inter-Agent Broadcast" $
            delayedAssertion
              "expected (), but no broadcast was received"
              node1 () testAgentBroadcast
        , testCase "Agent Mailbox Handling" $
            delayedAssertion
              "expected (Just ()), but no regular (mailbox) input was handled"
              node1 (Just ()) testAgentMailboxHandling
        , testCase "Agent Dual Input Handling" $
            delayedAssertion
              "expected sum = 15, but the result was Nothing"
              node1 (Just 15 :: Maybe Int) testAgentDualInput
        , testCase "Agent Input Prioritisation" $
            delayedAssertion
              "expected [first, second, third, fourth, fifth], but result diverged"
              node1
              (sort ["first", "second", "third", "fourth", "fifth"])
              testAgentPrioritisation
        ]
      globalPropertyTests = testGroup "Mx Global Properties"
        [ testCase "Global Property Publication" $
            delayedAssertion
              "expected (Just Publish), but no table entry was found"
              node1 (Just Publish) testAgentPublication
        , testCase "Clearing Global Properties" $
            delayedAssertion
              "expected (Just 1024, Nothing): invalid table entry found!"
              node1 (Just 1024, Nothing) (testAgentTableClear 1024)
        , testCase "Purging Global Tables" $
            delayedAssertion
              "expected Nothing, but a table entry was found"
              node1 Nothing testAgentTablePurge
        , testCase "Deleting and (Re)Creating Global Tables" $
            delayedAssertion
              "expected (Just 15, Nothing, Just 15): invalid table entry found!"
              node1 (Just 15, Nothing, Just 15) (testAgentTableDelete 15)
          -- Wait for other tests to finish.
        , testCase "Wait" $
            threadDelay 100000
        ]
  return [agentTests, globalPropertyTests]