SDK Commands: remove heartbeat commands

This commit is contained in:
Alexey Rybalchenko 2020-02-19 16:10:03 +01:00 committed by Dennis Klein
parent fcd1022997
commit c1719eb285
7 changed files with 9 additions and 223 deletions

View File

@ -42,15 +42,12 @@ DDS::DDS(const string& name,
, fDeviceTerminationRequested(false)
, fLastExternalController(0)
, fExitingAckedByLastExternalController(false)
, fHeartbeatInterval(100)
, fUpdatesAllowed(false)
, fWorkGuard(fWorkerQueue.get_executor())
{
try {
TakeDeviceControl();
fHeartbeatThread = thread(&DDS::HeartbeatSender, this);
string deviceId(GetProperty<string>("id"));
if (deviceId.empty()) {
SetProperty<string>("id", dds::env_prop<dds::task_path>());
@ -304,24 +301,6 @@ auto DDS::PublishBoundChannels() -> void
}
}
auto DDS::HeartbeatSender() -> void
{
using namespace sdk::cmd;
string id = GetProperty<string>("id");
while (!fDeviceTerminationRequested) {
{
lock_guard<mutex> lock{fHeartbeatSubscriberMutex};
for (const auto subscriberId : fHeartbeatSubscribers) {
fDDS.Send(Cmds(make<Heartbeat>(id)).Serialize(), to_string(subscriberId));
}
}
this_thread::sleep_for(chrono::milliseconds(fHeartbeatInterval));
}
}
auto DDS::SubscribeForCustomCommands() -> void
{
LOG(debug) << "Subscribing for DDS custom commands.";
@ -367,22 +346,6 @@ auto DDS::SubscribeForCustomCommands() -> void
cmd::Cmds outCmds(cmd::make<cmd::Config>(id, ss.str()));
fDDS.Send(outCmds.Serialize(), to_string(senderId));
} break;
case cmd::Type::subscribe_to_heartbeats: {
{
lock_guard<mutex> lock{fHeartbeatSubscriberMutex};
fHeartbeatSubscribers.insert(senderId);
}
cmd::Cmds outCmds(cmd::make<cmd::HeartbeatSubscription>(id, cmd::Result::Ok));
fDDS.Send(outCmds.Serialize(), to_string(senderId));
} break;
case cmd::Type::unsubscribe_from_heartbeats: {
{
lock_guard<mutex> lock{fHeartbeatSubscriberMutex};
fHeartbeatSubscribers.erase(senderId);
}
cmd::Cmds outCmds(cmd::make<cmd::HeartbeatUnsubscription>(id, cmd::Result::Ok));
fDDS.Send(outCmds.Serialize(), to_string(senderId));
} break;
case cmd::Type::state_change_exiting_received: {
{
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
@ -471,10 +434,6 @@ DDS::~DDS()
fControllerThread.join();
}
if (fHeartbeatThread.joinable()) {
fHeartbeatThread.join();
}
fWorkGuard.reset();
if (fWorkerThread.joinable()) {
fWorkerThread.join();

View File

@ -142,8 +142,6 @@ class DDS : public Plugin
auto PublishBoundChannels() -> void;
auto SubscribeForCustomCommands() -> void;
auto HeartbeatSender() -> void;
DDSSubscription fDDS;
std::unordered_map<std::string, std::vector<std::string>> fBindingChans;
@ -157,18 +155,12 @@ class DDS : public Plugin
std::atomic<bool> fDeviceTerminationRequested;
std::set<uint64_t> fHeartbeatSubscribers;
std::mutex fHeartbeatSubscriberMutex;
std::set<uint64_t> fStateChangeSubscribers;
uint64_t fLastExternalController;
bool fExitingAckedByLastExternalController;
std::condition_variable fExitingAcked;
std::mutex fStateChangeSubscriberMutex;
std::thread fHeartbeatThread;
std::chrono::milliseconds fHeartbeatInterval;
bool fUpdatesAllowed;
std::mutex fUpdateMutex;
std::condition_variable fUpdateCondition;

View File

@ -252,6 +252,10 @@ class BasicTopology : public AsioBase<Executor, Allocator>
~BasicTopology()
{
std::lock_guard<std::mutex> lk(fMtx);
fDDSSession.UnsubscribeFromCommands();
try {

View File

@ -46,14 +46,12 @@ array<string, 2> resultNames =
}
};
array<string, 21> typeNames =
array<string, 16> typeNames =
{
{
"CheckState",
"ChangeState",
"DumpConfig",
"SubscribeToHeartbeats",
"UnsubscribeFromHeartbeats",
"SubscribeToStateChange",
"UnsubscribeFromStateChange",
"StateChangeExitingReceived",
@ -63,9 +61,6 @@ array<string, 21> typeNames =
"CurrentState",
"TransitionStatus",
"Config",
"HeartbeatSubscription",
"HeartbeatUnsubscription",
"Heartbeat",
"StateChangeSubscription",
"StateChangeUnsubscription",
"StateChange",
@ -152,14 +147,12 @@ array<sdk::cmd::FBTransition, 12> mqTransitionToFBTransition =
}
};
array<FBCmd, 21> typeToFBCmd =
array<FBCmd, 16> typeToFBCmd =
{
{
FBCmd::FBCmd_check_state,
FBCmd::FBCmd_change_state,
FBCmd::FBCmd_dump_config,
FBCmd::FBCmd_subscribe_to_heartbeats,
FBCmd::FBCmd_unsubscribe_from_heartbeats,
FBCmd::FBCmd_subscribe_to_state_change,
FBCmd::FBCmd_unsubscribe_from_state_change,
FBCmd::FBCmd_state_change_exiting_received,
@ -168,9 +161,6 @@ array<FBCmd, 21> typeToFBCmd =
FBCmd::FBCmd_current_state,
FBCmd::FBCmd_transition_status,
FBCmd::FBCmd_config,
FBCmd::FBCmd_heartbeat_subscription,
FBCmd::FBCmd_heartbeat_unsubscription,
FBCmd::FBCmd_heartbeat,
FBCmd::FBCmd_state_change_subscription,
FBCmd::FBCmd_state_change_unsubscription,
FBCmd::FBCmd_state_change,
@ -179,14 +169,12 @@ array<FBCmd, 21> typeToFBCmd =
}
};
array<Type, 21> fbCmdToType =
array<Type, 16> fbCmdToType =
{
{
Type::check_state,
Type::change_state,
Type::dump_config,
Type::subscribe_to_heartbeats,
Type::unsubscribe_from_heartbeats,
Type::subscribe_to_state_change,
Type::unsubscribe_from_state_change,
Type::state_change_exiting_received,
@ -195,9 +183,6 @@ array<Type, 21> fbCmdToType =
Type::current_state,
Type::transition_status,
Type::config,
Type::heartbeat_subscription,
Type::heartbeat_unsubscription,
Type::heartbeat,
Type::state_change_subscription,
Type::state_change_unsubscription,
Type::state_change,
@ -241,13 +226,6 @@ string Cmds::Serialize(const Format type) const
cmdBuilder = tools::make_unique<FBCommandBuilder>(fbb);
}
break;
case Type::subscribe_to_heartbeats: {
cmdBuilder = tools::make_unique<FBCommandBuilder>(fbb);
}
break;
case Type::unsubscribe_from_heartbeats: {
cmdBuilder = tools::make_unique<FBCommandBuilder>(fbb);
}
break;
case Type::subscribe_to_state_change: {
cmdBuilder = tools::make_unique<FBCommandBuilder>(fbb);
@ -310,28 +288,6 @@ string Cmds::Serialize(const Format type) const
cmdBuilder->add_config_string(config);
}
break;
case Type::heartbeat_subscription: {
auto _cmd = static_cast<HeartbeatSubscription&>(*cmd);
auto deviceId = fbb.CreateString(_cmd.GetDeviceId());
cmdBuilder = tools::make_unique<FBCommandBuilder>(fbb);
cmdBuilder->add_device_id(deviceId);
cmdBuilder->add_result(GetFBResult(_cmd.GetResult()));
}
break;
case Type::heartbeat_unsubscription: {
auto _cmd = static_cast<HeartbeatUnsubscription&>(*cmd);
auto deviceId = fbb.CreateString(_cmd.GetDeviceId());
cmdBuilder = tools::make_unique<FBCommandBuilder>(fbb);
cmdBuilder->add_device_id(deviceId);
cmdBuilder->add_result(GetFBResult(_cmd.GetResult()));
}
break;
case Type::heartbeat: {
auto deviceId = fbb.CreateString(static_cast<Heartbeat&>(*cmd).GetDeviceId());
cmdBuilder = tools::make_unique<FBCommandBuilder>(fbb);
cmdBuilder->add_device_id(deviceId);
}
break;
case Type::state_change_subscription: {
auto _cmd = static_cast<StateChangeSubscription&>(*cmd);
auto deviceId = fbb.CreateString(_cmd.GetDeviceId());
@ -447,12 +403,6 @@ void Cmds::Deserialize(const string& str, const Format type)
case FBCmd_dump_config:
fCmds.emplace_back(make<DumpConfig>());
break;
case FBCmd_subscribe_to_heartbeats:
fCmds.emplace_back(make<SubscribeToHeartbeats>());
break;
case FBCmd_unsubscribe_from_heartbeats:
fCmds.emplace_back(make<UnsubscribeFromHeartbeats>());
break;
case FBCmd_subscribe_to_state_change:
fCmds.emplace_back(make<SubscribeToStateChange>());
break;
@ -482,15 +432,6 @@ void Cmds::Deserialize(const string& str, const Format type)
case FBCmd_config:
fCmds.emplace_back(make<Config>(cmdPtr.device_id()->str(), cmdPtr.config_string()->str()));
break;
case FBCmd_heartbeat_subscription:
fCmds.emplace_back(make<HeartbeatSubscription>(cmdPtr.device_id()->str(), GetResult(cmdPtr.result())));
break;
case FBCmd_heartbeat_unsubscription:
fCmds.emplace_back(make<HeartbeatUnsubscription>(cmdPtr.device_id()->str(), GetResult(cmdPtr.result())));
break;
case FBCmd_heartbeat:
fCmds.emplace_back(make<Heartbeat>(cmdPtr.device_id()->str()));
break;
case FBCmd_state_change_subscription:
fCmds.emplace_back(make<StateChangeSubscription>(cmdPtr.device_id()->str(), GetResult(cmdPtr.result())));
break;

View File

@ -42,8 +42,6 @@ enum class Type : int
check_state, // args: { }
change_state, // args: { transition }
dump_config, // args: { }
subscribe_to_heartbeats, // args: { }
unsubscribe_from_heartbeats, // args: { }
subscribe_to_state_change, // args: { }
unsubscribe_from_state_change, // args: { }
state_change_exiting_received, // args: { }
@ -53,9 +51,6 @@ enum class Type : int
current_state, // args: { device_id, current_state }
transition_status, // args: { device_id, task_id, Result, transition }
config, // args: { device_id, config_string }
heartbeat_subscription, // args: { device_id, Result }
heartbeat_unsubscription, // args: { device_id, Result }
heartbeat, // args: { device_id }
state_change_subscription, // args: { device_id, Result }
state_change_unsubscription, // args: { device_id, Result }
state_change, // args: { device_id, task_id, last_state, current_state }
@ -98,16 +93,6 @@ struct DumpConfig : Cmd
explicit DumpConfig() : Cmd(Type::dump_config) {}
};
struct SubscribeToHeartbeats : Cmd
{
explicit SubscribeToHeartbeats() : Cmd(Type::subscribe_to_heartbeats) {}
};
struct UnsubscribeFromHeartbeats : Cmd
{
explicit UnsubscribeFromHeartbeats() : Cmd(Type::unsubscribe_from_heartbeats) {}
};
struct SubscribeToStateChange : Cmd
{
explicit SubscribeToStateChange() : Cmd(Type::subscribe_to_state_change) {}
@ -221,56 +206,6 @@ struct Config : Cmd
std::string fConfig;
};
struct HeartbeatSubscription : Cmd
{
explicit HeartbeatSubscription(const std::string& id, const Result result)
: Cmd(Type::heartbeat_subscription)
, fDeviceId(id)
, fResult(result)
{}
std::string GetDeviceId() const { return fDeviceId; }
void SetDeviceId(const std::string& deviceId) { fDeviceId = deviceId; }
Result GetResult() const { return fResult; }
void SetResult(const Result result) { fResult = result; }
private:
std::string fDeviceId;
Result fResult;
};
struct HeartbeatUnsubscription : Cmd
{
explicit HeartbeatUnsubscription(const std::string& id, const Result result)
: Cmd(Type::heartbeat_unsubscription)
, fDeviceId(id)
, fResult(result)
{}
std::string GetDeviceId() const { return fDeviceId; }
void SetDeviceId(const std::string& deviceId) { fDeviceId = deviceId; }
Result GetResult() const { return fResult; }
void SetResult(const Result result) { fResult = result; }
private:
std::string fDeviceId;
Result fResult;
};
struct Heartbeat : Cmd
{
explicit Heartbeat(const std::string& id)
: Cmd(Type::heartbeat)
, fDeviceId(id)
{}
std::string GetDeviceId() const { return fDeviceId; }
void SetDeviceId(const std::string& deviceId) { fDeviceId = deviceId; }
private:
std::string fDeviceId;
};
struct StateChangeSubscription : Cmd
{
explicit StateChangeSubscription(const std::string& id, const Result result)

View File

@ -47,8 +47,6 @@ enum FBCmd:byte {
check_state, // args: { }
change_state, // args: { transition }
dump_config, // args: { }
subscribe_to_heartbeats, // args: { }
unsubscribe_from_heartbeats, // args: { }
subscribe_to_state_change, // args: { }
unsubscribe_from_state_change, // args: { }
state_change_exiting_received, // args: { }
@ -58,9 +56,6 @@ enum FBCmd:byte {
current_state, // args: { device_id, current_state }
transition_status, // args: { device_id, Result, transition }
config, // args: { device_id, config_string }
heartbeat_subscription, // args: { device_id, Result }
heartbeat_unsubscription, // args: { device_id, Result }
heartbeat, // args: { device_id }
state_change_subscription, // args: { device_id, Result }
state_change_unsubscription, // args: { device_id, Result }
state_change, // args: { device_id, task_id, last_state, current_state }

View File

@ -24,8 +24,6 @@ TEST(Format, Construction)
Cmds checkStateCmds(make<CheckState>());
Cmds changeStateCmds(make<ChangeState>(Transition::Stop));
Cmds dumpConfigCmds(make<DumpConfig>());
Cmds subscribeToHeartbeatsCmds(make<SubscribeToHeartbeats>());
Cmds unsubscribeFromHeartbeatsCmds(make<UnsubscribeFromHeartbeats>());
Cmds subscribeToStateChangeCmds(make<SubscribeToStateChange>());
Cmds unsubscribeFromStateChangeCmds(make<UnsubscribeFromStateChange>());
Cmds stateChangeExitingReceivedCmds(make<StateChangeExitingReceived>());
@ -34,9 +32,6 @@ TEST(Format, Construction)
Cmds currentStateCmds(make<CurrentState>("somedeviceid", State::Running));
Cmds transitionStatusCmds(make<TransitionStatus>("somedeviceid", 123456, Result::Ok, Transition::Stop));
Cmds configCmds(make<Config>("somedeviceid", "someconfig"));
Cmds heartbeatSubscriptionCmds(make<HeartbeatSubscription>("somedeviceid", Result::Ok));
Cmds heartbeatUnsubscriptionCmds(make<HeartbeatUnsubscription>("somedeviceid", Result::Ok));
Cmds heartbeatCmds(make<Heartbeat>("somedeviceid"));
Cmds stateChangeSubscriptionCmds(make<StateChangeSubscription>("somedeviceid", Result::Ok));
Cmds stateChangeUnsubscriptionCmds(make<StateChangeUnsubscription>("somedeviceid", Result::Ok));
Cmds stateChangeCmds(make<StateChange>("somedeviceid", 123456, State::Running, State::Ready));
@ -47,8 +42,6 @@ TEST(Format, Construction)
ASSERT_EQ(changeStateCmds.At(0).GetType(), Type::change_state);
ASSERT_EQ(static_cast<ChangeState&>(changeStateCmds.At(0)).GetTransition(), Transition::Stop);
ASSERT_EQ(dumpConfigCmds.At(0).GetType(), Type::dump_config);
ASSERT_EQ(subscribeToHeartbeatsCmds.At(0).GetType(), Type::subscribe_to_heartbeats);
ASSERT_EQ(unsubscribeFromHeartbeatsCmds.At(0).GetType(), Type::unsubscribe_from_heartbeats);
ASSERT_EQ(subscribeToStateChangeCmds.At(0).GetType(), Type::subscribe_to_state_change);
ASSERT_EQ(unsubscribeFromStateChangeCmds.At(0).GetType(), Type::unsubscribe_from_state_change);
ASSERT_EQ(stateChangeExitingReceivedCmds.At(0).GetType(), Type::state_change_exiting_received);
@ -69,14 +62,6 @@ TEST(Format, Construction)
ASSERT_EQ(configCmds.At(0).GetType(), Type::config);
ASSERT_EQ(static_cast<Config&>(configCmds.At(0)).GetDeviceId(), "somedeviceid");
ASSERT_EQ(static_cast<Config&>(configCmds.At(0)).GetConfig(), "someconfig");
ASSERT_EQ(heartbeatSubscriptionCmds.At(0).GetType(), Type::heartbeat_subscription);
ASSERT_EQ(static_cast<HeartbeatSubscription&>(heartbeatSubscriptionCmds.At(0)).GetDeviceId(), "somedeviceid");
ASSERT_EQ(static_cast<HeartbeatSubscription&>(heartbeatSubscriptionCmds.At(0)).GetResult(), Result::Ok);
ASSERT_EQ(heartbeatUnsubscriptionCmds.At(0).GetType(), Type::heartbeat_unsubscription);
ASSERT_EQ(static_cast<HeartbeatUnsubscription&>(heartbeatUnsubscriptionCmds.At(0)).GetDeviceId(), "somedeviceid");
ASSERT_EQ(static_cast<HeartbeatUnsubscription&>(heartbeatUnsubscriptionCmds.At(0)).GetResult(), Result::Ok);
ASSERT_EQ(heartbeatCmds.At(0).GetType(), Type::heartbeat);
ASSERT_EQ(static_cast<Heartbeat&>(heartbeatCmds.At(0)).GetDeviceId(), "somedeviceid");
ASSERT_EQ(stateChangeSubscriptionCmds.At(0).GetType(), Type::state_change_subscription);
ASSERT_EQ(static_cast<StateChangeSubscription&>(stateChangeSubscriptionCmds.At(0)).GetDeviceId(), "somedeviceid");
ASSERT_EQ(static_cast<StateChangeSubscription&>(stateChangeSubscriptionCmds.At(0)).GetResult(), Result::Ok);
@ -106,8 +91,6 @@ void fillCommands(Cmds& cmds)
cmds.Add<CheckState>();
cmds.Add<ChangeState>(Transition::Stop);
cmds.Add<DumpConfig>();
cmds.Add<SubscribeToHeartbeats>();
cmds.Add<UnsubscribeFromHeartbeats>();
cmds.Add<SubscribeToStateChange>();
cmds.Add<UnsubscribeFromStateChange>();
cmds.Add<StateChangeExitingReceived>();
@ -116,9 +99,6 @@ void fillCommands(Cmds& cmds)
cmds.Add<CurrentState>("somedeviceid", State::Running);
cmds.Add<TransitionStatus>("somedeviceid", 123456, Result::Ok, Transition::Stop);
cmds.Add<Config>("somedeviceid", "someconfig");
cmds.Add<HeartbeatSubscription>("somedeviceid", Result::Ok);
cmds.Add<HeartbeatUnsubscription>("somedeviceid", Result::Ok);
cmds.Add<Heartbeat>("somedeviceid");
cmds.Add<StateChangeSubscription>("somedeviceid", Result::Ok);
cmds.Add<StateChangeUnsubscription>("somedeviceid", Result::Ok);
cmds.Add<StateChange>("somedeviceid", 123456, State::Running, State::Ready);
@ -128,7 +108,7 @@ void fillCommands(Cmds& cmds)
void checkCommands(Cmds& cmds)
{
ASSERT_EQ(cmds.Size(), 21);
ASSERT_EQ(cmds.Size(), 16);
int count = 0;
auto const props(std::vector<std::pair<std::string, std::string>>({{"k1", "v1"}, {"k2", "v2"}}));
@ -145,12 +125,6 @@ void checkCommands(Cmds& cmds)
case Type::dump_config:
++count;
break;
case Type::subscribe_to_heartbeats:
++count;
break;
case Type::unsubscribe_from_heartbeats:
++count;
break;
case Type::subscribe_to_state_change:
++count;
break;
@ -187,20 +161,6 @@ void checkCommands(Cmds& cmds)
ASSERT_EQ(static_cast<Config&>(*cmd).GetDeviceId(), "somedeviceid");
ASSERT_EQ(static_cast<Config&>(*cmd).GetConfig(), "someconfig");
break;
case Type::heartbeat_subscription:
++count;
ASSERT_EQ(static_cast<HeartbeatSubscription&>(*cmd).GetDeviceId(), "somedeviceid");
ASSERT_EQ(static_cast<HeartbeatSubscription&>(*cmd).GetResult(), Result::Ok);
break;
case Type::heartbeat_unsubscription:
++count;
ASSERT_EQ(static_cast<HeartbeatUnsubscription&>(*cmd).GetDeviceId(), "somedeviceid");
ASSERT_EQ(static_cast<HeartbeatUnsubscription&>(*cmd).GetResult(), Result::Ok);
break;
case Type::heartbeat:
++count;
ASSERT_EQ(static_cast<Heartbeat&>(*cmd).GetDeviceId(), "somedeviceid");
break;
case Type::state_change_subscription:
++count;
ASSERT_EQ(static_cast<StateChangeSubscription&>(*cmd).GetDeviceId(), "somedeviceid");
@ -237,7 +197,7 @@ void checkCommands(Cmds& cmds)
}
}
ASSERT_EQ(count, 21);
ASSERT_EQ(count, 16);
}
TEST(Format, SerializationBinary)