DDS plugin: Implement --control external

This commit is contained in:
Dennis Klein 2019-07-24 16:44:06 +02:00 committed by Dennis Klein
parent 6208cbb508
commit f7cdf5ee23
5 changed files with 142 additions and 125 deletions

View File

@ -70,7 +70,7 @@ Control::Control(const string& name, const Plugin::Version version, const string
if (control == "static") {
LOG(debug) << "Running builtin controller: static";
fControllerThread = thread(&Control::StaticMode, this);
} else if (control == "interactive") {
} else if (control == "dynamic" || control == "external" || control == "interactive") {
LOG(debug) << "Running builtin controller: interactive";
fControllerThread = thread(&Control::InteractiveMode, this);
} else {
@ -113,7 +113,7 @@ auto ControlPluginProgramOptions() -> Plugin::ProgOptions
namespace po = boost::program_options;
auto pluginOptions = po::options_description{"Control (builtin) Plugin"};
pluginOptions.add_options()
("control", po::value<string>()->default_value("interactive"), "Control mode, 'static' or 'interactive'")
("control", po::value<string>()->default_value("dynamic"), "Control mode, 'static' or 'dynamic' (aliases for dynamic are external and interactive)")
("catch-signals", po::value<int >()->default_value(1), "Enable signal handling (1/0).");
return pluginOptions;
}

View File

@ -33,20 +33,10 @@ namespace plugins
DDS::DDS(const string& name, const Plugin::Version version, const string& maintainer, const string& homepage, PluginServices* pluginServices)
: Plugin(name, version, maintainer, homepage, pluginServices)
, fService()
, fDDSCustomCmd(fService)
, fDDSKeyValue(fService)
, fDDSTaskId(dds::env_prop<dds::task_id>())
, fBindingChans()
, fConnectingChans()
, fStopMutex()
, fStopCondition()
, fTransitions({ "BIND", "CONNECT", "INIT TASK", "RUN", "STOP", "RESET TASK", "RESET DEVICE" })
, fControllerThread()
, fCurrentState(DeviceState::Idle)
, fLastState(DeviceState::Idle)
, fDeviceTerminationRequested(false)
, fServiceStarted(false)
, fHeartbeatInterval(100)
{
try {
@ -63,95 +53,69 @@ DDS::DDS(const string& name, const Plugin::Version version, const string& mainta
auto DDS::HandleControl() -> void
{
try {
LOG(debug) << "$DDS_TASK_PATH: " << getenv("DDS_TASK_PATH");
LOG(debug) << "$DDS_GROUP_NAME: " << getenv("DDS_GROUP_NAME");
LOG(debug) << "$DDS_COLLECTION_NAME: " << getenv("DDS_COLLECTION_NAME");
LOG(debug) << "$DDS_TASK_NAME: " << getenv("DDS_TASK_NAME");
LOG(debug) << "$DDS_TASK_INDEX: " << getenv("DDS_TASK_INDEX");
LOG(debug) << "$DDS_COLLECTION_INDEX: " << getenv("DDS_COLLECTION_INDEX");
LOG(debug) << "$DDS_TASK_ID: " << getenv("DDS_TASK_ID");
LOG(debug) << "$DDS_LOCATION: " << getenv("DDS_LOCATION");
string dds_session_id(getenv("DDS_SESSION_ID"));
LOG(debug) << "$DDS_SESSION_ID: " << dds_session_id;
auto control = GetProperty<string>("control");
bool staticMode(false);
if (control == "static") {
LOG(debug) << "Running DDS controller: static";
staticMode = true;
} else if (control == "dynamic" || control == "external" || control == "interactive") {
LOG(debug) << "Running DDS controller: external";
} else {
LOG(error) << "Unrecognized control mode '" << control << "' requested. " << "Ignoring and falling back to static control mode.";
staticMode = true;
}
LOG(info) << "DDS Task Id (from API): " << fDDSTaskId;
// subscribe for state changes from DDS (subscriptions start firing after fService.start() is called)
SubscribeForCustomCommands();
// subscribe for DDS service errors.
fService.subscribeOnError([](const dds::intercom_api::EErrorCode errorCode, const string& errorMsg) {
LOG(error) << "DDS Error received: error code: " << errorCode << ", error message: " << errorMsg << endl;
});
SubscribeForConnectingChannels();
// subscribe to device state changes, pushing new state changes into the event queue
SubscribeToDeviceStateChange([&](DeviceState newState) {
fStateQueue.Push(newState);
if (newState == DeviceState::Exiting) {
switch(newState) {
case DeviceState::Bound:
// Receive addresses of connecting channels from DDS
// and propagate addresses of bound channels to DDS.
FillChannelContainers();
// publish bound addresses via DDS at keys corresponding to the channel prefixes, e.g. 'data' in data[i]
PublishBoundChannels();
break;
case DeviceState::Exiting:
fDeviceTerminationRequested = true;
UnsubscribeFromDeviceStateChange();
ReleaseDeviceControl();
break;
default:
break;
}
if (fServiceStarted) {
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
string id = GetProperty<string>("id");
fLastState = fCurrentState;
fCurrentState = newState;
for (auto subscriberId : fStateChangeSubscribers) {
LOG(debug) << "Publishing state-change: " << fLastState << "->" << newState << " to " << subscriberId;
fDDSCustomCmd.send("state-change: " + id + "," + ToString(fDDSTaskId) + "," + ToStr(fLastState) + "->" + ToStr(newState), to_string(subscriberId));
}
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
string id = GetProperty<string>("id");
fLastState = fCurrentState;
fCurrentState = newState;
for (auto subscriberId : fStateChangeSubscribers) {
LOG(debug) << "Publishing state-change: " << fLastState << "->" << newState << " to " << subscriberId;
fDDS.Send("state-change: " + id + "," + ToString(dds::env_prop<dds::task_id>()) + "," + ToStr(fLastState) + "->" + ToStr(newState), to_string(subscriberId));
}
});
ChangeDeviceState(DeviceStateTransition::InitDevice);
while (fStateQueue.WaitForNext() != DeviceState::InitializingDevice) {}
ChangeDeviceState(DeviceStateTransition::CompleteInit);
while (fStateQueue.WaitForNext() != DeviceState::Initialized) {}
ChangeDeviceState(DeviceStateTransition::Bind);
while (fStateQueue.WaitForNext() != DeviceState::Bound) {}
if (staticMode) {
TransitionDeviceStateTo(DeviceState::Running);
// in the Initializing state subscribe to receive addresses of connecting channels from DDS
// and propagate addresses of bound channels to DDS.
FillChannelContainers();
// start DDS service - subscriptions will only start firing after this step
fService.start(dds_session_id);
fServiceStarted = true;
// publish bound addresses via DDS at keys corresponding to the channel prefixes, e.g. 'data' in data[i]
PublishBoundChannels();
ChangeDeviceState(DeviceStateTransition::Connect);
while (fStateQueue.WaitForNext() != DeviceState::DeviceReady) {}
ChangeDeviceState(DeviceStateTransition::InitTask);
while (fStateQueue.WaitForNext() != DeviceState::Ready) {}
ChangeDeviceState(DeviceStateTransition::Run);
// wait until stop signal
unique_lock<mutex> lock(fStopMutex);
while (!fDeviceTerminationRequested) {
fStopCondition.wait_for(lock, chrono::seconds(1));
// wait until stop signal
unique_lock<mutex> lock(fStopMutex);
while (!fDeviceTerminationRequested) {
fStopCondition.wait_for(lock, chrono::seconds(1));
}
LOG(debug) << "Stopping DDS control plugin";
}
LOG(debug) << "Stopping DDS control plugin";
} catch (DeviceErrorState&) {
ReleaseDeviceControl();
} catch (exception& e) {
ReleaseDeviceControl();
LOG(error) << "Error: " << e.what() << endl;
return;
}
fDDSKeyValue.unsubscribe();
fDDSCustomCmd.unsubscribe();
try {
UnsubscribeFromDeviceStateChange();
ReleaseDeviceControl();
} catch (fair::mq::PluginServices::DeviceControlError& e) {
LOG(error) << e.what();
}
}
auto DDS::FillChannelContainers() -> void
@ -228,7 +192,7 @@ auto DDS::SubscribeForConnectingChannels() -> void
{
LOG(debug) << "Subscribing for DDS properties.";
fDDSKeyValue.subscribe([&] (const string& propertyId, const string& value, uint64_t senderTaskID) {
fDDS.SubscribeKeyValue([&] (const string& propertyId, const string& value, uint64_t senderTaskID) {
try {
LOG(debug) << "Received update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID;
string val = value;
@ -286,7 +250,7 @@ auto DDS::PublishBoundChannels() -> void
for (const auto& chan : fBindingChans) {
string joined = boost::algorithm::join(chan.second, ",");
LOG(debug) << "Publishing " << chan.first << " bound addresses (" << chan.second.size() << ") to DDS under '" << chan.first << "' property name.";
fDDSKeyValue.putValue(chan.first, joined);
fDDS.PutValue(chan.first, joined);
}
}
@ -299,7 +263,7 @@ auto DDS::HeartbeatSender() -> void
lock_guard<mutex> lock{fHeartbeatSubscriberMutex};
for (const auto subscriberId : fHeartbeatSubscribers) {
fDDSCustomCmd.send("heartbeat: " + id , to_string(subscriberId));
fDDS.Send("heartbeat: " + id , to_string(subscriberId));
}
}
@ -313,30 +277,30 @@ auto DDS::SubscribeForCustomCommands() -> void
string id = GetProperty<string>("id");
fDDSCustomCmd.subscribe([id, this](const string& cmd, const string& cond, uint64_t senderId) {
fDDS.SubscribeCustomCmd([id, this](const string& cmd, const string& cond, uint64_t senderId) {
LOG(info) << "Received command: '" << cmd << "' from " << senderId;
if (cmd == "check-state") {
fDDSCustomCmd.send(id + ": " + ToStr(GetCurrentDeviceState()), to_string(senderId));
fDDS.Send(id + ": " + ToStr(GetCurrentDeviceState()), to_string(senderId));
} else if (cmd == "INIT DEVICE") {
if (ChangeDeviceState(ToDeviceStateTransition(cmd))) {
fDDSCustomCmd.send(id + ": queued, " + cmd, to_string(senderId));
fDDS.Send(id + ": queued, " + cmd, to_string(senderId));
while (fStateQueue.WaitForNext() != DeviceState::InitializingDevice) {}
ChangeDeviceState(DeviceStateTransition::CompleteInit);
} else {
fDDSCustomCmd.send(id + ": could not queue, " + cmd , to_string(senderId));
fDDS.Send(id + ": could not queue, " + cmd, to_string(senderId));
}
} else if (fTransitions.find(cmd) != fTransitions.end()) {
if (ChangeDeviceState(ToDeviceStateTransition(cmd))) {
fDDSCustomCmd.send(id + ": queued, " + cmd, to_string(senderId));
fDDS.Send(id + ": queued, " + cmd, to_string(senderId));
} else {
fDDSCustomCmd.send(id + ": could not queue, " + cmd , to_string(senderId));
fDDS.Send(id + ": could not queue, " + cmd, to_string(senderId));
}
} else if (cmd == "END") {
if (ChangeDeviceState(ToDeviceStateTransition(cmd))) {
fDDSCustomCmd.send(id + ": queued, " + cmd, to_string(senderId));
fDDS.Send(id + ": queued, " + cmd, to_string(senderId));
} else {
fDDSCustomCmd.send(id + ": could not queue, " + cmd , to_string(senderId));
fDDS.Send(id + ": could not queue, " + cmd, to_string(senderId));
}
if (ToStr(GetCurrentDeviceState()) == "EXITING") {
unique_lock<mutex> lock(fStopMutex);
@ -347,43 +311,43 @@ auto DDS::SubscribeForCustomCommands() -> void
for (const auto pKey: GetPropertyKeys()) {
ss << id << ": " << pKey << " -> " << GetPropertyAsString(pKey) << endl;
}
fDDSCustomCmd.send(ss.str(), to_string(senderId));
fDDS.Send(ss.str(), to_string(senderId));
} else if (cmd == "subscribe-to-heartbeats") {
{
// auto size = fHeartbeatSubscribers.size();
lock_guard<mutex> lock{fHeartbeatSubscriberMutex};
fHeartbeatSubscribers.insert(senderId);
}
fDDSCustomCmd.send("heartbeat-subscription: " + id + ",OK", to_string(senderId));
fDDS.Send("heartbeat-subscription: " + id + ",OK", to_string(senderId));
} else if (cmd == "unsubscribe-from-heartbeats") {
{
lock_guard<mutex> lock{fHeartbeatSubscriberMutex};
fHeartbeatSubscribers.erase(senderId);
}
fDDSCustomCmd.send("heartbeat-unsubscription: " + id + ",OK", to_string(senderId));
fDDS.Send("heartbeat-unsubscription: " + id + ",OK", to_string(senderId));
} else if (cmd == "subscribe-to-state-changes") {
{
// auto size = fStateChangeSubscribers.size();
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
fStateChangeSubscribers.insert(senderId);
}
fDDSCustomCmd.send("state-changes-subscription: " + id + ",OK", to_string(senderId));
fDDS.Send("state-changes-subscription: " + id + ",OK", to_string(senderId));
{
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState << " to " << senderId;
// fDDSCustomCmd.send("state-change: " + id + "," + ToStr(fLastState) + "->" + ToStr(fCurrentState), to_string(senderId));
fDDSCustomCmd.send("state-change: " + id + "," + ToString(fDDSTaskId) + "," + ToStr(fLastState) + "->" + ToStr(fCurrentState), to_string(senderId));
fDDS.Send("state-change: " + id + "," + ToString(dds::env_prop<dds::task_id>()) + "," + ToStr(fLastState) + "->" + ToStr(fCurrentState), to_string(senderId));
}
} else if (cmd == "unsubscribe-from-state-changes") {
{
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
fStateChangeSubscribers.erase(senderId);
}
fDDSCustomCmd.send("state-changes-unsubscription: " + id + ",OK", to_string(senderId));
fDDS.Send("state-changes-unsubscription: " + id + ",OK", to_string(senderId));
} else if (cmd == "SHUTDOWN") {
TransitionDeviceStateTo(DeviceState::Exiting);
} else if (cmd == "STARTUP") {
TransitionDeviceStateTo(DeviceState::Ready);
TransitionDeviceStateTo(DeviceState::Running);
} else {
LOG(warn) << "Unknown command: " << cmd;
LOG(warn) << "Origin: " << senderId;
@ -394,6 +358,9 @@ auto DDS::SubscribeForCustomCommands() -> void
DDS::~DDS()
{
UnsubscribeFromDeviceStateChange();
ReleaseDeviceControl();
if (fControllerThread.joinable()) {
fControllerThread.join();
}

View File

@ -9,23 +9,22 @@
#ifndef FAIR_MQ_PLUGINS_DDS
#define FAIR_MQ_PLUGINS_DDS
#include <fairmq/Plugin.h>
#include <fairmq/Version.h>
#include <fairmq/StateQueue.h>
#include <DDS/dds_intercom.h>
#include <DDS/dds_env_prop.h>
#include <condition_variable>
#include <mutex>
#include <string>
#include <queue>
#include <thread>
#include <vector>
#include <unordered_map>
#include <set>
#include <DDS/dds_intercom.h>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <fairmq/Plugin.h>
#include <fairmq/StateQueue.h>
#include <fairmq/Version.h>
#include <functional>
#include <mutex>
#include <queue>
#include <set>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
namespace fair
{
@ -36,23 +35,78 @@ namespace plugins
struct DDSConfig
{
DDSConfig()
: fSubChannelAddresses()
, fDDSValues()
{}
// container of sub channel addresses
std::vector<std::string> fSubChannelAddresses;
// dds values for the channel
std::unordered_map<uint64_t, std::string> fDDSValues;
};
struct DDSSubscription
{
DDSSubscription()
: fDDSCustomCmd(fService)
, fDDSKeyValue(fService)
{
LOG(debug) << "$DDS_TASK_PATH: " << dds::env_prop<dds::task_path>();
LOG(debug) << "$DDS_GROUP_NAME: " << dds::env_prop<dds::group_name>();
LOG(debug) << "$DDS_COLLECTION_NAME: " << dds::env_prop<dds::collection_name>();
LOG(debug) << "$DDS_TASK_NAME: " << dds::env_prop<dds::task_name>();
LOG(debug) << "$DDS_TASK_INDEX: " << dds::env_prop<dds::task_index>();
LOG(debug) << "$DDS_COLLECTION_INDEX: " << dds::env_prop<dds::collection_index>();
LOG(debug) << "$DDS_TASK_ID: " << dds::env_prop<dds::task_id>();
LOG(debug) << "$DDS_LOCATION: " << dds::env_prop<dds::dds_location>();
std::string dds_session_id(dds::env_prop<dds::dds_session_id>());
LOG(debug) << "$DDS_SESSION_ID: " << dds_session_id;
// subscribe for DDS service errors.
fService.subscribeOnError([](const dds::intercom_api::EErrorCode errorCode, const std::string& errorMsg) {
LOG(error) << "DDS Error received: error code: " << errorCode << ", error message: " << errorMsg;
});
assert(!dds_session_id.empty());
fService.start(dds_session_id);
}
~DDSSubscription() {
fDDSKeyValue.unsubscribe();
fDDSCustomCmd.unsubscribe();
}
template<typename... Args>
auto SubscribeCustomCmd(Args&&... args) -> void
{
fDDSCustomCmd.subscribe(std::forward<Args>(args)...);
}
template<typename... Args>
auto SubscribeKeyValue(Args&&... args) -> void
{
fDDSKeyValue.subscribe(std::forward<Args>(args)...);
}
template<typename... Args>
auto Send(Args&&... args) -> void
{
fDDSCustomCmd.send(std::forward<Args>(args)...);
}
template<typename... Args>
auto PutValue(Args&&... args) -> void
{
fDDSKeyValue.putValue(std::forward<Args>(args)...);
}
private:
dds::intercom_api::CIntercomService fService;
dds::intercom_api::CCustomCmd fDDSCustomCmd;
dds::intercom_api::CKeyValue fDDSKeyValue;
};
struct IofN
{
IofN(int i, int n)
: fI(i)
, fN(n)
, fEntries()
{}
unsigned int fI;
@ -77,10 +131,7 @@ class DDS : public Plugin
auto HeartbeatSender() -> void;
dds::intercom_api::CIntercomService fService;
dds::intercom_api::CCustomCmd fDDSCustomCmd;
dds::intercom_api::CKeyValue fDDSKeyValue;
uint64_t fDDSTaskId;
DDSSubscription fDDS;
std::unordered_map<std::string, std::vector<std::string>> fBindingChans;
std::unordered_map<std::string, DDSConfig> fConnectingChans;
@ -98,7 +149,6 @@ class DDS : public Plugin
fair::mq::StateQueue fStateQueue;
std::atomic<bool> fDeviceTerminationRequested;
std::atomic<bool> fServiceStarted;
std::set<uint64_t> fHeartbeatSubscribers;
std::mutex fHeartbeatSubscriberMutex;

View File

@ -57,7 +57,7 @@ struct TopologyFixture : ::testing::Test
LOG(info) << mDDSTopo;
auto n(mDDSTopo.GetNumRequiredAgents());
mDDSSession.SubmitAgents(n);
mDDSSession.ActivateTopology(mDDSTopoFile);
mDDSSession.ActivateTopology(mDDSTopo);
}
auto TearDown() -> void override {

View File

@ -27,7 +27,7 @@
<main name="main">
<task>Sampler</task>
<group name="SinkGroup" n="50">
<group name="SinkGroup" n="5">
<task>Sink</task>
</group>
</main>