{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Network.ZGossip (
    zgossipServer
  , zgossipClient
  , zgossipZRE) where


import Prelude hiding (putStrLn, take)
import Control.Monad hiding (join)
import Control.Monad.IO.Class
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM

import Data.ByteString (ByteString)
import Data.UUID
import qualified Data.Map as M
import qualified Data.Set as S
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy as BL

import Data.ZGossip
import Network.ZRE.Types (API(DoDiscover))
import Network.ZRE.Utils (bshow)

import Network.ZGossip.ZMQ
import Network.ZGossip.Types

import System.ZMQ4.Endpoint

zgossipClient :: Key -> Endpoint -> Endpoint -> (ZGSMsg -> IO ()) -> IO ()
zgossipClient :: Key -> Endpoint -> Endpoint -> (ZGSMsg -> IO ()) -> IO ()
zgossipClient Key
uuid Endpoint
endpoint Endpoint
ourEndpoint ZGSMsg -> IO ()
handler = do
  TBQueue ZGSCmd
gossipQ <- STM (TBQueue ZGSCmd) -> IO (TBQueue ZGSCmd)
forall a. STM a -> IO a
atomically (STM (TBQueue ZGSCmd) -> IO (TBQueue ZGSCmd))
-> STM (TBQueue ZGSCmd) -> IO (TBQueue ZGSCmd)
forall a b. (a -> b) -> a -> b
$ Natural -> STM (TBQueue ZGSCmd)
forall a. Natural -> STM (TBQueue a)
newTBQueue Natural
10
  STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ (ZGSCmd -> STM ()) -> [ZGSCmd] -> STM ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (TBQueue ZGSCmd -> ZGSCmd -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue ZGSCmd
gossipQ) [ZGSCmd
Hello]
  Async Any
pa <- IO Any -> IO (Async Any)
forall a. IO a -> IO (Async a)
async (IO Any -> IO (Async Any)) -> IO Any -> IO (Async Any)
forall a b. (a -> b) -> a -> b
$ IO () -> IO Any
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO Any) -> IO () -> IO Any
forall a b. (a -> b) -> a -> b
$ do
    STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue ZGSCmd -> ZGSCmd -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue ZGSCmd
gossipQ (ZGSCmd -> STM ()) -> ZGSCmd -> STM ()
forall a b. (a -> b) -> a -> b
$ Key -> Key -> TTL -> ZGSCmd
Publish Key
uuid (Endpoint -> Key
pEndpoint Endpoint
ourEndpoint) TTL
600
    -- publish every 50s
    TTL -> IO ()
threadDelay (TTL -> IO ()) -> TTL -> IO ()
forall a b. (a -> b) -> a -> b
$ TTL
1000000TTL -> TTL -> TTL
forall a. Num a => a -> a -> a
*TTL
50

  Async Any -> IO ()
forall a. Async a -> IO ()
link Async Any
pa
  Endpoint -> Key -> TBQueue ZGSCmd -> (ZGSMsg -> IO ()) -> IO ()
forall (m :: * -> *) a.
MonadIO m =>
Endpoint -> Key -> TBQueue ZGSCmd -> (ZGSMsg -> IO ()) -> m a
zgossipDealer Endpoint
endpoint Key
uuid TBQueue ZGSCmd
gossipQ ZGSMsg -> IO ()
handler

zgossipServer :: Endpoint -> IO ()
zgossipServer :: Endpoint -> IO ()
zgossipServer Endpoint
endpoint = do
  TVar ZGossipState
gossipS <- STM (TVar ZGossipState) -> IO (TVar ZGossipState)
forall a. STM a -> IO a
atomically (STM (TVar ZGossipState) -> IO (TVar ZGossipState))
-> STM (TVar ZGossipState) -> IO (TVar ZGossipState)
forall a b. (a -> b) -> a -> b
$ ZGossipState -> STM (TVar ZGossipState)
forall a. a -> STM (TVar a)
newTVar ZGossipState
emptyGossipState

  let expire :: IO b
expire = IO () -> IO b
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO b) -> IO () -> IO b
forall a b. (a -> b) -> a -> b
$ do
        STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar ZGossipState -> (ZGossipState -> ZGossipState) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar TVar ZGossipState
