SDK: send heartbeats when subscribed to state changes

This commit is contained in:
Alexey Rybalchenko 2020-04-10 13:07:05 +02:00 committed by Dennis Klein
parent 330687772f
commit 5721ea9510
4 changed files with 69 additions and 21 deletions

View File

@ -109,16 +109,27 @@ DDS::DDS(const string& name,
break;
}
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
using namespace sdk::cmd;
auto now = chrono::steady_clock::now();
string id = GetProperty<string>("id");
fLastState = fCurrentState;
fCurrentState = newState;
using namespace sdk::cmd;
for (auto subscriberId : fStateChangeSubscribers) {
LOG(debug) << "Publishing state-change: " << fLastState << "->" << newState << " to " << subscriberId;
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
for (auto it = fStateChangeSubscribers.cbegin(); it != fStateChangeSubscribers.end();) {
// if a subscriber did not send a heartbeat in more than 3 times the promised interval,
// remove it from the subscriber list
if (chrono::duration<double>(now - it->second.first).count() > 3 * it->second.second) {
LOG(warn) << "Controller '" << it->first
<< "' did not send heartbeats since over 3 intervals ("
<< 3 * it->second.second << " ms), removing it.";
fStateChangeSubscribers.erase(it++);
} else {
LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState << " to " << it->first;
Cmds cmds(make<StateChange>(id, fDDSTaskId, fLastState, fCurrentState));
fDDS.Send(cmds.Serialize(), to_string(subscriberId));
fDDS.Send(cmds.Serialize(), to_string(it->first));
++it;
}
}
});
@ -150,7 +161,7 @@ auto DDS::WaitForExitingAck() -> void
unique_lock<mutex> lock(fStateChangeSubscriberMutex);
auto timeout = GetProperty<unsigned int>("wait-for-exiting-ack-timeout");
fExitingAcked.wait_for(lock, chrono::milliseconds(timeout), [this]() {
return fExitingAckedByLastExternalController;
return fExitingAckedByLastExternalController || fStateChangeSubscribers.empty();
});
}
@ -362,8 +373,9 @@ auto DDS::HandleCmd(const string& id, sdk::cmd::Cmd& cmd, const string& cond, ui
fExitingAcked.notify_one();
} break;
case Type::subscribe_to_state_change: {
auto _cmd = static_cast<cmd::SubscribeToStateChange&>(cmd);
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
fStateChangeSubscribers.insert(senderId);
fStateChangeSubscribers.emplace(senderId, make_pair(chrono::steady_clock::now(), _cmd.GetInterval()));
LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState << " to " << senderId;
@ -372,6 +384,15 @@ auto DDS::HandleCmd(const string& id, sdk::cmd::Cmd& cmd, const string& cond, ui
fDDS.Send(outCmds.Serialize(), to_string(senderId));
} break;
case Type::subscription_heartbeat: {
try {
auto _cmd = static_cast<cmd::SubscriptionHeartbeat&>(cmd);
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
fStateChangeSubscribers.at(senderId) = make_pair(chrono::steady_clock::now(), _cmd.GetInterval());
} catch(out_of_range& oor) {
LOG(warn) << "Received subscription heartbeat from an unknown controller with id '" << senderId << "'";
}
} break;
case Type::unsubscribe_from_state_change: {
{
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
@ -384,12 +405,12 @@ auto DDS::HandleCmd(const string& id, sdk::cmd::Cmd& cmd, const string& cond, ui
auto _cmd = static_cast<cmd::GetProperties&>(cmd);
auto const request_id(_cmd.GetRequestId());
auto result(Result::Ok);
std::vector<std::pair<std::string, std::string>> props;
vector<pair<string, string>> props;
try {
for (auto const& prop : GetPropertiesAsString(_cmd.GetQuery())) {
props.push_back({prop.first, prop.second});
}
} catch (std::exception const& e) {
} catch (exception const& e) {
LOG(warn) << "Getting properties (request id: " << request_id << ") failed: " << e.what();
result = Result::Failure;
}
@ -407,7 +428,7 @@ auto DDS::HandleCmd(const string& id, sdk::cmd::Cmd& cmd, const string& cond, ui
}
// TODO Handle builtin keys with different value type than string
SetProperties(props);
} catch (std::exception const& e) {
} catch (exception const& e) {
LOG(warn) << "Setting properties (request id: " << request_id << ") failed: " << e.what();
result = Result::Failure;
}

View File

@ -24,12 +24,12 @@
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <set>
#include <string>
#include <atomic>
#include <thread>
#include <map>
#include <unordered_map>
#include <utility> // pair
#include <vector>
namespace fair
@ -159,7 +159,7 @@ class DDS : public Plugin
std::atomic<bool> fDeviceTerminationRequested;
std::set<uint64_t> fStateChangeSubscribers;
std::unordered_map<uint64_t, std::pair<std::chrono::steady_clock::time_point, int64_t>> fStateChangeSubscribers;
uint64_t fLastExternalController;
bool fExitingAckedByLastExternalController;
std::condition_variable fExitingAcked;

View File

@ -53,7 +53,7 @@ struct StateSubscription
explicit StateSubscription(pmix::Commands& commands)
: fCommands(commands)
{
fCommands.Send(Cmds(make<SubscribeToStateChange>()).Serialize(Format::JSON));
fCommands.Send(Cmds(make<SubscribeToStateChange>(600000)).Serialize(Format::JSON));
}
~StateSubscription()

View File

@ -175,6 +175,8 @@ class BasicTopology : public AsioBase<Executor, Allocator>
, fDDSTopo(std::move(topo))
, fStateData()
, fStateIndex()
, fHeartbeatsTimer(asio::system_executor())
, fHeartbeatInterval(600000)
{
makeTopologyState();
@ -213,16 +215,36 @@ class BasicTopology : public AsioBase<Executor, Allocator>
void SubscribeToStateChanges()
{
using namespace fair::mq::sdk::cmd;
// FAIR_LOG(debug) << "Subscribing to state change";
Cmds cmds(make<SubscribeToStateChange>());
cmd::Cmds cmds(cmd::make<cmd::SubscribeToStateChange>(fHeartbeatInterval.count()));
fDDSSession.SendCommand(cmds.Serialize());
fHeartbeatsTimer.expires_after(fHeartbeatInterval);
fHeartbeatsTimer.async_wait(std::bind(&BasicTopology::SendSubscriptionHeartbeats, this, std::placeholders::_1));
}
void SendSubscriptionHeartbeats(const std::error_code& ec)
{
if (!ec) {
// Timer expired.
fDDSSession.SendCommand(cmd::Cmds(cmd::make<cmd::SubscriptionHeartbeat>(fHeartbeatInterval.count())).Serialize());
// schedule again
fHeartbeatsTimer.expires_after(fHeartbeatInterval);
fHeartbeatsTimer.async_wait(std::bind(&BasicTopology::SendSubscriptionHeartbeats, this, std::placeholders::_1));
} else if (ec == asio::error::operation_aborted) {
// FAIR_LOG(debug) << "Heartbeats timer canceled";
} else {
FAIR_LOG(error) << "Timer error: " << ec;
}
}
void UnsubscribeFromStateChanges()
{
using namespace fair::mq::sdk::cmd;
fDDSSession.SendCommand(Cmds(make<UnsubscribeFromStateChange>()).Serialize());
// stop sending heartbeats
fHeartbeatsTimer.cancel();
// unsubscribe from state changes
fDDSSession.SendCommand(cmd::Cmds(cmd::make<cmd::UnsubscribeFromStateChange>()).Serialize());
// wait for all tasks to confirm unsubscription
std::unique_lock<std::mutex> lk(fMtx);
@ -309,10 +331,8 @@ class BasicTopology : public AsioBase<Executor, Allocator>
auto HandleCmd(cmd::StateChange const& cmd, DDSChannel::Id const& senderId) -> void
{
using namespace fair::mq::sdk::cmd;
if (cmd.GetCurrentState() == DeviceState::Exiting) {
fDDSSession.SendCommand(Cmds(make<StateChangeExitingReceived>()).Serialize(), senderId);
fDDSSession.SendCommand(cmd::Cmds(cmd::make<cmd::StateChangeExitingReceived>()).Serialize(), senderId);
}
DDSTask::Id taskId(cmd.GetTaskId());
@ -1193,6 +1213,9 @@ class BasicTopology : public AsioBase<Executor, Allocator>
return {ec, failed};
}
Duration GetHeartbeatInterval() const { return fHeartbeatInterval; }
void SetHeartbeatInterval(Duration duration) { fHeartbeatInterval = duration; }
private:
using TransitionedCount = unsigned int;
@ -1200,8 +1223,12 @@ class BasicTopology : public AsioBase<Executor, Allocator>
DDSTopology fDDSTopo;
TopologyState fStateData;
TopologyStateIndex fStateIndex;
mutable std::mutex fMtx;
std::condition_variable fStateChangeUnsubscriptionCV;
asio::steady_timer fHeartbeatsTimer;
Duration fHeartbeatInterval;
std::unordered_map<typename ChangeStateOp::Id, ChangeStateOp> fChangeStateOps;
std::unordered_map<typename WaitForStateOp::Id, WaitForStateOp> fWaitForStateOps;