{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ImportQualifiedPost #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE NoFieldSelectors #-}

module Haskoin.Node.Peer
  ( PeerConfig (..),
    PeerEvent (..),
    Conduits (..),
    PeerException (..),
    WithConnection,
    Peer (..),
    peer,
    wrapPeer,
    sendMessage,
    killPeer,
    getBlocks,
    getTxs,
    getData,
    pingPeer,
    getBusy,
    setBusy,
    setFree,
  )
where

import Conduit
  ( ConduitT,
    Void,
    awaitForever,
    foldC,
    mapM_C,
    runConduit,
    takeCE,
    transPipe,
    yield,
    (.|),
  )
import Control.Monad (forever, join, unless, when)
import Control.Monad.Logger (MonadLoggerIO, logDebugS, logErrorS, logInfoS)
import Control.Monad.Trans.Maybe (MaybeT (MaybeT), runMaybeT)
import Data.Bool (bool)
import Data.ByteString (ByteString)
import Data.ByteString qualified as B
import Data.Function (on)
import Data.List (union)
import Data.Maybe (isJust)
import Data.Serialize (decode, runGet, runPut)
import Data.String.Conversions (cs)
import Data.Text (Text)
import Data.Word (Word32)
import Haskoin
  ( Block (..),
    BlockHash (..),
    GetData (..),
    InvType (..),
    InvVector (..),
    Message (..),
    MessageCommand (..),
    MessageHeader (..),
    Network (..),
    NotFound (..),
    Ping (..),
    Pong (..),
    Tx,
    TxHash (..),
    commandToString,
    encodeHex,
    getMessage,
    headerHash,
    putMessage,
    txHash,
  )
import NQE
  ( Inbox,
    Mailbox,
    Publisher,
    inboxToMailbox,
    publish,
    receive,
    receiveMatchS,
    send,
    withSubscription,
  )
import System.Random (randomIO)
import UnliftIO
  ( Exception,
    MonadIO,
    MonadUnliftIO,
    TVar,
    atomically,
    liftIO,
    link,
    readTVar,
    readTVarIO,
    throwIO,
    timeout,
    withAsync,
    withRunInIO,
    writeTVar,
  )

-- | Pair of byte-level conduits representing an established connection to a
-- remote peer.
data Conduits = Conduits
  { -- | Source of raw bytes read from the connection. The peer process feeds
    -- it into the incoming message parser.
    inboundConduit :: ConduitT () ByteString IO (),
    -- | Sink for raw bytes written to the connection. The peer process feeds
    -- it with serialized outgoing messages.
    outboundConduit :: ConduitT ByteString Void IO ()
  }

-- | Continuation-style connection provider: given a callback that consumes
-- the connection's 'Conduits', run that callback in 'IO'.
--
-- NOTE(review): presumably the provider opens the connection before invoking
-- the callback and closes it afterwards -- confirm against the implementations
-- passed in by callers.
type WithConnection = (Conduits -> IO ()) -> IO ()

-- | Static configuration for a peer process.
data PeerConfig = PeerConfig
  { -- | Publisher on which the peer process publishes a 'PeerMessage' event
    -- for every message decoded from the network.
    pub :: !(Publisher PeerEvent),
    -- | Network used to serialize and deserialize messages.
    net :: !Network,
    -- | Human-readable label; used as prefix of the peer's log lines and as
    -- the 'Show' representation of the resulting 'Peer'.
    label :: !Text,
    -- | Connection provider yielding the inbound/outbound byte conduits.
    connect :: !WithConnection
  }

-- | Events related to a peer that are delivered through a
-- 'Publisher' 'PeerEvent'.
--
-- NOTE(review): within this module only 'PeerMessage' is published (by the
-- peer process, once per decoded incoming message); 'PeerConnected' and
-- 'PeerDisconnected' are presumably emitted by the peer manager -- confirm.
data PeerEvent
  = PeerConnected !Peer
  | PeerDisconnected !Peer
  | PeerMessage !Peer !Message
  deriving (Eq)

-- | Reasons for which a peer can be terminated. The human-readable
-- description of each constructor is given by the 'Show' instance.
data PeerException
  = PeerMisbehaving !String
  | DuplicateVersion
  | DecodeHeaderError
  | CannotDecodePayload !MessageCommand
  | PeerIsMyself
  | PayloadTooLarge !Word32
  | PeerAddressInvalid
  | PeerSentBadHeaders
  | NotNetworkPeer
  | PeerNoSegWit
  | PeerTimeout
  | UnknownPeer
  | PeerTooOld
  | EmptyHeader
  deriving (Eq)

-- | Human-readable descriptions, suitable for log messages.
instance Show PeerException where
  show = \case
    PeerMisbehaving s -> "Peer misbehaving: " <> s
    DuplicateVersion -> "Duplicate version"
    DecodeHeaderError -> "Error decoding header"
    CannotDecodePayload c ->
      -- 'commandToString' yields a 'ByteString'; 'cs' converts it to 'String'.
      "Cannot decode payload: " <> cs (commandToString c)
    PeerIsMyself -> "Peer is myself"
    PayloadTooLarge s -> "Payload too large: " <> show s
    PeerAddressInvalid -> "Peer address invalid"
    PeerSentBadHeaders -> "Peer sent bad headers"
    NotNetworkPeer -> "Not network peer"
    PeerNoSegWit -> "Segwit not supported by peer"
    PeerTimeout -> "Peer timed out"
    UnknownPeer -> "Unknown peer"
    PeerTooOld -> "Peer too old"
    EmptyHeader -> "Empty header"

-- | Uses the default 'Exception' methods, so a 'PeerException' can be thrown
-- with 'throwIO' (as the peer process does when told to kill itself).
instance Exception PeerException

-- | Handle to a peer process: its mailbox plus the data needed to interact
-- with it from other threads.
data Peer = Peer
  { -- | Mailbox accepting commands for the peer process. Also the identity
    -- of the peer as far as 'Eq' is concerned.
    mailbox :: !(Mailbox PeerMessage),
    -- | Publisher where messages received from this peer are announced.
    pub :: !(Publisher PeerEvent),
    -- | Human-readable label; this is what 'show' returns.
    label :: !Text,
    -- | Busy flag, manipulated through 'getBusy', 'setBusy' and 'setFree'.
    busy :: !(TVar Bool)
  }

-- | Two peers are equal when they share the same mailbox; the other fields
-- do not take part in the comparison.
instance Eq Peer where
  a == b = a.mailbox == b.mailbox

-- | A peer is shown as its label.
instance Show Peer where
  show p = cs p.label

-- | Incoming messages that a peer accepts.
data PeerMessage
  = KillPeer !PeerException
  -- ^ Make the peer process log and throw the given exception.
  | SendMessage !Message
  -- ^ Serialize the given network message and send it to the remote peer.

-- | Build a 'Peer' handle from a configuration, a busy flag and a mailbox.
--
-- The publisher and label are copied from the configuration. No effects are
-- performed; the result is simply returned in @m@.
wrapPeer ::
  (MonadIO m) =>
  PeerConfig ->
  TVar Bool ->
  Mailbox PeerMessage ->
  m Peer
wrapPeer cfg busy mbox =
  pure
    Peer
      { mailbox = mbox,
        pub = cfg.pub,
        label = cfg.label,
        busy = busy
      }

-- | Run peer process in current thread.
--
-- Obtains a connection through the configured 'WithConnection' and then runs
-- two pipelines for as long as the connection lasts:
--
-- * inbound (in a linked async thread): raw bytes are parsed into messages
--   and each one is published as a 'PeerMessage' event;
-- * outbound (in the calling thread): commands read from the inbox are
--   dispatched, and messages to send are serialized and written out.
--
-- This action does not return normally: both loops run forever and only end
-- through an exception (e.g. a 'PeerException' raised by the parser or by a
-- 'KillPeer' command).
peer ::
  (MonadUnliftIO m, MonadLoggerIO m) =>
  PeerConfig ->
  TVar Bool ->
  Inbox PeerMessage ->
  m ()
peer cfg@PeerConfig {..} busy inbox = do
  p <- wrapPeer cfg busy (inboxToMailbox inbox)
  -- 'connect' expects an IO callback, so unlift the session into IO.
  withRunInIO $ \runInIO ->
    connect (runInIO . peer_session p)
  where
    -- Forever wait for a command in the inbox and dispatch it.
    go = forever $ do
      $(logDebugS) "Peer" $ label <> " awaiting event..."
      msg <- receive inbox
      dispatchMessage cfg msg
    -- Wire both directions of the connection and run them concurrently.
    peer_session p ad = do
      let ins = transPipe liftIO ad.inboundConduit
          ons = transPipe liftIO ad.outboundConduit
          src =
            runConduit $
              ins
                .| inPeerConduit net cfg label
                .| mapM_C (send_msg p)
          snk = outPeerConduit net .| ons
      withAsync src $ \reader -> do
        -- Re-raise exceptions from the inbound thread in this thread.
        link reader
        runConduit (go .| snk)
    -- Announce a message received from the remote peer.
    send_msg p msg = publish (PeerMessage p msg) pub

-- | Internal function to dispatch peer messages.
--
-- * 'SendMessage': log the message at debug level and yield it downstream.
-- * 'KillPeer': log the exception at info level and throw it with 'throwIO'.
dispatchMessage ::
  (MonadLoggerIO m) =>
  PeerConfig ->
  PeerMessage ->
  ConduitT i Message m ()
dispatchMessage PeerConfig {label} = \case
  SendMessage msg -> do
    $(logDebugS) "Peer" $ label <> " sending: " <> cs (show msg)
    yield msg
  KillPeer e -> do
    $(logInfoS) "Peer" $ label <> " killing with error: " <> cs (show e)
    throwIO e

-- | Internal conduit to parse messages coming from peer.
--
-- Repeats forever:
--
-- 1. Read up to 24 bytes of message header. No bytes at all throws
--    'EmptyHeader'; bytes that fail to decode throw 'DecodeHeaderError'.
-- 2. Reject announced payloads larger than 32 MiB with 'PayloadTooLarge'.
-- 3. Read the payload and decode header and payload together for the given
--    network. Failure throws 'CannotDecodePayload'; success yields the
--    message downstream.
--
-- The label used for logging comes from the 'PeerConfig'; the 'Text'
-- argument is accepted for interface compatibility and is not used.
inPeerConduit ::
  (MonadLoggerIO m) =>
  Network ->
  PeerConfig ->
  Text ->
  ConduitT ByteString Message m ()
inPeerConduit net PeerConfig {label} _unused =
  forever $ do
    $(logDebugS) "Peer" $ label <> " awaiting network message..."
    x <- takeCE headerSize .| foldC
    when (B.null x) $ do
      $(logErrorS) "Peer" $ label <> " empty header"
      throwIO EmptyHeader
    case decode x of
      Left err -> do
        -- Keep the parser's explanation in the log instead of dropping it.
        $(logErrorS) "Peer" $
          label <> " error decoding header: " <> cs err
        throwIO DecodeHeaderError
      Right (MessageHeader _ cmd len _) -> do
        $(logDebugS) "Peer" $ label <> " received: " <> cs (show cmd)
        when (len > maxPayload) $ do
          $(logErrorS) "Peer" $
            label <> " payload too large: " <> cs (show len)
          throwIO $ PayloadTooLarge len
        y <- takeCE (fromIntegral len) .| foldC
        -- 'getMessage' parses the header again, so hand it header + payload.
        case runGet (getMessage net) $ x `B.append` y of
          Left err -> do
            $(logErrorS) "Peer" $
              label
                <> " could not decode payload for cmd: "
                <> cs (show cmd)
                <> ": "
                <> cs err
            throwIO (CannotDecodePayload cmd)
          Right msg -> do
            $(logDebugS) "Peer" $ label <> " forwarding: " <> cs (show msg)
            yield msg
  where
    -- Number of bytes read for a message header.
    headerSize :: Int
    headerSize = 24
    -- Largest accepted payload length: 32 MiB.
    maxPayload :: Word32
    maxPayload = 32 * 2 ^ (20 :: Int)

-- | Outgoing peer conduit to serialize and send messages.
--
-- Every incoming 'Message' is serialized for the given network and yielded
-- as a single strict 'ByteString'.
outPeerConduit :: (Monad m) => Network -> ConduitT Message ByteString m ()
outPeerConduit net =
  awaitForever $ \msg ->
    yield (runPut (putMessage net msg))

-- | Kill a peer with the provided exception.
--
-- This only places a 'KillPeer' command in the peer's mailbox; the exception
-- is thrown inside the peer process when the command is dispatched.
killPeer :: (MonadIO m) => PeerException -> Peer -> m ()
killPeer e p = send (KillPeer e) p.mailbox

-- | Send a network message to peer.
--
-- This only places a 'SendMessage' command in the peer's mailbox; the peer
-- process serializes and writes the message when the command is dispatched.
sendMessage :: (MonadIO m) => Message -> Peer -> m ()
sendMessage msg p = send (SendMessage msg) p.mailbox

-- | Read the current value of the peer's busy flag.
getBusy :: (MonadIO m) => Peer -> m Bool
getBusy p = readTVarIO p.busy

-- | Atomically try to mark the peer as busy.
--
-- Returns 'True' if the peer was free and has now been marked busy by this
-- call, or 'False' if it was already busy (in which case nothing changes).
setBusy :: (MonadIO m) => Peer -> m Bool
setBusy p =
  atomically $ do
    alreadyBusy <- readTVar p.busy
    unless alreadyBusy $ writeTVar p.busy True
    pure (not alreadyBusy)

-- | Atomically clear the peer's busy flag, whatever its previous value.
setFree :: (MonadIO m) => Peer -> m ()
setFree p = atomically (writeTVar p.busy False)

-- | Request full blocks from peer. Will return 'Nothing' if the list of blocks
-- returned by the peer is incomplete, comes out of order, or a timeout is
-- reached.
--
-- The request is delegated to 'getData', with the 'Int' argument passed on
-- as its timeout in seconds. Blocks are requested as 'InvWitnessBlock' when
-- the network has SegWit enabled and as 'InvBlock' otherwise. If any item of
-- the response is a transaction rather than a block, the result is 'Nothing'.
getBlocks ::
  (MonadUnliftIO m) =>
  Network ->
  Int ->
  Peer ->
  [BlockHash] ->
  m (Maybe [Block])
getBlocks net time p bhs =
  runMaybeT $ do
    things <- MaybeT (getData time p (GetData ivs))
    mapM only_block things
  where
    -- Accept blocks; a transaction makes the whole request fail.
    only_block (Right b) = pure b
    only_block (Left _) = MaybeT (pure Nothing)
    inv_type
      | net.segWit = InvWitnessBlock
      | otherwise = InvBlock
    ivs = map (InvVector inv_type . (.get)) bhs

-- | Request transactions from peer. Will return 'Nothing' if the list of
-- transactions returned by the peer is incomplete, comes out of order, or a
-- timeout is reached.
--
-- The request is delegated to 'getData', with the 'Int' argument passed on
-- as its timeout in seconds. Transactions are requested as 'InvWitnessTx'
-- when the network has SegWit enabled and as 'InvTx' otherwise. If any item
-- of the response is a block rather than a transaction, the result is
-- 'Nothing'.
getTxs ::
  (MonadUnliftIO m) =>
  Network ->
  Int ->
  Peer ->
  [TxHash] ->
  m (Maybe [Tx])
getTxs net time p ths =
  runMaybeT $ do
    things <- MaybeT (getData time p (GetData ivs))
    mapM only_tx things
  where
    -- Accept transactions; a block makes the whole request fail.
    only_tx (Left t) = pure t
    only_tx (Right _) = MaybeT (pure Nothing)
    inv_type
      | net.segWit = InvWitnessTx
      | otherwise = InvTx
    ivs = map (InvVector inv_type . (.get)) ths

-- | Request transactions and/or blocks from peer. Return 'Nothing' if any
-- single inventory fails to be retrieved, if they come out of order, or if
-- timeout is reached.
getData ::
  (MonadUnliftIO m) => Int -> Peer -> GetData -> m (Maybe [Either Tx Block])
getData :: forall (m :: * -> *).
MonadUnliftIO m =>
Int -> Peer -> GetData -> m (Maybe [Either Tx Block])
getData Int
seconds Peer
p gd :: GetData
gd@(GetData [InvVector]
ivs) =
  Publisher PeerEvent
-> (Inbox PeerEvent -> m (Maybe [Either Tx Block]))
-> m (Maybe [Either Tx Block])
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
Publisher msg -> (Inbox msg -> m a) -> m a
withSubscription Peer
p.pub ((Inbox PeerEvent -> m (Maybe [Either Tx Block]))
 -> m (Maybe [Either Tx Block]))
-> (Inbox PeerEvent -> m (Maybe [Either Tx Block]))
-> m (Maybe [Either Tx Block])
forall a b. (a -> b) -> a -> b
$ \Inbox PeerEvent
inb -> do
    Word64
r <- IO Word64 -> m Word64
forall a. IO a -> m a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO IO Word64
forall a (m :: * -> *). (Random a, MonadIO m) => m a
randomIO
    GetData -> Message
MGetData GetData
gd Message -> Peer -> m ()
forall (m :: * -> *). MonadIO m => Message -> Peer -> m ()
`sendMessage` Peer
p
    Ping -> Message
MPing (Word64 -> Ping
Ping Word64
r) Message -> Peer -> m ()
forall (m :: * -> *). MonadIO m => Message -> Peer -> m ()
`sendMessage` Peer
p
    (Maybe (Maybe [Either Tx Block]) -> Maybe [Either Tx Block])
-> m (Maybe (Maybe [Either Tx Block]))
-> m (Maybe [Either Tx Block])
forall a b. (a -> b) -> m a -> m b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Maybe (Maybe [Either Tx Block]) -> Maybe [Either Tx Block]
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join
      (m (Maybe (Maybe [Either Tx Block]))
 -> m (Maybe [Either Tx Block]))
-> (MaybeT m [Either Tx Block]
    -> m (Maybe (Maybe [Either Tx Block])))
-> MaybeT m [Either Tx Block]
-> m (Maybe [Either Tx Block])
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int
-> m (Maybe [Either Tx Block])
-> m (Maybe (Maybe [Either Tx Block]))
forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> m a -> m (Maybe a)
timeout (Int
seconds Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000)
      (m (Maybe [Either Tx Block])
 -> m (Maybe (Maybe [Either Tx Block])))
-> (MaybeT m [Either Tx Block] -> m (Maybe [Either Tx Block]))
-> MaybeT m [Either Tx Block]
-> m (Maybe (Maybe [Either Tx Block]))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MaybeT m [Either Tx Block] -> m (Maybe [Either Tx Block])
forall (m :: * -> *) a. MaybeT m a -> m (Maybe a)
runMaybeT
      (MaybeT m [Either Tx Block] -> m (Maybe [Either Tx Block]))
-> MaybeT m [Either Tx Block] -> m (Maybe [Either Tx Block])
forall a b. (a -> b) -> a -> b
$ Inbox PeerEvent
-> Word64
-> [Either Tx Block]
-> [InvVector]
-> MaybeT m [Either Tx Block]
get_thing Inbox PeerEvent
inb Word64
r [] [InvVector]
ivs
  where
    get_thing :: Inbox PeerEvent
-> Word64
-> [Either Tx Block]
-> [InvVector]
-> MaybeT m [Either Tx Block]
get_thing Inbox PeerEvent
_inb Word64
_r [Either Tx Block]
acc [] =
      [Either Tx Block] -> MaybeT m [Either Tx Block]
forall a. a -> MaybeT m a
forall (m :: * -> *) a. Monad m => a -> m a
return ([Either Tx Block] -> MaybeT m [Either Tx Block])
-> [Either Tx Block] -> MaybeT m [Either Tx Block]
forall a b. (a -> b) -> a -> b
$ [Either Tx Block] -> [Either Tx Block]
forall a. [a] -> [a]
reverse [Either Tx Block]
acc
    get_thing Inbox PeerEvent
inb Word64
r [Either Tx Block]
acc hss :: [InvVector]
hss@(InvVector InvType
t Hash256
h : [InvVector]
hs) =
      Peer -> Inbox PeerEvent -> MaybeT m Message
forall (m :: * -> *).
MonadIO m =>
Peer -> Inbox PeerEvent -> m Message
filterReceive Peer
p Inbox PeerEvent
inb MaybeT m Message
-> (Message -> MaybeT m [Either Tx Block])
-> MaybeT m [Either Tx Block]
forall a b. MaybeT m a -> (a -> MaybeT m b) -> MaybeT m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        MTx Tx
tx
          | InvType -> Bool
is_tx InvType
t Bool -> Bool -> Bool
&& (Tx -> TxHash
txHash Tx
tx).get Hash256 -> Hash256 -> Bool
forall a. Eq a => a -> a -> Bool
== Hash256
h ->
              Inbox PeerEvent
-> Word64
-> [Either Tx Block]
-> [InvVector]
-> MaybeT m [Either Tx Block]
get_thing Inbox PeerEvent
inb Word64
r (Tx -> Either Tx Block
forall a b. a -> Either a b
Left Tx
tx Either Tx Block -> [Either Tx Block] -> [Either Tx Block]
forall a. a -> [a] -> [a]
: [Either Tx Block]
acc) [InvVector]
hs
        MBlock b :: Block
b@(Block BlockHeader
bh [Tx]
_)
          | InvType -> Bool
is_block InvType
t Bool -> Bool -> Bool
&& (BlockHeader -> BlockHash
headerHash BlockHeader
bh).get Hash256 -> Hash256 -> Bool
forall a. Eq a => a -> a -> Bool
== Hash256
h ->
              Inbox PeerEvent
-> Word64
-> [Either Tx Block]
-> [InvVector]
-> MaybeT m [Either Tx Block]
get_thing Inbox PeerEvent
inb Word64
r (Block -> Either Tx Block
forall a b. b -> Either a b
Right Block
b Either Tx Block -> [Either Tx Block] -> [Either Tx Block]
forall a. a -> [a] -> [a]
: [Either Tx Block]
acc) [InvVector]
hs
        MNotFound (NotFound [InvVector]
nvs)
          | Bool -> Bool
not ([InvVector] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null ([InvVector]
nvs [InvVector] -> [InvVector] -> [InvVector]
forall a. Eq a => [a] -> [a] -> [a]
`union` [InvVector]
hs)) ->
              m (Maybe [Either Tx Block]) -> MaybeT m [Either Tx Block]
forall (m :: * -> *) a. m (Maybe a) -> MaybeT m a
MaybeT (m (Maybe [Either Tx Block]) -> MaybeT m [Either Tx Block])
-> m (Maybe [Either Tx Block]) -> MaybeT m [Either Tx Block]
forall a b. (a -> b) -> a -> b
$ Maybe [Either Tx Block] -> m (Maybe [Either Tx Block])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe [Either Tx Block]
forall a. Maybe a
Nothing
        MPong (Pong Word64
r')
          | Word64
r Word64 -> Word64 -> Bool
forall a. Eq a => a -> a -> Bool
== Word64
r' ->
              m (Maybe [Either Tx Block]) -> MaybeT m [Either Tx Block]
forall (m :: * -> *) a. m (Maybe a) -> MaybeT m a
MaybeT (m (Maybe [Either Tx Block]) -> MaybeT m [Either Tx Block])
-> m (Maybe [Either Tx Block]) -> MaybeT m [Either Tx Block]
forall a b. (a -> b) -> a -> b
$ Maybe [Either Tx Block] -> m (Maybe [Either Tx Block])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe [Either Tx Block]
forall a. Maybe a
Nothing
        Message
_
          | [Either Tx Block] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [Either Tx Block]
acc ->
              Inbox PeerEvent
-> Word64
-> [Either Tx Block]
-> [InvVector]
-> MaybeT m [Either Tx Block]
get_thing Inbox PeerEvent
inb Word64
r [Either Tx Block]
acc [InvVector]
hss
          | Bool
otherwise ->
              m (Maybe [Either Tx Block]) -> MaybeT m [Either Tx Block]
forall (m :: * -> *) a. m (Maybe a) -> MaybeT m a
MaybeT (m (Maybe [Either Tx Block]) -> MaybeT m [Either Tx Block])
-> m (Maybe [Either Tx Block]) -> MaybeT m [Either Tx Block]
forall a b. (a -> b) -> a -> b
$ Maybe [Either Tx Block] -> m (Maybe [Either Tx Block])
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe [Either Tx Block]
forall a. Maybe a
Nothing
    is_tx :: InvType -> Bool
is_tx InvType
InvWitnessTx = Bool
True
    is_tx InvType
InvTx = Bool
True
    is_tx InvType
_ = Bool
False
    is_block :: InvType -> Bool
is_block InvType
InvWitnessBlock = Bool
True
    is_block InvType
InvBlock = Bool
True
    is_block InvType
_ = Bool
False

-- | Ping a peer and await its response.
--
-- A random nonce is generated and sent to the peer inside an 'MPing'. The call
-- then waits on a subscription to the peer's publisher for a 'PeerMessage'
-- that both originates from the same peer and carries an 'MPong' echoing that
-- exact nonce. Any other event is ignored.
--
-- Returns 'True' if the matching pong is received, 'False' if
-- 'receiveMatchS' gives up first.
--
-- NOTE(review): the unit of the timeout argument is whatever 'receiveMatchS'
-- expects (presumably seconds) — confirm against the NQE documentation.
pingPeer :: (MonadUnliftIO m) => Int -> Peer -> m Bool
pingPeer time p =
  -- The subscription is opened before the ping is sent, so the inbox is
  -- already listening when the reply is published.
  fmap isJust . withSubscription p.pub $ \sub -> do
    r <- liftIO randomIO
    MPing (Ping r) `sendMessage` p
    receiveMatchS time sub $ \case
      -- Only a pong from this very peer with our nonce counts as a reply.
      PeerMessage p' (MPong (Pong r'))
        | p == p' && r == r' -> Just ()
      _ -> Nothing

-- | Receive the next message that was sent by the given peer.
--
-- Events are taken from the inbox one at a time. The first 'PeerMessage'
-- whose peer equals @p@ has its 'Message' returned; every other event
-- (messages from other peers and non-message events) is discarded and the
-- function tries again. There is no timeout here, so callers that need one
-- must wrap the call themselves.
filterReceive :: (MonadIO m) => Peer -> Inbox PeerEvent -> m Message
filterReceive p inb =
  receive inb >>= \case
    PeerMessage p' msg | p == p' -> return msg
    -- Not from our peer (or not a message at all): drop it and keep waiting.
    _ -> filterReceive p inb