module Network.Metaverse.Circuit (
Circuit,
circuitConnect,
circuitAgentID,
circuitSessionID,
circuitCode,
circuitSend,
circuitSendSync,
circuitIncoming,
circuitClose,
circuitIsClosed
)
where
import Prelude hiding (catch)
import Control.Arrow (first)
import Control.Concurrent
import Control.Event.Relative
import Control.Exception
import Control.Monad
import Control.Monad.Trans
import Data.Binary
import Data.Binary.Put
import Data.Binary.Get
import Data.Binary.IEEE754
import Data.Bits
import Data.Time.Clock
import Data.UUID hiding (null)
import Network.Socket hiding (send, sendTo, recv, recvFrom)
import Network.Socket.ByteString
import Network.Metaverse.Login
import Network.Metaverse.PacketTypes
import qualified Data.Map as M
import Data.Map (Map)
import Control.Monad.State hiding (get, put)
import qualified Control.Monad.State as S
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as L
-- | A collection of pending timer events, indexed by a caller-chosen key so
-- that an individual event can be cancelled before it fires.
data TaskQueue k = TaskQueue
    { taskVar :: MVar (M.Map k EventId)
      -- ^ Events that were scheduled and have neither fired nor been
      -- cancelled yet.
    }
-- | Create a task queue with no scheduled events.
newTaskQueue :: Ord k => IO (TaskQueue k)
newTaskQueue = do
    var <- newMVar M.empty
    return (TaskQueue var)
-- | Register @action@ to run after @delay@ under the given key.
--
-- When the event fires it first removes its own key from the queue and then
-- runs @action@.  The delay is handed to 'addEvent' unchanged; its unit is
-- whatever that function expects (presumably microseconds -- TODO confirm).
--
-- NOTE(review): the event id is recorded only after 'addEvent' returns, so an
-- event that fires before the insert would leave a stale entry behind.
schedule :: Ord k => TaskQueue k -> k -> Int -> IO () -> IO ()
schedule (TaskQueue var) key delay action = do
    eventId <- addEvent delay fire
    modifyMVar_ var (return . M.insert key eventId)
  where
    -- Forget the key, then run the caller's action.
    fire = do
        modifyMVar_ var (return . M.delete key)
        action
-- | Cancel the event scheduled under @k@, if there is one.
--
-- Returns 'True' when an event was found (and handed to 'delEvent'), 'False'
-- when nothing was scheduled under that key.
--
-- The map is updated with 'modifyMVar' rather than a bare
-- 'takeMVar'/'putMVar' pair, so an asynchronous exception (this module kills
-- its worker threads on close) can no longer leave the queue's MVar empty and
-- block every later user of the queue.
cancel :: Ord k => TaskQueue k -> k -> IO Bool
cancel (TaskQueue v) k = do
    found <- modifyMVar v $ \m -> return (M.delete k m, M.lookup k m)
    case found of
        Nothing      -> return False
        -- As before, the event is deleted after the map has been released.
        Just eventId -> delEvent eventId >> return True
-- | Cancel every event still registered in the queue and leave it empty.
--
-- The queue stays locked while the events are deleted, as before, but the
-- lock is now taken with 'modifyMVar_': if 'delEvent' throws or the thread is
-- killed part-way through, the previous map is put back instead of leaving
-- the MVar empty forever.
closeQueue :: Ord k => TaskQueue k -> IO ()
closeQueue (TaskQueue v) = modifyMVar_ v $ \tasks -> do
    mapM_ delEvent (M.elems tasks)
    return M.empty
