module Network.Kademlia.Implementation
( lookup
, store
, joinNetwork
, JoinResult(..)
, Network.Kademlia.Implementation.lookupNode
) where
import Network.Kademlia.Networking
import Network.Kademlia.Instance
import qualified Network.Kademlia.Tree as T
import Network.Kademlia.Types
import Network.Kademlia.ReplyQueue
import Prelude hiding (lookup)
import Control.Monad (forM_, unless, when)
import Control.Monad.Trans.State hiding (state)
import Control.Concurrent.Chan
import Control.Concurrent.STM
import Control.Monad.IO.Class (liftIO)
import Data.List (delete, find, (\\))
import Data.Maybe (isJust, fromJust)
-- | Look up the value stored under an id in the Kademlia network.
--
-- The lookup is driven by FIND_VALUE signals. As soon as a node answers with
-- RETURN_VALUE, the remaining pending replies are drained, the value is
-- cached (via STORE) on the closest polled node that did not return it, and
-- @Just (value, origin)@ is returned, @origin@ being the node that sent the
-- first RETURN_VALUE. When the lookup is cancelled before any value arrives
-- the result is @Nothing@.
lookup :: (Serialize i, Serialize a, Eq i, Ord i) => KademliaInstance i a -> i
       -> IO (Maybe (a, Node i))
lookup inst id = runLookup go inst id
  where
    go = startLookup sendS cancel checkSignal

    -- Every polled node is asked for the value itself.
    sendS = sendSignal (FIND_VALUE id)

    -- A cancelled lookup yields no value.
    cancel = return Nothing

    continue = waitForReply cancel checkSignal

    checkSignal (Signal origin (RETURN_VALUE _ value)) = do
        -- From here on the 'known' list is reused to collect the nodes that
        -- answered with the value.
        modify $ \s -> s { known = [origin] }

        -- Drain the outstanding replies, recording further value holders.
        finish

        -- Cache the value on the closest polled node that did not return it.
        holders <- gets known
        asked <- gets polled
        case asked \\ holders of
            [] -> return ()
            others -> do
                -- 'others' is non-empty in this branch.
                let cachePeer = peer . head $ sortByDistanceTo others id
                liftIO $ send (handle inst) cachePeer (STORE id value)

        return (Just (value, origin))
    checkSignal (Signal _ (RETURN_NODES _ nodes)) =
        continueLookup nodes sendS continue cancel

    -- Keep waiting for replies for as long as requests are pending.
    finish = do
        outstanding <- gets pending
        unless (null outstanding) $ waitForReply (return ()) finishCheck

    -- Remember every further node that returns the value; any other reply
    -- only advances the draining loop.
    finishCheck (Signal origin (RETURN_VALUE _ _)) = do
        modify $ \s -> s { known = origin : known s }
        finish
    finishCheck _ = finish
-- | Store a value under a key in the Kademlia network.
--
-- Runs a FIND_NODE lookup for the key. Once the lookup ends, a STORE command
-- carrying the key and value is sent to at most 7 of the polled nodes, chosen
-- from the front of the list ordered by 'sortByDistanceTo' the key.
store :: (Serialize i, Serialize a, Eq i, Ord i) =>
         KademliaInstance i a -> i -> a -> IO ()
store inst key val = runLookup go inst key
  where
    go = startLookup sendS end checkSignal

    sendS = sendSignal (FIND_NODE key)

    continue = waitForReply end checkSignal

    checkSignal (Signal _ (RETURN_NODES _ nodes)) =
        continueLookup nodes sendS continue end

    -- Runs when the lookup is over, whether it finished or was cancelled.
    end = do
        contacted <- gets polled
        unless (null contacted) $ do
            let targets = map peer . take 7 $ sortByDistanceTo contacted key
            forM_ targets $ \target ->
                liftIO $ send (handle inst) target (STORE key val)
-- | The possible outcomes of 'joinNetwork'.
data JoinResult
    = JoinSucces  -- ^ The join lookup ran to its end.
    | NodeDown    -- ^ Waiting for the first contacted node was cancelled.
    | IDClash     -- ^ A returned node carries the same id as this instance.
    deriving (Eq, Ord, Show)
-- | Join the network through an already known node.
--
-- Runs a FIND_NODE lookup for this instance's own id, starting with the given
-- node. Returns 'NodeDown' when waiting for that first node is cancelled,
-- 'IDClash' when a RETURN_NODES reply lists a node with the same id as this
-- instance, and 'JoinSucces' once the lookup has run to its end.
joinNetwork :: (Serialize i, Serialize a, Eq i, Ord i) => KademliaInstance i a
            -> Node i -> IO JoinResult
