From 14980d748648153f984e0149e80fc0d339a0fa5c Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Wed, 30 Jan 2019 12:46:15 +0100 Subject: [PATCH] Implement old_state->new_state notifications --- fairmq/plugins/Control.h | 2 +- fairmq/plugins/DDS/DDS.cxx | 61 ++++++---- fairmq/plugins/DDS/DDS.h | 9 +- fairmq/plugins/DDS/runDDSCommandUI.cxx | 149 ++++++++++++++----------- 4 files changed, 127 insertions(+), 94 deletions(-) diff --git a/fairmq/plugins/Control.h b/fairmq/plugins/Control.h index 88f56032..08ec56cd 100644 --- a/fairmq/plugins/Control.h +++ b/fairmq/plugins/Control.h @@ -67,7 +67,7 @@ REGISTER_FAIRMQ_PLUGIN( control, // Plugin name (string, lower case chars only) (Plugin::Version{FAIRMQ_VERSION_MAJOR, FAIRMQ_VERSION_MINOR, FAIRMQ_VERSION_PATCH}), // Version "FairRootGroup ", // Maintainer - "https://github.com/FairRootGroup/FairRoot", // Homepage + "https://github.com/FairRootGroup/FairMQ", // Homepage ControlPluginProgramOptions // Free function which declares custom program options for the // plugin signature: () -> // boost::optional diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index 9b3fd5d9..380dc3ae 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -45,6 +45,8 @@ DDS::DDS(const string& name, const Plugin::Version version, const string& mainta , fEvents() , fEventsMutex() , fNewEvent() + , fCurrentState(DeviceState::Idle) + , fLastState(DeviceState::Idle) , fDeviceTerminationRequested(false) , fHeartbeatInterval{100} { @@ -62,6 +64,15 @@ DDS::DDS(const string& name, const Plugin::Version version, const string& mainta auto DDS::HandleControl() -> void { try { + LOG(debug) << "$DDS_TASK_PATH: " << getenv("DDS_TASK_PATH"); + LOG(debug) << "$DDS_GROUP_NAME: " << getenv("DDS_GROUP_NAME"); + LOG(debug) << "$DDS_COLLECTION_NAME: " << getenv("DDS_COLLECTION_NAME"); + LOG(debug) << "$DDS_TASK_NAME: " << getenv("DDS_TASK_NAME"); + LOG(debug) << "$DDS_TASK_INDEX: " << getenv("DDS_TASK_INDEX"); + LOG(debug) << "$DDS_COLLECTION_INDEX: " << getenv("DDS_COLLECTION_INDEX"); + string dds_session_id(getenv("DDS_SESSION_ID")); + LOG(debug) << "$DDS_SESSION_ID: " << dds_session_id; + // subscribe for state changes from DDS (subscriptions start firing after fService.start() is called) SubscribeForCustomCommands(); @@ -70,10 +81,12 @@ auto DDS::HandleControl() -> void LOG(error) << "DDS Error received: error code: " << errorCode << ", error message: " << errorMsg << endl; }); - LOG(debug) << "Subscribing for DDS properties."; SubscribeForConnectingChannels(); - // subscribe to device state changes, pushing new state chenges into the event queue + // start DDS service - subscriptions will only start firing after this step + fService.start(dds_session_id); + + // subscribe to device state changes, pushing new state changes into the event queue SubscribeToDeviceStateChange([&](DeviceState newState) { { lock_guard lock{fEventsMutex}; @@ -87,9 +100,14 @@ auto DDS::HandleControl() -> void { lock_guard lock{fStateChangeSubscriberMutex}; string id = GetProperty("id"); + fLastState = fCurrentState; + fCurrentState = newState; for (auto subscriberId : fStateChangeSubscribers) { - LOG(debug) << "Publishing state-change: " << newState << " to " << subscriberId; - fDDSCustomCmd.send("state-change: " + id + "," + ToStr(newState), to_string(subscriberId)); + LOG(debug) << "Publishing state-change: " << fLastState << "->" << newState + << " to " << subscriberId; + fDDSCustomCmd.send("state-change: " + id + "," + ToStr(fLastState) + "->" + + ToStr(newState), + to_string(subscriberId)); } } }); @@ -105,16 +123,6 @@ auto DDS::HandleControl() -> void // and propagate addresses of bound channels to DDS. FillChannelContainers(); - LOG(debug) << "$DDS_TASK_PATH: " << getenv("DDS_TASK_PATH"); - LOG(debug) << "$DDS_GROUP_NAME: " << getenv("DDS_GROUP_NAME"); - LOG(debug) << "$DDS_COLLECTION_NAME: " << getenv("DDS_COLLECTION_NAME"); - LOG(debug) << "$DDS_TASK_NAME: " << getenv("DDS_TASK_NAME"); - LOG(debug) << "$DDS_TASK_INDEX: " << getenv("DDS_TASK_INDEX"); - LOG(debug) << "$DDS_COLLECTION_INDEX: " << getenv("DDS_COLLECTION_INDEX"); - - // start DDS service - subscriptions will only start firing after this step - fService.start(); - // publish bound addresses via DDS at keys corresponding to the channel prefixes, e.g. 'data' in data[i] PublishBoundChannels(); @@ -219,6 +227,8 @@ auto DDS::FillChannelContainers() -> void auto DDS::SubscribeForConnectingChannels() -> void { + LOG(debug) << "Subscribing for DDS properties."; + fDDSKeyValue.subscribe([&] (const string& propertyId, const string& value, uint64_t senderTaskID) { try { LOG(debug) << "Received update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID; @@ -285,14 +295,13 @@ auto DDS::PublishBoundChannels() -> void auto DDS::HeartbeatSender() -> void { string id = GetProperty("id"); - string pid(to_string(getpid())); while (!fDeviceTerminationRequested) { { lock_guard lock{fHeartbeatSubscriberMutex}; for (const auto subscriberId : fHeartbeatSubscribers) { - fDDSCustomCmd.send("heartbeat: " + id + "," + pid, to_string(subscriberId)); + fDDSCustomCmd.send("heartbeat: " + id , to_string(subscriberId)); } } @@ -302,14 +311,15 @@ auto DDS::HeartbeatSender() -> void auto DDS::SubscribeForCustomCommands() -> void { - string id = GetProperty("id"); - string pid(to_string(getpid())); + LOG(debug) << "Subscribing for DDS custom commands."; - fDDSCustomCmd.subscribe([id, pid, this](const string& cmd, const string& cond, uint64_t senderId) { + string id = GetProperty("id"); + + fDDSCustomCmd.subscribe([id, this](const string& cmd, const string& cond, uint64_t senderId) { LOG(info) << "Received command: " << cmd; if (cmd == "check-state") { - fDDSCustomCmd.send(id + ": " + ToStr(GetCurrentDeviceState()) + " (pid: " + pid + ")", to_string(senderId)); + fDDSCustomCmd.send(id + ": " + ToStr(GetCurrentDeviceState()), to_string(senderId)); } else if (cmd == "INIT DEVICE") { if (ChangeDeviceState(ToDeviceStateTransition(cmd))) { fDDSCustomCmd.send(id + ": queued " + cmd + " transition", to_string(senderId)); @@ -360,9 +370,14 @@ auto DDS::SubscribeForCustomCommands() -> void fStateChangeSubscribers.insert(senderId); } fDDSCustomCmd.send("state-changes-subscription: " + id + ",OK", to_string(senderId)); - auto state = GetCurrentDeviceState(); - LOG(debug) << "Publishing state-change: " << state << " to " << senderId; - fDDSCustomCmd.send("state-change: " + id + "," + ToStr(state), to_string(senderId)); + { + lock_guard lock{fStateChangeSubscriberMutex}; + LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState + << " to " << senderId; + fDDSCustomCmd.send("state-change: " + id + "," + ToStr(fLastState) + "->" + + ToStr(fCurrentState), + to_string(senderId)); + } } else if (cmd == "unsubscribe-from-state-changes") { { lock_guard lock{fStateChangeSubscriberMutex}; diff --git a/fairmq/plugins/DDS/DDS.h b/fairmq/plugins/DDS/DDS.h index 0da13d12..9ae2bb50 100644 --- a/fairmq/plugins/DDS/DDS.h +++ b/fairmq/plugins/DDS/DDS.h @@ -10,6 +10,7 @@ #define FAIR_MQ_PLUGINS_DDS #include +#include #include @@ -55,7 +56,6 @@ struct IofN unsigned int fI; unsigned int fN; std::vector fEntries; - }; class DDS : public Plugin @@ -95,6 +95,7 @@ class DDS : public Plugin std::queue fEvents; std::mutex fEventsMutex; std::condition_variable fNewEvent; + DeviceState fCurrentState, fLastState; std::atomic fDeviceTerminationRequested; @@ -120,9 +121,11 @@ Plugin::ProgOptions DDSProgramOptions() REGISTER_FAIRMQ_PLUGIN( DDS, // Class name dds, // Plugin name (string, lower case chars only) - (Plugin::Version{1,0,0}), // Version + (Plugin::Version{FAIRMQ_VERSION_MAJOR, + FAIRMQ_VERSION_MINOR, + FAIRMQ_VERSION_PATCH}), // Version "FairRootGroup ", // Maintainer - "https://github.com/FairRootGroup/FairRoot", // Homepage + "https://github.com/FairRootGroup/FairMQ", // Homepage DDSProgramOptions // custom program options for the plugin ) diff --git a/fairmq/plugins/DDS/runDDSCommandUI.cxx b/fairmq/plugins/DDS/runDDSCommandUI.cxx index 2d141300..8b393e07 100644 --- a/fairmq/plugins/DDS/runDDSCommandUI.cxx +++ b/fairmq/plugins/DDS/runDDSCommandUI.cxx @@ -9,7 +9,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -73,71 +75,66 @@ void printControlsHelp() cout << "To quit press Ctrl+C" << endl; } -void commandMode(char command, const string& topologyPath, CCustomCmd& ddsCustomCmd) { +void commandMode(const string& command_in, const string& topologyPath, CCustomCmd& ddsCustomCmd) { char c; + string command(command_in); TerminalConfig tconfig; - if (command != ' ') { - cin.putback(command); - } else { + if (command == "") { printControlsHelp(); + cin >> c; + command = c; } - while (cin >> c) { - switch (c) { - case 'c': - cout << " > checking state of the devices" << endl; - ddsCustomCmd.send("check-state", topologyPath); - break; - case 'o': - cout << " > dumping config of the devices" << endl; - ddsCustomCmd.send("dump-config", topologyPath); - break; - case 'i': - cout << " > init devices" << endl; - ddsCustomCmd.send("INIT DEVICE", topologyPath); - break; - case 'j': - cout << " > init tasks" << endl; - ddsCustomCmd.send("INIT TASK", topologyPath); - break; - case 'p': - cout << " > pause devices" << endl; - ddsCustomCmd.send("PAUSE", topologyPath); - break; - case 'r': - cout << " > run tasks" << endl; - ddsCustomCmd.send("RUN", topologyPath); - break; - case 's': - cout << " > stop devices" << endl; - ddsCustomCmd.send("STOP", topologyPath); - break; - case 't': - cout << " > reset tasks" << endl; - ddsCustomCmd.send("RESET TASK", topologyPath); - break; - case 'd': - cout << " > reset devices" << endl; - ddsCustomCmd.send("RESET DEVICE", topologyPath); - break; - case 'h': - cout << " > help" << endl; - printControlsHelp(); - break; - case 'q': - cout << " > end" << endl; - ddsCustomCmd.send("END", topologyPath); - break; - default: - cout << "Invalid input: [" << c << "]" << endl; - printControlsHelp(); - break; + while (true) { + if (command == "c") { + cout << " > checking state of the devices" << endl; + ddsCustomCmd.send("check-state", topologyPath); + } else if (command == "o") { + cout << " > dumping config of the devices" << endl; + ddsCustomCmd.send("dump-config", topologyPath); + } else if (command == "i") { + cout << " > init devices" << endl; + ddsCustomCmd.send("INIT DEVICE", topologyPath); + } else if (command == "j") { + cout << " > init tasks" << endl; + ddsCustomCmd.send("INIT TASK", topologyPath); + } else if (command == "p") { + cout << " > pause devices" << endl; + ddsCustomCmd.send("PAUSE", topologyPath); + } else if (command == "r") { + cout << " > run tasks" << endl; + ddsCustomCmd.send("RUN", topologyPath); + } else if (command == "s") { + cout << " > stop devices" << endl; + ddsCustomCmd.send("STOP", topologyPath); + } else if (command == "t") { + cout << " > reset tasks" << endl; + ddsCustomCmd.send("RESET TASK", topologyPath); + } else if (command == "d") { + cout << " > reset devices" << endl; + ddsCustomCmd.send("RESET DEVICE", topologyPath); + } else if (command == "h") { + cout << " > help" << endl; + printControlsHelp(); + } else if (command == "q") { + cout << " > end" << endl; + ddsCustomCmd.send("END", topologyPath); + } else if (command == "q!") { + ddsCustomCmd.send("SHUTDOWN", topologyPath); + } else if (command == "r!") { + ddsCustomCmd.send("STARTUP", topologyPath); + } else { + cout << "Invalid input: [" << c << "]" << endl; + printControlsHelp(); } - if (command != ' ') { + if (command_in != "") { this_thread::sleep_for(chrono::milliseconds(100)); // give dds a chance to complete request break; + } else { + cin >> c; + command = c; } } } @@ -153,10 +150,13 @@ void waitMode(const string& waitForState, StateSubscription stateSubscription(topologyPath, ddsCustomCmd); auto condition = [&] { - return !waitForStateMap.empty() // TODO once DDS provides an API to retrieve actual number of tasks, use it here + return !waitForStateMap.empty() // TODO once DDS provides an API to retrieve actual number + // of tasks, use it here && all_of(waitForStateMap.cbegin(), waitForStateMap.cend(), - [&](WaitForStateMap::value_type i) { return i.second == waitForState; }); + [&](WaitForStateMap::value_type i) { + return boost::algorithm::ends_with(i.second, waitForState); + }); }; unique_lock lock(waitForStateMutex); @@ -174,7 +174,7 @@ int main(int argc, char* argv[]) { try { string sessionID; - char command = ' '; + string command; string topologyPath; string waitForState; unsigned int timeout; @@ -183,18 +183,27 @@ int main(int argc, char* argv[]) WaitForStateMap waitForStateMap; bpo::options_description options("Common options"); + + auto env_session_id = std::getenv("DDS_SESSION_ID"); + if (env_session_id) { + options.add_options()("session,s", + bpo::value(&sessionID)->default_value(env_session_id), + "DDS Session ID (overrides any value in env var $DDS_SESSION_ID)"); + } else { + options.add_options()("session,s", + bpo::value(&sessionID)->required(), + "DDS Session ID (overrides any value in env var $DDS_SESSION_ID)"); + } + options.add_options() - ("session,s", bpo::value (&sessionID)->required(), - "DDS Session ID") - ("command,c", bpo::value (&command)->default_value(' '), + ("command,c", bpo::value (&command)->default_value(""), "Command character") ("path,p", bpo::value (&topologyPath)->default_value(""), - "DDS Topology path to send command to") + "DDS Topology path to send command to (empty - send to all tasks)") ("wait-for-state,w", bpo::value (&waitForState)->default_value(""), "Wait until targeted FairMQ devices reach the given state") ("timeout,t", bpo::value (&timeout)->default_value(0), "Timeout in milliseconds when waiting for a device state (0 - wait infinitely)") - ("help,h", "Produce help message"); bpo::variables_map vm; @@ -212,23 +221,29 @@ int main(int argc, char* argv[]) CCustomCmd ddsCustomCmd(service); service.subscribeOnError([](const EErrorCode errorCode, const string& errorMsg) { - cout << "DDS error received: error code: " << errorCode << ", error message: " << errorMsg << endl; + cerr << "DDS error received: error code: " << errorCode << ", error message: " << errorMsg << endl; }); // subscribe to receive messages from DDS ddsCustomCmd.subscribe([&](const string& msg, const string& /*condition*/, uint64_t senderId) { + cout << "Received: " << endl << msg << endl; vector parts; boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,")); if (parts[0] == "state-change") { { unique_lock lock(waitForStateMutex); + boost::trim(parts[2]); waitForStateMap[senderId] = parts[2]; } waitForStateCV.notify_one(); } else if (parts[0] == "state-changes-subscription") { - // ok, stay silent + if (parts[2] != "OK") { + cerr << "state-changes-subscription failed with return code: " << parts[2]; + } } else if (parts[0] == "state-changes-unsubscription") { - // ok, stay silent + if (parts[2] != "OK") { + cerr << "state-changes-unsubscription failed with return code: " << parts[2]; + } } else { cout << "Received: " << endl << msg << endl; } @@ -298,7 +313,7 @@ int main(int argc, char* argv[]) break; } - if (command != ' ') { + if (command != "") { commandMode(command, topologyPath, ddsCustomCmd); } waitMode(waitForState,