gossipS ((ZGossipState -> ZGossipState) -> STM ())
-> (ZGossipState -> ZGossipState) -> STM ()
forall a b. (a -> b) -> a -> b
$ \ZGossipState
x -> ZGossipState
x { gossipPairs :: Map Key (Key, TTL)
gossipPairs = ((Key, TTL) -> Maybe (Key, TTL))
-> Map Key (Key, TTL) -> Map Key (Key, TTL)
forall a b k. (a -> Maybe b) -> Map k a -> Map k b
M.mapMaybe (Key, TTL) -> Maybe (Key, TTL)
forall b a. (Eq b, Num b) => (a, b) -> Maybe (a, b)
ttlUpdate (ZGossipState -> Map Key (Key, TTL)
gossipPairs ZGossipState
x) }
        TTL -> IO ()
threadDelay TTL
1000000
        where
           ttlUpdate :: (a, b) -> Maybe (a, b)
ttlUpdate (a
_, b
0)   = Maybe (a, b)
forall a. Maybe a
Nothing
           ttlUpdate (a
v, b
ttl) = (a, b) -> Maybe (a, b)
forall a. a -> Maybe a
Just (a
v, b
ttl b -> b -> b
forall a. Num a => a -> a -> a
- b
1)

  Async Any
ea <- IO Any -> IO (Async Any)
forall a. IO a -> IO (Async a)
async IO Any
forall b. IO b
expire
  Async Any -> IO ()
forall a. Async a -> IO ()
link Async Any
ea

  Endpoint -> (Key -> ZGSCmd -> IO [(Key, ZGSCmd)]) -> IO ()
forall (t :: * -> *) (m :: * -> *) a.
(Foldable t, MonadIO m) =>
Endpoint -> (Key -> ZGSCmd -> IO (t (Key, ZGSCmd))) -> m a
zgossipRouter Endpoint
endpoint (TVar ZGossipState -> Key -> ZGSCmd -> IO [(Key, ZGSCmd)]
serverHandle TVar ZGossipState
gossipS)

serverHandle :: TVar ZGossipState -> Peer -> ZGSCmd -> IO [(Peer, ZGSCmd)]
serverHandle :: TVar ZGossipState -> Key -> ZGSCmd -> IO [(Key, ZGSCmd)]
serverHandle TVar ZGossipState
s Key
from ZGSCmd
Hello = do
  STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar ZGossipState -> (ZGossipState -> ZGossipState) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar TVar ZGossipState
s ((ZGossipState -> ZGossipState) -> STM ())
-> (ZGossipState -> ZGossipState) -> STM ()
forall a b. (a -> b) -> a -> b
$ \ZGossipState
x -> ZGossipState
x { gossipPeers :: Set Key
gossipPeers = Key -> Set Key -> Set Key
forall a. Ord a => a -> Set a -> Set a
S.insert Key
from (ZGossipState -> Set Key
gossipPeers ZGossipState
x) }
  ZGossipState
st <- STM ZGossipState -> IO ZGossipState
forall a. STM a -> IO a
atomically (STM ZGossipState -> IO ZGossipState)
-> STM ZGossipState -> IO ZGossipState
forall a b. (a -> b) -> a -> b
$ TVar ZGossipState -> STM ZGossipState
forall a. TVar a -> STM a
readTVar TVar ZGossipState
s
  [Key] -> IO ()
dbg [Key
"Hello from", Key -> Key
tryUUID Key
from]
  -- send all the k,v pairs to this client
  [(Key, ZGSCmd)] -> IO [(Key, ZGSCmd)]
forall (m :: * -> *) a. Monad m => a -> m a
return [(Key
from, (Key, (Key, TTL)) -> ZGSCmd
cvtPub (Key, (Key, TTL))
pub) | (Key, (Key, TTL))
pub <- Map Key (Key, TTL) -> [(Key, (Key, TTL))]
forall k a. Map k a -> [(k, a)]
M.toList (Map Key (Key, TTL) -> [(Key, (Key, TTL))])
-> Map Key (Key, TTL) -> [(Key, (Key, TTL))]
forall a b. (a -> b) -> a -> b
$ ZGossipState -> Map Key (Key, TTL)
gossipPairs ZGossipState
st ]
serverHandle TVar ZGossipState
s Key
from pub :: ZGSCmd
pub@(Publish Key
k Key
v TTL
ttl) = do
  STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar ZGossipState -> (ZGossipState -> ZGossipState) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar TVar ZGossipState
s ((ZGossipState -> ZGossipState) -> STM ())
-> (ZGossipState -> ZGossipState) -> STM ()
forall a b. (a -> b) -> a -> b
$ \ZGossipState
x -> ZGossipState
x { gossipPairs :: Map Key (Key, TTL)
gossipPairs = Key -> (Key, TTL) -> Map Key (Key, TTL) -> Map Key (Key, TTL)
forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert Key
k (Key
v, TTL
ttl) (ZGossipState -> Map Key (Key, TTL)
gossipPairs ZGossipState
x) }
  ZGossipState
