/******************************************************************************** * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ #include "DDS.h" #include #include #include #include #include #include #include // for the interactive mode #include // for the interactive mode #include #include #include #include using namespace std; using fair::mq::tools::ToString; namespace fair { namespace mq { namespace plugins { DDS::DDS(const string& name, const Plugin::Version version, const string& maintainer, const string& homepage, PluginServices* pluginServices) : Plugin(name, version, maintainer, homepage, pluginServices) , fTransitions({"INIT DEVICE", "COMPLETE INIT", "BIND", "CONNECT", "INIT TASK", "RUN", "STOP", "RESET TASK", "RESET DEVICE", "END"}) , fCurrentState(DeviceState::Idle) , fLastState(DeviceState::Idle) , fDeviceTerminationRequested(false) , fLastExternalController(0) , fExitingAckedByLastExternalController(false) , fHeartbeatInterval(100) , fUpdatesAllowed(false) , fWorkGuard(fWorkerQueue.get_executor()) { try { TakeDeviceControl(); fHeartbeatThread = thread(&DDS::HeartbeatSender, this); string deviceId(GetProperty("id")); if (deviceId.empty()) { SetProperty("id", dds::env_prop()); } string sessionId(GetProperty("session")); if (sessionId == "default") { SetProperty("session", dds::env_prop()); } auto control = GetProperty("control"); bool staticMode(false); if (control == "static") { LOG(debug) << "Running DDS controller: static"; staticMode = true; } else if (control == "dynamic" || control == "external" || control == "interactive") { LOG(debug) << "Running DDS controller: external"; } else { LOG(error) << "Unrecognized control mode '" << control << "' requested. " << "Ignoring and falling back to static control mode."; staticMode = true; } SubscribeForCustomCommands(); SubscribeForConnectingChannels(); // subscribe to device state changes, pushing new state changes into the event queue SubscribeToDeviceStateChange([&](DeviceState newState) { fStateQueue.Push(newState); switch (newState) { case DeviceState::Bound: // Receive addresses of connecting channels from DDS // and propagate addresses of bound channels to DDS. FillChannelContainers(); // publish bound addresses via DDS at keys corresponding to the channel // prefixes, e.g. 'data' in data[i] PublishBoundChannels(); break; case DeviceState::ResettingDevice: { { lock_guard lk(fUpdateMutex); fUpdatesAllowed = false; } EmptyChannelContainers(); break; } case DeviceState::Exiting: fWorkGuard.reset(); fDeviceTerminationRequested = true; UnsubscribeFromDeviceStateChange(); ReleaseDeviceControl(); break; default: break; } lock_guard lock{fStateChangeSubscriberMutex}; string id = GetProperty("id"); fLastState = fCurrentState; fCurrentState = newState; using namespace sdk::cmd; for (auto subscriberId : fStateChangeSubscribers) { LOG(debug) << "Publishing state-change: " << fLastState << "->" << newState << " to " << subscriberId; Cmds cmds(make(id, dds::env_prop(), fLastState, fCurrentState)); fDDS.Send(cmds.Serialize(), to_string(subscriberId)); } }); if (staticMode) { fControllerThread = thread(&DDS::StaticControl, this); } else { StartWorkerThread(); } fDDS.Start(); } catch (PluginServices::DeviceControlError& e) { LOG(debug) << e.what(); } catch (exception& e) { LOG(error) << "Error in plugin initialization: " << e.what(); } } void DDS::EmptyChannelContainers() { fBindingChans.clear(); fConnectingChans.clear(); } auto DDS::StartWorkerThread() -> void { fWorkerThread = thread([this]() { fWorkerQueue.run(); }); } auto DDS::WaitForExitingAck() -> void { unique_lock lock(fStateChangeSubscriberMutex); fExitingAcked.wait_for( lock, chrono::milliseconds(GetProperty("wait-for-exiting-ack-timeout")), [this]() { return fExitingAckedByLastExternalController; }); } auto DDS::StaticControl() -> void { try { TransitionDeviceStateTo(DeviceState::Running); // wait until stop signal unique_lock lock(fStopMutex); while (!fDeviceTerminationRequested) { fStopCondition.wait_for(lock, chrono::seconds(1)); } LOG(debug) << "Stopping DDS plugin static controller"; } catch (DeviceErrorState&) { ReleaseDeviceControl(); } catch (exception& e) { ReleaseDeviceControl(); LOG(error) << "Error: " << e.what() << endl; return; } } auto DDS::FillChannelContainers() -> void { try { unordered_map channelInfo(GetChannelInfo()); // fill binding and connecting chans for (const auto& c : channelInfo) { string methodKey{"chans." + c.first + "." + to_string(c.second - 1) + ".method"}; if (GetProperty(methodKey) == "bind") { fBindingChans.insert(make_pair(c.first, vector())); for (int i = 0; i < c.second; ++i) { fBindingChans.at(c.first).push_back(GetProperty(string{"chans." + c.first + "." + to_string(i) + ".address"})); } } else if (GetProperty(methodKey) == "connect") { fConnectingChans.insert(make_pair(c.first, DDSConfig())); LOG(debug) << "preparing to connect: " << c.first << " with " << c.second << " sub-channels."; for (int i = 0; i < c.second; ++i) { fConnectingChans.at(c.first).fSubChannelAddresses.push_back(string()); } } else { LOG(error) << "Cannot update address configuration. Channel method (bind/connect) not specified."; return; } } // save properties that will have multiple values arriving (with only some of them to be used) vector iValues; if (PropertyExists("dds-i")) { iValues = GetProperty>("dds-i"); } vector inValues; if (PropertyExists("dds-i-n")) { inValues = GetProperty>("dds-i-n"); } for (const auto& vi : iValues) { size_t pos = vi.find(":"); string chanName = vi.substr(0, pos); // check if provided name is a valid channel name if (fConnectingChans.find(chanName) == fConnectingChans.end()) { throw invalid_argument(ToString("channel provided to dds-i is not an actual connecting channel of this device: ", chanName)); } int i = stoi(vi.substr(pos + 1)); LOG(debug) << "dds-i: adding " << chanName << " -> i of " << i; fI.insert(make_pair(chanName, i)); } for (const auto& vi : inValues) { size_t pos = vi.find(":"); string chanName = vi.substr(0, pos); // check if provided name is a valid channel name if (fConnectingChans.find(chanName) == fConnectingChans.end()) { throw invalid_argument(ToString("channel provided to dds-i-n is not an actual connecting channel of this device: ", chanName)); } string i_n = vi.substr(pos + 1); pos = i_n.find("-"); int i = stoi(i_n.substr(0, pos)); int n = stoi(i_n.substr(pos + 1)); LOG(debug) << "dds-i-n: adding " << chanName << " -> i: " << i << " n: " << n; fIofN.insert(make_pair(chanName, IofN(i, n))); } { lock_guard lk(fUpdateMutex); fUpdatesAllowed = true; } fUpdateCondition.notify_one(); } catch (const exception& e) { LOG(error) << "Error filling channel containers: " << e.what(); } } auto DDS::SubscribeForConnectingChannels() -> void { LOG(debug) << "Subscribing for DDS properties."; fDDS.SubscribeKeyValue([&] (const string& key, const string& value, uint64_t senderTaskID) { LOG(debug) << "Received property: key=" << key << ", value=" << value << ", senderTaskID=" << senderTaskID; if (key.compare(0, 8, "fmqchan_") != 0) { LOG(debug) << "property update is not a channel info update: " << key; return; } string channelName = key.substr(8); LOG(info) << "Update for channel name: " << channelName; boost::asio::post(fWorkerQueue, [=]() { try { { unique_lock lk(fUpdateMutex); fUpdateCondition.wait(lk, [&]{ return fUpdatesAllowed; }); } string val = value; // check if it is to handle as one out of multiple values auto it = fIofN.find(channelName); if (it != fIofN.end()) { it->second.fEntries.push_back(value); if (it->second.fEntries.size() == it->second.fN) { sort(it->second.fEntries.begin(), it->second.fEntries.end()); val = it->second.fEntries.at(it->second.fI); } else { LOG(debug) << "received " << it->second.fEntries.size() << " values for " << channelName << ", expecting total of " << it->second.fN; return; } } vector connectionStrings; boost::algorithm::split(connectionStrings, val, boost::algorithm::is_any_of(",")); if (connectionStrings.size() > 1) { // multiple bound channels received auto it2 = fI.find(channelName); if (it2 != fI.end()) { LOG(debug) << "adding connecting channel " << channelName << " : " << connectionStrings.at(it2->second); fConnectingChans.at(channelName).fDDSValues.insert({senderTaskID, connectionStrings.at(it2->second).c_str()}); } else { LOG(error) << "multiple bound channels received, but no task index specified, only assigning the first"; fConnectingChans.at(channelName).fDDSValues.insert({senderTaskID, connectionStrings.at(0).c_str()}); } } else { // only one bound channel received fConnectingChans.at(channelName).fDDSValues.insert({senderTaskID, val.c_str()}); } // update channels and remove them from unfinished container for (auto mi = fConnectingChans.begin(); mi != fConnectingChans.end(); /* no increment */) { if (mi->second.fSubChannelAddresses.size() == mi->second.fDDSValues.size()) { // when multiple subChannels are used, their order on every device should be the same, irregardless of arrival order from DDS. sort(mi->second.fSubChannelAddresses.begin(), mi->second.fSubChannelAddresses.end()); auto it3 = mi->second.fDDSValues.begin(); for (unsigned int i = 0; i < mi->second.fSubChannelAddresses.size(); ++i) { SetProperty(string{"chans." + mi->first + "." + to_string(i) + ".address"}, it3->second); ++it3; } fConnectingChans.erase(mi++); } else { ++mi; } } } catch (const exception& e) { LOG(error) << "Error handling DDS property: key=" << key << ", value=" << value << ", senderTaskID=" << senderTaskID << ": " << e.what(); } }); }); } auto DDS::PublishBoundChannels() -> void { for (const auto& chan : fBindingChans) { string joined = boost::algorithm::join(chan.second, ","); LOG(debug) << "Publishing bound addresses (" << chan.second.size() << ") of channel '" << chan.first << "' to DDS under '" << "fmqchan_" + chan.first << "' property name."; fDDS.PutValue("fmqchan_" + chan.first, joined); } } auto DDS::HeartbeatSender() -> void { using namespace sdk::cmd; string id = GetProperty("id"); while (!fDeviceTerminationRequested) { { lock_guard lock{fHeartbeatSubscriberMutex}; for (const auto subscriberId : fHeartbeatSubscribers) { fDDS.Send(Cmds(make(id)).Serialize(), to_string(subscriberId)); } } this_thread::sleep_for(chrono::milliseconds(fHeartbeatInterval)); } } auto DDS::SubscribeForCustomCommands() -> void { using namespace sdk::cmd; LOG(debug) << "Subscribing for DDS custom commands."; string id = GetProperty("id"); fDDS.SubscribeCustomCmd([id, this](const string& cmdStr, const string& cond, uint64_t senderId) { // LOG(info) << "Received command: '" << cmdStr << "' from " << senderId; Cmds inCmds; inCmds.Deserialize(cmdStr); for (const auto& cmd : inCmds) { switch (cmd->GetType()) { case Type::check_state: { fDDS.Send(Cmds(make(id, GetCurrentDeviceState())).Serialize(), to_string(senderId)); } break; case Type::change_state: { Transition transition = static_cast(*cmd).GetTransition(); if (ChangeDeviceState(transition)) { Cmds outCmds(make(id, Result::Ok, transition)); fDDS.Send(outCmds.Serialize(), to_string(senderId)); } else { Cmds outCmds(make(id, Result::Failure, transition)); fDDS.Send(outCmds.Serialize(), to_string(senderId)); } } break; case Type::dump_config: { stringstream ss; for (const auto pKey: GetPropertyKeys()) { ss << id << ": " << pKey << " -> " << GetPropertyAsString(pKey) << endl; } Cmds outCmds(make(id, ss.str())); fDDS.Send(outCmds.Serialize(), to_string(senderId)); } break; case Type::subscribe_to_heartbeats: { { lock_guard lock{fHeartbeatSubscriberMutex}; fHeartbeatSubscribers.insert(senderId); } Cmds outCmds(make(id, Result::Ok)); fDDS.Send(outCmds.Serialize(), to_string(senderId)); } break; case Type::unsubscribe_from_heartbeats: { { lock_guard lock{fHeartbeatSubscriberMutex}; fHeartbeatSubscribers.erase(senderId); } Cmds outCmds(make(id, Result::Ok)); fDDS.Send(outCmds.Serialize(), to_string(senderId)); } break; case Type::state_change_exiting_received: { { lock_guard lock{fStateChangeSubscriberMutex}; if (fLastExternalController == senderId) { fExitingAckedByLastExternalController = true; } } fExitingAcked.notify_one(); } break; case Type::subscribe_to_state_change: { lock_guard lock{fStateChangeSubscriberMutex}; fStateChangeSubscribers.insert(senderId); if (!fControllerThread.joinable()) { fControllerThread = thread(&DDS::WaitForExitingAck, this); } LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState << " to " << senderId; Cmds outCmds(make(id, Result::Ok), make(id, dds::env_prop(), fLastState, fCurrentState)); fDDS.Send(outCmds.Serialize(), to_string(senderId)); } break; case Type::unsubscribe_from_state_change: { { lock_guard lock{fStateChangeSubscriberMutex}; fStateChangeSubscribers.erase(senderId); } Cmds outCmds(make(id, Result::Ok)); fDDS.Send(outCmds.Serialize(), to_string(senderId)); } break; default: LOG(warn) << "Unexpected/unknown command received: " << cmdStr; LOG(warn) << "Origin: " << senderId; LOG(warn) << "Destination: " << cond; break; } } }); } DDS::~DDS() { UnsubscribeFromDeviceStateChange(); ReleaseDeviceControl(); if (fControllerThread.joinable()) { fControllerThread.join(); } if (fHeartbeatThread.joinable()) { fHeartbeatThread.join(); } fWorkGuard.reset(); if (fWorkerThread.joinable()) { fWorkerThread.join(); } } } /* namespace plugins */ } /* namespace mq */ } /* namespace fair */