/* Copyright 2016, Ableton AG, Berlin. All rights reserved.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*
* If you would like to incorporate Link into a proprietary software application,
* please contact .
*/
#pragma once
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
namespace ableton
{
namespace link
{
namespace detail
{
template
GhostXForm initXForm(const Clock& clock)
{
// Make the current time map to a ghost time of 0 with ghost time
// increasing at the same rate as clock time
return {1.0, -clock.micros()};
}
template
inline SessionState initSessionState(const Tempo tempo, const Clock& clock)
{
using namespace std::chrono;
return {clampTempo(Timeline{tempo, Beats{0.}, microseconds{0}}),
StartStopState{false, Beats{0.}, microseconds{0}}, initXForm(clock)};
}
inline ClientState initClientState(const SessionState& sessionState)
{
const auto hostTime = sessionState.ghostXForm.ghostToHost(std::chrono::microseconds{0});
return {
Timeline{sessionState.timeline.tempo, sessionState.timeline.beatOrigin, hostTime},
ClientStartStopState{sessionState.startStopState.isPlaying, hostTime, hostTime}};
}
inline RtClientState initRtClientState(const ClientState& clientState)
{
using namespace std::chrono;
return {
clientState.timeline, clientState.startStopState, microseconds{0}, microseconds{0}};
}
// The timespan in which local modifications to the timeline will be
// preferred over any modifications coming from the network.
const auto kLocalModGracePeriod = std::chrono::milliseconds(1000);
const auto kRtHandlerFallbackPeriod = kLocalModGracePeriod / 2;
inline ClientStartStopState selectPreferredStartStopState(
const ClientStartStopState currentStartStopState,
const ClientStartStopState startStopState)
{
return startStopState.timestamp > currentStartStopState.timestamp
? startStopState
: currentStartStopState;
}
inline ClientStartStopState mapStartStopStateFromSessionToClient(
const StartStopState& sessionStartStopState,
const Timeline& sessionTimeline,
const GhostXForm& xForm)
{
const auto time =
xForm.ghostToHost(sessionTimeline.fromBeats(sessionStartStopState.beats));
const auto timestamp = xForm.ghostToHost(sessionStartStopState.timestamp);
return ClientStartStopState{sessionStartStopState.isPlaying, time, timestamp};
}
inline StartStopState mapStartStopStateFromClientToSession(
const ClientStartStopState& clientStartStopState,
const Timeline& sessionTimeline,
const GhostXForm& xForm)
{
const auto sessionBeats =
sessionTimeline.toBeats(xForm.hostToGhost(clientStartStopState.time));
const auto timestamp = xForm.hostToGhost(clientStartStopState.timestamp);
return StartStopState{clientStartStopState.isPlaying, sessionBeats, timestamp};
}
} // namespace detail
// function types corresponding to the Controller callback type params
using PeerCountCallback = std::function;
using TempoCallback = std::function;
using StartStopStateCallback = std::function;
// The main Link controller
template
class Controller
{
public:
Controller(Tempo tempo,
PeerCountCallback peerCallback,
TempoCallback tempoCallback,
StartStopStateCallback startStopStateCallback,
Clock clock)
: mTempoCallback(std::move(tempoCallback))
, mStartStopStateCallback(std::move(startStopStateCallback))
, mClock(std::move(clock))
, mNodeId(NodeId::random())
, mSessionId(mNodeId)
, mSessionState(detail::initSessionState(tempo, mClock))
, mClientState(detail::initClientState(mSessionState))
, mLastIsPlayingForStartStopStateCallback(false)
, mRtClientState(detail::initRtClientState(mClientState.get()))
, mHasPendingRtClientStates(false)
, mSessionPeerCounter(*this, std::move(peerCallback))
, mEnabled(false)
, mStartStopSyncEnabled(false)
, mIo(IoContext{UdpSendExceptionHandler{this}})
, mRtClientStateSetter(*this)
, mPeers(util::injectRef(*mIo),
std::ref(mSessionPeerCounter),
SessionTimelineCallback{*this},
SessionStartStopStateCallback{*this})
, mSessions(
{mSessionId, mSessionState.timeline, {mSessionState.ghostXForm, mClock.micros()}},
util::injectRef(mPeers),
MeasurePeer{*this},
JoinSessionCallback{*this},
util::injectRef(*mIo),
mClock)
, mDiscovery(std::make_pair(NodeState{mNodeId, mSessionId, mSessionState.timeline,
mSessionState.startStopState},
mSessionState.ghostXForm),
GatewayFactory{*this},
util::injectRef(*mIo))
{
}
Controller(const Controller&) = delete;
Controller(Controller&&) = delete;
Controller& operator=(const Controller&) = delete;
Controller& operator=(Controller&&) = delete;
~Controller()
{
std::mutex mutex;
std::condition_variable condition;
auto stopped = false;
mIo->async([this, &mutex, &condition, &stopped]() {
enable(false);
std::unique_lock lock(mutex);
stopped = true;
condition.notify_one();
});
std::unique_lock lock(mutex);
condition.wait(lock, [&stopped] { return stopped; });
mIo->stop();
}
void enable(const bool bEnable)
{
const bool bWasEnabled = mEnabled.exchange(bEnable);
if (bWasEnabled != bEnable)
{
mIo->async([this, bEnable] {
if (bEnable)
{
// Process the pending client states to make sure we don't push one after we
// have joined a running session
mRtClientStateSetter.processPendingClientStates();
// Always reset when first enabling to avoid hijacking
// tempo in existing sessions
resetState();
}
mDiscovery.enable(bEnable);
});
}
}
bool isEnabled() const
{
return mEnabled;
}
void enableStartStopSync(const bool bEnable)
{
mStartStopSyncEnabled = bEnable;
}
bool isStartStopSyncEnabled() const
{
return mStartStopSyncEnabled;
}
std::size_t numPeers() const
{
return mSessionPeerCounter.mSessionPeerCount;
}
// Get the current Link client state. Thread-safe but may block, so
// it cannot be used from audio thread.
ClientState clientState() const
{
return mClientState.get();
}
// Set the client state to be used, starting at the given time.
// Thread-safe but may block, so it cannot be used from audio thread.
void setClientState(IncomingClientState newClientState)
{
mClientState.update([&](ClientState& clientState) {
if (newClientState.timeline)
{
*newClientState.timeline = clampTempo(*newClientState.timeline);
clientState.timeline = *newClientState.timeline;
}
if (newClientState.startStopState)
{
// Prevent updating client start stop state with an outdated start stop state
*newClientState.startStopState = detail::selectPreferredStartStopState(
clientState.startStopState, *newClientState.startStopState);
clientState.startStopState = *newClientState.startStopState;
}
});
mIo->async([this, newClientState] { handleClientState(newClientState); });
}
// Non-blocking client state access for a realtime context. NOT
// thread-safe. Must not be called from multiple threads
// concurrently and must not be called concurrently with setClientStateRtSafe.
ClientState clientStateRtSafe() const
{
// Respect the session state guard and the client state guard but don't
// block on them. If we can't access one or both because of concurrent modification
// we fall back to our cached version of the timeline and/or start stop state.
if (!mHasPendingRtClientStates)
{
const auto now = mClock.micros();
const auto timelineGracePeriodOver =
now - mRtClientState.timelineTimestamp > detail::kLocalModGracePeriod;
const auto startStopStateGracePeriodOver =
now - mRtClientState.startStopStateTimestamp > detail::kLocalModGracePeriod;
if (timelineGracePeriodOver || startStopStateGracePeriodOver)
{
const auto clientState = mClientState.getRt();
if (timelineGracePeriodOver && clientState.timeline != mRtClientState.timeline)
{
mRtClientState.timeline = clientState.timeline;
}
if (startStopStateGracePeriodOver
&& clientState.startStopState != mRtClientState.startStopState)
{
mRtClientState.startStopState = clientState.startStopState;
}
}
}
return {mRtClientState.timeline, mRtClientState.startStopState};
}
// should only be called from the audio thread
void setClientStateRtSafe(IncomingClientState newClientState)
{
if (!newClientState.timeline && !newClientState.startStopState)
{
return;
}
if (newClientState.timeline)
{
*newClientState.timeline = clampTempo(*newClientState.timeline);
}
if (newClientState.startStopState)
{
// Prevent updating client start stop state with an outdated start stop state
*newClientState.startStopState = detail::selectPreferredStartStopState(
mRtClientState.startStopState, *newClientState.startStopState);
}
// This flag ensures that mRtClientState is only updated after all incoming
// client states were processed
mHasPendingRtClientStates = true;
mRtClientStateSetter.push(newClientState);
const auto now = mClock.micros();
// Cache the new timeline and StartStopState for serving back to the client
if (newClientState.timeline)
{
// Cache the new timeline and StartStopState for serving back to the client
mRtClientState.timeline = *newClientState.timeline;
mRtClientState.timelineTimestamp = makeRtTimestamp(now);
}
if (newClientState.startStopState)
{
mRtClientState.startStopState = *newClientState.startStopState;
mRtClientState.startStopStateTimestamp = makeRtTimestamp(now);
}
}
private:
std::chrono::microseconds makeRtTimestamp(const std::chrono::microseconds now) const
{
return isEnabled() ? now : std::chrono::microseconds(0);
}
void invokeStartStopStateCallbackIfChanged()
{
bool shouldInvokeCallback = false;
mClientState.update([&](ClientState& clientState) {
shouldInvokeCallback =
mLastIsPlayingForStartStopStateCallback != clientState.startStopState.isPlaying;
mLastIsPlayingForStartStopStateCallback = clientState.startStopState.isPlaying;
});
if (shouldInvokeCallback)
{
mStartStopStateCallback(mLastIsPlayingForStartStopStateCallback);
}
}
void updateDiscovery()
{
// Push the change to the discovery service
mDiscovery.updateNodeState(
std::make_pair(NodeState{mNodeId, mSessionId, mSessionState.timeline,
mSessionState.startStopState},
mSessionState.ghostXForm));
}
void updateSessionTiming(Timeline newTimeline, const GhostXForm newXForm)
{
// Clamp the session tempo because it may slightly overshoot (999 bpm is
// transferred as 60606 us/beat and received as 999.000999... bpm).
newTimeline = clampTempo(newTimeline);
const auto oldTimeline = mSessionState.timeline;
const auto oldXForm = mSessionState.ghostXForm;
if (oldTimeline != newTimeline || oldXForm != newXForm)
{
{
std::lock_guard lock(mSessionStateGuard);
mSessionState.timeline = newTimeline;
mSessionState.ghostXForm = newXForm;
}
// Update the client timeline and start stop state based on the new session timing
mClientState.update([&](ClientState& clientState) {
clientState.timeline = updateClientTimelineFromSession(clientState.timeline,
mSessionState.timeline, mClock.micros(), mSessionState.ghostXForm);
// Don't pass the start stop state to the client when start stop sync is disabled
// or when we have a default constructed start stop state
if (mStartStopSyncEnabled && mSessionState.startStopState != StartStopState{})
{
std::lock_guard startStopStateLock(mSessionStateGuard);
clientState.startStopState =
detail::mapStartStopStateFromSessionToClient(mSessionState.startStopState,
mSessionState.timeline, mSessionState.ghostXForm);
}
});
if (oldTimeline.tempo != newTimeline.tempo)
{
mTempoCallback(newTimeline.tempo);
}
}
}
void handleTimelineFromSession(SessionId id, Timeline timeline)
{
debug(mIo->log()) << "Received timeline with tempo: " << timeline.tempo.bpm()
<< " for session: " << id;
updateSessionTiming(mSessions.sawSessionTimeline(std::move(id), std::move(timeline)),
mSessionState.ghostXForm);
updateDiscovery();
}
void resetSessionStartStopState()
{
mSessionState.startStopState = StartStopState{};
}
void handleStartStopStateFromSession(SessionId sessionId, StartStopState startStopState)
{
debug(mIo->log()) << "Received start stop state. isPlaying: "
<< startStopState.isPlaying
<< ", beats: " << startStopState.beats.floating()
<< ", time: " << startStopState.timestamp.count()
<< " for session: " << sessionId;
if (sessionId == mSessionId
&& startStopState.timestamp > mSessionState.startStopState.timestamp)
{
mSessionState.startStopState = startStopState;
// Always propagate the session start stop state so even a client that doesn't have
// the feature enabled can function as a relay.
updateDiscovery();
if (mStartStopSyncEnabled)
{
mClientState.update([&](ClientState& clientState) {
clientState.startStopState = detail::mapStartStopStateFromSessionToClient(
startStopState, mSessionState.timeline, mSessionState.ghostXForm);
});
invokeStartStopStateCallbackIfChanged();
}
}
}
void handleClientState(const IncomingClientState clientState)
{
auto mustUpdateDiscovery = false;
if (clientState.timeline)
{
auto sessionTimeline = updateSessionTimelineFromClient(mSessionState.timeline,
*clientState.timeline, clientState.timelineTimestamp, mSessionState.ghostXForm);
mSessions.resetTimeline(sessionTimeline);
mPeers.setSessionTimeline(mSessionId, sessionTimeline);
updateSessionTiming(std::move(sessionTimeline), mSessionState.ghostXForm);
mustUpdateDiscovery = true;
}
if (mStartStopSyncEnabled && clientState.startStopState)
{
// Prevent updating with an outdated start stop state
const auto newGhostTime =
mSessionState.ghostXForm.hostToGhost(clientState.startStopState->timestamp);
if (newGhostTime > mSessionState.startStopState.timestamp)
{
mClientState.update([&](ClientState& currentClientState) {
mSessionState.startStopState =
detail::mapStartStopStateFromClientToSession(*clientState.startStopState,
mSessionState.timeline, mSessionState.ghostXForm);
currentClientState.startStopState = *clientState.startStopState;
});
mustUpdateDiscovery = true;
}
}
if (mustUpdateDiscovery)
{
updateDiscovery();
}
invokeStartStopStateCallbackIfChanged();
}
void handleRtClientState(IncomingClientState clientState)
{
mClientState.update([&](ClientState& currentClientState) {
if (clientState.timeline)
{
currentClientState.timeline = *clientState.timeline;
}
if (clientState.startStopState)
{
// Prevent updating client start stop state with an outdated start stop state
*clientState.startStopState = detail::selectPreferredStartStopState(
currentClientState.startStopState, *clientState.startStopState);
currentClientState.startStopState = *clientState.startStopState;
}
});
handleClientState(clientState);
mHasPendingRtClientStates = false;
}
void joinSession(const Session& session)
{
const bool sessionIdChanged = mSessionId != session.sessionId;
mSessionId = session.sessionId;
// Prevent passing the state of the previous session to the new one.
if (sessionIdChanged)
{
mRtClientStateSetter.processPendingClientStates();
resetSessionStartStopState();
}
updateSessionTiming(session.timeline, session.measurement.xform);
updateDiscovery();
if (sessionIdChanged)
{
debug(mIo->log()) << "Joining session " << session.sessionId << " with tempo "
<< session.timeline.tempo.bpm();
mSessionPeerCounter();
}
}
void resetState()
{
mNodeId = NodeId::random();
mSessionId = mNodeId;
const auto xform = detail::initXForm(mClock);
const auto hostTime = -xform.intercept;
// When creating the new timeline, make it continuous by finding
// the beat on the old session timeline corresponding to the
// current host time and mapping it to the new ghost time
// representation of the current host time.
const auto newTl = Timeline{mSessionState.timeline.tempo,
mSessionState.timeline.toBeats(mSessionState.ghostXForm.hostToGhost(hostTime)),
xform.hostToGhost(hostTime)};
resetSessionStartStopState();
updateSessionTiming(newTl, xform);
updateDiscovery();
mSessions.resetSession({mNodeId, newTl, {xform, hostTime}});
mPeers.resetPeers();
}
struct SessionTimelineCallback
{
void operator()(SessionId id, Timeline timeline)
{
mController.handleTimelineFromSession(std::move(id), std::move(timeline));
}
Controller& mController;
};
struct RtClientStateSetter
{
using CallbackDispatcher =
typename IoContext::template LockFreeCallbackDispatcher,
std::chrono::milliseconds>;
RtClientStateSetter(Controller& controller)
: mController(controller)
, mCallbackDispatcher(
[this] { mController.mIo->async([this]() { processPendingClientStates(); }); },
detail::kRtHandlerFallbackPeriod)
{
}
void push(const IncomingClientState clientState)
{
if (clientState.timeline)
{
mTimelineBuffer.write(
std::make_pair(clientState.timelineTimestamp, *clientState.timeline));
}
if (clientState.startStopState)
{
mStartStopStateBuffer.write(*clientState.startStopState);
}
if (clientState.timeline || clientState.startStopState)
{
mCallbackDispatcher.invoke();
}
}
void processPendingClientStates()
{
const auto clientState = buildMergedPendingClientState();
mController.handleRtClientState(clientState);
}
private:
IncomingClientState buildMergedPendingClientState()
{
auto clientState = IncomingClientState{};
if (auto tl = mTimelineBuffer.readNew())
{
clientState.timelineTimestamp = (*tl).first;
clientState.timeline = OptionalTimeline{(*tl).second};
}
if (auto sss = mStartStopStateBuffer.readNew())
{
clientState.startStopState = sss;
}
return clientState;
}
Controller& mController;
// Use separate TripleBuffers for the Timeline and the StartStopState so we read the
// latest set value from either optional.
TripleBuffer> mTimelineBuffer;
TripleBuffer mStartStopStateBuffer;
CallbackDispatcher mCallbackDispatcher;
};
struct SessionStartStopStateCallback
{
void operator()(SessionId sessionId, StartStopState startStopState)
{
mController.handleStartStopStateFromSession(sessionId, startStopState);
}
Controller& mController;
};
struct SessionPeerCounter
{
SessionPeerCounter(Controller& controller, PeerCountCallback callback)
: mController(controller)
, mCallback(std::move(callback))
, mSessionPeerCount(0)
{
}
void operator()()
{
const auto count =
mController.mPeers.uniqueSessionPeerCount(mController.mSessionId);
const auto oldCount = mSessionPeerCount.exchange(count);
if (oldCount != count)
{
if (count == 0)
{
// When the count goes down to zero, completely reset the
// state, effectively founding a new session
mController.mIo->async([this] { mController.resetState(); });
}
mCallback(count);
}
}
Controller& mController;
PeerCountCallback mCallback;
std::atomic mSessionPeerCount;
};
struct MeasurePeer
{
template
void operator()(Peer peer, Handler handler)
{
using It = typename Discovery::ServicePeerGateways::GatewayMap::iterator;
using ValueType = typename Discovery::ServicePeerGateways::GatewayMap::value_type;
mController.mDiscovery.withGateways([peer, handler](It begin, const It end) {
const auto addr = peer.second;
const auto it = std::find_if(
begin, end, [&addr](const ValueType& vt) { return vt.first == addr; });
if (it != end)
{
it->second->measurePeer(std::move(peer.first), std::move(handler));
}
else
{
// invoke the handler with an empty result if we couldn't
// find the peer's gateway
handler(GhostXForm{});
}
});
}
Controller& mController;
};
struct JoinSessionCallback
{
void operator()(Session session)
{
mController.joinSession(std::move(session));
}
Controller& mController;
};
using IoType = typename util::Injected::type;
using ControllerPeers = Peers,
SessionTimelineCallback,
SessionStartStopStateCallback>;
using ControllerGateway =
Gateway;
using GatewayPtr = std::shared_ptr;
struct GatewayFactory
{
GatewayPtr operator()(std::pair state,
util::Injected io,
const discovery::IpAddress& addr)
{
return GatewayPtr{new ControllerGateway{std::move(io), addr,
util::injectVal(makeGatewayObserver(mController.mPeers, addr)),
std::move(state.first), std::move(state.second), mController.mClock}};
}
Controller& mController;
};
struct UdpSendExceptionHandler
{
using Exception = discovery::UdpSendException;
void operator()(const Exception exception)
{
mpController->mIo->async([this, exception] {
mpController->mDiscovery.repairGateway(exception.interfaceAddr);
});
}
Controller* mpController;
};
TempoCallback mTempoCallback;
StartStopStateCallback mStartStopStateCallback;
Clock mClock;
NodeId mNodeId;
SessionId mSessionId;
mutable std::mutex mSessionStateGuard;
SessionState mSessionState;
ControllerClientState mClientState;
bool mLastIsPlayingForStartStopStateCallback;
mutable RtClientState mRtClientState;
std::atomic mHasPendingRtClientStates;
SessionPeerCounter mSessionPeerCounter;
std::atomic mEnabled;
std::atomic mStartStopSyncEnabled;
util::Injected mIo;
RtClientStateSetter mRtClientStateSetter;
ControllerPeers mPeers;
using ControllerSessions = Sessions::type&,
Clock>;
ControllerSessions mSessions;
using Discovery = discovery::Service,
GatewayFactory,
typename util::Injected::type&>;
Discovery mDiscovery;
};
} // namespace link
} // namespace ableton