FairMQ  1.4.33
C++ Message Queuing Library and Framework
DDS.h
1 /********************************************************************************
2  * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
3  * *
4  * This software is distributed under the terms of the *
5  * GNU Lesser General Public Licence (LGPL) version 3, *
6  * copied verbatim in the file "LICENSE" *
7  ********************************************************************************/
8 
9 #ifndef FAIR_MQ_PLUGINS_DDS
10 #define FAIR_MQ_PLUGINS_DDS
11 
12 #include <fairmq/Plugin.h>
13 #include <fairmq/StateQueue.h>
14 #include <fairmq/Version.h>
15 #include <fairmq/sdk/commands/Commands.h>
16 
17 #include <dds/dds.h>
18 
19 #include <boost/asio/executor.hpp>
20 #include <boost/asio/executor_work_guard.hpp>
21 #include <boost/asio/io_context.hpp>
22 
23 #include <cassert>
24 #include <chrono>
25 #include <condition_variable>
26 #include <mutex>
27 #include <string>
28 #include <atomic>
29 #include <thread>
30 #include <map>
31 #include <unordered_map>
32 #include <utility> // pair
33 #include <vector>
34 
35 namespace fair::mq::plugins
36 {
37 
38 struct DDSConfig
39 {
40  // container of sub channel addresses
41  unsigned int fNumSubChannels;
42  // dds values for the channel
43  std::map<uint64_t, std::string> fDDSValues;
44 };
45 
46 struct DDSSubscription
47 {
49  : fDDSCustomCmd(fService)
50  , fDDSKeyValue(fService)
51  {
52  LOG(debug) << "$DDS_TASK_PATH: " << dds::env_prop<dds::task_path>();
53  LOG(debug) << "$DDS_GROUP_NAME: " << dds::env_prop<dds::group_name>();
54  LOG(debug) << "$DDS_COLLECTION_NAME: " << dds::env_prop<dds::collection_name>();
55  LOG(debug) << "$DDS_TASK_NAME: " << dds::env_prop<dds::task_name>();
56  LOG(debug) << "$DDS_TASK_INDEX: " << dds::env_prop<dds::task_index>();
57  LOG(debug) << "$DDS_COLLECTION_INDEX: " << dds::env_prop<dds::collection_index>();
58  LOG(debug) << "$DDS_TASK_ID: " << dds::env_prop<dds::task_id>();
59  LOG(debug) << "$DDS_LOCATION: " << dds::env_prop<dds::dds_location>();
60  std::string dds_session_id(dds::env_prop<dds::dds_session_id>());
61  LOG(debug) << "$DDS_SESSION_ID: " << dds_session_id;
62 
63  // subscribe for DDS service errors.
64  fService.subscribeOnError([](const dds::intercom_api::EErrorCode errorCode, const std::string& errorMsg) {
65  LOG(error) << "DDS Error received: error code: " << errorCode << ", error message: " << errorMsg;
66  });
67 
68  // fDDSCustomCmd.subscribe([](const std::string& cmd, const std::string& cond, uint64_t senderId) {
69  // LOG(debug) << "cmd: " << cmd << ", cond: " << cond << ", senderId: " << senderId;
70  // });
71  assert(!dds_session_id.empty());
72  }
73 
74  auto Start() -> void {
75  fService.start(dds::env_prop<dds::dds_session_id>());
76  }
77 
78  ~DDSSubscription() {
79  fDDSKeyValue.unsubscribe();
80  fDDSCustomCmd.unsubscribe();
81  }
82 
83  template<typename... Args>
84  auto SubscribeCustomCmd(Args&&... args) -> void
85  {
86  fDDSCustomCmd.subscribe(std::forward<Args>(args)...);
87  }
88 
89  template<typename... Args>
90  auto SubscribeKeyValue(Args&&... args) -> void
91  {
92  fDDSKeyValue.subscribe(std::forward<Args>(args)...);
93  }
94 
95  template<typename... Args>
96  auto Send(Args&&... args) -> void
97  {
98  fDDSCustomCmd.send(std::forward<Args>(args)...);
99  }
100 
101  template<typename... Args>
102  auto PutValue(Args&&... args) -> void
103  {
104  fDDSKeyValue.putValue(std::forward<Args>(args)...);
105  }
106 
107  private:
108  dds::intercom_api::CIntercomService fService;
109  dds::intercom_api::CCustomCmd fDDSCustomCmd;
110  dds::intercom_api::CKeyValue fDDSKeyValue;
111 };
112 
113 struct IofN
114 {
115  IofN(int i, int n)
116  : fI(i)
117  , fN(n)
118  {}
119 
120  unsigned int fI;
121  unsigned int fN;
122  std::vector<std::string> fEntries;
123 };
124 
125 class DDS : public Plugin
126 {
127  public:
128  DDS(const std::string& name, const Plugin::Version version, const std::string& maintainer, const std::string& homepage, PluginServices* pluginServices);
129 
130  ~DDS();
131 
132  private:
133  auto WaitForExitingAck() -> void;
134  auto StartWorkerThread() -> void;
135 
136  auto FillChannelContainers() -> void;
137  auto EmptyChannelContainers() -> void;
138 
139  auto SubscribeForConnectingChannels() -> void;
140  auto PublishBoundChannels() -> void;
141  auto SubscribeForCustomCommands() -> void;
142  auto HandleCmd(const std::string& id, sdk::cmd::Cmd& cmd, const std::string& cond, uint64_t senderId) -> void;
143 
144  DDSSubscription fDDS;
145  size_t fDDSTaskId;
146 
147  std::unordered_map<std::string, std::vector<std::string>> fBindingChans;
148  std::unordered_map<std::string, DDSConfig> fConnectingChans;
149 
150  std::unordered_map<std::string, int> fI;
151  std::unordered_map<std::string, IofN> fIofN;
152 
153  std::thread fControllerThread;
154  DeviceState fCurrentState, fLastState;
155 
156  std::atomic<bool> fDeviceTerminationRequested;
157 
158  std::unordered_map<uint64_t, std::pair<std::chrono::steady_clock::time_point, int64_t>> fStateChangeSubscribers;
159  uint64_t fLastExternalController;
160  bool fExitingAckedByLastExternalController;
161  std::condition_variable fExitingAcked;
162  std::mutex fStateChangeSubscriberMutex;
163 
164  bool fUpdatesAllowed;
165  std::mutex fUpdateMutex;
166  std::condition_variable fUpdateCondition;
167 
168  std::thread fWorkerThread;
169  boost::asio::io_context fWorkerQueue;
170  boost::asio::executor_work_guard<boost::asio::executor> fWorkGuard;
171 };
172 
173 Plugin::ProgOptions DDSProgramOptions()
174 {
175  boost::program_options::options_description options{"DDS Plugin"};
176  options.add_options()
177  ("dds-i", boost::program_options::value<std::vector<std::string>>()->multitoken()->composing(), "Task index for chosing connection target (single channel n to m). When all values come via same update.")
178  ("dds-i-n", boost::program_options::value<std::vector<std::string>>()->multitoken()->composing(), "Task index for chosing connection target (one out of n values to take). When values come as independent updates.")
179  ("wait-for-exiting-ack-timeout", boost::program_options::value<unsigned int>()->default_value(1000), "Wait timeout for EXITING state-change acknowledgement by external controller in milliseconds.");
180 
181  return options;
182 }
183 
184 REGISTER_FAIRMQ_PLUGIN(
185  DDS, // Class name
186  dds, // Plugin name (string, lower case chars only)
187  (Plugin::Version{FAIRMQ_VERSION_MAJOR,
188  FAIRMQ_VERSION_MINOR,
189  FAIRMQ_VERSION_PATCH}), // Version
190  "FairRootGroup <fairroot@gsi.de>", // Maintainer
191  "https://github.com/FairRootGroup/FairMQ", // Homepage
192  DDSProgramOptions // custom program options for the plugin
193 )
194 
195 } // namespace fair::mq::plugins
196 
197 #endif /* FAIR_MQ_PLUGINS_DDS */
fair::mq::PluginServices
Facilitates communication between devices and plugins.
Definition: PluginServices.h:46
fair::mq::tools::Version
Definition: Version.h:25
fair::mq::sdk::cmd::Cmd
Definition: Commands.h:62
fair::mq::plugins::DDS
Definition: DDS.h:132
fair::mq::plugins::DDSSubscription
Definition: DDS.h:53
fair::mq::Plugin
Base class for FairMQ plugins.
Definition: Plugin.h:43

privacy