9 #ifndef FAIR_MQ_PLUGINS_DDS 10 #define FAIR_MQ_PLUGINS_DDS 12 #include <fairmq/Plugin.h> 14 #include <dds_intercom.h> 16 #include <condition_variable> 22 #include <unordered_map> 37 : fSubChannelAddresses()
42 std::vector<std::string> fSubChannelAddresses;
44 std::unordered_map<uint64_t, std::string> fDDSValues;
57 std::vector<std::string> fEntries;
69 auto HandleControl() -> void;
70 auto WaitForNextState() -> DeviceState;
72 auto FillChannelContainers() -> void;
73 auto SubscribeForConnectingChannels() -> void;
74 auto PublishBoundChannels() -> void;
75 auto SubscribeForCustomCommands() -> void;
77 auto HeartbeatSender() -> void;
79 dds::intercom_api::CIntercomService fService;
80 dds::intercom_api::CCustomCmd fDDSCustomCmd;
81 dds::intercom_api::CKeyValue fDDSKeyValue;
83 std::unordered_map<std::string, std::vector<std::string>> fBindingChans;
84 std::unordered_map<std::string, DDSConfig> fConnectingChans;
86 std::unordered_map<std::string, int> fI;
87 std::unordered_map<std::string, IofN> fIofN;
89 std::mutex fStopMutex;
90 std::condition_variable fStopCondition;
92 const std::set<std::string> fCommands;
94 std::thread fControllerThread;
95 std::queue<DeviceState> fEvents;
96 std::mutex fEventsMutex;
97 std::condition_variable fNewEvent;
99 std::atomic<bool> fDeviceTerminationRequested;
101 std::set<uint64_t> fHeartbeatSubscribers;
102 std::mutex fHeartbeatSubscriberMutex;
103 std::set<uint64_t> fStateChangeSubscribers;
104 std::mutex fStateChangeSubscriberMutex;
106 std::thread fHeartbeatThread;
107 std::chrono::milliseconds fHeartbeatInterval;
110 Plugin::ProgOptions DDSProgramOptions()
112 boost::program_options::options_description options{
"DDS Plugin"};
113 options.add_options()
114 (
"dds-i", boost::program_options::value<std::vector<std::string>>()->multitoken()->composing(),
"Task index for chosing connection target (single channel n to m). When all values come via same update.")
115 (
"dds-i-n", boost::program_options::value<std::vector<std::string>>()->multitoken()->composing(),
"Task index for chosing connection target (one out of n values to take). When values come as independent updates.");
120 REGISTER_FAIRMQ_PLUGIN(
124 "FairRootGroup <fairroot@gsi.de>",
125 "https://github.com/FairRootGroup/FairRoot",
Facilitates communication between devices and plugins.
Definition: PluginServices.h:38
Base class for FairMQ plugins.
Definition: Plugin.h:38
Tools for interfacing containers to the transport via polymorphic allocators.
Definition: DeviceRunner.h:23