-- | Sequence number carried in every packet header (written as a big-endian
-- 32-bit word by 'serialize').
type SequenceNum = Word32
-- | One datagram as exchanged over a circuit: header flags, sequence number,
-- optional extra header bytes, the message body and any piggy-backed acks.
data Packet = Packet
    { packetZerocoded  :: Bool
      -- ^ Flag bit 7: the body is zero-encoded on the wire.
    , packetReliable   :: Bool
      -- ^ Flag bit 6: the packet is marked reliable.
    , packetRetransmit :: Bool
      -- ^ Flag bit 5: the packet is marked as a retransmission.
    , packetSequence   :: SequenceNum
      -- ^ Sequence number written right after the flags byte.
    , packetExtra      :: B.ByteString
      -- ^ Extra header bytes; their length is written as a single byte.
    , packetBody       :: PacketBody
      -- ^ The decoded message.
    , packetAcks       :: [SequenceNum]
      -- ^ Sequence numbers acknowledged by this packet, appended after the
      -- body.
    }
    deriving Show
-- | Render a packet into the bytes sent on the wire.
--
-- Layout: one flags byte, the sequence number (big-endian), a length byte and
-- the extra header bytes, the body (zero-encoded when the packet says so),
-- and finally the acks followed by a one-byte ack count.  The ack count byte
-- is only present when there is at least one ack.
serialize :: Packet -> B.ByteString
serialize (Packet zcode reliable retrans seqNum extra body acks) =
    B.concat (L.toChunks (runPut packetPut))
  where
    -- A single flag bit, set only when the condition holds.
    flag :: Int -> Bool -> Word8
    flag i on
        | on        = bit i
        | otherwise = 0

    header :: Word8
    header = flag 4 (not (null acks))
         .|. flag 5 retrans
         .|. flag 6 reliable
         .|. flag 7 zcode

    packetPut = do
        putWord8 header
        putWord32be seqNum
        putWord8 (fromIntegral (B.length extra))
        putByteString extra
        if zcode
            then putLazyByteString (zeroencode (encode body))
            else put body
        mapM_ putWord32be acks
        unless (null acks) $ putWord8 (fromIntegral (length acks))
-- | Parse the bytes of one datagram into a 'Packet' (the inverse of
-- 'serialize').
--
-- When flag bit 4 is set, the last byte is the number of appended acks and
-- the @4 * count@ bytes before it are the acknowledged sequence numbers; they
-- are stripped before the header and body are decoded.  The body is
-- zero-decoded first when flag bit 7 is set.
--
-- The result is built lazily and the function is partial: an empty or
-- truncated message raises an error when the affected field is forced.
deserialize :: B.ByteString -> Packet
deserialize fullMsg = Packet zcode reliable retrans seqNum extra body acks
  where
    flags    = B.head fullMsg
    hasAcks  = testBit flags 4
    retrans  = testBit flags 5
    reliable = testBit flags 6
    zcode    = testBit flags 7

    (withoutAcks, acks)
        | hasAcks   = splitAcks
        | otherwise = (fullMsg, [])

    -- Peel the trailing count byte and the ack block off the message.
    splitAcks =
        let msg1   = B.init fullMsg
            nacks  = fromIntegral (B.last fullMsg) :: Int
            -- The ack block occupies the last 4 * nacks bytes of msg1.
            (result, appended) = B.splitAt (B.length msg1 - 4 * nacks) msg1
            ackList = runGet (replicateM nacks getWord32be)
                             (L.fromChunks [appended])
        in (result, ackList)

    headerGetter = do
        _        <- getWord8            -- flags, already examined above
        sn       <- getWord32be
        extralen <- getWord8
        ex       <- getBytes (fromIntegral extralen)
        rest     <- getRemainingLazyByteString
        return (sn, ex, rest)

    (seqNum, extra, encodedBody) =
        runGet headerGetter (L.fromChunks [withoutAcks])

    decodedBody
        | zcode     = zerodecode encodedBody
        | otherwise = encodedBody

    body = decode decodedBody
-- | Expand a zero-encoded byte string: a zero byte followed by a count @n@
-- becomes @n@ zero bytes; every other byte is copied unchanged.  Input of at
-- most one byte is returned as it is.
zerodecode :: L.ByteString -> L.ByteString
zerodecode r = case L.uncons r of
    Nothing      -> r
    Just (x, xs) -> case L.uncons xs of
        -- Only one byte left: nothing to expand.
        Nothing -> r
        Just (n, rest)
            | x == 0    -> L.append (L.replicate (fromIntegral n) 0)
                                    (zerodecode rest)
            | otherwise -> L.cons x (zerodecode xs)
