module Control.Distributed.Process.Tests.Closure (tests) where
import Network.Transport.Test (TestTransport(..))
import Data.ByteString.Lazy (empty)
import Data.IORef
import Data.Typeable (Typeable)
import Control.Monad (join, replicateM, forever, replicateM_, void, when)
import Control.Exception (IOException, throw)
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar
( MVar
, newEmptyMVar
, readMVar
, takeMVar
, putMVar
, modifyMVar_
, newMVar
)
import Control.Applicative ((<$>))
import System.Random (randomIO)
import Control.Distributed.Process
import Control.Distributed.Process.Closure
import Control.Distributed.Process.Node
import Control.Distributed.Process.Internal.Types
( NodeId(nodeAddress)
, createMessage
, messageToPayload
)
import Control.Distributed.Static (staticLabel, staticClosure)
import qualified Network.Transport as NT
import Test.HUnit (Assertion)
import Test.Framework (Test)
import Test.Framework.Providers.HUnit (testCase)
quintuple :: a -> b -> c -> d -> e -> (a, b, c, d, e)
quintuple a b c d e = (a, b, c, d, e)
sdictInt :: SerializableDict Int
sdictInt = SerializableDict
factorial :: Int -> Process Int
factorial 0 = return 1
factorial n = (n *) <$> factorial (n 1)
addInt :: Int -> Int -> Int
addInt x y = x + y
putInt :: Int -> MVar Int -> IO ()
putInt = flip putMVar
sendPid :: ProcessId -> Process ()
sendPid toPid = do
fromPid <- getSelfPid
send toPid fromPid
wait :: Int -> Process ()
wait = liftIO . threadDelay
isPrime :: Integer -> Process Bool
isPrime n = return . (n `elem`) . takeWhile (<= n) . sieve $ [2..]
where
sieve :: [Integer] -> [Integer]
sieve (p : xs) = p : sieve [x | x <- xs, x `mod` p > 0]
sieve [] = error "Uh oh -- we've run out of primes"
typedPingServer :: () -> ReceivePort (SendPort ()) -> Process ()
typedPingServer () rport = forever $ do
sport <- receiveChan rport
sendChan sport ()
signal :: ProcessId -> Process ()
signal pid = send pid ()
remotable [ 'factorial
, 'addInt
, 'putInt
, 'sendPid
, 'sdictInt
, 'wait
, 'typedPingServer
, 'isPrime
, 'quintuple
, 'signal
]
randomElement :: [a] -> IO a
randomElement xs = do
ix <- randomIO
return (xs !! (ix `mod` length xs))
remotableDecl [
[d| dfib :: ([NodeId], SendPort Integer, Integer) -> Process () ;
dfib (_, reply, 0) = sendChan reply 0
dfib (_, reply, 1) = sendChan reply 1
dfib (nids, reply, n) = do
nid1 <- liftIO $ randomElement nids
nid2 <- liftIO $ randomElement nids
(sport, rport) <- newChan
spawn nid1 $ $(mkClosure 'dfib) (nids, sport, n 2)
spawn nid2 $ $(mkClosure 'dfib) (nids, sport, n 1)
n1 <- receiveChan rport
n2 <- receiveChan rport
sendChan reply $ n1 + n2
|]
]
staticQuintuple :: (Typeable a, Typeable b, Typeable c, Typeable d, Typeable e)
=> Static (a -> b -> c -> d -> e -> (a, b, c, d, e))
staticQuintuple = $(mkStatic 'quintuple)
factorialClosure :: Int -> Closure (Process Int)
factorialClosure = $(mkClosure 'factorial)
addIntClosure :: Int -> Closure (Int -> Int)
addIntClosure = $(mkClosure 'addInt)
putIntClosure :: Int -> Closure (MVar Int -> IO ())
putIntClosure = $(mkClosure 'putInt)
sendPidClosure :: ProcessId -> Closure (Process ())
sendPidClosure = $(mkClosure 'sendPid)
sendFac :: Int -> ProcessId -> Closure (Process ())
sendFac n pid = factorialClosure n `bindCP` cpSend $(mkStatic 'sdictInt) pid
factorialOf :: Closure (Int -> Process Int)
factorialOf = staticClosure $(mkStatic 'factorial)
factorial' :: Int -> Closure (Process Int)
factorial' n = returnCP $(mkStatic 'sdictInt) n `bindCP` factorialOf
waitClosure :: Int -> Closure (Process ())
waitClosure = $(mkClosure 'wait)
simulateNetworkFailure :: TestTransport -> NodeId -> NodeId -> Process ()
simulateNetworkFailure TestTransport{..} from to = liftIO $ do
threadDelay 10000
testBreakConnection (nodeAddress from) (nodeAddress to)
threadDelay 10000
testUnclosure :: TestTransport -> RemoteTable -> Assertion
testUnclosure TestTransport{..} rtable = do
node <- newLocalNode testTransport rtable
done <- newEmptyMVar
forkProcess node $ do
120 <- join . unClosure $ factorialClosure 5
liftIO $ putMVar done ()
takeMVar done
testBind :: TestTransport -> RemoteTable -> Assertion
testBind TestTransport{..} rtable = do
node <- newLocalNode testTransport rtable
done <- newEmptyMVar
runProcess node $ do
us <- getSelfPid
join . unClosure $ sendFac 6 us
(720 :: Int) <- expect
liftIO $ putMVar done ()
takeMVar done
testSendPureClosure :: TestTransport -> RemoteTable -> Assertion
testSendPureClosure TestTransport{..} rtable = do
serverAddr <- newEmptyMVar
serverDone <- newEmptyMVar
forkIO $ do
node <- newLocalNode testTransport rtable
addr <- forkProcess node $ do
cl <- expect
fn <- unClosure cl :: Process (Int -> Int)
13 <- return $ fn 6
liftIO $ putMVar serverDone ()
putMVar serverAddr addr
forkIO $ do
node <- newLocalNode testTransport rtable
theirAddr <- readMVar serverAddr
runProcess node $ send theirAddr (addIntClosure 7)
takeMVar serverDone
testSendIOClosure :: TestTransport -> RemoteTable -> Assertion
testSendIOClosure TestTransport{..} rtable = do
serverAddr <- newEmptyMVar
serverDone <- newEmptyMVar
forkIO $ do
node <- newLocalNode testTransport rtable
addr <- forkProcess node $ do
cl <- expect
io <- unClosure cl :: Process (MVar Int -> IO ())
liftIO $ do
someMVar <- newEmptyMVar
io someMVar
5 <- readMVar someMVar
putMVar serverDone ()
putMVar serverAddr addr
forkIO $ do
node <- newLocalNode testTransport rtable
theirAddr <- readMVar serverAddr
runProcess node $ send theirAddr (putIntClosure 5)
takeMVar serverDone
testSendProcClosure :: TestTransport -> RemoteTable -> Assertion
testSendProcClosure TestTransport{..} rtable = do
serverAddr <- newEmptyMVar
clientDone <- newEmptyMVar
forkIO $ do
node <- newLocalNode testTransport rtable
addr <- forkProcess node $ do
cl <- expect
pr <- unClosure cl :: Process (Int -> Process ())
pr 5
putMVar serverAddr addr
forkIO $ do
node <- newLocalNode testTransport rtable
theirAddr <- readMVar serverAddr
runProcess node $ do
pid <- getSelfPid
send theirAddr (cpSend $(mkStatic 'sdictInt) pid)
5 <- expect :: Process Int
liftIO $ putMVar clientDone ()
takeMVar clientDone
testSpawn :: TestTransport -> RemoteTable -> Assertion
testSpawn TestTransport{..} rtable = do
serverNodeAddr <- newEmptyMVar
clientDone <- newEmptyMVar
forkIO $ do
node <- newLocalNode testTransport rtable
putMVar serverNodeAddr (localNodeId node)
forkIO $ do
node <- newLocalNode testTransport rtable
nid <- readMVar serverNodeAddr
runProcess node $ do
pid <- getSelfPid
pid' <- spawn nid (sendPidClosure pid)
pid'' <- expect
True <- return $ pid' == pid''
liftIO $ putMVar clientDone ()
takeMVar clientDone
testSpawnRace :: TestTransport -> RemoteTable -> Assertion
testSpawnRace TestTransport{..} rtable = do
node1 <- newLocalNode (wrapTransport testTransport) rtable
node2 <- newLocalNode testTransport rtable
runProcess node1 $ do
pid <- getSelfPid
spawnLocal $ spawn (localNodeId node2) (sendPidClosure pid) >>= send pid
pid' <- expect :: Process ProcessId
pid'' <- expect :: Process ProcessId
True <- return $ pid' == pid''
return ()
where
wrapTransport (NT.Transport ne ct) = NT.Transport (fmap (fmap wrapEP) ne) ct
wrapEP :: NT.EndPoint -> NT.EndPoint
wrapEP e =
e { NT.connect = \x y z -> do
healthy <- newIORef True
fmap (fmap $ wrapConnection healthy) $ NT.connect e x y z
}
wrapConnection :: IORef Bool -> NT.Connection -> NT.Connection
wrapConnection healthy (NT.Connection s closeC) =
flip NT.Connection closeC $ \msg -> do
when (msg == messageToPayload (createMessage ())) $ do
writeIORef healthy False
isHealthy <- readIORef healthy
if isHealthy then s msg
else return $ Left $ NT.TransportError NT.SendFailed ""
testCall :: TestTransport -> RemoteTable -> Assertion
testCall TestTransport{..} rtable = do
serverNodeAddr <- newEmptyMVar
clientDone <- newEmptyMVar
forkIO $ do
node <- newLocalNode testTransport rtable
putMVar serverNodeAddr (localNodeId node)
forkIO $ do
node <- newLocalNode testTransport rtable
nid <- readMVar serverNodeAddr
runProcess node $ do
(120 :: Int) <- call $(mkStatic 'sdictInt) nid (factorialClosure 5)
liftIO $ putMVar clientDone ()
takeMVar clientDone
testCallBind :: TestTransport -> RemoteTable -> Assertion
testCallBind TestTransport{..} rtable = do
serverNodeAddr <- newEmptyMVar
clientDone <- newEmptyMVar
forkIO $ do
node <- newLocalNode testTransport rtable
putMVar serverNodeAddr (localNodeId node)
forkIO $ do
node <- newLocalNode testTransport rtable
nid <- readMVar serverNodeAddr
runProcess node $ do
(120 :: Int) <- call $(mkStatic 'sdictInt) nid (factorial' 5)
liftIO $ putMVar clientDone ()
takeMVar clientDone
testSeq :: TestTransport -> RemoteTable -> Assertion
testSeq TestTransport{..} rtable = do
node <- newLocalNode testTransport rtable
done <- newEmptyMVar
runProcess node $ do
us <- getSelfPid
join . unClosure $ sendFac 5 us `seqCP` sendFac 6 us
120 :: Int <- expect
720 :: Int <- expect
liftIO $ putMVar done ()
takeMVar done
testSpawnSupervised :: TestTransport -> RemoteTable -> Assertion
testSpawnSupervised TestTransport{..} rtable = do
[node1, node2] <- replicateM 2 $ newLocalNode testTransport rtable
[superPid, childPid] <- replicateM 2 $ newEmptyMVar
thirdProcessDone <- newEmptyMVar
forkProcess node1 $ do
us <- getSelfPid
liftIO $ putMVar superPid us
(child, _ref) <- spawnSupervised (localNodeId node2) (waitClosure 1000000)
liftIO $ do
putMVar childPid child
threadDelay 500000
throw supervisorDeath
forkProcess node2 $ do
[super, child] <- liftIO $ mapM readMVar [superPid, childPid]
ref <- monitor child
ProcessMonitorNotification ref' pid' (DiedException e) <- expect
True <- return $ ref' == ref
&& pid' == child
&& e == show (ProcessLinkException super (DiedException (show supervisorDeath)))
liftIO $ putMVar thirdProcessDone ()
takeMVar thirdProcessDone
where
supervisorDeath :: IOException
supervisorDeath = userError "Supervisor died"
testSpawnInvalid :: TestTransport -> RemoteTable -> Assertion
testSpawnInvalid TestTransport{..} rtable = do
node <- newLocalNode testTransport rtable
done <- newEmptyMVar
forkProcess node $ do
(pid, ref) <- spawnMonitor (localNodeId node) (closure (staticLabel "ThisDoesNotExist") empty)
ProcessMonitorNotification ref' pid' _reason <- expect
True <- return $ ref' == ref && pid == pid'
liftIO $ putMVar done ()
takeMVar done
testClosureExpect :: TestTransport -> RemoteTable -> Assertion
testClosureExpect TestTransport{..} rtable = do
node <- newLocalNode testTransport rtable
done <- newEmptyMVar
runProcess node $ do
nodeId <- getSelfNode
us <- getSelfPid
them <- spawn nodeId $ cpExpect $(mkStatic 'sdictInt) `bindCP` cpSend $(mkStatic 'sdictInt) us
send them (1234 :: Int)
(1234 :: Int) <- expect
liftIO $ putMVar done ()
takeMVar done
testSpawnChannel :: TestTransport -> RemoteTable -> Assertion
testSpawnChannel TestTransport{..} rtable = do
done <- newEmptyMVar
[node1, node2] <- replicateM 2 $ newLocalNode testTransport rtable
forkProcess node1 $ do
pingServer <- spawnChannel
(sdictSendPort sdictUnit)
(localNodeId node2)
($(mkClosure 'typedPingServer) ())
(sendReply, receiveReply) <- newChan
sendChan pingServer sendReply
receiveChan receiveReply
liftIO $ putMVar done ()
takeMVar done
testTDict :: TestTransport -> RemoteTable -> Assertion
testTDict TestTransport{..} rtable = do
done <- newEmptyMVar
[node1, node2] <- replicateM 2 $ newLocalNode testTransport rtable
forkProcess node1 $ do
True <- call $(functionTDict 'isPrime) (localNodeId node2) ($(mkClosure 'isPrime) (79 :: Integer))
liftIO $ putMVar done ()
takeMVar done
testFib :: TestTransport -> RemoteTable -> Assertion
testFib TestTransport{..} rtable = do
nodes <- replicateM 4 $ newLocalNode testTransport rtable
done <- newEmptyMVar
forkProcess (head nodes) $ do
(sport, rport) <- newChan
spawnLocal $ dfib (map localNodeId nodes, sport, 10)
55 <- receiveChan rport :: Process Integer
liftIO $ putMVar done ()
takeMVar done
testSpawnReconnect :: TestTransport -> RemoteTable -> Assertion
testSpawnReconnect testtrans@TestTransport{..} rtable = do
[node1, node2] <- replicateM 2 $ newLocalNode testTransport rtable
let nid1 = localNodeId node1
nid2 = localNodeId node2
done <- newEmptyMVar
iv <- newMVar (0 :: Int)
incr <- forkProcess node1 $ forever $ do
() <- expect
liftIO $ modifyMVar_ iv (return . (+ 1))
forkProcess node2 $ do
_pid1 <- spawn nid1 ($(mkClosure 'signal) incr)
simulateNetworkFailure testtrans nid2 nid1
_pid2 <- spawn nid1 ($(mkClosure 'signal) incr)
_pid3 <- spawn nid1 ($(mkClosure 'signal) incr)
liftIO $ threadDelay 100000
count <- liftIO $ takeMVar iv
True <- return $ count == 2 || count == 3
liftIO $ putMVar done ()
takeMVar done
testSpawnTerminate :: TestTransport -> RemoteTable -> Assertion
testSpawnTerminate TestTransport{..} rtable = do
slave <- newLocalNode testTransport rtable
master <- newLocalNode testTransport rtable
masterDone <- newEmptyMVar
runProcess master $ do
us <- getSelfPid
replicateM_ 1000 . spawnLocal . void . spawn (localNodeId slave) $ $(mkClosure 'signal) us
replicateM_ 1000 $ (expect :: Process ())
liftIO $ putMVar masterDone ()
takeMVar masterDone
tests :: TestTransport -> IO [Test]
tests testtrans = do
let rtable = __remoteTable . __remoteTableDecl $ initRemoteTable
return
[ testCase "Unclosure" (testUnclosure testtrans rtable)
, testCase "Bind" (testBind testtrans rtable)
, testCase "SendPureClosure" (testSendPureClosure testtrans rtable)
, testCase "SendIOClosure" (testSendIOClosure testtrans rtable)
, testCase "SendProcClosure" (testSendProcClosure testtrans rtable)
, testCase "Spawn" (testSpawn testtrans rtable)
, testCase "SpawnRace" (testSpawnRace testtrans rtable)
, testCase "Call" (testCall testtrans rtable)
, testCase "CallBind" (testCallBind testtrans rtable)
, testCase "Seq" (testSeq testtrans rtable)
, testCase "SpawnSupervised" (testSpawnSupervised testtrans rtable)
, testCase "SpawnInvalid" (testSpawnInvalid testtrans rtable)
, testCase "ClosureExpect" (testClosureExpect testtrans rtable)
, testCase "SpawnChannel" (testSpawnChannel testtrans rtable)
, testCase "TDict" (testTDict testtrans rtable)
, testCase "Fib" (testFib testtrans rtable)
, testCase "SpawnTerminate" (testSpawnTerminate testtrans rtable)
, testCase "SpawnReconnect" (testSpawnReconnect testtrans rtable)
]