st <- STM ZGossipState -> IO ZGossipState
forall a. STM a -> IO a
atomically (STM ZGossipState -> IO ZGossipState)
-> STM ZGossipState -> IO ZGossipState
forall a b. (a -> b) -> a -> b
$ TVar ZGossipState -> STM ZGossipState
forall a. TVar a -> STM a
readTVar TVar ZGossipState
s
  [Key] -> IO ()
dbg [Key
"Publish from", Key -> Key
tryUUID Key
from, Key -> Key
tryUUID Key
k, Key
"=", Key
v, Key
"( ttl", TTL -> Key
forall a. Show a => a -> Key
bshow TTL
ttl, Key
")"]

  -- republish this to all other clients
  [(Key, ZGSCmd)] -> IO [(Key, ZGSCmd)]
forall (m :: * -> *) a. Monad m => a -> m a
return [(Key
to, ZGSCmd
pub) | Key
to <- Map Key (Key, TTL) -> [Key]
forall k a. Map k a -> [k]
M.keys (Map Key (Key, TTL) -> [Key]) -> Map Key (Key, TTL) -> [Key]
forall a b. (a -> b) -> a -> b
$ ZGossipState -> Map Key (Key, TTL)
gossipPairs ZGossipState
st, Key
to Key -> Key -> Bool
forall a. Eq a => a -> a -> Bool
/= Key
from ]
serverHandle TVar ZGossipState
_ Key
from ZGSCmd
Ping = do
  [Key] -> IO ()
dbg [Key
"Ping from", Key -> Key
tryUUID Key
from]
  [(Key, ZGSCmd)] -> IO [(Key, ZGSCmd)]
forall (m :: * -> *) a. Monad m => a -> m a
return [(Key
from, ZGSCmd
PingOk)]
serverHandle TVar ZGossipState
_ Key
_ ZGSCmd
PingOk = [(Key, ZGSCmd)] -> IO [(Key, ZGSCmd)]
forall (m :: * -> *) a. Monad m => a -> m a
return []
serverHandle TVar ZGossipState
_ Key
_ ZGSCmd
Invalid = [(Key, ZGSCmd)] -> IO [(Key, ZGSCmd)]
forall (m :: * -> *) a. Monad m => a -> m a
return []

tryUUID :: ByteString -> ByteString
tryUUID :: Key -> Key
tryUUID Key
x = Key -> (UUID -> Key) -> Maybe UUID -> Key
forall b a. b -> (a -> b) -> Maybe a -> b
maybe Key
x UUID -> Key
toASCIIBytes (ByteString -> Maybe UUID
fromByteString (ByteString -> Maybe UUID) -> ByteString -> Maybe UUID
forall a b. (a -> b) -> a -> b
$ Key -> ByteString
BL.fromStrict Key
x)

dbg :: [ByteString] -> IO ()
dbg :: [Key] -> IO ()
dbg = Key -> IO ()
B.putStrLn (Key -> IO ()) -> ([Key] -> Key) -> [Key] -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Key -> [Key] -> Key
B.intercalate Key
" ")

-- send DoDiscover ZRE API messages on new Publish message
zgossipZRE :: TBQueue API -> ZGSMsg -> IO ()
zgossipZRE :: TBQueue API -> ZGSMsg -> IO ()
zgossipZRE TBQueue API
q ZGSMsg{Maybe Key
ZGSCmd
zgsCmd :: ZGSMsg -> ZGSCmd
zgsFrom :: ZGSMsg -> Maybe Key
zgsCmd :: ZGSCmd
zgsFrom :: Maybe Key
..} = ZGSCmd -> IO ()
handlePublish ZGSCmd
zgsCmd
  where handlePublish :: ZGSCmd -> IO ()
handlePublish (Publish Key
k Key
v TTL
_) = do
          case ByteString -> Maybe UUID
fromByteString (ByteString -> Maybe UUID) -> ByteString -> Maybe UUID
forall a b. (a -> b) -> a -> b
$ Key -> ByteString
BL.fromStrict Key
k of
            Maybe UUID
Nothing ->  IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Key -> IO ()
B.putStrLn Key
"Can't parse zgossip uuid"
            Just UUID
uuid -> do
             case Key -> Either String Endpoint
parseAttoEndpoint Key
v of
               (Left String
_err) -> IO () -> IO ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Key -> IO ()
B.putStrLn Key
"Can't parse zgossip endpoint"
               (Right Endpoint
endpoint) -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue API -> API -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue API
q (UUID -> Endpoint -> API
DoDiscover UUID
uuid Endpoint
endpoint)
        handlePublish ZGSCmd
_ = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()