-- | Zero-encode a byte string: every run of zero bytes is replaced by a zero
-- byte followed by the run length; all other bytes are copied unchanged.
--
-- A run longer than 255 bytes is emitted as several @[0, 255]@ pairs followed
-- by a pair for the remainder.  The subtraction that computes that remainder
-- had been lost from the source and is restored here.
zeroencode :: L.ByteString -> L.ByteString
zeroencode r = case L.uncons r of
    Nothing -> r
    Just (x, xs)
        | L.null pfx -> L.cons x (zeroencode xs)
        | otherwise  -> L.append (zeros (L.length pfx)) (zeroencode rest)
  where
    -- Leading run of zero bytes, and whatever follows it.
    (pfx, rest) = L.span (== 0) r

    -- Encoding of a run of n zero bytes.
    zeros n
        | n > 255   = L.append (L.pack [0, 255]) (zeros (n - 255))
        | n > 0     = L.pack [0, fromIntegral n]
        | otherwise = L.empty
-- | Handle on one UDP circuit to a simulator.
data Circuit = Circuit
    { circuitAgentID    :: !UUID
      -- ^ Agent id taken from the login token.
    , circuitSessionID  :: !UUID
      -- ^ Session id taken from the login token.
    , circuitCode       :: !Word32
      -- ^ Circuit code taken from the login token.
    , circuitSocket     :: Socket
      -- ^ Datagram socket used for all traffic on this circuit.
    , circuitAddr       :: SockAddr
      -- ^ Address of the simulator; packets from other senders are ignored.
    , circuitThreads    :: MVar [ThreadId]
      -- ^ Worker threads forked by 'circuitConnect'.
    , circuitIncoming   :: Chan (Maybe PacketBody)
      -- ^ Received message bodies; 'Nothing' is written when the circuit is
      -- closed.
    , circuitAccounting :: MVar Accounting
      -- ^ Mutable bookkeeping shared by all threads of the circuit.
    }
-- | Mutable bookkeeping for a circuit, kept in 'circuitAccounting'.
data Accounting = Accounting
    { acctClosed        :: Bool
      -- ^ When set, sends are skipped and the worker loops stop.
    , acctSequence      :: SequenceNum
      -- ^ Sequence number the next outgoing packet will use.
    , acctRecentPackets :: [SequenceNum]
      -- ^ Sequence numbers of recently received reliable packets, newest
      -- first, capped at 100 entries.
    , acctPendingAcks   :: [(UTCTime, SequenceNum)]
      -- ^ Received reliable packets not yet acknowledged, oldest first,
      -- each with its arrival time.
    , acctReliableQueue :: TaskQueue SequenceNum
      -- ^ Retransmission timers, keyed by the outgoing sequence number.
    , acctConfirmations :: Map SequenceNum (MVar Bool)
      -- ^ Waiters to be told whether a reliable packet was acknowledged.
    , acctLastTime      :: UTCTime
      -- ^ Time at which the most recent packet from the simulator arrived.
    }
