{-# LANGUAGE OverloadedStrings, TypeFamilies #-}
module NetSpider.Spider
(
Spider,
connectWS,
connectWith,
close,
withSpider,
addFoundNode,
getSnapshotSimple,
getSnapshot,
clearAll
) where
import Control.Category ((<<<))
import Control.Exception.Safe (throwString, bracket)
import Control.Monad (void, mapM_, mapM)
import Data.Aeson (ToJSON)
import Data.Foldable (foldr', toList, foldl')
import Data.List (intercalate)
import Data.Greskell
( runBinder, ($.), (<$.>), (<*.>),
Binder, ToGreskell(GreskellReturn), AsIterator(IteratorItem), FromGraphSON,
liftWalk, gLimit, gIdentity, gSelect1, gAs, gProject, gByL, gIdentity, gFold,
lookupAsM, newAsLabel,
Transform, Walk
)
import Data.Hashable (Hashable)
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HM
import Data.HashSet (HashSet)
import qualified Data.HashSet as HS
import Data.IORef (IORef, modifyIORef, newIORef, readIORef, atomicModifyIORef')
import Data.Maybe (catMaybes, mapMaybe, listToMaybe)
import Data.Monoid (mempty, (<>))
import Data.Text (Text, pack)
import Data.Vector (Vector)
import qualified Data.Vector as V
import Network.Greskell.WebSocket (Host, Port)
import qualified Network.Greskell.WebSocket as Gr
import NetSpider.Graph (EID, LinkAttributes, NodeAttributes)
import NetSpider.Graph.Internal
( VFoundNode, EFinds, VNode,
VFoundNodeData(..), EFindsData(..),
gVFoundNodeData, gEFindsData,
makeFoundNode, makeFoundLink
)
import NetSpider.Found
(FoundNode(..), FoundLink(..), LinkState(..), allTargetNodes)
import qualified NetSpider.Found as Found
import NetSpider.Log (runWriterLoggingM, WriterLoggingM, logDebugW, LogLine, spack)
import NetSpider.Pair (Pair)
import NetSpider.Queue (Queue, newQueue, popQueue, pushQueue)
import NetSpider.Query
( Query, defQuery, startsFrom, unifyLinkSamples, timeInterval,
foundNodePolicy,
Interval
)
import NetSpider.Query.Internal (FoundNodePolicy(..))
import NetSpider.Snapshot.Internal (SnapshotGraph, SnapshotNode(..), SnapshotLink(..))
import NetSpider.Spider.Config (Config(..), defConfig)
import NetSpider.Spider.Internal.Graph
( gMakeFoundNode, gAllNodes, gHasNodeID, gHasNodeEID, gNodeEID, gNodeID, gMakeNode, gClearAll,
gLatestFoundNode, gSelectFoundNode, gFinds, gFindsTarget, gHasFoundNodeEID, gAllFoundNode,
gFilterFoundNodeByTime
)
import NetSpider.Spider.Internal.Log
( runLogger, logDebug, logWarn, logLine
)
import NetSpider.Spider.Internal.Spider (Spider(..))
import NetSpider.Timestamp (Timestamp, showEpochTime)
import NetSpider.Unify (LinkSampleUnifier, LinkSample(..), LinkSampleID, linkSampleId)
import NetSpider.Weaver (Weaver, newWeaver)
import qualified NetSpider.Weaver as Weaver
-- | Connect to the server at the given host and port.
--
-- This is 'connectWith' applied to 'defConfig' with only 'wsHost' and
-- 'wsPort' overridden.
connectWS :: Eq n => Host -> Port -> IO (Spider n na la)
connectWS host port = connectWith conf
  where
    conf = defConfig { wsHost = host, wsPort = port }
-- | Open a client connection to the 'wsHost' / 'wsPort' named in the
-- 'Config' and wrap it, together with that 'Config', in a 'Spider'.
connectWith :: Config n na fla -> IO (Spider n na fla)
connectWith conf = mkSpider <$> Gr.connect (wsHost conf) (wsPort conf)
  where
    mkSpider client = Spider { spiderConfig = conf, spiderClient = client }
-- | Close the client connection held by the 'Spider'.
close :: Spider n na fla -> IO ()
close = Gr.close . spiderClient
-- | Connect with the given 'Config', run the action on the resulting
-- 'Spider', and 'close' the 'Spider' afterwards. The pairing is done with
-- 'bracket', so 'close' also runs when the action throws.
withSpider :: Config n na fla -> (Spider n na fla -> IO a) -> IO a
withSpider conf action = bracket (connectWith conf) close action
-- | Run the 'Binder' to obtain a script and its variable bindings, then
-- submit both through the client held by the 'Spider'.
submitB :: (ToGreskell g, r ~ GreskellReturn g, AsIterator r, v ~ IteratorItem r, FromGraphSON v)
        => Spider n na fla -> Binder g -> IO (Gr.ResultHandle v)
submitB sp b =
  let (script, bindings) = runBinder b
  in Gr.submit (spiderClient sp) script (Just bindings)
-- | Submit the 'gClearAll' script and drain its results.
--
-- NOTE(review): judging by the name, this presumably wipes all data kept
-- in the database -- confirm against 'gClearAll'.
clearAll :: Spider n na fla -> IO ()
clearAll spider = do
  handle <- submitB spider (return gClearAll)
  Gr.drainResults handle
-- | Store a 'FoundNode'.
--
-- The node vertex of the subject node is fetched (or created) first, then
-- the vertex of every neighbor link's target node, in order. Finally the
-- script built by 'gMakeFoundNode' is submitted and its results drained.
addFoundNode :: (ToJSON n, LinkAttributes fla, NodeAttributes na) => Spider n na fla -> FoundNode n na fla -> IO ()
addFoundNode spider found_node = do
  subject_vid <- getOrMakeNode spider (subjectNode found_node)
  link_pairs <- traverse withTargetVID (neighborLinks found_node)
  handle <- submitB spider (void <$> gMakeFoundNode subject_vid link_pairs found_node)
  Gr.drainResults handle
  where
    -- Pair a link with the vertex ID of its target node.
    withTargetVID link = (,) link <$> getOrMakeNode spider (targetNode link)
-- | The first element of the 'Vector', or 'Nothing' if it is empty.
vToMaybe :: Vector a -> Maybe a
vToMaybe = (V.!? 0)
-- | Query the element ID of the node vertex selected by 'gHasNodeID' for
-- the given node ID. Returns the first result, or 'Nothing' when the query
-- yields no result.
getNode :: (ToJSON n) => Spider n na fla -> n -> IO (Maybe (EID VNode))
getNode spider nid = do
  handle <- submitB spider node_query
  vToMaybe <$> Gr.slurpResults handle
  where
    node_query = gNodeEID <$.> gHasNodeID spider nid <*.> pure gAllNodes
-- | Return the element ID of the node vertex for the given node ID. If
-- 'getNode' finds none, submit the 'gMakeNode' script and return the
-- element ID it produces.
--
-- Throws (via 'throwString') when the 'gMakeNode' script returns no
-- result.
getOrMakeNode :: (ToJSON n) => Spider n na fla -> n -> IO (EID VNode)
getOrMakeNode spider nid = maybe makeNode return =<< getNode spider nid
  where
    makeNode = do
      handle <- submitB spider (liftWalk gNodeEID <$.> gMakeNode spider nid)
      results <- Gr.slurpResults handle
      case vToMaybe results of
        Just eid -> return eid
        Nothing -> throwString "Expects at least single result, but got nothing."
-- | 'getSnapshot' with the query produced by 'defQuery' for a single
-- starting node.
getSnapshotSimple :: (FromGraphSON n, ToJSON n, Ord n, Hashable n, Show n, LinkAttributes fla, NodeAttributes na)
                  => Spider n na fla
                  -> n
                  -> IO (SnapshotGraph n na fla)
getSnapshotSimple spider start_nid = getSnapshot spider simple_query
  where
    simple_query = defQuery [start_nid]
-- | Build a snapshot graph for the given 'Query'.
--
-- The traversal state is seeded with the query's 'startsFrom' nodes and
-- 'foundNodePolicy', nodes are visited until the unvisited queue runs dry,
-- and the final state is converted with 'makeSnapshot' using the query's
-- 'unifyLinkSamples'. Log lines produced by that conversion are passed to
-- 'logLine' before the nodes and links are returned.
getSnapshot :: (FromGraphSON n, ToJSON n, Ord n, Hashable n, Show n, LinkAttributes fla, NodeAttributes na)
            => Spider n na fla
            -> Query n na fla sla
            -> IO (SnapshotGraph n na sla)
getSnapshot spider query = do
  ref_state <- newIORef (initSnapshotState (startsFrom query) (foundNodePolicy query))
  recurseVisitNodesForSnapshot spider query ref_state
  final_state <- readIORef ref_state
  let (nodes, links, logs) = makeSnapshot (unifyLinkSamples query) final_state
  mapM_ (logLine spider) logs
  return (nodes, links)
-- | Repeatedly pop a node ID from the unvisited queue in the state and
-- visit it with 'visitNodeForSnapshot', stopping when the queue yields
-- 'Nothing'.
recurseVisitNodesForSnapshot :: (ToJSON n, Ord n, Hashable n, FromGraphSON n, Show n, LinkAttributes fla, NodeAttributes na)
                             => Spider n na fla
                             -> Query n na fla sla
                             -> IORef (SnapshotState n na fla)
                             -> IO ()
recurseVisitNodesForSnapshot spider query ref_state = loop
  where
    loop = atomicModifyIORef' ref_state popUnvisitedNode >>= maybe (return ()) visitThenLoop
    visitThenLoop next_nid = visitNodeForSnapshot spider query ref_state next_nid >> loop
-- | Fetch one hop of findings for the node vertex with the given element
-- ID.
--
-- The traversal starts from 'gAllNodes', is narrowed by 'gHasNodeEID',
-- then by 'gSelectFoundNode' with the time-interval filter, and finally by
-- 'gLatestFoundNode' when the policy is 'PolicyOverwrite' (no extra step
-- for 'PolicyAppend'). Each result is returned as the found-node data
-- paired with a list of (finds-edge data, target node ID).
traverseEFindsOneHop :: (FromGraphSON n, NodeAttributes na, LinkAttributes fla)
                     => Spider n na fla
                     -> Interval Timestamp
                     -> FoundNodePolicy n na
                     -> EID VNode
                     -> IO [(VFoundNodeData na, [(EFindsData fla, n)])]
traverseEFindsOneHop spider time_interval fn_policy visit_eid = do
  handle <- Gr.submit (spiderClient spider) query (Just bindings)
  smaps <- Gr.slurpResults handle
  V.toList <$> traverse extractFromSMap smaps
  where
    foundNodeTraversal = pure policyWalk
                         <*.> fmap gSelectFoundNode (gFilterFoundNodeByTime time_interval)
                         <*.> gHasNodeEID visit_eid
                         <*.> pure gAllNodes
    -- Extra step appended to the traversal depending on the policy.
    policyWalk =
      case fn_policy of
        PolicyOverwrite -> gLatestFoundNode
        PolicyAppend -> gIdentity
    -- The script, the labels used in its projections, and its bindings.
    -- The labels are allocated in a fixed order inside the binder.
    ((query, label_vfnd, label_efs, label_efd, label_target_nid), bindings) = runBinder $ do
      lvfnd <- newAsLabel
      lefs <- newAsLabel
      lefd <- newAsLabel
      ltarget <- newAsLabel
      let gEFindsAndTarget =
            gProject
            ( gByL lefd gEFindsData )
            [ gByL ltarget (gNodeID spider <<< gFindsTarget)
            ]
            <<< gFinds
      gt <- gProject
            ( gByL lvfnd gVFoundNodeData )
            [ gByL lefs (gFold <<< gEFindsAndTarget)
            ]
            <$.> foundNodeTraversal
      return (gt, lvfnd, lefs, lefd, ltarget)
    -- Decode one top-level result: found-node data plus its decoded hops.
    extractFromSMap smap = do
      vfnd <- lookupAsM label_vfnd smap
      hops <- mapM extractHopFromSMap =<< lookupAsM label_efs smap
      return (vfnd, hops)
    -- Decode one hop: finds-edge data and the target node ID.
    extractHopFromSMap smap = do
      efd <- lookupAsM label_efd smap
      target_nid <- lookupAsM label_target_nid smap
      return (efd, target_nid)
-- | Build a 'FoundNode' for the subject node from found-node data and its
-- (finds-edge data, target node ID) pairs, using 'makeFoundNode' and
-- 'makeFoundLink'.
makeFoundNodesFromHops :: n
                       -> (VFoundNodeData na, [(EFindsData fla, n)])
                       -> FoundNode n na fla
makeFoundNodesFromHops subject_nid (vfnd, efs) = makeFoundNode subject_nid vfnd found_links
  where
    found_links = map (\(ef, target_nid) -> makeFoundLink target_nid ef) efs
-- | Visit one node during snapshot traversal.
--
-- If the state says the node is already visited, only a debug line is
-- logged. Otherwise the node's vertex is looked up with 'getNode'; when it
-- is missing a warning is logged and the state is left untouched. When it
-- exists, its findings are fetched with 'traverseEFindsOneHop' (using the
-- query's 'timeInterval' and 'foundNodePolicy'), logged, and merged into
-- the state with 'addFoundNodes'.
visitNodeForSnapshot :: (ToJSON n, Ord n, Hashable n, FromGraphSON n, Show n, LinkAttributes fla, NodeAttributes na)
                     => Spider n na fla
                     -> Query n na fla sla
                     -> IORef (SnapshotState n na fla)
                     -> n
                     -> IO ()
visitNodeForSnapshot spider query ref_state visit_nid = do
  logDebug spider ("Visiting node " <> spack visit_nid <> " ...")
  cur_state <- readIORef ref_state
  if isAlreadyVisited cur_state visit_nid
    then logDebug spider ("Node " <> spack visit_nid <> " is already visited. Skip.")
    else doVisit
  where
    doVisit = do
      -- Reuse the shared lookup instead of rebuilding the same query here.
      mvisit_eid <- getNode spider visit_nid
      case mvisit_eid of
        Nothing -> logWarn spider ("Node " <> spack visit_nid <> " does not exist.")
        Just visit_eid -> do
          found_nodes <- fmap (map $ makeFoundNodesFromHops visit_nid)
                         $ traverseEFindsOneHop spider (timeInterval query) (foundNodePolicy query) visit_eid
          logFoundNodes found_nodes
          -- Strict update, so that no chain of unevaluated state updates is
          -- left inside the IORef.
          atomicModifyIORef' ref_state $ \state ->
            (addFoundNodes visit_nid found_nodes state, ())
    logFoundNodes [] = logDebug spider ("No local finding is found for node " <> spack visit_nid)
    logFoundNodes fns = mapM_ logFoundNode $ Found.sortByTime fns
    logFoundNode fn = do
      let neighbors = neighborLinks fn
      logDebug spider
        ( "Node " <> (spack $ subjectNode fn)
          <> ": local finding at " <> (showEpochTime $ foundAt fn)
          <> ", " <> (spack $ length neighbors) <> " neighbors"
        )
      mapM_ logFoundLink neighbors
    logFoundLink fl =
      logDebug spider
        ( " Link is found to " <> (spack $ targetNode fl)
        )
-- | State carried through a snapshot traversal.
data SnapshotState n na fla
  = SnapshotState
      { ssUnvisitedNodes :: Queue n
        -- ^ Node IDs waiting to be visited.
      , ssWeaver :: Weaver n na fla
        -- ^ The 'Weaver' that receives the found nodes and tracks which
        -- nodes have been visited.
      }
  deriving (Show)
-- | A state with an empty ('mempty') unvisited queue and a fresh 'Weaver'
-- created with the given policy.
emptySnapshotState :: FoundNodePolicy n na -> SnapshotState n na fla
emptySnapshotState p = SnapshotState mempty (newWeaver p)
-- | Like 'emptySnapshotState', but with the unvisited queue built from the
-- given node IDs via 'newQueue'.
initSnapshotState :: [n] -> FoundNodePolicy n na -> SnapshotState n na fla
initSnapshotState init_unvisited_nodes p = base { ssUnvisitedNodes = initial_queue }
  where
    base = emptySnapshotState p
    initial_queue = newQueue init_unvisited_nodes
-- | Whether the state's 'Weaver' reports the node as visited.
isAlreadyVisited :: (Eq n, Hashable n) => SnapshotState n na fla -> n -> Bool
isAlreadyVisited state nid = Weaver.isVisited nid weaver
  where
    weaver = ssWeaver state
-- | Pop from the unvisited queue. Returns the state with the remaining
-- queue, paired with whatever 'popQueue' yielded. The result order
-- (state, popped) matches what 'atomicModifyIORef'' expects.
popUnvisitedNode :: SnapshotState n na fla -> (SnapshotState n na fla, Maybe n)
popUnvisitedNode state =
  let (popped, remaining) = popQueue (ssUnvisitedNodes state)
  in (state { ssUnvisitedNodes = remaining }, popped)
-- | Run 'Weaver.getSnapshot'' on the state's 'Weaver' with the given
-- unifier and flatten its nested result into (nodes, links, log lines).
makeSnapshot :: (Ord n, Hashable n, Show n)
             => LinkSampleUnifier n na fla sla
             -> SnapshotState n na fla
             -> ([SnapshotNode n na], [SnapshotLink n sla], [LogLine])
makeSnapshot unifier state =
  let ((nodes, links), logs) = Weaver.getSnapshot' unifier (ssWeaver state)
  in (nodes, links, logs)
-- | Merge the findings of a visited node into the state.
--
-- With no findings, the node is only marked as visited in the 'Weaver'.
-- Otherwise every 'FoundNode' is added, left to right, with
-- 'addOneFoundNode'; the visited node ID itself is not used in that case.
addFoundNodes :: (Eq n, Hashable n)
              => n -> [FoundNode n na fla] -> SnapshotState n na fla -> SnapshotState n na fla
addFoundNodes visited_nid fns state =
  case fns of
    [] -> state { ssWeaver = Weaver.markAsVisited visited_nid (ssWeaver state) }
    _ -> foldl' (flip addOneFoundNode) state fns
-- | Add one 'FoundNode' to the state's 'Weaver', then push onto the
-- unvisited queue every target node of it that the updated 'Weaver' does
-- not report as visited.
addOneFoundNode :: (Eq n, Hashable n)
                => FoundNode n na fla -> SnapshotState n na fla -> SnapshotState n na fla
addOneFoundNode fn state =
  let weaver' = Weaver.addFoundNode fn (ssWeaver state)
      boundary = [ nid | nid <- allTargetNodes fn, not (Weaver.isVisited nid weaver') ]
      queue' = foldl' (flip pushQueue) (ssUnvisitedNodes state) boundary
  in state { ssUnvisitedNodes = queue', ssWeaver = weaver' }