{-# LANGUAGE OverloadedStrings, TypeFamilies #-}
module NetSpider.Spider
(
Spider,
connectWS,
connectWith,
close,
withSpider,
addFoundNode,
getSnapshotSimple,
getSnapshot,
clearAll
) where
import Control.Exception.Safe (throwString, bracket)
import Control.Monad (void, mapM_, mapM)
import Data.Aeson (ToJSON)
import Data.Foldable (foldr', toList)
import Data.List (intercalate)
import Data.Greskell
( runBinder, ($.), (<$.>), (<*.>),
Binder, ToGreskell(GreskellReturn), AsIterator(IteratorItem), FromGraphSON,
liftWalk, gLimit, gIdentity, gSelectN, gAs,
lookupAsM, newAsLabel
)
import Data.Hashable (Hashable)
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HM
import Data.HashSet (HashSet)
import qualified Data.HashSet as HS
import Data.IORef (IORef, modifyIORef, newIORef, readIORef, atomicModifyIORef')
import Data.Maybe (catMaybes, mapMaybe, listToMaybe)
import Data.Monoid (mempty, (<>))
import Data.List (sortOn)
import Data.Text (Text, pack)
import Data.Vector (Vector)
import qualified Data.Vector as V
import Network.Greskell.WebSocket (Host, Port)
import qualified Network.Greskell.WebSocket as Gr
import NetSpider.Graph (EID, LinkAttributes, NodeAttributes)
import NetSpider.Graph.Internal (VFoundNode(..), EFinds(..))
import NetSpider.Found (FoundNode(..), FoundLink(..), LinkState(..))
import NetSpider.Log (runWriterLoggingM, WriterLoggingM, logDebugW, LogLine, spack)
import NetSpider.Pair (Pair)
import NetSpider.Queue (Queue, newQueue, popQueue, pushQueue)
import NetSpider.Query
( Query, defQuery, startsFrom, unifyLinkSamples, timeInterval,
foundNodePolicy,
Interval
)
import NetSpider.Query.Internal (FoundNodePolicy(..))
import NetSpider.Snapshot.Internal (SnapshotGraph, SnapshotNode(..), SnapshotLink(..))
import NetSpider.Spider.Config (Config(..), defConfig)
import NetSpider.Spider.Internal.Graph
( gMakeFoundNode, gAllNodes, gHasNodeID, gHasNodeEID, gNodeEID, gNodeID, gMakeNode, gClearAll,
gLatestFoundNode, gSelectFoundNode, gFinds, gFindsTarget, gHasFoundNodeEID, gAllFoundNode,
gFilterFoundNodeByTime
)
import NetSpider.Spider.Internal.Log
( runLogger, logDebug, logWarn, logLine
)
import NetSpider.Spider.Internal.Spider (Spider(..))
import NetSpider.Timestamp (Timestamp, showEpochTime)
import NetSpider.Unify (LinkSampleUnifier, LinkSample(..), LinkSampleID, linkSampleId)
connectWS :: Eq n => Host -> Port -> IO (Spider n na la)
connectWS host port = connectWith $ defConfig { wsHost = host,
wsPort = port
}
connectWith :: Config n na fla -> IO (Spider n na fla)
connectWith conf = do
client <- Gr.connect (wsHost conf) (wsPort conf)
return $ Spider { spiderConfig = conf,
spiderClient = client
}
close :: Spider n na fla -> IO ()
close sp = Gr.close $ spiderClient sp
withSpider :: Config n na fla -> (Spider n na fla -> IO a) -> IO a
withSpider conf = bracket (connectWith conf) close
submitB :: (ToGreskell g, r ~ GreskellReturn g, AsIterator r, v ~ IteratorItem r, FromGraphSON v)
=> Spider n na fla -> Binder g -> IO (Gr.ResultHandle v)
submitB sp b = Gr.submit (spiderClient sp) script mbs
where
(script, bs) = runBinder b
mbs = Just bs
clearAll :: Spider n na fla -> IO ()
clearAll spider = Gr.drainResults =<< submitB spider (return gClearAll)
addFoundNode :: (ToJSON n, LinkAttributes fla, NodeAttributes na) => Spider n na fla -> FoundNode n na fla -> IO ()
addFoundNode spider found_node = do
subject_vid <- getOrMakeNode spider $ subjectNode found_node
link_pairs <- traverse linkAndTargetVID $ neighborLinks found_node
makeFoundNodeVertex subject_vid link_pairs
where
linkAndTargetVID link = do
target_vid <- getOrMakeNode spider $ targetNode link
return (link, target_vid)
makeFoundNodeVertex subject_vid link_pairs =
Gr.drainResults =<< submitB spider (fmap void $ gMakeFoundNode subject_vid link_pairs found_node)
vToMaybe :: Vector a -> Maybe a
vToMaybe v = v V.!? 0
getNode :: (ToJSON n) => Spider n na fla -> n -> IO (Maybe EID)
getNode spider nid = fmap vToMaybe $ Gr.slurpResults =<< submitB spider gt
where
gt = gNodeEID <$.> gHasNodeID spider nid <*.> pure gAllNodes
getOrMakeNode :: (ToJSON n) => Spider n na fla -> n -> IO EID
getOrMakeNode spider nid = do
mvid <- getNode spider nid
case mvid of
Just vid -> return vid
Nothing -> makeNode
where
makeNode = expectOne =<< Gr.slurpResults =<< submitB spider (liftWalk gNodeEID <$.> gMakeNode spider nid)
expectOne v = case vToMaybe v of
Just e -> return e
Nothing -> throwString "Expects at least single result, but got nothing."
getSnapshotSimple :: (FromGraphSON n, ToJSON n, Ord n, Hashable n, Show n, LinkAttributes fla, NodeAttributes na)
=> Spider n na fla
-> n
-> IO (SnapshotGraph n na fla)
getSnapshotSimple spider start_nid = getSnapshot spider $ defQuery [start_nid]
getSnapshot :: (FromGraphSON n, ToJSON n, Ord n, Hashable n, Show n, LinkAttributes fla, NodeAttributes na)
=> Spider n na fla
-> Query n na fla sla
-> IO (SnapshotGraph n na sla)
getSnapshot spider query = do
ref_state <- newIORef $ initSnapshotState $ startsFrom query
recurseVisitNodesForSnapshot spider query ref_state
(nodes, links, logs) <- fmap (makeSnapshot $ unifyLinkSamples query) $ readIORef ref_state
mapM_ (logLine spider) logs
return (nodes, links)
recurseVisitNodesForSnapshot :: (ToJSON n, Ord n, Hashable n, FromGraphSON n, Show n, LinkAttributes fla, NodeAttributes na)
=> Spider n na fla
-> Query n na fla sla
-> IORef (SnapshotState n na fla)
-> IO ()
recurseVisitNodesForSnapshot spider query ref_state = go
where
go = do
mnext_visit <- getNextVisit
case mnext_visit of
Nothing -> return ()
Just next_visit -> do
visitNodeForSnapshot spider query ref_state next_visit
go
getNextVisit = atomicModifyIORef' ref_state popUnvisitedNode
traverseEFindsOneHop :: (FromGraphSON n, NodeAttributes na, LinkAttributes fla)
=> Spider n na fla
-> Interval Timestamp
-> EID
-> IO (Maybe (VFoundNode na), [(VFoundNode na, EFinds fla, n)])
traverseEFindsOneHop spider time_interval visit_eid = (,) <$> getLatestVFoundNode <*> getTraversedEdges
where
foundNodeTraversal = fmap gSelectFoundNode (gFilterFoundNodeByTime time_interval)
<*.> gHasNodeEID visit_eid
<*.> pure gAllNodes
getLatestVFoundNode = fmap vToMaybe $ Gr.slurpResults =<< submitQuery
where
submitQuery = submitB spider binder
binder = gLatestFoundNode <$.> foundNodeTraversal
getTraversedEdges = fmap V.toList $ traverse extractFromSMap =<< Gr.slurpResults =<< submitQuery
where
submitQuery = Gr.submit (spiderClient spider) query (Just bindings)
((query, label_vfn, label_ef, label_target_nid), bindings) = runBinder $ do
lvfn <- newAsLabel
lef <- newAsLabel
ln <- newAsLabel
gt <- gSelectN lvfn lef [ln]
<$.> gAs ln <$.> gNodeID spider <$.> gFindsTarget
<$.> gAs lef <$.> gFinds <$.> gAs lvfn <$.> foundNodeTraversal
return (gt, lvfn, lef, ln)
extractFromSMap smap =
(,,)
<$> lookupAsM label_vfn smap
<*> lookupAsM label_ef smap
<*> lookupAsM label_target_nid smap
makeLinkSample :: n -> VFoundNode na -> EFinds la -> n -> LinkSample n la
makeLinkSample subject_nid vfn efinds target_nid =
LinkSample { lsSubjectNode = subject_nid,
lsTargetNode = target_nid,
lsLinkState = efLinkState efinds,
lsTimestamp = vfnTimestamp vfn,
lsLinkAttributes = efLinkAttributes efinds
}
visitNodeForSnapshot :: (ToJSON n, Ord n, Hashable n, FromGraphSON n, Show n, LinkAttributes fla, NodeAttributes na)
=> Spider n na fla
-> Query n na fla sla
-> IORef (SnapshotState n na fla)
-> n
-> IO ()
visitNodeForSnapshot spider query ref_state visit_nid = do
logDebug spider ("Visiting node " <> spack visit_nid <> " ...")
cur_state <- readIORef ref_state
if isAlreadyVisited cur_state visit_nid
then logAndQuit
else doVisit
where
logAndQuit = do
logDebug spider ("Node " <> spack visit_nid <> " is already visited. Skip.")
return ()
doVisit = do
mvisit_eid <- getVisitedNodeEID
case mvisit_eid of
Nothing -> do
logWarn spider ("Node " <> spack visit_nid <> " does not exist.")
return ()
Just visit_eid -> do
(mlatest_vfn, hops) <- traverseEFindsOneHop spider (timeInterval query) visit_eid
markAsVisited $ mlatest_vfn
logLatestVFN mlatest_vfn
let next_hops = filterHops mlatest_vfn $ hops
mapM_ logHop next_hops
modifyIORef ref_state $ addLinkSamples
$ map (uncurry3 $ makeLinkSample visit_nid) $ next_hops
getVisitedNodeEID = fmap vToMaybe $ Gr.slurpResults =<< submitB spider binder
where
binder = gNodeEID <$.> gHasNodeID spider visit_nid <*.> pure gAllNodes
markAsVisited mvfn = modifyIORef ref_state $ addVisitedNode visit_nid mvfn
logLatestVFN Nothing = logDebug spider ("No local finding is found for node " <> spack visit_nid)
logLatestVFN (Just vfn) = logDebug spider ( "Node " <> spack visit_nid
<> ": latest local finding is at "
<> (showEpochTime $ vfnTimestamp vfn)
)
uncurry3 f (a,b,c) = f a b c
filterHops mvfn =
case foundNodePolicy query of
PolicyOverwrite -> filter (hopHasVFN mvfn)
PolicyAppend -> id
hopHasVFN Nothing _ = True
hopHasVFN (Just vfn1) (vfn2, _, _) = vfnId vfn1 == vfnId vfn2
logHop (vfn, _, next_nid) = logDebug spider ( "Link " <> spack visit_nid
<> " -> " <> spack next_nid
<> " at " <> (showEpochTime $ vfnTimestamp vfn)
)
data SnapshotState n na fla =
SnapshotState
{ ssUnvisitedNodes :: Queue n,
ssVisitedNodes :: HashMap n (Maybe (VFoundNode na)),
ssVisitedLinks :: HashMap (LinkSampleID n) [LinkSample n fla]
}
deriving (Show)
emptySnapshotState :: (Ord n, Hashable n) => SnapshotState n na fla
emptySnapshotState = SnapshotState
{ ssUnvisitedNodes = mempty,
ssVisitedNodes = mempty,
ssVisitedLinks = mempty
}
initSnapshotState :: (Ord n, Hashable n) => [n] -> SnapshotState n na fla
initSnapshotState init_unvisited_nodes = emptySnapshotState { ssUnvisitedNodes = newQueue init_unvisited_nodes }
addVisitedNode :: (Eq n, Hashable n) => n -> Maybe (VFoundNode na) -> SnapshotState n na fla -> SnapshotState n na fla
addVisitedNode nid mv state = state { ssVisitedNodes = HM.insert nid mv $ ssVisitedNodes state }
isAlreadyVisited :: (Eq n, Hashable n) => SnapshotState n na fla -> n -> Bool
isAlreadyVisited state nid = HM.member nid $ ssVisitedNodes state
addLinkSample :: (Ord n, Hashable n)
=> LinkSample n fla -> SnapshotState n na fla -> SnapshotState n na fla
addLinkSample ls state = state { ssVisitedLinks = updatedLinks,
ssUnvisitedNodes = updatedUnvisited
}
where
link_id = linkSampleId ls
updatedLinks = HM.insertWith (++) link_id (return ls) $ ssVisitedLinks state
target_nid = lsTargetNode ls
target_already_visited = isAlreadyVisited state target_nid
updatedUnvisited = if target_already_visited
then ssUnvisitedNodes state
else pushQueue target_nid $ ssUnvisitedNodes state
addLinkSamples :: (Ord n, Hashable n)
=> [LinkSample n fla] -> SnapshotState n na fla -> SnapshotState n na fla
addLinkSamples links orig_state = foldr' addLinkSample orig_state links
popUnvisitedNode :: SnapshotState n na fla -> (SnapshotState n na fla, Maybe n)
popUnvisitedNode state = (updated, popped)
where
updated = state { ssUnvisitedNodes = updatedUnvisited }
(popped, updatedUnvisited) = popQueue $ ssUnvisitedNodes state
makeSnapshot :: (Eq n, Hashable n, Show n)
=> LinkSampleUnifier n na fla sla
-> SnapshotState n na fla
-> ([SnapshotNode n na], [SnapshotLink n sla], [LogLine])
makeSnapshot unifier state = (nodes, links, logs)
where
nodes = visited_nodes ++ boundary_nodes
visited_nodes = map (makeSnapshotNode state) $ HM.keys $ ssVisitedNodes state
boundary_nodes = map (makeSnapshotNode state) $ toList $ ssUnvisitedNodes state
(links, logs) = runWriterLoggingM $ fmap mconcat
$ mapM (makeSnapshotLinks unifier state) $ HM.elems $ ssVisitedLinks state
makeSnapshotNode :: (Eq n, Hashable n) => SnapshotState n na fla -> n -> SnapshotNode n na
makeSnapshotNode state nid =
SnapshotNode { _nodeId = nid,
_isOnBoundary = on_boundary,
_nodeTimestamp = fmap vfnTimestamp $ mvfn,
_nodeAttributes = fmap vfnAttributes $ mvfn
}
where
(on_boundary, mvfn) =
case HM.lookup nid $ ssVisitedNodes state of
Nothing -> (True, Nothing)
Just mv -> (False, mv)
makeSnapshotLinks :: (Eq n, Hashable n, Show n)
=> LinkSampleUnifier n na fla sla
-> SnapshotState n na fla
-> [LinkSample n fla]
-> WriterLoggingM [SnapshotLink n sla]
makeSnapshotLinks _ _ [] = return []
makeSnapshotLinks unifier state link_samples@(head_sample : _) = do
unified <- doUnify link_samples
logUnified unified
return $ mapMaybe makeSnapshotLink unified
where
makeEndNode getter = makeSnapshotNode state $ getter $ head_sample
doUnify = unifier (makeEndNode lsSubjectNode) (makeEndNode lsTargetNode)
logUnified unified = logDebugW ( "Unify link [" <> (spack $ lsSubjectNode head_sample) <> "]-["
<> (spack $ lsTargetNode head_sample) <> "]: "
<> "from " <> (spack $ length link_samples) <> " samples "
<> "to " <> (spack $ length unified) <> " samples"
)
makeSnapshotLink unified_sample = do
case lsLinkState unified_sample of
LinkUnused -> Nothing
LinkToTarget -> Just $ sampleToLink unified_sample True True
LinkToSubject -> Just $ sampleToLink unified_sample False True
LinkBidirectional -> Just $ sampleToLink unified_sample True False
sampleToLink sample to_target is_directed =
SnapshotLink { _sourceNode = (if to_target then lsSubjectNode else lsTargetNode) sample,
_destinationNode = (if to_target then lsTargetNode else lsSubjectNode) sample,
_isDirected = is_directed,
_linkTimestamp = lsTimestamp sample,
_linkAttributes = lsLinkAttributes sample
}