mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 09:31:45 +00:00
feat!: Remove deprecated components sdk, sdk_commands, dds_plugin
BREAKING CHANGE: Components have been moved to ODC project, see https://github.com/FairRootGroup/FairMQ/discussions/392 for details.
This commit is contained in:
@@ -1,24 +0,0 @@
|
||||
################################################################################
|
||||
# Copyright (C) 2012-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||
# #
|
||||
# This software is distributed under the terms of the #
|
||||
# GNU Lesser General Public Licence (LGPL) version 3, #
|
||||
# copied verbatim in the file "LICENSE" #
|
||||
################################################################################
|
||||
|
||||
set(plugin FairMQPlugin_dds)
|
||||
add_library(${plugin} SHARED ${CMAKE_CURRENT_SOURCE_DIR}/DDS.cxx ${CMAKE_CURRENT_SOURCE_DIR}/DDS.h)
|
||||
target_compile_features(${plugin} PUBLIC cxx_std_17)
|
||||
target_link_libraries(${plugin} PUBLIC FairMQ StateMachine DDS::dds_intercom_lib DDS::dds_protocol_lib Boost::boost PRIVATE Commands asio::asio)
|
||||
target_include_directories(${plugin} PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
|
||||
set_target_properties(${plugin} PROPERTIES CXX_VISIBILITY_PRESET hidden)
|
||||
set_target_properties(${plugin} PROPERTIES
|
||||
VERSION ${PROJECT_VERSION}
|
||||
SOVERSION "${PROJECT_VERSION_MAJOR}.${PROJECT_VERSION_MINOR}"
|
||||
LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/fairmq
|
||||
)
|
||||
|
||||
install(TARGETS ${plugin}
|
||||
EXPORT ${PROJECT_EXPORT_SET}
|
||||
LIBRARY DESTINATION ${PROJECT_INSTALL_LIBDIR}
|
||||
)
|
@@ -1,457 +0,0 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include "DDS.h"
|
||||
|
||||
#include <fairmq/tools/Strings.h>
|
||||
|
||||
#include <boost/algorithm/string/join.hpp>
|
||||
#include <boost/algorithm/string/split.hpp>
|
||||
#include <boost/algorithm/string/classification.hpp>
|
||||
#include <asio/post.hpp>
|
||||
|
||||
#include <cstdlib>
|
||||
#include <stdexcept>
|
||||
#include <sstream>
|
||||
|
||||
using namespace std;
|
||||
using fair::mq::tools::ToString;
|
||||
|
||||
namespace fair::mq::plugins
|
||||
{
|
||||
|
||||
DDS::DDS(const string& name,
|
||||
const Plugin::Version version,
|
||||
const string& maintainer,
|
||||
const string& homepage,
|
||||
PluginServices* pluginServices)
|
||||
: Plugin(name, version, maintainer, homepage, pluginServices)
|
||||
, fDDSTaskId(dds::env_prop<dds::task_id>())
|
||||
, fCurrentState(DeviceState::Idle)
|
||||
, fLastState(DeviceState::Idle)
|
||||
, fDeviceTerminationRequested(false)
|
||||
, fLastExternalController(0)
|
||||
, fExitingAckedByLastExternalController(false)
|
||||
, fUpdatesAllowed(false)
|
||||
, fWorkGuard(fWorkerQueue.get_executor())
|
||||
{
|
||||
try {
|
||||
TakeDeviceControl();
|
||||
|
||||
string deviceId(GetProperty<string>("id"));
|
||||
if (deviceId.empty()) {
|
||||
SetProperty<string>("id", dds::env_prop<dds::task_path>());
|
||||
}
|
||||
string sessionId(GetProperty<string>("session"));
|
||||
if (sessionId == "default") {
|
||||
SetProperty<string>("session", dds::env_prop<dds::dds_session_id>());
|
||||
}
|
||||
|
||||
auto control = GetProperty<string>("control");
|
||||
if (control == "static") {
|
||||
LOG(error) << "DDS Plugin: static mode is not supported";
|
||||
throw invalid_argument("DDS Plugin: static mode is not supported");
|
||||
} else if (control == "dynamic" || control == "external" || control == "interactive") {
|
||||
LOG(debug) << "Running DDS controller: external";
|
||||
} else {
|
||||
LOG(error) << "Unrecognized control mode '" << control << "' requested. " << "Ignoring and starting in external control mode.";
|
||||
}
|
||||
|
||||
SubscribeForCustomCommands();
|
||||
SubscribeForConnectingChannels();
|
||||
|
||||
// subscribe to device state changes, pushing new state changes into the event queue
|
||||
SubscribeToDeviceStateChange([&](DeviceState newState) {
|
||||
switch (newState) {
|
||||
case DeviceState::Bound: {
|
||||
// Receive addresses of connecting channels from DDS
|
||||
// and propagate addresses of bound channels to DDS.
|
||||
FillChannelContainers();
|
||||
|
||||
// allow updates from key value after channel containers are filled
|
||||
{
|
||||
lock_guard<mutex> lk(fUpdateMutex);
|
||||
fUpdatesAllowed = true;
|
||||
}
|
||||
fUpdateCondition.notify_one();
|
||||
|
||||
// publish bound addresses via DDS at keys corresponding to the channel
|
||||
// prefixes, e.g. 'data' in data[i]
|
||||
PublishBoundChannels();
|
||||
} break;
|
||||
case DeviceState::ResettingDevice: {
|
||||
{
|
||||
lock_guard<mutex> lk(fUpdateMutex);
|
||||
fUpdatesAllowed = false;
|
||||
}
|
||||
|
||||
EmptyChannelContainers();
|
||||
} break;
|
||||
case DeviceState::Exiting: {
|
||||
if (!fControllerThread.joinable()) {
|
||||
fControllerThread = thread(&DDS::WaitForExitingAck, this);
|
||||
}
|
||||
fWorkGuard.reset();
|
||||
fDeviceTerminationRequested = true;
|
||||
UnsubscribeFromDeviceStateChange();
|
||||
ReleaseDeviceControl();
|
||||
} break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
using namespace sdk::cmd;
|
||||
auto now = chrono::steady_clock::now();
|
||||
string id = GetProperty<string>("id");
|
||||
fLastState = fCurrentState;
|
||||
fCurrentState = newState;
|
||||
|
||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||
for (auto it = fStateChangeSubscribers.cbegin(); it != fStateChangeSubscribers.end();) {
|
||||
// if a subscriber did not send a heartbeat in more than 3 times the promised interval,
|
||||
// remove it from the subscriber list
|
||||
if (chrono::duration<double>(now - it->second.first).count() > 3 * it->second.second) {
|
||||
LOG(warn) << "Controller '" << it->first
|
||||
<< "' did not send heartbeats since over 3 intervals ("
|
||||
<< 3 * it->second.second << " ms), removing it.";
|
||||
fStateChangeSubscribers.erase(it++);
|
||||
} else {
|
||||
LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState << " to " << it->first;
|
||||
Cmds cmds(make<StateChange>(id, fDDSTaskId, fLastState, fCurrentState));
|
||||
fDDS.Send(cmds.Serialize(), to_string(it->first));
|
||||
++it;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
StartWorkerThread();
|
||||
|
||||
fDDS.Start();
|
||||
} catch (PluginServices::DeviceControlError& e) {
|
||||
LOG(debug) << e.what();
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Error in plugin initialization: " << e.what();
|
||||
}
|
||||
}
|
||||
|
||||
void DDS::EmptyChannelContainers()
|
||||
{
|
||||
fBindingChans.clear();
|
||||
fConnectingChans.clear();
|
||||
}
|
||||
|
||||
auto DDS::StartWorkerThread() -> void
|
||||
{
|
||||
fWorkerThread = thread([this]() {
|
||||
fWorkerQueue.run();
|
||||
});
|
||||
}
|
||||
|
||||
auto DDS::WaitForExitingAck() -> void
|
||||
{
|
||||
unique_lock<mutex> lock(fStateChangeSubscriberMutex);
|
||||
auto timeout = GetProperty<unsigned int>("wait-for-exiting-ack-timeout");
|
||||
fExitingAcked.wait_for(lock, chrono::milliseconds(timeout), [this]() {
|
||||
return fExitingAckedByLastExternalController || fStateChangeSubscribers.empty();
|
||||
});
|
||||
}
|
||||
|
||||
auto DDS::FillChannelContainers() -> void
|
||||
{
|
||||
try {
|
||||
unordered_map<string, int> channelInfo(GetChannelInfo());
|
||||
|
||||
// fill binding and connecting chans
|
||||
for (const auto& c : channelInfo) {
|
||||
string methodKey{"chans." + c.first + "." + to_string(c.second - 1) + ".method"};
|
||||
if (GetProperty<string>(methodKey) == "bind") {
|
||||
fBindingChans.insert(make_pair(c.first, vector<string>()));
|
||||
for (int i = 0; i < c.second; ++i) {
|
||||
fBindingChans.at(c.first).push_back(GetProperty<string>(string{"chans." + c.first + "." + to_string(i) + ".address"}));
|
||||
}
|
||||
} else if (GetProperty<string>(methodKey) == "connect") {
|
||||
fConnectingChans.insert(make_pair(c.first, DDSConfig()));
|
||||
LOG(debug) << "preparing to connect: " << c.first << " with " << c.second << " sub-channels.";
|
||||
fConnectingChans.at(c.first).fNumSubChannels = c.second;
|
||||
} else {
|
||||
LOG(error) << "Cannot update address configuration. Channel method (bind/connect) not specified.";
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// save properties that will have multiple values arriving (with only some of them to be used)
|
||||
vector<string> iValues;
|
||||
if (PropertyExists("dds-i")) {
|
||||
iValues = GetProperty<vector<string>>("dds-i");
|
||||
}
|
||||
vector<string> inValues;
|
||||
if (PropertyExists("dds-i-n")) {
|
||||
inValues = GetProperty<vector<string>>("dds-i-n");
|
||||
}
|
||||
|
||||
for (const auto& vi : iValues) {
|
||||
size_t pos = vi.find(":");
|
||||
string chanName = vi.substr(0, pos);
|
||||
|
||||
// check if provided name is a valid channel name
|
||||
if (fConnectingChans.find(chanName) == fConnectingChans.end()) {
|
||||
throw invalid_argument(ToString("channel provided to dds-i is not an actual connecting channel of this device: ", chanName));
|
||||
}
|
||||
|
||||
int i = stoi(vi.substr(pos + 1));
|
||||
LOG(debug) << "dds-i: adding " << chanName << " -> i of " << i;
|
||||
fI.insert(make_pair(chanName, i));
|
||||
}
|
||||
|
||||
for (const auto& vi : inValues) {
|
||||
size_t pos = vi.find(":");
|
||||
string chanName = vi.substr(0, pos);
|
||||
|
||||
// check if provided name is a valid channel name
|
||||
if (fConnectingChans.find(chanName) == fConnectingChans.end()) {
|
||||
throw invalid_argument(ToString("channel provided to dds-i-n is not an actual connecting channel of this device: ", chanName));
|
||||
}
|
||||
|
||||
string i_n = vi.substr(pos + 1);
|
||||
pos = i_n.find("-");
|
||||
int i = stoi(i_n.substr(0, pos));
|
||||
int n = stoi(i_n.substr(pos + 1));
|
||||
LOG(debug) << "dds-i-n: adding " << chanName << " -> i: " << i << " n: " << n;
|
||||
fIofN.insert(make_pair(chanName, IofN(i, n)));
|
||||
}
|
||||
} catch (const exception& e) {
|
||||
LOG(error) << "Error filling channel containers: " << e.what();
|
||||
}
|
||||
}
|
||||
|
||||
auto DDS::SubscribeForConnectingChannels() -> void
|
||||
{
|
||||
LOG(debug) << "Subscribing for DDS properties.";
|
||||
|
||||
fDDS.SubscribeKeyValue([&] (const string& key, const string& value, uint64_t senderTaskID) {
|
||||
LOG(debug) << "Received property: key=" << key << ", value=" << value << ", senderTaskID=" << senderTaskID;
|
||||
|
||||
if (key.compare(0, 8, "fmqchan_") != 0) {
|
||||
LOG(debug) << "property update is not a channel info update: " << key;
|
||||
return;
|
||||
}
|
||||
string channelName = key.substr(8);
|
||||
LOG(info) << "Update for channel name: " << channelName;
|
||||
|
||||
asio::post(fWorkerQueue, [=]() {
|
||||
try {
|
||||
{
|
||||
unique_lock<mutex> lk(fUpdateMutex);
|
||||
fUpdateCondition.wait(lk, [&]{ return fUpdatesAllowed; });
|
||||
}
|
||||
|
||||
if (fConnectingChans.find(channelName) == fConnectingChans.end()) {
|
||||
LOG(error) << "Received an update for a connecting channel, but either no channel with given channel name exists or it has already been configured: '" << channelName << "', ignoring...";
|
||||
return;
|
||||
}
|
||||
|
||||
string val = value;
|
||||
// check if it is to handle as one out of multiple values
|
||||
auto it = fIofN.find(channelName);
|
||||
if (it != fIofN.end()) {
|
||||
it->second.fEntries.push_back(value);
|
||||
if (it->second.fEntries.size() == it->second.fN) {
|
||||
sort(it->second.fEntries.begin(), it->second.fEntries.end());
|
||||
val = it->second.fEntries.at(it->second.fI);
|
||||
} else {
|
||||
LOG(debug) << "received " << it->second.fEntries.size() << " values for " << channelName << ", expecting total of " << it->second.fN;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
vector<string> connectionStrings;
|
||||
boost::algorithm::split(connectionStrings, val, boost::algorithm::is_any_of(","));
|
||||
if (connectionStrings.size() > 1) { // multiple bound channels received
|
||||
auto it2 = fI.find(channelName);
|
||||
if (it2 != fI.end()) {
|
||||
LOG(debug) << "adding connecting channel " << channelName << " : " << connectionStrings.at(it2->second);
|
||||
fConnectingChans.at(channelName).fDDSValues.insert({senderTaskID, connectionStrings.at(it2->second).c_str()});
|
||||
} else {
|
||||
LOG(error) << "multiple bound channels received, but no task index specified, only assigning the first";
|
||||
fConnectingChans.at(channelName).fDDSValues.insert({senderTaskID, connectionStrings.at(0).c_str()});
|
||||
}
|
||||
} else { // only one bound channel received
|
||||
fConnectingChans.at(channelName).fDDSValues.insert({senderTaskID, val.c_str()});
|
||||
}
|
||||
|
||||
for (const auto& mi : fConnectingChans) {
|
||||
if (mi.second.fNumSubChannels == mi.second.fDDSValues.size()) {
|
||||
int i = 0;
|
||||
for (const auto& e : mi.second.fDDSValues) {
|
||||
auto result = UpdateProperty<string>(string{"chans." + mi.first + "." + to_string(i) + ".address"}, e.second);
|
||||
if (!result) {
|
||||
LOG(error) << "UpdateProperty failed for: " << "chans." << mi.first << "." << to_string(i) << ".address" << " - property does not exist";
|
||||
}
|
||||
++i;
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (const exception& e) {
|
||||
LOG(error) << "Error handling DDS property: key=" << key << ", value=" << value << ", senderTaskID=" << senderTaskID << ": " << e.what();
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
auto DDS::PublishBoundChannels() -> void
|
||||
{
|
||||
for (const auto& chan : fBindingChans) {
|
||||
string joined = boost::algorithm::join(chan.second, ",");
|
||||
LOG(debug) << "Publishing bound addresses (" << chan.second.size() << ") of channel '" << chan.first << "' to DDS under '" << "fmqchan_" + chan.first << "' property name.";
|
||||
fDDS.PutValue("fmqchan_" + chan.first, joined);
|
||||
}
|
||||
}
|
||||
|
||||
auto DDS::SubscribeForCustomCommands() -> void
|
||||
{
|
||||
LOG(debug) << "Subscribing for DDS custom commands.";
|
||||
|
||||
string id = GetProperty<string>("id");
|
||||
|
||||
fDDS.SubscribeCustomCmd([id, this](const string& cmdStr, const string& cond, uint64_t senderId) {
|
||||
// LOG(info) << "Received command: '" << cmdStr << "' from " << senderId;
|
||||
sdk::cmd::Cmds inCmds;
|
||||
inCmds.Deserialize(cmdStr);
|
||||
for (const auto& cmd : inCmds) {
|
||||
HandleCmd(id, *cmd, cond, senderId);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
auto DDS::HandleCmd(const string& id, sdk::cmd::Cmd& cmd, const string& cond, uint64_t senderId) -> void
|
||||
{
|
||||
using namespace fair::mq::sdk;
|
||||
using namespace fair::mq::sdk::cmd;
|
||||
// LOG(info) << "Received command type: '" << cmd.GetType() << "' from " << senderId;
|
||||
switch (cmd.GetType()) {
|
||||
case Type::check_state: {
|
||||
fDDS.Send(Cmds(make<CurrentState>(id, GetCurrentDeviceState())).Serialize(), to_string(senderId));
|
||||
} break;
|
||||
case Type::change_state: {
|
||||
Transition transition = static_cast<ChangeState&>(cmd).GetTransition();
|
||||
if (ChangeDeviceState(transition)) {
|
||||
Cmds outCmds(make<TransitionStatus>(id, fDDSTaskId, Result::Ok, transition, GetCurrentDeviceState()));
|
||||
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
||||
} else {
|
||||
Cmds outCmds(make<TransitionStatus>(id, fDDSTaskId, Result::Failure, transition, GetCurrentDeviceState()));
|
||||
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
||||
}
|
||||
{
|
||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||
fLastExternalController = senderId;
|
||||
}
|
||||
} break;
|
||||
case Type::dump_config: {
|
||||
stringstream ss;
|
||||
for (const auto& pKey : GetPropertyKeys()) {
|
||||
ss << id << ": " << pKey << " -> " << GetPropertyAsString(pKey) << "\n";
|
||||
}
|
||||
Cmds outCmds(make<Config>(id, ss.str()));
|
||||
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
||||
} break;
|
||||
case Type::state_change_exiting_received: {
|
||||
{
|
||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||
if (fLastExternalController == senderId) {
|
||||
fExitingAckedByLastExternalController = true;
|
||||
}
|
||||
}
|
||||
fExitingAcked.notify_one();
|
||||
} break;
|
||||
case Type::subscribe_to_state_change: {
|
||||
auto _cmd = static_cast<cmd::SubscribeToStateChange&>(cmd);
|
||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||
fStateChangeSubscribers.emplace(senderId, make_pair(chrono::steady_clock::now(), _cmd.GetInterval()));
|
||||
|
||||
LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState << " to " << senderId;
|
||||
|
||||
Cmds outCmds(make<StateChangeSubscription>(id, fDDSTaskId, Result::Ok),
|
||||
make<StateChange>(id, fDDSTaskId, fLastState, fCurrentState));
|
||||
|
||||
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
||||
} break;
|
||||
case Type::subscription_heartbeat: {
|
||||
try {
|
||||
auto _cmd = static_cast<cmd::SubscriptionHeartbeat&>(cmd);
|
||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||
fStateChangeSubscribers.at(senderId) = make_pair(chrono::steady_clock::now(), _cmd.GetInterval());
|
||||
} catch(out_of_range& oor) {
|
||||
LOG(warn) << "Received subscription heartbeat from an unknown controller with id '" << senderId << "'";
|
||||
}
|
||||
} break;
|
||||
case Type::unsubscribe_from_state_change: {
|
||||
{
|
||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||
fStateChangeSubscribers.erase(senderId);
|
||||
}
|
||||
Cmds outCmds(make<StateChangeUnsubscription>(id, fDDSTaskId, Result::Ok));
|
||||
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
||||
} break;
|
||||
case Type::get_properties: {
|
||||
auto _cmd = static_cast<cmd::GetProperties&>(cmd);
|
||||
auto const request_id(_cmd.GetRequestId());
|
||||
auto result(Result::Ok);
|
||||
vector<pair<string, string>> props;
|
||||
try {
|
||||
for (auto const& prop : GetPropertiesAsString(_cmd.GetQuery())) {
|
||||
props.push_back({prop.first, prop.second});
|
||||
}
|
||||
} catch (exception const& e) {
|
||||
LOG(warn) << "Getting properties (request id: " << request_id << ") failed: " << e.what();
|
||||
result = Result::Failure;
|
||||
}
|
||||
Cmds const outCmds(make<cmd::Properties>(id, request_id, result, props));
|
||||
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
||||
} break;
|
||||
case Type::set_properties: {
|
||||
auto _cmd(static_cast<cmd::SetProperties&>(cmd));
|
||||
auto const request_id(_cmd.GetRequestId());
|
||||
auto result(Result::Ok);
|
||||
try {
|
||||
fair::mq::Properties props;
|
||||
for (auto const& prop : _cmd.GetProps()) {
|
||||
props.insert({prop.first, fair::mq::Property(prop.second)});
|
||||
}
|
||||
// TODO Handle builtin keys with different value type than string
|
||||
SetProperties(props);
|
||||
} catch (exception const& e) {
|
||||
LOG(warn) << "Setting properties (request id: " << request_id << ") failed: " << e.what();
|
||||
result = Result::Failure;
|
||||
}
|
||||
Cmds const outCmds(make<PropertiesSet>(id, request_id, result));
|
||||
fDDS.Send(outCmds.Serialize(), to_string(senderId));
|
||||
} break;
|
||||
default:
|
||||
LOG(warn) << "Unexpected/unknown command received: " << cmd.GetType();
|
||||
LOG(warn) << "Origin: " << senderId;
|
||||
LOG(warn) << "Destination: " << cond;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
DDS::~DDS()
|
||||
{
|
||||
UnsubscribeFromDeviceStateChange();
|
||||
ReleaseDeviceControl();
|
||||
|
||||
if (fControllerThread.joinable()) {
|
||||
fControllerThread.join();
|
||||
}
|
||||
|
||||
fWorkGuard.reset();
|
||||
if (fWorkerThread.joinable()) {
|
||||
fWorkerThread.join();
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace fair::mq::plugins
|
@@ -1,199 +0,0 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#ifndef FAIR_MQ_PLUGINS_DDS
|
||||
#define FAIR_MQ_PLUGINS_DDS
|
||||
|
||||
#include <fairmq/Plugin.h>
|
||||
#include <fairmq/StateQueue.h>
|
||||
#include <fairmq/Version.h>
|
||||
#include <fairmq/sdk/commands/Commands.h>
|
||||
|
||||
#define BOOST_BIND_GLOBAL_PLACEHOLDERS
|
||||
#include <dds/dds.h>
|
||||
#undef BOOST_BIND_GLOBAL_PLACEHOLDERS
|
||||
|
||||
#include <asio/executor.hpp>
|
||||
#include <asio/executor_work_guard.hpp>
|
||||
#include <asio/io_context.hpp>
|
||||
|
||||
#include <cassert>
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <atomic>
|
||||
#include <thread>
|
||||
#include <map>
|
||||
#include <unordered_map>
|
||||
#include <utility> // pair
|
||||
#include <vector>
|
||||
|
||||
namespace fair::mq::plugins
|
||||
{
|
||||
|
||||
struct DDSConfig
|
||||
{
|
||||
// container of sub channel addresses
|
||||
unsigned int fNumSubChannels;
|
||||
// dds values for the channel
|
||||
std::map<uint64_t, std::string> fDDSValues;
|
||||
};
|
||||
|
||||
struct DDSSubscription
|
||||
{
|
||||
DDSSubscription()
|
||||
: fDDSCustomCmd(fService)
|
||||
, fDDSKeyValue(fService)
|
||||
{
|
||||
LOG(debug) << "$DDS_TASK_PATH: " << dds::env_prop<dds::task_path>();
|
||||
LOG(debug) << "$DDS_GROUP_NAME: " << dds::env_prop<dds::group_name>();
|
||||
LOG(debug) << "$DDS_COLLECTION_NAME: " << dds::env_prop<dds::collection_name>();
|
||||
LOG(debug) << "$DDS_TASK_NAME: " << dds::env_prop<dds::task_name>();
|
||||
LOG(debug) << "$DDS_TASK_INDEX: " << dds::env_prop<dds::task_index>();
|
||||
LOG(debug) << "$DDS_COLLECTION_INDEX: " << dds::env_prop<dds::collection_index>();
|
||||
LOG(debug) << "$DDS_TASK_ID: " << dds::env_prop<dds::task_id>();
|
||||
LOG(debug) << "$DDS_LOCATION: " << dds::env_prop<dds::dds_location>();
|
||||
std::string dds_session_id(dds::env_prop<dds::dds_session_id>());
|
||||
LOG(debug) << "$DDS_SESSION_ID: " << dds_session_id;
|
||||
|
||||
// subscribe for DDS service errors.
|
||||
fService.subscribeOnError([](const dds::intercom_api::EErrorCode errorCode, const std::string& errorMsg) {
|
||||
LOG(error) << "DDS Error received: error code: " << errorCode << ", error message: " << errorMsg;
|
||||
});
|
||||
|
||||
// fDDSCustomCmd.subscribe([](const std::string& cmd, const std::string& cond, uint64_t senderId) {
|
||||
// LOG(debug) << "cmd: " << cmd << ", cond: " << cond << ", senderId: " << senderId;
|
||||
// });
|
||||
assert(!dds_session_id.empty());
|
||||
}
|
||||
|
||||
auto Start() -> void {
|
||||
fService.start(dds::env_prop<dds::dds_session_id>());
|
||||
}
|
||||
|
||||
~DDSSubscription() {
|
||||
fDDSKeyValue.unsubscribe();
|
||||
fDDSCustomCmd.unsubscribe();
|
||||
}
|
||||
|
||||
template<typename... Args>
|
||||
auto SubscribeCustomCmd(Args&&... args) -> void
|
||||
{
|
||||
fDDSCustomCmd.subscribe(std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
template<typename... Args>
|
||||
auto SubscribeKeyValue(Args&&... args) -> void
|
||||
{
|
||||
fDDSKeyValue.subscribe(std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
template<typename... Args>
|
||||
auto Send(Args&&... args) -> void
|
||||
{
|
||||
fDDSCustomCmd.send(std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
template<typename... Args>
|
||||
auto PutValue(Args&&... args) -> void
|
||||
{
|
||||
fDDSKeyValue.putValue(std::forward<Args>(args)...);
|
||||
}
|
||||
|
||||
private:
|
||||
dds::intercom_api::CIntercomService fService;
|
||||
dds::intercom_api::CCustomCmd fDDSCustomCmd;
|
||||
dds::intercom_api::CKeyValue fDDSKeyValue;
|
||||
};
|
||||
|
||||
struct IofN
|
||||
{
|
||||
IofN(int i, int n)
|
||||
: fI(i)
|
||||
, fN(n)
|
||||
{}
|
||||
|
||||
unsigned int fI;
|
||||
unsigned int fN;
|
||||
std::vector<std::string> fEntries;
|
||||
};
|
||||
|
||||
class DDS : public Plugin
|
||||
{
|
||||
public:
|
||||
DDS(const std::string& name, const Plugin::Version version, const std::string& maintainer, const std::string& homepage, PluginServices* pluginServices);
|
||||
|
||||
~DDS();
|
||||
|
||||
private:
|
||||
auto WaitForExitingAck() -> void;
|
||||
auto StartWorkerThread() -> void;
|
||||
|
||||
auto FillChannelContainers() -> void;
|
||||
auto EmptyChannelContainers() -> void;
|
||||
|
||||
auto SubscribeForConnectingChannels() -> void;
|
||||
auto PublishBoundChannels() -> void;
|
||||
auto SubscribeForCustomCommands() -> void;
|
||||
auto HandleCmd(const std::string& id, sdk::cmd::Cmd& cmd, const std::string& cond, uint64_t senderId) -> void;
|
||||
|
||||
DDSSubscription fDDS;
|
||||
size_t fDDSTaskId;
|
||||
|
||||
std::unordered_map<std::string, std::vector<std::string>> fBindingChans;
|
||||
std::unordered_map<std::string, DDSConfig> fConnectingChans;
|
||||
|
||||
std::unordered_map<std::string, int> fI;
|
||||
std::unordered_map<std::string, IofN> fIofN;
|
||||
|
||||
std::thread fControllerThread;
|
||||
DeviceState fCurrentState, fLastState;
|
||||
|
||||
std::atomic<bool> fDeviceTerminationRequested;
|
||||
|
||||
std::unordered_map<uint64_t, std::pair<std::chrono::steady_clock::time_point, int64_t>> fStateChangeSubscribers;
|
||||
uint64_t fLastExternalController;
|
||||
bool fExitingAckedByLastExternalController;
|
||||
std::condition_variable fExitingAcked;
|
||||
std::mutex fStateChangeSubscriberMutex;
|
||||
|
||||
bool fUpdatesAllowed;
|
||||
std::mutex fUpdateMutex;
|
||||
std::condition_variable fUpdateCondition;
|
||||
|
||||
std::thread fWorkerThread;
|
||||
asio::io_context fWorkerQueue;
|
||||
asio::executor_work_guard<asio::executor> fWorkGuard;
|
||||
};
|
||||
|
||||
Plugin::ProgOptions DDSProgramOptions()
|
||||
{
|
||||
boost::program_options::options_description options{"DDS Plugin"};
|
||||
options.add_options()
|
||||
("dds-i", boost::program_options::value<std::vector<std::string>>()->multitoken()->composing(), "Task index for chosing connection target (single channel n to m). When all values come via same update.")
|
||||
("dds-i-n", boost::program_options::value<std::vector<std::string>>()->multitoken()->composing(), "Task index for chosing connection target (one out of n values to take). When values come as independent updates.")
|
||||
("wait-for-exiting-ack-timeout", boost::program_options::value<unsigned int>()->default_value(1000), "Wait timeout for EXITING state-change acknowledgement by external controller in milliseconds.");
|
||||
|
||||
return options;
|
||||
}
|
||||
|
||||
REGISTER_FAIRMQ_PLUGIN(
|
||||
DDS, // Class name
|
||||
dds, // Plugin name (string, lower case chars only)
|
||||
(Plugin::Version{FAIRMQ_VERSION_MAJOR,
|
||||
FAIRMQ_VERSION_MINOR,
|
||||
FAIRMQ_VERSION_PATCH}), // Version
|
||||
"FairRootGroup <fairroot@gsi.de>", // Maintainer
|
||||
"https://github.com/FairRootGroup/FairMQ", // Homepage
|
||||
DDSProgramOptions // custom program options for the plugin
|
||||
)
|
||||
|
||||
} // namespace fair::mq::plugins
|
||||
|
||||
#endif /* FAIR_MQ_PLUGINS_DDS */
|
@@ -1,5 +1,5 @@
|
||||
################################################################################
|
||||
# Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||
# Copyright (C) 2019-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||
# #
|
||||
# This software is distributed under the terms of the #
|
||||
# GNU Lesser General Public Licence (LGPL) version 3, #
|
||||
@@ -14,7 +14,7 @@ add_library(${plugin} SHARED
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/PMIx.hpp
|
||||
)
|
||||
target_compile_features(${plugin} PUBLIC cxx_std_17)
|
||||
target_link_libraries(${plugin} PUBLIC FairMQ PMIx::libpmix PRIVATE Commands)
|
||||
target_link_libraries(${plugin} PUBLIC FairMQ PMIx::libpmix)
|
||||
target_include_directories(${plugin} PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
|
||||
set_target_properties(${plugin} PROPERTIES
|
||||
CXX_VISIBILITY_PRESET hidden
|
||||
@@ -24,7 +24,7 @@ set_target_properties(${plugin} PROPERTIES
|
||||
|
||||
set(exe fairmq-pmix-command-ui)
|
||||
add_executable(${exe} ${CMAKE_CURRENT_SOURCE_DIR}/runPMIxCommandUI.cxx)
|
||||
target_link_libraries(${exe} FairMQ Commands StateMachine PMIx::libpmix)
|
||||
target_link_libraries(${exe} FairMQ StateMachine PMIx::libpmix)
|
||||
target_include_directories(${exe} PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
|
||||
|
||||
install(TARGETS ${plugin} ${exe}
|
||||
|
@@ -1,5 +1,5 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2019-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
@@ -8,7 +8,6 @@
|
||||
|
||||
#include "PMIxPlugin.h"
|
||||
|
||||
#include <fairmq/sdk/commands/Commands.h>
|
||||
#include <fairmq/tools/Strings.h>
|
||||
|
||||
#include <sstream>
|
||||
@@ -16,7 +15,6 @@
|
||||
#include <cstdint> // UINT32_MAX
|
||||
|
||||
using namespace std;
|
||||
using namespace fair::mq::sdk::cmd;
|
||||
|
||||
namespace fair::mq::plugins
|
||||
{
|
||||
@@ -31,8 +29,7 @@ PMIxPlugin::PMIxPlugin(const string& name,
|
||||
, fPid(getpid())
|
||||
, fPMIxClient(tools::ToString("PMIx client(pid=", fPid, ") "))
|
||||
, fDeviceId(string(fProcess.nspace) + "_" + to_string(fProcess.rank))
|
||||
, fCommands(fProcess)
|
||||
, fLastExternalController(UINT32_MAX)
|
||||
// , fLastExternalController(UINT32_MAX)
|
||||
, fExitingAckedByLastExternalController(false)
|
||||
, fCurrentState(DeviceState::Idle)
|
||||
, fLastState(DeviceState::Idle)
|
||||
@@ -42,12 +39,6 @@ PMIxPlugin::PMIxPlugin(const string& name,
|
||||
SetProperty<string>("id", fDeviceId);
|
||||
|
||||
Fence("pmix::init");
|
||||
SubscribeForCommands();
|
||||
Fence("subscribed");
|
||||
|
||||
// fCommands.Send("test1");
|
||||
// fCommands.Send("test2", 0);
|
||||
// fCommands.Send("test3", 0);
|
||||
|
||||
// LOG(info) << "PMIX_EXTERNAL_ERR_BASE: " << PMIX_EXTERNAL_ERR_BASE;
|
||||
|
||||
@@ -101,11 +92,9 @@ PMIxPlugin::PMIxPlugin(const string& name,
|
||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||
fLastState = fCurrentState;
|
||||
fCurrentState = newState;
|
||||
for (auto subscriberId : fStateChangeSubscribers) {
|
||||
LOG(debug) << "Publishing state-change: " << fLastState << "->" << newState << " to " << subscriberId;
|
||||
Cmds cmds(make<StateChange>(fDeviceId, 0, fLastState, fCurrentState));
|
||||
fCommands.Send(cmds.Serialize(Format::JSON), static_cast<pmix::rank>(subscriberId));
|
||||
}
|
||||
// for (auto subscriberId : fStateChangeSubscribers) {
|
||||
// LOG(debug) << "Publishing state-change: " << fLastState << "->" << newState << " to " << subscriberId;
|
||||
// }
|
||||
});
|
||||
}
|
||||
|
||||
@@ -113,7 +102,6 @@ PMIxPlugin::~PMIxPlugin()
|
||||
{
|
||||
LOG(debug) << "Destroying PMIxPlugin";
|
||||
ReleaseDeviceControl();
|
||||
fCommands.Unsubscribe();
|
||||
while (pmix::initialized()) {
|
||||
try {
|
||||
pmix::finalize();
|
||||
@@ -124,92 +112,6 @@ PMIxPlugin::~PMIxPlugin()
|
||||
}
|
||||
}
|
||||
|
||||
auto PMIxPlugin::SubscribeForCommands() -> void
|
||||
{
|
||||
fCommands.Subscribe([this](const string& cmdStr, const pmix::proc& sender) {
|
||||
// LOG(info) << "PMIx Plugin received message: '" << cmdStr << "', from " << sender;
|
||||
|
||||
Cmds inCmds;
|
||||
inCmds.Deserialize(cmdStr, Format::JSON);
|
||||
|
||||
for (const auto& cmd : inCmds) {
|
||||
LOG(info) << "Received command type: '" << cmd->GetType() << "' from " << sender;
|
||||
switch (cmd->GetType()) {
|
||||
case Type::check_state:
|
||||
fCommands.Send(Cmds(make<CurrentState>(fDeviceId, GetCurrentDeviceState()))
|
||||
.Serialize(Format::JSON),
|
||||
{sender});
|
||||
break;
|
||||
case Type::change_state: {
|
||||
Transition transition = static_cast<ChangeState&>(*cmd).GetTransition();
|
||||
if (ChangeDeviceState(transition)) {
|
||||
fCommands.Send(
|
||||
Cmds(make<TransitionStatus>(fDeviceId, 0, Result::Ok, transition, GetCurrentDeviceState()))
|
||||
.Serialize(Format::JSON),
|
||||
{sender});
|
||||
} else {
|
||||
fCommands.Send(
|
||||
Cmds(make<TransitionStatus>(fDeviceId, 0, Result::Failure, transition, GetCurrentDeviceState()))
|
||||
.Serialize(Format::JSON),
|
||||
{sender});
|
||||
}
|
||||
{
|
||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||
fLastExternalController = sender.rank;
|
||||
}
|
||||
}
|
||||
break;
|
||||
case Type::subscribe_to_state_change: {
|
||||
{
|
||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||
fStateChangeSubscribers.insert(sender.rank);
|
||||
}
|
||||
|
||||
LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState
|
||||
<< " to " << sender;
|
||||
Cmds outCmds(make<StateChangeSubscription>(fDeviceId, fProcess.rank, Result::Ok),
|
||||
make<StateChange>(fDeviceId, 0, fLastState, fCurrentState));
|
||||
fCommands.Send(outCmds.Serialize(Format::JSON), {sender});
|
||||
}
|
||||
break;
|
||||
case Type::unsubscribe_from_state_change: {
|
||||
{
|
||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||
fStateChangeSubscribers.erase(sender.rank);
|
||||
}
|
||||
fCommands.Send(Cmds(make<StateChangeUnsubscription>(fDeviceId, fProcess.rank, Result::Ok))
|
||||
.Serialize(Format::JSON),
|
||||
{sender});
|
||||
}
|
||||
break;
|
||||
case Type::state_change_exiting_received: {
|
||||
{
|
||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||
if (fLastExternalController == sender.rank) {
|
||||
fExitingAckedByLastExternalController = true;
|
||||
}
|
||||
}
|
||||
fExitingAcked.notify_one();
|
||||
}
|
||||
break;
|
||||
case Type::dump_config: {
|
||||
stringstream ss;
|
||||
for (const auto& k: GetPropertyKeys()) {
|
||||
ss << fDeviceId << ": " << k << " -> " << GetPropertyAsString(k) << "\n";
|
||||
}
|
||||
fCommands.Send(Cmds(make<Config>(fDeviceId, ss.str())).Serialize(Format::JSON),
|
||||
{sender});
|
||||
}
|
||||
break;
|
||||
default:
|
||||
LOG(warn) << "Unexpected/unknown command received: " << cmdStr;
|
||||
LOG(warn) << "Origin: " << sender;
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
auto PMIxPlugin::Init() -> pmix::proc
|
||||
{
|
||||
if (!pmix::initialized()) {
|
||||
|
@@ -1,5 +1,5 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2019-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
@@ -44,10 +44,9 @@ class PMIxPlugin : public Plugin
|
||||
pid_t fPid;
|
||||
std::string fPMIxClient;
|
||||
std::string fDeviceId;
|
||||
pmix::Commands fCommands;
|
||||
|
||||
std::set<uint32_t> fStateChangeSubscribers;
|
||||
uint32_t fLastExternalController;
|
||||
// uint32_t fLastExternalController;
|
||||
bool fExitingAckedByLastExternalController;
|
||||
std::condition_variable fExitingAcked;
|
||||
std::mutex fStateChangeSubscriberMutex;
|
||||
@@ -61,7 +60,6 @@ class PMIxPlugin : public Plugin
|
||||
auto Fence(const std::string& label) -> void;
|
||||
auto Lookup() -> void;
|
||||
|
||||
auto SubscribeForCommands() -> void;
|
||||
auto WaitForExitingAck() -> void;
|
||||
};
|
||||
|
||||
|
@@ -1,12 +1,11 @@
|
||||
/********************************************************************************
|
||||
* Copyright (C) 2014-2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2022 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include <fairmq/sdk/commands/Commands.h>
|
||||
#include <fairmq/States.h>
|
||||
|
||||
#include <fairlogger/Logger.h>
|
||||
@@ -29,7 +28,6 @@
|
||||
#include <vector>
|
||||
|
||||
using namespace std;
|
||||
using namespace fair::mq::sdk::cmd;
|
||||
namespace bpo = boost::program_options;
|
||||
|
||||
const std::map<fair::mq::Transition, fair::mq::State> expected =
|
||||
@@ -46,23 +44,6 @@ const std::map<fair::mq::Transition, fair::mq::State> expected =
|
||||
{ fair::mq::Transition::End, fair::mq::State::Exiting }
|
||||
};
|
||||
|
||||
struct StateSubscription
|
||||
{
|
||||
pmix::Commands& fCommands;
|
||||
|
||||
explicit StateSubscription(pmix::Commands& commands)
|
||||
: fCommands(commands)
|
||||
{
|
||||
fCommands.Send(Cmds(make<SubscribeToStateChange>(600000)).Serialize(Format::JSON));
|
||||
}
|
||||
|
||||
~StateSubscription()
|
||||
{
|
||||
fCommands.Send(Cmds(make<UnsubscribeFromStateChange>()).Serialize(Format::JSON));
|
||||
this_thread::sleep_for(chrono::milliseconds(100)); // give PMIx a chance to complete request
|
||||
}
|
||||
};
|
||||
|
||||
struct MiniTopo
|
||||
{
|
||||
explicit MiniTopo(unsigned int n)
|
||||
@@ -141,74 +122,6 @@ int main(int argc, char* argv[])
|
||||
LOG(warn) << "pmix::fence() [pmix::init] OK";
|
||||
|
||||
MiniTopo topo(numDevices);
|
||||
pmix::Commands commands(process);
|
||||
|
||||
commands.Subscribe([&](const string& msg, const pmix::proc& sender) {
|
||||
// LOG(info) << "Received '" << msg << "' from " << sender;
|
||||
Cmds cmds;
|
||||
cmds.Deserialize(msg, Format::JSON);
|
||||
// cout << "Received " << cmds.Size() << " command(s) with total size of " << msg.length() << " bytes: " << endl;
|
||||
for (const auto& cmd : cmds) {
|
||||
// cout << " > " << cmd->GetType() << endl;
|
||||
switch (cmd->GetType()) {
|
||||
case Type::state_change: {
|
||||
cout << "Received state_change from " << static_cast<StateChange&>(*cmd).GetDeviceId() << ": " << static_cast<StateChange&>(*cmd).GetLastState() << "->" << static_cast<StateChange&>(*cmd).GetCurrentState() << endl;
|
||||
topo.Update(sender.rank, static_cast<StateChange&>(*cmd).GetCurrentState());
|
||||
if (static_cast<StateChange&>(*cmd).GetCurrentState() == fair::mq::State::Exiting) {
|
||||
commands.Send(Cmds(make<StateChangeExitingReceived>()).Serialize(Format::JSON), {sender});
|
||||
}
|
||||
}
|
||||
break;
|
||||
case Type::state_change_subscription:
|
||||
if (static_cast<StateChangeSubscription&>(*cmd).GetResult() != Result::Ok) {
|
||||
cout << "State change subscription failed for " << static_cast<StateChangeSubscription&>(*cmd).GetDeviceId() << endl;
|
||||
}
|
||||
break;
|
||||
case Type::state_change_unsubscription:
|
||||
if (static_cast<StateChangeUnsubscription&>(*cmd).GetResult() != Result::Ok) {
|
||||
cout << "State change unsubscription failed for " << static_cast<StateChangeUnsubscription&>(*cmd).GetDeviceId() << endl;
|
||||
}
|
||||
break;
|
||||
case Type::transition_status: {
|
||||
if (static_cast<TransitionStatus&>(*cmd).GetResult() == Result::Ok) {
|
||||
cout << "Device " << static_cast<TransitionStatus&>(*cmd).GetDeviceId() << " started to transition with " << static_cast<TransitionStatus&>(*cmd).GetTransition() << endl;
|
||||
} else {
|
||||
cout << "Device " << static_cast<TransitionStatus&>(*cmd).GetDeviceId() << " cannot transition with " << static_cast<TransitionStatus&>(*cmd).GetTransition() << endl;
|
||||
}
|
||||
}
|
||||
break;
|
||||
case Type::current_state:
|
||||
cout << "Device " << static_cast<CurrentState&>(*cmd).GetDeviceId() << " is in " << static_cast<CurrentState&>(*cmd).GetCurrentState() << " state" << endl;
|
||||
break;
|
||||
case Type::config:
|
||||
cout << "Received config for device " << static_cast<Config&>(*cmd).GetDeviceId() << ":\n" << static_cast<Config&>(*cmd).GetConfig() << endl;
|
||||
break;
|
||||
default:
|
||||
cout << "Unexpected/unknown command received: " << cmd->GetType() << endl;
|
||||
cout << "Origin: " << sender << endl;
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
pmix::fence({all});
|
||||
LOG(warn) << "pmix::fence() [subscribed] OK";
|
||||
|
||||
StateSubscription stateSubscription(commands);
|
||||
|
||||
for (auto transition : { fair::mq::Transition::InitDevice,
|
||||
fair::mq::Transition::CompleteInit,
|
||||
fair::mq::Transition::Bind,
|
||||
fair::mq::Transition::Connect,
|
||||
fair::mq::Transition::InitTask,
|
||||
fair::mq::Transition::Run,
|
||||
fair::mq::Transition::Stop,
|
||||
fair::mq::Transition::ResetTask,
|
||||
fair::mq::Transition::ResetDevice,
|
||||
fair::mq::Transition::End }) {
|
||||
commands.Send(Cmds(make<ChangeState>(transition)).Serialize(Format::JSON));
|
||||
topo.WaitFor(expected.at(transition));
|
||||
}
|
||||
} catch (exception& e) {
|
||||
LOG(error) << "Error: " << e.what();
|
||||
return EXIT_FAILURE;
|
||||
|
Reference in New Issue
Block a user