joinNetwork inst node = do
    self <- ownId
    runLookup go inst self
  where
    -- Contact the given node first; everything else follows from its reply.
    go = do
        sendS node
        waitForReply nodeDown checkSignal

    -- The id of this instance, read from its tree.
    ownId =
        fmap T.extractId . atomically . readTVar . sTree . state $ inst

    sendS target = do
        self <- liftIO ownId
        sendSignal (FIND_NODE self) target

    checkSignal (Signal _ (RETURN_NODES _ nodes)) = do
        self <- gets targetId
        if any ((== self) . nodeId) nodes
            then return IDClash
            else continueLookup nodes sendS continue finish

    continue = waitForReply finish checkSignal

    nodeDown = return NodeDown

    finish = return JoinSucces
-- | Look up the node that carries the given id.
--
-- Runs a FIND_NODE lookup for the id and returns the first node from a
-- RETURN_NODES reply whose id equals it. The result is @Nothing@ when the
-- lookup ends without such a node.
lookupNode :: (Serialize i, Serialize a, Eq i, Ord i) => KademliaInstance i a -> i
           -> IO (Maybe (Node i))
lookupNode inst id = runLookup go inst id
  where
    go = startLookup sendS end checkSignal

    sendS = sendSignal (FIND_NODE id)

    end = return Nothing

    continue = waitForReply end checkSignal

    -- Stop at the first returned node with the wanted id, otherwise carry on.
    checkSignal (Signal _ (RETURN_NODES _ nodes)) =
        maybe (continueLookup nodes sendS continue end) (return . Just)
              (find hasWantedId nodes)

    hasWantedId (Node _ nId) = nId == id
-- | The state threaded through a running lookup.
data LookupState i a = LookupState
    { inst      :: KademliaInstance i a
      -- ^ The instance the lookup runs on.
    , targetId  :: i
      -- ^ The id the lookup is looking for.
    , replyChan :: Chan (Reply i a)
      -- ^ Channel the lookup reads its replies from.
    , known     :: [Node i]
      -- ^ Candidate nodes: seeded by 'startLookup', replaced with the not yet
      -- polled candidates by 'continueLookup'. 'lookup' reuses it to record
      -- the nodes that returned the value.
    , pending   :: [Node i]
      -- ^ Nodes that were sent a signal and have neither answered nor timed
      -- out yet.
    , polled    :: [Node i]
      -- ^ Nodes that were sent a signal; a node is removed again when its
      -- request times out.
    }
-- | The monad lookups run in: 'IO' with a 'LookupState' threaded through by
-- 'StateT'.
type LookupM i a = StateT (LookupState i a) IO
-- | Run a lookup action against an instance and a target id.
--
-- The action starts with a freshly created reply channel and empty 'known',
-- 'pending' and 'polled' lists; only its result is returned.
runLookup :: LookupM i a b -> KademliaInstance i a -> i ->IO b
runLookup action kInst target = do
    replies <- newChan
    let initial = LookupState
            { inst      = kInst
            , targetId  = target
            , replyChan = replies
            , known     = []
            , pending   = []
            , polled    = []
            }
    evalStateT action initial
-- | Kick off a lookup.
--
-- Asks the instance's tree for the 3 nodes closest to the target id
-- ('T.findClosest'). If there are none, the @cancel@ action is run. Otherwise
-- the given send action is run for each of them, they become the 'known'
-- list, and the lookup waits for the first reply, handing it to @onSignal@.
startLookup :: (Serialize i, Serialize a, Eq i, Ord i) => (Node i -> LookupM i a ())
            -> LookupM i a b -> (Signal i a -> LookupM i a b) -> LookupM i a b
startLookup sendTo cancel onSignal = do
    kInst <- gets inst
    tree <- liftIO . atomically . readTVar . sTree . state $ kInst
    target <- gets targetId

    let seeds = T.findClosest tree target 3
    if null seeds
        then cancel
        else do
            mapM_ sendTo seeds
            modify $ \s -> s { known = seeds }
            waitForReply cancel onSignal
