/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* If you would like to incorporate Link into a proprietary software application,
* please contact <link-devs@ableton.com>.
*/
#pragma once
#include
#include
#include
namespace ableton
{
namespace link
{
// SessionMembershipCallback is invoked when any change to session
// membership occurs (when any peer joins or leaves a session)
//
// SessionTimelineCallback is invoked with a session id and a timeline
// whenever a new combination of these values is seen
//
// SessionStartStopStateCallback is invoked with a session id and a startStopState
// whenever a new combination of these values is seen
template
class Peers
{
// non-movable private implementation type
struct Impl;
public:
using Peer = std::pair;
Peers(util::Injected io,
SessionMembershipCallback membership,
SessionTimelineCallback timeline,
SessionStartStopStateCallback startStop)
: mpImpl(std::make_shared(
std::move(io), std::move(membership), std::move(timeline), std::move(startStop)))
{
}
// The set of peers for a given session, ordered by (peerId, addr).
// The result will possibly contain multiple entries for the same
// peer if it is visible through multiple gateways.
std::vector sessionPeers(const SessionId& sid) const
{
using namespace std;
vector result;
auto& peerVec = mpImpl->mPeers;
copy_if(begin(peerVec), end(peerVec), back_inserter(result), SessionMemberPred{sid});
return result;
}
// Number of individual for a given session.
std::size_t uniqueSessionPeerCount(const SessionId& sid) const
{
using namespace std;
auto peerVec = sessionPeers(sid);
auto last = unique(begin(peerVec), end(peerVec),
[](const Peer& a, const Peer& b) { return a.first.ident() == b.first.ident(); });
return static_cast(distance(begin(peerVec), last));
}
void setSessionTimeline(const SessionId& sid, const Timeline& tl)
{
// Set the cached timeline for all peers to a new client-specified
// timeline. When we make a timeline change, we do so
// optimistically and clients assume that all peers in a session
// have adopted the newly specified timeline. We must represent
// this in our cache or else we risk failing to notify about a
// higher-priority peer timeline that was already seen.
for (auto& peer : mpImpl->mPeers)
{
if (peer.first.sessionId() == sid)
{
peer.first.nodeState.timeline = tl;
}
}
}
// Purge all cached peers that are members of the given session
void forgetSession(const SessionId& sid)
{
using namespace std;
auto& peerVec = mpImpl->mPeers;
peerVec.erase(
remove_if(begin(peerVec), end(peerVec), SessionMemberPred{sid}), end(peerVec));
}
void resetPeers()
{
mpImpl->mPeers.clear();
}
// Observer type that monitors peer discovery on a particular
// gateway and relays the information to a Peers instance.
// Models the PeerObserver concept from the discovery module.
struct GatewayObserver
{
using GatewayObserverNodeState = PeerState;
using GatewayObserverNodeId = NodeId;
GatewayObserver(std::shared_ptr pImpl, discovery::IpAddress addr)
: mpImpl(std::move(pImpl))
, mAddr(std::move(addr))
{
}
GatewayObserver(const GatewayObserver&) = delete;
GatewayObserver(GatewayObserver&& rhs)
: mpImpl(std::move(rhs.mpImpl))
, mAddr(std::move(rhs.mAddr))
{
}
~GatewayObserver()
{
// Check to handle the moved from case
if (mpImpl)
{
mpImpl->gatewayClosed(mAddr);
}
}
// model the PeerObserver concept from discovery
friend void sawPeer(GatewayObserver& observer, const PeerState& state)
{
auto pImpl = observer.mpImpl;
auto addr = observer.mAddr;
assert(pImpl);
pImpl->sawPeerOnGateway(std::move(state), std::move(addr));
}
friend void peerLeft(GatewayObserver& observer, const NodeId& id)
{
auto pImpl = observer.mpImpl;
auto addr = observer.mAddr;
pImpl->peerLeftGateway(std::move(id), std::move(addr));
}
friend void peerTimedOut(GatewayObserver& observer, const NodeId& id)
{
auto pImpl = observer.mpImpl;
auto addr = observer.mAddr;
pImpl->peerLeftGateway(std::move(id), std::move(addr));
}
std::shared_ptr mpImpl;
discovery::IpAddress mAddr;
};
// Factory function for the gateway observer
friend GatewayObserver makeGatewayObserver(Peers& peers, discovery::IpAddress addr)
{
return GatewayObserver{peers.mpImpl, std::move(addr)};
}
private:
struct Impl
{
Impl(util::Injected io,
SessionMembershipCallback membership,
SessionTimelineCallback timeline,
SessionStartStopStateCallback startStop)
: mIo(std::move(io))
, mSessionMembershipCallback(std::move(membership))
, mSessionTimelineCallback(std::move(timeline))
, mSessionStartStopStateCallback(std::move(startStop))
{
}
void sawPeerOnGateway(PeerState peerState, discovery::IpAddress gatewayAddr)
{
using namespace std;
const auto peerSession = peerState.sessionId();
const auto peerTimeline = peerState.timeline();
const auto peerStartStopState = peerState.startStopState();
bool isNewSessionTimeline = !sessionTimelineExists(peerSession, peerTimeline);
bool isNewSessionStartStopState =
!sessionStartStopStateExists(peerSession, peerStartStopState);
auto peer = make_pair(std::move(peerState), std::move(gatewayAddr));
const auto idRange = equal_range(begin(mPeers), end(mPeers), peer, PeerIdComp{});
bool didSessionMembershipChange = false;
if (idRange.first == idRange.second)
{
// This peer is not currently known on any gateway
didSessionMembershipChange = true;
mPeers.insert(std::move(idRange.first), std::move(peer));
}
else
{
// We've seen this peer before... does it have a new session?
didSessionMembershipChange =
all_of(idRange.first, idRange.second, [&peerSession](const Peer& test) {
return test.first.sessionId() != peerSession;
});
// was it on this gateway?
const auto addrRange =
equal_range(idRange.first, idRange.second, peer, AddrComp{});
if (addrRange.first == addrRange.second)
{
// First time on this gateway, add it
mPeers.insert(std::move(addrRange.first), std::move(peer));
}
else
{
// We have an entry for this peer on this gateway, update it
*addrRange.first = std::move(peer);
}
}
// Invoke callbacks outside the critical section
if (isNewSessionTimeline)
{
mSessionTimelineCallback(peerSession, peerTimeline);
}
// Pass the start stop state to the Controller after it processed the timeline.
// A new timeline can cause a session Id change which will prevent processing the
// new start stop state. By handling the start stop state after the timeline we
// assure that the start stop state is processed with the correct session Id.
if (isNewSessionStartStopState)
{
mSessionStartStopStateCallback(peerSession, peerStartStopState);
}
if (didSessionMembershipChange)
{
mSessionMembershipCallback();
}
}
void peerLeftGateway(const NodeId& nodeId, const discovery::IpAddress& gatewayAddr)
{
using namespace std;
auto it = find_if(begin(mPeers), end(mPeers), [&](const Peer& peer) {
return peer.first.ident() == nodeId && peer.second == gatewayAddr;
});
bool didSessionMembershipChange = false;
if (it != end(mPeers))
{
mPeers.erase(std::move(it));
didSessionMembershipChange = true;
}
if (didSessionMembershipChange)
{
mSessionMembershipCallback();
}
}
void gatewayClosed(const discovery::IpAddress& gatewayAddr)
{
using namespace std;
mPeers.erase(
remove_if(begin(mPeers), end(mPeers),
[&gatewayAddr](const Peer& peer) { return peer.second == gatewayAddr; }),
end(mPeers));
mSessionMembershipCallback();
}
template
bool hasPeerWith(const SessionId& sessionId, Predicate predicate)
{
using namespace std;
return find_if(begin(mPeers), end(mPeers), [&](const Peer& peer) {
return peer.first.sessionId() == sessionId && predicate(peer.first);
}) != end(mPeers);
}
bool sessionTimelineExists(const SessionId& session, const Timeline& timeline)
{
return hasPeerWith(session,
[&](const PeerState& peerState) { return peerState.timeline() == timeline; });
}
bool sessionStartStopStateExists(
const SessionId& sessionId, const StartStopState& startStopState)
{
return hasPeerWith(sessionId, [&](const PeerState& peerState) {
return peerState.startStopState() == startStopState;
});
}
struct PeerIdComp
{
bool operator()(const Peer& lhs, const Peer& rhs) const
{
return lhs.first.ident() < rhs.first.ident();
}
};
struct AddrComp
{
bool operator()(const Peer& lhs, const Peer& rhs) const
{
return lhs.second < rhs.second;
}
};
util::Injected mIo;
SessionMembershipCallback mSessionMembershipCallback;
SessionTimelineCallback mSessionTimelineCallback;
SessionStartStopStateCallback mSessionStartStopStateCallback;
std::vector mPeers; // sorted by peerId, unique by (peerId, addr)
};
struct SessionMemberPred
{
bool operator()(const Peer& peer) const
{
return peer.first.sessionId() == sid;
}
const SessionId& sid;
};
std::shared_ptr mpImpl;
};
template
Peers
makePeers(util::Injected io,
SessionMembershipCallback membershipCallback,
SessionTimelineCallback timelineCallback,
SessionStartStopStateCallback startStopStateCallback)
{
return {std::move(io), std::move(membershipCallback), std::move(timelineCallback),
std::move(startStopStateCallback)};
}
} // namespace link
} // namespace ableton