9 #ifndef FAIR_MQ_PLUGINS_DDS
10 #define FAIR_MQ_PLUGINS_DDS
12 #include <fairmq/Plugin.h>
13 #include <fairmq/StateQueue.h>
14 #include <fairmq/Version.h>
15 #include <fairmq/sdk/commands/Commands.h>
19 #include <boost/asio/executor.hpp>
20 #include <boost/asio/executor_work_guard.hpp>
21 #include <boost/asio/io_context.hpp>
25 #include <condition_variable>
31 #include <unordered_map>
35 namespace fair::mq::plugins
41 unsigned int fNumSubChannels;
43 std::map<uint64_t, std::string> fDDSValues;
49 : fDDSCustomCmd(fService)
50 , fDDSKeyValue(fService)
52 LOG(debug) <<
"$DDS_TASK_PATH: " << dds::env_prop<dds::task_path>();
53 LOG(debug) <<
"$DDS_GROUP_NAME: " << dds::env_prop<dds::group_name>();
54 LOG(debug) <<
"$DDS_COLLECTION_NAME: " << dds::env_prop<dds::collection_name>();
55 LOG(debug) <<
"$DDS_TASK_NAME: " << dds::env_prop<dds::task_name>();
56 LOG(debug) <<
"$DDS_TASK_INDEX: " << dds::env_prop<dds::task_index>();
57 LOG(debug) <<
"$DDS_COLLECTION_INDEX: " << dds::env_prop<dds::collection_index>();
58 LOG(debug) <<
"$DDS_TASK_ID: " << dds::env_prop<dds::task_id>();
59 LOG(debug) <<
"$DDS_LOCATION: " << dds::env_prop<dds::dds_location>();
60 std::string dds_session_id(dds::env_prop<dds::dds_session_id>());
61 LOG(debug) <<
"$DDS_SESSION_ID: " << dds_session_id;
64 fService.subscribeOnError([](
const dds::intercom_api::EErrorCode errorCode,
const std::string& errorMsg) {
65 LOG(error) <<
"DDS Error received: error code: " << errorCode <<
", error message: " << errorMsg;
71 assert(!dds_session_id.empty());
74 auto Start() ->
void {
75 fService.start(dds::env_prop<dds::dds_session_id>());
79 fDDSKeyValue.unsubscribe();
80 fDDSCustomCmd.unsubscribe();
83 template<
typename... Args>
84 auto SubscribeCustomCmd(Args&&... args) ->
void
86 fDDSCustomCmd.subscribe(std::forward<Args>(args)...);
89 template<
typename... Args>
90 auto SubscribeKeyValue(Args&&... args) ->
void
92 fDDSKeyValue.subscribe(std::forward<Args>(args)...);
95 template<
typename... Args>
96 auto Send(Args&&... args) ->
void
98 fDDSCustomCmd.send(std::forward<Args>(args)...);
101 template<
typename... Args>
102 auto PutValue(Args&&... args) ->
void
104 fDDSKeyValue.putValue(std::forward<Args>(args)...);
108 dds::intercom_api::CIntercomService fService;
109 dds::intercom_api::CCustomCmd fDDSCustomCmd;
110 dds::intercom_api::CKeyValue fDDSKeyValue;
122 std::vector<std::string> fEntries;
133 auto WaitForExitingAck() -> void;
134 auto StartWorkerThread() -> void;
136 auto FillChannelContainers() -> void;
137 auto EmptyChannelContainers() -> void;
139 auto SubscribeForConnectingChannels() -> void;
140 auto PublishBoundChannels() -> void;
141 auto SubscribeForCustomCommands() -> void;
142 auto HandleCmd(
const std::string&
id,
sdk::cmd::Cmd& cmd,
const std::string& cond, uint64_t senderId) -> void;
147 std::unordered_map<std::string, std::vector<std::string>> fBindingChans;
148 std::unordered_map<std::string, DDSConfig> fConnectingChans;
150 std::unordered_map<std::string, int> fI;
151 std::unordered_map<std::string, IofN> fIofN;
153 std::thread fControllerThread;
154 DeviceState fCurrentState, fLastState;
156 std::atomic<bool> fDeviceTerminationRequested;
158 std::unordered_map<uint64_t, std::pair<std::chrono::steady_clock::time_point, int64_t>> fStateChangeSubscribers;
159 uint64_t fLastExternalController;
160 bool fExitingAckedByLastExternalController;
161 std::condition_variable fExitingAcked;
162 std::mutex fStateChangeSubscriberMutex;
164 bool fUpdatesAllowed;
165 std::mutex fUpdateMutex;
166 std::condition_variable fUpdateCondition;
168 std::thread fWorkerThread;
169 boost::asio::io_context fWorkerQueue;
170 boost::asio::executor_work_guard<boost::asio::executor> fWorkGuard;
173 Plugin::ProgOptions DDSProgramOptions()
175 boost::program_options::options_description options{
"DDS Plugin"};
176 options.add_options()
177 (
"dds-i", boost::program_options::value<std::vector<std::string>>()->multitoken()->composing(),
"Task index for chosing connection target (single channel n to m). When all values come via same update.")
178 (
"dds-i-n", boost::program_options::value<std::vector<std::string>>()->multitoken()->composing(),
"Task index for chosing connection target (one out of n values to take). When values come as independent updates.")
179 (
"wait-for-exiting-ack-timeout", boost::program_options::value<unsigned int>()->default_value(1000),
"Wait timeout for EXITING state-change acknowledgement by external controller in milliseconds.");
184 REGISTER_FAIRMQ_PLUGIN(
187 (Plugin::Version{FAIRMQ_VERSION_MAJOR,
188 FAIRMQ_VERSION_MINOR,
189 FAIRMQ_VERSION_PATCH}),
190 "FairRootGroup <fairroot@gsi.de>",
191 "https://github.com/FairRootGroup/FairMQ",