{-# LANGUAGE CPP #-}
module Erebos.Network (
Server,
startServer,
stopServer,
getNextPeerChange,
ServerOptions(..), serverIdentity, defaultServerOptions,
Peer, peerServer, peerStorage,
PeerAddress(..), peerAddress,
PeerIdentity(..), peerIdentity,
WaitingRef, wrDigest,
Service(..),
serverPeer,
#ifdef ENABLE_ICE_SUPPORT
serverPeerIce,
#endif
sendToPeer, sendToPeerStored, sendToPeerWith,
runPeerService,
discoveryPort,
) where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Control.Monad.Except
import Control.Monad.Reader
import Control.Monad.State
import qualified Data.ByteString.Char8 as BC
import Data.Function
import Data.IP qualified as IP
import Data.List
import Data.Map (Map)
import qualified Data.Map as M
import Data.Maybe
import Data.Typeable
import Data.Word
import Foreign.Ptr
import Foreign.Storable
import GHC.Conc.Sync (unsafeIOToSTM)
import Network.Socket hiding (ControlMessage)
import qualified Network.Socket.ByteString as S
import Erebos.Channel
#ifdef ENABLE_ICE_SUPPORT
import Erebos.ICE
#endif
import Erebos.Identity
import Erebos.Network.Protocol
import Erebos.PubKey
import Erebos.Service
import Erebos.State
import Erebos.Storage
import Erebos.Storage.Key
import Erebos.Storage.Merge
-- | Well-known port number of the protocol.
--
-- It is the default value of 'serverPort' in 'defaultServerOptions' and the
-- port passed to @getBroadcastAddresses@ when the server collects the
-- addresses for its periodic announcements.
discoveryPort :: PortNumber
discoveryPort = 29665
-- | Delay, in seconds, between two consecutive rounds of announcements sent
-- to the broadcast addresses by the server's discovery thread. The thread
-- multiplies it by @1000 * 1000@ before handing it to 'threadDelay'.
announceIntervalSeconds :: Int
announceIntervalSeconds = 60
-- | Handle of a running network server together with all the shared state
-- used by its worker threads. Values are created by 'startServer'.
data Server = Server
    { serverStorage :: Storage
        -- ^ Storage of the head the server was started with.
    , serverOrigHead :: Head LocalState
        -- ^ Head passed to 'startServer'; watched for identity changes.
    , serverIdentity_ :: MVar UnifiedIdentity
        -- ^ Current local identity; read through 'serverIdentity'.
    , serverThreads :: MVar [ThreadId]
        -- ^ Threads started via 'forkServerThread'.
    , serverSocket :: MVar Socket
        -- ^ Socket of the server; filled once the socket has been opened.
    , serverRawPath :: SymFlow (PeerAddress, BC.ByteString)
        -- ^ Server end of the raw packet flow (the other end is created
        -- alongside it in 'startServer').
    , serverControlFlow :: Flow (ControlMessage PeerAddress) (ControlRequest PeerAddress)
        -- ^ Server end of the control flow: requests are written here,
        -- control messages are read from it.
    , serverDataResponse :: TQueue (Peer, Maybe PartialRef)
        -- ^ Queue consumed by @dataResponseWorker@.
    , serverIOActions :: TQueue (ExceptT String IO ())
        -- ^ Actions executed one by one by a dedicated thread; a failure
        -- ('Left') is written to 'serverErrorLog'.
    , serverServices :: [SomeService]
        -- ^ Services the server was started with.
    , serverServiceStates :: TMVar (M.Map ServiceID SomeServiceGlobalState)
        -- ^ Per-service global state; starts out empty.
    , serverPeers :: MVar (Map PeerAddress Peer)
        -- ^ Known peers indexed by address; starts out empty.
    , serverChanPeer :: TChan Peer
        -- ^ Channel read by 'getNextPeerChange'.
    , serverErrorLog :: TQueue String
        -- ^ Log messages, forwarded by a dedicated thread to the logging
        -- callback given to 'startServer'.
    }
-- | Read the identity the server currently uses for itself.
--
-- This is a plain 'readMVar' of the internal variable, so it waits while
-- that variable is empty (e.g. while it is being modified).
serverIdentity :: Server -> IO UnifiedIdentity
serverIdentity = readMVar . serverIdentity_
-- | Take the next 'Peer' from the server's peer channel, blocking until one
-- is available.
getNextPeerChange :: Server -> IO Peer
getNextPeerChange = atomically . readTChan . serverChanPeer
-- | Configuration accepted by 'startServer'. See 'defaultServerOptions' for
-- the default values.
data ServerOptions = ServerOptions
    { serverPort :: PortNumber
        -- ^ Port number for the server.
    , serverLocalDiscovery :: Bool
        -- ^ When 'True', the server starts a thread that periodically sends
        -- announcements to the broadcast addresses.
    }
-- | Default options: use 'discoveryPort' as the server port and enable
-- local discovery.
defaultServerOptions :: ServerOptions
defaultServerOptions = ServerOptions
    { serverPort = discoveryPort
    , serverLocalDiscovery = True
    }
-- | State kept by the server for a single remote peer.
data Peer = Peer
    { peerAddress :: PeerAddress
        -- ^ Address of the peer.
    , peerServer_ :: Server
        -- ^ Server owning this peer; exposed as 'peerServer'.
    , peerConnection :: TVar (Either [(Bool, TransportPacket Ref, [TransportHeaderItem])] (Connection PeerAddress))
        -- ^ Either the protocol connection ('Right'), or a list of packet
        -- entries ('Left') — presumably packets queued until a connection
        -- exists; confirm against the code that fills this variable.
    , peerIdentityVar :: TVar PeerIdentity
        -- ^ What is currently known about the peer's identity. Also used
        -- by the 'Eq' instance to tell peers apart.
    , peerStorage_ :: Storage
        -- ^ Storage associated with the peer; exposed as 'peerStorage'.
    , peerInStorage :: PartialStorage
        -- ^ Partial storage used to build partial references for data
        -- that is being waited for (see 'newWaitingRef').
    , peerServiceState :: TMVar (M.Map ServiceID SomeServiceState)
        -- ^ Per-service state for this peer.
    , peerWaitingRefs :: TMVar [WaitingRef]
        -- ^ References registered by 'newWaitingRef'.
    }
-- | The server this peer belongs to (read-only accessor for the
-- unexported record field).
peerServer :: Peer -> Server
peerServer = peerServer_
-- | The storage associated with this peer (read-only accessor for the
-- unexported record field).
peerStorage :: Peer -> Storage
peerStorage = peerStorage_
-- | Current channel state of the peer.
--
-- While 'peerConnection' still holds a 'Left' value there is no connection
-- to ask, so 'ChannelNone' is reported; otherwise the state is taken from
-- the connection via 'connGetChannel'.
getPeerChannel :: Peer -> STM ChannelState
getPeerChannel Peer {..} =
    either (const $ return ChannelNone) connGetChannel =<< readTVar peerConnection
-- | Set the channel state on the peer's connection.
--
-- If 'peerConnection' does not hold a connection yet ('Left'), the
-- transaction calls 'retry', i.e. it blocks until the variable changes
-- and is then re-run.
setPeerChannel :: Peer -> ChannelState -> STM ()
setPeerChannel Peer {..} ch = do
    readTVar peerConnection >>= \case
        Left _ -> retry
        Right conn -> connSetChannel conn ch
-- | Two 'Peer' values are equal when they share the same identity
-- variable. 'TVar' equality is identity of the variable itself, not of its
-- contents, so this distinguishes peer objects rather than comparing what
-- is currently known about them.
instance Eq Peer where
    (==) = (==) `on` peerIdentityVar
-- | Address under which a peer can be reached.
--
-- The @PeerIceSession@ constructor exists only when the module is built
-- with @ENABLE_ICE_SUPPORT@ defined.
data PeerAddress = DatagramAddress SockAddr
#ifdef ENABLE_ICE_SUPPORT
                 | PeerIceSession IceSession
#endif
-- | Renders a datagram address as @"<address> <port>"@ (separated by a
-- single space). An IPv6 address whose words are @(0, 0, 0xffff, x)@ —
-- the IPv4-mapped form — is shown as the embedded IPv4 address. Socket
-- addresses that 'IP.fromSockAddr' cannot convert fall back to the 'Show'
-- instance of 'SockAddr'.
instance Show PeerAddress where
    show (DatagramAddress saddr) = unwords $ case IP.fromSockAddr saddr of
        Just (IP.IPv6 ipv6, port)
            | (0, 0, 0xffff, ipv4) <- IP.fromIPv6w ipv6
            -> [show (IP.toIPv4w ipv4), show port]
        Just (addr, port)
            -> [show addr, show port]
        _ -> [show saddr]
#ifdef ENABLE_ICE_SUPPORT
    show (PeerIceSession ice) = show ice
#endif
-- | Addresses are equal when they use the same constructor and the wrapped
-- values are equal. Without @ENABLE_ICE_SUPPORT@ there is only one
-- constructor, so the single equation is exhaustive.
instance Eq PeerAddress where
    DatagramAddress addr == DatagramAddress addr' = addr == addr'
#ifdef ENABLE_ICE_SUPPORT
    PeerIceSession ice   == PeerIceSession ice'   = ice == ice'
    _                    == _                     = False
#endif
-- | Ordering used e.g. for the peer map keyed by address. Datagram
-- addresses are ordered by their 'SockAddr'; with @ENABLE_ICE_SUPPORT@
-- every datagram address sorts before every ICE session, and ICE sessions
-- are ordered among themselves.
instance Ord PeerAddress where
    compare (DatagramAddress addr) (DatagramAddress addr') = compare addr addr'
#ifdef ENABLE_ICE_SUPPORT
    compare (DatagramAddress _   ) _                       = LT
    compare _                      (DatagramAddress _    ) = GT
    compare (PeerIceSession ice  ) (PeerIceSession ice')   = compare ice ice'
#endif
-- | What is known about the identity of a peer.
--
-- The first two constructors carry a mutable list of actions taking a
-- 'UnifiedIdentity' — presumably callbacks to run once the full identity
-- becomes available; confirm against the code that consumes them.
data PeerIdentity = PeerIdentityUnknown (TVar [UnifiedIdentity -> ExceptT String IO ()])
                    -- ^ Nothing is known yet.
                  | PeerIdentityRef WaitingRef (TVar [UnifiedIdentity -> ExceptT String IO ()])
                    -- ^ A reference to the identity data is being waited for.
                  | PeerIdentityFull UnifiedIdentity
                    -- ^ The identity is fully known.
-- | Atomically read what is currently known about the identity of the
-- given peer.
peerIdentity :: MonadIO m => Peer -> m PeerIdentity
peerIdentity = liftIO . atomically . readTVar . peerIdentityVar
-- | Find the service identifier carried by the first 'ServiceType' item of
-- a transport header, or 'Nothing' when the header contains no such item.
lookupServiceType :: [TransportHeaderItem] -> Maybe ServiceID
lookupServiceType (ServiceType stype : _) = Just stype
lookupServiceType (_ : hs) = lookupServiceType hs
lookupServiceType [] = Nothing
-- | Register a reference that the current peer is expected to deliver.
--
-- A new 'WaitingRef' is built for the given digest (as a partial reference
-- in the peer's 'peerInStorage') with the supplied callback and a fresh
-- status variable initialised to @Left []@. It is prepended to the peer's
-- 'peerWaitingRefs', and @(peer, Nothing)@ is pushed to the server's
-- 'serverDataResponse' queue — presumably to make the data response worker
-- re-examine this peer's waiting references; confirm against
-- @dataResponseWorker@.
--
-- Returns the newly created 'WaitingRef'.
newWaitingRef :: RefDigest -> (Ref -> WaitingRefCallback) -> PacketHandler WaitingRef
newWaitingRef dgst act = do
    peer@Peer {..} <- gets phPeer
    wref <- WaitingRef peerStorage_ (partialRefFromDigest peerInStorage dgst) act <$> liftSTM (newTVar (Left []))
    modifyTMVarP peerWaitingRefs (wref:)
    liftSTM $ writeTQueue (serverDataResponse $ peerServer peer) (peer, Nothing)
    return wref
-- | Run the action in a new thread and record the thread's id in
-- 'serverThreads'.
--
-- The fork happens inside 'modifyMVar_', so the thread list is updated
-- together with the fork and no other thread can change the list in
-- between.
forkServerThread :: Server -> IO () -> IO ()
forkServerThread server act = modifyMVar_ (serverThreads server) $ \ts -> do
    (:ts) <$> forkIO act
startServer :: ServerOptions -> Head LocalState -> (String -> IO ()) -> [SomeService] -> IO Server
startServer :: ServerOptions
-> Head LocalState
-> (String -> IO ())
-> [SomeService]
-> IO Server
startServer ServerOptions
opt Head LocalState
serverOrigHead String -> IO ()
logd' [SomeService]
serverServices = do
let serverStorage :: Storage
serverStorage = Head LocalState -> Storage
forall a. Head a -> Storage
headStorage Head LocalState
serverOrigHead
MVar UnifiedIdentity
serverIdentity_ <- UnifiedIdentity -> IO (MVar UnifiedIdentity)
forall a. a -> IO (MVar a)
newMVar (UnifiedIdentity -> IO (MVar UnifiedIdentity))
-> UnifiedIdentity -> IO (MVar UnifiedIdentity)
forall a b. (a -> b) -> a -> b
$ Head LocalState -> UnifiedIdentity
headLocalIdentity Head LocalState
serverOrigHead
MVar [ThreadId]
serverThreads <- [ThreadId] -> IO (MVar [ThreadId])
forall a. a -> IO (MVar a)
newMVar []
MVar Socket
serverSocket <- IO (MVar Socket)
forall a. IO (MVar a)
newEmptyMVar
(SymFlow (PeerAddress, ByteString)
serverRawPath, SymFlow (PeerAddress, ByteString)
protocolRawPath) <- IO
(SymFlow (PeerAddress, ByteString),
SymFlow (PeerAddress, ByteString))
forall a b. IO (Flow a b, Flow b a)
newFlowIO
(Flow (ControlMessage PeerAddress) (ControlRequest PeerAddress)
serverControlFlow, Flow (ControlRequest PeerAddress) (ControlMessage PeerAddress)
protocolControlFlow) <- IO
(Flow (ControlMessage PeerAddress) (ControlRequest PeerAddress),
Flow (ControlRequest PeerAddress) (ControlMessage PeerAddress))
forall a b. IO (Flow a b, Flow b a)
newFlowIO
TQueue (Peer, Maybe PartialRef)
serverDataResponse <- IO (TQueue (Peer, Maybe PartialRef))
forall a. IO (TQueue a)
newTQueueIO
TQueue (ExceptT String IO ())
serverIOActions <- IO (TQueue (ExceptT String IO ()))
forall a. IO (TQueue a)
newTQueueIO
TMVar (Map ServiceID SomeServiceGlobalState)
serverServiceStates <- Map ServiceID SomeServiceGlobalState
-> IO (TMVar (Map ServiceID SomeServiceGlobalState))
forall a. a -> IO (TMVar a)
newTMVarIO Map ServiceID SomeServiceGlobalState
forall k a. Map k a
M.empty
MVar (Map PeerAddress Peer)
serverPeers <- Map PeerAddress Peer -> IO (MVar (Map PeerAddress Peer))
forall a. a -> IO (MVar a)
newMVar Map PeerAddress Peer
forall k a. Map k a
M.empty
TChan Peer
serverChanPeer <- IO (TChan Peer)
forall a. IO (TChan a)
newTChanIO
TQueue String
serverErrorLog <- IO (TQueue String)
forall a. IO (TQueue a)
newTQueueIO
let server :: Server
server = Server {[SomeService]
MVar [ThreadId]
MVar (Map PeerAddress Peer)
MVar Socket
MVar UnifiedIdentity
TChan Peer
TMVar (Map ServiceID SomeServiceGlobalState)
TQueue String
TQueue (Peer, Maybe PartialRef)
TQueue (ExceptT String IO ())
SymFlow (PeerAddress, ByteString)
Flow (ControlMessage PeerAddress) (ControlRequest PeerAddress)
Head LocalState
Storage
serverStorage :: Storage
serverOrigHead :: Head LocalState
serverIdentity_ :: MVar UnifiedIdentity
serverThreads :: MVar [ThreadId]
serverSocket :: MVar Socket
serverRawPath :: SymFlow (PeerAddress, ByteString)
serverControlFlow :: Flow (ControlMessage PeerAddress) (ControlRequest PeerAddress)
serverDataResponse :: TQueue (Peer, Maybe PartialRef)
serverIOActions :: TQueue (ExceptT String IO ())
serverServices :: [SomeService]
serverServiceStates :: TMVar (Map ServiceID SomeServiceGlobalState)
serverPeers :: MVar (Map PeerAddress Peer)
serverChanPeer :: TChan Peer
serverErrorLog :: TQueue String
serverOrigHead :: Head LocalState
serverServices :: [SomeService]
serverStorage :: Storage
serverIdentity_ :: MVar UnifiedIdentity
serverThreads :: MVar [ThreadId]
serverSocket :: MVar Socket
serverRawPath :: SymFlow (PeerAddress, ByteString)
serverControlFlow :: Flow (ControlMessage PeerAddress) (ControlRequest PeerAddress)
serverDataResponse :: TQueue (Peer, Maybe PartialRef)
serverIOActions :: TQueue (ExceptT String IO ())
serverServiceStates :: TMVar (Map ServiceID SomeServiceGlobalState)
serverPeers :: MVar (Map PeerAddress Peer)
serverChanPeer :: TChan Peer
serverErrorLog :: TQueue String
..}
TQueue (Peer, ServiceID, Ref)
chanSvc <- IO (TQueue (Peer, ServiceID, Ref))
forall a. IO (TQueue a)
newTQueueIO
let logd :: String -> STM ()
logd = TQueue String -> String -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue String
serverErrorLog
Server -> IO () -> IO ()
forkServerThread Server
server (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
String -> IO ()
logd' (String -> IO ()) -> IO String -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< STM String -> IO String
forall a. STM a -> IO a
atomically (TQueue String -> STM String
forall a. TQueue a -> STM a
readTQueue TQueue String
serverErrorLog)
Server -> IO () -> IO ()
forkServerThread Server
server (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Server -> IO ()
dataResponseWorker Server
server
Server -> IO () -> IO ()
forkServerThread Server
server (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
(String -> IO ()) -> (() -> IO ()) -> Either String () -> IO ()
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> (String -> STM ()) -> String -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> STM ()
logd) () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either String () -> IO ()) -> IO (Either String ()) -> IO ()
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< ExceptT String IO () -> IO (Either String ())
forall e (m :: * -> *) a. ExceptT e m a -> m (Either e a)
runExceptT (ExceptT String IO () -> IO (Either String ()))
-> IO (ExceptT String IO ()) -> IO (Either String ())
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<<
STM (ExceptT String IO ()) -> IO (ExceptT String IO ())
forall a. STM a -> IO a
atomically (TQueue (ExceptT String IO ()) -> STM (ExceptT String IO ())
forall a. TQueue a -> STM a
readTQueue TQueue (ExceptT String IO ())
serverIOActions)
[SockAddr]
broadcastAddreses <- PortNumber -> IO [SockAddr]
getBroadcastAddresses PortNumber
discoveryPort
let open :: AddrInfo -> IO Socket
open AddrInfo
addr = do
Socket
sock <- Family -> SocketType -> ProtocolNumber -> IO Socket
socket (AddrInfo -> Family
addrFamily AddrInfo
addr) (AddrInfo -> SocketType
addrSocketType AddrInfo
addr) (AddrInfo -> ProtocolNumber
addrProtocol AddrInfo
addr)
MVar Socket -> Socket -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar Socket
serverSocket Socket
sock
Socket -> SocketOption -> Int -> IO ()
setSocketOption Socket
sock SocketOption
ReuseAddr Int
1
Socket -> SocketOption -> Int -> IO ()
setSocketOption Socket
sock SocketOption
Broadcast Int
1
Socket -> (ProtocolNumber -> IO ()) -> IO ()
forall r. Socket -> (ProtocolNumber -> IO r) -> IO r
withFdSocket Socket
sock ProtocolNumber -> IO ()
setCloseOnExecIfNeeded
Socket -> SockAddr -> IO ()
bind Socket
sock (AddrInfo -> SockAddr
addrAddress AddrInfo
addr)
Socket -> IO Socket
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Socket
sock
loop :: Socket -> IO ()
loop Socket
sock = do
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (ServerOptions -> Bool
serverLocalDiscovery ServerOptions
opt) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Server -> IO () -> IO ()
forkServerThread Server
server (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ Flow (ControlMessage PeerAddress) (ControlRequest PeerAddress)
-> [ControlRequest PeerAddress] -> STM ()
forall r w. Flow r w -> [w] -> STM ()
writeFlowBulk Flow (ControlMessage PeerAddress) (ControlRequest PeerAddress)
serverControlFlow ([ControlRequest PeerAddress] -> STM ())
-> [ControlRequest PeerAddress] -> STM ()
forall a b. (a -> b) -> a -> b
$ (SockAddr -> ControlRequest PeerAddress)
-> [SockAddr] -> [ControlRequest PeerAddress]
forall a b. (a -> b) -> [a] -> [b]
map (PeerAddress -> ControlRequest PeerAddress
forall addr. addr -> ControlRequest addr
SendAnnounce (PeerAddress -> ControlRequest PeerAddress)
-> (SockAddr -> PeerAddress)
-> SockAddr
-> ControlRequest PeerAddress
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SockAddr -> PeerAddress
DatagramAddress) [SockAddr]
broadcastAddreses
Int -> IO ()
threadDelay (Int -> IO ()) -> Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Int
announceIntervalSeconds Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000
let announceUpdate :: UnifiedIdentity -> IO ()
announceUpdate UnifiedIdentity
identity = do
PartialStorage
st <- Storage -> IO PartialStorage
derivePartialStorage Storage
serverStorage
let selfRef :: PartialRef
selfRef = PartialStorage -> Ref -> PartialRef
partialRef PartialStorage
st (Ref -> PartialRef) -> Ref -> PartialRef
forall a b. (a -> b) -> a -> b
$ Stored (Signed ExtendedIdentityData) -> Ref
forall a. Stored a -> Ref
storedRef (Stored (Signed ExtendedIdentityData) -> Ref)
-> Stored (Signed ExtendedIdentityData) -> Ref
forall a b. (a -> b) -> a -> b
$ UnifiedIdentity -> Stored (Signed ExtendedIdentityData)
idExtData UnifiedIdentity
identity
updateRefs :: [RefDigest]
updateRefs = (PartialRef -> RefDigest) -> [PartialRef] -> [RefDigest]
forall a b. (a -> b) -> [a] -> [b]
map PartialRef -> RefDigest
forall (c :: * -> *). Ref' c -> RefDigest
refDigest ([PartialRef] -> [RefDigest]) -> [PartialRef] -> [RefDigest]
forall a b. (a -> b) -> a -> b
$ PartialRef
selfRef PartialRef -> [PartialRef] -> [PartialRef]
forall a. a -> [a] -> [a]
: (Stored (Signed ExtendedIdentityData) -> PartialRef)
-> [Stored (Signed ExtendedIdentityData)] -> [PartialRef]
forall a b. (a -> b) -> [a] -> [b]
map (PartialStorage -> Ref -> PartialRef
partialRef PartialStorage
st (Ref -> PartialRef)
-> (Stored (Signed ExtendedIdentityData) -> Ref)
-> Stored (Signed ExtendedIdentityData)
-> PartialRef
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stored (Signed ExtendedIdentityData) -> Ref
forall a. Stored a -> Ref
storedRef) (UnifiedIdentity -> [Stored (Signed ExtendedIdentityData)]
forall (m :: * -> *).
Identity m -> [Stored (Signed ExtendedIdentityData)]
idUpdates UnifiedIdentity
identity)
ackedBy :: [TransportHeaderItem]
ackedBy = [[TransportHeaderItem]] -> [TransportHeaderItem]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat [[ RefDigest -> TransportHeaderItem
Acknowledged RefDigest
r, RefDigest -> TransportHeaderItem
Rejected RefDigest
r, RefDigest -> TransportHeaderItem
DataRequest RefDigest
r ] | RefDigest
r <- [RefDigest]
updateRefs ]
hitems :: [TransportHeaderItem]
hitems = (RefDigest -> TransportHeaderItem)
-> [RefDigest] -> [TransportHeaderItem]
forall a b. (a -> b) -> [a] -> [b]
map RefDigest -> TransportHeaderItem
AnnounceUpdate [RefDigest]
updateRefs
packet :: TransportPacket Ref
packet = TransportHeader -> [Ref] -> TransportPacket Ref
forall a. TransportHeader -> [a] -> TransportPacket a
TransportPacket ([TransportHeaderItem] -> TransportHeader
TransportHeader ([TransportHeaderItem] -> TransportHeader)
-> [TransportHeaderItem] -> TransportHeader
forall a b. (a -> b) -> a -> b
$ [TransportHeaderItem]
hitems) []
Map PeerAddress Peer
ps <- MVar (Map PeerAddress Peer) -> IO (Map PeerAddress Peer)
forall a. MVar a -> IO a
readMVar MVar (Map PeerAddress Peer)
serverPeers
Map PeerAddress Peer -> (Peer -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ Map PeerAddress Peer
ps ((Peer -> IO ()) -> IO ()) -> (Peer -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Peer
peer -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
((,) (PeerIdentity -> ChannelState -> (PeerIdentity, ChannelState))
-> STM PeerIdentity
-> STM (ChannelState -> (PeerIdentity, ChannelState))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TVar PeerIdentity -> STM PeerIdentity
forall a. TVar a -> STM a
readTVar (Peer -> TVar PeerIdentity
peerIdentityVar Peer
peer) STM (ChannelState -> (PeerIdentity, ChannelState))
-> STM ChannelState -> STM (PeerIdentity, ChannelState)
forall a b. STM (a -> b) -> STM a -> STM b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Peer -> STM ChannelState
getPeerChannel Peer
peer) STM (PeerIdentity, ChannelState)
-> ((PeerIdentity, ChannelState) -> STM ()) -> STM ()
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
(PeerIdentityFull UnifiedIdentity
_, ChannelEstablished Channel
_) ->
Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM ()
sendToPeerS Peer
peer [TransportHeaderItem]
ackedBy TransportPacket Ref
packet
(PeerIdentity, ChannelState)
_ -> () -> STM ()
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
IO WatchedHead -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO WatchedHead -> IO ()) -> IO WatchedHead -> IO ()
forall a b. (a -> b) -> a -> b
$ Head LocalState -> (Head LocalState -> IO ()) -> IO WatchedHead
forall a.
HeadType a =>
Head a -> (Head a -> IO ()) -> IO WatchedHead
watchHead Head LocalState
serverOrigHead ((Head LocalState -> IO ()) -> IO WatchedHead)
-> (Head LocalState -> IO ()) -> IO WatchedHead
forall a b. (a -> b) -> a -> b
$ \Head LocalState
h -> do
let idt :: UnifiedIdentity
idt = Head LocalState -> UnifiedIdentity
headLocalIdentity Head LocalState
h
Bool
changedId <- MVar UnifiedIdentity
-> (UnifiedIdentity -> IO (UnifiedIdentity, Bool)) -> IO Bool
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar MVar UnifiedIdentity
serverIdentity_ ((UnifiedIdentity -> IO (UnifiedIdentity, Bool)) -> IO Bool)
-> (UnifiedIdentity -> IO (UnifiedIdentity, Bool)) -> IO Bool
forall a b. (a -> b) -> a -> b
$ \UnifiedIdentity
cur ->
(UnifiedIdentity, Bool) -> IO (UnifiedIdentity, Bool)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (UnifiedIdentity
idt, UnifiedIdentity
cur UnifiedIdentity -> UnifiedIdentity -> Bool
forall a. Eq a => a -> a -> Bool
/= UnifiedIdentity
idt)
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
changedId (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Flow (ControlMessage PeerAddress) (ControlRequest PeerAddress)
-> ControlRequest PeerAddress -> IO ()
forall r w. Flow r w -> w -> IO ()
writeFlowIO Flow (ControlMessage PeerAddress) (ControlRequest PeerAddress)
serverControlFlow (ControlRequest PeerAddress -> IO ())
-> ControlRequest PeerAddress -> IO ()
forall a b. (a -> b) -> a -> b
$ UnifiedIdentity -> ControlRequest PeerAddress
forall addr. UnifiedIdentity -> ControlRequest addr
UpdateSelfIdentity UnifiedIdentity
idt
UnifiedIdentity -> IO ()
announceUpdate UnifiedIdentity
idt
[SomeService] -> (SomeService -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [SomeService]
serverServices ((SomeService -> IO ()) -> IO ())
-> (SomeService -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(SomeService Proxy s
service ServiceAttributes s
_) -> do
[SomeStorageWatcher s]
-> (SomeStorageWatcher s -> IO WatchedHead) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (Proxy s -> [SomeStorageWatcher s]
forall s (proxy :: * -> *).
Service s =>
proxy s -> [SomeStorageWatcher s]
forall (proxy :: * -> *). proxy s -> [SomeStorageWatcher s]
serviceStorageWatchers Proxy s
service) ((SomeStorageWatcher s -> IO WatchedHead) -> IO ())
-> (SomeStorageWatcher s -> IO WatchedHead) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(SomeStorageWatcher Stored LocalState -> a
sel a -> ServiceHandler s ()
act) -> do
Head LocalState
-> (Head LocalState -> a) -> (a -> IO ()) -> IO WatchedHead
forall a b.
(HeadType a, Eq b) =>
Head a -> (Head a -> b) -> (b -> IO ()) -> IO WatchedHead
watchHeadWith Head LocalState
serverOrigHead (Stored LocalState -> a
sel (Stored LocalState -> a)
-> (Head LocalState -> Stored LocalState) -> Head LocalState -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Head LocalState -> Stored LocalState
forall a. Head a -> Stored a
headStoredObject) ((a -> IO ()) -> IO WatchedHead) -> (a -> IO ()) -> IO WatchedHead
forall a b. (a -> b) -> a -> b
$ \a
x -> do
MVar (Map PeerAddress Peer)
-> (Map PeerAddress Peer -> IO ()) -> IO ()
forall a b. MVar a -> (a -> IO b) -> IO b
withMVar MVar (Map PeerAddress Peer)
serverPeers ((Map PeerAddress Peer -> IO ()) -> IO ())
-> (Map PeerAddress Peer -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ (Peer -> IO ()) -> Map PeerAddress Peer -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ ((Peer -> IO ()) -> Map PeerAddress Peer -> IO ())
-> (Peer -> IO ()) -> Map PeerAddress Peer -> IO ()
forall a b. (a -> b) -> a -> b
$ \Peer
peer -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
TVar PeerIdentity -> STM PeerIdentity
forall a. TVar a -> STM a
readTVar (Peer -> TVar PeerIdentity
peerIdentityVar Peer
peer) STM PeerIdentity -> (PeerIdentity -> STM ()) -> STM ()
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
PeerIdentityFull UnifiedIdentity
_ -> TQueue (ExceptT String IO ()) -> ExceptT String IO () -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (ExceptT String IO ())
serverIOActions (ExceptT String IO () -> STM ()) -> ExceptT String IO () -> STM ()
forall a b. (a -> b) -> a -> b
$ do
Peer -> ServiceHandler s () -> ExceptT String IO ()
forall s (m :: * -> *).
(Service s, MonadIO m) =>
Peer -> ServiceHandler s () -> m ()
runPeerService Peer
peer (ServiceHandler s () -> ExceptT String IO ())
-> ServiceHandler s () -> ExceptT String IO ()
forall a b. (a -> b) -> a -> b
$ a -> ServiceHandler s ()
act a
x
PeerIdentity
_ -> () -> STM ()
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Server -> IO () -> IO ()
forkServerThread Server
server (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
(ByteString
msg, SockAddr
saddr) <- Socket -> Int -> IO (ByteString, SockAddr)
S.recvFrom Socket
sock Int
4096
SymFlow (PeerAddress, ByteString)
-> (PeerAddress, ByteString) -> IO ()
forall r w. Flow r w -> w -> IO ()
writeFlowIO SymFlow (PeerAddress, ByteString)
serverRawPath (SockAddr -> PeerAddress
DatagramAddress SockAddr
saddr, ByteString
msg)
Server -> IO () -> IO ()
forkServerThread Server
server (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
(PeerAddress
paddr, ByteString
msg) <- SymFlow (PeerAddress, ByteString) -> IO (PeerAddress, ByteString)
forall r w. Flow r w -> IO r
readFlowIO SymFlow (PeerAddress, ByteString)
serverRawPath
case PeerAddress
paddr of
DatagramAddress SockAddr
addr -> IO Int -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Int -> IO ()) -> IO Int -> IO ()
forall a b. (a -> b) -> a -> b
$ Socket -> ByteString -> SockAddr -> IO Int
S.sendTo Socket
sock ByteString
msg SockAddr
addr
#ifdef ENABLE_ICE_SUPPORT
PeerIceSession ice -> iceSend ice msg
#endif
Server -> IO () -> IO ()
forkServerThread Server
server (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Flow (ControlMessage PeerAddress) (ControlRequest PeerAddress)
-> IO (ControlMessage PeerAddress)
forall r w. Flow r w -> IO r
readFlowIO Flow (ControlMessage PeerAddress) (ControlRequest PeerAddress)
serverControlFlow IO (ControlMessage PeerAddress)
-> (ControlMessage PeerAddress -> IO ()) -> IO ()
forall a b. IO a -> (a -> IO b) -> IO b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
NewConnection Connection PeerAddress
conn Maybe RefDigest
mbpid -> do
let paddr :: PeerAddress
paddr = Connection PeerAddress -> PeerAddress
forall addr. Connection addr -> addr
connAddress Connection PeerAddress
conn
Peer
peer <- MVar (Map PeerAddress Peer)
-> (Map PeerAddress Peer -> IO (Map PeerAddress Peer, Peer))
-> IO Peer
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar MVar (Map PeerAddress Peer)
serverPeers ((Map PeerAddress Peer -> IO (Map PeerAddress Peer, Peer))
-> IO Peer)
-> (Map PeerAddress Peer -> IO (Map PeerAddress Peer, Peer))
-> IO Peer
forall a b. (a -> b) -> a -> b
$ \Map PeerAddress Peer
pvalue -> do
case PeerAddress -> Map PeerAddress Peer -> Maybe Peer
forall k a. Ord k => k -> Map k a -> Maybe a
M.lookup PeerAddress
paddr Map PeerAddress Peer
pvalue of
Just Peer
peer -> (Map PeerAddress Peer, Peer) -> IO (Map PeerAddress Peer, Peer)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Map PeerAddress Peer
pvalue, Peer
peer)
Maybe Peer
Nothing -> do
Peer
peer <- Server -> PeerAddress -> IO Peer
mkPeer Server
server PeerAddress
paddr
(Map PeerAddress Peer, Peer) -> IO (Map PeerAddress Peer, Peer)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (PeerAddress -> Peer -> Map PeerAddress Peer -> Map PeerAddress Peer
forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert PeerAddress
paddr Peer
peer Map PeerAddress Peer
pvalue, Peer
peer)
Server -> IO () -> IO ()
forkServerThread Server
server (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
TVar
(Either
[(Bool, TransportPacket Ref, [TransportHeaderItem])]
(Connection PeerAddress))
-> STM
(Either
[(Bool, TransportPacket Ref, [TransportHeaderItem])]
(Connection PeerAddress))
forall a. TVar a -> STM a
readTVar (Peer
-> TVar
(Either
[(Bool, TransportPacket Ref, [TransportHeaderItem])]
(Connection PeerAddress))
peerConnection Peer
peer) STM
(Either
[(Bool, TransportPacket Ref, [TransportHeaderItem])]
(Connection PeerAddress))
-> (Either
[(Bool, TransportPacket Ref, [TransportHeaderItem])]
(Connection PeerAddress)
-> STM ())
-> STM ()
forall a b. STM a -> (a -> STM b) -> STM b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Left [(Bool, TransportPacket Ref, [TransportHeaderItem])]
packets -> Flow
(Bool, TransportPacket PartialObject)
(Bool, TransportPacket Ref, [TransportHeaderItem])
-> [(Bool, TransportPacket Ref, [TransportHeaderItem])] -> STM ()
forall r w. Flow r w -> [w] -> STM ()
writeFlowBulk (Connection PeerAddress
-> Flow
(Bool, TransportPacket PartialObject)
(Bool, TransportPacket Ref, [TransportHeaderItem])
forall addr.
Connection addr
-> Flow
(Bool, TransportPacket PartialObject)
(Bool, TransportPacket Ref, [TransportHeaderItem])
connData Connection PeerAddress
conn) ([(Bool, TransportPacket Ref, [TransportHeaderItem])] -> STM ())
-> [(Bool, TransportPacket Ref, [TransportHeaderItem])] -> STM ()
forall a b. (a -> b) -> a -> b
$ [(Bool, TransportPacket Ref, [TransportHeaderItem])]
-> [(Bool, TransportPacket Ref, [TransportHeaderItem])]
forall a. [a] -> [a]
reverse [(Bool, TransportPacket Ref, [TransportHeaderItem])]
packets
Right Connection PeerAddress
_ -> () -> STM ()
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
TVar
(Either
[(Bool, TransportPacket Ref, [TransportHeaderItem])]
(Connection PeerAddress))
-> Either
[(Bool, TransportPacket Ref, [TransportHeaderItem])]
(Connection PeerAddress)
-> STM ()
forall a. TVar a -> a -> STM ()
writeTVar (Peer
-> TVar
(Either
[(Bool, TransportPacket Ref, [TransportHeaderItem])]
(Connection PeerAddress))
peerConnection Peer
peer) (Connection PeerAddress
-> Either
[(Bool, TransportPacket Ref, [TransportHeaderItem])]
(Connection PeerAddress)
forall a b. b -> Either a b
Right Connection PeerAddress
conn)
case Maybe RefDigest
mbpid of
Just RefDigest
dgst -> do
UnifiedIdentity
identity <- MVar UnifiedIdentity -> IO UnifiedIdentity
forall a. MVar a -> IO a
readMVar MVar UnifiedIdentity
serverIdentity_
STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ Bool -> Peer -> PacketHandler () -> STM ()
runPacketHandler Bool
False Peer
peer (PacketHandler () -> STM ()) -> PacketHandler () -> STM ()
forall a b. (a -> b) -> a -> b
$ do
WaitingRef
wref <- RefDigest
-> (Ref -> ExceptT String IO ()) -> PacketHandler WaitingRef
newWaitingRef RefDigest
dgst ((Ref -> ExceptT String IO ()) -> PacketHandler WaitingRef)
-> (Ref -> ExceptT String IO ()) -> PacketHandler WaitingRef
forall a b. (a -> b) -> a -> b
$ UnifiedIdentity -> Peer -> Ref -> ExceptT String IO ()
handleIdentityAnnounce UnifiedIdentity
identity Peer
peer
TVar PeerIdentity -> PacketHandler PeerIdentity
forall a. TVar a -> PacketHandler a
readTVarP (Peer -> TVar PeerIdentity
peerIdentityVar Peer
peer) PacketHandler PeerIdentity
-> (PeerIdentity -> PacketHandler ()) -> PacketHandler ()
forall a b.
PacketHandler a -> (a -> PacketHandler b) -> PacketHandler b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
PeerIdentityUnknown TVar [UnifiedIdentity -> ExceptT String IO ()]
idwait -> do
TransportHeaderItem -> PacketHandler ()
addHeader (TransportHeaderItem -> PacketHandler ())
-> TransportHeaderItem -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ RefDigest -> TransportHeaderItem
AnnounceSelf (RefDigest -> TransportHeaderItem)
-> RefDigest -> TransportHeaderItem
forall a b. (a -> b) -> a -> b
$ Ref -> RefDigest
forall (c :: * -> *). Ref' c -> RefDigest
refDigest (Ref -> RefDigest) -> Ref -> RefDigest
forall a b. (a -> b) -> a -> b
$ Stored (Signed IdentityData) -> Ref
forall a. Stored a -> Ref
storedRef (Stored (Signed IdentityData) -> Ref)
-> Stored (Signed IdentityData) -> Ref
forall a b. (a -> b) -> a -> b
$ UnifiedIdentity -> Stored (Signed IdentityData)
idData UnifiedIdentity
identity
TVar PeerIdentity -> PeerIdentity -> PacketHandler ()
forall a. TVar a -> a -> PacketHandler ()
writeTVarP (Peer -> TVar PeerIdentity
peerIdentityVar Peer
peer) (PeerIdentity -> PacketHandler ())
-> PeerIdentity -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ WaitingRef
-> TVar [UnifiedIdentity -> ExceptT String IO ()] -> PeerIdentity
PeerIdentityRef WaitingRef
wref TVar [UnifiedIdentity -> ExceptT String IO ()]
idwait
STM () -> PacketHandler ()
forall a. STM a -> PacketHandler a
liftSTM (STM () -> PacketHandler ()) -> STM () -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ TChan Peer -> Peer -> STM ()
forall a. TChan a -> a -> STM ()
writeTChan TChan Peer
serverChanPeer Peer
peer
PeerIdentity
_ -> () -> PacketHandler ()
forall a. a -> PacketHandler a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Maybe RefDigest
Nothing -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
(Bool
secure, TransportPacket TransportHeader
header [PartialObject]
objs) <- Flow
(Bool, TransportPacket PartialObject)
(Bool, TransportPacket Ref, [TransportHeaderItem])
-> IO (Bool, TransportPacket PartialObject)
forall r w. Flow r w -> IO r
readFlowIO (Flow
(Bool, TransportPacket PartialObject)
(Bool, TransportPacket Ref, [TransportHeaderItem])
-> IO (Bool, TransportPacket PartialObject))
-> Flow
(Bool, TransportPacket PartialObject)
(Bool, TransportPacket Ref, [TransportHeaderItem])
-> IO (Bool, TransportPacket PartialObject)
forall a b. (a -> b) -> a -> b
$ Connection PeerAddress
-> Flow
(Bool, TransportPacket PartialObject)
(Bool, TransportPacket Ref, [TransportHeaderItem])
forall addr.
Connection addr
-> Flow
(Bool, TransportPacket PartialObject)
(Bool, TransportPacket Ref, [TransportHeaderItem])
connData Connection PeerAddress
conn
[PartialRef]
prefs <- [PartialObject]
-> (PartialObject -> IO PartialRef) -> IO [PartialRef]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [PartialObject]
objs ((PartialObject -> IO PartialRef) -> IO [PartialRef])
-> (PartialObject -> IO PartialRef) -> IO [PartialRef]
forall a b. (a -> b) -> a -> b
$ PartialStorage -> PartialObject -> IO PartialRef
storeObject (PartialStorage -> PartialObject -> IO PartialRef)
-> PartialStorage -> PartialObject -> IO PartialRef
forall a b. (a -> b) -> a -> b
$ Peer -> PartialStorage
peerInStorage Peer
peer
UnifiedIdentity
identity <- MVar UnifiedIdentity -> IO UnifiedIdentity
forall a. MVar a -> IO a
readMVar MVar UnifiedIdentity
serverIdentity_
let svcs :: [ServiceID]
svcs = (SomeService -> ServiceID) -> [SomeService] -> [ServiceID]
forall a b. (a -> b) -> [a] -> [b]
map SomeService -> ServiceID
someServiceID [SomeService]
serverServices
UnifiedIdentity
-> Bool
-> Peer
-> TQueue (Peer, ServiceID, Ref)
-> [ServiceID]
-> TransportHeader
-> [PartialRef]
-> IO ()
handlePacket UnifiedIdentity
identity Bool
secure Peer
peer TQueue (Peer, ServiceID, Ref)
chanSvc [ServiceID]
svcs TransportHeader
header [PartialRef]
prefs
ReceivedAnnounce PeerAddress
addr RefDigest
_ -> do
IO Peer -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Peer -> IO ()) -> IO Peer -> IO ()
forall a b. (a -> b) -> a -> b
$ Server -> PeerAddress -> IO Peer
serverPeer' Server
server PeerAddress
addr
UnifiedIdentity
-> (String -> STM ())
-> SymFlow (PeerAddress, ByteString)
-> Flow (ControlRequest PeerAddress) (ControlMessage PeerAddress)
-> IO ()
forall addr.
(Eq addr, Ord addr, Show addr) =>
UnifiedIdentity
-> (String -> STM ())
-> SymFlow (addr, ByteString)
-> Flow (ControlRequest addr) (ControlMessage addr)
-> IO ()
erebosNetworkProtocol (Head LocalState -> UnifiedIdentity
headLocalIdentity Head LocalState
serverOrigHead) String -> STM ()
logd SymFlow (PeerAddress, ByteString)
protocolRawPath Flow (ControlRequest PeerAddress) (ControlMessage PeerAddress)
protocolControlFlow
Server -> IO () -> IO ()
forkServerThread Server
server (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
forall a. IO a -> IO a
withSocketsDo (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
let hints :: AddrInfo
hints = AddrInfo
defaultHints
{ addrFlags = [AI_PASSIVE]
, addrFamily = AF_INET6
, addrSocketType = Datagram
}
AddrInfo
addr:[AddrInfo]
_ <- Maybe AddrInfo -> Maybe String -> Maybe String -> IO [AddrInfo]
getAddrInfo (AddrInfo -> Maybe AddrInfo
forall a. a -> Maybe a
Just AddrInfo
hints) Maybe String
forall a. Maybe a
Nothing (String -> Maybe String
forall a. a -> Maybe a
Just (String -> Maybe String) -> String -> Maybe String
forall a b. (a -> b) -> a -> b
$ PortNumber -> String
forall a. Show a => a -> String
show (PortNumber -> String) -> PortNumber -> String
forall a b. (a -> b) -> a -> b
$ ServerOptions -> PortNumber
serverPort ServerOptions
opt)
IO Socket -> (Socket -> IO ()) -> (Socket -> IO ()) -> IO ()
forall a b c. IO a -> (a -> IO b) -> (a -> IO c) -> IO c
bracket (AddrInfo -> IO Socket
open AddrInfo
addr) Socket -> IO ()
close Socket -> IO ()
loop
Server -> IO () -> IO ()
forkServerThread Server
server (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
(Peer
peer, ServiceID
svc, Ref
ref) <- STM (Peer, ServiceID, Ref) -> IO (Peer, ServiceID, Ref)
forall a. STM a -> IO a
atomically (STM (Peer, ServiceID, Ref) -> IO (Peer, ServiceID, Ref))
-> STM (Peer, ServiceID, Ref) -> IO (Peer, ServiceID, Ref)
forall a b. (a -> b) -> a -> b
$ TQueue (Peer, ServiceID, Ref) -> STM (Peer, ServiceID, Ref)
forall a. TQueue a -> STM a
readTQueue TQueue (Peer, ServiceID, Ref)
chanSvc
case (SomeService -> Bool) -> [SomeService] -> Maybe SomeService
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find ((ServiceID
svc ServiceID -> ServiceID -> Bool
forall a. Eq a => a -> a -> Bool
==) (ServiceID -> Bool)
-> (SomeService -> ServiceID) -> SomeService -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SomeService -> ServiceID
someServiceID) [SomeService]
serverServices of
Just service :: SomeService
service@(SomeService (Proxy s
_ :: Proxy s) ServiceAttributes s
attr) -> Maybe (SomeService, ServiceAttributes s)
-> Peer -> ServiceHandler s () -> IO ()
forall s (m :: * -> *).
(Service s, MonadIO m) =>
Maybe (SomeService, ServiceAttributes s)
-> Peer -> ServiceHandler s () -> m ()
runPeerServiceOn ((SomeService, ServiceAttributes s)
-> Maybe (SomeService, ServiceAttributes s)
forall a. a -> Maybe a
Just (SomeService
service, ServiceAttributes s
attr)) Peer
peer (Stored s -> ServiceHandler s ()
forall s. Service s => Stored s -> ServiceHandler s ()
serviceHandler (Stored s -> ServiceHandler s ())
-> Stored s -> ServiceHandler s ()
forall a b. (a -> b) -> a -> b
$ forall a. Storable a => Ref -> Stored a
wrappedLoad @s Ref
ref)
Maybe SomeService
_ -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ String -> STM ()
logd (String -> STM ()) -> String -> STM ()
forall a b. (a -> b) -> a -> b
$ String
"unhandled service '" String -> ShowS
forall a. [a] -> [a] -> [a]
++ UUID -> String
forall a. Show a => a -> String
show (ServiceID -> UUID
forall a. StorableUUID a => a -> UUID
toUUID ServiceID
svc) String -> ShowS
forall a. [a] -> [a] -> [a]
++ String
"'"
Server -> IO Server
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Server
server
-- | Stop the server by killing every thread recorded in 'serverThreads'.
--
-- The list of thread ids is taken out of its 'MVar' and never put back, so
-- the 'MVar' is left empty once this function returns.
stopServer :: Server -> IO ()
stopServer server = do
    threads <- takeMVar (serverThreads server)
    forM_ threads killThread
-- | Endless worker loop driven by the 'serverDataResponse' queue.
--
-- Each queue item names a peer and, optionally, a partial ref that was just
-- received from it. For that peer the worker:
--
-- 1. takes the list of waiting refs out of 'peerWaitingRefs',
-- 2. advances every waiting ref (see @advance@ below),
-- 3. puts back only those refs that are still waiting, and
-- 4. if any digests need to be requested, sends a single packet with one
--    'DataRequest' header item per digest using 'sendToPeerPlain'.
dataResponseWorker :: Server -> IO ()
dataResponseWorker server = forever $ do
    (peer, received) <- atomically $ readTQueue (serverDataResponse server)

    pending <- atomically $ takeTMVar (peerWaitingRefs peer)
    results <- forM pending (advance received)
    atomically $ putTMVar (peerWaitingRefs peer) (mapMaybe fst results)

    let missing = concatMap snd results
    unless (null missing) $ do
        let request = TransportPacket (TransportHeader (DataRequest <$> missing)) []
            -- For every requested digest, both a rejection and a data
            -- response are passed as the "acked by" header items.
            ackedBy = concatMap (\dgst -> [ Rejected dgst, DataResponse dgst ]) missing
        atomically $ sendToPeerPlain peer ackedBy request
  where
    -- Advance a single waiting ref. The result is the ref itself if it has
    -- to stay in the waiting list ('Nothing' if it is finished), together
    -- with the digests that should be requested from the peer.
    advance :: Maybe PartialRef -> WaitingRef -> IO (Maybe WaitingRef, [RefDigest])
    advance received wr = do
        let status = wrefStatus wr
        current <- atomically $ readTVar status
        case current of
            -- Already resolved to a full ref; drop it from the waiting list.
            Right _ -> return (Nothing, [])

            Left outstanding ->
                let stillMissing = case received of
                        Nothing -> outstanding
                        Just pref -> filter (refDigest pref /=) outstanding
                 in case stillMissing of
                        -- Nothing outstanding any more: try to copy the
                        -- partial ref into the target storage.
                        -- NOTE(review): presumably 'copyRef' reports the
                        -- digest of a still-missing object on failure;
                        -- confirm against Erebos.Storage.
                        [] -> copyRef (wrefStorage wr) (wrefPartial wr) >>= \case
                            Left dgst -> do
                                atomically $ writeTVar status (Left [dgst])
                                return (Just wr, [dgst])
                            Right ref -> do
                                atomically $ writeTVar status (Right ref)
                                -- Run the completion action in its own server
                                -- thread; a failure goes to the error log.
                                forkServerThread server $
                                    runExceptT (wrefAction wr ref) >>=
                                        either (atomically . writeTQueue (serverErrorLog server)) return
                                return (Nothing, [])

                        -- Still waiting for other objects: remember the
                        -- reduced list, request nothing new.
                        remaining -> do
                            atomically $ writeTVar status (Left remaining)
                            return (Just wr, [])
-- | Monad in which a single received packet is processed.
--
-- It layers, over 'STM':
--
-- * a 'PacketHandlerState' (via 'StateT') that accumulates the reply being
--   built while the packet is handled, and
-- * a 'String' error channel (via 'ExceptT') used to abort the handling.
--
-- The instances are obtained by newtype deriving from the underlying
-- transformer stack.
newtype PacketHandler a = PacketHandler { unPacketHandler :: StateT PacketHandlerState (ExceptT String STM) a }
    deriving (Functor, Applicative, Monad, MonadState PacketHandlerState, MonadError String)
-- | A failed pattern match (or explicit 'fail') inside a 'PacketHandler'
-- is reported through the 'MonadError' 'String' channel instead of raising
-- an exception.
instance MonadFail PacketHandler where
    fail = throwError
-- | Run a 'PacketHandler' action for the given peer inside 'STM'.
--
-- The action starts from a 'PacketHandlerState' holding the peer and empty
-- header, acked-by and body lists.
--
-- * If the action fails, the error message (prefixed with the peer address)
--   is written to the server's error log queue and nothing is sent.
-- * If it succeeds and at least one header item was accumulated, a
--   'TransportPacket' is built from the accumulated header and body and
--   passed to @sendToPeerS'@ together with the @secure@ flag and the
--   accumulated acked-by items. With no header items nothing is sent.
runPacketHandler :: Bool -> Peer -> PacketHandler () -> STM ()
runPacketHandler secure peer@Peer {..} act = do
    let logd = writeTQueue $ serverErrorLog peerServer_
    runExceptT (flip execStateT (PacketHandlerState peer [] [] []) $ unPacketHandler act) >>= \case
        Left err -> do
            logd $ "Error in handling packet from " ++ show peerAddress ++ ": " ++ err
        Right ph -> do
            -- Only reply when the handler actually produced header items.
            unless (null $ phHead ph) $ do
                let packet = TransportPacket (TransportHeader $ phHead ph) (phBody ph)
                sendToPeerS' secure peer (phAckedBy ph) packet
-- | Lift an 'STM' action into 'PacketHandler', passing through both the
-- 'StateT' and the 'ExceptT' layer.
liftSTM :: STM a -> PacketHandler a
liftSTM = PacketHandler . lift . lift
-- | 'readTVar' lifted into 'PacketHandler'.
readTVarP :: TVar a -> PacketHandler a
readTVarP = liftSTM . readTVar
-- | 'writeTVar' lifted into 'PacketHandler'.
writeTVarP :: TVar a -> a -> PacketHandler ()
writeTVarP v = liftSTM . writeTVar v
-- | Apply a function to the content of a 'TMVar' from within
-- 'PacketHandler': the value is taken out, transformed and put back in the
-- same 'STM' action.
modifyTMVarP :: TMVar a -> (a -> a) -> PacketHandler ()
modifyTMVarP v f = liftSTM $ putTMVar v . f =<< takeTMVar v
-- | State threaded through a 'PacketHandler' run.
data PacketHandlerState = PacketHandlerState
    { phPeer :: Peer
    -- ^ peer the handled packet belongs to
    , phHead :: [TransportHeaderItem]
    -- ^ header items accumulated for the reply packet
    , phAckedBy :: [TransportHeaderItem]
    -- ^ items passed as the \"acked by\" argument when the reply is sent
    , phBody :: [Ref]
    -- ^ refs accumulated for the body of the reply packet
    }
-- | Add a header item to the reply being built, unless an equal item is
-- already present (see 'appendDistinct').
addHeader :: TransportHeaderItem -> PacketHandler ()
addHeader h = modify $ \ph -> ph { phHead = h `appendDistinct` phHead ph }
-- | Add several items to the acked-by list of the reply being built,
-- skipping any that are already present.
--
-- The items are folded in with 'foldr', so the last element of the argument
-- list is appended first.
addAckedBy :: [TransportHeaderItem] -> PacketHandler ()
addAckedBy hs = modify $ \ph -> ph { phAckedBy = foldr appendDistinct (phAckedBy ph) hs }
-- | Add a ref to the body of the reply being built, unless it is already
-- present (see 'appendDistinct').
addBody :: Ref -> PacketHandler ()
addBody r = modify $ \ph -> ph { phBody = r `appendDistinct` phBody ph }
-- | Append an element to the end of a list unless an equal element is
-- already in it; in that case the list is returned unchanged.
--
-- The relative order of the existing elements is always preserved.
appendDistinct :: Eq a => a -> [a] -> [a]
appendDistinct x (y:ys)
    | x == y    = y : ys
    | otherwise = y : appendDistinct x ys
appendDistinct x [] = [x]
handlePacket :: UnifiedIdentity -> Bool
-> Peer -> TQueue (Peer, ServiceID, Ref) -> [ServiceID]
-> TransportHeader -> [PartialRef] -> IO ()
handlePacket :: UnifiedIdentity
-> Bool
-> Peer
-> TQueue (Peer, ServiceID, Ref)
-> [ServiceID]
-> TransportHeader
-> [PartialRef]
-> IO ()
handlePacket UnifiedIdentity
identity Bool
secure Peer
peer TQueue (Peer, ServiceID, Ref)
chanSvc [ServiceID]
svcs (TransportHeader [TransportHeaderItem]
headers) [PartialRef]
prefs = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
let server :: Server
server = Peer -> Server
peerServer Peer
peer
ChannelState
ochannel <- Peer -> STM ChannelState
getPeerChannel Peer
peer
let sidentity :: Stored (Signed IdentityData)
sidentity = UnifiedIdentity -> Stored (Signed IdentityData)
idData UnifiedIdentity
identity
plaintextRefs :: [RefDigest]
plaintextRefs = (Stored Object -> RefDigest) -> [Stored Object] -> [RefDigest]
forall a b. (a -> b) -> [a] -> [b]
map (Ref -> RefDigest
forall (c :: * -> *). Ref' c -> RefDigest
refDigest (Ref -> RefDigest)
-> (Stored Object -> Ref) -> Stored Object -> RefDigest
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stored Object -> Ref
forall a. Stored a -> Ref
storedRef) ([Stored Object] -> [RefDigest]) -> [Stored Object] -> [RefDigest]
forall a b. (a -> b) -> a -> b
$ (Ref -> [Stored Object]) -> [Ref] -> [Stored Object]
forall (t :: * -> *) a b. Foldable t => (a -> [b]) -> t a -> [b]
concatMap (Stored Object -> [Stored Object]
collectStoredObjects (Stored Object -> [Stored Object])
-> (Ref -> Stored Object) -> Ref -> [Stored Object]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Ref -> Stored Object
forall a. Storable a => Ref -> Stored a
wrappedLoad) ([Ref] -> [Stored Object]) -> [Ref] -> [Stored Object]
forall a b. (a -> b) -> a -> b
$ [[Ref]] -> [Ref]
forall (t :: * -> *) a. Foldable t => t [a] -> [a]
concat
[ [ Stored (Signed IdentityData) -> Ref
forall a. Stored a -> Ref
storedRef Stored (Signed IdentityData)
sidentity ]
, (Stored (Signed ExtendedIdentityData) -> Ref)
-> [Stored (Signed ExtendedIdentityData)] -> [Ref]
forall a b. (a -> b) -> [a] -> [b]
map Stored (Signed ExtendedIdentityData) -> Ref
forall a. Stored a -> Ref
storedRef ([Stored (Signed ExtendedIdentityData)] -> [Ref])
-> [Stored (Signed ExtendedIdentityData)] -> [Ref]
forall a b. (a -> b) -> a -> b
$ UnifiedIdentity -> [Stored (Signed ExtendedIdentityData)]
forall (m :: * -> *).
Identity m -> [Stored (Signed ExtendedIdentityData)]
idUpdates UnifiedIdentity
identity
, case ChannelState
ochannel of
ChannelOurRequest Maybe Cookie
_ Stored ChannelRequest
req -> [ Stored ChannelRequest -> Ref
forall a. Stored a -> Ref
storedRef Stored ChannelRequest
req ]
ChannelOurAccept Maybe Cookie
_ Stored ChannelAccept
acc Channel
_ -> [ Stored ChannelAccept -> Ref
forall a. Stored a -> Ref
storedRef Stored ChannelAccept
acc ]
ChannelState
_ -> []
]
Bool -> Peer -> PacketHandler () -> STM ()
runPacketHandler Bool
secure Peer
peer (PacketHandler () -> STM ()) -> PacketHandler () -> STM ()
forall a b. (a -> b) -> a -> b
$ do
let logd :: String -> PacketHandler ()
logd = STM () -> PacketHandler ()
forall a. STM a -> PacketHandler a
liftSTM (STM () -> PacketHandler ())
-> (String -> STM ()) -> String -> PacketHandler ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. TQueue String -> String -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue (Server -> TQueue String
serverErrorLog Server
server)
[TransportHeaderItem]
-> (TransportHeaderItem -> PacketHandler ()) -> PacketHandler ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [TransportHeaderItem]
headers ((TransportHeaderItem -> PacketHandler ()) -> PacketHandler ())
-> (TransportHeaderItem -> PacketHandler ()) -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ \case
Acknowledged RefDigest
dgst -> do
STM ChannelState -> PacketHandler ChannelState
forall a. STM a -> PacketHandler a
liftSTM (Peer -> STM ChannelState
getPeerChannel Peer
peer) PacketHandler ChannelState
-> (ChannelState -> PacketHandler ()) -> PacketHandler ()
forall a b.
PacketHandler a -> (a -> PacketHandler b) -> PacketHandler b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
ChannelOurAccept Maybe Cookie
_ Stored ChannelAccept
acc Channel
ch | Ref -> RefDigest
forall (c :: * -> *). Ref' c -> RefDigest
refDigest (Stored ChannelAccept -> Ref
forall a. Stored a -> Ref
storedRef Stored ChannelAccept
acc) RefDigest -> RefDigest -> Bool
forall a. Eq a => a -> a -> Bool
== RefDigest
dgst -> do
STM () -> PacketHandler ()
forall a. STM a -> PacketHandler a
liftSTM (STM () -> PacketHandler ()) -> STM () -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ Peer -> Channel -> UnifiedIdentity -> STM ()
finalizedChannel Peer
peer Channel
ch UnifiedIdentity
identity
ChannelState
_ -> () -> PacketHandler ()
forall a. a -> PacketHandler a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Rejected RefDigest
dgst -> do
String -> PacketHandler ()
logd (String -> PacketHandler ()) -> String -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ String
"rejected by peer: " String -> ShowS
forall a. [a] -> [a] -> [a]
++ RefDigest -> String
forall a. Show a => a -> String
show RefDigest
dgst
DataRequest RefDigest
dgst
| Bool
secure Bool -> Bool -> Bool
|| RefDigest
dgst RefDigest -> [RefDigest] -> Bool
forall a. Eq a => a -> [a] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [RefDigest]
plaintextRefs -> do
Right Ref
mref <- STM (Either RefDigest Ref) -> PacketHandler (Either RefDigest Ref)
forall a. STM a -> PacketHandler a
liftSTM (STM (Either RefDigest Ref)
-> PacketHandler (Either RefDigest Ref))
-> STM (Either RefDigest Ref)
-> PacketHandler (Either RefDigest Ref)
forall a b. (a -> b) -> a -> b
$ IO (Either RefDigest Ref) -> STM (Either RefDigest Ref)
forall a. IO a -> STM a
unsafeIOToSTM (IO (Either RefDigest Ref) -> STM (Either RefDigest Ref))
-> IO (Either RefDigest Ref) -> STM (Either RefDigest Ref)
forall a b. (a -> b) -> a -> b
$
Storage -> PartialRef -> IO (LoadResult Partial Ref)
forall (c :: * -> *) (c' :: * -> *) (m :: * -> *).
(StorageCompleteness c, StorageCompleteness c', MonadIO m) =>
Storage' c' -> Ref' c -> m (LoadResult c (Ref' c'))
copyRef (Peer -> Storage
peerStorage Peer
peer) (PartialRef -> IO (LoadResult Partial Ref))
-> PartialRef -> IO (LoadResult Partial Ref)
forall a b. (a -> b) -> a -> b
$
PartialStorage -> RefDigest -> PartialRef
partialRefFromDigest (Peer -> PartialStorage
peerInStorage Peer
peer) RefDigest
dgst
TransportHeaderItem -> PacketHandler ()
addHeader (TransportHeaderItem -> PacketHandler ())
-> TransportHeaderItem -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ RefDigest -> TransportHeaderItem
DataResponse RefDigest
dgst
[TransportHeaderItem] -> PacketHandler ()
addAckedBy [ RefDigest -> TransportHeaderItem
Acknowledged RefDigest
dgst, RefDigest -> TransportHeaderItem
Rejected RefDigest
dgst ]
Ref -> PacketHandler ()
addBody (Ref -> PacketHandler ()) -> Ref -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ Ref
mref
| Bool
otherwise -> do
String -> PacketHandler ()
logd (String -> PacketHandler ()) -> String -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ String
"unauthorized data request for " String -> ShowS
forall a. [a] -> [a] -> [a]
++ RefDigest -> String
forall a. Show a => a -> String
show RefDigest
dgst
TransportHeaderItem -> PacketHandler ()
addHeader (TransportHeaderItem -> PacketHandler ())
-> TransportHeaderItem -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ RefDigest -> TransportHeaderItem
Rejected RefDigest
dgst
DataResponse RefDigest
dgst -> if
| Just PartialRef
pref <- (PartialRef -> Bool) -> [PartialRef] -> Maybe PartialRef
forall (t :: * -> *) a. Foldable t => (a -> Bool) -> t a -> Maybe a
find ((RefDigest -> RefDigest -> Bool
forall a. Eq a => a -> a -> Bool
==RefDigest
dgst) (RefDigest -> Bool)
-> (PartialRef -> RefDigest) -> PartialRef -> Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. PartialRef -> RefDigest
forall (c :: * -> *). Ref' c -> RefDigest
refDigest) [PartialRef]
prefs -> do
Bool -> PacketHandler () -> PacketHandler ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not Bool
secure) (PacketHandler () -> PacketHandler ())
-> PacketHandler () -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ do
TransportHeaderItem -> PacketHandler ()
addHeader (TransportHeaderItem -> PacketHandler ())
-> TransportHeaderItem -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ RefDigest -> TransportHeaderItem
Acknowledged RefDigest
dgst
STM () -> PacketHandler ()
forall a. STM a -> PacketHandler a
liftSTM (STM () -> PacketHandler ()) -> STM () -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ TQueue (Peer, Maybe PartialRef)
-> (Peer, Maybe PartialRef) -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue (Server -> TQueue (Peer, Maybe PartialRef)
serverDataResponse Server
server) (Peer
peer, PartialRef -> Maybe PartialRef
forall a. a -> Maybe a
Just PartialRef
pref)
| Bool
otherwise -> String -> PacketHandler ()
forall a. String -> PacketHandler a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (String -> PacketHandler ()) -> String -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ String
"mismatched data response " String -> ShowS
forall a. [a] -> [a] -> [a]
++ RefDigest -> String
forall a. Show a => a -> String
show RefDigest
dgst
AnnounceSelf RefDigest
dgst
| RefDigest
dgst RefDigest -> RefDigest -> Bool
forall a. Eq a => a -> a -> Bool
== Ref -> RefDigest
forall (c :: * -> *). Ref' c -> RefDigest
refDigest (Stored (Signed IdentityData) -> Ref
forall a. Stored a -> Ref
storedRef Stored (Signed IdentityData)
sidentity) -> () -> PacketHandler ()
forall a. a -> PacketHandler a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
| Bool
otherwise -> do
WaitingRef
wref <- RefDigest
-> (Ref -> ExceptT String IO ()) -> PacketHandler WaitingRef
newWaitingRef RefDigest
dgst ((Ref -> ExceptT String IO ()) -> PacketHandler WaitingRef)
-> (Ref -> ExceptT String IO ()) -> PacketHandler WaitingRef
forall a b. (a -> b) -> a -> b
$ UnifiedIdentity -> Peer -> Ref -> ExceptT String IO ()
handleIdentityAnnounce UnifiedIdentity
identity Peer
peer
TVar PeerIdentity -> PacketHandler PeerIdentity
forall a. TVar a -> PacketHandler a
readTVarP (Peer -> TVar PeerIdentity
peerIdentityVar Peer
peer) PacketHandler PeerIdentity
-> (PeerIdentity -> PacketHandler ()) -> PacketHandler ()
forall a b.
PacketHandler a -> (a -> PacketHandler b) -> PacketHandler b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
PeerIdentityUnknown TVar [UnifiedIdentity -> ExceptT String IO ()]
idwait -> do
TransportHeaderItem -> PacketHandler ()
addHeader (TransportHeaderItem -> PacketHandler ())
-> TransportHeaderItem -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ RefDigest -> TransportHeaderItem
AnnounceSelf (RefDigest -> TransportHeaderItem)
-> RefDigest -> TransportHeaderItem
forall a b. (a -> b) -> a -> b
$ Ref -> RefDigest
forall (c :: * -> *). Ref' c -> RefDigest
refDigest (Ref -> RefDigest) -> Ref -> RefDigest
forall a b. (a -> b) -> a -> b
$ Stored (Signed IdentityData) -> Ref
forall a. Stored a -> Ref
storedRef (Stored (Signed IdentityData) -> Ref)
-> Stored (Signed IdentityData) -> Ref
forall a b. (a -> b) -> a -> b
$ UnifiedIdentity -> Stored (Signed IdentityData)
idData UnifiedIdentity
identity
TVar PeerIdentity -> PeerIdentity -> PacketHandler ()
forall a. TVar a -> a -> PacketHandler ()
writeTVarP (Peer -> TVar PeerIdentity
peerIdentityVar Peer
peer) (PeerIdentity -> PacketHandler ())
-> PeerIdentity -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ WaitingRef
-> TVar [UnifiedIdentity -> ExceptT String IO ()] -> PeerIdentity
PeerIdentityRef WaitingRef
wref TVar [UnifiedIdentity -> ExceptT String IO ()]
idwait
STM () -> PacketHandler ()
forall a. STM a -> PacketHandler a
liftSTM (STM () -> PacketHandler ()) -> STM () -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ TChan Peer -> Peer -> STM ()
forall a. TChan a -> a -> STM ()
writeTChan (Server -> TChan Peer
serverChanPeer (Server -> TChan Peer) -> Server -> TChan Peer
forall a b. (a -> b) -> a -> b
$ Peer -> Server
peerServer Peer
peer) Peer
peer
PeerIdentity
_ -> () -> PacketHandler ()
forall a. a -> PacketHandler a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
AnnounceUpdate RefDigest
dgst -> do
TVar PeerIdentity -> PacketHandler PeerIdentity
forall a. TVar a -> PacketHandler a
readTVarP (Peer -> TVar PeerIdentity
peerIdentityVar Peer
peer) PacketHandler PeerIdentity
-> (PeerIdentity -> PacketHandler ()) -> PacketHandler ()
forall a b.
PacketHandler a -> (a -> PacketHandler b) -> PacketHandler b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
PeerIdentityFull UnifiedIdentity
_ -> do
PacketHandler WaitingRef -> PacketHandler ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (PacketHandler WaitingRef -> PacketHandler ())
-> PacketHandler WaitingRef -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ RefDigest
-> (Ref -> ExceptT String IO ()) -> PacketHandler WaitingRef
newWaitingRef RefDigest
dgst ((Ref -> ExceptT String IO ()) -> PacketHandler WaitingRef)
-> (Ref -> ExceptT String IO ()) -> PacketHandler WaitingRef
forall a b. (a -> b) -> a -> b
$ Peer -> Ref -> ExceptT String IO ()
handleIdentityUpdate Peer
peer
PeerIdentity
_ -> () -> PacketHandler ()
forall a. a -> PacketHandler a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
TrChannelRequest RefDigest
dgst -> do
let process :: Maybe Cookie -> PacketHandler ()
process Maybe Cookie
cookie = do
TransportHeaderItem -> PacketHandler ()
addHeader (TransportHeaderItem -> PacketHandler ())
-> TransportHeaderItem -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ RefDigest -> TransportHeaderItem
Acknowledged RefDigest
dgst
WaitingRef
wref <- RefDigest
-> (Ref -> ExceptT String IO ()) -> PacketHandler WaitingRef
newWaitingRef RefDigest
dgst ((Ref -> ExceptT String IO ()) -> PacketHandler WaitingRef)
-> (Ref -> ExceptT String IO ()) -> PacketHandler WaitingRef
forall a b. (a -> b) -> a -> b
$ Peer -> UnifiedIdentity -> Ref -> ExceptT String IO ()
handleChannelRequest Peer
peer UnifiedIdentity
identity
STM () -> PacketHandler ()
forall a. STM a -> PacketHandler a
liftSTM (STM () -> PacketHandler ()) -> STM () -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ Peer -> ChannelState -> STM ()
setPeerChannel Peer
peer (ChannelState -> STM ()) -> ChannelState -> STM ()
forall a b. (a -> b) -> a -> b
$ Maybe Cookie -> WaitingRef -> ChannelState
ChannelPeerRequest Maybe Cookie
cookie WaitingRef
wref
reject :: PacketHandler ()
reject = TransportHeaderItem -> PacketHandler ()
addHeader (TransportHeaderItem -> PacketHandler ())
-> TransportHeaderItem -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ RefDigest -> TransportHeaderItem
Rejected RefDigest
dgst
STM ChannelState -> PacketHandler ChannelState
forall a. STM a -> PacketHandler a
liftSTM (Peer -> STM ChannelState
getPeerChannel Peer
peer) PacketHandler ChannelState
-> (ChannelState -> PacketHandler ()) -> PacketHandler ()
forall a b.
PacketHandler a -> (a -> PacketHandler b) -> PacketHandler b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
ChannelNone {} -> () -> PacketHandler ()
forall a. a -> PacketHandler a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
ChannelCookieWait {} -> () -> PacketHandler ()
forall a. a -> PacketHandler a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
ChannelCookieReceived Cookie
cookie -> Maybe Cookie -> PacketHandler ()
process (Maybe Cookie -> PacketHandler ())
-> Maybe Cookie -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ Cookie -> Maybe Cookie
forall a. a -> Maybe a
Just Cookie
cookie
ChannelCookieConfirmed Cookie
cookie -> Maybe Cookie -> PacketHandler ()
process (Maybe Cookie -> PacketHandler ())
-> Maybe Cookie -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ Cookie -> Maybe Cookie
forall a. a -> Maybe a
Just Cookie
cookie
ChannelOurRequest Maybe Cookie
mbcookie Stored ChannelRequest
our | RefDigest
dgst RefDigest -> RefDigest -> Bool
forall a. Ord a => a -> a -> Bool
< Ref -> RefDigest
forall (c :: * -> *). Ref' c -> RefDigest
refDigest (Stored ChannelRequest -> Ref
forall a. Stored a -> Ref
storedRef Stored ChannelRequest
our) -> Maybe Cookie -> PacketHandler ()
process Maybe Cookie
mbcookie
| Bool
otherwise -> PacketHandler ()
reject
ChannelPeerRequest Maybe Cookie
mbcookie WaitingRef
_ -> Maybe Cookie -> PacketHandler ()
process Maybe Cookie
mbcookie
ChannelOurAccept {} -> PacketHandler ()
reject
ChannelEstablished {} -> Maybe Cookie -> PacketHandler ()
process Maybe Cookie
forall a. Maybe a
Nothing
TrChannelAccept RefDigest
dgst -> do
let process :: PacketHandler ()
process = do
UnifiedIdentity -> PartialRef -> PacketHandler ()
handleChannelAccept UnifiedIdentity
identity (PartialRef -> PacketHandler ()) -> PartialRef -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ PartialStorage -> RefDigest -> PartialRef
partialRefFromDigest (Peer -> PartialStorage
peerInStorage Peer
peer) RefDigest
dgst
reject :: PacketHandler ()
reject = TransportHeaderItem -> PacketHandler ()
addHeader (TransportHeaderItem -> PacketHandler ())
-> TransportHeaderItem -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ RefDigest -> TransportHeaderItem
Rejected RefDigest
dgst
STM ChannelState -> PacketHandler ChannelState
forall a. STM a -> PacketHandler a
liftSTM (Peer -> STM ChannelState
getPeerChannel Peer
peer) PacketHandler ChannelState
-> (ChannelState -> PacketHandler ()) -> PacketHandler ()
forall a b.
PacketHandler a -> (a -> PacketHandler b) -> PacketHandler b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
ChannelNone {} -> PacketHandler ()
reject
ChannelCookieWait {} -> PacketHandler ()
reject
ChannelCookieReceived {} -> PacketHandler ()
reject
ChannelCookieConfirmed {} -> PacketHandler ()
reject
ChannelOurRequest {} -> PacketHandler ()
process
ChannelPeerRequest {} -> PacketHandler ()
process
ChannelOurAccept Maybe Cookie
_ Stored ChannelAccept
our Channel
_ | RefDigest
dgst RefDigest -> RefDigest -> Bool
forall a. Ord a => a -> a -> Bool
< Ref -> RefDigest
forall (c :: * -> *). Ref' c -> RefDigest
refDigest (Stored ChannelAccept -> Ref
forall a. Stored a -> Ref
storedRef Stored ChannelAccept
our) -> PacketHandler ()
process
| Bool
otherwise -> TransportHeaderItem -> PacketHandler ()
addHeader (TransportHeaderItem -> PacketHandler ())
-> TransportHeaderItem -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ RefDigest -> TransportHeaderItem
Rejected RefDigest
dgst
ChannelEstablished {} -> PacketHandler ()
process
ServiceType ServiceID
_ -> () -> PacketHandler ()
forall a. a -> PacketHandler a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
ServiceRef RefDigest
dgst
| Bool -> Bool
not Bool
secure -> String -> PacketHandler ()
forall a. String -> PacketHandler a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (String -> PacketHandler ()) -> String -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ String
"service packet without secure channel"
| Just ServiceID
svc <- [TransportHeaderItem] -> Maybe ServiceID
lookupServiceType [TransportHeaderItem]
headers -> if
| ServiceID
svc ServiceID -> [ServiceID] -> Bool
forall a. Eq a => a -> [a] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` [ServiceID]
svcs -> do
if RefDigest
dgst RefDigest -> [RefDigest] -> Bool
forall a. Eq a => a -> [a] -> Bool
forall (t :: * -> *) a. (Foldable t, Eq a) => a -> t a -> Bool
`elem` (PartialRef -> RefDigest) -> [PartialRef] -> [RefDigest]
forall a b. (a -> b) -> [a] -> [b]
map PartialRef -> RefDigest
forall (c :: * -> *). Ref' c -> RefDigest
refDigest [PartialRef]
prefs Bool -> Bool -> Bool
|| Bool
True
then do
PacketHandler WaitingRef -> PacketHandler ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (PacketHandler WaitingRef -> PacketHandler ())
-> PacketHandler WaitingRef -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ RefDigest
-> (Ref -> ExceptT String IO ()) -> PacketHandler WaitingRef
newWaitingRef RefDigest
dgst ((Ref -> ExceptT String IO ()) -> PacketHandler WaitingRef)
-> (Ref -> ExceptT String IO ()) -> PacketHandler WaitingRef
forall a b. (a -> b) -> a -> b
$ \Ref
ref ->
IO () -> ExceptT String IO ()
forall a. IO a -> ExceptT String IO a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ExceptT String IO ()) -> IO () -> ExceptT String IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TQueue (Peer, ServiceID, Ref) -> (Peer, ServiceID, Ref) -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue TQueue (Peer, ServiceID, Ref)
chanSvc (Peer
peer, ServiceID
svc, Ref
ref)
else String -> PacketHandler ()
forall a. String -> PacketHandler a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (String -> PacketHandler ()) -> String -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ String
"missing service object " String -> ShowS
forall a. [a] -> [a] -> [a]
++ RefDigest -> String
forall a. Show a => a -> String
show RefDigest
dgst
| Bool
otherwise -> TransportHeaderItem -> PacketHandler ()
addHeader (TransportHeaderItem -> PacketHandler ())
-> TransportHeaderItem -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ RefDigest -> TransportHeaderItem
Rejected RefDigest
dgst
| Bool
otherwise -> String -> PacketHandler ()
forall a. String -> PacketHandler a
forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError (String -> PacketHandler ()) -> String -> PacketHandler ()
forall a b. (a -> b) -> a -> b
$ String
"service ref without type"
TransportHeaderItem
_ -> () -> PacketHandler ()
forall a. a -> PacketHandler a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
-- | Schedule @act@ to be applied to the identity of @peer@.
--
-- The peer's identity variable is read inside a single STM transaction and,
-- depending on what it holds:
--
-- * 'PeerIdentityUnknown' or 'PeerIdentityRef': @act@ is prepended to the
--   list of callbacks stored in the 'TVar' carried by that constructor.
--   (Presumably those callbacks are run once the identity is resolved; that
--   code is not part of this function.)
-- * 'PeerIdentityFull': @act@ applied to the known identity is enqueued on
--   the 'serverIOActions' queue of the peer's server.
--
-- In no case is @act@ executed directly by this function.
withPeerIdentity :: MonadIO m => Peer -> (UnifiedIdentity -> ExceptT String IO ()) -> m ()
withPeerIdentity peer act = liftIO $ atomically $
    readTVar (peerIdentityVar peer) >>= \case
        PeerIdentityUnknown pending -> modifyTVar' pending (act :)
        PeerIdentityRef _ pending -> modifyTVar' pending (act :)
        PeerIdentityFull idt -> writeTQueue (serverIOActions $ peerServer peer) (act idt)
-- | Create a channel request for a peer and, if the cookie exchange with that
-- peer has been confirmed, send it.
--
-- Arguments: our own identity, the peer, and the peer's identity (@upid@).
--
-- The request object is always created via 'createChannelRequest', run
-- against the peer's storage. It is only sent when the peer's channel state
-- is 'ChannelCookieConfirmed'; in that case the state moves on to
-- 'ChannelOurRequest', keeping the confirmed cookie. In any other state
-- nothing is sent and the channel state is left untouched.
--
-- Errors raised by 'createChannelRequest' propagate through the surrounding
-- @ExceptT String IO@.
setupChannel :: UnifiedIdentity -> Peer -> UnifiedIdentity -> WaitingRefCallback
setupChannel identity peer upid = do
    req <- runReaderT (createChannelRequest identity upid) (peerStorage peer)
    let reqref = refDigest (storedRef req)
        selfref = refDigest (storedRef (idData identity))
        -- The request is announced together with our own identity data.
        hitems =
            [ TrChannelRequest reqref
            , AnnounceSelf selfref
            ]
        packet = TransportPacket (TransportHeader hitems) [ storedRef req ]
        -- NOTE(review): presumably the header items that count as the peer's
        -- reply to this packet; confirm against sendToPeerPlain.
        ackedBy = [ Acknowledged reqref, Rejected reqref ]
    liftIO $ atomically $
        getPeerChannel peer >>= \case
            ChannelCookieConfirmed cookie -> do
                sendToPeerPlain peer ackedBy packet
                setPeerChannel peer $ ChannelOurRequest (Just cookie) req
            _ -> return ()
-- | Answer a channel request received from a peer.
--
-- Arguments: the peer, our own identity, and the reference of the received
-- request object.
--
-- The work is registered through 'withPeerIdentity', so it is performed with
-- the peer's identity (@upid@). It then:
--
-- 1. builds the accept object and the resulting channel with
--    'acceptChannelRequest', run against the peer's storage;
-- 2. in one STM transaction checks that the peer's channel state is
--    'ChannelPeerRequest' for exactly this request (digests compared);
-- 3. if so, switches the state to 'ChannelOurAccept' and sends a plain packet
--    with a 'TrChannelAccept' header whose body carries the accept object,
--    its signed data, the key referenced by that data and all signatures;
-- 4. otherwise writes @"unexpected channel request"@ to the server error log.
--
-- Note that the accept object is created before the state check, so it exists
-- even when the request turns out to be unexpected.
handleChannelRequest :: Peer -> UnifiedIdentity -> Ref -> WaitingRefCallback
handleChannelRequest peer identity req = withPeerIdentity peer $ \upid -> do
    (acc, ch) <- runReaderT (acceptChannelRequest identity upid (wrappedLoad req)) (peerStorage peer)
    let accept = fromStored acc
        acceptData = signedData accept
        accref = refDigest (storedRef acc)
        header = TrChannelAccept accref
        ackedBy = [ Acknowledged accref, Rejected accref ]
        -- Everything the peer needs in order to load and verify the accept.
        body = concat
            [ [ storedRef acc ]
            , [ storedRef acceptData ]
            , [ storedRef $ caKey $ fromStored acceptData ]
            , map storedRef $ signedSignature accept
            ]
    liftIO $ atomically $
        getPeerChannel peer >>= \case
            ChannelPeerRequest mbcookie wr
                | wrDigest wr == refDigest req -> do
                    setPeerChannel peer $ ChannelOurAccept mbcookie acc ch
                    sendToPeerPlain peer ackedBy $ TransportPacket (TransportHeader [ header ]) body
            _ -> writeTQueue (serverErrorLog $ peerServer peer) "unexpected channel request"
-- | Process a channel accept announced by the current peer.
--
-- Arguments: our own identity and the partial reference of the accept object.
--
-- Nothing is done inline in the packet handler: an action is enqueued on the
-- server's 'serverIOActions' queue which in turn registers, through
-- 'withPeerIdentity', the following steps to be performed with the peer's
-- identity:
--
-- * copy the accept object into the peer's storage with 'copyRef';
-- * on success derive the channel with 'acceptedChannel', then in a single
--   STM transaction send an 'Acknowledged' header for the accept and call
--   'finalizedChannel';
-- * if the copy reports a missing object, fail with
--   @"missing accept data \<digest\>"@.
handleChannelAccept :: UnifiedIdentity -> PartialRef -> PacketHandler ()
handleChannelAccept identity accref = do
    peer <- gets phPeer
    let ackPacket :: TransportPacket Ref
        ackPacket = TransportPacket (TransportHeader [ Acknowledged $ refDigest accref ]) []

        establish :: UnifiedIdentity -> ExceptT String IO ()
        establish upid = copyRef (peerStorage peer) accref >>= \case
            Right acc -> do
                ch <- acceptedChannel identity upid (wrappedLoad acc)
                liftIO $ atomically $ do
                    sendToPeerS peer [] ackPacket
                    finalizedChannel peer ch identity
            Left dgst -> throwError $ "missing accept data " ++ BC.unpack (showRefDigest dgst)

    liftSTM $ writeTQueue (serverIOActions $ peerServer peer) $
        withPeerIdentity peer establish
-- | Mark the channel to a peer as established and trigger the follow-up work.
--
-- Arguments: the peer, the established channel, and our own identity.
--
-- Within the calling STM transaction this:
--
-- 1. sets the peer's channel state to 'ChannelEstablished';
-- 2. enqueues, on the server's 'serverIOActions' queue, a separate
--    transaction that announces our identity to the peer: one
--    'AnnounceUpdate' header per digest of our extended identity data and of
--    each entry of 'idUpdates', with an empty packet body;
-- 3. calls 'notifyServicesOfPeer' if the peer's identity is already fully
--    known ('PeerIdentityFull'); otherwise does nothing further.
--
-- NOTE(review): the announcement is deferred to the IO-action queue rather
-- than sent in this transaction; the reason is not visible here.
finalizedChannel :: Peer -> Channel -> UnifiedIdentity -> STM ()
finalizedChannel peer ch self = do
    setPeerChannel peer $ ChannelEstablished ch

    writeTQueue (serverIOActions $ peerServer_ peer) $ liftIO $ atomically $
        sendToPeerS peer ackedBy $
            TransportPacket (TransportHeader $ map AnnounceUpdate updateRefs) []

    readTVar (peerIdentityVar peer) >>= \case
        PeerIdentityFull _ -> notifyServicesOfPeer peer
        _ -> return ()
  where
    digestOf = refDigest . storedRef

    -- Our own extended identity data first, followed by all its updates.
    updateRefs = digestOf (idExtData self) : map digestOf (idUpdates self)

    -- NOTE(review): presumably any of these header items from the peer counts
    -- as a reply to the announcement; confirm against sendToPeerS.
    ackedBy = concatMap (\r -> [ Acknowledged r, Rejected r, DataRequest r ]) updateRefs
-- | Callback run once the object behind an identity announcement is available.
--
-- The whole reaction happens in a single STM transaction:
--
-- * If the peer identity is currently a 'PeerIdentityRef' whose awaited digest
--   equals the digest of @ref@, the identity is validated and stored, and every
--   callback collected in the accompanying 'TVar' is scheduled on the server's
--   IO action queue (in reverse order of the stored list).
--
-- * If the peer identity is already 'PeerIdentityFull' and its data precedes the
--   announced one, the announced identity is validated, the already known
--   updates are re-applied to it, and services are notified about the peer.
--
-- * Any other state is left untouched.
handleIdentityAnnounce :: UnifiedIdentity -> Peer -> Ref -> WaitingRefCallback
handleIdentityAnnounce self peer ref = liftIO $ atomically $ do
    current <- readTVar identityVar
    case current of
        PeerIdentityRef wref wact
            | wrDigest wref == refDigest ref ->
                acceptIdentity [] $ \pid -> do
                    pending <- readTVar wact
                    forM_ (reverse pending) $ \callback ->
                        writeTQueue ioQueue (callback pid)

        PeerIdentityFull known
            | idData known `precedes` wrappedLoad ref ->
                acceptIdentity (idUpdates known) $ \_ ->
                    notifyServicesOfPeer peer

        _ -> return ()
  where
    identityVar = peerIdentityVar peer
    ioQueue = serverIOActions (peerServer peer)

    -- Validate the announced identity; on success apply the given updates
    -- (falling back to the bare announced identity if the updated one cannot
    -- be unified), record it as the peer's full identity, publish the peer on
    -- the server's peer-change channel, run the extra action and finally
    -- schedule the channel setup. Does nothing when validation fails.
    acceptIdentity upds act =
        forM_ (validateIdentity (wrappedLoad ref)) $ \announced -> do
            let pid = fromMaybe announced $
                    toUnifiedIdentity (updateIdentity upds announced)
            writeTVar identityVar (PeerIdentityFull pid)
            writeTChan (serverChanPeer (peerServer peer)) peer
            act pid
            writeTQueue ioQueue (setupChannel self peer pid)
-- | Callback run once the object behind an identity update is available.
--
-- Only acts when the peer's identity is already fully known and applying the
-- update loaded from @ref@ still yields a unified identity. In that case the
-- new identity is stored, the peer is published on the server's peer-change
-- channel, and services are notified if the base identity data changed.
handleIdentityUpdate :: Peer -> Ref -> WaitingRefCallback
handleIdentityUpdate peer ref = liftIO $ atomically $
    readTVar identityVar >>= \case
        PeerIdentityFull pid
            | Just updated <- toUnifiedIdentity (updateIdentity [wrappedLoad ref] pid) -> do
                writeTVar identityVar (PeerIdentityFull updated)
                writeTChan (serverChanPeer (peerServer peer)) peer
                when (idData pid /= idData updated) $
                    notifyServicesOfPeer peer

        _ -> return ()
  where
    identityVar = peerIdentityVar peer
-- | Schedule, on the server's IO action queue, a run of the 'serviceNewPeer'
-- handler of every service registered on the peer's server.
--
-- Nothing is executed inside the transaction itself; only a single action
-- covering all services is enqueued.
notifyServicesOfPeer :: Peer -> STM ()
notifyServicesOfPeer peer =
    writeTQueue (serverIOActions server) $
        forM_ (serverServices server) $ \service@(SomeService _ attrs) ->
            runPeerServiceOn (Just (service, attrs)) peer serviceNewPeer
  where
    server = peerServer_ peer
-- | Allocate a fresh 'Peer' record for the given address.
--
-- The new peer starts with no connection (an empty list in the 'Left' branch
-- of 'peerConnection'), an unknown identity with an empty callback list, an
-- ephemeral storage derived from the server storage together with a partial
-- storage derived from that, an empty service-state map and no waiting
-- references. The peer is /not/ registered in the server's peer map here.
mkPeer :: Server -> PeerAddress -> IO Peer
mkPeer server addr = do
    connVar <- newTVarIO (Left [])
    identityCallbacks <- newTVarIO []
    identityVar <- newTVarIO (PeerIdentityUnknown identityCallbacks)
    storage <- deriveEphemeralStorage (serverStorage server)
    inStorage <- derivePartialStorage storage
    serviceState <- newTMVarIO M.empty
    waitingRefs <- newTMVarIO []
    return Peer
        { peerAddress = addr
        , peerServer_ = server
        , peerConnection = connVar
        , peerIdentityVar = identityVar
        , peerStorage_ = storage
        , peerInStorage = inStorage
        , peerServiceState = serviceState
        , peerWaitingRefs = waitingRefs
        }
-- | Get the peer reachable at the given datagram socket address, creating it
-- (see 'serverPeer'') if the server does not know it yet.
serverPeer :: Server -> SockAddr -> IO Peer
serverPeer server = serverPeer' server . DatagramAddress
#ifdef ENABLE_ICE_SUPPORT
-- | Get (or create) the peer addressed by the given ICE session and attach the
-- server's raw packet path to that session.
serverPeerIce :: Server -> IceSession -> IO Peer
serverPeerIce server ice = do
    peer <- serverPeer' server iceAddr
    -- The first mapping function passed to 'mapFlow' is left 'undefined'.
    -- NOTE(review): presumably that direction of the flow is never used by
    -- the ICE session; confirm before relying on it.
    iceSetChan ice $ mapFlow undefined (iceAddr,) (serverRawPath server)
    return peer
  where
    iceAddr = PeerIceSession ice
#endif
-- | Look up the peer with the given address in the server's peer map, creating
-- and registering a new one when there is none.
--
-- Lookup and insertion happen under the 'serverPeers' 'MVar', so concurrent
-- callers asking for the same address get the same 'Peer'. A connection
-- request is written to the server's control flow only when the peer was
-- newly created by this call.
serverPeer' :: Server -> PeerAddress -> IO Peer
serverPeer' server paddr = do
    (peer, isNew) <- modifyMVar (serverPeers server) $ \known ->
        case M.lookup paddr known of
            Just existing ->
                return (known, (existing, False))
            Nothing -> do
                created <- mkPeer server paddr
                return (M.insert paddr created known, (created, True))

    when isNew $ atomically $
        writeFlow (serverControlFlow server) (RequestConnection paddr)

    return peer
-- | Send a single service value to the peer. The value is passed on as a
-- one-element reply list with the reply's boolean flag set to 'True'.
sendToPeer :: (Service s, MonadIO m) => Peer -> s -> m ()
sendToPeer peer packet = sendToPeerList peer [reply]
  where
    reply = ServiceReply (Left packet) True
-- | Send a single, already stored service value to the peer. Works like
-- 'sendToPeer', except that the existing 'Stored' wrapper is handed over
-- instead of a plain value.
sendToPeerStored :: (Service s, MonadIO m) => Peer -> Stored s -> m ()
sendToPeerStored peer spacket = sendToPeerList peer [reply]
  where
    reply = ServiceReply (Right spacket) True
-- | Send a list of service replies to the peer as one transport packet.
--
-- Each reply is first turned into a reference:
--
-- * @ServiceReply (Left x) use@ stores @x@ in the peer storage,
-- * @ServiceReply (Right sx) use@ reuses the reference of the stored value,
-- * @ServiceFinally act@ runs @act@ right away and contributes no reference.
--
-- The packet header carries the service type followed by a 'ServiceRef' for
-- every resulting digest; the packet content holds only the references whose
-- @use@ flag is 'True'. The packet is considered acknowledged by an
-- 'Acknowledged', 'Rejected' or 'DataRequest' item for any of the digests.
--
-- The service ID is derived from the type @s@ via a 'Proxy' rather than from
-- the first list element, so an empty @parts@ list cannot hit a partial
-- 'head'.
sendToPeerList :: forall s m. (Service s, MonadIO m) => Peer -> [ServiceReply s] -> m ()
sendToPeerList peer parts = do
    let st = peerStorage peer
    srefs <- liftIO $ fmap catMaybes $ forM parts $ \case
        ServiceReply (Left x) use -> Just . (,use) <$> store st x
        ServiceReply (Right sx) use -> return $ Just (storedRef sx, use)
        ServiceFinally act -> act >> return Nothing

    let dgsts = map (refDigest . fst) srefs
        content = map fst $ filter snd srefs
        header = TransportHeader (ServiceType (serviceID (Proxy @s)) : map ServiceRef dgsts)
        packet = TransportPacket header content
        ackedBy = concat [[ Acknowledged r, Rejected r, DataRequest r ] | r <- dgsts ]
    liftIO $ atomically $ sendToPeerS peer ackedBy packet
-- | Hand a packet over for sending to the peer, inside an STM transaction.
--
-- The first argument is passed along unchanged together with the packet and
-- the list of header items that acknowledge it. While the peer has no
-- connection yet ('Left'), the triple is prepended to the pending list kept
-- in 'peerConnection'; once a connection exists ('Right'), it is written
-- directly to the connection's data flow.
sendToPeerS' :: Bool -> Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM ()
sendToPeerS' secure peer ackedBy packet = do
    let connVar = peerConnection peer
        item = (secure, packet, ackedBy)
    state <- readTVar connVar
    case state of
        Left queued -> writeTVar connVar (Left (item : queued))
        Right conn -> writeFlow (connData conn) item
-- | 'sendToPeerS'' with its boolean flag fixed to 'True'.
sendToPeerS :: Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM ()
sendToPeerS peer ackedBy packet = sendToPeerS' True peer ackedBy packet
-- | 'sendToPeerS'' with its boolean flag fixed to 'False'.
sendToPeerPlain :: Peer -> [TransportHeaderItem] -> TransportPacket Ref -> STM ()
sendToPeerPlain peer ackedBy packet = sendToPeerS' False peer ackedBy packet
-- | Run a function over the peer's state of service @s@ and optionally send
-- the value it produces.
--
-- The peer's service-state map is taken out of its 'TMVar' for the duration of
-- the call, which serialises this against other users of that variable. The
-- function receives the current state of service @s@ (or the empty state when
-- none is recorded yet) and returns an optional value to send together with
-- the new state.
--
-- * On success the new state is stored and the value, if any, is sent with
--   'sendToPeer'.
-- * On a 'Left' result the previous state is kept and the error is rethrown
--   with 'throwError'.
-- * If the function throws an IO (or asynchronous) exception, the original
--   map is put back before the exception propagates, so the state variable is
--   never left empty.
sendToPeerWith :: forall s m. (Service s, MonadIO m, MonadError String m) => Peer -> (ServiceState s -> ExceptT String IO (Maybe s, ServiceState s)) -> m ()
sendToPeerWith peer fobj = do
    let sproxy = Proxy @s
        sid = serviceID sproxy
        stateVar = peerServiceState peer

    -- Asynchronous exceptions are masked around the take/put pair and only
    -- re-enabled while the caller-supplied function runs; 'onException'
    -- restores the untouched map if that function fails with an exception.
    res <- liftIO $ mask $ \restore -> do
        svcs <- atomically $ takeTMVar stateVar
        let current = fromMaybe (emptyServiceState sproxy) $
                fromServiceState sproxy =<< M.lookup sid svcs
        outcome <- restore (runExceptT (fobj current))
            `onException` atomically (putTMVar stateVar svcs)
        (svcs', result) <- case outcome of
            Right (obj, s') -> return (M.insert sid (SomeServiceState sproxy s') svcs, Right obj)
            Left err -> return (svcs, Left err)
        atomically $ putTMVar stateVar svcs'
        return result

    case res of
        Right (Just obj) -> sendToPeer peer obj
        Right Nothing -> return ()
        Left err -> throwError err
-- | Find the first entry in the list whose service type is @s@.
--
-- Entries are compared by type using 'eqT'; on a match the entry is returned
-- together with its attributes, now typed for @s@. Returns 'Nothing' when no
-- entry has that type.
lookupService :: forall s. Service s => Proxy s -> [SomeService] -> Maybe (SomeService, ServiceAttributes s)
lookupService proxy = \case
    [] -> Nothing
    service@(SomeService (_ :: Proxy t) attr) : rest
        | Just (Refl :: s :~: t) <- eqT -> Just (service, attr)
        | otherwise -> lookupService proxy rest
-- | Run a service handler for the given peer without supplying a service
-- entry up front; equivalent to 'runPeerServiceOn' called with 'Nothing'.
runPeerService :: forall s m. (Service s, MonadIO m) => Peer -> ServiceHandler s () -> m ()
runPeerService peer handler = runPeerServiceOn Nothing peer handler
-- | Run a handler of service @s@ in the context of the given peer.
--
-- The service definition is taken from the first argument when it is 'Just',
-- otherwise it is looked up among the server's registered services.  When no
-- definition is found, or the peer's identity is not 'PeerIdentityFull', a
-- message is written to the server error log and nothing else happens.
--
-- Otherwise both the server-wide and the per-peer service state maps are
-- taken out of their 'TMVar's for the duration of the call, the handler is
-- run against the reloaded server head, keys are moved from the peer storage
-- to the server storage, any replies are sent to the peer, and the updated
-- states are stored back.  If the server head can no longer be reloaded the
-- states are put back unchanged and the condition is logged.
--
-- Should anything between taking and storing the states throw, the original
-- maps are put back before the exception propagates, so the 'TMVar's are
-- never left empty (which would block every later service invocation).
runPeerServiceOn :: forall s m. (Service s, MonadIO m) => Maybe (SomeService, ServiceAttributes s) -> Peer -> ServiceHandler s () -> m ()
runPeerServiceOn mbservice peer handler = liftIO $ do
    let server = peerServer peer
        proxy = Proxy @s
        svc = serviceID proxy
        logd = writeTQueue (serverErrorLog server)
        -- Hand both state maps back to their (currently empty) TMVars.
        putStates peerStates serverStates = atomically $ do
            putTMVar (peerServiceState peer) peerStates
            putTMVar (serverServiceStates server) serverStates
    case mbservice `mplus` lookupService proxy (serverServices server) of
        Just (service, attr) ->
            atomically (readTVar (peerIdentityVar peer)) >>= \case
                -- Asynchronous exceptions are masked around acquiring and
                -- releasing the state maps; only the actual work is unmasked.
                PeerIdentityFull peerId -> mask $ \unmask -> do
                    (global, svcs) <- atomically $ (,)
                        <$> takeTMVar (serverServiceStates server)
                        <*> takeTMVar (peerServiceState peer)
                    (svcs', global') <- flip onException (putStates svcs global) $ unmask $
                        case ( fromMaybe (someServiceEmptyState service) $ M.lookup svc svcs
                             , fromMaybe (someServiceEmptyGlobalState service) $ M.lookup svc global
                             ) of
                            (SomeServiceState (_ :: Proxy ps) ps, SomeServiceGlobalState (_ :: Proxy gs) gs) -> do
                                -- The stored states must be of this very
                                -- service type; a mismatch fails in IO and
                                -- the states are restored by 'onException'.
                                Just (Refl :: s :~: ps) <- return eqT
                                Just (Refl :: s :~: gs) <- return eqT

                                let inp = ServiceInput
                                        { svcAttributes = attr
                                        , svcPeer = peer
                                        , svcPeerIdentity = peerId
                                        , svcServer = server
                                        , svcPrintOp = atomically . logd
                                        }
                                reloadHead (serverOrigHead server) >>= \case
                                    Nothing -> do
                                        atomically $ logd "current head deleted"
                                        return (svcs, global)
                                    Just h -> do
                                        (rsp, (s', gs')) <- runServiceHandler h inp ps gs handler
                                        moveKeys (peerStorage peer) (serverStorage server)
                                        unless (null rsp) $
                                            sendToPeerList peer rsp
                                        return
                                            ( M.insert svc (SomeServiceState proxy s') svcs
                                            , M.insert svc (SomeServiceGlobalState proxy gs') global
                                            )
                    -- Commit outside the protected region so that the
                    -- exception handler can never run after the TMVars
                    -- have already been refilled.
                    putStates svcs' global'

                _ ->
                    atomically $ logd $ "can't run service handler on peer with incomplete identity " ++ show (peerAddress peer)

        _ -> atomically $
            logd $ "unhandled service '" ++ show (toUUID svc) ++ "'"
-- | C helper @broadcast_addresses@ (declared in @Network/ifaddrs.h@) returning
-- a pointer to an array of 32-bit words.  In this file the array is read up
-- to the first zero word and then released with 'cFree'.
foreign import ccall unsafe "Network/ifaddrs.h broadcast_addresses" cBroadcastAddresses :: IO (Ptr Word32)
-- | The C library @free@, used here to release the buffer obtained from
-- 'cBroadcastAddresses'.
foreign import ccall unsafe "stdlib.h free" cFree :: Ptr Word32 -> IO ()
-- | Collect the broadcast addresses reported by the C helper, each combined
-- with the given port into a 'SockAddrInet'.
--
-- The helper's result is treated as an array of 32-bit host addresses
-- terminated by a zero word.  The buffer is always released with 'cFree',
-- even if reading it is interrupted by an exception.  A null pointer yields
-- an empty list instead of being dereferenced.
getBroadcastAddresses :: PortNumber -> IO [SockAddr]
getBroadcastAddresses port = bracket cBroadcastAddresses cFree $ \ptr ->
    let parse i = do
            w <- peekElemOff ptr i
            if w == 0
                then return []
                else (SockAddrInet port w :) <$> parse (i + 1)
     in if ptr == nullPtr
            -- NOTE(review): presumably the C side returns NULL when the
            -- interface list cannot be obtained -- confirm against ifaddrs.c.
            then return []
            else parse 0