From 8e6c50e7cc17e7d1437c10ecf9ca456a08713e90 Mon Sep 17 00:00:00 2001 From: Dennis Klein Date: Wed, 9 Jun 2021 16:11:57 +0200 Subject: [PATCH] refactor: Prepare deprecation of non-namespaced types and headers --- fairmq/CMakeLists.txt | 16 +- fairmq/{FairMQChannel.cxx => Channel.cxx} | 96 ++- fairmq/Channel.h | 443 ++++++++++++- fairmq/{FairMQDevice.cxx => Device.cxx} | 167 +++-- fairmq/Device.h | 622 +++++++++++++++++- fairmq/FairMQChannel.h | 444 +------------ fairmq/FairMQDevice.h | 547 +-------------- fairmq/FairMQLogger.cxx | 15 - fairmq/FairMQMessage.cxx | 13 - fairmq/FairMQMessage.h | 67 +- fairmq/FairMQParts.h | 92 +-- fairmq/FairMQPoller.cxx | 13 - fairmq/FairMQPoller.h | 31 +- fairmq/FairMQSocket.cxx | 9 - fairmq/FairMQSocket.h | 90 +-- fairmq/FairMQTransportFactory.h | 176 +---- fairmq/FairMQUnmanagedRegion.h | 120 +--- fairmq/FwdDecls.h | 44 +- fairmq/MemoryResources.h | 52 +- fairmq/Message.h | 67 +- fairmq/Parts.h | 94 +++ fairmq/Poller.h | 31 +- fairmq/ProgOptionsFwd.h | 6 +- fairmq/Socket.h | 83 ++- ...nsportFactory.cxx => TransportFactory.cxx} | 36 +- fairmq/TransportFactory.h | 216 +++++- fairmq/UnmanagedRegion.h | 135 +++- fairmq/shmem/Poller.h | 4 +- fairmq/shmem/Socket.h | 6 +- 29 files changed, 1950 insertions(+), 1785 deletions(-) rename fairmq/{FairMQChannel.cxx => Channel.cxx} (83%) rename fairmq/{FairMQDevice.cxx => Device.cxx} (83%) delete mode 100644 fairmq/FairMQLogger.cxx delete mode 100644 fairmq/FairMQMessage.cxx delete mode 100644 fairmq/FairMQPoller.cxx delete mode 100644 fairmq/FairMQSocket.cxx create mode 100644 fairmq/Parts.h rename fairmq/{FairMQTransportFactory.cxx => TransportFactory.cxx} (58%) diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index c8a53008..f01dcc3e 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -159,6 +159,7 @@ if(BUILD_FAIRMQ) MemoryResourceTools.h MemoryResources.h Message.h + Parts.h Plugin.h PluginManager.h PluginServices.h @@ -167,6 +168,7 @@ if(BUILD_FAIRMQ) ProgOptionsFwd.h Properties.h PropertyOutput.h + Socket.h SuboptParser.h TransportFactory.h Transports.h @@ -217,24 +219,20 @@ if(BUILD_FAIRMQ) # libFairMQ source files # ########################## set(FAIRMQ_SOURCE_FILES + Channel.cxx + Device.cxx DeviceRunner.cxx - FairMQChannel.cxx - FairMQDevice.cxx - FairMQLogger.cxx - FairMQMessage.cxx - FairMQPoller.cxx - FairMQSocket.cxx - FairMQTransportFactory.cxx + JSONParser.cxx + MemoryResources.cxx Plugin.cxx PluginManager.cxx PluginServices.cxx ProgOptions.cxx - JSONParser.cxx Properties.cxx SuboptParser.cxx + TransportFactory.cxx plugins/config/Config.cxx plugins/control/Control.cxx - MemoryResources.cxx shmem/Manager.cxx shmem/Monitor.cxx ) diff --git a/fairmq/FairMQChannel.cxx b/fairmq/Channel.cxx similarity index 83% rename from fairmq/FairMQChannel.cxx rename to fairmq/Channel.cxx index 189baa1a..6b3a26a9 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/Channel.cxx @@ -1,30 +1,27 @@ /******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include "FairMQChannel.h" - -#include -#include - +#include // join/split +#include // size_t #include - -#include // join/split - -#include // size_t +#include +#include +#include +#include #include #include -#include + +namespace fair::mq { using namespace std; -using namespace fair::mq; template -T GetPropertyOrDefault(const fair::mq::Properties& m, const string& k, const T& ifNotFound) +T GetPropertyOrDefault(const Properties& m, const string& k, const T& ifNotFound) { if (m.count(k)) { return boost::any_cast(m.at(k)); @@ -32,39 +29,39 @@ T GetPropertyOrDefault(const fair::mq::Properties& m, const string& k, const T& return ifNotFound; } -constexpr fair::mq::Transport FairMQChannel::DefaultTransportType; -constexpr const char* FairMQChannel::DefaultTransportName; -constexpr const char* FairMQChannel::DefaultName; -constexpr const char* FairMQChannel::DefaultType; -constexpr const char* FairMQChannel::DefaultMethod; -constexpr const char* FairMQChannel::DefaultAddress; -constexpr int FairMQChannel::DefaultSndBufSize; -constexpr int FairMQChannel::DefaultRcvBufSize; -constexpr int FairMQChannel::DefaultSndKernelSize; -constexpr int FairMQChannel::DefaultRcvKernelSize; -constexpr int FairMQChannel::DefaultLinger; -constexpr int FairMQChannel::DefaultRateLogging; -constexpr int FairMQChannel::DefaultPortRangeMin; -constexpr int FairMQChannel::DefaultPortRangeMax; -constexpr bool FairMQChannel::DefaultAutoBind; +constexpr Transport Channel::DefaultTransportType; +constexpr const char* Channel::DefaultTransportName; +constexpr const char* Channel::DefaultName; +constexpr const char* Channel::DefaultType; +constexpr const char* Channel::DefaultMethod; +constexpr const char* Channel::DefaultAddress; +constexpr int Channel::DefaultSndBufSize; +constexpr int Channel::DefaultRcvBufSize; +constexpr int Channel::DefaultSndKernelSize; +constexpr int Channel::DefaultRcvKernelSize; +constexpr int Channel::DefaultLinger; +constexpr int Channel::DefaultRateLogging; +constexpr int Channel::DefaultPortRangeMin; +constexpr int Channel::DefaultPortRangeMax; +constexpr bool Channel::DefaultAutoBind; -FairMQChannel::FairMQChannel() - : FairMQChannel(DefaultName, DefaultType, DefaultMethod, DefaultAddress, nullptr) +Channel::Channel() + : Channel(DefaultName, DefaultType, DefaultMethod, DefaultAddress, nullptr) {} -FairMQChannel::FairMQChannel(const string& name) - : FairMQChannel(name, DefaultType, DefaultMethod, DefaultAddress, nullptr) +Channel::Channel(const string& name) + : Channel(name, DefaultType, DefaultMethod, DefaultAddress, nullptr) {} -FairMQChannel::FairMQChannel(const string& type, const string& method, const string& address) - : FairMQChannel(DefaultName, type, method, address, nullptr) +Channel::Channel(const string& type, const string& method, const string& address) + : Channel(DefaultName, type, method, address, nullptr) {} -FairMQChannel::FairMQChannel(const string& name, const string& type, shared_ptr factory) - : FairMQChannel(name, type, DefaultMethod, DefaultAddress, factory) +Channel::Channel(const string& name, const string& type, shared_ptr factory) + : Channel(name, type, DefaultMethod, DefaultAddress, factory) {} -FairMQChannel::FairMQChannel(string name, string type, string method, string address, shared_ptr factory) +Channel::Channel(string name, string type, string method, string address, shared_ptr factory) : fTransportFactory(factory) , fTransportType(factory ? factory->GetType() : DefaultTransportType) , fSocket(factory ? factory->CreateSocket(type, name) : nullptr) @@ -87,8 +84,8 @@ FairMQChannel::FairMQChannel(string name, string type, string method, string add // LOG(warn) << "Constructing channel '" << fName << "'"; } -FairMQChannel::FairMQChannel(const string& name, int index, const fair::mq::Properties& properties) - : FairMQChannel(tools::ToString(name, "[", index, "]"), "unspecified", "unspecified", "unspecified", nullptr) +Channel::Channel(const string& name, int index, const Properties& properties) + : Channel(tools::ToString(name, "[", index, "]"), "unspecified", "unspecified", "unspecified", nullptr) { string prefix(tools::ToString("chans.", name, ".", index, ".")); @@ -107,11 +104,11 @@ FairMQChannel::FairMQChannel(const string& name, int index, const fair::mq::Prop fAutoBind = GetPropertyOrDefault(properties, string(prefix + "autoBind"), DefaultAutoBind); } -FairMQChannel::FairMQChannel(const FairMQChannel& chan) - : FairMQChannel(chan, chan.fName) +Channel::Channel(const Channel& chan) + : Channel(chan, chan.fName) {} -FairMQChannel::FairMQChannel(const FairMQChannel& chan, string newName) +Channel::Channel(const Channel& chan, string newName) : fTransportFactory(nullptr) , fTransportType(chan.fTransportType) , fSocket(nullptr) @@ -132,7 +129,7 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan, string newName) , fMultipart(chan.fMultipart) {} -FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan) +Channel& Channel::operator=(const Channel& chan) { if (this == &chan) { return *this; @@ -160,7 +157,7 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan) return *this; } -bool FairMQChannel::Validate() +bool Channel::Validate() try { stringstream ss; ss << "Validating channel '" << fName << "'... "; @@ -305,11 +302,11 @@ try { LOG(debug) << ss.str(); return true; } catch (exception& e) { - LOG(error) << "Exception caught in FairMQChannel::ValidateChannel: " << e.what(); + LOG(error) << "Exception caught in Channel::ValidateChannel: " << e.what(); throw ChannelConfigurationError(tools::ToString(e.what())); } -void FairMQChannel::Init() +void Channel::Init() { fSocket = fTransportFactory->CreateSocket(fType, fName); @@ -329,12 +326,12 @@ void FairMQChannel::Init() } } -bool FairMQChannel::ConnectEndpoint(const string& endpoint) +bool Channel::ConnectEndpoint(const string& endpoint) { return fSocket->Connect(endpoint); } -bool FairMQChannel::BindEndpoint(string& endpoint) +bool Channel::BindEndpoint(string& endpoint) { // try to bind to the configured port. If it fails, try random one (if AutoBind is on). if (fSocket->Bind(endpoint)) { @@ -374,5 +371,6 @@ bool FairMQChannel::BindEndpoint(string& endpoint) return false; } } - } + +} // namespace fair::mq diff --git a/fairmq/Channel.h b/fairmq/Channel.h index e10e6cc8..35c0e00e 100644 --- a/fairmq/Channel.h +++ b/fairmq/Channel.h @@ -9,12 +9,449 @@ #ifndef FAIR_MQ_CHANNEL_H #define FAIR_MQ_CHANNEL_H -#include +#include // int64_t +#include +#include +#include +#include +#include +#include +#include +#include // unique_ptr, shared_ptr +#include +#include +#include +#include // std::move +#include namespace fair::mq { -using Channel = FairMQChannel; +/** + * @class Channel Channel.h + * @brief Wrapper class for Socket and related methods + * + * The class is not thread-safe. + */ +class Channel +{ + friend class Device; -} // namespace fair::mq + public: + /// Default constructor + Channel(); + + /// Constructor + /// @param name Channel name + Channel(const std::string& name); + + /// Constructor + /// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/) + /// @param method Socket method (bind/connect) + /// @param address Network address to bind/connect to (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") + Channel(const std::string& type, const std::string& method, const std::string& address); + + /// Constructor + /// @param name Channel name + /// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/) + /// @param factory TransportFactory + Channel(const std::string& name, const std::string& type, std::shared_ptr factory); + + /// Constructor + /// @param name Channel name + /// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/) + /// @param method Socket method (bind/connect) + /// @param address Network address to bind/connect to (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") + /// @param factory TransportFactory + Channel(std::string name, std::string type, std::string method, std::string address, std::shared_ptr factory); + + Channel(const std::string& name, int index, const Properties& properties); + + /// Copy Constructor + Channel(const Channel&); + + /// Copy Constructor (with new name) + Channel(const Channel&, std::string name); + + /// Move constructor + // Channel(Channel&&) = delete; + + /// Assignment operator + Channel& operator=(const Channel&); + + /// Move assignment operator + // Channel& operator=(Channel&&) = delete; + + /// Destructor + virtual ~Channel() = default; + // { LOG(warn) << "Destroying channel '" << fName << "'"; } + + struct ChannelConfigurationError : std::runtime_error { using std::runtime_error::runtime_error; }; + + Socket& GetSocket() const { assert(fSocket); return *fSocket; } + + bool Bind(const std::string& address) + { + fMethod = "bind"; + fAddress = address; + return fSocket->Bind(address); + } + + bool Connect(const std::string& address) + { + fMethod = "connect"; + fAddress = address; + return fSocket->Connect(address); + } + + /// Get channel name + /// @return Returns full channel name (e.g. "data[0]") + std::string GetName() const { return fName; } + + /// Get channel prefix + /// @return Returns channel prefix (e.g. "data" in "data[0]") + std::string GetPrefix() const + { + std::string prefix = fName; + prefix = prefix.erase(fName.rfind('[')); + return prefix; + } + + /// Get channel index + /// @return Returns channel index (e.g. 0 in "data[0]") + std::string GetIndex() const + { + std::string indexStr = fName; + indexStr.erase(indexStr.rfind(']')); + indexStr.erase(0, indexStr.rfind('[') + 1); + return indexStr; + } + + /// Get socket type + /// @return Returns socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/) + std::string GetType() const { return fType; } + + /// Get socket method + /// @return Returns socket method (bind/connect) + std::string GetMethod() const { return fMethod; } + + /// Get socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") + /// @return Returns socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") + std::string GetAddress() const { return fAddress; } + + /// Get channel transport name ("default", "zeromq" or "shmem") + /// @return Returns channel transport name (e.g. "default", "zeromq" or "shmem") + std::string GetTransportName() const { return TransportName(fTransportType); } + + /// Get channel transport type + /// @return Returns channel transport type + mq::Transport GetTransportType() const { return fTransportType; } + + /// Get socket send buffer size (in number of messages) + /// @return Returns socket send buffer size (in number of messages) + int GetSndBufSize() const { return fSndBufSize; } + + /// Get socket receive buffer size (in number of messages) + /// @return Returns socket receive buffer size (in number of messages) + int GetRcvBufSize() const { return fRcvBufSize; } + + /// Get socket kernel transmit send buffer size (in bytes) + /// @return Returns socket kernel transmit send buffer size (in bytes) + int GetSndKernelSize() const { return fSndKernelSize; } + + /// Get socket kernel transmit receive buffer size (in bytes) + /// @return Returns socket kernel transmit receive buffer size (in bytes) + int GetRcvKernelSize() const { return fRcvKernelSize; } + + /// Get linger duration (in milliseconds) + /// @return Returns linger duration (in milliseconds) + int GetLinger() const { return fLinger; } + + /// Get socket rate logging interval (in seconds) + /// @return Returns socket rate logging interval (in seconds) + int GetRateLogging() const { return fRateLogging; } + + /// Get start of the port range for automatic binding + /// @return start of the port range + int GetPortRangeMin() const { return fPortRangeMin; } + + /// Get end of the port range for automatic binding + /// @return end of the port range + int GetPortRangeMax() const { return fPortRangeMax; } + + /// Set automatic binding (pick random port if bind fails) + /// @return true/false, true if automatic binding is enabled + bool GetAutoBind() const { return fAutoBind; } + + /// Set channel name + /// @param name Arbitrary channel name + void UpdateName(const std::string& name) { fName = name; Invalidate(); } + + /// Set socket type + /// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/) + void UpdateType(const std::string& type) { fType = type; Invalidate(); } + + /// Set socket method + /// @param method Socket method (bind/connect) + void UpdateMethod(const std::string& method) { fMethod = method; Invalidate(); } + + /// Set socket address + /// @param Socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") + void UpdateAddress(const std::string& address) { fAddress = address; Invalidate(); } + + /// Set channel transport + /// @param transport transport string ("default", "zeromq" or "shmem") + void UpdateTransport(const std::string& transport) { fTransportType = TransportType(transport); Invalidate(); } + + /// Set socket send buffer size + /// @param sndBufSize Socket send buffer size (in number of messages) + void UpdateSndBufSize(const int sndBufSize) { fSndBufSize = sndBufSize; Invalidate(); } + + /// Set socket receive buffer size + /// @param rcvBufSize Socket receive buffer size (in number of messages) + void UpdateRcvBufSize(const int rcvBufSize) { fRcvBufSize = rcvBufSize; Invalidate(); } + + /// Set socket kernel transmit send buffer size (in bytes) + /// @param sndKernelSize Socket send buffer size (in bytes) + void UpdateSndKernelSize(const int sndKernelSize) { fSndKernelSize = sndKernelSize; Invalidate(); } + + /// Set socket kernel transmit receive buffer size (in bytes) + /// @param rcvKernelSize Socket receive buffer size (in bytes) + void UpdateRcvKernelSize(const int rcvKernelSize) { fRcvKernelSize = rcvKernelSize; Invalidate(); } + + /// Set linger duration (in milliseconds) + /// @param duration linger duration (in milliseconds) + void UpdateLinger(const int duration) { fLinger = duration; Invalidate(); } + + /// Set socket rate logging interval (in seconds) + /// @param rateLogging Socket rate logging interval (in seconds) + void UpdateRateLogging(const int rateLogging) { fRateLogging = rateLogging; Invalidate(); } + + /// Set start of the port range for automatic binding + /// @param minPort start of the port range + void UpdatePortRangeMin(const int minPort) { fPortRangeMin = minPort; Invalidate(); } + + /// Set end of the port range for automatic binding + /// @param maxPort end of the port range + void UpdatePortRangeMax(const int maxPort) { fPortRangeMax = maxPort; Invalidate(); } + + /// Set automatic binding (pick random port if bind fails) + /// @param autobind true/false, true to enable automatic binding + void UpdateAutoBind(const bool autobind) { fAutoBind = autobind; Invalidate(); } + + /// Checks if the configured channel settings are valid (checks the validity parameter, without running full validation (as oposed to ValidateChannel())) + /// @return true if channel settings are valid, false otherwise. + bool IsValid() const { return fValid; } + + /// Validates channel configuration + /// @return true if channel settings are valid, false otherwise. + bool Validate(); + + void Init(); + + bool ConnectEndpoint(const std::string& endpoint); + + bool BindEndpoint(std::string& endpoint); + + /// invalidates the channel (requires validation to be used again). + void Invalidate() { fValid = false; } + + /// Sends a message to the socket queue. + /// @param msg Constant reference of unique_ptr to a Message + /// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) + /// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) + int64_t Send(MessagePtr& msg, int sndTimeoutInMs = -1) + { + CheckSendCompatibility(msg); + return fSocket->Send(msg, sndTimeoutInMs); + } + + /// Receives a message from the socket queue. + /// @param msg Constant reference of unique_ptr to a Message + /// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) + /// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) + int64_t Receive(MessagePtr& msg, int rcvTimeoutInMs = -1) + { + CheckReceiveCompatibility(msg); + return fSocket->Receive(msg, rcvTimeoutInMs); + } + + /// Send a vector of messages + /// @param msgVec message vector reference + /// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) + /// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) + int64_t Send(std::vector& msgVec, int sndTimeoutInMs = -1) + { + CheckSendCompatibility(msgVec); + return fSocket->Send(msgVec, sndTimeoutInMs); + } + + /// Receive a vector of messages + /// @param msgVec message vector reference + /// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) + /// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) + int64_t Receive(std::vector& msgVec, int rcvTimeoutInMs = -1) + { + CheckReceiveCompatibility(msgVec); + return fSocket->Receive(msgVec, rcvTimeoutInMs); + } + + /// Send Parts + /// @param parts Parts reference + /// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) + /// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) + int64_t Send(Parts& parts, int sndTimeoutInMs = -1) + { + return Send(parts.fParts, sndTimeoutInMs); + } + + /// Receive Parts + /// @param parts Parts reference + /// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) + /// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) + int64_t Receive(Parts& parts, int rcvTimeoutInMs = -1) + { + return Receive(parts.fParts, rcvTimeoutInMs); + } + + unsigned long GetBytesTx() const { return fSocket->GetBytesTx(); } + unsigned long GetBytesRx() const { return fSocket->GetBytesRx(); } + unsigned long GetMessagesTx() const { return fSocket->GetMessagesTx(); } + unsigned long GetMessagesRx() const { return fSocket->GetMessagesRx(); } + + auto Transport() -> TransportFactory* { return fTransportFactory.get(); }; + + template + MessagePtr NewMessage(Args&&... args) + { + return Transport()->CreateMessage(std::forward(args)...); + } + + template + MessagePtr NewSimpleMessage(const T& data) + { + return Transport()->NewSimpleMessage(data); + } + + template + MessagePtr NewStaticMessage(const T& data) + { + return Transport()->NewStaticMessage(data); + } + + template + UnmanagedRegionPtr NewUnmanagedRegion(Args&&... args) + { + return Transport()->CreateUnmanagedRegion(std::forward(args)...); + } + + static constexpr mq::Transport DefaultTransportType = mq::Transport::DEFAULT; + static constexpr const char* DefaultTransportName = "default"; + static constexpr const char* DefaultName = ""; + static constexpr const char* DefaultType = "unspecified"; + static constexpr const char* DefaultMethod = "unspecified"; + static constexpr const char* DefaultAddress = "unspecified"; + static constexpr int DefaultSndBufSize = 1000; + static constexpr int DefaultRcvBufSize = 1000; + static constexpr int DefaultSndKernelSize = 0; + static constexpr int DefaultRcvKernelSize = 0; + static constexpr int DefaultLinger = 500; + static constexpr int DefaultRateLogging = 1; + static constexpr int DefaultPortRangeMin = 22000; + static constexpr int DefaultPortRangeMax = 23000; + static constexpr bool DefaultAutoBind = true; + + private: + std::shared_ptr fTransportFactory; + mq::Transport fTransportType; + std::unique_ptr fSocket; + + std::string fName; + std::string fType; + std::string fMethod; + std::string fAddress; + int fSndBufSize; + int fRcvBufSize; + int fSndKernelSize; + int fRcvKernelSize; + int fLinger; + int fRateLogging; + int fPortRangeMin; + int fPortRangeMax; + bool fAutoBind; + + bool fValid; + + bool fMultipart; + + void CheckSendCompatibility(MessagePtr& msg) + { + if (fTransportType != msg->GetType()) { + if (msg->GetSize() > 0) { + MessagePtr msgWrapper(NewMessage( + msg->GetData(), + msg->GetSize(), + [](void* /*data*/, void* _msg) { delete static_cast(_msg); }, + msg.get() + )); + msg.release(); + msg = move(msgWrapper); + } else { + MessagePtr newMsg(NewMessage()); + msg = move(newMsg); + } + } + } + + void CheckSendCompatibility(std::vector& msgVec) + { + for (auto& msg : msgVec) { + if (fTransportType != msg->GetType()) { + if (msg->GetSize() > 0) { + MessagePtr msgWrapper(NewMessage( + msg->GetData(), + msg->GetSize(), + [](void* /*data*/, void* _msg) { delete static_cast(_msg); }, + msg.get() + )); + msg.release(); + msg = move(msgWrapper); + } else { + MessagePtr newMsg(NewMessage()); + msg = move(newMsg); + } + } + } + } + + void CheckReceiveCompatibility(MessagePtr& msg) + { + if (fTransportType != msg->GetType()) { + MessagePtr newMsg(NewMessage()); + msg = move(newMsg); + } + } + + void CheckReceiveCompatibility(std::vector& msgVec) + { + for (auto& msg : msgVec) { + if (fTransportType != msg->GetType()) { + + MessagePtr newMsg(NewMessage()); + msg = move(newMsg); + } + } + } + + void InitTransport(std::shared_ptr factory) + { + fTransportFactory = factory; + fTransportType = factory->GetType(); + } +}; + +} // namespace fair::mq + +// using FairMQChannel [[deprecated("Use fair::mq::Channel")]] = fair::mq::Channel; +using FairMQChannel = fair::mq::Channel; #endif // FAIR_MQ_CHANNEL_H diff --git a/fairmq/FairMQDevice.cxx b/fairmq/Device.cxx similarity index 83% rename from fairmq/FairMQDevice.cxx rename to fairmq/Device.cxx index 0f29af7d..547c4c7c 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/Device.cxx @@ -1,51 +1,48 @@ /******************************************************************************** - * Copyright (C) 2012-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2012-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include - -#include -#include - -#include // join/split - -#include +#include // std::max +#include // join/split #include +#include +#include +#include +#include +#include #include #include -#include -#include -#include // std::max + +namespace fair::mq { using namespace std; -using namespace fair::mq; -constexpr const char* FairMQDevice::DefaultId; -constexpr int FairMQDevice::DefaultIOThreads; -constexpr const char* FairMQDevice::DefaultTransportName; -constexpr fair::mq::Transport FairMQDevice::DefaultTransportType; -constexpr const char* FairMQDevice::DefaultNetworkInterface; -constexpr int FairMQDevice::DefaultInitTimeout; -constexpr uint64_t FairMQDevice::DefaultMaxRunTime; -constexpr float FairMQDevice::DefaultRate; -constexpr const char* FairMQDevice::DefaultSession; +constexpr const char* Device::DefaultId; +constexpr int Device::DefaultIOThreads; +constexpr const char* Device::DefaultTransportName; +constexpr mq::Transport Device::DefaultTransportType; +constexpr const char* Device::DefaultNetworkInterface; +constexpr int Device::DefaultInitTimeout; +constexpr uint64_t Device::DefaultMaxRunTime; +constexpr float Device::DefaultRate; +constexpr const char* Device::DefaultSession; struct StateSubscription { - fair::mq::StateMachine& fStateMachine; - fair::mq::StateQueue& fStateQueue; + StateMachine& fStateMachine; + StateQueue& fStateQueue; string fId; - explicit StateSubscription(string id, fair::mq::StateMachine& stateMachine, fair::mq::StateQueue& stateQueue) + explicit StateSubscription(string id, StateMachine& stateMachine, StateQueue& stateQueue) : fStateMachine(stateMachine) , fStateQueue(stateQueue) , fId(std::move(id)) { - fStateMachine.SubscribeToStateChange(fId, [&](fair::mq::State state) { + fStateMachine.SubscribeToStateChange(fId, [&](State state) { fStateQueue.Push(state); }); } @@ -55,23 +52,23 @@ struct StateSubscription } }; -FairMQDevice::FairMQDevice() - : FairMQDevice(nullptr, {0, 0, 0}) +Device::Device() + : Device(nullptr, {0, 0, 0}) {} -FairMQDevice::FairMQDevice(ProgOptions& config) - : FairMQDevice(&config, {0, 0, 0}) +Device::Device(ProgOptions& config) + : Device(&config, {0, 0, 0}) {} -FairMQDevice::FairMQDevice(const tools::Version version) - : FairMQDevice(nullptr, version) +Device::Device(const tools::Version version) + : Device(nullptr, version) {} -FairMQDevice::FairMQDevice(ProgOptions& config, const tools::Version version) - : FairMQDevice(&config, version) +Device::Device(ProgOptions& config, const tools::Version version) + : Device(&config, version) {} -FairMQDevice::FairMQDevice(ProgOptions* config, const tools::Version version) +Device::Device(ProgOptions* config, const tools::Version version) : fTransportFactory(nullptr) , fInternalConfig(config ? nullptr : make_unique()) , fConfig(config ? config : fInternalConfig.get()) @@ -97,34 +94,34 @@ FairMQDevice::FairMQDevice(ProgOptions* config, const tools::Version version) } }); - fStateMachine.HandleStates([&](fair::mq::State state) { + fStateMachine.HandleStates([&](State state) { LOG(trace) << "device notified on new state: " << state; fStateQueue.Push(state); switch (state) { - case fair::mq::State::InitializingDevice: + case State::InitializingDevice: InitWrapper(); break; - case fair::mq::State::Binding: + case State::Binding: BindWrapper(); break; - case fair::mq::State::Connecting: + case State::Connecting: ConnectWrapper(); break; - case fair::mq::State::InitializingTask: + case State::InitializingTask: InitTaskWrapper(); break; - case fair::mq::State::Running: + case State::Running: RunWrapper(); break; - case fair::mq::State::ResettingTask: + case State::ResettingTask: ResetTaskWrapper(); break; - case fair::mq::State::ResettingDevice: + case State::ResettingDevice: ResetWrapper(); break; - case fair::mq::State::Exiting: + case State::Exiting: Exit(); break; default: @@ -136,7 +133,7 @@ FairMQDevice::FairMQDevice(ProgOptions* config, const tools::Version version) fStateMachine.Start(); } -void FairMQDevice::TransitionTo(const fair::mq::State s) +void Device::TransitionTo(const State s) { { lock_guard lock(fTransitionMtx); @@ -147,7 +144,7 @@ void FairMQDevice::TransitionTo(const fair::mq::State s) fTransitioning = true; } - using fair::mq::State; + using mq::State; StateQueue sq; StateSubscription ss(tools::ToString(fId, ".TransitionTo"), fStateMachine, sq); @@ -203,7 +200,7 @@ void FairMQDevice::TransitionTo(const fair::mq::State s) } } -void FairMQDevice::InitWrapper() +void Device::InitWrapper() { // run initialization once CompleteInit transition is requested fStateMachine.WaitForPendingState(); @@ -217,7 +214,7 @@ void FairMQDevice::InitWrapper() fInitializationTimeoutInS = fConfig->GetProperty("init-timeout", DefaultInitTimeout); try { - fDefaultTransportType = fair::mq::TransportTypes.at(fConfig->GetProperty("transport", DefaultTransportName)); + fDefaultTransportType = TransportTypes.at(fConfig->GetProperty("transport", DefaultTransportName)); } catch (const exception& e) { LOG(error) << "exception: " << e.what(); LOG(error) << "invalid transport type provided: " << fConfig->GetProperty("transport", "not provided"); @@ -231,7 +228,7 @@ void FairMQDevice::InitWrapper() } } - LOG(debug) << "Setting '" << fair::mq::TransportNames.at(fDefaultTransportType) << "' as default transport for the device"; + LOG(debug) << "Setting '" << TransportNames.at(fDefaultTransportType) << "' as default transport for the device"; fTransportFactory = AddTransport(fDefaultTransportType); string networkInterface = fConfig->GetProperty("network-interface", DefaultNetworkInterface); @@ -241,7 +238,7 @@ void FairMQDevice::InitWrapper() int subChannelIndex = 0; for (auto& subChannel : channel.second) { // set channel transport - LOG(debug) << "Initializing transport for channel " << subChannel.fName << ": " << fair::mq::TransportNames.at(subChannel.fTransportType); + LOG(debug) << "Initializing transport for channel " << subChannel.fName << ": " << TransportNames.at(subChannel.fTransportType); subChannel.InitTransport(AddTransport(subChannel.fTransportType)); if (subChannel.fMethod == "bind") { @@ -278,7 +275,7 @@ void FairMQDevice::InitWrapper() // ChangeState(Transition::Auto); } -void FairMQDevice::BindWrapper() +void Device::BindWrapper() { // Bind channels. Here one run is enough, because bind settings should be available locally // If necessary this could be handled in the same way as the connecting channels @@ -294,7 +291,7 @@ void FairMQDevice::BindWrapper() ChangeState(Transition::Auto); } -void FairMQDevice::ConnectWrapper() +void Device::ConnectWrapper() { // go over the list of channels until all are initialized (and removed from the uninitialized list) int numAttempts = 1; @@ -331,7 +328,7 @@ void FairMQDevice::ConnectWrapper() ChangeState(Transition::Auto); } -void FairMQDevice::AttachChannels(vector& chans) +void Device::AttachChannels(vector& chans) { auto itr = chans.begin(); @@ -351,7 +348,7 @@ void FairMQDevice::AttachChannels(vector& chans) } } -bool FairMQDevice::AttachChannel(FairMQChannel& chan) +bool Device::AttachChannel(Channel& chan) { vector endpoints; string chanAddress = chan.GetAddress(); @@ -424,19 +421,19 @@ bool FairMQDevice::AttachChannel(FairMQChannel& chan) return true; } -void FairMQDevice::InitTaskWrapper() +void Device::InitTaskWrapper() { InitTask(); ChangeState(Transition::Auto); } -void FairMQDevice::RunWrapper() +void Device::RunWrapper() { LOG(info) << "DEVICE: Running..."; // start the rate logger thread - future rateLogger = async(launch::async, &FairMQDevice::LogSocketRates, this); + future rateLogger = async(launch::async, &Device::LogSocketRates, this); // notify transports to resume transfers for (auto& t : fTransports) { @@ -486,7 +483,7 @@ void FairMQDevice::RunWrapper() rateLogger.get(); } -void FairMQDevice::HandleSingleChannelInput() +void Device::HandleSingleChannelInput() { bool proceed = true; @@ -501,14 +498,14 @@ void FairMQDevice::HandleSingleChannelInput() } } -void FairMQDevice::HandleMultipleChannelInput() +void Device::HandleMultipleChannelInput() { // check if more than one transport is used fMultitransportInputs.clear(); for (const auto& k : fInputChannelKeys) { - fair::mq::Transport t = fChannels.at(k).at(0).fTransportType; + mq::Transport t = fChannels.at(k).at(0).fTransportType; if (fMultitransportInputs.find(t) == fMultitransportInputs.end()) { - fMultitransportInputs.insert(pair>(t, vector())); + fMultitransportInputs.insert(pair>(t, vector())); fMultitransportInputs.at(t).push_back(k); } else { fMultitransportInputs.at(t).push_back(k); @@ -533,7 +530,7 @@ void FairMQDevice::HandleMultipleChannelInput() } else { // otherwise poll directly bool proceed = true; - FairMQPollerPtr poller(fChannels.at(fInputChannelKeys.at(0)).at(0).fTransportFactory->CreatePoller(fChannels, fInputChannelKeys)); + PollerPtr poller(fChannels.at(fInputChannelKeys.at(0)).at(0).fTransportFactory->CreatePoller(fChannels, fInputChannelKeys)); while (!NewStatePending() && proceed) { poller->Poll(200); @@ -561,14 +558,14 @@ void FairMQDevice::HandleMultipleChannelInput() } } -void FairMQDevice::HandleMultipleTransportInput() +void Device::HandleMultipleTransportInput() { vector threads; fMultitransportProceed = true; for (const auto& i : fMultitransportInputs) { - threads.emplace_back(thread(&FairMQDevice::PollForTransport, this, fTransports.at(i.first).get(), i.second)); + threads.emplace_back(thread(&Device::PollForTransport, this, fTransports.at(i.first).get(), i.second)); } for (thread& t : threads) { @@ -576,10 +573,10 @@ void FairMQDevice::HandleMultipleTransportInput() } } -void FairMQDevice::PollForTransport(const FairMQTransportFactory* factory, const vector& channelKeys) +void Device::PollForTransport(const TransportFactory* factory, const vector& channelKeys) { try { - FairMQPollerPtr poller(factory->CreatePoller(fChannels, channelKeys)); + PollerPtr poller(factory->CreatePoller(fChannels, channelKeys)); while (!NewStatePending() && fMultitransportProceed) { poller->Poll(500); @@ -610,14 +607,14 @@ void FairMQDevice::PollForTransport(const FairMQTransportFactory* factory, const } } } catch (exception& e) { - LOG(error) << "FairMQDevice::PollForTransport() failed: " << e.what() << ", going to ERROR state."; - throw runtime_error(tools::ToString("FairMQDevice::PollForTransport() failed: ", e.what(), ", going to ERROR state.")); + LOG(error) << "fair::mq::Device::PollForTransport() failed: " << e.what() << ", going to ERROR state."; + throw runtime_error(tools::ToString("fair::mq::Device::PollForTransport() failed: ", e.what(), ", going to ERROR state.")); } } -bool FairMQDevice::HandleMsgInput(const string& chName, const InputMsgCallback& callback, int i) +bool Device::HandleMsgInput(const string& chName, const InputMsgCallback& callback, int i) { - unique_ptr input(fChannels.at(chName).at(i).fTransportFactory->CreateMessage()); + unique_ptr input(fChannels.at(chName).at(i).fTransportFactory->CreateMessage()); if (Receive(input, chName, i) >= 0) { return callback(input, i); @@ -626,9 +623,9 @@ bool FairMQDevice::HandleMsgInput(const string& chName, const InputMsgCallback& } } -bool FairMQDevice::HandleMultipartInput(const string& chName, const InputMultipartCallback& callback, int i) +bool Device::HandleMultipartInput(const string& chName, const InputMultipartCallback& callback, int i) { - FairMQParts input; + Parts input; if (Receive(input, chName, i) >= 0) { return callback(input, i); @@ -637,34 +634,34 @@ bool FairMQDevice::HandleMultipartInput(const string& chName, const InputMultipa } } -shared_ptr FairMQDevice::AddTransport(fair::mq::Transport transport) +shared_ptr Device::AddTransport(mq::Transport transport) { - if (transport == fair::mq::Transport::DEFAULT) { + if (transport == mq::Transport::DEFAULT) { transport = fDefaultTransportType; } auto i = fTransports.find(transport); if (i == fTransports.end()) { - LOG(debug) << "Adding '" << fair::mq::TransportNames.at(transport) << "' transport"; - auto tr = FairMQTransportFactory::CreateTransportFactory(fair::mq::TransportNames.at(transport), fId, fConfig); + LOG(debug) << "Adding '" << TransportNames.at(transport) << "' transport"; + auto tr = TransportFactory::CreateTransportFactory(TransportNames.at(transport), fId, fConfig); fTransports.insert({transport, tr}); return tr; } else { - LOG(debug) << "Reusing existing '" << fair::mq::TransportNames.at(transport) << "' transport"; + LOG(debug) << "Reusing existing '" << TransportNames.at(transport) << "' transport"; return i->second; } } -void FairMQDevice::SetConfig(ProgOptions& config) +void Device::SetConfig(ProgOptions& config) { fInternalConfig.reset(); fConfig = &config; } -void FairMQDevice::LogSocketRates() +void Device::LogSocketRates() { - vector filteredChannels; + vector filteredChannels; vector filteredChannelNames; vector logIntervals; vector intervalCounters; @@ -760,21 +757,21 @@ void FairMQDevice::LogSocketRates() } } -void FairMQDevice::UnblockTransports() +void Device::UnblockTransports() { for (auto& transport : fTransports) { transport.second->Interrupt(); } } -void FairMQDevice::ResetTaskWrapper() +void Device::ResetTaskWrapper() { ResetTask(); ChangeState(Transition::Auto); } -void FairMQDevice::ResetWrapper() +void Device::ResetWrapper() { for (auto& transport : fTransports) { transport.second->Reset(); @@ -788,9 +785,11 @@ void FairMQDevice::ResetWrapper() ChangeState(Transition::Auto); } -FairMQDevice::~FairMQDevice() +Device::~Device() { UnsubscribeFromNewTransition("device"); fStateMachine.StopHandlingStates(); LOG(debug) << "Shutting down device " << fId; } + +} // namespace fair::mq diff --git a/fairmq/Device.h b/fairmq/Device.h index 01146466..c6813943 100644 --- a/fairmq/Device.h +++ b/fairmq/Device.h @@ -9,13 +9,627 @@ #ifndef FAIR_MQ_DEVICE_H #define FAIR_MQ_DEVICE_H -#include +#include // find +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include // unique_ptr +#include +#include +#include +#include +#include // pair +#include -namespace fair::mq +namespace fair::mq { + +using ChannelMap = std::unordered_map>; + +struct OngoingTransition : std::runtime_error { + using std::runtime_error::runtime_error; +}; - using Device = ::FairMQDevice; +using InputMsgCallback = std::function; -} // namespace fair::mq +using InputMultipartCallback = std::function; + +class Device +{ + friend class Channel; + + public: + Device(); + Device(ProgOptions& config); + Device(const tools::Version version); + Device(ProgOptions& config, const tools::Version version); + + private: + Device(ProgOptions* config, const tools::Version version); + + public: + Device(const Device&) = delete; + Device operator=(const Device&) = delete; + virtual ~Device(); + + /// Outputs the socket transfer rates + virtual void LogSocketRates(); + + template + [[deprecated]] void Serialize(Message& msg, DataType&& data, Args&&... args) const + { + Serializer().Serialize(msg, std::forward(data), std::forward(args)...); + } + + template + [[deprecated]] void Deserialize(Message& msg, DataType&& data, Args&&... args) const + { + Deserializer().Deserialize(msg, std::forward(data), std::forward(args)...); + } + + /// Shorthand method to send `msg` on `chan` at index `i` + /// @param msg message reference + /// @param chan channel name + /// @param i channel index + /// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via + /// state change)), 0 will not wait (return immediately if cannot send) + /// @return Number of bytes that have been queued, TransferCode::timeout if timed out, + /// TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by + /// requested state change) + int64_t Send(MessagePtr& msg, + const std::string& channel, + const int index = 0, + int sndTimeoutInMs = -1) + { + return GetChannel(channel, index).Send(msg, sndTimeoutInMs); + } + + /// Shorthand method to receive `msg` on `chan` at index `i` + /// @param msg message reference + /// @param chan channel name + /// @param i channel index + /// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. + /// via state change)), 0 will not wait (return immediately if cannot receive) + /// @return Number of bytes that have been received, TransferCode::timeout if timed out, + /// TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by + /// requested state change) + int64_t Receive(MessagePtr& msg, + const std::string& channel, + const int index = 0, + int rcvTimeoutInMs = -1) + { + return GetChannel(channel, index).Receive(msg, rcvTimeoutInMs); + } + + /// Shorthand method to send Parts on `chan` at index `i` + /// @param parts parts reference + /// @param chan channel name + /// @param i channel index + /// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via + /// state change)), 0 will not wait (return immediately if cannot send) + /// @return Number of bytes that have been queued, TransferCode::timeout if timed out, + /// TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by + /// requested state change) + int64_t Send(Parts& parts, + const std::string& channel, + const int index = 0, + int sndTimeoutInMs = -1) + { + return GetChannel(channel, index).Send(parts.fParts, sndTimeoutInMs); + } + + /// Shorthand method to receive Parts on `chan` at index `i` + /// @param parts parts reference + /// @param chan channel name + /// @param i channel index + /// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. + /// via state change)), 0 will not wait (return immediately if cannot receive) + /// @return Number of bytes that have been received, TransferCode::timeout if timed out, + /// TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by + /// requested state change) + int64_t Receive(Parts& parts, + const std::string& channel, + const int index = 0, + int rcvTimeoutInMs = -1) + { + return GetChannel(channel, index).Receive(parts.fParts, rcvTimeoutInMs); + } + + /// @brief Getter for default transport factory + auto Transport() const -> TransportFactory* { return fTransportFactory.get(); } + + // creates message with the default device transport + template + MessagePtr NewMessage(Args&&... args) + { + return Transport()->CreateMessage(std::forward(args)...); + } + + // creates message with the transport of the specified channel + template + MessagePtr NewMessageFor(const std::string& channel, int index, Args&&... args) + { + return GetChannel(channel, index).NewMessage(std::forward(args)...); + } + + // creates a message that will not be cleaned up after transfer, with the default device + // transport + template + MessagePtr NewStaticMessage(const T& data) + { + return Transport()->NewStaticMessage(data); + } + + // creates a message that will not be cleaned up after transfer, with the transport of the + // specified channel + template + MessagePtr NewStaticMessageFor(const std::string& channel, int index, const T& data) + { + return GetChannel(channel, index).NewStaticMessage(data); + } + + // creates a message with a copy of the provided data, with the default device transport + template + MessagePtr NewSimpleMessage(const T& data) + { + return Transport()->NewSimpleMessage(data); + } + + // creates a message with a copy of the provided data, with the transport of the specified + // channel + template + MessagePtr NewSimpleMessageFor(const std::string& channel, int index, const T& data) + { + return GetChannel(channel, index).NewSimpleMessage(data); + } + + // creates unamanaged region with the default device transport + template + UnmanagedRegionPtr NewUnmanagedRegion(Args&&... args) + { + return Transport()->CreateUnmanagedRegion(std::forward(args)...); + } + + // creates unmanaged region with the transport of the specified channel + template + UnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel, int index, Args&&... args) + { + return GetChannel(channel, index).NewUnmanagedRegion(std::forward(args)...); + } + + template + PollerPtr NewPoller(const Ts&... inputs) + { + std::vector chans{inputs...}; + + // if more than one channel provided, check compatibility + if (chans.size() > 1) { + mq::Transport type = GetChannel(chans.at(0), 0).Transport()->GetType(); + + for (unsigned int i = 1; i < chans.size(); ++i) { + if (type != GetChannel(chans.at(i), 0).Transport()->GetType()) { + LOG(error) << "poller failed: different transports within same poller are not " + "yet supported. Going to ERROR state."; + throw std::runtime_error("poller failed: different transports within same " + "poller are not yet supported."); + } + } + } + + return GetChannel(chans.at(0), 0).Transport()->CreatePoller(fChannels, chans); + } + + PollerPtr NewPoller(const std::vector& channels) + { + // if more than one channel provided, check compatibility + if (channels.size() > 1) { + mq::Transport type = channels.at(0)->Transport()->GetType(); + + for (unsigned int i = 1; i < channels.size(); ++i) { + if (type != channels.at(i)->Transport()->GetType()) { + LOG(error) << "poller failed: different transports within same poller are not " + "yet supported. Going to ERROR state."; + throw std::runtime_error("poller failed: different transports within same " + "poller are not yet supported."); + } + } + } + + return channels.at(0)->Transport()->CreatePoller(channels); + } + + /// Adds a transport to the device if it doesn't exist + /// @param transport Transport string ("zeromq"/"shmem") + std::shared_ptr AddTransport(const mq::Transport transport); + + /// Assigns config to the device + void SetConfig(ProgOptions& config); + /// Get pointer to the config + ProgOptions* GetConfig() const { return fConfig; } + + // overload to easily bind member functions + template + void OnData(const std::string& channelName, + bool (T::*memberFunction)(MessagePtr& msg, int index)) + { + fDataCallbacks = true; + fMsgInputs.insert( + std::make_pair(channelName, [this, memberFunction](MessagePtr& msg, int index) { + return (static_cast(this)->*memberFunction)(msg, index); + })); + + if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) + == fInputChannelKeys.end()) { + fInputChannelKeys.push_back(channelName); + } + } + + void OnData(const std::string& channelName, InputMsgCallback callback) + { + fDataCallbacks = true; + fMsgInputs.insert(make_pair(channelName, callback)); + + if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) + == fInputChannelKeys.end()) { + fInputChannelKeys.push_back(channelName); + } + } + + // overload to easily bind member functions + template + void OnData(const std::string& channelName, bool (T::*memberFunction)(Parts& parts, int index)) + { + fDataCallbacks = true; + fMultipartInputs.insert( + std::make_pair(channelName, [this, memberFunction](Parts& parts, int index) { + return (static_cast(this)->*memberFunction)(parts, index); + })); + + if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) + == fInputChannelKeys.end()) { + fInputChannelKeys.push_back(channelName); + } + } + + void OnData(const std::string& channelName, InputMultipartCallback callback) + { + fDataCallbacks = true; + fMultipartInputs.insert(make_pair(channelName, callback)); + + if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) + == fInputChannelKeys.end()) { + fInputChannelKeys.push_back(channelName); + } + } + + Channel& GetChannel(const std::string& channelName, const int index = 0) + try { + return fChannels.at(channelName).at(index); + } catch (const std::out_of_range& oor) { + LOG(error) + << "requested channel has not been configured? check channel names/configuration."; + LOG(error) << "channel: " << channelName << ", index: " << index; + LOG(error) << "out of range: " << oor.what(); + throw; + } + + virtual void RegisterChannelEndpoints() {} + + bool RegisterChannelEndpoint(const std::string& channelName, + uint16_t minNumSubChannels = 1, + uint16_t maxNumSubChannels = 1) + { + bool ok = fChannelRegistry + .insert(std::make_pair(channelName, + std::make_pair(minNumSubChannels, maxNumSubChannels))) + .second; + if (!ok) { + LOG(warn) << "Registering channel: name already registered: \"" << channelName << "\""; + } + return ok; + } + + void PrintRegisteredChannels() + { + if (fChannelRegistry.empty()) { + LOGV(info, verylow) << "no channels registered."; + } else { + for (const auto& c : fChannelRegistry) { + LOGV(info, verylow) << c.first << ":" << c.second.first << ":" << c.second.second; + } + } + } + + void SetId(const std::string& id) { fId = id; } + std::string GetId() { return fId; } + + const tools::Version GetVersion() const { return fVersion; } + + void SetNumIoThreads(int numIoThreads) { fConfig->SetProperty("io-threads", numIoThreads); } + int GetNumIoThreads() const + { + return fConfig->GetProperty("io-threads", DefaultIOThreads); + } + + void SetNetworkInterface(const std::string& networkInterface) + { + fConfig->SetProperty("network-interface", networkInterface); + } + std::string GetNetworkInterface() const + { + return fConfig->GetProperty("network-interface", DefaultNetworkInterface); + } + + void SetDefaultTransport(const std::string& name) { fConfig->SetProperty("transport", name); } + std::string GetDefaultTransport() const + { + return fConfig->GetProperty("transport", DefaultTransportName); + } + + void SetInitTimeoutInS(int initTimeoutInS) + { + fConfig->SetProperty("init-timeout", initTimeoutInS); + } + int GetInitTimeoutInS() const + { + return fConfig->GetProperty("init-timeout", DefaultInitTimeout); + } + + /// Sets the default transport for the device + /// @param transport Transport string ("zeromq"/"shmem") + void SetTransport(const std::string& transport) + { + fConfig->SetProperty("transport", transport); + } + /// Gets the default transport name + std::string GetTransportName() const + { + return fConfig->GetProperty("transport", DefaultTransportName); + } + + void SetRawCmdLineArgs(const std::vector& args) { fRawCmdLineArgs = args; } + std::vector GetRawCmdLineArgs() const { return fRawCmdLineArgs; } + + void RunStateMachine() { fStateMachine.ProcessWork(); }; + + /// Wait for the supplied amount of time or for interruption. + /// If interrupted, returns false, otherwise true. + /// @param duration wait duration + template + bool WaitFor(std::chrono::duration const& duration) + { + return !fStateMachine.WaitForPendingStateFor( + std::chrono::duration_cast(duration).count()); + } + + protected: + std::shared_ptr fTransportFactory; ///< Default transport factory + std::unordered_map> + fTransports; ///< Container for transports + + public: + std::unordered_map> fChannels; ///< Device channels + std::unique_ptr fInternalConfig; ///< Internal program options configuration + ProgOptions* fConfig; ///< Pointer to config (internal or external) + + void AddChannel(const std::string& name, Channel&& channel) + { + fConfig->AddChannel(name, channel); + } + + protected: + std::string fId; ///< Device ID + + /// Additional user initialization (can be overloaded in child classes). Prefer to use + /// InitTask(). + virtual void Init() {} + + virtual void Bind() {} + + virtual void Connect() {} + + /// Task initialization (can be overloaded in child classes) + virtual void InitTask() {} + + /// Runs the device (to be overloaded in child classes) + virtual void Run() {} + + /// Called in the RUNNING state once before executing the Run()/ConditionalRun() method + virtual void PreRun() {} + + /// Called during RUNNING state repeatedly until it returns false or device state changes + virtual bool ConditionalRun() { return false; } + + /// Called in the RUNNING state once after executing the Run()/ConditionalRun() method + virtual void PostRun() {} + + /// Resets the user task (to be overloaded in child classes) + virtual void ResetTask() {} + + /// Resets the device (can be overloaded in child classes) + virtual void Reset() {} + + public: + /// @brief Request a device state transition + /// @param transition state transition + /// + /// The state transition may not happen immediately, but when the current state evaluates the + /// pending transition event and terminates. In other words, the device states are scheduled + /// cooperatively. + bool ChangeState(const Transition transition) { return fStateMachine.ChangeState(transition); } + /// @brief Request a device state transition + /// @param transition state transition + /// + /// The state transition may not happen immediately, but when the current state evaluates the + /// pending transition event and terminates. In other words, the device states are scheduled + /// cooperatively. + bool ChangeState(const std::string& transition) + { + return fStateMachine.ChangeState(GetTransition(transition)); + } + + /// @brief waits for the next state (any) to occur + State WaitForNextState() { return fStateQueue.WaitForNext(); } + /// @brief waits for the specified state to occur + /// @param state state to wait for + void WaitForState(State state) { fStateQueue.WaitForState(state); } + /// @brief waits for the specified state to occur + /// @param state state to wait for + void WaitForState(const std::string& state) { WaitForState(GetState(state)); } + + void TransitionTo(const State state); + + /// @brief Subscribe with a callback to state changes + /// @param key id to identify your subscription + /// @param callback callback (called with the new state as the parameter) + /// + /// The callback is called at the beginning of a new state. + /// The callback is called from the thread the state is running in. + void SubscribeToStateChange(const std::string& key, std::function callback) + { + fStateMachine.SubscribeToStateChange(key, callback); + } + /// @brief Unsubscribe from state changes + /// @param key id (that was used when subscribing) + void UnsubscribeFromStateChange(const std::string& key) + { + fStateMachine.UnsubscribeFromStateChange(key); + } + + /// @brief Subscribe with a callback to incoming state transitions + /// @param key id to identify your subscription + /// @param callback callback (called with the incoming transition as the parameter) + /// The callback is called when new transition is initiated. + /// The callback is called from the thread that initiates the transition (via ChangeState). + void SubscribeToNewTransition(const std::string& key, + std::function callback) + { + fStateMachine.SubscribeToNewTransition(key, callback); + } + /// @brief Unsubscribe from state transitions + /// @param key id (that was used when subscribing) + void UnsubscribeFromNewTransition(const std::string& key) + { + fStateMachine.UnsubscribeFromNewTransition(key); + } + + /// @brief Returns true if a new state has been requested, signaling the current handler to + /// stop. + bool NewStatePending() const { return fStateMachine.NewStatePending(); } + + /// @brief Returns the current state + State GetCurrentState() const { return fStateMachine.GetCurrentState(); } + /// @brief Returns the name of the current state as a string + std::string GetCurrentStateName() const { return fStateMachine.GetCurrentStateName(); } + + /// @brief Returns name of the given state as a string + /// @param state state + static std::string GetStateName(const State state) { return GetStateName(state); } + /// @brief Returns name of the given transition as a string + /// @param transition transition + static std::string GetTransitionName(const Transition transition) + { + return GetTransitionName(transition); + } + + static constexpr const char* DefaultId = ""; + static constexpr int DefaultIOThreads = 1; + static constexpr const char* DefaultTransportName = "zeromq"; + static constexpr mq::Transport DefaultTransportType = mq::Transport::ZMQ; + static constexpr const char* DefaultNetworkInterface = "default"; + static constexpr int DefaultInitTimeout = 120; + static constexpr uint64_t DefaultMaxRunTime = 0; + static constexpr float DefaultRate = 0.; + static constexpr const char* DefaultSession = "default"; + + private: + mq::Transport fDefaultTransportType; ///< Default transport for the device + StateMachine fStateMachine; + + /// Handles the initialization + void InitWrapper(); + /// Initializes binding channels + void BindWrapper(); + /// Initializes connecting channels + void ConnectWrapper(); + /// Handles the InitTask() method + void InitTaskWrapper(); + /// Handles the Run() method + void RunWrapper(); + /// Handles the ResetTask() method + void ResetTaskWrapper(); + /// Handles the Reset() method + void ResetWrapper(); + + /// Notifies transports to cease any blocking activity + void UnblockTransports(); + + /// Shuts down the transports and the device + void Exit() {} + + /// Attach (bind/connect) channels in the list + void AttachChannels(std::vector& chans); + bool AttachChannel(Channel& ch); + + void HandleSingleChannelInput(); + void HandleMultipleChannelInput(); + void HandleMultipleTransportInput(); + void PollForTransport(const TransportFactory* factory, + const std::vector& channelKeys); + + bool HandleMsgInput(const std::string& chName, const InputMsgCallback& callback, int i); + bool HandleMultipartInput(const std::string& chName, + const InputMultipartCallback& callback, + int i); + + std::vector fUninitializedBindingChannels; + std::vector fUninitializedConnectingChannels; + + bool fDataCallbacks; + std::unordered_map fMsgInputs; + std::unordered_map fMultipartInputs; + std::unordered_map> fMultitransportInputs; + std::unordered_map> fChannelRegistry; + std::vector fInputChannelKeys; + std::mutex fMultitransportMutex; + std::atomic fMultitransportProceed; + + const tools::Version fVersion; + float fRate; ///< Rate limiting for ConditionalRun + uint64_t fMaxRunRuntimeInS; ///< Maximum runtime for the Running state handler, after which + ///< state will change to Ready (in seconds, 0 for no limit). + int fInitializationTimeoutInS; + std::vector fRawCmdLineArgs; + + StateQueue fStateQueue; + + std::mutex fTransitionMtx; + bool fTransitioning; +}; + +} // namespace fair::mq + +// using FairMQChannelMap [[deprecated("Use fair::mq::ChannelMap")]] = fair::mq::ChannelMap; +// using InputMsgCallback [[deprecated("Use fair::mq::InputMsgCallback")]] = + // fair::mq::InputMsgCallback; +// using InputMultipartCallback [[deprecated("Use fair::mq::InputMultipartCallback")]] = + // fair::mq::InputMultipartCallback; +// using FairMQDevice [[deprecated("Use fair::mq::Device")]] = fair::mq::Device; +using FairMQChannelMap = fair::mq::ChannelMap; +using InputMsgCallback = fair::mq::InputMsgCallback; +using InputMultipartCallback = fair::mq::InputMultipartCallback; +using FairMQDevice = fair::mq::Device; #endif /* FAIR_MQ_DEVICE_H */ diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 6f506f29..8dcad3f1 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -9,442 +9,12 @@ #ifndef FAIRMQCHANNEL_H_ #define FAIRMQCHANNEL_H_ -#include -#include -#include -#include -#include -#include -#include +#if 0 +#ifndef FAIR_MQ_CHANNEL_H +#pragma GCC warning "Deprecated header: Use instead" +#endif +#endif -#include -#include // unique_ptr, shared_ptr -#include -#include -#include -#include // std::move -#include // int64_t - -/** - * @class FairMQChannel FairMQChannel.h - * @brief Wrapper class for FairMQSocket and related methods - * - * The class is not thread-safe. - */ -class FairMQChannel -{ - friend class FairMQDevice; - - public: - /// Default constructor - FairMQChannel(); - - /// Constructor - /// @param name Channel name - FairMQChannel(const std::string& name); - - /// Constructor - /// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/) - /// @param method Socket method (bind/connect) - /// @param address Network address to bind/connect to (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") - FairMQChannel(const std::string& type, const std::string& method, const std::string& address); - - /// Constructor - /// @param name Channel name - /// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/) - /// @param factory TransportFactory - FairMQChannel(const std::string& name, const std::string& type, std::shared_ptr factory); - - /// Constructor - /// @param name Channel name - /// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/) - /// @param method Socket method (bind/connect) - /// @param address Network address to bind/connect to (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") - /// @param factory TransportFactory - FairMQChannel(std::string name, std::string type, std::string method, std::string address, std::shared_ptr factory); - - FairMQChannel(const std::string& name, int index, const fair::mq::Properties& properties); - - /// Copy Constructor - FairMQChannel(const FairMQChannel&); - - /// Copy Constructor (with new name) - FairMQChannel(const FairMQChannel&, std::string name); - - /// Move constructor - // FairMQChannel(FairMQChannel&&) = delete; - - /// Assignment operator - FairMQChannel& operator=(const FairMQChannel&); - - /// Move assignment operator - // FairMQChannel& operator=(FairMQChannel&&) = delete; - - /// Destructor - virtual ~FairMQChannel() = default; - // { LOG(warn) << "Destroying channel '" << fName << "'"; } - - struct ChannelConfigurationError : std::runtime_error { using std::runtime_error::runtime_error; }; - - FairMQSocket& GetSocket() const { assert(fSocket); return *fSocket; } - - bool Bind(const std::string& address) - { - fMethod = "bind"; - fAddress = address; - return fSocket->Bind(address); - } - - bool Connect(const std::string& address) - { - fMethod = "connect"; - fAddress = address; - return fSocket->Connect(address); - } - - /// Get channel name - /// @return Returns full channel name (e.g. "data[0]") - std::string GetName() const { return fName; } - - /// Get channel prefix - /// @return Returns channel prefix (e.g. "data" in "data[0]") - std::string GetPrefix() const - { - std::string prefix = fName; - prefix = prefix.erase(fName.rfind('[')); - return prefix; - } - - /// Get channel index - /// @return Returns channel index (e.g. 0 in "data[0]") - std::string GetIndex() const - { - std::string indexStr = fName; - indexStr.erase(indexStr.rfind(']')); - indexStr.erase(0, indexStr.rfind('[') + 1); - return indexStr; - } - - /// Get socket type - /// @return Returns socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/) - std::string GetType() const { return fType; } - - /// Get socket method - /// @return Returns socket method (bind/connect) - std::string GetMethod() const { return fMethod; } - - /// Get socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") - /// @return Returns socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") - std::string GetAddress() const { return fAddress; } - - /// Get channel transport name ("default", "zeromq" or "shmem") - /// @return Returns channel transport name (e.g. "default", "zeromq" or "shmem") - std::string GetTransportName() const { return fair::mq::TransportName(fTransportType); } - - /// Get channel transport type - /// @return Returns channel transport type - fair::mq::Transport GetTransportType() const { return fTransportType; } - - /// Get socket send buffer size (in number of messages) - /// @return Returns socket send buffer size (in number of messages) - int GetSndBufSize() const { return fSndBufSize; } - - /// Get socket receive buffer size (in number of messages) - /// @return Returns socket receive buffer size (in number of messages) - int GetRcvBufSize() const { return fRcvBufSize; } - - /// Get socket kernel transmit send buffer size (in bytes) - /// @return Returns socket kernel transmit send buffer size (in bytes) - int GetSndKernelSize() const { return fSndKernelSize; } - - /// Get socket kernel transmit receive buffer size (in bytes) - /// @return Returns socket kernel transmit receive buffer size (in bytes) - int GetRcvKernelSize() const { return fRcvKernelSize; } - - /// Get linger duration (in milliseconds) - /// @return Returns linger duration (in milliseconds) - int GetLinger() const { return fLinger; } - - /// Get socket rate logging interval (in seconds) - /// @return Returns socket rate logging interval (in seconds) - int GetRateLogging() const { return fRateLogging; } - - /// Get start of the port range for automatic binding - /// @return start of the port range - int GetPortRangeMin() const { return fPortRangeMin; } - - /// Get end of the port range for automatic binding - /// @return end of the port range - int GetPortRangeMax() const { return fPortRangeMax; } - - /// Set automatic binding (pick random port if bind fails) - /// @return true/false, true if automatic binding is enabled - bool GetAutoBind() const { return fAutoBind; } - - /// Set channel name - /// @param name Arbitrary channel name - void UpdateName(const std::string& name) { fName = name; Invalidate(); } - - /// Set socket type - /// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/) - void UpdateType(const std::string& type) { fType = type; Invalidate(); } - - /// Set socket method - /// @param method Socket method (bind/connect) - void UpdateMethod(const std::string& method) { fMethod = method; Invalidate(); } - - /// Set socket address - /// @param Socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") - void UpdateAddress(const std::string& address) { fAddress = address; Invalidate(); } - - /// Set channel transport - /// @param transport transport string ("default", "zeromq" or "shmem") - void UpdateTransport(const std::string& transport) { fTransportType = fair::mq::TransportType(transport); Invalidate(); } - - /// Set socket send buffer size - /// @param sndBufSize Socket send buffer size (in number of messages) - void UpdateSndBufSize(const int sndBufSize) { fSndBufSize = sndBufSize; Invalidate(); } - - /// Set socket receive buffer size - /// @param rcvBufSize Socket receive buffer size (in number of messages) - void UpdateRcvBufSize(const int rcvBufSize) { fRcvBufSize = rcvBufSize; Invalidate(); } - - /// Set socket kernel transmit send buffer size (in bytes) - /// @param sndKernelSize Socket send buffer size (in bytes) - void UpdateSndKernelSize(const int sndKernelSize) { fSndKernelSize = sndKernelSize; Invalidate(); } - - /// Set socket kernel transmit receive buffer size (in bytes) - /// @param rcvKernelSize Socket receive buffer size (in bytes) - void UpdateRcvKernelSize(const int rcvKernelSize) { fRcvKernelSize = rcvKernelSize; Invalidate(); } - - /// Set linger duration (in milliseconds) - /// @param duration linger duration (in milliseconds) - void UpdateLinger(const int duration) { fLinger = duration; Invalidate(); } - - /// Set socket rate logging interval (in seconds) - /// @param rateLogging Socket rate logging interval (in seconds) - void UpdateRateLogging(const int rateLogging) { fRateLogging = rateLogging; Invalidate(); } - - /// Set start of the port range for automatic binding - /// @param minPort start of the port range - void UpdatePortRangeMin(const int minPort) { fPortRangeMin = minPort; Invalidate(); } - - /// Set end of the port range for automatic binding - /// @param maxPort end of the port range - void UpdatePortRangeMax(const int maxPort) { fPortRangeMax = maxPort; Invalidate(); } - - /// Set automatic binding (pick random port if bind fails) - /// @param autobind true/false, true to enable automatic binding - void UpdateAutoBind(const bool autobind) { fAutoBind = autobind; Invalidate(); } - - /// Checks if the configured channel settings are valid (checks the validity parameter, without running full validation (as oposed to ValidateChannel())) - /// @return true if channel settings are valid, false otherwise. - bool IsValid() const { return fValid; } - - /// Validates channel configuration - /// @return true if channel settings are valid, false otherwise. - bool Validate(); - - void Init(); - - bool ConnectEndpoint(const std::string& endpoint); - - bool BindEndpoint(std::string& endpoint); - - /// invalidates the channel (requires validation to be used again). - void Invalidate() { fValid = false; } - - /// Sends a message to the socket queue. - /// @param msg Constant reference of unique_ptr to a FairMQMessage - /// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) - /// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) - int64_t Send(FairMQMessagePtr& msg, int sndTimeoutInMs = -1) - { - CheckSendCompatibility(msg); - return fSocket->Send(msg, sndTimeoutInMs); - } - - /// Receives a message from the socket queue. - /// @param msg Constant reference of unique_ptr to a FairMQMessage - /// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) - /// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) - int64_t Receive(FairMQMessagePtr& msg, int rcvTimeoutInMs = -1) - { - CheckReceiveCompatibility(msg); - return fSocket->Receive(msg, rcvTimeoutInMs); - } - - /// Send a vector of messages - /// @param msgVec message vector reference - /// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) - /// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) - int64_t Send(std::vector& msgVec, int sndTimeoutInMs = -1) - { - CheckSendCompatibility(msgVec); - return fSocket->Send(msgVec, sndTimeoutInMs); - } - - /// Receive a vector of messages - /// @param msgVec message vector reference - /// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) - /// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) - int64_t Receive(std::vector& msgVec, int rcvTimeoutInMs = -1) - { - CheckReceiveCompatibility(msgVec); - return fSocket->Receive(msgVec, rcvTimeoutInMs); - } - - /// Send FairMQParts - /// @param parts FairMQParts reference - /// @param sndTimeoutInMs send timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) - /// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) - int64_t Send(FairMQParts& parts, int sndTimeoutInMs = -1) - { - return Send(parts.fParts, sndTimeoutInMs); - } - - /// Receive FairMQParts - /// @param parts FairMQParts reference - /// @param rcvTimeoutInMs receive timeout in ms. -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) - /// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) - int64_t Receive(FairMQParts& parts, int rcvTimeoutInMs = -1) - { - return Receive(parts.fParts, rcvTimeoutInMs); - } - - unsigned long GetBytesTx() const { return fSocket->GetBytesTx(); } - unsigned long GetBytesRx() const { return fSocket->GetBytesRx(); } - unsigned long GetMessagesTx() const { return fSocket->GetMessagesTx(); } - unsigned long GetMessagesRx() const { return fSocket->GetMessagesRx(); } - - auto Transport() -> FairMQTransportFactory* { return fTransportFactory.get(); }; - - template - FairMQMessagePtr NewMessage(Args&&... args) - { - return Transport()->CreateMessage(std::forward(args)...); - } - - template - FairMQMessagePtr NewSimpleMessage(const T& data) - { - return Transport()->NewSimpleMessage(data); - } - - template - FairMQMessagePtr NewStaticMessage(const T& data) - { - return Transport()->NewStaticMessage(data); - } - - template - FairMQUnmanagedRegionPtr NewUnmanagedRegion(Args&&... args) - { - return Transport()->CreateUnmanagedRegion(std::forward(args)...); - } - - static constexpr fair::mq::Transport DefaultTransportType = fair::mq::Transport::DEFAULT; - static constexpr const char* DefaultTransportName = "default"; - static constexpr const char* DefaultName = ""; - static constexpr const char* DefaultType = "unspecified"; - static constexpr const char* DefaultMethod = "unspecified"; - static constexpr const char* DefaultAddress = "unspecified"; - static constexpr int DefaultSndBufSize = 1000; - static constexpr int DefaultRcvBufSize = 1000; - static constexpr int DefaultSndKernelSize = 0; - static constexpr int DefaultRcvKernelSize = 0; - static constexpr int DefaultLinger = 500; - static constexpr int DefaultRateLogging = 1; - static constexpr int DefaultPortRangeMin = 22000; - static constexpr int DefaultPortRangeMax = 23000; - static constexpr bool DefaultAutoBind = true; - - private: - std::shared_ptr fTransportFactory; - fair::mq::Transport fTransportType; - std::unique_ptr fSocket; - - std::string fName; - std::string fType; - std::string fMethod; - std::string fAddress; - int fSndBufSize; - int fRcvBufSize; - int fSndKernelSize; - int fRcvKernelSize; - int fLinger; - int fRateLogging; - int fPortRangeMin; - int fPortRangeMax; - bool fAutoBind; - - bool fValid; - - bool fMultipart; - - void CheckSendCompatibility(FairMQMessagePtr& msg) - { - if (fTransportType != msg->GetType()) { - if (msg->GetSize() > 0) { - FairMQMessagePtr msgWrapper(NewMessage( - msg->GetData(), - msg->GetSize(), - [](void* /*data*/, void* _msg) { delete static_cast(_msg); }, - msg.get() - )); - msg.release(); - msg = move(msgWrapper); - } else { - FairMQMessagePtr newMsg(NewMessage()); - msg = move(newMsg); - } - } - } - - void CheckSendCompatibility(std::vector& msgVec) - { - for (auto& msg : msgVec) { - if (fTransportType != msg->GetType()) { - if (msg->GetSize() > 0) { - FairMQMessagePtr msgWrapper(NewMessage( - msg->GetData(), - msg->GetSize(), - [](void* /*data*/, void* _msg) { delete static_cast(_msg); }, - msg.get() - )); - msg.release(); - msg = move(msgWrapper); - } else { - FairMQMessagePtr newMsg(NewMessage()); - msg = move(newMsg); - } - } - } - } - - void CheckReceiveCompatibility(FairMQMessagePtr& msg) - { - if (fTransportType != msg->GetType()) { - FairMQMessagePtr newMsg(NewMessage()); - msg = move(newMsg); - } - } - - void CheckReceiveCompatibility(std::vector& msgVec) - { - for (auto& msg : msgVec) { - if (fTransportType != msg->GetType()) { - FairMQMessagePtr newMsg(NewMessage()); - msg = move(newMsg); - } - } - } - - void InitTransport(std::shared_ptr factory) - { - fTransportFactory = factory; - fTransportType = factory->GetType(); - } -}; +#include #endif /* FAIRMQCHANNEL_H_ */ diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 6e6e205a..2be20ceb 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2012-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2012-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -9,545 +9,12 @@ #ifndef FAIRMQDEVICE_H_ #define FAIRMQDEVICE_H_ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#if 0 +#ifndef FAIR_MQ_DEVICE_H +#pragma GCC warning "Deprecated header: Use instead" +#endif +#endif -#include -#include // unique_ptr -#include // find -#include -#include -#include -#include -#include -#include -#include -#include -#include // pair - -using FairMQChannelMap = std::unordered_map>; - -using InputMsgCallback = std::function; -using InputMultipartCallback = std::function; - -namespace fair::mq -{ -struct OngoingTransition : std::runtime_error { using std::runtime_error::runtime_error; }; -} - -class FairMQDevice -{ - friend class FairMQChannel; - - public: - /// Default constructor - FairMQDevice(); - /// Constructor with external fair::mq::ProgOptions - FairMQDevice(fair::mq::ProgOptions& config); - - /// Constructor that sets the version - FairMQDevice(const fair::mq::tools::Version version); - - /// Constructor that sets the version and external fair::mq::ProgOptions - FairMQDevice(fair::mq::ProgOptions& config, const fair::mq::tools::Version version); - - private: - FairMQDevice(fair::mq::ProgOptions* config, const fair::mq::tools::Version version); - - public: - /// Copy constructor (disabled) - FairMQDevice(const FairMQDevice&) = delete; - /// Assignment operator (disabled) - FairMQDevice operator=(const FairMQDevice&) = delete; - /// Default destructor - virtual ~FairMQDevice(); - - /// Outputs the socket transfer rates - virtual void LogSocketRates(); - - template - void Serialize(FairMQMessage& msg, DataType&& data, Args&&... args) const - { - Serializer().Serialize(msg, std::forward(data), std::forward(args)...); - } - - template - void Deserialize(FairMQMessage& msg, DataType&& data, Args&&... args) const - { - Deserializer().Deserialize(msg, std::forward(data), std::forward(args)...); - } - - /// Shorthand method to send `msg` on `chan` at index `i` - /// @param msg message reference - /// @param chan channel name - /// @param i channel index - /// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) - /// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) - int64_t Send(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1) - { - return GetChannel(channel, index).Send(msg, sndTimeoutInMs); - } - - /// Shorthand method to receive `msg` on `chan` at index `i` - /// @param msg message reference - /// @param chan channel name - /// @param i channel index - /// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) - /// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) - int64_t Receive(FairMQMessagePtr& msg, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1) - { - return GetChannel(channel, index).Receive(msg, rcvTimeoutInMs); - } - - /// Shorthand method to send FairMQParts on `chan` at index `i` - /// @param parts parts reference - /// @param chan channel name - /// @param i channel index - /// @param sndTimeoutInMs send timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot send) - /// @return Number of bytes that have been queued, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) - int64_t Send(FairMQParts& parts, const std::string& channel, const int index = 0, int sndTimeoutInMs = -1) - { - return GetChannel(channel, index).Send(parts.fParts, sndTimeoutInMs); - } - - /// Shorthand method to receive FairMQParts on `chan` at index `i` - /// @param parts parts reference - /// @param chan channel name - /// @param i channel index - /// @param rcvTimeoutInMs receive timeout in ms, -1 will wait forever (or until interrupt (e.g. via state change)), 0 will not wait (return immediately if cannot receive) - /// @return Number of bytes that have been received, TransferCode::timeout if timed out, TransferCode::error if there was an error, TransferCode::interrupted if interrupted (e.g. by requested state change) - int64_t Receive(FairMQParts& parts, const std::string& channel, const int index = 0, int rcvTimeoutInMs = -1) - { - return GetChannel(channel, index).Receive(parts.fParts, rcvTimeoutInMs); - } - - /// @brief Getter for default transport factory - auto Transport() const -> FairMQTransportFactory* - { - return fTransportFactory.get(); - } - - // creates message with the default device transport - template - FairMQMessagePtr NewMessage(Args&&... args) - { - return Transport()->CreateMessage(std::forward(args)...); - } - - // creates message with the transport of the specified channel - template - FairMQMessagePtr NewMessageFor(const std::string& channel, int index, Args&&... args) - { - return GetChannel(channel, index).NewMessage(std::forward(args)...); - } - - // creates a message that will not be cleaned up after transfer, with the default device transport - template - FairMQMessagePtr NewStaticMessage(const T& data) - { - return Transport()->NewStaticMessage(data); - } - - // creates a message that will not be cleaned up after transfer, with the transport of the specified channel - template - FairMQMessagePtr NewStaticMessageFor(const std::string& channel, int index, const T& data) - { - return GetChannel(channel, index).NewStaticMessage(data); - } - - // creates a message with a copy of the provided data, with the default device transport - template - FairMQMessagePtr NewSimpleMessage(const T& data) - { - return Transport()->NewSimpleMessage(data); - } - - // creates a message with a copy of the provided data, with the transport of the specified channel - template - FairMQMessagePtr NewSimpleMessageFor(const std::string& channel, int index, const T& data) - { - return GetChannel(channel, index).NewSimpleMessage(data); - } - - // creates unamanaged region with the default device transport - template - FairMQUnmanagedRegionPtr NewUnmanagedRegion(Args&&... args) - { - return Transport()->CreateUnmanagedRegion(std::forward(args)...); - } - - // creates unmanaged region with the transport of the specified channel - template - FairMQUnmanagedRegionPtr NewUnmanagedRegionFor(const std::string& channel, int index, Args&&... args) - { - return GetChannel(channel, index).NewUnmanagedRegion(std::forward(args)...); - } - - template - FairMQPollerPtr NewPoller(const Ts&... inputs) - { - std::vector chans{inputs...}; - - // if more than one channel provided, check compatibility - if (chans.size() > 1) - { - fair::mq::Transport type = GetChannel(chans.at(0), 0).Transport()->GetType(); - - for (unsigned int i = 1; i < chans.size(); ++i) - { - if (type != GetChannel(chans.at(i), 0).Transport()->GetType()) - { - LOG(error) << "poller failed: different transports within same poller are not yet supported. Going to ERROR state."; - throw std::runtime_error("poller failed: different transports within same poller are not yet supported."); - } - } - } - - return GetChannel(chans.at(0), 0).Transport()->CreatePoller(fChannels, chans); - } - - FairMQPollerPtr NewPoller(const std::vector& channels) - { - // if more than one channel provided, check compatibility - if (channels.size() > 1) - { - fair::mq::Transport type = channels.at(0)->Transport()->GetType(); - - for (unsigned int i = 1; i < channels.size(); ++i) - { - if (type != channels.at(i)->Transport()->GetType()) - { - LOG(error) << "poller failed: different transports within same poller are not yet supported. Going to ERROR state."; - throw std::runtime_error("poller failed: different transports within same poller are not yet supported."); - } - } - } - - return channels.at(0)->Transport()->CreatePoller(channels); - } - - /// Adds a transport to the device if it doesn't exist - /// @param transport Transport string ("zeromq"/"shmem") - std::shared_ptr AddTransport(const fair::mq::Transport transport); - - /// Assigns config to the device - void SetConfig(fair::mq::ProgOptions& config); - /// Get pointer to the config - fair::mq::ProgOptions* GetConfig() const - { - return fConfig; - } - - // overload to easily bind member functions - template - void OnData(const std::string& channelName, bool (T::* memberFunction)(FairMQMessagePtr& msg, int index)) - { - fDataCallbacks = true; - fMsgInputs.insert(std::make_pair(channelName, [this, memberFunction](FairMQMessagePtr& msg, int index) - { - return (static_cast(this)->*memberFunction)(msg, index); - })); - - if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end()) - { - fInputChannelKeys.push_back(channelName); - } - } - - void OnData(const std::string& channelName, InputMsgCallback callback) - { - fDataCallbacks = true; - fMsgInputs.insert(make_pair(channelName, callback)); - - if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end()) - { - fInputChannelKeys.push_back(channelName); - } - } - - // overload to easily bind member functions - template - void OnData(const std::string& channelName, bool (T::* memberFunction)(FairMQParts& parts, int index)) - { - fDataCallbacks = true; - fMultipartInputs.insert(std::make_pair(channelName, [this, memberFunction](FairMQParts& parts, int index) - { - return (static_cast(this)->*memberFunction)(parts, index); - })); - - if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end()) - { - fInputChannelKeys.push_back(channelName); - } - } - - void OnData(const std::string& channelName, InputMultipartCallback callback) - { - fDataCallbacks = true; - fMultipartInputs.insert(make_pair(channelName, callback)); - - if (find(fInputChannelKeys.begin(), fInputChannelKeys.end(), channelName) == fInputChannelKeys.end()) - { - fInputChannelKeys.push_back(channelName); - } - } - - FairMQChannel& GetChannel(const std::string& channelName, const int index = 0) - try { - return fChannels.at(channelName).at(index); - } catch (const std::out_of_range& oor) { - LOG(error) << "requested channel has not been configured? check channel names/configuration."; - LOG(error) << "channel: " << channelName << ", index: " << index; - LOG(error) << "out of range: " << oor.what(); - throw; - } - - virtual void RegisterChannelEndpoints() {} - - bool RegisterChannelEndpoint(const std::string& channelName, uint16_t minNumSubChannels = 1, uint16_t maxNumSubChannels = 1) - { - bool ok = fChannelRegistry.insert(std::make_pair(channelName, std::make_pair(minNumSubChannels, maxNumSubChannels))).second; - if (!ok) { - LOG(warn) << "Registering channel: name already registered: \"" << channelName << "\""; - } - return ok; - } - - void PrintRegisteredChannels() - { - if (fChannelRegistry.empty()) { - LOGV(info, verylow) << "no channels registered."; - } else { - for (const auto& c : fChannelRegistry) { - LOGV(info, verylow) << c.first << ":" << c.second.first << ":" << c.second.second; - } - } - } - - void SetId(const std::string& id) { fId = id; } - std::string GetId() { return fId; } - - const fair::mq::tools::Version GetVersion() const { return fVersion; } - - void SetNumIoThreads(int numIoThreads) { fConfig->SetProperty("io-threads", numIoThreads);} - int GetNumIoThreads() const { return fConfig->GetProperty("io-threads", DefaultIOThreads); } - - void SetNetworkInterface(const std::string& networkInterface) { fConfig->SetProperty("network-interface", networkInterface); } - std::string GetNetworkInterface() const { return fConfig->GetProperty("network-interface", DefaultNetworkInterface); } - - void SetDefaultTransport(const std::string& name) { fConfig->SetProperty("transport", name); } - std::string GetDefaultTransport() const { return fConfig->GetProperty("transport", DefaultTransportName); } - - void SetInitTimeoutInS(int initTimeoutInS) { fConfig->SetProperty("init-timeout", initTimeoutInS); } - int GetInitTimeoutInS() const { return fConfig->GetProperty("init-timeout", DefaultInitTimeout); } - - /// Sets the default transport for the device - /// @param transport Transport string ("zeromq"/"shmem") - void SetTransport(const std::string& transport) { fConfig->SetProperty("transport", transport); } - /// Gets the default transport name - std::string GetTransportName() const { return fConfig->GetProperty("transport", DefaultTransportName); } - - void SetRawCmdLineArgs(const std::vector& args) { fRawCmdLineArgs = args; } - std::vector GetRawCmdLineArgs() const { return fRawCmdLineArgs; } - - void RunStateMachine() - { - fStateMachine.ProcessWork(); - }; - - /// Wait for the supplied amount of time or for interruption. - /// If interrupted, returns false, otherwise true. - /// @param duration wait duration - template - bool WaitFor(std::chrono::duration const& duration) - { - return !fStateMachine.WaitForPendingStateFor(std::chrono::duration_cast(duration).count()); - } - - protected: - std::shared_ptr fTransportFactory; ///< Default transport factory - std::unordered_map> fTransports; ///< Container for transports - - public: - std::unordered_map> fChannels; ///< Device channels - std::unique_ptr fInternalConfig; ///< Internal program options configuration - fair::mq::ProgOptions* fConfig; ///< Pointer to config (internal or external) - - void AddChannel(const std::string& name, FairMQChannel&& channel) - { - fConfig->AddChannel(name, channel); - } - - protected: - std::string fId; ///< Device ID - - /// Additional user initialization (can be overloaded in child classes). Prefer to use InitTask(). - virtual void Init() {} - - virtual void Bind() {} - - virtual void Connect() {} - - /// Task initialization (can be overloaded in child classes) - virtual void InitTask() {} - - /// Runs the device (to be overloaded in child classes) - virtual void Run() {} - - /// Called in the RUNNING state once before executing the Run()/ConditionalRun() method - virtual void PreRun() {} - - /// Called during RUNNING state repeatedly until it returns false or device state changes - virtual bool ConditionalRun() { return false; } - - /// Called in the RUNNING state once after executing the Run()/ConditionalRun() method - virtual void PostRun() {} - - /// Resets the user task (to be overloaded in child classes) - virtual void ResetTask() {} - - /// Resets the device (can be overloaded in child classes) - virtual void Reset() {} - - public: - /// @brief Request a device state transition - /// @param transition state transition - /// - /// The state transition may not happen immediately, but when the current state evaluates the - /// pending transition event and terminates. In other words, the device states are scheduled cooperatively. - bool ChangeState(const fair::mq::Transition transition) { return fStateMachine.ChangeState(transition); } - /// @brief Request a device state transition - /// @param transition state transition - /// - /// The state transition may not happen immediately, but when the current state evaluates the - /// pending transition event and terminates. In other words, the device states are scheduled cooperatively. - bool ChangeState(const std::string& transition) { return fStateMachine.ChangeState(fair::mq::GetTransition(transition)); } - - /// @brief waits for the next state (any) to occur - fair::mq::State WaitForNextState() { return fStateQueue.WaitForNext(); } - /// @brief waits for the specified state to occur - /// @param state state to wait for - void WaitForState(fair::mq::State state) { fStateQueue.WaitForState(state); } - /// @brief waits for the specified state to occur - /// @param state state to wait for - void WaitForState(const std::string& state) { WaitForState(fair::mq::GetState(state)); } - - void TransitionTo(const fair::mq::State state); - - /// @brief Subscribe with a callback to state changes - /// @param key id to identify your subscription - /// @param callback callback (called with the new state as the parameter) - /// - /// The callback is called at the beginning of a new state. - /// The callback is called from the thread the state is running in. - void SubscribeToStateChange(const std::string& key, std::function callback) { fStateMachine.SubscribeToStateChange(key, callback); } - /// @brief Unsubscribe from state changes - /// @param key id (that was used when subscribing) - void UnsubscribeFromStateChange(const std::string& key) { fStateMachine.UnsubscribeFromStateChange(key); } - - /// @brief Subscribe with a callback to incoming state transitions - /// @param key id to identify your subscription - /// @param callback callback (called with the incoming transition as the parameter) - /// The callback is called when new transition is initiated. - /// The callback is called from the thread that initiates the transition (via ChangeState). - void SubscribeToNewTransition(const std::string& key, std::function callback) { fStateMachine.SubscribeToNewTransition(key, callback); } - /// @brief Unsubscribe from state transitions - /// @param key id (that was used when subscribing) - void UnsubscribeFromNewTransition(const std::string& key) { fStateMachine.UnsubscribeFromNewTransition(key); } - - /// @brief Returns true if a new state has been requested, signaling the current handler to stop. - bool NewStatePending() const { return fStateMachine.NewStatePending(); } - - /// @brief Returns the current state - fair::mq::State GetCurrentState() const { return fStateMachine.GetCurrentState(); } - /// @brief Returns the name of the current state as a string - std::string GetCurrentStateName() const { return fStateMachine.GetCurrentStateName(); } - - /// @brief Returns name of the given state as a string - /// @param state state - static std::string GetStateName(const fair::mq::State state) { return fair::mq::GetStateName(state); } - /// @brief Returns name of the given transition as a string - /// @param transition transition - static std::string GetTransitionName(const fair::mq::Transition transition) { return fair::mq::GetTransitionName(transition); } - - static constexpr const char* DefaultId = ""; - static constexpr int DefaultIOThreads = 1; - static constexpr const char* DefaultTransportName = "zeromq"; - static constexpr fair::mq::Transport DefaultTransportType = fair::mq::Transport::ZMQ; - static constexpr const char* DefaultNetworkInterface = "default"; - static constexpr int DefaultInitTimeout = 120; - static constexpr uint64_t DefaultMaxRunTime = 0; - static constexpr float DefaultRate = 0.; - static constexpr const char* DefaultSession = "default"; - - private: - fair::mq::Transport fDefaultTransportType; ///< Default transport for the device - fair::mq::StateMachine fStateMachine; - - /// Handles the initialization - void InitWrapper(); - /// Initializes binding channels - void BindWrapper(); - /// Initializes connecting channels - void ConnectWrapper(); - /// Handles the InitTask() method - void InitTaskWrapper(); - /// Handles the Run() method - void RunWrapper(); - /// Handles the ResetTask() method - void ResetTaskWrapper(); - /// Handles the Reset() method - void ResetWrapper(); - - /// Notifies transports to cease any blocking activity - void UnblockTransports(); - - /// Shuts down the transports and the device - void Exit() {} - - /// Attach (bind/connect) channels in the list - void AttachChannels(std::vector& chans); - bool AttachChannel(FairMQChannel& ch); - - void HandleSingleChannelInput(); - void HandleMultipleChannelInput(); - void HandleMultipleTransportInput(); - void PollForTransport(const FairMQTransportFactory* factory, const std::vector& channelKeys); - - bool HandleMsgInput(const std::string& chName, const InputMsgCallback& callback, int i); - bool HandleMultipartInput(const std::string& chName, const InputMultipartCallback& callback, int i); - - std::vector fUninitializedBindingChannels; - std::vector fUninitializedConnectingChannels; - - bool fDataCallbacks; - std::unordered_map fMsgInputs; - std::unordered_map fMultipartInputs; - std::unordered_map> fMultitransportInputs; - std::unordered_map> fChannelRegistry; - std::vector fInputChannelKeys; - std::mutex fMultitransportMutex; - std::atomic fMultitransportProceed; - - const fair::mq::tools::Version fVersion; - float fRate; ///< Rate limiting for ConditionalRun - uint64_t fMaxRunRuntimeInS; ///< Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit). - int fInitializationTimeoutInS; - std::vector fRawCmdLineArgs; - - fair::mq::StateQueue fStateQueue; - - std::mutex fTransitionMtx; - bool fTransitioning; -}; +#include #endif /* FAIRMQDEVICE_H_ */ diff --git a/fairmq/FairMQLogger.cxx b/fairmq/FairMQLogger.cxx deleted file mode 100644 index a4cb7409..00000000 --- a/fairmq/FairMQLogger.cxx +++ /dev/null @@ -1,15 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQLogger.cxx - * - * @since 2012-12-04 - * @author D. Klein, A. Rybalchenko - */ - -#include "FairMQLogger.h" diff --git a/fairmq/FairMQMessage.cxx b/fairmq/FairMQMessage.cxx deleted file mode 100644 index bc9d4147..00000000 --- a/fairmq/FairMQMessage.cxx +++ /dev/null @@ -1,13 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQMessage.cxx - * - * @since 2012-12-05 - * @author D. Klein, A. Rybalchenko - */ diff --git a/fairmq/FairMQMessage.h b/fairmq/FairMQMessage.h index 8cdea93d..08f809ca 100644 --- a/fairmq/FairMQMessage.h +++ b/fairmq/FairMQMessage.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -9,65 +9,12 @@ #ifndef FAIRMQMESSAGE_H_ #define FAIRMQMESSAGE_H_ -#include // for size_t -#include // unique_ptr -#include +#if 0 +#ifndef FAIR_MQ_MESSAGE_H +#pragma GCC warning "Deprecated header: Use instead" +#endif +#endif -#include - -using fairmq_free_fn = void(void* data, void* hint); -class FairMQTransportFactory; - -namespace fair::mq -{ - -struct Alignment -{ - size_t alignment; - explicit operator size_t() const { return alignment; } -}; - -} // namespace fair::mq - -class FairMQMessage -{ - public: - FairMQMessage() = default; - FairMQMessage(FairMQTransportFactory* factory) : fTransport(factory) {} - - virtual void Rebuild() = 0; - virtual void Rebuild(fair::mq::Alignment alignment) = 0; - virtual void Rebuild(const size_t size) = 0; - virtual void Rebuild(const size_t size, fair::mq::Alignment alignment) = 0; - virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) = 0; - - virtual void* GetData() const = 0; - virtual size_t GetSize() const = 0; - - virtual bool SetUsedSize(const size_t size) = 0; - - virtual fair::mq::Transport GetType() const = 0; - FairMQTransportFactory* GetTransport() { return fTransport; } - void SetTransport(FairMQTransportFactory* transport) { fTransport = transport; } - - virtual void Copy(const FairMQMessage& msg) = 0; - - virtual ~FairMQMessage() = default; - - private: - FairMQTransportFactory* fTransport{nullptr}; -}; - -using FairMQMessagePtr = std::unique_ptr; - -namespace fair::mq -{ - -using Message = FairMQMessage; -using MessagePtr = FairMQMessagePtr; -struct MessageError : std::runtime_error { using std::runtime_error::runtime_error; }; -struct MessageBadAlloc : std::runtime_error { using std::runtime_error::runtime_error; }; - -} // namespace fair::mq +#include #endif /* FAIRMQMESSAGE_H_ */ diff --git a/fairmq/FairMQParts.h b/fairmq/FairMQParts.h index 4f5238d2..dce68a46 100644 --- a/fairmq/FairMQParts.h +++ b/fairmq/FairMQParts.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -9,90 +9,12 @@ #ifndef FAIRMQPARTS_H_ #define FAIRMQPARTS_H_ -#include "FairMQTransportFactory.h" -#include "FairMQMessage.h" +#if 0 +#ifndef FAIR_MQ_PARTS_H +#pragma GCC warning "Deprecated header: Use instead" +#endif +#endif -#include -#include // unique_ptr - -/// FairMQParts is a lightweight convenience wrapper around a vector of unique pointers to FairMQMessage, used for sending multi-part messages - -class FairMQParts -{ - private: - using container = std::vector>; - - public: - /// Default constructor - FairMQParts() = default; - /// Copy Constructor - FairMQParts(const FairMQParts&) = delete; - /// Move constructor - FairMQParts(FairMQParts&& p) = default; - /// Assignment operator - FairMQParts& operator=(const FairMQParts&) = delete; - /// Constructor from argument pack of std::unique_ptr rvalues - template - FairMQParts(Ts&&... messages) { AddPart(std::forward(messages)...); } - /// Default destructor - ~FairMQParts() = default; - - /// Adds part (FairMQMessage) to the container - /// @param msg message pointer (for example created with NewMessage() method of FairMQDevice) - void AddPart(FairMQMessage* msg) - { - fParts.push_back(std::unique_ptr(msg)); - } - - /// Adds part (std::unique_ptr&) to the container (move) - /// @param msg unique pointer to FairMQMessage - /// rvalue ref (move required when passing argument) - void AddPart(std::unique_ptr&& msg) - { - fParts.push_back(std::move(msg)); - } - - /// Add variable list of parts to the container (move) - template - void AddPart(std::unique_ptr&& first, Ts&&... remaining) - { - AddPart(std::move(first)); - AddPart(std::forward(remaining)...); - } - - /// Add content of another object by move - void AddPart(FairMQParts&& other) - { - container parts = std::move(other.fParts); - for (auto& part : parts) { - fParts.push_back(std::move(part)); - } - } - - /// Get reference to part in the container at index (without bounds check) - /// @param index container index - FairMQMessage& operator[](const int index) { return *(fParts[index]); } - - /// Get reference to unique pointer to part in the container at index (with bounds check) - /// @param index container index - std::unique_ptr& At(const int index) { return fParts.at(index); } - - // ref version - FairMQMessage& AtRef(const int index) { return *(fParts.at(index)); } - - /// Get number of parts in the container - /// @return number of parts in the container - int Size() const { return fParts.size(); } - - container fParts; - - // forward container iterators - using iterator = container::iterator; - using const_iterator = container::const_iterator; - auto begin() -> decltype(fParts.begin()) { return fParts.begin(); } - auto end() -> decltype(fParts.end()) { return fParts.end(); } - auto cbegin() -> decltype(fParts.cbegin()) { return fParts.cbegin(); } - auto cend() -> decltype(fParts.cend()) { return fParts.cend(); } -}; +#include #endif /* FAIRMQPARTS_H_ */ diff --git a/fairmq/FairMQPoller.cxx b/fairmq/FairMQPoller.cxx deleted file mode 100644 index 9f9687a6..00000000 --- a/fairmq/FairMQPoller.cxx +++ /dev/null @@ -1,13 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQPoller.cxx - * - * @since 2014-01-23 - * @author A. Rybalchenko - */ diff --git a/fairmq/FairMQPoller.h b/fairmq/FairMQPoller.h index 64be63f9..5e9d91bf 100644 --- a/fairmq/FairMQPoller.h +++ b/fairmq/FairMQPoller.h @@ -9,31 +9,12 @@ #ifndef FAIRMQPOLLER_H_ #define FAIRMQPOLLER_H_ -#include -#include -#include +#if 0 +#ifndef FAIR_MQ_POLLER_H +#pragma GCC warning "Deprecated header: Use instead" +#endif +#endif -class FairMQPoller -{ - public: - virtual void Poll(const int timeout) = 0; - virtual bool CheckInput(const int index) = 0; - virtual bool CheckOutput(const int index) = 0; - virtual bool CheckInput(const std::string& channelKey, const int index) = 0; - virtual bool CheckOutput(const std::string& channelKey, const int index) = 0; - - virtual ~FairMQPoller() = default; -}; - -using FairMQPollerPtr = std::unique_ptr; - -namespace fair::mq -{ - -using Poller = FairMQPoller; -using PollerPtr = FairMQPollerPtr; -struct PollerError : std::runtime_error { using std::runtime_error::runtime_error; }; - -} // namespace fair::mq +#include #endif /* FAIRMQPOLLER_H_ */ diff --git a/fairmq/FairMQSocket.cxx b/fairmq/FairMQSocket.cxx deleted file mode 100644 index b318cc68..00000000 --- a/fairmq/FairMQSocket.cxx +++ /dev/null @@ -1,9 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#include diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index be07cbbb..d94cb657 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -9,88 +9,12 @@ #ifndef FAIRMQSOCKET_H_ #define FAIRMQSOCKET_H_ -#include "FairMQMessage.h" +#if 0 +#ifndef FAIR_MQ_SOCKET_H +#pragma GCC warning "Deprecated header: Use instead" +#endif +#endif -#include -#include -#include -#include -#include - -class FairMQTransportFactory; - -namespace fair::mq -{ - -enum class TransferCode : int -{ - success = 0, - error = -1, - timeout = -2, - interrupted = -3 -}; - -} // namespace fair::mq - -class FairMQSocket -{ - public: - FairMQSocket() = default; - FairMQSocket(FairMQTransportFactory* fac) : fTransport(fac) {} - - virtual std::string GetId() const = 0; - - virtual bool Bind(const std::string& address) = 0; - virtual bool Connect(const std::string& address) = 0; - - virtual int64_t Send(FairMQMessagePtr& msg, int timeout = -1) = 0; - virtual int64_t Receive(FairMQMessagePtr& msg, int timeout = -1) = 0; - virtual int64_t Send(std::vector>& msgVec, int timeout = -1) = 0; - virtual int64_t Receive(std::vector>& msgVec, int timeout = -1) = 0; - - virtual void Close() = 0; - - virtual void SetOption(const std::string& option, const void* value, size_t valueSize) = 0; - virtual void GetOption(const std::string& option, void* value, size_t* valueSize) = 0; - - /// If the backend supports it, fills the unsigned integer @a events with the ZMQ_EVENTS value - /// DISCLAIMER: this API is experimental and unsupported and might be dropped / refactored in - /// the future. - virtual int Events(uint32_t* events) = 0; - virtual void SetLinger(const int value) = 0; - virtual int GetLinger() const = 0; - virtual void SetSndBufSize(const int value) = 0; - virtual int GetSndBufSize() const = 0; - virtual void SetRcvBufSize(const int value) = 0; - virtual int GetRcvBufSize() const = 0; - virtual void SetSndKernelSize(const int value) = 0; - virtual int GetSndKernelSize() const = 0; - virtual void SetRcvKernelSize(const int value) = 0; - virtual int GetRcvKernelSize() const = 0; - - virtual unsigned long GetBytesTx() const = 0; - virtual unsigned long GetBytesRx() const = 0; - virtual unsigned long GetMessagesTx() const = 0; - virtual unsigned long GetMessagesRx() const = 0; - - FairMQTransportFactory* GetTransport() { return fTransport; } - void SetTransport(FairMQTransportFactory* transport) { fTransport = transport; } - - virtual ~FairMQSocket() = default; - - private: - FairMQTransportFactory* fTransport{nullptr}; -}; - -using FairMQSocketPtr = std::unique_ptr; - -namespace fair::mq -{ - -using Socket = FairMQSocket; -using SocketPtr = FairMQSocketPtr; -struct SocketError : std::runtime_error { using std::runtime_error::runtime_error; }; - -} // namespace fair::mq +#include #endif /* FAIRMQSOCKET_H_ */ diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 817f0c9c..9799f585 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -9,174 +9,12 @@ #ifndef FAIRMQTRANSPORTFACTORY_H_ #define FAIRMQTRANSPORTFACTORY_H_ -#include -#include -#include -#include -#include -#include +#if 0 +#ifndef FAIR_MQ_TRANSPORTFACTORY_H +#pragma GCC warning "Deprecated header: Use instead" +#endif +#endif -#include -#include // shared_ptr -#include -#include -#include -#include // size_t - -class FairMQChannel; -namespace fair::mq { class ProgOptions; } - -class FairMQTransportFactory -{ - private: - /// Topology wide unique id - const std::string fkId; - - /// The polymorphic memory resource associated with the transport - fair::mq::ChannelResource fMemoryResource{this}; - - public: - /// ctor - /// @param id Topology wide unique id, usually the device id. - FairMQTransportFactory(std::string id); - - auto GetId() const -> const std::string { return fkId; }; - - /// Get a pointer to the associated polymorphic memory resource - fair::mq::ChannelResource* GetMemoryResource() { return &fMemoryResource; } - operator fair::mq::ChannelResource*() { return &fMemoryResource; } - - /// @brief Create empty FairMQMessage (for receiving) - /// @return pointer to FairMQMessage - virtual FairMQMessagePtr CreateMessage() = 0; - /// @brief Create empty FairMQMessage (for receiving), align received buffer to specified alignment - /// @param alignment alignment to align received buffer to - /// @return pointer to FairMQMessage - virtual FairMQMessagePtr CreateMessage(fair::mq::Alignment alignment) = 0; - /// @brief Create new FairMQMessage of specified size - /// @param size message size - /// @return pointer to FairMQMessage - virtual FairMQMessagePtr CreateMessage(const size_t size) = 0; - /// @brief Create new FairMQMessage of specified size and alignment - /// @param size message size - /// @param alignment message alignment - /// @return pointer to FairMQMessage - virtual FairMQMessagePtr CreateMessage(const size_t size, fair::mq::Alignment alignment) = 0; - /// @brief Create new FairMQMessage with user provided buffer and size - /// @param data pointer to user provided buffer - /// @param size size of the user provided buffer - /// @param ffn callback, called when the message is transfered (and can be deleted) - /// @param obj optional helper pointer that can be used in the callback - /// @return pointer to FairMQMessage - virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) = 0; - /// @brief create a message with the buffer located within the corresponding unmanaged region - /// @param unmanagedRegion the unmanaged region that this message buffer belongs to - /// @param data message buffer (must be within the region - checked at runtime by the transport) - /// @param size size of the message - /// @param hint optional parameter, returned to the user in the FairMQRegionCallback - virtual FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& unmanagedRegion, void* data, const size_t size, void* hint = 0) = 0; - - /// @brief Create a socket - virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) = 0; - - /// @brief Create a poller for a single channel (all subchannels) - virtual FairMQPollerPtr CreatePoller(const std::vector& channels) const = 0; - /// @brief Create a poller for specific channels - virtual FairMQPollerPtr CreatePoller(const std::vector& channels) const = 0; - /// @brief Create a poller for specific channels (all subchannels) - virtual FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const = 0; - - /// @brief Create new UnmanagedRegion - /// @param size size of the region - /// @param callback callback to be called when a message belonging to this region is no longer needed by the transport - /// @param path optional parameter to pass to the underlying transport - /// @param flags optional parameter to pass to the underlying transport - /// @return pointer to UnmanagedRegion - virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0; - virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0; - /// @brief Create new UnmanagedRegion - /// @param size size of the region - /// @param userFlags flags to be stored with the region, have no effect on the transport, but can be retrieved from the region by the user - /// @param callback callback to be called when a message belonging to this region is no longer needed by the transport - /// @param path optional parameter to pass to the underlying transport - /// @param flags optional parameter to pass to the underlying transport - /// @return pointer to UnmanagedRegion - virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0; - virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0; - - /// @brief Subscribe to region events (creation, destruction, ...) - /// @param callback the callback that is called when a region event occurs - virtual void SubscribeToRegionEvents(FairMQRegionEventCallback callback) = 0; - /// @brief Check if there is an active subscription to region events - /// @return true/false - virtual bool SubscribedToRegionEvents() = 0; - /// @brief Unsubscribe from region events - virtual void UnsubscribeFromRegionEvents() = 0; - - virtual std::vector GetRegionInfo() = 0; - - /// Get transport type - virtual fair::mq::Transport GetType() const = 0; - - virtual void Interrupt() = 0; - virtual void Resume() = 0; - virtual void Reset() = 0; - - virtual ~FairMQTransportFactory() = default; - - static auto CreateTransportFactory(const std::string& type, const std::string& id = "", const fair::mq::ProgOptions* config = nullptr) -> std::shared_ptr; - - static void FairMQNoCleanup(void* /*data*/, void* /*obj*/) - { - } - - template - static void FairMQSimpleMsgCleanup(void* /*data*/, void* obj) - { - delete static_cast(obj); - } - - template - FairMQMessagePtr NewSimpleMessage(const T& data) - { - // todo: is_trivially_copyable not available on gcc < 5, workaround? - // static_assert(std::is_trivially_copyable::value, "The argument type for NewSimpleMessage has to be trivially copyable!"); - T* dataCopy = new T(data); - return CreateMessage(dataCopy, sizeof(T), FairMQSimpleMsgCleanup, dataCopy); - } - - template - FairMQMessagePtr NewSimpleMessage(const char(&data)[N]) - { - std::string* msgStr = new std::string(data); - return CreateMessage(const_cast(msgStr->c_str()), msgStr->length(), FairMQSimpleMsgCleanup, msgStr); - } - - FairMQMessagePtr NewSimpleMessage(const std::string& str) - { - - std::string* msgStr = new std::string(str); - return CreateMessage(const_cast(msgStr->c_str()), msgStr->length(), FairMQSimpleMsgCleanup, msgStr); - } - - template - FairMQMessagePtr NewStaticMessage(const T& data) - { - return CreateMessage(data, sizeof(T), FairMQNoCleanup, nullptr); - } - - FairMQMessagePtr NewStaticMessage(const std::string& str) - { - return CreateMessage(const_cast(str.c_str()), str.length(), FairMQNoCleanup, nullptr); - } -}; - -namespace fair::mq -{ - -using TransportFactory = FairMQTransportFactory; -struct TransportFactoryError : std::runtime_error { using std::runtime_error::runtime_error; }; - -} // namespace fair::mq +#include #endif /* FAIRMQTRANSPORTFACTORY_H_ */ diff --git a/fairmq/FairMQUnmanagedRegion.h b/fairmq/FairMQUnmanagedRegion.h index 2f240926..7a5bade2 100644 --- a/fairmq/FairMQUnmanagedRegion.h +++ b/fairmq/FairMQUnmanagedRegion.h @@ -9,120 +9,12 @@ #ifndef FAIRMQUNMANAGEDREGION_H_ #define FAIRMQUNMANAGEDREGION_H_ -#include // size_t -#include // uint32_t -#include -#include // std::function -#include // std::unique_ptr -#include // std::ostream -#include +#if 0 +#ifndef FAIR_MQ_UNMANAGEDREGION_H +#pragma GCC warning "Deprecated header: Use instead" +#endif +#endif -class FairMQTransportFactory; - -enum class FairMQRegionEvent : int -{ - created, - destroyed, - local_only -}; - -struct FairMQRegionInfo -{ - FairMQRegionInfo() = default; - - FairMQRegionInfo(bool _managed, uint64_t _id, void* _ptr, size_t _size, int64_t _flags, FairMQRegionEvent _event) - : managed(_managed) - , id(_id) - , ptr(_ptr) - , size(_size) - , flags(_flags) - , event(_event) - {} - - bool managed = true; // managed/unmanaged - uint64_t id = 0; // id of the region - void* ptr = nullptr; // pointer to the start of the region - size_t size = 0; // region size - int64_t flags = 0; // custom flags set by the creator - FairMQRegionEvent event = FairMQRegionEvent::created; -}; - -struct FairMQRegionBlock { - void* ptr; - size_t size; - void* hint; - - FairMQRegionBlock(void* p, size_t s, void* h) - : ptr(p), size(s), hint(h) - {} -}; - -using FairMQRegionCallback = std::function; -using FairMQRegionBulkCallback = std::function&)>; -using FairMQRegionEventCallback = std::function; - -class FairMQUnmanagedRegion -{ - public: - FairMQUnmanagedRegion() = default; - FairMQUnmanagedRegion(FairMQTransportFactory* factory) : fTransport(factory) {} - - virtual void* GetData() const = 0; - virtual size_t GetSize() const = 0; - virtual uint16_t GetId() const = 0; - virtual void SetLinger(uint32_t linger) = 0; - virtual uint32_t GetLinger() const = 0; - - virtual fair::mq::Transport GetType() const = 0; - FairMQTransportFactory* GetTransport() { return fTransport; } - void SetTransport(FairMQTransportFactory* transport) { fTransport = transport; } - - virtual ~FairMQUnmanagedRegion() = default; - - private: - FairMQTransportFactory* fTransport{nullptr}; -}; - -using FairMQUnmanagedRegionPtr = std::unique_ptr; - -inline std::ostream& operator<<(std::ostream& os, const FairMQRegionEvent& event) -{ - switch (event) { - case FairMQRegionEvent::created: - return os << "created"; - case FairMQRegionEvent::destroyed: - return os << "destroyed"; - case FairMQRegionEvent::local_only: - return os << "local_only"; - default: - return os << "unrecognized event"; - } -} - -namespace fair::mq -{ - -struct RegionConfig -{ - RegionConfig() = default; - - RegionConfig(bool l, bool z) - : lock(l), zero(z) - {} - - bool lock = false; - bool zero = false; -}; - -using RegionCallback = FairMQRegionCallback; -using RegionBulkCallback = FairMQRegionBulkCallback; -using RegionEventCallback = FairMQRegionEventCallback; -using RegionEvent = FairMQRegionEvent; -using RegionInfo = FairMQRegionInfo; -using RegionBlock = FairMQRegionBlock; -using UnmanagedRegion = FairMQUnmanagedRegion; -using UnmanagedRegionPtr = FairMQUnmanagedRegionPtr; - -} // namespace fair::mq +#include #endif /* FAIRMQUNMANAGEDREGION_H_ */ diff --git a/fairmq/FwdDecls.h b/fairmq/FwdDecls.h index 52096a81..cdab35f0 100644 --- a/fairmq/FwdDecls.h +++ b/fairmq/FwdDecls.h @@ -11,21 +11,35 @@ #include -class FairMQChannel; -class FairMQDevice; -class FairMQMemoryResource; -class FairMQMessage; -class FairMQParts; -class FairMQPoller; -class FairMQRegionBlock; -class FairMQRegionConfig; -class FairMQRegionInfo; -class FairMQSocket; -class FairMQTransportFactory; -class FairMQUnmanagedRegion; - namespace fair::mq { -class FairMQMemoryResource; -} + +class Channel; +class Device; +class MemoryResource; +class Message; +class Parts; +class Poller; +class RegionBlock; +class RegionConfig; +class RegionInfo; +class Socket; +class TransportFactory; +class UnmanagedRegion; + +using FairMQMemoryResource = MemoryResource; + +} // namespace fair::mq + +using FairMQChannel = fair::mq::Channel; +using FairMQDevice = fair::mq::Device; +using FairMQMessage = fair::mq::Message; +using FairMQParts = fair::mq::Parts; +using FairMQPoller = fair::mq::Poller; +using FairMQRegionBlock = fair::mq::RegionBlock; +using FairMQRegionConfig = fair::mq::RegionConfig; +using FairMQRegionInfo = fair::mq::RegionInfo; +using FairMQSocket = fair::mq::Socket; +using FairMQTransportFactory = fair::mq::TransportFactory; +using FairMQUnmanagedRegion = fair::mq::UnmanagedRegion; #endif // FAIR_MQ_FWDDECLS_H diff --git a/fairmq/MemoryResources.h b/fairmq/MemoryResources.h index ccad2282..d44ec2d5 100644 --- a/fairmq/MemoryResources.h +++ b/fairmq/MemoryResources.h @@ -1,10 +1,10 @@ /******************************************************************************** - * Copyright (C) 2018 CERN and copyright holders of ALICE O2 * - * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2018 CERN and copyright holders of ALICE O2 * + * Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * ********************************************************************************/ /// @brief Memory allocators and interfaces related to managing memory via the @@ -15,27 +15,24 @@ #ifndef FAIR_MQ_MEMORY_RESOURCES_H #define FAIR_MQ_MEMORY_RESOURCES_H -#include -class FairMQTransportFactory; - #include #include #include - #include +#include #include #include -namespace fair::mq -{ +namespace fair::mq { +class TransportFactory; using byte = unsigned char; namespace pmr = boost::container::pmr; /// All FairMQ related memory resources need to inherit from this interface /// class for the /// getMessage() api. -class FairMQMemoryResource : public pmr::memory_resource +class MemoryResource : public pmr::memory_resource { public: /// return the message containing data associated with the pointer (to start @@ -43,9 +40,9 @@ class FairMQMemoryResource : public pmr::memory_resource /// buffer), e.g. pointer returned by std::vector::data() return nullptr if /// returning /// a message does not make sense! - virtual FairMQMessagePtr getMessage(void *p) = 0; - virtual void *setMessage(FairMQMessagePtr) = 0; - virtual FairMQTransportFactory *getTransportFactory() noexcept = 0; + virtual MessagePtr getMessage(void* p) = 0; + virtual void* setMessage(MessagePtr) = 0; + virtual TransportFactory* getTransportFactory() noexcept = 0; virtual size_t getNumberOfMessages() const noexcept = 0; }; @@ -54,41 +51,42 @@ class FairMQMemoryResource : public pmr::memory_resource /// delegated to FairMQ so standard (e.g. STL) containers can construct their /// stuff in /// memory regions appropriate for the data channel configuration. -class ChannelResource : public FairMQMemoryResource +class ChannelResource : public MemoryResource { protected: - FairMQTransportFactory* factory{nullptr}; + TransportFactory* factory{nullptr}; // TODO: for now a map to keep track of allocations, something else would // probably be // faster, but for now this does not need to be fast. - boost::container::flat_map messageMap; + boost::container::flat_map messageMap; public: ChannelResource() = delete; - ChannelResource(FairMQTransportFactory* _factory) + ChannelResource(TransportFactory* _factory) : factory(_factory) { if (!_factory) { - throw std::runtime_error("Tried to construct from a nullptr FairMQTransportFactory"); + throw std::runtime_error( + "Tried to construct from a nullptr fair::mq::TransportFactory"); } }; - FairMQMessagePtr getMessage(void* p) override + MessagePtr getMessage(void* p) override { auto mes = std::move(messageMap[p]); messageMap.erase(p); return mes; } - void* setMessage(FairMQMessagePtr message) override + void* setMessage(MessagePtr message) override { void* addr = message->GetData(); messageMap[addr] = std::move(message); return addr; } - FairMQTransportFactory* getTransportFactory() noexcept override { return factory; } + TransportFactory* getTransportFactory() noexcept override { return factory; } size_t getNumberOfMessages() const noexcept override { return messageMap.size(); } @@ -99,12 +97,16 @@ class ChannelResource : public FairMQMemoryResource messageMap.erase(p); }; - bool do_is_equal(const pmr::memory_resource &other) const noexcept override + bool do_is_equal(const pmr::memory_resource& other) const noexcept override { return this == &other; }; }; -} // namespace fair::mq +// using FairMQMemoryResource [[deprecated("Use fair::mq::MemoryResource")]] = + // MemoryResource; +using FairMQMemoryResource = MemoryResource; + +} // namespace fair::mq #endif /* FAIR_MQ_MEMORY_RESOURCES_H */ diff --git a/fairmq/Message.h b/fairmq/Message.h index f638134a..686314cf 100644 --- a/fairmq/Message.h +++ b/fairmq/Message.h @@ -9,6 +9,71 @@ #ifndef FAIR_MQ_MESSAGE_H #define FAIR_MQ_MESSAGE_H -#include +#include // for size_t +#include +#include // unique_ptr +#include + +namespace fair::mq { + +using FreeFn = void(void* data, void* hint); +class TransportFactory; + +struct Alignment +{ + size_t alignment; + explicit operator size_t() const { return alignment; } +}; + +struct Message +{ + Message() = default; + Message(TransportFactory* factory) + : fTransport(factory) + {} + + virtual void Rebuild() = 0; + virtual void Rebuild(Alignment alignment) = 0; + virtual void Rebuild(const size_t size) = 0; + virtual void Rebuild(const size_t size, Alignment alignment) = 0; + virtual void Rebuild(void* data, const size_t size, FreeFn* ffn, void* hint = nullptr) = 0; + + virtual void* GetData() const = 0; + virtual size_t GetSize() const = 0; + + virtual bool SetUsedSize(const size_t size) = 0; + + virtual Transport GetType() const = 0; + TransportFactory* GetTransport() { return fTransport; } + void SetTransport(TransportFactory* transport) { fTransport = transport; } + + virtual void Copy(const Message& msg) = 0; + + virtual ~Message() = default; + + private: + TransportFactory* fTransport{nullptr}; +}; + +using MessagePtr = std::unique_ptr; + +struct MessageError : std::runtime_error +{ + using std::runtime_error::runtime_error; +}; + +struct MessageBadAlloc : std::runtime_error +{ + using std::runtime_error::runtime_error; +}; + +} // namespace fair::mq + +// using fairmq_free_fn [[deprecated("Use fair::mq::FreeFn")]] = fair::mq::FreeFn; +// using FairMQMessage [[deprecated("Use fair::mq::Message")]] = fair::mq::Message; +// using FairMQMessagePtr [[deprecated("Use fair::mq::MessagePtr")]] = fair::mq::MessagePtr; +using fairmq_free_fn = fair::mq::FreeFn; +using FairMQMessage = fair::mq::Message; +using FairMQMessagePtr = fair::mq::MessagePtr; #endif // FAIR_MQ_MESSAGE_H diff --git a/fairmq/Parts.h b/fairmq/Parts.h new file mode 100644 index 00000000..e2e03d0f --- /dev/null +++ b/fairmq/Parts.h @@ -0,0 +1,94 @@ +/******************************************************************************** + * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_PARTS_H +#define FAIR_MQ_PARTS_H + +#include +#include // unique_ptr +#include + +namespace fair::mq { + +/// fair::mq::Parts is a lightweight convenience wrapper around a vector of unique pointers to +/// Message, used for sending multi-part messages +class Parts +{ + private: + using container = std::vector; + + public: + Parts() = default; + Parts(const Parts&) = delete; + Parts(Parts&& p) = default; + Parts& operator=(const Parts&) = delete; + template + Parts(Ts&&... messages) + { + AddPart(std::forward(messages)...); + } + ~Parts() = default; + + /// Adds part (Message) to the container + /// @param msg message pointer (for example created with NewMessage() method of Device) + void AddPart(Message* msg) { fParts.push_back(MessagePtr(msg)); } + + /// Adds part to the container (move) + /// @param msg unique pointer to Message + /// rvalue ref (move required when passing argument) + void AddPart(MessagePtr&& msg) { fParts.push_back(std::move(msg)); } + + /// Add variable list of parts to the container (move) + template + void AddPart(MessagePtr&& first, Ts&&... remaining) + { + AddPart(std::move(first)); + AddPart(std::forward(remaining)...); + } + + /// Add content of another object by move + void AddPart(Parts&& other) + { + container parts = std::move(other.fParts); + for (auto& part : parts) { + fParts.push_back(std::move(part)); + } + } + + /// Get reference to part in the container at index (without bounds check) + /// @param index container index + Message& operator[](const int index) { return *(fParts[index]); } + + /// Get reference to unique pointer to part in the container at index (with bounds check) + /// @param index container index + MessagePtr& At(const int index) { return fParts.at(index); } + + // ref version + Message& AtRef(const int index) { return *(fParts.at(index)); } + + /// Get number of parts in the container + /// @return number of parts in the container + int Size() const { return fParts.size(); } + + container fParts; + + // forward container iterators + using iterator = container::iterator; + using const_iterator = container::const_iterator; + auto begin() -> decltype(fParts.begin()) { return fParts.begin(); } + auto end() -> decltype(fParts.end()) { return fParts.end(); } + auto cbegin() -> decltype(fParts.cbegin()) { return fParts.cbegin(); } + auto cend() -> decltype(fParts.cend()) { return fParts.cend(); } +}; + +} // namespace fair::mq + +// using FairMQParts [[deprecated("Use fair::mq::Parts")]] = fair::mq::Parts; +using FairMQParts = fair::mq::Parts; + +#endif /* FAIR_MQ_PARTS_H */ diff --git a/fairmq/Poller.h b/fairmq/Poller.h index c5b57d59..526c9a26 100644 --- a/fairmq/Poller.h +++ b/fairmq/Poller.h @@ -9,6 +9,35 @@ #ifndef FAIR_MQ_POLLER_H #define FAIR_MQ_POLLER_H -#include +#include +#include +#include + +namespace fair::mq { + +struct Poller +{ + virtual void Poll(const int timeout) = 0; + virtual bool CheckInput(const int index) = 0; + virtual bool CheckOutput(const int index) = 0; + virtual bool CheckInput(const std::string& channelKey, const int index) = 0; + virtual bool CheckOutput(const std::string& channelKey, const int index) = 0; + + virtual ~Poller() = default; +}; + +using PollerPtr = std::unique_ptr; + +struct PollerError : std::runtime_error +{ + using std::runtime_error::runtime_error; +}; + +} // namespace fair::mq + +// using FairMQPoller [[deprecated("Use fair::mq::Poller")]] = fair::mq::Poller; +// using FairMQPollerPtr [[deprecated("Use fair::mq::PollerPtr")]] = fair::mq::PollerPtr; +using FairMQPoller = fair::mq::Poller; +using FairMQPollerPtr = fair::mq::PollerPtr; #endif // FAIR_MQ_POLLER_H diff --git a/fairmq/ProgOptionsFwd.h b/fairmq/ProgOptionsFwd.h index 9a805ae0..b0bdeb8a 100644 --- a/fairmq/ProgOptionsFwd.h +++ b/fairmq/ProgOptionsFwd.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2019 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2019-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -9,11 +9,11 @@ #ifndef FAIR_MQ_PROGOPTIONSFWD_H #define FAIR_MQ_PROGOPTIONSFWD_H -namespace fair::mq -{ +namespace fair::mq { class ProgOptions; } +// using FairMQProgOptions [[deprecated("Use fair::mq::ProgOptions")]] = fair::mq::ProgOptions; using FairMQProgOptions = fair::mq::ProgOptions; #endif /* FAIR_MQ_PROGOPTIONSFWD_H */ diff --git a/fairmq/Socket.h b/fairmq/Socket.h index f9376c84..7511d83c 100644 --- a/fairmq/Socket.h +++ b/fairmq/Socket.h @@ -9,6 +9,87 @@ #ifndef FAIR_MQ_SOCKET_H #define FAIR_MQ_SOCKET_H -#include +#include +#include +#include +#include +#include + +namespace fair::mq { + +class TransportFactory; + +enum class TransferCode : int +{ + success = 0, + error = -1, + timeout = -2, + interrupted = -3 +}; + +struct Socket +{ + Socket() = default; + Socket(TransportFactory* fac) + : fTransport(fac) + {} + + virtual std::string GetId() const = 0; + + virtual bool Bind(const std::string& address) = 0; + virtual bool Connect(const std::string& address) = 0; + + virtual int64_t Send(MessagePtr& msg, int timeout = -1) = 0; + virtual int64_t Receive(MessagePtr& msg, int timeout = -1) = 0; + virtual int64_t Send(std::vector>& msgVec, int timeout = -1) = 0; + virtual int64_t Receive(std::vector>& msgVec, int timeout = -1) = 0; + + virtual void Close() = 0; + + virtual void SetOption(const std::string& option, const void* value, size_t valueSize) = 0; + virtual void GetOption(const std::string& option, void* value, size_t* valueSize) = 0; + + /// If the backend supports it, fills the unsigned integer @a events with the ZMQ_EVENTS value + /// DISCLAIMER: this API is experimental and unsupported and might be dropped / refactored in + /// the future. + virtual int Events(uint32_t* events) = 0; + virtual void SetLinger(const int value) = 0; + virtual int GetLinger() const = 0; + virtual void SetSndBufSize(const int value) = 0; + virtual int GetSndBufSize() const = 0; + virtual void SetRcvBufSize(const int value) = 0; + virtual int GetRcvBufSize() const = 0; + virtual void SetSndKernelSize(const int value) = 0; + virtual int GetSndKernelSize() const = 0; + virtual void SetRcvKernelSize(const int value) = 0; + virtual int GetRcvKernelSize() const = 0; + + virtual unsigned long GetBytesTx() const = 0; + virtual unsigned long GetBytesRx() const = 0; + virtual unsigned long GetMessagesTx() const = 0; + virtual unsigned long GetMessagesRx() const = 0; + + TransportFactory* GetTransport() { return fTransport; } + void SetTransport(TransportFactory* transport) { fTransport = transport; } + + virtual ~Socket() = default; + + private: + TransportFactory* fTransport{nullptr}; +}; + +using SocketPtr = std::unique_ptr; + +struct SocketError : std::runtime_error +{ + using std::runtime_error::runtime_error; +}; + +} // namespace fair::mq + +// using FairMQSocket [[deprecated("Use fair::mq::Socket")]] = fair::mq::Socket; +// using FairMQSocketPtr [[deprecated("Use fair::mq::SocketPtr")]] = fair::mq::SocketPtr; +using FairMQSocket = fair::mq::Socket; +using FairMQSocketPtr = fair::mq::SocketPtr; #endif // FAIR_MQ_SOCKET_H diff --git a/fairmq/FairMQTransportFactory.cxx b/fairmq/TransportFactory.cxx similarity index 58% rename from fairmq/FairMQTransportFactory.cxx rename to fairmq/TransportFactory.cxx index 7f4b0527..72c8195a 100644 --- a/fairmq/FairMQTransportFactory.cxx +++ b/fairmq/TransportFactory.cxx @@ -1,53 +1,47 @@ /******************************************************************************** - * Copyright (C) 2017-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include +#include #include #include #ifdef BUILD_OFI_TRANSPORT #include #endif -#include -#include -#include - #include - +#include #include #include -#include // move +#include // move using namespace std; -FairMQTransportFactory::FairMQTransportFactory(string id) - : fkId(std::move(id)) -{} +namespace fair::mq { -auto FairMQTransportFactory::CreateTransportFactory(const string& type, - const string& id, - const fair::mq::ProgOptions* config) - -> shared_ptr +auto TransportFactory::CreateTransportFactory(const string& type, + const string& id, + const ProgOptions* config) + -> shared_ptr { auto finalId = id; // Generate uuid if empty if (finalId.empty()) { - finalId = fair::mq::tools::Uuid(); + finalId = tools::Uuid(); } if (type == "zeromq") { - return make_shared(finalId, config); + return make_shared(finalId, config); } else if (type == "shmem") { - return make_shared(finalId, config); + return make_shared(finalId, config); } #ifdef BUILD_OFI_TRANSPORT else if (type == "ofi") { - return make_shared(finalId, config); + return make_shared(finalId, config); } #endif /* BUILD_OFI_TRANSPORT */ else { @@ -60,6 +54,8 @@ auto FairMQTransportFactory::CreateTransportFactory(const string& type, << ", and \"ofi\"" #endif /* BUILD_OFI_TRANSPORT */ << ". Exiting."; - throw fair::mq::TransportFactoryError(fair::mq::tools::ToString("Unavailable transport requested: ", type)); + throw TransportFactoryError(tools::ToString("Unavailable transport requested: ", type)); } } + +} // namespace fair::mq diff --git a/fairmq/TransportFactory.h b/fairmq/TransportFactory.h index ca5ef255..8873fa08 100644 --- a/fairmq/TransportFactory.h +++ b/fairmq/TransportFactory.h @@ -9,6 +9,220 @@ #ifndef FAIR_MQ_TRANSPORTFACTORY_H #define FAIR_MQ_TRANSPORTFACTORY_H -#include +#include // size_t +#include +#include +#include +#include +#include +#include +#include // shared_ptr +#include +#include +#include +#include + +namespace fair::mq { + +class Channel; +class ProgOptions; + +class TransportFactory +{ + private: + /// Topology wide unique id + const std::string fkId; + + /// The polymorphic memory resource associated with the transport + ChannelResource fMemoryResource{this}; + + public: + /// ctor + /// @param id Topology wide unique id, usually the device id. + TransportFactory(std::string id) + : fkId(std::move(id)) + {} + + auto GetId() const -> const std::string { return fkId; }; + + /// Get a pointer to the associated polymorphic memory resource + ChannelResource* GetMemoryResource() { return &fMemoryResource; } + operator ChannelResource*() { return &fMemoryResource; } + + /// @brief Create empty Message (for receiving) + /// @return pointer to Message + virtual MessagePtr CreateMessage() = 0; + /// @brief Create empty Message (for receiving), align received buffer to specified alignment + /// @param alignment alignment to align received buffer to + /// @return pointer to Message + virtual MessagePtr CreateMessage(Alignment alignment) = 0; + /// @brief Create new Message of specified size + /// @param size message size + /// @return pointer to Message + virtual MessagePtr CreateMessage(const size_t size) = 0; + /// @brief Create new Message of specified size and alignment + /// @param size message size + /// @param alignment message alignment + /// @return pointer to Message + virtual MessagePtr CreateMessage(const size_t size, Alignment alignment) = 0; + /// @brief Create new Message with user provided buffer and size + /// @param data pointer to user provided buffer + /// @param size size of the user provided buffer + /// @param ffn callback, called when the message is transfered (and can be deleted) + /// @param obj optional helper pointer that can be used in the callback + /// @return pointer to Message + virtual MessagePtr CreateMessage(void* data, + const size_t size, + fairmq_free_fn* ffn, + void* hint = nullptr) = 0; + /// @brief create a message with the buffer located within the corresponding unmanaged region + /// @param unmanagedRegion the unmanaged region that this message buffer belongs to + /// @param data message buffer (must be within the region - checked at runtime by the transport) + /// @param size size of the message + /// @param hint optional parameter, returned to the user in the RegionCallback + virtual MessagePtr CreateMessage(UnmanagedRegionPtr& unmanagedRegion, + void* data, + const size_t size, + void* hint = 0) = 0; + + /// @brief Create a socket + virtual SocketPtr CreateSocket(const std::string& type, const std::string& name) = 0; + + /// @brief Create a poller for a single channel (all subchannels) + virtual PollerPtr CreatePoller(const std::vector& channels) const = 0; + /// @brief Create a poller for specific channels + virtual PollerPtr CreatePoller(const std::vector& channels) const = 0; + /// @brief Create a poller for specific channels (all subchannels) + virtual PollerPtr CreatePoller( + const std::unordered_map>& channelsMap, + const std::vector& channelList) const = 0; + + /// @brief Create new UnmanagedRegion + /// @param size size of the region + /// @param callback callback to be called when a message belonging to this region is no longer + /// needed by the transport + /// @param path optional parameter to pass to the underlying transport + /// @param flags optional parameter to pass to the underlying transport + /// @return pointer to UnmanagedRegion + virtual UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, + RegionCallback callback = nullptr, + const std::string& path = "", + int flags = 0, + RegionConfig cfg = RegionConfig()) = 0; + virtual UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, + RegionBulkCallback callback = nullptr, + const std::string& path = "", + int flags = 0, + RegionConfig cfg = RegionConfig()) = 0; + /// @brief Create new UnmanagedRegion + /// @param size size of the region + /// @param userFlags flags to be stored with the region, have no effect on the transport, but + /// can be retrieved from the region by the user + /// @param callback callback to be called when a message belonging to this region is no longer + /// needed by the transport + /// @param path optional parameter to pass to the underlying transport + /// @param flags optional parameter to pass to the underlying transport + /// @return pointer to UnmanagedRegion + virtual UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, + const int64_t userFlags, + RegionCallback callback = nullptr, + const std::string& path = "", + int flags = 0, + RegionConfig cfg = RegionConfig()) = 0; + virtual UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, + const int64_t userFlags, + RegionBulkCallback callback = nullptr, + const std::string& path = "", + int flags = 0, + RegionConfig cfg = RegionConfig()) = 0; + + /// @brief Subscribe to region events (creation, destruction, ...) + /// @param callback the callback that is called when a region event occurs + virtual void SubscribeToRegionEvents(RegionEventCallback callback) = 0; + /// @brief Check if there is an active subscription to region events + /// @return true/false + virtual bool SubscribedToRegionEvents() = 0; + /// @brief Unsubscribe from region events + virtual void UnsubscribeFromRegionEvents() = 0; + + virtual std::vector GetRegionInfo() = 0; + + /// Get transport type + virtual Transport GetType() const = 0; + + virtual void Interrupt() = 0; + virtual void Resume() = 0; + virtual void Reset() = 0; + + virtual ~TransportFactory() = default; + + static auto CreateTransportFactory(const std::string& type, + const std::string& id = "", + const ProgOptions* config = nullptr) + -> std::shared_ptr; + + static void NoCleanup(void* /*data*/, void* /*obj*/) {} + + template + static void SimpleMsgCleanup(void* /*data*/, void* obj) + { + delete static_cast(obj); + } + + template + MessagePtr NewSimpleMessage(const T& data) + { + // todo: is_trivially_copyable not available on gcc < 5, workaround? + // static_assert(std::is_trivially_copyable::value, "The argument type for + // NewSimpleMessage has to be trivially copyable!"); + T* dataCopy = new T(data); + return CreateMessage(dataCopy, sizeof(T), SimpleMsgCleanup, dataCopy); + } + + template + MessagePtr NewSimpleMessage(const char (&data)[N]) + { + std::string* msgStr = new std::string(data); + return CreateMessage(const_cast(msgStr->c_str()), + msgStr->length(), + SimpleMsgCleanup, + msgStr); + } + + MessagePtr NewSimpleMessage(const std::string& str) + { + + std::string* msgStr = new std::string(str); + return CreateMessage(const_cast(msgStr->c_str()), + msgStr->length(), + SimpleMsgCleanup, + msgStr); + } + + template + MessagePtr NewStaticMessage(const T& data) + { + return CreateMessage(data, sizeof(T), NoCleanup, nullptr); + } + + MessagePtr NewStaticMessage(const std::string& str) + { + return CreateMessage(const_cast(str.c_str()), str.length(), NoCleanup, nullptr); + } +}; + +struct TransportFactoryError : std::runtime_error +{ + using std::runtime_error::runtime_error; +}; + +} // namespace fair::mq + +// using FairMQTransportFactory [[deprecated("Use fair::mq::TransportFactory")]] = + // fair::mq::TransportFactory; +// using FairMQTransportFactoryError [[deprecated("Use fair::mq::TransportFactoryError")]] = + // fair::mq::TransportFactoryError; +using FairMQTransportFactory = fair::mq::TransportFactory; +using FairMQTransportFactoryError = fair::mq::TransportFactoryError; #endif // FAIR_MQ_TRANSPORTFACTORY_H diff --git a/fairmq/UnmanagedRegion.h b/fairmq/UnmanagedRegion.h index ea77cf9b..6636000e 100644 --- a/fairmq/UnmanagedRegion.h +++ b/fairmq/UnmanagedRegion.h @@ -9,6 +9,139 @@ #ifndef FAIR_MQ_UNMANAGEDREGION_H #define FAIR_MQ_UNMANAGEDREGION_H -#include +#include // size_t +#include // uint32_t +#include +#include // std::function +#include // std::unique_ptr +#include // std::ostream +#include + +namespace fair::mq { + +class TransportFactory; + +enum class RegionEvent : int +{ + created, + destroyed, + local_only +}; + +struct RegionInfo +{ + RegionInfo() = default; + + RegionInfo(bool _managed, + uint64_t _id, + void* _ptr, + size_t _size, + int64_t _flags, + RegionEvent _event) + : managed(_managed) + , id(_id) + , ptr(_ptr) + , size(_size) + , flags(_flags) + , event(_event) + {} + + bool managed = true; // managed/unmanaged + uint64_t id = 0; // id of the region + void* ptr = nullptr; // pointer to the start of the region + size_t size = 0; // region size + int64_t flags = 0; // custom flags set by the creator + RegionEvent event = RegionEvent::created; +}; + +struct RegionBlock +{ + void* ptr; + size_t size; + void* hint; + + RegionBlock(void* p, size_t s, void* h) + : ptr(p) + , size(s) + , hint(h) + {} +}; + +using RegionCallback = std::function; +using RegionBulkCallback = std::function&)>; +using RegionEventCallback = std::function; + +struct UnmanagedRegion +{ + UnmanagedRegion() = default; + UnmanagedRegion(TransportFactory* factory) + : fTransport(factory) + {} + + virtual void* GetData() const = 0; + virtual size_t GetSize() const = 0; + virtual uint16_t GetId() const = 0; + virtual void SetLinger(uint32_t linger) = 0; + virtual uint32_t GetLinger() const = 0; + + virtual Transport GetType() const = 0; + TransportFactory* GetTransport() { return fTransport; } + void SetTransport(TransportFactory* transport) { fTransport = transport; } + + virtual ~UnmanagedRegion() = default; + + private: + TransportFactory* fTransport{nullptr}; +}; + +using UnmanagedRegionPtr = std::unique_ptr; + +inline std::ostream& operator<<(std::ostream& os, const RegionEvent& event) +{ + switch (event) { + case RegionEvent::created: + return os << "created"; + case RegionEvent::destroyed: + return os << "destroyed"; + case RegionEvent::local_only: + return os << "local_only"; + default: + return os << "unrecognized event"; + } +} + +struct RegionConfig +{ + RegionConfig() = default; + + RegionConfig(bool l, bool z) + : lock(l) + , zero(z) + {} + + bool lock = false; + bool zero = false; +}; + +} // namespace fair::mq + +// using FairMQRegionEvent [[deprecated("Use fair::mq::RegionBlock")]] = fair::mq::RegionEvent; +// using FairMQRegionInfo [[deprecated("Use fair::mq::RegionInfo")]] = fair::mq::RegionInfo; +// using FairMQRegionBlock [[deprecated("Use fair::mq::RegionBlock")]] = fair::mq::RegionBlock; +// using FairMQRegionCallback [[deprecated("Use fair::mq::RegionCallback")]] = fair::mq::RegionCallback; +// using FairMQRegionBulkCallback [[deprecated("Use fair::mq::RegionBulkCallback")]] = fair::mq::RegionBulkCallback; +// using FairMQRegionEventCallback [[deprecated("Use fair::mq::RegionEventCallback")]] = fair::mq::RegionEventCallback; +// using FairMQUnmanagedRegion [[deprecated("Use fair::mq::UnmanagedRegion")]] = fair::mq::UnmanagedRegion; +// using FairMQUnmanagedRegionPtr [[deprecated("Use fair::mq::UnmanagedRegionPtr")]] = fair::mq::UnmanagedRegionPtr; +// using FairMQRegionConfig [[deprecated("Use fair::mq::RegionConfig")]] = fair::mq::RegionConfig; +using FairMQRegionEvent = fair::mq::RegionEvent; +using FairMQRegionInfo = fair::mq::RegionInfo; +using FairMQRegionBlock = fair::mq::RegionBlock; +using FairMQRegionCallback = fair::mq::RegionCallback; +using FairMQRegionBulkCallback = fair::mq::RegionBulkCallback; +using FairMQRegionEventCallback = fair::mq::RegionEventCallback; +using FairMQUnmanagedRegion = fair::mq::UnmanagedRegion; +using FairMQUnmanagedRegionPtr = fair::mq::UnmanagedRegionPtr; +using FairMQRegionConfig = fair::mq::RegionConfig; #endif // FAIR_MQ_UNMANAGEDREGION_H diff --git a/fairmq/shmem/Poller.h b/fairmq/shmem/Poller.h index 27a7498f..396fca3c 100644 --- a/fairmq/shmem/Poller.h +++ b/fairmq/shmem/Poller.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -19,8 +19,6 @@ #include #include -class FairMQChannel; - namespace fair::mq::shmem { diff --git a/fairmq/shmem/Socket.h b/fairmq/shmem/Socket.h index fce828b2..c7435329 100644 --- a/fairmq/shmem/Socket.h +++ b/fairmq/shmem/Socket.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -22,7 +22,9 @@ #include #include // make_unique -class FairMQTransportFactory; +namespace fair::mq { + class TransportFactory; +} namespace fair::mq::shmem {