{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Network.ZRE (
    runZre
  , runZreCfg
  , runZreEnvConfig
  , runZreParse
  , readZ
  , writeZ
  , unReadZ
  , defaultConf
  , API(..)
  , Event(..)
  , ZRE
  , Z.Group
  , Z.mkGroup
  , Z.unGroup
  , zjoin
  , zleave
  , zshout
  , zshout'
  , zwhisper
  , zdebug
  , znodebug
  , zquit
  , zfail
  , zrecv
  , pEndpoint
  , toASCIIBytes
  , getApiQueue
  , getEventQueue
  , module Network.ZRE.Lib
  ) where

import Prelude hiding (putStrLn, take)
import Control.Monad hiding (join)
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception (SomeException)
import qualified Control.Exception.Lifted

import Data.ByteString (ByteString)
import Data.UUID
import Data.UUID.V1
import Data.Maybe

import qualified Data.Set as S
import qualified Data.Map as M
import qualified Data.ByteString.Char8 as B

import qualified Data.ZRE as Z
import Network.ZRE.Beacon
import Network.ZRE.Config
import Network.ZRE.Lib
import Network.ZRE.Options
import Network.ZRE.Peer
import Network.ZRE.Types
import Network.ZRE.Utils
import Network.ZRE.ZMQ

import Network.ZGossip
import System.ZMQ4.Endpoint

import Options.Applicative

-- | Resolve the interfaces to run on.
--
-- With an empty list the interface of the default route is looked up via
-- 'getDefRoute' and reported through 'getIfaceReport'; the process exits
-- via 'exitFail' when no default route is available. Otherwise every
-- requested interface name is passed to 'getIfaceReport', in order.
getIfaces :: [ByteString]
          -> IO [(ByteString, ByteString, ByteString)]
getIfaces [] = do
  defRoute <- getDefRoute
  case defRoute of
    Nothing -> exitFail "Unable to get default route"
    Just (_route, iface) -> do
      report <- getIfaceReport iface
      return [report]
getIfaces names = forM names getIfaceReport

-- | Start a ZRE router bound to the interface's IPv4 address and the given
-- port, feeding received messages to 'inbox', and record the router's
-- 'Async' handle in 'zreIfaces' under the interface name.
--
-- The third tuple component (the IPv6 address) is currently unused.
runIface :: Show a
         => TVar ZREState
         -> Int
         -> (ByteString, ByteString, a)
         -> IO ()
runIface s port (iface, ipv4, _ipv6) = do
  router <- async $ zreRouter (newTCPEndpoint ipv4 port) (inbox s)
  -- strict modify so no chain of pending updates builds up inside the TVar
  atomically $ modifyTVar' s $ \st ->
    st { zreIfaces = M.insert iface [router] (zreIfaces st) }

-- | Run a ZRE application using the built-in command-line options only.
--
-- Equivalent to 'runZreParse' with a parser that consumes nothing.
runZre :: ZRE a -> IO ()
runZre app = runZreParse (pure ()) (const app)

-- | Run with config file loaded from the environment variable ENVCFG
-- (@/etc/zre.conf@ or @~/.zre.conf@), possibly overridden by command-line
-- options.
--
-- Accepts another `optparse-applicative` `Parser` for extending
-- the built-in one; its result is handed to the application.
runZreParse :: Parser extra -> (extra -> ZRE a) -> IO ()
runZreParse parseExtra app = do
  -- try to get config from the environment variable ENVCFG, /etc/zre.conf
  -- or ~/.zre.conf and override with command line options.
  cfgIni <- envZRECfg "zre"
  (cfgOpts, extras) <- execParser opts
  runZreCfg (overrideNonDefault cfgIni cfgOpts) (app extras)
  where
    -- built-in options paired with the caller-supplied parser, plus --help
    opts = info
      (((,) <$> parseOptions <*> parseExtra) <**> helper)
      (  fullDesc
      <> progDesc "ZRE"
      <> header "zre tools"
      )

-- | Run with config file loaded from the environment variable ENVCFG
-- (@/etc/zre.conf@ or @~/.zre.conf@). No command-line parsing is done.
runZreEnvConfig :: ZRE a -> IO ()
runZreEnvConfig app = envZRECfg "zre" >>= \cfgIni -> runZreCfg cfgIni app

-- | Run a ZRE application with an explicit configuration.
--
-- Resolves interfaces, generates the node UUID, picks a port on the first
-- interface's IPv4 address, creates the shared state and event/API queues,
-- starts the optional gossip client, the beacon sender and receiver, one
-- router per interface and the API handler, and then runs the user
-- application. Returns once the API handler has processed 'DoQuit'.
--
-- Exits via 'exitFail' when no UUID, no interface or no multicast address
-- can be obtained.
runZreCfg :: ZRECfg -> ZRE a -> IO ()
runZreCfg cfg@ZRECfg{..} app = do
    ifcs <- getIfaces zreInterfaces

    u <- maybeM (exitFail "Unable to get UUID") return nextUUID
    let uuid = uuidByteString u

    case ifcs of
      [] -> exitFail "No interfaces found"
      ifaces@((_ifcname, ipv4, _ipv6):_) -> do
        -- the port is chosen on the first interface and reused for all routers
        zrePort <- randPort ipv4

        let zreEndpoint = newTCPEndpoint ipv4 zrePort
        when zreDbg $ B.putStrLn $ "Starting with " <> bshow zreEndpoint

        zreName <- getName zreNamed

        -- 1M events both ways, not sure about this
        inQ <- atomically $ newTBQueue 1000000
        outQ <- atomically $ newTBQueue 1000000

        s <- newZREState zreName zreEndpoint u inQ outQ zreDbg cfg

        -- FIXME: support multiple gossip clients
        case zreZGossip of
          Nothing -> return ()
          Just end ->
            void $ async $ zgossipClient uuid end zreEndpoint (zgossipZRE outQ)

        -- An empty lookup result used to fail a partial pattern bind with an
        -- opaque error; report it like the other startup failures instead.
        mcastAddrs <- toAddrInfo zreMCast
        mCastAddr <- case mcastAddrs of
          (addr:_) -> return addr
          []       -> exitFail "Unable to resolve multicast endpoint"

        _beaconAsync <- async $ beacon zreBeaconPeriod mCastAddr uuid zrePort
        _beaconRecvAsync <- async $ beaconRecv s zreMCast

        mapM_ (runIface s zrePort) ifaces

        apiAsync <- async $ api s
        -- NOTE(review): presumably gives the background threads time to come
        -- up before the user application starts -- confirm
        threadDelay 500000
        _userAppAsync <- async
          $ runZ
              -- any exception in the user app is turned into 'zfail', and
              -- 'zquit' is always issued afterwards
              (app
                `Control.Exception.Lifted.catch`
                (\e -> zfail (show (e :: SomeException)))
                `Control.Exception.Lifted.finally`
                zquit
              )
              inQ
              outQ

        -- block until the API loop terminates (it stops after DoQuit)
        wait apiAsync

-- | API loop: take the next 'API' request from the state's 'zreOut' queue,
-- process it with 'handleApi' and repeat. The loop ends after a 'DoQuit'
-- request has been handled.
api :: TVar ZREState -> IO ()
api s = do
  request <- atomically $ readTVar s >>= readTBQueue . zreOut
  handleApi s request
  case request of
    DoQuit -> return ()
    _      -> api s

-- | Execute a single 'API' request against the shared state.
--
-- * 'DoJoin' / 'DoLeave' bump the group sequence number, update the set of
--   our groups and announce the change to all peers, in one transaction.
-- * 'DoShout', 'DoShoutMulti' and 'DoWhisper' queue outgoing messages.
-- * 'DoDiscover' creates a peer from an endpoint unless it is already known.
-- * 'DoDebug' toggles the debug flag.
-- * 'DoQuit' blocks until every peer's outgoing queue is empty.
handleApi :: TVar ZREState -> API -> IO ()
handleApi s act =
  case act of
    DoJoin group -> atomically $ do
      incGroupSeq
      modifyTVar' s $ \x -> x { zreGroups = S.insert group (zreGroups x) }
      st <- readTVar s
      msgAllJoin s group (zreGroupSeq st)

    DoLeave group -> atomically $ do
      incGroupSeq
      modifyTVar' s $ \x -> x { zreGroups = S.delete group (zreGroups x) }
      st <- readTVar s
      msgAllLeave s group (zreGroupSeq st)

    DoShout group msg -> atomically $ shoutGroup s group msg
    DoShoutMulti group mmsg -> atomically $ shoutGroupMulti s group mmsg
    DoWhisper uuid msg -> atomically $ whisperPeerUUID s uuid msg

    DoDiscover uuid endpoint -> do
      known <- atomically $ lookupPeer s uuid
      case known of
        Just _  -> return ()
        Nothing -> void $ makePeer s uuid $ newPeerFromEndpoint endpoint

    DoDebug bool -> atomically $ modifyTVar' s $ \x -> x { zreDebug = bool }

    DoQuit -> drain
  where
    -- strict modify keeps the state TVar free of accumulated update thunks
    incGroupSeq = modifyTVar' s $ \x -> x { zreGroupSeq = zreGroupSeq x + 1 }

    -- True when the command queue of every known peer is empty
    queuesEmpty = atomically $ do
      st <- readTVar s
      flags <- forM (M.toList $ zrePeers st) $ \(_, tp) ->
        readTVar tp >>= isEmptyTBQueue . peerQueue
      return $ and flags

    -- poll every 0.1s until all peer queues have been flushed
    drain = do
      done <- queuesEmpty
      unless done $ threadDelay (sec (0.1 :: Float)) >> drain

-- | Handles incoming ZRE messages: creates peers, updates state.
--
-- Messages without a sender UUID are dropped. For an unknown sender only
-- HELLO is accepted, since other messages carry no endpoint to connect to.
-- For a known peer the last-heard time is refreshed (when the message has a
-- timestamp) and the message sequence number is compared with the one we
-- expect: on a match the command is dispatched to 'handleCmd', otherwise
-- the peer is destroyed and, if the message was a HELLO, re-created.
inbox :: TVar ZREState -> Z.ZREMsg -> IO ()
inbox s msg@Z.ZREMsg{..} =
  case msgFrom of
    -- no sender: nothing to look up, drop instead of crashing on fromJust
    Nothing -> return ()
    Just uuid -> do
      -- print msg , "state pre-msg", printAll s
      mpt <- atomically $ lookupPeer s uuid
      case mpt of
        Nothing ->
          case msgCmd of
            -- if the peer is not known but a message is HELLO we create a
            -- new peer, for other messages we don't know the endpoint to
            -- connect to
            h@(Z.Hello _endpoint _groups _groupSeq _name _headers) ->
              peerFromHello uuid h
            -- silently drop any other messages
            _ -> return ()

        Just peer -> do
          -- only refresh last-heard when the message has a timestamp
          mapM_ (atomically . updateLastHeard peer) msgTime

          -- destroy/re-start peer when this doesn't match
          p <- atomically $ readTVar peer
          if peerSeq p == msgSeq
            then do
              -- TODO: rename peerSeq to peerExpectSeq (all update sites)
              atomically $ updatePeer peer bumpSeq
              handleCmd s msg peer
            else do
              atomically $ emitdbg s "sequence mismatch, recreating peer"
              recreatePeer (peerUUID p) msgCmd
      -- "state post-msg", printAll s
  where
    -- advance the sequence number we expect from the peer next
    bumpSeq x = x { peerSeq = peerSeq x + 1 }

    -- create a peer from a HELLO command and account for that HELLO
    peerFromHello uuid h = do
      peer <- makePeer s uuid $ newPeerFromHello h
      atomically $ updatePeer peer bumpSeq

    -- drop the peer; a HELLO lets us immediately start over with it
    recreatePeer uuid h@(Z.Hello _ _ _ _ _) = do
      destroyPeer s uuid
      peerFromHello uuid h
    recreatePeer uuid _ = destroyPeer s uuid

-- | Dispatch a command received from a known peer.
--
-- Only messages carrying both a sender UUID and a timestamp are processed;
-- anything else is ignored. WHISPER and SHOUT are emitted as events,
-- JOIN/LEAVE update the peer's group membership, PING is answered with
-- PING-OK, and HELLO updates the peer's groups, name and headers and emits
-- a 'Ready' event. Each command's state changes run in one STM transaction.
handleCmd :: TVar ZREState -> Z.ZREMsg -> TVar Peer -> IO ()
handleCmd s Z.ZREMsg{msgFrom=(Just from), msgTime=(Just time), msgCmd=cmd} peer =
  case cmd of
    Z.Whisper content -> atomically $ do
      emit s $ Whisper from content time
      emitdbg s $ B.intercalate " " ["whisper", B.concat content]

    Z.Shout group content -> atomically $ do
      emit s $ Shout from group content time
      emitdbg s $ B.intercalate " "
        ["shout for group", Z.unGroup group, ">", B.concat content]

    Z.Join group groupSeq -> atomically $ do
      joinGroup s peer group groupSeq
      emitdbg s $ B.intercalate " " ["join", Z.unGroup group, bshow groupSeq]

    Z.Leave group groupSeq -> atomically $ do
      leaveGroup s peer group groupSeq
      emitdbg s $ B.intercalate " " ["leave", Z.unGroup group, bshow groupSeq]

    Z.Ping -> atomically $ do
      -- reply to the ping; the debug text below is kept as-is
      msgPeer peer Z.PingOk
      p <- readTVar peer
      emitdbg s $ B.unwords ["sending pings to ", bshow p]

    Z.PingOk -> return ()

    Z.Hello endpoint groups groupSeq name headers ->
      -- if this peer was already registered
      -- (e.g. from beacon) update appropriate data
      atomically $ do
        joinGroups s peer groups groupSeq
        updatePeer peer $ \x -> x
          { peerName = Just name
          , peerHeaders = headers
          }
        p <- readTVar peer
        emit s $ Ready (peerUUID p) name groups headers endpoint
        emitdbg s "update peer"
-- messages without sender or timestamp are ignored
handleCmd _ _ _ = return ()