9 #ifndef FAIRMQDEVICE_H_ 10 #define FAIRMQDEVICE_H_ 12 #include <FairMQStateMachine.h> 13 #include <FairMQTransportFactory.h> 14 #include <fairmq/Transports.h> 16 #include <FairMQSocket.h> 17 #include <FairMQChannel.h> 18 #include <FairMQMessage.h> 19 #include <FairMQParts.h> 20 #include <FairMQUnmanagedRegion.h> 21 #include <FairMQLogger.h> 22 #include <options/FairMQProgOptions.h> 30 #include <unordered_map> 33 #include <type_traits> 37 #include <condition_variable> 39 #include <fairmq/Tools.h> 41 using FairMQChannelMap = std::unordered_map<std::string, std::vector<FairMQChannel>>;
43 using InputMsgCallback = std::function<bool(FairMQMessagePtr&, int)>;
44 using InputMultipartCallback = std::function<bool(FairMQParts&, int)>;
82 void SortChannel(
const std::string& name,
const bool reindex =
true);
84 template<
typename Serializer,
typename DataType,
typename... Args>
85 void Serialize(
FairMQMessage& msg, DataType&& data, Args&&... args)
const 87 Serializer().Serialize(msg, std::forward<DataType>(data), std::forward<Args>(args)...);
90 template<
typename Deserializer,
typename DataType,
typename... Args>
91 void Deserialize(
FairMQMessage& msg, DataType&& data, Args&&... args)
const 93 Deserializer().Deserialize(msg, std::forward<DataType>(data), std::forward<Args>(args)...);
102 int Send(FairMQMessagePtr& msg,
const std::string& channel,
const int index = 0,
int sndTimeoutInMs = -1)
104 return GetChannel(channel, index).
Send(msg, sndTimeoutInMs);
113 int Receive(FairMQMessagePtr& msg,
const std::string& channel,
const int index = 0,
int rcvTimeoutInMs = -1)
115 return GetChannel(channel, index).
Receive(msg, rcvTimeoutInMs);
118 int SendAsync(FairMQMessagePtr& msg,
const std::string& channel,
const int index = 0) __attribute__((deprecated("For non-blocking
Send, use timeout version with timeout of 0:
Send(msg, \"channelA\", subchannelIndex, timeout);")))
120 return GetChannel(channel, index).
Send(msg, 0);
122 int ReceiveAsync(FairMQMessagePtr& msg,
const std::string& channel,
const int index = 0) __attribute__((deprecated("For non-blocking
Receive, use timeout version with timeout of 0:
Receive(msg, \"channelA\", subchannelIndex, timeout);")))
124 return GetChannel(channel, index).
Receive(msg, 0);
133 int64_t
Send(
FairMQParts& parts,
const std::string& channel,
const int index = 0,
int sndTimeoutInMs = -1)
135 return GetChannel(channel, index).
Send(parts.fParts, sndTimeoutInMs);
144 int64_t
Receive(
FairMQParts& parts,
const std::string& channel,
const int index = 0,
int rcvTimeoutInMs = -1)
146 return GetChannel(channel, index).
Receive(parts.fParts, rcvTimeoutInMs);
149 int64_t SendAsync(
FairMQParts& parts,
const std::string& channel,
const int index = 0) __attribute__((deprecated("For non-blocking
Send, use timeout version with timeout of 0:
Send(parts, \"channelA\", subchannelIndex, timeout);")))
151 return GetChannel(channel, index).
Send(parts.fParts, 0);
153 int64_t ReceiveAsync(
FairMQParts& parts,
const std::string& channel,
const int index = 0) __attribute__((deprecated("For non-blocking
Receive, use timeout version with timeout of 0:
Receive(parts, \"channelA\", subchannelIndex, timeout);")))
155 return GetChannel(channel, index).
Receive(parts.fParts, 0);
164 template<
typename... Args>
165 FairMQMessagePtr NewMessage(Args&&... args)
167 return Transport()->CreateMessage(std::forward<Args>(args)...);
170 template<
typename... Args>
171 FairMQMessagePtr NewMessageFor(
const std::string& channel,
int index, Args&&... args)
173 return GetChannel(channel, index).NewMessage(std::forward<Args>(args)...);
177 FairMQMessagePtr NewStaticMessage(
const T& data)
179 return Transport()->NewStaticMessage(data);
183 FairMQMessagePtr NewStaticMessageFor(
const std::string& channel,
int index,
const T& data)
185 return GetChannel(channel, index).NewStaticMessage(data);
189 FairMQMessagePtr NewSimpleMessage(
const T& data)
191 return Transport()->NewSimpleMessage(data);
195 FairMQMessagePtr NewSimpleMessageFor(
const std::string& channel,
int index,
const T& data)
197 return GetChannel(channel, index).NewSimpleMessage(data);
200 FairMQUnmanagedRegionPtr NewUnmanagedRegion(
const size_t size)
202 return Transport()->CreateUnmanagedRegion(size);
205 FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(
const std::string& channel,
int index,
const size_t size, FairMQRegionCallback callback =
nullptr)
207 return GetChannel(channel, index).Transport()->CreateUnmanagedRegion(size, callback);
210 template<
typename ...Ts>
211 FairMQPollerPtr NewPoller(
const Ts&... inputs)
213 std::vector<std::string> chans{inputs...};
216 if (chans.size() > 1)
218 fair::mq::Transport type = GetChannel(chans.at(0), 0).
Transport()->GetType();
220 for (
unsigned int i = 1; i < chans.size(); ++i)
222 if (type != GetChannel(chans.at(i), 0).
Transport()->GetType())
224 LOG(error) <<
"poller failed: different transports within same poller are not yet supported. Going to ERROR state.";
225 throw std::runtime_error(
"poller failed: different transports within same poller are not yet supported.");
233 FairMQPollerPtr NewPoller(
const std::vector<FairMQChannel*>& channels)
236 if (channels.size() > 1)
238 fair::mq::Transport type = channels.at(0)->Transport()->GetType();
240 for (
unsigned int i = 1; i < channels.size(); ++i)
242 if (type != channels.at(i)->Transport()->GetType())
244 LOG(error) <<
"poller failed: different transports within same poller are not yet supported. Going to ERROR state.";
245 throw std::runtime_error(
"poller failed: different transports within same poller are not yet supported.");
250 return channels.at(0)->Transport()->CreatePoller(channels);
254 void WaitForInitialValidation() __attribute__((deprecated("This method will have no effect in future versions and will be removed. Instead subscribe for state changes and inspect configuration values."))) {}
258 std::shared_ptr<FairMQTransportFactory>
AddTransport(
const fair::mq::Transport transport);
275 void OnData(
const std::string& channelName,
bool (T::* memberFunction)(FairMQMessagePtr& msg,
int index))
277 fDataCallbacks =
true;
278 fMsgInputs.insert(std::make_pair(channelName, [
this, memberFunction](FairMQMessagePtr& msg,
int index)
280 return (static_cast<T*>(
this)->*memberFunction)(msg, index);
283 if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end())
285 fInputChannelKeys.push_back(channelName);
289 void OnData(
const std::string& channelName, InputMsgCallback callback)
291 fDataCallbacks =
true;
292 fMsgInputs.insert(make_pair(channelName, callback));
294 if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end())
296 fInputChannelKeys.push_back(channelName);
302 void OnData(
const std::string& channelName,
bool (T::* memberFunction)(
FairMQParts& parts,
int index))
304 fDataCallbacks =
true;
305 fMultipartInputs.insert(std::make_pair(channelName, [
this, memberFunction](
FairMQParts& parts,
int index)
307 return (static_cast<T*>(
this)->*memberFunction)(parts, index);
310 if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end())
312 fInputChannelKeys.push_back(channelName);
316 void OnData(
const std::string& channelName, InputMultipartCallback callback)
318 fDataCallbacks =
true;
319 fMultipartInputs.insert(make_pair(channelName, callback));
321 if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end())
323 fInputChannelKeys.push_back(channelName);
327 FairMQChannel& GetChannel(
const std::string& channelName,
const int index = 0)
329 return fChannels.at(channelName).at(index);
330 }
catch (
const std::out_of_range& oor) {
331 LOG(error) <<
"out of range: " << oor.what();
332 LOG(error) <<
"requested channel has not been configured? check channel names/configuration.";
333 fRateLogging =
false;
337 virtual void RegisterChannelEndpoints() {}
339 bool RegisterChannelEndpoint(
const std::string& channelName, uint16_t minNumSubChannels = 1, uint16_t maxNumSubChannels = 1)
341 bool ok = fChannelRegistry.insert(std::make_pair(channelName, std::make_pair(minNumSubChannels, maxNumSubChannels))).second;
344 LOG(warn) <<
"Registering channel: name already registered: \"" << channelName <<
"\"";
349 void PrintRegisteredChannels()
351 if (fChannelRegistry.size() < 1)
353 std::cout <<
"no channels registered." << std::endl;
357 for (
const auto& c : fChannelRegistry)
359 std::cout << c.first <<
":" << c.second.first <<
":" << c.second.second << std::endl;
364 void SetId(
const std::string&
id) {
fId = id; }
365 std::string GetId() {
return fId; }
369 void SetNumIoThreads(
int numIoThreads) {
fConfig->SetValue<
int>(
"io-threads", numIoThreads);}
370 int GetNumIoThreads()
const {
return fConfig->GetValue<
int>(
"io-threads"); }
372 void SetNetworkInterface(
const std::string& networkInterface) {
fConfig->SetValue<std::string>(
"network-interface", networkInterface); }
373 std::string GetNetworkInterface()
const {
return fConfig->GetValue<std::string>(
"network-interface"); }
375 void SetDefaultTransport(
const std::string& name) {
fConfig->SetValue<std::string>(
"transport", name); }
376 std::string GetDefaultTransport()
const {
return fConfig->GetValue<std::string>(
"transport"); }
378 void SetInitializationTimeoutInS(
int initializationTimeoutInS) {
fConfig->SetValue<
int>(
"initialization-timeout", initializationTimeoutInS); }
379 int GetInitializationTimeoutInS()
const {
return fConfig->GetValue<
int>(
"initialization-timeout"); }
383 void SetTransport(
const std::string& transport) {
fConfig->SetValue<std::string>(
"transport", transport); }
387 void SetRawCmdLineArgs(
const std::vector<std::string>& args) { fRawCmdLineArgs = args; }
388 std::vector<std::string> GetRawCmdLineArgs()
const {
return fRawCmdLineArgs; }
390 void RunStateMachine()
392 CallStateChangeCallbacks(FairMQStateMachine::IDLE);
399 template<
class Rep,
class Period>
400 bool WaitFor(std::chrono::duration<Rep, Period>
const& duration)
402 std::unique_lock<std::mutex> lock(fInterruptedMtx);
403 return !fInterruptedCV.wait_for(lock, duration, [&] {
return fInterrupted.load(); });
408 std::unordered_map<fair::mq::Transport, std::shared_ptr<FairMQTransportFactory>>
fTransports;
411 std::unordered_map<std::string, std::vector<FairMQChannel>>
fChannels;
415 void AddChannel(
const std::string& channelName,
const FairMQChannel& channel)
417 fConfig->AddChannel(channelName, channel);
442 virtual void Pause();
448 virtual void Reset();
451 fair::mq::Transport fDefaultTransportType;
456 void InitTaskWrapper();
462 void ResetTaskWrapper();
473 void AttachChannels(std::vector<FairMQChannel*>& chans);
476 void HandleSingleChannelInput();
477 void HandleMultipleChannelInput();
478 void HandleMultipleTransportInput();
479 void PollForTransport(
const FairMQTransportFactory* factory,
const std::vector<std::string>& channelKeys);
481 bool HandleMsgInput(
const std::string& chName,
const InputMsgCallback& callback,
int i);
482 bool HandleMultipartInput(
const std::string& chName,
const InputMultipartCallback& callback,
int i);
484 void CreateOwnConfig();
487 std::unordered_map<std::string, InputMsgCallback> fMsgInputs;
488 std::unordered_map<std::string, InputMultipartCallback> fMultipartInputs;
489 std::unordered_map<fair::mq::Transport, std::vector<std::string>> fMultitransportInputs;
490 std::unordered_map<std::string, std::pair<uint16_t, uint16_t>> fChannelRegistry;
491 std::vector<std::string> fInputChannelKeys;
492 std::mutex fMultitransportMutex;
493 std::atomic<bool> fMultitransportProceed;
497 std::vector<std::string> fRawCmdLineArgs;
499 std::atomic<bool> fInterrupted;
500 std::condition_variable fInterruptedCV;
501 std::mutex fInterruptedMtx;
502 mutable std::atomic<bool> fRateLogging;
virtual void Pause()
Handles the PAUSE state.
Definition: FairMQDevice.cxx:627
std::string GetTransportName() const
Gets the default transport name.
Definition: FairMQDevice.h:385
std::unordered_map< fair::mq::Transport, std::shared_ptr< FairMQTransportFactory > > fTransports
Container for transports.
Definition: FairMQDevice.h:408
virtual void Run()
Runs the device (to be overloaded in child classes)
Definition: FairMQDevice.cxx:603
int Send(FairMQMessagePtr &msg, int sndTimeoutInMs=-1)
Definition: FairMQChannel.h:234
virtual bool ConditionalRun()
Called during RUNNING state repeatedly until it returns false or device state changes.
Definition: FairMQDevice.cxx:611
int Receive(FairMQMessagePtr &msg, int rcvTimeoutInMs=-1)
Definition: FairMQChannel.h:244
FairMQProgOptions * fConfig
Pointer to config (internal or external)
Definition: FairMQDevice.h:413
Definition: FairMQTransportFactory.h:28
void SetTransport(const std::string &transport)
Definition: FairMQDevice.h:383
int Send(FairMQMessagePtr &msg, const std::string &channel, const int index=0, int sndTimeoutInMs=-1)
Definition: FairMQDevice.h:102
bool WaitFor(std::chrono::duration< Rep, Period > const &duration)
Definition: FairMQDevice.h:400
std::unique_ptr< FairMQProgOptions > fInternalConfig
Internal program options configuration.
Definition: FairMQDevice.h:412
Definition: FairMQChannel.h:27
int64_t Send(FairMQParts &parts, const std::string &channel, const int index=0, int sndTimeoutInMs=-1)
Definition: FairMQDevice.h:133
Definition: FairMQProgOptions.h:37
virtual void PreRun()
Called in the RUNNING state once before executing the Run()/ConditionalRun() method.
Definition: FairMQDevice.cxx:607
virtual void ResetTask()
Resets the user task (to be overloaded in child classes)
Definition: FairMQDevice.cxx:795
std::string fId
Device ID.
Definition: FairMQDevice.h:421
void CatchSignals()
Catches interrupt signals (SIGINT, SIGTERM)
std::shared_ptr< FairMQTransportFactory > AddTransport(const fair::mq::Transport transport)
Definition: FairMQDevice.cxx:637
virtual ~FairMQDevice()
Default destructor.
Definition: FairMQDevice.cxx:832
int Receive(FairMQMessagePtr &msg, const std::string &channel, const int index=0, int rcvTimeoutInMs=-1)
Definition: FairMQDevice.h:113
void SortChannel(const std::string &name, const bool reindex=true)
Definition: FairMQDevice.cxx:314
void WaitForInitialValidation() __attribute__((deprecated("This method will have no effect in future versions and will be removed. Instead subscribe for state changes and inspect configuration values.")))
Waits for the first initialization run to finish.
Definition: FairMQDevice.h:254
virtual void PostRun()
Called in the RUNNING state once after executing the Run()/ConditionalRun() method.
Definition: FairMQDevice.cxx:616
std::unordered_map< std::string, std::vector< FairMQChannel > > fChannels
Device channels.
Definition: FairMQDevice.h:411
Definition: FairMQStateMachine.h:27
int64_t Receive(FairMQParts &parts, const std::string &channel, const int index=0, int rcvTimeoutInMs=-1)
Definition: FairMQDevice.h:144
FairMQProgOptions * GetConfig() const
Get pointer to the config.
Definition: FairMQDevice.h:263
FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage...
Definition: FairMQParts.h:20
std::shared_ptr< FairMQTransportFactory > fTransportFactory
Default transport factory.
Definition: FairMQDevice.h:407
void SetConfig(FairMQProgOptions &config)
Assigns config to the device.
Definition: FairMQDevice.cxx:659
virtual void Reset()
Resets the device (can be overloaded in child classes)
Definition: FairMQDevice.cxx:824
static bool SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs)
Definition: FairMQDevice.cxx:309
Definition: FairMQDevice.h:46
virtual void Init()
Additional user initialization (can be overloaded in child classes). Prefer to use InitTask()...
Definition: FairMQDevice.cxx:198
Definition: FairMQMessage.h:20
virtual void InitTask()
Task initialization (can be overloaded in child classes)
Definition: FairMQDevice.cxx:305
FairMQDevice operator=(const FairMQDevice &)=delete
Assignment operator (disabled)
auto Transport() const -> FairMQTransportFactory *
Getter for default transport factory.
Definition: FairMQDevice.h:159
virtual void LogSocketRates()
Outputs the socket transfer rates.
Definition: FairMQDevice.cxx:665
FairMQDevice()
Default constructor.
Definition: FairMQDevice.cxx:30