-- | Block until the next entry arrives on the reply channel and act on it.
--
-- * 'Answer': the answering node is inserted into the instance via
--   'insertNode', removed from 'pending', and the signal is handed to
--   @onSignal@.
-- * 'Timeout': the node the registration belongs to is removed from
--   'pending', 'known' and 'polled'. Waiting resumes while other requests are
--   still pending; otherwise @cancel@ is run.
-- * 'Closed': @cancel@ is run.
--
-- A 'Timeout' whose origin is not (or no longer) in 'polled' is ignored
-- instead of crashing the lookup: no node is removed and the pending check
-- decides how to go on.
waitForReply :: (Serialize i, Serialize a, Ord i) => LookupM i a b
             -> (Signal i a -> LookupM i a b) -> LookupM i a b
waitForReply cancel onSignal = do
    chan <- gets replyChan
    result <- liftIO . readChan $ chan

    case result of
        Answer sig@(Signal node _) -> do
            kInst <- gets inst
            liftIO . insertNode kInst $ node

            -- The node has answered, so it is no longer pending.
            modify $ \s -> s { pending = delete node (pending s) }

            onSignal sig

        Timeout registration -> do
            let origin = replyOrigin registration
            asked <- gets polled

            -- Forget the unresponsive node everywhere. The lookup is total:
            -- an origin that was never polled, or was already dropped by an
            -- earlier timeout, leaves the state untouched.
            case find ((== origin) . nodeId) asked of
                Just node -> modify $ \s -> s
                    { pending = delete node (pending s)
                    , known   = delete node (known s)
                    , polled  = delete node (polled s)
                    }
                Nothing -> return ()

            -- Only keep waiting if another reply can still arrive.
            stillPending <- gets pending
            if null stillPending
                then cancel
                else waitForReply cancel onSignal

        Closed -> cancel
-- | Decide how a lookup proceeds after a reply listed further nodes.
--
-- The returned nodes are put in front of the current 'known' list, nodes
-- already in 'polled' are dropped, and the first 7 of the rest become the new
-- candidates. If there are candidates, and the 7 nodes closest to the target
-- among candidates and polled nodes have not all been polled, the closest
-- candidate is sent a signal, the candidates are stored as 'known', and
-- @continue@ is run. Otherwise @continue@ is run while requests are still
-- pending, and @end@ when none are.
--
-- NOTE(review): the candidate list is cut to 7 before it is ordered by
-- distance, so which nodes survive depends on list order, not closeness.
-- Confirm this is intended.
continueLookup :: (Serialize i, Serialize a, Eq i) => [Node i]
               -> (Node i -> LookupM i a ()) -> LookupM i a b -> LookupM i a b
               -> LookupM i a b
continueLookup nodes sendTo continue end = do
    previous <- gets known
    target <- gets targetId
    outstanding <- gets pending
    asked <- gets polled

    let candidates = take 7 . filter (`notElem` asked) $ nodes ++ previous
        -- The 7 closest nodes seen so far, whether polled or not.
        nearest = take 7 $ sortByDistanceTo (candidates ++ asked) target
        nearestAllAsked = all (`elem` asked) nearest
        shouldPoll = not (null candidates) && not nearestAllAsked

    if shouldPoll
        then do
            -- 'candidates' is non-empty whenever 'shouldPoll' holds.
            let next = head (sortByDistanceTo candidates target)
            sendTo next
            modify $ \s -> s { known = candidates }
            continue
        else if null outstanding
            then end
            else continue
-- | Send a command to a node and register the replies expected from it.
--
-- The command goes to the node's peer. The registration, tied to the node's
-- id, is handed to 'expect' together with the lookup's reply channel. After
-- that the node is prepended to both 'polled' and 'pending'.
--
-- Only FIND_NODE (expects RETURN_NODES) and FIND_VALUE (expects RETURN_NODES
-- or RETURN_VALUE) are handled; there is no registration for other commands.
sendSignal :: (Serialize i, Serialize a, Eq i) => Command i a
           -> Node i -> LookupM i a ()
sendSignal cmd node = do
    h <- gets (handle . inst)
    chan <- gets replyChan

    liftIO $ send h (peer node) cmd
    liftIO $ expect h registration chan

    modify $ \s -> s
        { polled  = node : polled s
        , pending = node : pending s
        }
  where
    origin = nodeId node

    -- Which replies to expect depends on the command that was sent.
    registration = case cmd of
        FIND_NODE target ->
            RR [R_RETURN_NODES target] origin
        FIND_VALUE target ->
            RR [R_RETURN_NODES target, R_RETURN_VALUE target] origin