-- | Run a stateful action against the contents of an 'MVar': the current
-- value is the initial state, the final state is written back, and the
-- action's result is returned.  The MVar is held for the whole run.
runWithMVar :: MVar a -> StateT a IO b -> IO b
runWithMVar v m = modifyMVar v $ \s -> do
    (result, s') <- runStateT m s
    return (s', result)
-- | Hand out the current sequence number and advance the counter by one.
nextSequence :: StateT Accounting IO SequenceNum
nextSequence = do
    acct <- S.get
    let current = acctSequence acct
    S.put acct { acctSequence = current + 1 }
    return current
-- | Serialize a packet and send all of its bytes to the given address.
sendRaw :: Socket -> SockAddr -> Packet -> IO ()
sendRaw sock addr packet = sendAllTo sock bytes addr
  where
    bytes = serialize packet
-- | Remove and return as many pending acks as fit into @size@ bytes.
--
-- Each ack takes four bytes, and at most 255 are returned per call.  The
-- oldest pending acks are taken first; the rest stay queued.
getAcks :: Int -> StateT Accounting IO [SequenceNum]
getAcks size = do
    acct <- S.get
    let capacity              = min 255 (size `div` 4)
        (outgoing, remaining) = splitAt capacity (acctPendingAcks acct)
    S.put acct { acctPendingAcks = remaining }
    return [ sn | (_, sn) <- outgoing ]
-- | Send a packet, piggy-backing as many pending acks as fit.
--
-- The space offered to 'getAcks' is a 10000-byte budget minus 7 bytes of
-- framing (flags, sequence number and extra-length byte, plus the trailing
-- ack-count byte written by 'serialize') minus the body length.  The two
-- subtractions had been lost from the source and are restored here.
--
-- Sending is best effort: any exception raised by the send is discarded.
-- Acks taken from the pending list are not put back if the send fails.
sendWithAcks :: Socket -> SockAddr -> Packet -> StateT Accounting IO ()
sendWithAcks sock addr packet = do
    acks <- getAcks (10000 - 7 - packetLength (packetBody packet))
    liftIO $ sendRaw sock addr packet { packetAcks = acks }
        `catch` \(_ :: SomeException) -> return ()
-- | How a message should be delivered.
data Reliability
    = Unreliable
      -- ^ Send once and forget.
    | Reliable (Maybe (MVar Bool))
      -- ^ Mark the packet reliable and retransmit it until acknowledged.
      -- When an 'MVar' is supplied it receives 'True' on acknowledgement
      -- and 'False' once the retries are exhausted.
-- | Whether the delivery mode is one of the reliable ones.
isReliable :: Reliability -> Bool
isReliable rel = case rel of
    Reliable _ -> True
    Unreliable -> False
-- | Run the action only while the circuit is not marked closed; otherwise do
-- nothing.
ifNotClosed :: Monad m => StateT Accounting m () -> StateT Accounting m ()
ifNotClosed action = do
    closed <- S.gets acctClosed
    unless closed action
-- | Send one message on the circuit from inside the accounting state.
--
-- Allocates the next sequence number, sends the packet (with piggy-backed
-- acks) and, for reliable delivery, registers the retransmission
-- bookkeeping.  Nothing happens when the circuit is marked closed.
circuitSendImpl :: Circuit
                -> Reliability
                -> PacketBody
                -> StateT Accounting IO ()
circuitSendImpl circ rel body = ifNotClosed $ do
    seqNum <- nextSequence
    let packet = Packet
            { packetZerocoded  = shouldZerocode body
            , packetReliable   = isReliable rel
            , packetRetransmit = False
            , packetSequence   = seqNum
            , packetExtra      = B.empty
            , packetBody       = body
            , packetAcks       = []
            }
    sendWithAcks (circuitSocket circ) (circuitAddr circ) packet
    reliableAccounting rel circ packet
-- | Set up retransmission for a packet that has just been sent.
--
-- Unreliable packets need no bookkeeping.  For reliable packets the optional
-- waiter is recorded under the packet's sequence number and a retry timer is
-- scheduled.  Each time the timer fires the packet is sent again with the
-- retransmit flag set, up to 'retryCount' times; when the count reaches zero
-- the waiter (if any) is given 'False' and removed.
--
-- The countdown @retry (n - 1)@ had lost its subtraction in the source and is
-- restored here.
reliableAccounting :: Reliability
                   -> Circuit
                   -> Packet
                   -> StateT Accounting IO ()
reliableAccounting Unreliable _ _ = return ()
reliableAccounting (Reliable mv) circ packet = do
    case mv of
        Nothing -> return ()
        Just v  -> modify $ \s ->
            s { acctConfirmations = M.insert seqNum v (acctConfirmations s) }
    queue <- S.gets acctReliableQueue
    liftIO $ schedule queue seqNum retryTime (retry retryCount)
  where
    -- Delay between attempts, in the unit 'schedule' passes to the timer
    -- (presumably microseconds, i.e. 1.5 s -- TODO confirm).
    retryTime :: Int
    retryTime = 1500000

    -- Number of retransmissions before giving up.
    retryCount :: Int
    retryCount = 3

    sock    = circuitSocket circ
    addr    = circuitAddr circ
    seqNum  = packetSequence packet
    retried = packet { packetRetransmit = True }

    retry :: Int -> IO ()
    -- Out of attempts: report failure to the waiter and forget it.
    retry 0 = case mv of
        Nothing -> return ()
        Just v  -> do
            putMVar v False
            runWithMVar (circuitAccounting circ) $ modify $ \s ->
                s { acctConfirmations = M.delete seqNum (acctConfirmations s) }
    -- Send again and arm the timer for the next attempt.
    retry n = runWithMVar (circuitAccounting circ) $ do
        sendWithAcks sock addr retried
        queue <- S.gets acctReliableQueue
        liftIO $ schedule queue seqNum retryTime (retry (n - 1))
-- | Send a message without waiting for any acknowledgement.
--
-- When @reliable@ is 'True' the packet is retransmitted until acknowledged,
-- but the outcome is not reported to the caller.
circuitSend :: Circuit
            -> Bool
            -> PacketBody
            -> IO ()
circuitSend circ reliable msg =
    runWithMVar (circuitAccounting circ) (circuitSendImpl circ mode msg)
  where
    mode
        | reliable  = Reliable Nothing
        | otherwise = Unreliable
-- | Send a message reliably and block until its fate is known.
--
-- Returns 'True' once the packet has been acknowledged and 'False' when the
-- retries are exhausted.  If the circuit is already marked closed nothing is
-- sent and 'False' is returned immediately; previously that case waited on
-- an 'MVar' that nothing would ever fill.
circuitSendSync :: Circuit
                -> PacketBody
                -> IO Bool
circuitSendSync circ msg = do
    v <- newEmptyMVar
    -- Check the flag and send inside one critical section, so the answer
    -- matches what circuitSendImpl actually did.
    sent <- runWithMVar (circuitAccounting circ) $ do
        closed <- S.gets acctClosed
        unless closed $ circuitSendImpl circ (Reliable (Just v)) msg
        return (not closed)
    if sent
        then takeMVar v
        else return False
-- | Worker loop that flushes acknowledgements which have waited too long.
--
-- Every half second it looks at the oldest pending ack; if that one is more
-- than 0.75 s old, a batch of pending acks is sent in a dedicated
-- @PacketAck@ message.  The loop ends once the circuit is marked closed.
--
-- The ack budget @10000 - 7@ had lost its subtraction in the source and is
-- restored here.
ackSender :: Circuit -> IO ()
ackSender circ = do
    cont <- runWithMVar (circuitAccounting circ) $ do
        pending <- S.gets acctPendingAcks
        now     <- liftIO getCurrentTime
        case pending of
            -- The pending list is kept oldest first.
            (oldest, _) : _
                | now `diffUTCTime` oldest > ackThreshold -> do
                    acks <- getAcks (10000 - 7)
                    circuitSendImpl circ Unreliable
                        (PacketAck (map PacketAck_Packets acks))
            _ -> return ()
        S.gets (not . acctClosed)
    when cont $ do
        threadDelay 500000
        ackSender circ
  where
    -- Longest time an ack may wait for a packet to ride on.
    ackThreshold :: NominalDiffTime
    ackThreshold = 0.75
-- | Record that the peer acknowledged the packet with this sequence number:
-- cancel its retransmission timer and, if somebody is waiting on the result,
-- report success and forget the waiter.
confirmPacket :: SequenceNum -> StateT Accounting IO ()
confirmPacket seqNum = do
    acct <- S.get
    let waiters = acctConfirmations acct
    _ <- liftIO $ cancel (acctReliableQueue acct) seqNum
    case M.lookup seqNum waiters of
        Nothing     -> return ()
        Just waiter -> do
            liftIO $ putMVar waiter True
            modify $ \s -> s { acctConfirmations = M.delete seqNum waiters }
-- | Receive one datagram (up to 10000 bytes) and pair the parsed packet with
-- its sender.  Any exception raised while receiving yields 'Nothing'.
--
-- NOTE(review): the packet is parsed lazily, so errors from a malformed
-- datagram surface where the packet is inspected, not here.
recvRaw :: Socket -> IO (Maybe (Packet, SockAddr))
recvRaw sock = receive `catch` failed
  where
    receive = do
        (bytes, from) <- recvFrom sock 10000
        return (Just (deserialize bytes, from))

    failed :: SomeException -> IO (Maybe (Packet, SockAddr))
    failed _ = return Nothing
-- | Worker loop that receives datagrams and feeds the circuit.
--
-- For every packet from the simulator it refreshes the last-heard time,
-- processes piggy-backed and explicit acks, queues an ack for reliable
-- packets, and forwards the body to 'circuitIncoming' unless it is a
-- retransmission of a reliable packet already seen.  A receive failure
-- closes the circuit.
--
-- Datagrams from any other address are ignored and the loop keeps listening.
-- Previously such a datagram made the receiver thread return, silently
-- ending all reception on the circuit.
packetReceiver :: Circuit -> IO ()
packetReceiver circ = do
    res <- recvRaw (circuitSocket circ)
    case res of
        Nothing -> circuitClose circ
        Just (packet, from)
            | from /= circuitAddr circ -> packetReceiver circ
            | otherwise -> do
                cont <- runWithMVar (circuitAccounting circ) (account packet)
                when cont $ packetReceiver circ
  where
    -- Bookkeeping for one packet; the result says whether to keep going.
    account :: Packet -> StateT Accounting IO Bool
    account packet = do
        let seqNum = packetSequence packet
        now <- liftIO getCurrentTime
        modify $ \s -> s { acctLastTime = now }
        mapM_ confirmPacket (packetAcks packet)
        -- Snapshot taken before this packet is added, so a first delivery
        -- is never mistaken for a duplicate of itself.
        recent <- S.gets acctRecentPackets
        when (packetReliable packet) $ modify $ \s -> s
            { acctPendingAcks   = acctPendingAcks s ++ [(now, seqNum)]
            , acctRecentPackets = take 100 (seqNum : acctRecentPackets s)
            }
        case packetBody packet of
            PacketAck acks ->
                mapM_ confirmPacket (map packetAck_Packets_ID acks)
            body ->
                -- Drop only retransmissions whose original was seen.
                when (not (packetRetransmit packet)
                      || seqNum `notElem` recent) $
                    liftIO $ writeChan (circuitIncoming circ) (Just body)
        S.gets (not . acctClosed)
-- | Worker loop that pings the simulator every five seconds.
--
-- If nothing has been received for more than 60 seconds the circuit is
-- closed and the loop ends; otherwise a @StartPingCheck@ carrying the
-- current ping id is sent and the loop continues with the next id.
pingSender :: Circuit -> Word8 -> IO ()
pingSender circ n = do
    threadDelay 5000000
    lastHeard <- fmap acctLastTime (readMVar (circuitAccounting circ))
    now       <- getCurrentTime
    if now `diffUTCTime` lastHeard > 60
        then circuitClose circ
        else do
            runWithMVar (circuitAccounting circ) $
                circuitSendImpl circ Unreliable $
                    StartPingCheck (StartPingCheck_PingID n 0)
            pingSender circ (n + 1)
-- | Worker loop that answers the simulator's pings.
--
-- Reads message bodies from @source@; each @StartPingCheck@ is answered with
-- an unreliable @CompletePingCheck@ carrying the same ping id, every other
-- message is skipped, and 'Nothing' ends the loop.
pingResponder :: Circuit -> Chan (Maybe PacketBody) -> IO ()
pingResponder circ source = loop
  where
    loop = do
        packet <- readChan source
        case packet of
            Nothing -> return ()
            Just (StartPingCheck (StartPingCheck_PingID pingId _)) -> do
                circuitSend circ False $
                    CompletePingCheck (CompletePingCheck_PingID pingId)
                loop
            Just _ -> loop
-- | Shut the circuit down.
--
-- Marks the circuit closed, writes 'Nothing' to 'circuitIncoming', cancels
-- all retransmission timers, reports failure to anyone still waiting for an
-- acknowledgement, kills the worker threads and closes the socket.
--
-- Fixes over the previous version:
--
-- * the closed flag is actually set, so 'circuitIsClosed' and the worker
--   loops can observe the shutdown;
-- * closing twice is a no-op instead of closing the socket a second time;
-- * the calling thread is skipped when killing workers.  Two workers call
--   this function themselves, and killing the current thread aborted the
--   shutdown before the remaining workers and the socket were dealt with;
-- * pending synchronous senders receive 'False' instead of blocking forever
--   after their retry timers were cancelled.
circuitClose :: Circuit
             -> IO ()
circuitClose circ = do
    -- Flip the flag and take the pending waiters in one atomic step so that
    -- exactly one caller performs the shutdown.
    claimed <- modifyMVar (circuitAccounting circ) $ \acct ->
        if acctClosed acct
            then return (acct, Nothing)
            else return
                ( acct { acctClosed = True, acctConfirmations = M.empty }
                , Just (acctReliableQueue acct, M.elems (acctConfirmations acct))
                )
    case claimed of
        Nothing -> return ()
        Just (queue, waiters) -> do
            writeChan (circuitIncoming circ) Nothing
            closeQueue queue
            -- tryPutMVar: a waiter that already has its answer is left alone.
            mapM_ (\w -> tryPutMVar w False) waiters
            me      <- myThreadId
            threads <- swapMVar (circuitThreads circ) []
            mapM_ killThread (filter (/= me) threads)
            sClose (circuitSocket circ)
-- | Report whether the circuit's accounting state is marked closed.
circuitIsClosed :: Circuit -> IO Bool
circuitIsClosed circ = do
    acct <- readMVar (circuitAccounting circ)
    return (acctClosed acct)
-- | Open a circuit to the simulator named in a login token.
--
-- Creates the UDP socket, forks the four worker threads (ack sender, packet
-- receiver, ping sender, ping responder), initialises the accounting state
-- and sends the @UseCircuitCode@ handshake, waiting until that packet is
-- acknowledged or given up on.
--
-- The simulator address is now parsed before the socket is created, so a
-- malformed address no longer leaks an open socket when 'inet_addr' throws.
--
-- NOTE(review): the handshake result is still discarded, as before; a
-- circuit is returned even if the simulator never acknowledged it.
circuitConnect :: MVToken
               -> IO Circuit
circuitConnect token = do
    host <- inet_addr (tokenSimIP token)
    let port = fromIntegral (tokenSimPort token)
    sock    <- socket AF_INET Datagram defaultProtocol
    acct    <- newEmptyMVar
    threads <- newEmptyMVar
    inc     <- newChan
    let circ = Circuit
            { circuitAgentID    = tokenAgentID token
            , circuitSessionID  = tokenSessionID token
            , circuitCode       = tokenCircuitCode token
            , circuitSocket     = sock
            , circuitAddr       = SockAddrInet port host
            , circuitThreads    = threads
            , circuitIncoming   = inc
            , circuitAccounting = acct
            }
    queue      <- newTaskQueue
    -- The ping responder reads its own copy of the incoming channel.
    pingSource <- dupChan inc
    -- The workers block on the still-empty accounting MVar until it is
    -- filled below.
    putMVar threads =<< mapM forkIO
        [ ackSender circ
        , packetReceiver circ
        , pingSender circ 0
        , pingResponder circ pingSource
        ]
    time <- getCurrentTime
    putMVar acct Accounting
        { acctClosed        = False
        , acctSequence      = 1
        , acctRecentPackets = []
        , acctPendingAcks   = []
        , acctReliableQueue = queue
        , acctConfirmations = M.empty
        , acctLastTime      = time
        }
    _ <- circuitSendSync circ $ UseCircuitCode $ UseCircuitCode_CircuitCode
        (circuitCode circ) (circuitSessionID circ) (circuitAgentID circ)
    return circ