{-# LANGUAGE RebindableSyntax #-}
module Network.Transport.Tests where

import Prelude hiding
  ( (>>=)
  , return
  , fail
  , (>>)
#if ! MIN_VERSION_base(4,6,0)
  , catch
#endif
  )
import Control.Concurrent (forkIO, killThread, yield)
import Control.Concurrent.MVar (newEmptyMVar, takeMVar, putMVar, readMVar, tryTakeMVar, modifyMVar_, newMVar)
import Control.Exception
  ( evaluate
  , throw
  , throwIO
  , bracket
  , catch
  , ErrorCall(..)
  )
import Control.Monad (replicateM, replicateM_, when, guard, forM_, unless)
import Control.Monad.Except ()
import Control.Applicative ((<$>))
import Network.Transport
import Network.Transport.Internal (tlog, tryIO, timeoutMaybe)
import Network.Transport.Util (spawn)
import System.Random (randomIO)
import Data.ByteString (ByteString)
import Data.ByteString.Char8 (pack)
import Data.Map (Map)
import qualified Data.Map as Map (empty, insert, delete, findWithDefault, adjust, null, toList, map)
import Data.String (fromString)
import Data.List (permutations)
import Network.Transport.Tests.Auxiliary (forkTry, runTests, trySome, randomThreadDelay)
import Network.Transport.Tests.Traced

-- | Server that echoes messages straight back to the origin endpoint.
echoServer :: EndPoint -> IO ()
echoServer :: EndPoint -> IO ()
echoServer EndPoint
endpoint = do
    Map ConnectionId Connection -> IO ()
go Map ConnectionId Connection
forall k a. Map k a
Map.empty
  where
    go :: Map ConnectionId Connection -> IO ()
    go :: Map ConnectionId Connection -> IO ()
go Map ConnectionId Connection
cs = do
      Event
event <- EndPoint -> IO Event
receive EndPoint
endpoint
      case Event
event of
        ConnectionOpened ConnectionId
cid Reliability
rel EndPointAddress
addr -> do
          [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"Opened new connection " [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ ConnectionId -> [Char]
forall a. Show a => a -> [Char]
show ConnectionId
cid
          Right Connection
conn <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
addr Reliability
rel ConnectHints
defaultConnectHints
          Map ConnectionId Connection -> IO ()
go (ConnectionId
-> Connection
-> Map ConnectionId Connection
-> Map ConnectionId Connection
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert ConnectionId
cid Connection
conn Map ConnectionId Connection
cs)
        Received ConnectionId
cid [ByteString]
payload -> do
          Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send (Connection
-> ConnectionId -> Map ConnectionId Connection -> Connection
forall k a. Ord k => a -> k -> Map k a -> a
Map.findWithDefault ([Char] -> Connection
forall a. HasCallStack => [Char] -> a
error ([Char] -> Connection) -> [Char] -> Connection
forall a b. (a -> b) -> a -> b
$ [Char]
"Received: Invalid cid " [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ ConnectionId -> [Char]
forall a. Show a => a -> [Char]
show ConnectionId
cid) ConnectionId
cid Map ConnectionId Connection
cs) [ByteString]
payload
          Map ConnectionId Connection -> IO ()
go Map ConnectionId Connection
cs
        ConnectionClosed ConnectionId
cid -> do
          [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"Close connection " [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ ConnectionId -> [Char]
forall a. Show a => a -> [Char]
show ConnectionId
cid
          Connection -> IO ()
close (Connection
-> ConnectionId -> Map ConnectionId Connection -> Connection
forall k a. Ord k => a -> k -> Map k a -> a
Map.findWithDefault ([Char] -> Connection
forall a. HasCallStack => [Char] -> a
error ([Char] -> Connection) -> [Char] -> Connection
forall a b. (a -> b) -> a -> b
$ [Char]
"ConnectionClosed: Invalid cid " [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ ConnectionId -> [Char]
forall a. Show a => a -> [Char]
show ConnectionId
cid) ConnectionId
cid Map ConnectionId Connection
cs)
          Map ConnectionId Connection -> IO ()
go (ConnectionId
-> Map ConnectionId Connection -> Map ConnectionId Connection
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete ConnectionId
cid Map ConnectionId Connection
cs)
        ReceivedMulticast MulticastAddress
_ [ByteString]
_ ->
          -- Ignore
          Map ConnectionId Connection -> IO ()
go Map ConnectionId Connection
cs
        ErrorEvent TransportError EventErrorCode
_ ->
          [Char] -> IO ()
putStrLn ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"Echo server received error event: " [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ Event -> [Char]
forall a. Show a => a -> [Char]
show Event
event
        Event
EndPointClosed ->
          () -> IO ()
forall (m :: * -> *) a. MonadS m => a -> m a
return ()

-- | Ping client used in a few tests
ping :: EndPoint -> EndPointAddress -> Int -> ByteString -> IO ()
ping :: EndPoint -> EndPointAddress -> Int -> ByteString -> IO ()
ping EndPoint
endpoint EndPointAddress
server Int
numPings ByteString
msg = do
  -- Open connection to the server
  [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Connect to echo server"
  Right Connection
conn <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
server Reliability
ReliableOrdered ConnectHints
defaultConnectHints

  -- Wait for the server to open reply connection
  [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Wait for ConnectionOpened message"
  ConnectionOpened ConnectionId
cid Reliability
_ EndPointAddress
_ <- EndPoint -> IO Event
receive EndPoint
endpoint

  -- Send pings and wait for reply
  [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Send ping and wait for reply"
  Int -> IO () -> IO ()
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
numPings (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
      Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn [ByteString
msg]
      Received ConnectionId
cid' [ByteString
reply] <- EndPoint -> IO Event
receive EndPoint
endpoint ; Bool
True <- Bool -> IO Bool
forall (m :: * -> *) a. MonadS m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ ConnectionId
cid ConnectionId -> ConnectionId -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionId
cid' Bool -> Bool -> Bool
&& ByteString
reply ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
== ByteString
msg
      () -> IO ()
forall (m :: * -> *) a. MonadS m => a -> m a
return ()

  -- Close the connection
  [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Close the connection"
  Connection -> IO ()
close Connection
conn

  -- Wait for the server to close its connection to us
  [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Wait for ConnectionClosed message"
  ConnectionClosed ConnectionId
cid' <- EndPoint -> IO Event
receive EndPoint
endpoint ; Bool
True <- Bool -> IO Bool
forall (m :: * -> *) a. MonadS m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ ConnectionId
cid ConnectionId -> ConnectionId -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionId
cid'

  -- Done
  [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Ping client done"

-- | Basic ping test
testPingPong :: Transport -> Int -> IO ()
testPingPong :: Transport -> Int -> IO ()
testPingPong Transport
transport Int
numPings = do
  [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Starting ping pong test"
  EndPointAddress
server <- Transport -> (EndPoint -> IO ()) -> IO EndPointAddress
spawn Transport
transport EndPoint -> IO ()
echoServer
  MVar ()
result <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar

  -- Client
  IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Ping client"
    Right EndPoint
endpoint <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport
    EndPoint -> EndPointAddress -> Int -> ByteString -> IO ()
ping EndPoint
endpoint EndPointAddress
server Int
numPings ByteString
"ping"
    MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
result ()

  MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
result

-- | Test that endpoints don't get confused
testEndPoints :: Transport -> Int -> IO ()
testEndPoints :: Transport -> Int -> IO ()
testEndPoints Transport
transport Int
numPings = do
  EndPointAddress
server <- Transport -> (EndPoint -> IO ()) -> IO EndPointAddress
spawn Transport
transport EndPoint -> IO ()
echoServer
  [MVar ()]
dones <- Int -> IO (MVar ()) -> IO [MVar ()]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
2 IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar

  [(MVar (), Char)] -> ((MVar (), Char) -> IO ThreadId) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([MVar ()] -> [Char] -> [(MVar (), Char)]
forall a b. [a] -> [b] -> [(a, b)]
zip [MVar ()]
dones [Char
'A'..]) (((MVar (), Char) -> IO ThreadId) -> IO ())
-> ((MVar (), Char) -> IO ThreadId) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(MVar ()
done, Char
name) -> IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    let name' :: ByteString
        name' :: ByteString
name' = [Char] -> ByteString
pack [Char
name]
    Right EndPoint
endpoint <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"Ping client " [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ ByteString -> [Char]
forall a. Show a => a -> [Char]
show ByteString
name' [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ [Char]
": " [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ EndPointAddress -> [Char]
forall a. Show a => a -> [Char]
show (EndPoint -> EndPointAddress
address EndPoint
endpoint)
    EndPoint -> EndPointAddress -> Int -> ByteString -> IO ()
ping EndPoint
endpoint EndPointAddress
server Int
numPings ByteString
name'
    MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
done ()

  [MVar ()] -> (MVar () -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [MVar ()]
dones MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar

-- Test that connections don't get confused
testConnections :: Transport -> Int -> IO ()
testConnections :: Transport -> Int -> IO ()
testConnections Transport
transport Int
numPings = do
  EndPointAddress
server <- Transport -> (EndPoint -> IO ()) -> IO EndPointAddress
spawn Transport
transport EndPoint -> IO ()
echoServer
  MVar ()
result <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar

  -- Client
  IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    Right EndPoint
endpoint <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport

    -- Open two connections to the server
    Right Connection
conn1 <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
server Reliability
ReliableOrdered ConnectHints
defaultConnectHints
    ConnectionOpened ConnectionId
serv1 Reliability
_ EndPointAddress
_ <- EndPoint -> IO Event
receive EndPoint
endpoint

    Right Connection
conn2 <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
server Reliability
ReliableOrdered ConnectHints
defaultConnectHints
    ConnectionOpened ConnectionId
serv2 Reliability
_ EndPointAddress
_ <- EndPoint -> IO Event
receive EndPoint
endpoint

    -- One thread to send "pingA" on the first connection
    IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ Int -> IO (Either (TransportError SendErrorCode) ()) -> IO ()
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
numPings (IO (Either (TransportError SendErrorCode) ()) -> IO ())
-> IO (Either (TransportError SendErrorCode) ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn1 [ByteString
"pingA"]

    -- One thread to send "pingB" on the second connection
    IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ Int -> IO (Either (TransportError SendErrorCode) ()) -> IO ()
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
numPings (IO (Either (TransportError SendErrorCode) ()) -> IO ())
-> IO (Either (TransportError SendErrorCode) ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn2 [ByteString
"pingB"]

    -- Verify server responses
    let verifyResponse :: t -> IO ()
verifyResponse t
0 = MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
result ()
        verifyResponse t
n = do
          Event
event <- EndPoint -> IO Event
receive EndPoint
endpoint
          case Event
event of
            Received ConnectionId
cid [ByteString
payload] -> do
              Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ConnectionId
cid ConnectionId -> ConnectionId -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionId
serv1 Bool -> Bool -> Bool
&& ByteString
payload ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
/= ByteString
"pingA") (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char] -> IO ()
forall a. HasCallStack => [Char] -> a
error [Char]
"Wrong message"
              Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ConnectionId
cid ConnectionId -> ConnectionId -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionId
serv2 Bool -> Bool -> Bool
&& ByteString
payload ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
/= ByteString
"pingB") (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char] -> IO ()
forall a. HasCallStack => [Char] -> a
error [Char]
"Wrong message"
              t -> IO ()
verifyResponse (t
n t -> t -> t
forall a. Num a => a -> a -> a
- t
1)
            Event
_ ->
              t -> IO ()
verifyResponse t
n
    Int -> IO ()
forall {t}. (Eq t, Num t) => t -> IO ()
verifyResponse (Int
2 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numPings)

  MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
result

-- | Test that closing one connection does not close the other
testCloseOneConnection :: Transport -> Int -> IO ()
testCloseOneConnection :: Transport -> Int -> IO ()
testCloseOneConnection Transport
transport Int
numPings = do
  EndPointAddress
server <- Transport -> (EndPoint -> IO ()) -> IO EndPointAddress
spawn Transport
transport EndPoint -> IO ()
echoServer
  MVar ()
result <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar

  -- Client
  IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    Right EndPoint
endpoint <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport

    -- Open two connections to the server
    Right Connection
conn1 <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
server Reliability
ReliableOrdered ConnectHints
defaultConnectHints
    ConnectionOpened ConnectionId
serv1 Reliability
_ EndPointAddress
_ <- EndPoint -> IO Event
receive EndPoint
endpoint

    Right Connection
conn2 <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
server Reliability
ReliableOrdered ConnectHints
defaultConnectHints
    ConnectionOpened ConnectionId
serv2 Reliability
_ EndPointAddress
_ <- EndPoint -> IO Event
receive EndPoint
endpoint

    -- One thread to send "pingA" on the first connection
    IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
      Int -> IO (Either (TransportError SendErrorCode) ()) -> IO ()
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
numPings (IO (Either (TransportError SendErrorCode) ()) -> IO ())
-> IO (Either (TransportError SendErrorCode) ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn1 [ByteString
"pingA"]
      Connection -> IO ()
close Connection
conn1

    -- One thread to send "pingB" on the second connection
    IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ Int -> IO (Either (TransportError SendErrorCode) ()) -> IO ()
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ (Int
numPings Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
2) (IO (Either (TransportError SendErrorCode) ()) -> IO ())
-> IO (Either (TransportError SendErrorCode) ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn2 [ByteString
"pingB"]

    -- Verify server responses
    let verifyResponse :: t -> IO ()
verifyResponse t
0 = MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
result ()
        verifyResponse t
n = do
          Event
event <- EndPoint -> IO Event
receive EndPoint
endpoint
          case Event
event of
            Received ConnectionId
cid [ByteString
payload] -> do
              Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ConnectionId
cid ConnectionId -> ConnectionId -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionId
serv1 Bool -> Bool -> Bool
&& ByteString
payload ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
/= ByteString
"pingA") (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char] -> IO ()
forall a. HasCallStack => [Char] -> a
error [Char]
"Wrong message"
              Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ConnectionId
cid ConnectionId -> ConnectionId -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionId
serv2 Bool -> Bool -> Bool
&& ByteString
payload ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
/= ByteString
"pingB") (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char] -> IO ()
forall a. HasCallStack => [Char] -> a
error [Char]
"Wrong message"
              t -> IO ()
verifyResponse (t
n t -> t -> t
forall a. Num a => a -> a -> a
- t
1)
            Event
_ ->
              t -> IO ()
verifyResponse t
n
    Int -> IO ()
forall {t}. (Eq t, Num t) => t -> IO ()
verifyResponse (Int
3 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
numPings)

  MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
result

-- | Test that if A connects to B and B connects to A, B can still send to A after
-- A closes its connection to B (for instance, in the TCP transport, the socket pair
-- connecting A and B should not yet be closed).
testCloseOneDirection :: Transport -> Int -> IO ()
testCloseOneDirection :: Transport -> Int -> IO ()
testCloseOneDirection Transport
transport Int
numPings = do
  MVar EndPointAddress
addrA <- IO (MVar EndPointAddress)
forall a. IO (MVar a)
newEmptyMVar
  MVar EndPointAddress
addrB <- IO (MVar EndPointAddress)
forall a. IO (MVar a)
newEmptyMVar
  MVar ()
doneA <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
  MVar ()
doneB <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar

  -- A
  IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"A"
    Right EndPoint
endpoint <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog (EndPointAddress -> [Char]
forall a. Show a => a -> [Char]
show (EndPoint -> EndPointAddress
address EndPoint
endpoint))
    MVar EndPointAddress -> EndPointAddress -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar EndPointAddress
addrA (EndPoint -> EndPointAddress
address EndPoint
endpoint)

    -- Connect to B
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Connect to B"
    Right Connection
conn <- MVar EndPointAddress -> IO EndPointAddress
forall a. MVar a -> IO a
readMVar MVar EndPointAddress
addrB IO EndPointAddress
-> (EndPointAddress
    -> IO (Either (TransportError ConnectErrorCode) Connection))
-> IO (Either (TransportError ConnectErrorCode) Connection)
forall (m :: * -> *) a b.
(MonadS m, Traceable a) =>
m a -> (a -> m b) -> m b
>>= \EndPointAddress
addr -> EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
addr Reliability
ReliableOrdered ConnectHints
defaultConnectHints

    -- Wait for B to connect to us
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Wait for B"
    ConnectionOpened ConnectionId
cid Reliability
_ EndPointAddress
_ <- EndPoint -> IO Event
receive EndPoint
endpoint

    -- Send pings to B
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Send pings to B"
    Int -> IO (Either (TransportError SendErrorCode) ()) -> IO ()
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
numPings (IO (Either (TransportError SendErrorCode) ()) -> IO ())
-> IO (Either (TransportError SendErrorCode) ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn [ByteString
"ping"]

    -- Close our connection to B
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Close connection"
    Connection -> IO ()
close Connection
conn

    -- Wait for B's pongs
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Wait for pongs from B"
    Int -> IO () -> IO ()
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
numPings (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do Received ConnectionId
_ [ByteString]
_ <- EndPoint -> IO Event
receive EndPoint
endpoint ; () -> IO ()
forall (m :: * -> *) a. MonadS m => a -> m a
return ()

    -- Wait for B to close it's connection to us
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Wait for B to close connection"
    ConnectionClosed ConnectionId
cid' <- EndPoint -> IO Event
receive EndPoint
endpoint
    Bool -> IO ()
forall (f :: * -> *). Alternative f => Bool -> f ()
guard (ConnectionId
cid ConnectionId -> ConnectionId -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionId
cid')

    -- Done
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Done"
    MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
doneA ()

  -- B
  IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"B"
    Right EndPoint
endpoint <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog (EndPointAddress -> [Char]
forall a. Show a => a -> [Char]
show (EndPoint -> EndPointAddress
address EndPoint
endpoint))
    MVar EndPointAddress -> EndPointAddress -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar EndPointAddress
addrB (EndPoint -> EndPointAddress
address EndPoint
endpoint)

    -- Wait for A to connect
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Wait for A to connect"
    ConnectionOpened ConnectionId
cid Reliability
_ EndPointAddress
_ <- EndPoint -> IO Event
receive EndPoint
endpoint

    -- Connect to A
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Connect to A"
    Right Connection
conn <- MVar EndPointAddress -> IO EndPointAddress
forall a. MVar a -> IO a
readMVar MVar EndPointAddress
addrA IO EndPointAddress
-> (EndPointAddress
    -> IO (Either (TransportError ConnectErrorCode) Connection))
-> IO (Either (TransportError ConnectErrorCode) Connection)
forall (m :: * -> *) a b.
(MonadS m, Traceable a) =>
m a -> (a -> m b) -> m b
>>= \EndPointAddress
addr -> EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
addr Reliability
ReliableOrdered ConnectHints
defaultConnectHints

    -- Wait for A's pings
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Wait for pings from A"
    Int -> IO () -> IO ()
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
numPings (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do Received ConnectionId
_ [ByteString]
_ <- EndPoint -> IO Event
receive EndPoint
endpoint ; () -> IO ()
forall (m :: * -> *) a. MonadS m => a -> m a
return ()

    -- Wait for A to close it's connection to us
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Wait for A to close connection"
    ConnectionClosed ConnectionId
cid' <- EndPoint -> IO Event
receive EndPoint
endpoint
    Bool -> IO ()
forall (f :: * -> *). Alternative f => Bool -> f ()
guard (ConnectionId
cid ConnectionId -> ConnectionId -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionId
cid')

    -- Send pongs to A
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Send pongs to A"
    Int -> IO (Either (TransportError SendErrorCode) ()) -> IO ()
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
numPings (IO (Either (TransportError SendErrorCode) ()) -> IO ())
-> IO (Either (TransportError SendErrorCode) ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn [ByteString
"pong"]

    -- Close our connection to A
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Close connection to A"
    Connection -> IO ()
close Connection
conn

    -- Done
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Done"
    MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
doneB ()

  (MVar () -> IO ()) -> [MVar ()] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar [MVar ()
doneA, MVar ()
doneB]

-- | Collect events and order them by connection ID
collect :: EndPoint -> Maybe Int -> Maybe Int -> IO [(ConnectionId, [[ByteString]])]
collect :: EndPoint
-> Maybe Int -> Maybe Int -> IO [(ConnectionId, [[ByteString]])]
collect EndPoint
endPoint Maybe Int
maxEvents Maybe Int
timeout = Maybe Int
-> Map ConnectionId [[ByteString]]
-> Map ConnectionId [[ByteString]]
-> IO [(ConnectionId, [[ByteString]])]
forall {a} {m :: * -> *}.
(Eq a, MonadS m, MonadIO m, Num a) =>
Maybe a
-> Map ConnectionId [[ByteString]]
-> Map ConnectionId [[ByteString]]
-> m [(ConnectionId, [[ByteString]])]
go Maybe Int
maxEvents Map ConnectionId [[ByteString]]
forall k a. Map k a
Map.empty Map ConnectionId [[ByteString]]
forall k a. Map k a
Map.empty
  where
    -- TODO: for more serious use of this function we'd need to make these arguments strict
    go :: Maybe a
-> Map ConnectionId [[ByteString]]
-> Map ConnectionId [[ByteString]]
-> m [(ConnectionId, [[ByteString]])]
go (Just a
0) Map ConnectionId [[ByteString]]
open Map ConnectionId [[ByteString]]
closed = Map ConnectionId [[ByteString]]
-> Map ConnectionId [[ByteString]]
-> m [(ConnectionId, [[ByteString]])]
forall {m :: * -> *} {b} {b} {k} {a}.
(MonadS m, Show b) =>
Map b b -> Map k [a] -> m [(k, [a])]
finish Map ConnectionId [[ByteString]]
open Map ConnectionId [[ByteString]]
closed
    go Maybe a
n Map ConnectionId [[ByteString]]
open Map ConnectionId [[ByteString]]
closed = do
      Either IOError Event
mEvent <- IO Event -> m (Either IOError Event)
forall (m :: * -> *) a. MonadIO m => IO a -> m (Either IOError a)
tryIO (IO Event -> m (Either IOError Event))
-> (IO Event -> IO Event) -> IO Event -> m (Either IOError Event)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe Int -> IOError -> IO Event -> IO Event
forall e a. Exception e => Maybe Int -> e -> IO a -> IO a
timeoutMaybe Maybe Int
timeout ([Char] -> IOError
userError [Char]
"timeout") (IO Event -> m (Either IOError Event))
-> IO Event -> m (Either IOError Event)
forall a b. (a -> b) -> a -> b
$ EndPoint -> IO Event
receive EndPoint
endPoint
      case Either IOError Event
mEvent of
        Left IOError
_ -> Map ConnectionId [[ByteString]]
-> Map ConnectionId [[ByteString]]
-> m [(ConnectionId, [[ByteString]])]
forall {m :: * -> *} {b} {b} {k} {a}.
(MonadS m, Show b) =>
Map b b -> Map k [a] -> m [(k, [a])]
finish Map ConnectionId [[ByteString]]
open Map ConnectionId [[ByteString]]
closed
        Right Event
event -> do
          let n' :: Maybe a
n' = (\a
x -> a
x a -> a -> a
forall a. Num a => a -> a -> a
- a
1) (a -> a) -> Maybe a -> Maybe a
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe a
n
          case Event
event of
            ConnectionOpened ConnectionId
cid Reliability
_ EndPointAddress
_ ->
              Maybe a
-> Map ConnectionId [[ByteString]]
-> Map ConnectionId [[ByteString]]
-> m [(ConnectionId, [[ByteString]])]
go Maybe a
n' (ConnectionId
-> [[ByteString]]
-> Map ConnectionId [[ByteString]]
-> Map ConnectionId [[ByteString]]
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert ConnectionId
cid [] Map ConnectionId [[ByteString]]
open) Map ConnectionId [[ByteString]]
closed
            ConnectionClosed ConnectionId
cid ->
              let list :: [[ByteString]]
list = [[ByteString]]
-> ConnectionId
-> Map ConnectionId [[ByteString]]
-> [[ByteString]]
forall k a. Ord k => a -> k -> Map k a -> a
Map.findWithDefault ([Char] -> [[ByteString]]
forall a. HasCallStack => [Char] -> a
error [Char]
"Invalid ConnectionClosed") ConnectionId
cid Map ConnectionId [[ByteString]]
open in
              Maybe a
-> Map ConnectionId [[ByteString]]
-> Map ConnectionId [[ByteString]]
-> m [(ConnectionId, [[ByteString]])]
go Maybe a
n' (ConnectionId
-> Map ConnectionId [[ByteString]]
-> Map ConnectionId [[ByteString]]
forall k a. Ord k => k -> Map k a -> Map k a
Map.delete ConnectionId
cid Map ConnectionId [[ByteString]]
open) (ConnectionId
-> [[ByteString]]
-> Map ConnectionId [[ByteString]]
-> Map ConnectionId [[ByteString]]
forall k a. Ord k => k -> a -> Map k a -> Map k a
Map.insert ConnectionId
cid [[ByteString]]
list Map ConnectionId [[ByteString]]
closed)
            Received ConnectionId
cid [ByteString]
msg ->
              Maybe a
-> Map ConnectionId [[ByteString]]
-> Map ConnectionId [[ByteString]]
-> m [(ConnectionId, [[ByteString]])]
go Maybe a
n' (([[ByteString]] -> [[ByteString]])
-> ConnectionId
-> Map ConnectionId [[ByteString]]
-> Map ConnectionId [[ByteString]]
forall k a. Ord k => (a -> a) -> k -> Map k a -> Map k a
Map.adjust ([ByteString]
msg [ByteString] -> [[ByteString]] -> [[ByteString]]
forall a. a -> [a] -> [a]
:) ConnectionId
cid Map ConnectionId [[ByteString]]
open) Map ConnectionId [[ByteString]]
closed
            ReceivedMulticast MulticastAddress
_ [ByteString]
_ ->
              [Char] -> m [(ConnectionId, [[ByteString]])]
forall (m :: * -> *) a. MonadS m => [Char] -> m a
fail [Char]
"Unexpected multicast"
            ErrorEvent TransportError EventErrorCode
_ ->
              [Char] -> m [(ConnectionId, [[ByteString]])]
forall (m :: * -> *) a. MonadS m => [Char] -> m a
fail [Char]
"Unexpected error"
            Event
EndPointClosed ->
              [Char] -> m [(ConnectionId, [[ByteString]])]
forall (m :: * -> *) a. MonadS m => [Char] -> m a
fail [Char]
"Unexpected endpoint closure"

    finish :: Map b b -> Map k [a] -> m [(k, [a])]
finish Map b b
open Map k [a]
closed =
      if Map b b -> Bool
forall k a. Map k a -> Bool
Map.null Map b b
open
        then [(k, [a])] -> m [(k, [a])]
forall (m :: * -> *) a. MonadS m => a -> m a
return ([(k, [a])] -> m [(k, [a])])
-> (Map k [a] -> [(k, [a])]) -> Map k [a] -> m [(k, [a])]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Map k [a] -> [(k, [a])]
forall k a. Map k a -> [(k, a)]
Map.toList (Map k [a] -> [(k, [a])])
-> (Map k [a] -> Map k [a]) -> Map k [a] -> [(k, [a])]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ([a] -> [a]) -> Map k [a] -> Map k [a]
forall a b k. (a -> b) -> Map k a -> Map k b
Map.map [a] -> [a]
forall a. [a] -> [a]
reverse (Map k [a] -> m [(k, [a])]) -> Map k [a] -> m [(k, [a])]
forall a b. (a -> b) -> a -> b
$ Map k [a]
closed
        else [Char] -> m [(k, [a])]
forall (m :: * -> *) a. MonadS m => [Char] -> m a
fail ([Char] -> m [(k, [a])]) -> [Char] -> m [(k, [a])]
forall a b. (a -> b) -> a -> b
$ [Char]
"Open connections: " [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ [b] -> [Char]
forall a. Show a => a -> [Char]
show (((b, b) -> b) -> [(b, b)] -> [b]
forall a b. (a -> b) -> [a] -> [b]
map (b, b) -> b
forall a b. (a, b) -> a
fst ([(b, b)] -> [b]) -> (Map b b -> [(b, b)]) -> Map b b -> [b]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Map b b -> [(b, b)]
forall k a. Map k a -> [(k, a)]
Map.toList (Map b b -> [b]) -> Map b b -> [b]
forall a b. (a -> b) -> a -> b
$ Map b b
open)

-- | Open connection, close it, then reopen it
-- (In the TCP transport this means the socket will be closed, then reopened)
--
-- Note that B cannot expect to receive all of A's messages on the first connection
-- before receiving the messages on the second connection. What might (and sometimes
-- does) happen is that finishes sending all of its messages on the first connection
-- (in the TCP transport, the first socket pair) while B is behind on reading _from_
-- this connection (socket pair) -- the messages are "in transit" on the network
-- (these tests are done on localhost, so there are in some OS buffer). Then when
-- A opens the second connection (socket pair) B will spawn a new thread for this
-- connection, and hence might start interleaving messages from the first and second
-- connection.
--
-- This is correct behaviour, however: the transport API guarantees reliability and
-- ordering _per connection_, but not _across_ connections.
testCloseReopen :: Transport -> Int -> IO ()
testCloseReopen :: Transport -> Int -> IO ()
testCloseReopen Transport
transport Int
numPings = do
  MVar EndPointAddress
addrB <- IO (MVar EndPointAddress)
forall a. IO (MVar a)
newEmptyMVar
  MVar ()
doneB <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar

  let numRepeats :: Int
numRepeats = Int
2 :: Int

  -- A
  IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    Right EndPoint
endpoint <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport

    [Int] -> (Int -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Int
1 .. Int
numRepeats] ((Int -> IO ()) -> IO ()) -> (Int -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Int
i -> do
      [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"A connecting"
      -- Connect to B
      Right Connection
conn <- MVar EndPointAddress -> IO EndPointAddress
forall a. MVar a -> IO a
readMVar MVar EndPointAddress
addrB IO EndPointAddress
-> (EndPointAddress
    -> IO (Either (TransportError ConnectErrorCode) Connection))
-> IO (Either (TransportError ConnectErrorCode) Connection)
forall (m :: * -> *) a b.
(MonadS m, Traceable a) =>
m a -> (a -> m b) -> m b
>>= \EndPointAddress
addr -> EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
addr Reliability
ReliableOrdered ConnectHints
defaultConnectHints

      [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"A pinging"
      -- Say hi
      [Int]
-> (Int -> IO (Either (TransportError SendErrorCode) ())) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Int
1 .. Int
numPings] ((Int -> IO (Either (TransportError SendErrorCode) ())) -> IO ())
-> (Int -> IO (Either (TransportError SendErrorCode) ())) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Int
j -> Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn [[Char] -> ByteString
pack ([Char] -> ByteString) -> [Char] -> ByteString
forall a b. (a -> b) -> a -> b
$ [Char]
"ping" [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show Int
i [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ [Char]
"/" [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show Int
j]

      [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"A closing"
      -- Disconnect again
      Connection -> IO ()
close Connection
conn

    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"A finishing"

  -- B
  IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    Right EndPoint
endpoint <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport
    MVar EndPointAddress -> EndPointAddress -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar EndPointAddress
addrB (EndPoint -> EndPointAddress
address EndPoint
endpoint)

    [(ConnectionId, [[ByteString]])]
eventss <- EndPoint
-> Maybe Int -> Maybe Int -> IO [(ConnectionId, [[ByteString]])]
collect EndPoint
endpoint (Int -> Maybe Int
forall a. a -> Maybe a
Just (Int
numRepeats Int -> Int -> Int
forall a. Num a => a -> a -> a
* (Int
numPings Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
2))) Maybe Int
forall a. Maybe a
Nothing

    [(Int, (ConnectionId, [[ByteString]]))]
-> ((Int, (ConnectionId, [[ByteString]])) -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([Int]
-> [(ConnectionId, [[ByteString]])]
-> [(Int, (ConnectionId, [[ByteString]]))]
forall a b. [a] -> [b] -> [(a, b)]
zip [Int
1 .. Int
numRepeats] [(ConnectionId, [[ByteString]])]
eventss) (((Int, (ConnectionId, [[ByteString]])) -> IO ()) -> IO ())
-> ((Int, (ConnectionId, [[ByteString]])) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(Int
i, (ConnectionId
_, [[ByteString]]
events)) -> do
      [(Int, [ByteString])] -> ((Int, [ByteString]) -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([Int] -> [[ByteString]] -> [(Int, [ByteString])]
forall a b. [a] -> [b] -> [(a, b)]
zip [Int
1 .. Int
numPings] [[ByteString]]
events) (((Int, [ByteString]) -> IO ()) -> IO ())
-> ((Int, [ByteString]) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(Int
j, [ByteString]
event) -> do
        Bool -> IO ()
forall (f :: * -> *). Alternative f => Bool -> f ()
guard ([ByteString]
event [ByteString] -> [ByteString] -> Bool
forall a. Eq a => a -> a -> Bool
== [[Char] -> ByteString
pack ([Char] -> ByteString) -> [Char] -> ByteString
forall a b. (a -> b) -> a -> b
$ [Char]
"ping" [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show Int
i [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ [Char]
"/" [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show Int
j])

    MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
doneB ()

  MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
doneB

-- | Test lots of parallel connection attempts
testParallelConnects :: Transport -> Int -> IO ()
testParallelConnects :: Transport -> Int -> IO ()
testParallelConnects Transport
transport Int
numPings = do
  EndPointAddress
server <- Transport -> (EndPoint -> IO ()) -> IO EndPointAddress
spawn Transport
transport EndPoint -> IO ()
echoServer
  MVar ()
done   <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar

  Right EndPoint
endpoint <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport

  -- Spawn lots of clients
  [Int] -> (Int -> IO ThreadId) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Int
1 .. Int
numPings] ((Int -> IO ThreadId) -> IO ()) -> (Int -> IO ThreadId) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Int
i -> IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    Right Connection
conn <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
server Reliability
ReliableOrdered ConnectHints
defaultConnectHints
    Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn [[Char] -> ByteString
pack ([Char] -> ByteString) -> [Char] -> ByteString
forall a b. (a -> b) -> a -> b
$ [Char]
"ping" [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show Int
i]
    Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn [[Char] -> ByteString
pack ([Char] -> ByteString) -> [Char] -> ByteString
forall a b. (a -> b) -> a -> b
$ [Char]
"ping" [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ Int -> [Char]
forall a. Show a => a -> [Char]
show Int
i]
    Connection -> IO ()
close Connection
conn

  IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    [(ConnectionId, [[ByteString]])]
eventss <- EndPoint
-> Maybe Int -> Maybe Int -> IO [(ConnectionId, [[ByteString]])]
collect EndPoint
endpoint (Int -> Maybe Int
forall a. a -> Maybe a
Just (Int
numPings Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
4)) Maybe Int
forall a. Maybe a
Nothing
    -- Check that no pings got sent to the wrong connection
    [(ConnectionId, [[ByteString]])]
-> ((ConnectionId, [[ByteString]]) -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [(ConnectionId, [[ByteString]])]
eventss (((ConnectionId, [[ByteString]]) -> IO ()) -> IO ())
-> ((ConnectionId, [[ByteString]]) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(ConnectionId
_, [[ByteString
ping1], [ByteString
ping2]]) ->
      Bool -> IO ()
forall (f :: * -> *). Alternative f => Bool -> f ()
guard (ByteString
ping1 ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
== ByteString
ping2)
    MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
done ()

  MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
done

-- | Test that sending an error to self gives an error in the sender
testSelfSend :: Transport -> IO ()
testSelfSend :: Transport -> IO ()
testSelfSend Transport
transport = do
    Right EndPoint
endpoint <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport

    Right Connection
conn <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint (EndPoint -> EndPointAddress
address EndPoint
endpoint) Reliability
ReliableOrdered
                          ConnectHints
defaultConnectHints

    -- Must clear the ConnectionOpened event or else sending may block
    ConnectionOpened ConnectionId
_ Reliability
_ EndPointAddress
_ <- EndPoint -> IO Event
receive EndPoint
endpoint

    do Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn [ [Char] -> ByteString
forall a. HasCallStack => [Char] -> a
error [Char]
"bang!" ]
       [Char] -> IO ()
forall a. HasCallStack => [Char] -> a
error [Char]
"testSelfSend: send didn't fail"
     IO () -> (ErrorCall -> IO ()) -> IO ()
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`catch` (\(ErrorCall [Char]
"bang!") -> () -> IO ()
forall (m :: * -> *) a. MonadS m => a -> m a
return ())

    Connection -> IO ()
close Connection
conn

    -- Must clear this event or else closing the end point may block.
    ConnectionClosed ConnectionId
_ <- EndPoint -> IO Event
receive EndPoint
endpoint

    EndPoint -> IO ()
closeEndPoint EndPoint
endpoint

-- | Test that sending on a closed connection gives an error
testSendAfterClose :: Transport -> Int -> IO ()
testSendAfterClose :: Transport -> Int -> IO ()
testSendAfterClose Transport
transport Int
numRepeats = do
  EndPointAddress
server <- Transport -> (EndPoint -> IO ()) -> IO EndPointAddress
spawn Transport
transport EndPoint -> IO ()
echoServer
  MVar ()
clientDone <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar

  IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    Right EndPoint
endpoint <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport

    -- We request two lightweight connections
    Int -> IO () -> IO [()]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
numRepeats (IO () -> IO [()]) -> IO () -> IO [()]
forall a b. (a -> b) -> a -> b
$ do
      Right Connection
conn1 <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
server Reliability
ReliableOrdered ConnectHints
defaultConnectHints
      Right Connection
conn2 <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
server Reliability
ReliableOrdered ConnectHints
defaultConnectHints

      -- Close the second, but leave the first open; then output on the second
      -- connection (i.e., on a closed connection while there is still another
      -- connection open)
      Connection -> IO ()
close Connection
conn2
      Left (TransportError SendErrorCode
SendClosed [Char]
_) <- Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn2 [ByteString
"ping2"]

      -- Now close the first connection, and output on it (i.e., output while
      -- there are no lightweight connection at all anymore)
      Connection -> IO ()
close Connection
conn1
      Left (TransportError SendErrorCode
SendClosed [Char]
_) <- Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn2 [ByteString
"ping2"]

      () -> IO ()
forall (m :: * -> *) a. MonadS m => a -> m a
return ()

    MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
clientDone ()

  MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
clientDone

-- | Test that closing the same connection twice has no effect
testCloseTwice :: Transport -> Int -> IO ()
testCloseTwice :: Transport -> Int -> IO ()
testCloseTwice Transport
transport Int
numRepeats = do
  EndPointAddress
server <- Transport -> (EndPoint -> IO ()) -> IO EndPointAddress
spawn Transport
transport EndPoint -> IO ()
echoServer
  MVar ()
clientDone <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar

  IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    Right EndPoint
endpoint <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport

    Int -> IO () -> IO [()]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
numRepeats (IO () -> IO [()]) -> IO () -> IO [()]
forall a b. (a -> b) -> a -> b
$ do
      -- We request two lightweight connections
      Right Connection
conn1 <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
server Reliability
ReliableOrdered ConnectHints
defaultConnectHints
      Right Connection
conn2 <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
server Reliability
ReliableOrdered ConnectHints
defaultConnectHints

      -- Close the second one twice
      Connection -> IO ()
close Connection
conn2
      Connection -> IO ()
close Connection
conn2

      -- Then send a message on the first and close that twice too
      Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn1 [ByteString
"ping"]
      Connection -> IO ()
close Connection
conn1

      -- Verify expected response from the echo server
      ConnectionOpened ConnectionId
cid1 Reliability
_ EndPointAddress
_ <- EndPoint -> IO Event
receive EndPoint
endpoint
      ConnectionOpened ConnectionId
cid2 Reliability
_ EndPointAddress
_ <- EndPoint -> IO Event
receive EndPoint
endpoint
      -- ordering of the following messages may differ depending of
      -- implementation
      [Event]
ms   <- Int -> IO Event -> IO [Event]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
3 (IO Event -> IO [Event]) -> IO Event -> IO [Event]
forall a b. (a -> b) -> a -> b
$ EndPoint -> IO Event
receive EndPoint
endpoint
      Bool
True <- Bool -> IO Bool
forall (m :: * -> *) a. MonadS m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ [Event] -> [[Event]] -> Bool
forall a. Eq a => [a] -> [[a]] -> Bool
testStreams [Event]
ms [ [ ConnectionId -> Event
ConnectionClosed ConnectionId
cid2 ]
                                      , [ ConnectionId -> [ByteString] -> Event
Received ConnectionId
cid1 [ByteString
"ping"]
                                        , ConnectionId -> Event
ConnectionClosed ConnectionId
cid1 ]
                                      ]
      () -> IO ()
forall (m :: * -> *) a. MonadS m => a -> m a
return ()

    MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
clientDone ()

  MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
clientDone

-- | Test that we can connect an endpoint to itself
testConnectToSelf :: Transport -> Int -> IO ()
testConnectToSelf :: Transport -> Int -> IO ()
testConnectToSelf Transport
transport Int
numPings = do
  MVar ()
done <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
  MVar ()
reconnect <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
  Right EndPoint
endpoint <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport

  [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Creating self-connection"
  Right Connection
conn <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint (EndPoint -> EndPointAddress
address EndPoint
endpoint) Reliability
ReliableOrdered ConnectHints
defaultConnectHints

  [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Talk to myself"

  -- One thread to write to the endpoint
  IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"writing"

    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"Sending ping"
    Int -> IO (Either (TransportError SendErrorCode) ()) -> IO ()
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
numPings (IO (Either (TransportError SendErrorCode) ()) -> IO ())
-> IO (Either (TransportError SendErrorCode) ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn [ByteString
"ping"]

    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"Closing connection"
    Connection -> IO ()
close Connection
conn
    MVar () -> IO ()
forall a. MVar a -> IO a
readMVar MVar ()
reconnect
    ConnectionOpened ConnectionId
cid' Reliability
_ EndPointAddress
_ <- EndPoint -> IO Event
receive EndPoint
endpoint
    ConnectionClosed ConnectionId
cid'' <- EndPoint -> IO Event
receive EndPoint
endpoint ; Bool
True <- Bool -> IO Bool
forall (m :: * -> *) a. MonadS m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ ConnectionId
cid' ConnectionId -> ConnectionId -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionId
cid''
    () -> IO ()
forall (m :: * -> *) a. MonadS m => a -> m a
return ()

  -- And one thread to read
  IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"reading"

    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Waiting for ConnectionOpened"
    ConnectionOpened ConnectionId
cid Reliability
_ EndPointAddress
addr <- EndPoint -> IO Event
receive EndPoint
endpoint

    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Waiting for Received"
    Int -> IO () -> IO ()
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
numPings (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
       Received ConnectionId
cid' [ByteString
"ping"] <- EndPoint -> IO Event
receive EndPoint
endpoint ; Bool
True <- Bool -> IO Bool
forall (m :: * -> *) a. MonadS m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ ConnectionId
cid ConnectionId -> ConnectionId -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionId
cid'
       () -> IO ()
forall (m :: * -> *) a. MonadS m => a -> m a
return ()

    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Waiting for ConnectionClosed"
    ConnectionClosed ConnectionId
cid' <- EndPoint -> IO Event
receive EndPoint
endpoint ; Bool
True <- Bool -> IO Bool
forall (m :: * -> *) a. MonadS m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ ConnectionId
cid ConnectionId -> ConnectionId -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionId
cid'

    MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
reconnect ()

    -- Check that the addr supplied also connects to self.
    -- The other thread verifies this.
    Right Connection
conn <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
addr Reliability
ReliableOrdered ConnectHints
defaultConnectHints
    Connection -> IO ()
close Connection
conn

    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Done"
    MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
done ()

  MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
done

-- | Test that we can connect an endpoint to itself multiple times
testConnectToSelfTwice :: Transport -> Int -> IO ()
testConnectToSelfTwice :: Transport -> Int -> IO ()
testConnectToSelfTwice Transport
transport Int
numPings = do
  MVar ()
done <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
  Right EndPoint
endpoint <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport

  [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Talk to myself"

  -- An MVar to ensure that the node which sends pingA will connect first, as
  -- this determines the order of the events given out by 'collect' and is
  -- essential for the equality test there.
  MVar ()
firstConnectionMade <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar

  -- One thread to write to the endpoint using the first connection
  IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Creating self-connection"
    Right Connection
conn1 <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint (EndPoint -> EndPointAddress
address EndPoint
endpoint) Reliability
ReliableOrdered ConnectHints
defaultConnectHints
    MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
firstConnectionMade ()

    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"writing"

    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"Sending ping"
    Int -> IO (Either (TransportError SendErrorCode) ()) -> IO ()
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
numPings (IO (Either (TransportError SendErrorCode) ()) -> IO ())
-> IO (Either (TransportError SendErrorCode) ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn1 [ByteString
"pingA"]

    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"Closing connection"
    Connection -> IO ()
close Connection
conn1

  -- One thread to write to the endpoint using the second connection
  IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
firstConnectionMade
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Creating self-connection"
    Right Connection
conn2 <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint (EndPoint -> EndPointAddress
address EndPoint
endpoint) Reliability
ReliableOrdered ConnectHints
defaultConnectHints
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"writing"

    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"Sending ping"
    Int -> IO (Either (TransportError SendErrorCode) ()) -> IO ()
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
numPings (IO (Either (TransportError SendErrorCode) ()) -> IO ())
-> IO (Either (TransportError SendErrorCode) ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn2 [ByteString
"pingB"]

    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"Closing connection"
    Connection -> IO ()
close Connection
conn2

  -- And one thread to read
  IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"reading"

    [(ConnectionId
_, [[ByteString]]
events1), (ConnectionId
_, [[ByteString]]
events2)] <- EndPoint
-> Maybe Int -> Maybe Int -> IO [(ConnectionId, [[ByteString]])]
collect EndPoint
endpoint (Int -> Maybe Int
forall a. a -> Maybe a
Just (Int
2 Int -> Int -> Int
forall a. Num a => a -> a -> a
* (Int
numPings Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
2))) Maybe Int
forall a. Maybe a
Nothing
    Bool
True <- Bool -> IO Bool
forall (m :: * -> *) a. MonadS m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ [[ByteString]]
events1 [[ByteString]] -> [[ByteString]] -> Bool
forall a. Eq a => a -> a -> Bool
== Int -> [ByteString] -> [[ByteString]]
forall a. Int -> a -> [a]
replicate Int
numPings [ByteString
"pingA"]
    Bool
True <- Bool -> IO Bool
forall (m :: * -> *) a. MonadS m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ [[ByteString]]
events2 [[ByteString]] -> [[ByteString]] -> Bool
forall a. Eq a => a -> a -> Bool
== Int -> [ByteString] -> [[ByteString]]
forall a. Int -> a -> [a]
replicate Int
numPings [ByteString
"pingB"]

    [Char] -> IO ()
forall (m :: * -> *). MonadIO m => [Char] -> m ()
tlog [Char]
"Done"
    MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
done ()

  MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
done

-- | Test that we self-connections no longer work once we close our endpoint
-- or our transport
testCloseSelf :: IO (Either String Transport) -> IO ()
testCloseSelf :: IO (Either [Char] Transport) -> IO ()
testCloseSelf IO (Either [Char] Transport)
newTransport = do
  Right Transport
transport <- IO (Either [Char] Transport)
newTransport
  Right EndPoint
endpoint1 <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport
  Right EndPoint
endpoint2 <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport
  Right Connection
conn1     <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint1 (EndPoint -> EndPointAddress
address EndPoint
endpoint1) Reliability
ReliableOrdered ConnectHints
defaultConnectHints
  ConnectionOpened ConnectionId
_ Reliability
_ EndPointAddress
_ <- EndPoint -> IO Event
receive EndPoint
endpoint1
  Right Connection
conn2     <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint1 (EndPoint -> EndPointAddress
address EndPoint
endpoint1) Reliability
ReliableOrdered ConnectHints
defaultConnectHints
  ConnectionOpened ConnectionId
_ Reliability
_ EndPointAddress
_ <- EndPoint -> IO Event
receive EndPoint
endpoint1
  Right Connection
conn3     <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint2 (EndPoint -> EndPointAddress
address EndPoint
endpoint2) Reliability
ReliableOrdered ConnectHints
defaultConnectHints
  ConnectionOpened ConnectionId
_ Reliability
_ EndPointAddress
_ <- EndPoint -> IO Event
receive EndPoint
endpoint2

  -- Close the conneciton and try to send
  Connection -> IO ()
close Connection
conn1
  ConnectionClosed ConnectionId
_ <- EndPoint -> IO Event
receive EndPoint
endpoint1
  Left (TransportError SendErrorCode
SendClosed [Char]
_) <- Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn1 [ByteString
"ping"]

  -- Close the first endpoint. We should not be able to use the first
  -- connection anymore, or open more self connections, but the self connection
  -- to the second endpoint should still be fine
  EndPoint -> IO ()
closeEndPoint EndPoint
endpoint1
  Event
EndPointClosed <- EndPoint -> IO Event
receive EndPoint
endpoint1
  Left (TransportError SendErrorCode
SendFailed [Char]
_) <- Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn2 [ByteString
"ping"]
  Left (TransportError ConnectErrorCode
ConnectFailed [Char]
_) <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint1 (EndPoint -> EndPointAddress
address EndPoint
endpoint1) Reliability
ReliableOrdered ConnectHints
defaultConnectHints
  Right () <- Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn3 [ByteString
"ping"]
  Received ConnectionId
_ [ByteString]
_ <- EndPoint -> IO Event
receive EndPoint
endpoint2

  -- Close the transport; now the second should no longer work
  Transport -> IO ()
closeTransport Transport
transport
  Left (TransportError SendErrorCode
SendFailed [Char]
_) <- Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn3 [ByteString
"ping"]
  Left TransportError ConnectErrorCode
r <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint2 (EndPoint -> EndPointAddress
address EndPoint
endpoint2) Reliability
ReliableOrdered ConnectHints
defaultConnectHints
  case TransportError ConnectErrorCode
r of
    TransportError ConnectErrorCode
ConnectFailed [Char]
_ -> () -> IO ()
forall (m :: * -> *) a. MonadS m => a -> m a
return ()
    TransportError ConnectErrorCode
_ -> do [Char] -> IO ()
putStrLn ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"Actual: " [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ TransportError ConnectErrorCode -> [Char]
forall a. Show a => a -> [Char]
show TransportError ConnectErrorCode
r
            TransportError ConnectErrorCode
ConnectFailed [Char]
_ <- TransportError ConnectErrorCode
-> IO (TransportError ConnectErrorCode)
forall (m :: * -> *) a. MonadS m => a -> m a
return TransportError ConnectErrorCode
r
            () -> IO ()
forall (m :: * -> *) a. MonadS m => a -> m a
return ()

  () -> IO ()
forall (m :: * -> *) a. MonadS m => a -> m a
return ()

-- | Test various aspects of 'closeEndPoint'
testCloseEndPoint :: Transport -> Int -> IO ()
testCloseEndPoint :: Transport -> Int -> IO ()
testCloseEndPoint Transport
transport Int
_ = do
  MVar ()
serverFirstTestDone <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
  MVar ()
serverDone <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
  MVar ()
clientDone <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
  MVar EndPointAddress
clientAddr1 <- IO (MVar EndPointAddress)
forall a. IO (MVar a)
newEmptyMVar
  MVar EndPointAddress
clientAddr2 <- IO (MVar EndPointAddress)
forall a. IO (MVar a)
newEmptyMVar
  MVar EndPointAddress
serverAddr <- IO (MVar EndPointAddress)
forall a. IO (MVar a)
newEmptyMVar

  -- Server
  IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    Right EndPoint
endpoint <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport
    MVar EndPointAddress -> EndPointAddress -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar EndPointAddress
serverAddr (EndPoint -> EndPointAddress
address EndPoint
endpoint)

    -- First test (see client)
    do
      EndPointAddress
theirAddr <- MVar EndPointAddress -> IO EndPointAddress
forall a. MVar a -> IO a
readMVar MVar EndPointAddress
clientAddr1
      ConnectionOpened ConnectionId
cid Reliability
ReliableOrdered EndPointAddress
addr <- EndPoint -> IO Event
receive EndPoint
endpoint
      -- Ensure that connecting to the supplied address reaches the peer.
      Right Connection
conn <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
addr Reliability
ReliableOrdered ConnectHints
defaultConnectHints
      Connection -> IO ()
close Connection
conn
      MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
serverFirstTestDone ()
      ConnectionClosed ConnectionId
cid' <- EndPoint -> IO Event
receive EndPoint
endpoint ; Bool
True <- Bool -> IO Bool
forall (m :: * -> *) a. MonadS m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ ConnectionId
cid ConnectionId -> ConnectionId -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionId
cid'
      MVar EndPointAddress -> EndPointAddress -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar EndPointAddress
serverAddr (EndPoint -> EndPointAddress
address EndPoint
endpoint)
      () -> IO ()
forall (m :: * -> *) a. MonadS m => a -> m a
return ()

    -- Second test
    do
      EndPointAddress
theirAddr <- MVar EndPointAddress -> IO EndPointAddress
forall a. MVar a -> IO a
readMVar MVar EndPointAddress
clientAddr2

      ConnectionOpened ConnectionId
cid Reliability
ReliableOrdered EndPointAddress
addr <- EndPoint -> IO Event
receive EndPoint
endpoint
      -- Ensure that connecting to the supplied address reaches the peer.
      Right Connection
conn <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
addr Reliability
ReliableOrdered ConnectHints
defaultConnectHints
      Connection -> IO ()
close Connection
conn
      Received ConnectionId
cid' [ByteString
"ping"] <- EndPoint -> IO Event
receive EndPoint
endpoint ; Bool
True <- Bool -> IO Bool
forall (m :: * -> *) a. MonadS m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ ConnectionId
cid ConnectionId -> ConnectionId -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionId
cid'

      Right Connection
conn <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
theirAddr Reliability
ReliableOrdered ConnectHints
defaultConnectHints
      Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn [ByteString
"pong"]

      ConnectionClosed ConnectionId
cid'' <- EndPoint -> IO Event
receive EndPoint
endpoint ; Bool
True <- Bool -> IO Bool
forall (m :: * -> *) a. MonadS m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ ConnectionId
cid ConnectionId -> ConnectionId -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionId
cid''
      ErrorEvent (TransportError (EventConnectionLost EndPointAddress
addr') [Char]
_) <- EndPoint -> IO Event
receive EndPoint
endpoint ; Bool
True <- Bool -> IO Bool
forall (m :: * -> *) a. MonadS m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ EndPointAddress
addr' EndPointAddress -> EndPointAddress -> Bool
forall a. Eq a => a -> a -> Bool
== EndPointAddress
theirAddr

      Left (TransportError SendErrorCode
SendFailed [Char]
_) <- Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn [ByteString
"pong2"]

      () -> IO ()
forall (m :: * -> *) a. MonadS m => a -> m a
return ()

    MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
serverDone ()

  -- Client
  IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do

    -- First test: close endpoint with one outgoing but no incoming connections
    do
      EndPointAddress
theirAddr <- MVar EndPointAddress -> IO EndPointAddress
forall a. MVar a -> IO a
takeMVar MVar EndPointAddress
serverAddr
      Right EndPoint
endpoint <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport
      MVar EndPointAddress -> EndPointAddress -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar EndPointAddress
clientAddr1 (EndPoint -> EndPointAddress
address EndPoint
endpoint)

      -- Connect to the server, then close the endpoint without disconnecting explicitly
      Right Connection
_ <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
theirAddr Reliability
ReliableOrdered ConnectHints
defaultConnectHints
      ConnectionOpened ConnectionId
cid Reliability
_ EndPointAddress
_ <- EndPoint -> IO Event
receive EndPoint
endpoint
      ConnectionClosed ConnectionId
cid' <- EndPoint -> IO Event
receive EndPoint
endpoint ; Bool
True <- Bool -> IO Bool
forall (m :: * -> *) a. MonadS m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ ConnectionId
cid ConnectionId -> ConnectionId -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionId
cid'
      -- Don't close before the remote server had a chance to digest the
      -- connection.
      MVar () -> IO ()
forall a. MVar a -> IO a
readMVar MVar ()
serverFirstTestDone
      EndPoint -> IO ()
closeEndPoint EndPoint
endpoint
      Event
EndPointClosed <- EndPoint -> IO Event
receive EndPoint
endpoint
      () -> IO ()
forall (m :: * -> *) a. MonadS m => a -> m a
return ()

    -- Second test: close endpoint with one outgoing and one incoming connection
    do
      EndPointAddress
theirAddr <- MVar EndPointAddress -> IO EndPointAddress
forall a. MVar a -> IO a
takeMVar MVar EndPointAddress
serverAddr
      Right EndPoint
endpoint <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport
      MVar EndPointAddress -> EndPointAddress -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar EndPointAddress
clientAddr2 (EndPoint -> EndPointAddress
address EndPoint
endpoint)

      Right Connection
conn <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
theirAddr Reliability
ReliableOrdered ConnectHints
defaultConnectHints
      ConnectionOpened ConnectionId
cid Reliability
_ EndPointAddress
_ <- EndPoint -> IO Event
receive EndPoint
endpoint
      ConnectionClosed ConnectionId
cid' <- EndPoint -> IO Event
receive EndPoint
endpoint ; Bool
True <- Bool -> IO Bool
forall (m :: * -> *) a. MonadS m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ ConnectionId
cid ConnectionId -> ConnectionId -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionId
cid'
      Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn [ByteString
"ping"]

      -- Reply from the server
      ConnectionOpened ConnectionId
cid Reliability
ReliableOrdered EndPointAddress
addr <- EndPoint -> IO Event
receive EndPoint
endpoint
      Received ConnectionId
cid' [ByteString
"pong"] <- EndPoint -> IO Event
receive EndPoint
endpoint ; Bool
True <- Bool -> IO Bool
forall (m :: * -> *) a. MonadS m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ ConnectionId
cid ConnectionId -> ConnectionId -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionId
cid'

      -- Close the endpoint
      EndPoint -> IO ()
closeEndPoint EndPoint
endpoint
      Event
EndPointClosed <- EndPoint -> IO Event
receive EndPoint
endpoint

      -- Attempt to send should fail with connection closed
      Left (TransportError SendErrorCode
SendFailed [Char]
_) <- Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn [ByteString
"ping2"]

      -- An attempt to close the already closed connection should just return
      () <- Connection -> IO ()
close Connection
conn

      -- And so should an attempt to connect
      Left (TransportError ConnectErrorCode
ConnectFailed [Char]
_) <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
theirAddr Reliability
ReliableOrdered ConnectHints
defaultConnectHints

      () -> IO ()
forall (m :: * -> *) a. MonadS m => a -> m a
return ()

    MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
clientDone ()

  (MVar () -> IO ()) -> [MVar ()] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar [MVar ()
serverDone, MVar ()
clientDone]

-- Test closeTransport
--
-- This tests many of the same things that testEndPoint does, and some more
testCloseTransport :: IO (Either String Transport) -> IO ()
testCloseTransport :: IO (Either [Char] Transport) -> IO ()
testCloseTransport IO (Either [Char] Transport)
newTransport = do
  MVar ()
serverDone <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
  MVar ()
clientDone <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
  MVar EndPointAddress
clientAddr1 <- IO (MVar EndPointAddress)
forall a. IO (MVar a)
newEmptyMVar
  MVar EndPointAddress
clientAddr2 <- IO (MVar EndPointAddress)
forall a. IO (MVar a)
newEmptyMVar
  MVar EndPointAddress
serverAddr <- IO (MVar EndPointAddress)
forall a. IO (MVar a)
newEmptyMVar

  -- Server
  IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    Right Transport
transport <- IO (Either [Char] Transport)
newTransport
    Right EndPoint
endpoint <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport
    MVar EndPointAddress -> EndPointAddress -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar EndPointAddress
serverAddr (EndPoint -> EndPointAddress
address EndPoint
endpoint)

    -- Client sets up first endpoint
    EndPointAddress
theirAddr1 <- MVar EndPointAddress -> IO EndPointAddress
forall a. MVar a -> IO a
readMVar MVar EndPointAddress
clientAddr1
    ConnectionOpened ConnectionId
cid1 Reliability
ReliableOrdered EndPointAddress
addr <- EndPoint -> IO Event
receive EndPoint
endpoint
    -- Test that the address given does indeed point back to the client
    Right Connection
conn <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
theirAddr1 Reliability
ReliableOrdered ConnectHints
defaultConnectHints
    Connection -> IO ()
close Connection
conn
    Right Connection
conn <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
addr Reliability
ReliableOrdered ConnectHints
defaultConnectHints
    Connection -> IO ()
close Connection
conn

    -- Client sets up second endpoint
    EndPointAddress
theirAddr2 <- MVar EndPointAddress -> IO EndPointAddress
forall a. MVar a -> IO a
readMVar MVar EndPointAddress
clientAddr2

    ConnectionOpened ConnectionId
cid2 Reliability
ReliableOrdered EndPointAddress
addr' <- EndPoint -> IO Event
receive EndPoint
endpoint
    -- We're going to use addr' to connect back to the server, which tests
    -- that it's a valid address (but not *necessarily* == to theirAddr2

    Received ConnectionId
cid2' [ByteString
"ping"] <- EndPoint -> IO Event
receive EndPoint
endpoint ; Bool
True <- Bool -> IO Bool
forall (m :: * -> *) a. MonadS m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ ConnectionId
cid2' ConnectionId -> ConnectionId -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionId
cid2

    Right Connection
conn <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
theirAddr2 Reliability
ReliableOrdered ConnectHints
defaultConnectHints
    Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn [ByteString
"pong"]
    Connection -> IO ()
close Connection
conn
    Right Connection
conn <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
addr' Reliability
ReliableOrdered ConnectHints
defaultConnectHints
    Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn [ByteString
"pong"]

    -- Client now closes down its transport. We should receive connection closed messages (we don't know the precise order, however)
    -- TODO: should we get an EventConnectionLost for theirAddr1? We have no outgoing connections
    [Event]
evs <- Int -> IO Event -> IO [Event]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
3 (IO Event -> IO [Event]) -> IO Event -> IO [Event]
forall a b. (a -> b) -> a -> b
$ EndPoint -> IO Event
receive EndPoint
endpoint
    let expected :: [Event]
expected = [ ConnectionId -> Event
ConnectionClosed ConnectionId
cid1
                   , ConnectionId -> Event
ConnectionClosed ConnectionId
cid2
                   -- , ErrorEvent (TransportError (EventConnectionLost theirAddr1) "")
                   , TransportError EventErrorCode -> Event
ErrorEvent (EventErrorCode -> [Char] -> TransportError EventErrorCode
forall error. error -> [Char] -> TransportError error
TransportError (EndPointAddress -> EventErrorCode
EventConnectionLost EndPointAddress
addr') [Char]
"")
                   ]
    Bool
True <- Bool -> IO Bool
forall (m :: * -> *) a. MonadS m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ [Event]
expected [Event] -> [[Event]] -> Bool
forall a. Eq a => a -> [a] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [Event] -> [[Event]]
forall a. [a] -> [[a]]
permutations [Event]
evs

    -- An attempt to send to the endpoint should now fail
    Left (TransportError SendErrorCode
SendFailed [Char]
_) <- Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn [ByteString
"pong2"]

    MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
serverDone ()

  -- Client
  IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    Right Transport
transport <- IO (Either [Char] Transport)
newTransport
    EndPointAddress
theirAddr <- MVar EndPointAddress -> IO EndPointAddress
forall a. MVar a -> IO a
readMVar MVar EndPointAddress
serverAddr

    -- Set up endpoint with one outgoing but no incoming connections
    Right EndPoint
endpoint1 <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport
    MVar EndPointAddress -> EndPointAddress -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar EndPointAddress
clientAddr1 (EndPoint -> EndPointAddress
address EndPoint
endpoint1)

    -- Connect to the server, then close the endpoint without disconnecting explicitly
    Right Connection
_ <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint1 EndPointAddress
theirAddr Reliability
ReliableOrdered ConnectHints
defaultConnectHints
    -- Server connects back to verify that both addresses they have for us
    -- are suitable to reach us.
    ConnectionOpened ConnectionId
cid Reliability
ReliableOrdered EndPointAddress
_ <- EndPoint -> IO Event
receive EndPoint
endpoint1
    ConnectionClosed ConnectionId
cid' <- EndPoint -> IO Event
receive EndPoint
endpoint1 ; Bool
True <- Bool -> IO Bool
forall (m :: * -> *) a. MonadS m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ ConnectionId
cid ConnectionId -> ConnectionId -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionId
cid'
    ConnectionOpened ConnectionId
cid Reliability
ReliableOrdered EndPointAddress
_ <- EndPoint -> IO Event
receive EndPoint
endpoint1
    ConnectionClosed ConnectionId
cid' <- EndPoint -> IO Event
receive EndPoint
endpoint1 ; Bool
True <- Bool -> IO Bool
forall (m :: * -> *) a. MonadS m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ ConnectionId
cid ConnectionId -> ConnectionId -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionId
cid'

    -- Set up an endpoint with one outgoing and one incoming connection
    Right EndPoint
endpoint2 <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport
    MVar EndPointAddress -> EndPointAddress -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar EndPointAddress
clientAddr2 (EndPoint -> EndPointAddress
address EndPoint
endpoint2)

    -- The outgoing connection.
    Right Connection
conn <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint2 EndPointAddress
theirAddr Reliability
ReliableOrdered ConnectHints
defaultConnectHints
    Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn [ByteString
"ping"]

    -- Reply from the server. It will connect twice, using both addresses
    -- (the one that the client sees, and the one that the server sees).
    ConnectionOpened ConnectionId
cid Reliability
ReliableOrdered EndPointAddress
_ <- EndPoint -> IO Event
receive EndPoint
endpoint2
    Received ConnectionId
cid' [ByteString
"pong"] <- EndPoint -> IO Event
receive EndPoint
endpoint2 ; Bool
True <- Bool -> IO Bool
forall (m :: * -> *) a. MonadS m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ ConnectionId
cid ConnectionId -> ConnectionId -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionId
cid'
    ConnectionClosed ConnectionId
cid'' <- EndPoint -> IO Event
receive EndPoint
endpoint2 ; Bool
True <- Bool -> IO Bool
forall (m :: * -> *) a. MonadS m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ ConnectionId
cid ConnectionId -> ConnectionId -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionId
cid''
    ConnectionOpened ConnectionId
cid Reliability
ReliableOrdered EndPointAddress
_ <- EndPoint -> IO Event
receive EndPoint
endpoint2
    Received ConnectionId
cid' [ByteString
"pong"] <- EndPoint -> IO Event
receive EndPoint
endpoint2 ; Bool
True <- Bool -> IO Bool
forall (m :: * -> *) a. MonadS m => a -> m a
return (Bool -> IO Bool) -> Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ ConnectionId
cid ConnectionId -> ConnectionId -> Bool
forall a. Eq a => a -> a -> Bool
== ConnectionId
cid'

    -- Now shut down the entire transport
    Transport -> IO ()
closeTransport Transport
transport

    -- Both endpoints should report that they have been closed
    Event
EndPointClosed <- EndPoint -> IO Event
receive EndPoint
endpoint1
    Event
EndPointClosed <- EndPoint -> IO Event
receive EndPoint
endpoint2

    -- Attempt to send should fail with connection closed
    Left (TransportError SendErrorCode
SendFailed [Char]
_) <- Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn [ByteString
"ping2"]

    -- An attempt to close the already closed connection should just return
    () <- Connection -> IO ()
close Connection
conn

    -- And so should an attempt to connect on either endpoint
    Left (TransportError ConnectErrorCode
ConnectFailed [Char]
_) <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint1 EndPointAddress
theirAddr Reliability
ReliableOrdered ConnectHints
defaultConnectHints
    Left (TransportError ConnectErrorCode
ConnectFailed [Char]
_) <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint2 EndPointAddress
theirAddr Reliability
ReliableOrdered ConnectHints
defaultConnectHints

    -- And finally, so should an attempt to create a new endpoint
    Left (TransportError NewEndPointErrorCode
NewEndPointFailed [Char]
_) <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport

    MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
clientDone ()

  (MVar () -> IO ()) -> [MVar ()] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar [MVar ()
serverDone, MVar ()
clientDone]

-- | Remote node attempts to connect to a closed local endpoint
testConnectClosedEndPoint :: Transport -> IO ()
testConnectClosedEndPoint :: Transport -> IO ()
testConnectClosedEndPoint Transport
transport = do
  MVar EndPointAddress
serverAddr   <- IO (MVar EndPointAddress)
forall a. IO (MVar a)
newEmptyMVar
  MVar ()
serverClosed <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
  MVar ()
clientDone   <- IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar

  -- Server
  IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    Right EndPoint
endpoint <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport
    MVar EndPointAddress -> EndPointAddress -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar EndPointAddress
serverAddr (EndPoint -> EndPointAddress
address EndPoint
endpoint)

    EndPoint -> IO ()
closeEndPoint EndPoint
endpoint
    MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
serverClosed ()

  -- Client
  IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    Right EndPoint
endpoint <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport
    MVar () -> IO ()
forall a. MVar a -> IO a
readMVar MVar ()
serverClosed

    Left (TransportError ConnectErrorCode
ConnectNotFound [Char]
_) <- MVar EndPointAddress -> IO EndPointAddress
forall a. MVar a -> IO a
readMVar MVar EndPointAddress
serverAddr IO EndPointAddress
-> (EndPointAddress
    -> IO (Either (TransportError ConnectErrorCode) Connection))
-> IO (Either (TransportError ConnectErrorCode) Connection)
forall (m :: * -> *) a b.
(MonadS m, Traceable a) =>
m a -> (a -> m b) -> m b
>>= \EndPointAddress
addr -> EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
addr Reliability
ReliableOrdered ConnectHints
defaultConnectHints

    MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
clientDone ()

  MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
clientDone

-- | We should receive an exception when doing a 'receive' after we have been
-- notified that an endpoint has been closed
testExceptionOnReceive :: IO (Either String Transport) -> IO ()
testExceptionOnReceive :: IO (Either [Char] Transport) -> IO ()
testExceptionOnReceive IO (Either [Char] Transport)
newTransport = do
  Right Transport
transport <- IO (Either [Char] Transport)
newTransport

  -- Test one: when we close an endpoint specifically
  Right EndPoint
endpoint1 <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport
  EndPoint -> IO ()
closeEndPoint EndPoint
endpoint1
  Event
EndPointClosed <- EndPoint -> IO Event
receive EndPoint
endpoint1
  Left SomeException
_ <- IO Event -> IO (Either SomeException Event)
forall a. IO a -> IO (Either SomeException a)
trySome (EndPoint -> IO Event
receive EndPoint
endpoint1 IO Event -> (Event -> IO Event) -> IO Event
forall (m :: * -> *) a b.
(MonadS m, Traceable a) =>
m a -> (a -> m b) -> m b
>>= Event -> IO Event
forall a. a -> IO a
evaluate)

  -- Test two: when we close the entire transport
  Right EndPoint
endpoint2 <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport
  Transport -> IO ()
closeTransport Transport
transport
  Event
EndPointClosed <- EndPoint -> IO Event
receive EndPoint
endpoint2
  Left SomeException
_ <- IO Event -> IO (Either SomeException Event)
forall a. IO a -> IO (Either SomeException a)
trySome (EndPoint -> IO Event
receive EndPoint
endpoint2 IO Event -> (Event -> IO Event) -> IO Event
forall (m :: * -> *) a b.
(MonadS m, Traceable a) =>
m a -> (a -> m b) -> m b
>>= Event -> IO Event
forall a. a -> IO a
evaluate)

  () -> IO ()
forall (m :: * -> *) a. MonadS m => a -> m a
return ()

-- | Test what happens when the argument to 'send' is an exceptional value
testSendException :: IO (Either String Transport) -> IO ()
testSendException :: IO (Either [Char] Transport) -> IO ()
testSendException IO (Either [Char] Transport)
newTransport = do
  Right Transport
transport <- IO (Either [Char] Transport)
newTransport
  Right EndPoint
endpoint1 <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport
  Right EndPoint
endpoint2 <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport

  -- Connect endpoint1 to endpoint2
  Right Connection
conn <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint1 (EndPoint -> EndPointAddress
address EndPoint
endpoint2) Reliability
ReliableOrdered ConnectHints
defaultConnectHints
  ConnectionOpened ConnectionId
_ Reliability
_ EndPointAddress
_ <- EndPoint -> IO Event
receive EndPoint
endpoint2

  -- Send an exceptional value
  Left (TransportError SendErrorCode
SendFailed [Char]
_) <- Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn (IOError -> [ByteString]
forall a e. Exception e => e -> a
throw (IOError -> [ByteString]) -> IOError -> [ByteString]
forall a b. (a -> b) -> a -> b
$ [Char] -> IOError
userError [Char]
"uhoh")

  -- This will have been as a failure to send by endpoint1, which will
  -- therefore have closed the socket. In turn this will have caused endpoint2
  -- to report that the connection was lost
  ErrorEvent (TransportError (EventConnectionLost EndPointAddress
_) [Char]
_) <- EndPoint -> IO Event
receive EndPoint
endpoint1
  ErrorEvent (TransportError (EventConnectionLost EndPointAddress
_) [Char]
_) <- EndPoint -> IO Event
receive EndPoint
endpoint2

  -- A new connection will re-establish the connection
  Right Connection
conn2 <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint1 (EndPoint -> EndPointAddress
address EndPoint
endpoint2) Reliability
ReliableOrdered ConnectHints
defaultConnectHints
  Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn2 [ByteString
"ping"]
  Connection -> IO ()
close Connection
conn2

  ConnectionOpened ConnectionId
_ Reliability
_ EndPointAddress
_ <- EndPoint -> IO Event
receive EndPoint
endpoint2
  Received ConnectionId
_ [ByteString
"ping"]    <- EndPoint -> IO Event
receive EndPoint
endpoint2
  ConnectionClosed ConnectionId
_     <- EndPoint -> IO Event
receive EndPoint
endpoint2

  () -> IO ()
forall (m :: * -> *) a. MonadS m => a -> m a
return ()

-- | If threads get killed while executing a 'connect', 'send', or 'close', this
-- should not affect other threads.
--
-- The intention of this test is to see what happens when a asynchronous
-- exception happes _while executing a send_. This is exceedingly difficult to
-- guarantee, however. Hence we run a large number of tests and insert random
-- thread delays -- and even then it might not happen.  Moreover, it will only
-- happen when we run on multiple cores.
testKill :: IO (Either String Transport) -> Int -> IO ()
testKill :: IO (Either [Char] Transport) -> Int -> IO ()
testKill IO (Either [Char] Transport)
newTransport Int
numThreads = do
  Right Transport
transport1 <- IO (Either [Char] Transport)
newTransport
  Right Transport
transport2 <- IO (Either [Char] Transport)
newTransport
  Right EndPoint
endpoint1 <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport1
  Right EndPoint
endpoint2 <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport2

  [ThreadId]
threads <- Int -> IO ThreadId -> IO [ThreadId]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
numThreads (IO ThreadId -> IO [ThreadId])
-> (IO () -> IO ThreadId) -> IO () -> IO [ThreadId]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. IO () -> IO ThreadId
forkIO (IO () -> IO [ThreadId]) -> IO () -> IO [ThreadId]
forall a b. (a -> b) -> a -> b
$ do
    Int -> IO ()
randomThreadDelay Int
100
    IO (Either (TransportError ConnectErrorCode) Connection)
-> (Either (TransportError ConnectErrorCode) Connection -> IO ())
-> (Either (TransportError ConnectErrorCode) Connection -> IO ())
-> IO ()
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket (EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint1 (EndPoint -> EndPointAddress
address EndPoint
endpoint2) Reliability
ReliableOrdered ConnectHints
defaultConnectHints)
            -- Note that we should not insert a randomThreadDelay into the
            -- exception handler itself as this means that the exception handler
            -- could be interrupted and we might not close
            (\(Right Connection
conn) -> Connection -> IO ()
close Connection
conn)
            (\(Right Connection
conn) -> do Int -> IO ()
randomThreadDelay Int
100
                                 Right () <- Connection
-> [ByteString] -> IO (Either (TransportError SendErrorCode) ())
send Connection
conn [ByteString
"ping"]
                                 Int -> IO ()
randomThreadDelay Int
100)

  MVar Int
numAlive <- Int -> IO (MVar Int)
forall a. a -> IO (MVar a)
newMVar (Int
0 :: Int)

  -- Kill half of those threads
  IO () -> IO ThreadId
forkIO (IO () -> IO ThreadId)
-> ((ThreadId -> IO ()) -> IO ())
-> (ThreadId -> IO ())
-> IO ThreadId
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [ThreadId] -> (ThreadId -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [ThreadId]
threads ((ThreadId -> IO ()) -> IO ThreadId)
-> (ThreadId -> IO ()) -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ \ThreadId
tid -> do
    Bool
shouldKill <- IO Bool
forall a (m :: * -> *). (Random a, MonadIO m) => m a
randomIO
    if Bool
shouldKill
      then Int -> IO ()
randomThreadDelay Int
600 IO () -> IO () -> IO ()
forall (m :: * -> *) a b. MonadS m => m a -> m b -> m b
>> ThreadId -> IO ()
killThread ThreadId
tid
      else MVar Int -> (Int -> IO Int) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar Int
numAlive (Int -> IO Int
forall (m :: * -> *) a. MonadS m => a -> m a
return (Int -> IO Int) -> (Int -> Int) -> Int -> IO Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1))

  -- Since it is impossible to predict when the kill exactly happens, we don't
  -- know how many connects were opened and how many pings were sent. But we
  -- should not have any open connections (if we do, collect will throw an
  -- error) and we should have at least the number of pings equal to the number
  -- of threads we did *not* kill
  [(ConnectionId, [[ByteString]])]
eventss <- EndPoint
-> Maybe Int -> Maybe Int -> IO [(ConnectionId, [[ByteString]])]
collect EndPoint
endpoint2 Maybe Int
forall a. Maybe a
Nothing (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1000000)
  let actualPings :: Int
actualPings = [Int] -> Int
forall a. Num a => [a] -> a
forall (t :: * -> *) a. (Foldable t, Num a) => t a -> a
sum ([Int] -> Int)
-> ([(ConnectionId, [[ByteString]])] -> [Int])
-> [(ConnectionId, [[ByteString]])]
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ((ConnectionId, [[ByteString]]) -> Int)
-> [(ConnectionId, [[ByteString]])] -> [Int]
forall a b. (a -> b) -> [a] -> [b]
map ([[ByteString]] -> Int
forall a. [a] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length ([[ByteString]] -> Int)
-> ((ConnectionId, [[ByteString]]) -> [[ByteString]])
-> (ConnectionId, [[ByteString]])
-> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ConnectionId, [[ByteString]]) -> [[ByteString]]
forall a b. (a, b) -> b
snd) ([(ConnectionId, [[ByteString]])] -> Int)
-> [(ConnectionId, [[ByteString]])] -> Int
forall a b. (a -> b) -> a -> b
$ [(ConnectionId, [[ByteString]])]
eventss
  Int
expectedPings <- MVar Int -> IO Int
forall a. MVar a -> IO a
takeMVar MVar Int
numAlive
  Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Int
actualPings Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
>= Int
expectedPings) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$
    IOError -> IO ()
forall e a. Exception e => e -> IO a
throwIO ([Char] -> IOError
userError [Char]
"Missing pings")

--  print (actualPings, expectedPings)


-- | Set up conditions with a high likelyhood of "crossing" (for transports
-- that multiplex lightweight connections across heavyweight connections)
testCrossing :: Transport -> Int -> IO ()
testCrossing :: Transport -> Int -> IO ()
testCrossing Transport
transport Int
numRepeats = do
  [MVar EndPointAddress
aAddr, MVar EndPointAddress
bAddr] <- Int -> IO (MVar EndPointAddress) -> IO [MVar EndPointAddress]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
2 IO (MVar EndPointAddress)
forall a. IO (MVar a)
newEmptyMVar
  [MVar ()
aDone, MVar ()
bDone] <- Int -> IO (MVar ()) -> IO [MVar ()]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
2 IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
  [MVar ()
aGo,   MVar ()
bGo]   <- Int -> IO (MVar ()) -> IO [MVar ()]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
2 IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
  [MVar ()
aTimeout, MVar ()
bTimeout] <- Int -> IO (MVar ()) -> IO [MVar ()]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
2 IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar

  let hints :: ConnectHints
hints = ConnectHints
defaultConnectHints {
                connectTimeout = Just 5000000
              }

  -- A
  IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    Right EndPoint
endpoint <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport
    MVar EndPointAddress -> EndPointAddress -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar EndPointAddress
aAddr (EndPoint -> EndPointAddress
address EndPoint
endpoint)
    EndPointAddress
theirAddress <- MVar EndPointAddress -> IO EndPointAddress
forall a. MVar a -> IO a
readMVar MVar EndPointAddress
bAddr

    Int -> IO () -> IO ()
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
numRepeats (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
      MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
aGo IO () -> IO () -> IO ()
forall (m :: * -> *) a b. MonadS m => m a -> m b -> m b
>> IO ()
yield
      -- Because we are creating lots of connections, it's possible that
      -- connect times out (for instance, in the TCP transport,
      -- Network.Socket.connect may time out). We shouldn't regard this as an
      -- error in the Transport, though.
      Either (TransportError ConnectErrorCode) Connection
connectResult <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
theirAddress Reliability
ReliableOrdered ConnectHints
hints
      case Either (TransportError ConnectErrorCode) Connection
connectResult of
        Right Connection
conn -> Connection -> IO ()
close Connection
conn
        Left (TransportError ConnectErrorCode
ConnectTimeout [Char]
_) -> MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
aTimeout ()
        Left (TransportError ConnectErrorCode
ConnectFailed [Char]
_) -> MVar () -> IO ()
forall a. MVar a -> IO a
readMVar MVar ()
bTimeout
        Left TransportError ConnectErrorCode
err -> IOError -> IO ()
forall e a. Exception e => e -> IO a
throwIO (IOError -> IO ()) -> ([Char] -> IOError) -> [Char] -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Char] -> IOError
userError ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"testCrossed: " [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ TransportError ConnectErrorCode -> [Char]
forall a. Show a => a -> [Char]
show TransportError ConnectErrorCode
err
      MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
aDone ()

  -- B
  IO () -> IO ThreadId
forkTry (IO () -> IO ThreadId) -> IO () -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ do
    Right EndPoint
endpoint <- Transport
-> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
newEndPoint Transport
transport
    MVar EndPointAddress -> EndPointAddress -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar EndPointAddress
bAddr (EndPoint -> EndPointAddress
address EndPoint
endpoint)
    EndPointAddress
theirAddress <- MVar EndPointAddress -> IO EndPointAddress
forall a. MVar a -> IO a
readMVar MVar EndPointAddress
aAddr

    Int -> IO () -> IO ()
forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
numRepeats (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
      MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
bGo IO () -> IO () -> IO ()
forall (m :: * -> *) a b. MonadS m => m a -> m b -> m b
>> IO ()
yield
      Either (TransportError ConnectErrorCode) Connection
connectResult <- EndPoint
-> EndPointAddress
-> Reliability
-> ConnectHints
-> IO (Either (TransportError ConnectErrorCode) Connection)
connect EndPoint
endpoint EndPointAddress
theirAddress Reliability
ReliableOrdered ConnectHints
hints
      case Either (TransportError ConnectErrorCode) Connection
connectResult of
        Right Connection
conn -> Connection -> IO ()
close Connection
conn
        Left (TransportError ConnectErrorCode
ConnectTimeout [Char]
_) -> MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
bTimeout ()
        Left (TransportError ConnectErrorCode
ConnectFailed [Char]
_) -> MVar () -> IO ()
forall a. MVar a -> IO a
readMVar MVar ()
aTimeout
        Left TransportError ConnectErrorCode
err -> IOError -> IO ()
forall e a. Exception e => e -> IO a
throwIO (IOError -> IO ()) -> ([Char] -> IOError) -> [Char] -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. [Char] -> IOError
userError ([Char] -> IO ()) -> [Char] -> IO ()
forall a b. (a -> b) -> a -> b
$ [Char]
"testCrossed: " [Char] -> [Char] -> [Char]
forall a. [a] -> [a] -> [a]
++ TransportError ConnectErrorCode -> [Char]
forall a. Show a => a -> [Char]
show TransportError ConnectErrorCode
err
      MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
bDone ()

  -- Driver
  [Int] -> (Int -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Int
1 .. Int
numRepeats] ((Int -> IO ()) -> IO ()) -> (Int -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Int
_i -> do
    -- putStrLn $ "Round " ++ show _i
    MVar () -> IO (Maybe ())
forall a. MVar a -> IO (Maybe a)
tryTakeMVar MVar ()
aTimeout
    MVar () -> IO (Maybe ())
forall a. MVar a -> IO (Maybe a)
tryTakeMVar MVar ()
bTimeout
    Bool
b <- IO Bool
forall a (m :: * -> *). (Random a, MonadIO m) => m a
randomIO
    if Bool
b then do MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
aGo () ; MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
bGo ()
         else do MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
bGo () ; MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
aGo ()
    IO ()
yield
    MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
aDone
    MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
bDone

-- Transport tests
testTransport :: IO (Either String Transport) -> IO ()
testTransport :: IO (Either [Char] Transport) -> IO ()
testTransport = ([Char] -> Bool) -> IO (Either [Char] Transport) -> IO ()
testTransportWithFilter (Bool -> [Char] -> Bool
forall a b. a -> b -> a
const Bool
True)

testTransportWithFilter :: (String -> Bool) -> IO (Either String Transport) -> IO ()
testTransportWithFilter :: ([Char] -> Bool) -> IO (Either [Char] Transport) -> IO ()
testTransportWithFilter [Char] -> Bool
p IO (Either [Char] Transport)
newTransport = do
  Right Transport
transport <- IO (Either [Char] Transport)
newTransport
  [([Char], IO ())] -> IO ()
runTests ([([Char], IO ())] -> IO ()) -> [([Char], IO ())] -> IO ()
forall a b. (a -> b) -> a -> b
$ (([Char], IO ()) -> Bool) -> [([Char], IO ())] -> [([Char], IO ())]
forall a. (a -> Bool) -> [a] -> [a]
filter ([Char] -> Bool
p ([Char] -> Bool)
-> (([Char], IO ()) -> [Char]) -> ([Char], IO ()) -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ([Char], IO ()) -> [Char]
forall a b. (a, b) -> a
fst)
    [ ([Char]
"PingPong",              Transport -> Int -> IO ()
testPingPong Transport
transport Int
numPings)
    , ([Char]
"EndPoints",             Transport -> Int -> IO ()
testEndPoints Transport
transport Int
numPings)
    , ([Char]
"Connections",           Transport -> Int -> IO ()
testConnections Transport
transport Int
numPings)
    , ([Char]
"CloseOneConnection",    Transport -> Int -> IO ()
testCloseOneConnection Transport
transport Int
numPings)
    , ([Char]
"CloseOneDirection",     Transport -> Int -> IO ()
testCloseOneDirection Transport
transport Int
numPings)
    , ([Char]
"CloseReopen",           Transport -> Int -> IO ()
testCloseReopen Transport
transport Int
numPings)
    , ([Char]
"ParallelConnects",      Transport -> Int -> IO ()
testParallelConnects Transport
transport Int
numPings)
    , ([Char]
"SelfSend",              Transport -> IO ()
testSelfSend Transport
transport)
    , ([Char]
"SendAfterClose",        Transport -> Int -> IO ()
testSendAfterClose Transport
transport Int
100)
    , ([Char]
"Crossing",              Transport -> Int -> IO ()
testCrossing Transport
transport Int
10)
    , ([Char]
"CloseTwice",            Transport -> Int -> IO ()
testCloseTwice Transport
transport Int
100)
    , ([Char]
"ConnectToSelf",         Transport -> Int -> IO ()
testConnectToSelf Transport
transport Int
numPings)
    , ([Char]
"ConnectToSelfTwice",    Transport -> Int -> IO ()
testConnectToSelfTwice Transport
transport Int
numPings)
    , ([Char]
"CloseSelf",             IO (Either [Char] Transport) -> IO ()
testCloseSelf IO (Either [Char] Transport)
newTransport)
    , ([Char]
"CloseEndPoint",         Transport -> Int -> IO ()
testCloseEndPoint Transport
transport Int
numPings)
    , ([Char]
"CloseTransport",        IO (Either [Char] Transport) -> IO ()
testCloseTransport IO (Either [Char] Transport)
newTransport)
    , ([Char]
"ConnectClosedEndPoint", Transport -> IO ()
testConnectClosedEndPoint Transport
transport)
    , ([Char]
"ExceptionOnReceive",    IO (Either [Char] Transport) -> IO ()
testExceptionOnReceive IO (Either [Char] Transport)
newTransport)
    , ([Char]
"SendException",         IO (Either [Char] Transport) -> IO ()
testSendException IO (Either [Char] Transport)
newTransport)
    , ([Char]
"Kill",                  IO (Either [Char] Transport) -> Int -> IO ()
testKill IO (Either [Char] Transport)
newTransport Int
1000)
    ]
  where
    numPings :: Int
numPings = Int
10000 :: Int


-- Test that list is a union of stream message, with preserved ordering
-- within each stream.
-- Note: this function may not work if different streams contains equal
-- messages.
testStreams :: Eq a => [a] -> [[a]] -> Bool
testStreams :: forall a. Eq a => [a] -> [[a]] -> Bool
testStreams []      [[a]]
ys = ([a] -> Bool) -> [[a]] -> Bool
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Bool
all [a] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [[a]]
ys
testStreams (a
x:[a]
xs)  [[a]]
ys =
    case [[a]] -> [[a]] -> [[a]]
go [] [[a]]
ys of
      []  -> Bool
False
      [[a]]
ys' -> [a] -> [[a]] -> Bool
forall a. Eq a => [a] -> [[a]] -> Bool
testStreams [a]
xs [[a]]
ys'
  where
    go :: [[a]] -> [[a]] -> [[a]]
go [[a]]
_ [] = []
    go [[a]]
c ([]:[[a]]
zss) = [[a]] -> [[a]] -> [[a]]
go [[a]]
c [[a]]
zss
    go [[a]]
c (z' :: [a]
z'@(a
z:[a]
zs):[[a]]
zss)
        |  a
x a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
z    = ([a]
zs[a] -> [[a]] -> [[a]]
forall a. a -> [a] -> [a]
:[[a]]
c)[[a]] -> [[a]] -> [[a]]
forall a. [a] -> [a] -> [a]
++[[a]]
zss
        |  Bool
otherwise = [[a]] -> [[a]] -> [[a]]
go ([a]
z'[a] -> [[a]] -> [[a]]
forall a. a -> [a] -> [a]
:[[a]]
c) [[a]]
zss