From 6699711e17961c8a5fb2301ff579aae9ae47cc82 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 22 Sep 2020 11:37:05 +0200 Subject: [PATCH] FairMQChannel: Refactor, moving short methods to header --- fairmq/FairMQChannel.cxx | 215 +++----------------------------------- fairmq/FairMQChannel.h | 158 +++++++++++++++------------- fairmq/Transports.h | 16 +++ fairmq/shmem/Socket.h | 5 - test/channel/_channel.cxx | 2 +- 5 files changed, 116 insertions(+), 280 deletions(-) diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index 69b13afb..71278edf 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -83,26 +83,28 @@ FairMQChannel::FairMQChannel(const string& name, const string& type, const strin , fAutoBind(DefaultAutoBind) , fValid(false) , fMultipart(false) -{} +{ + // LOG(warn) << "Constructing channel '" << fName << "'"; +} FairMQChannel::FairMQChannel(const string& name, int index, const fair::mq::Properties& properties) : FairMQChannel(tools::ToString(name, "[", index, "]"), "unspecified", "unspecified", "unspecified", nullptr) { string prefix(tools::ToString("chans.", name, ".", index, ".")); - fType = GetPropertyOrDefault(properties, string(prefix + "type"), fType); - fMethod = GetPropertyOrDefault(properties, string(prefix + "method"), fMethod); - fAddress = GetPropertyOrDefault(properties, string(prefix + "address"), fAddress); - fTransportType = TransportTypes.at(GetPropertyOrDefault(properties, string(prefix + "transport"), TransportNames.at(fTransportType))); - fSndBufSize = GetPropertyOrDefault(properties, string(prefix + "sndBufSize"), fSndBufSize); - fRcvBufSize = GetPropertyOrDefault(properties, string(prefix + "rcvBufSize"), fRcvBufSize); - fSndKernelSize = GetPropertyOrDefault(properties, string(prefix + "sndKernelSize"), fSndKernelSize); - fRcvKernelSize = GetPropertyOrDefault(properties, string(prefix + "rcvKernelSize"), fRcvKernelSize); - fLinger = GetPropertyOrDefault(properties, string(prefix + "linger"), fLinger); - fRateLogging = GetPropertyOrDefault(properties, string(prefix + "rateLogging"), fRateLogging); - fPortRangeMin = GetPropertyOrDefault(properties, string(prefix + "portRangeMin"), fPortRangeMin); - fPortRangeMax = GetPropertyOrDefault(properties, string(prefix + "portRangeMax"), fPortRangeMax); - fAutoBind = GetPropertyOrDefault(properties, string(prefix + "autoBind"), fAutoBind); + fType = GetPropertyOrDefault(properties, string(prefix + "type"), std::string(DefaultType)); + fMethod = GetPropertyOrDefault(properties, string(prefix + "method"), std::string(DefaultMethod)); + fAddress = GetPropertyOrDefault(properties, string(prefix + "address"), std::string(DefaultAddress)); + fTransportType = TransportType(GetPropertyOrDefault(properties, string(prefix + "transport"), std::string(DefaultTransportName))); + fSndBufSize = GetPropertyOrDefault(properties, string(prefix + "sndBufSize"), DefaultSndBufSize); + fRcvBufSize = GetPropertyOrDefault(properties, string(prefix + "rcvBufSize"), DefaultRcvBufSize); + fSndKernelSize = GetPropertyOrDefault(properties, string(prefix + "sndKernelSize"), DefaultSndKernelSize); + fRcvKernelSize = GetPropertyOrDefault(properties, string(prefix + "rcvKernelSize"), DefaultRcvKernelSize); + fLinger = GetPropertyOrDefault(properties, string(prefix + "linger"), DefaultLinger); + fRateLogging = GetPropertyOrDefault(properties, string(prefix + "rateLogging"), DefaultRateLogging); + fPortRangeMin = GetPropertyOrDefault(properties, string(prefix + "portRangeMin"), DefaultPortRangeMin); + fPortRangeMax = GetPropertyOrDefault(properties, string(prefix + "portRangeMax"), DefaultPortRangeMax); + fAutoBind = GetPropertyOrDefault(properties, string(prefix + "autoBind"), DefaultAutoBind); } FairMQChannel::FairMQChannel(const FairMQChannel& chan) @@ -158,191 +160,6 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan) return *this; } -FairMQSocket & FairMQChannel::GetSocket() const -{ - assert(fSocket); - return *fSocket; -} - -string FairMQChannel::GetName() const -{ - return fName; -} - -string FairMQChannel::GetPrefix() const -{ - string prefix = fName; - prefix = prefix.erase(fName.rfind('[')); - return prefix; -} - -string FairMQChannel::GetIndex() const -{ - string indexStr = fName; - indexStr.erase(indexStr.rfind(']')); - indexStr.erase(0, indexStr.rfind('[') + 1); - return indexStr; -} - -string FairMQChannel::GetType() const -{ - return fType; -} - -string FairMQChannel::GetMethod() const -{ - return fMethod; -} - -string FairMQChannel::GetAddress() const -{ - return fAddress; -} - -string FairMQChannel::GetTransportName() const -{ - return TransportNames.at(fTransportType); -} - -Transport FairMQChannel::GetTransportType() const -{ - return fTransportType; -} - -int FairMQChannel::GetSndBufSize() const -{ - return fSndBufSize; -} - -int FairMQChannel::GetRcvBufSize() const -{ - return fRcvBufSize; -} - -int FairMQChannel::GetSndKernelSize() const -{ - return fSndKernelSize; -} - -int FairMQChannel::GetRcvKernelSize() const -{ - return fRcvKernelSize; -} - -int FairMQChannel::GetLinger() const -{ - return fLinger; -} - -int FairMQChannel::GetRateLogging() const -{ - return fRateLogging; -} - -int FairMQChannel::GetPortRangeMin() const -{ - return fPortRangeMin; -} - -int FairMQChannel::GetPortRangeMax() const -{ - return fPortRangeMax; -} - -bool FairMQChannel::GetAutoBind() const -{ - return fAutoBind; -} - -void FairMQChannel::UpdateType(const string& type) -{ - fIsValid = false; - fType = type; -} - -void FairMQChannel::UpdateMethod(const string& method) -{ - fIsValid = false; - fMethod = method; -} - -void FairMQChannel::UpdateAddress(const string& address) -{ - fIsValid = false; - fAddress = address; -} - -void FairMQChannel::UpdateTransport(const string& transport) -{ - fIsValid = false; - fTransportType = TransportTypes.at(transport); -} - -void FairMQChannel::UpdateSndBufSize(const int sndBufSize) -{ - fIsValid = false; - fSndBufSize = sndBufSize; -} - -void FairMQChannel::UpdateRcvBufSize(const int rcvBufSize) -{ - fIsValid = false; - fRcvBufSize = rcvBufSize; -} - -void FairMQChannel::UpdateSndKernelSize(const int sndKernelSize) -{ - fIsValid = false; - fSndKernelSize = sndKernelSize; -} - -void FairMQChannel::UpdateRcvKernelSize(const int rcvKernelSize) -{ - fIsValid = false; - fRcvKernelSize = rcvKernelSize; -} - -void FairMQChannel::UpdateLinger(const int duration) -{ - fIsValid = false; - fLinger = duration; -} - -void FairMQChannel::UpdateRateLogging(const int rateLogging) -{ - fIsValid = false; - fRateLogging = rateLogging; -} - -void FairMQChannel::UpdatePortRangeMin(const int minPort) -{ - fIsValid = false; - fPortRangeMin = minPort; -} - -void FairMQChannel::UpdatePortRangeMax(const int maxPort) -{ - fIsValid = false; - fPortRangeMax = maxPort; -} - -void FairMQChannel::UpdateAutoBind(const bool autobind) -{ - fIsValid = false; - fAutoBind = autobind; -} - -void FairMQChannel::UpdateName(const string& name) -{ - fIsValid = false; - fName = name; -} - -bool FairMQChannel::IsValid() const -{ - return fIsValid; -} - bool FairMQChannel::Validate() try { stringstream ss; diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 872787c9..f85fbe81 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -81,14 +81,11 @@ class FairMQChannel // FairMQChannel& operator=(FairMQChannel&&) = delete; /// Destructor - virtual ~FairMQChannel() - { - // LOG(debug) << "Destroying channel " << fName; - } + virtual ~FairMQChannel() { /* LOG(warn) << "Destroying channel '" << fName << "'"; */ } struct ChannelConfigurationError : std::runtime_error { using std::runtime_error::runtime_error; }; - FairMQSocket& GetSocket() const; + FairMQSocket& GetSocket() const { assert(fSocket); return *fSocket; } bool Bind(const std::string& address) { @@ -106,127 +103,138 @@ class FairMQChannel /// Get channel name /// @return Returns full channel name (e.g. "data[0]") - std::string GetName() const; + std::string GetName() const { return fName; } /// Get channel prefix /// @return Returns channel prefix (e.g. "data" in "data[0]") - std::string GetPrefix() const; + std::string GetPrefix() const + { + std::string prefix = fName; + prefix = prefix.erase(fName.rfind('[')); + return prefix; + } /// Get channel index /// @return Returns channel index (e.g. 0 in "data[0]") - std::string GetIndex() const; + std::string GetIndex() const + { + std::string indexStr = fName; + indexStr.erase(indexStr.rfind(']')); + indexStr.erase(0, indexStr.rfind('[') + 1); + return indexStr; + } /// Get socket type /// @return Returns socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/) - std::string GetType() const; + std::string GetType() const { return fType; } /// Get socket method /// @return Returns socket method (bind/connect) - std::string GetMethod() const; + std::string GetMethod() const { return fMethod; } /// Get socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") /// @return Returns socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") - std::string GetAddress() const; + std::string GetAddress() const { return fAddress; } /// Get channel transport name ("default", "zeromq" or "shmem") /// @return Returns channel transport name (e.g. "default", "zeromq" or "shmem") - std::string GetTransportName() const; + std::string GetTransportName() const { return fair::mq::TransportName(fTransportType); } /// Get channel transport type /// @return Returns channel transport type - fair::mq::Transport GetTransportType() const; + fair::mq::Transport GetTransportType() const { return fTransportType; } /// Get socket send buffer size (in number of messages) /// @return Returns socket send buffer size (in number of messages) - int GetSndBufSize() const; + int GetSndBufSize() const { return fSndBufSize; } /// Get socket receive buffer size (in number of messages) /// @return Returns socket receive buffer size (in number of messages) - int GetRcvBufSize() const; + int GetRcvBufSize() const { return fRcvBufSize; } /// Get socket kernel transmit send buffer size (in bytes) /// @return Returns socket kernel transmit send buffer size (in bytes) - int GetSndKernelSize() const; + int GetSndKernelSize() const { return fSndKernelSize; } /// Get socket kernel transmit receive buffer size (in bytes) /// @return Returns socket kernel transmit receive buffer size (in bytes) - int GetRcvKernelSize() const; + int GetRcvKernelSize() const { return fRcvKernelSize; } /// Get linger duration (in milliseconds) /// @return Returns linger duration (in milliseconds) - int GetLinger() const; + int GetLinger() const { return fLinger; } /// Get socket rate logging interval (in seconds) /// @return Returns socket rate logging interval (in seconds) - int GetRateLogging() const; + int GetRateLogging() const { return fRateLogging; } /// Get start of the port range for automatic binding /// @return start of the port range - int GetPortRangeMin() const; + int GetPortRangeMin() const { return fPortRangeMin; } /// Get end of the port range for automatic binding /// @return end of the port range - int GetPortRangeMax() const; + int GetPortRangeMax() const { return fPortRangeMax; } /// Set automatic binding (pick random port if bind fails) /// @return true/false, true if automatic binding is enabled - bool GetAutoBind() const; - - /// Set socket type - /// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/) - void UpdateType(const std::string& type); - - /// Set socket method - /// @param method Socket method (bind/connect) - void UpdateMethod(const std::string& method); - - /// Set socket address - /// @param Socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") - void UpdateAddress(const std::string& address); - - /// Set channel transport - /// @param transport transport string ("default", "zeromq" or "shmem") - void UpdateTransport(const std::string& transport); - - /// Set socket send buffer size - /// @param sndBufSize Socket send buffer size (in number of messages) - void UpdateSndBufSize(const int sndBufSize); - - /// Set socket receive buffer size - /// @param rcvBufSize Socket receive buffer size (in number of messages) - void UpdateRcvBufSize(const int rcvBufSize); - - /// Set socket kernel transmit send buffer size (in bytes) - /// @param sndKernelSize Socket send buffer size (in bytes) - void UpdateSndKernelSize(const int sndKernelSize); - - /// Set socket kernel transmit receive buffer size (in bytes) - /// @param rcvKernelSize Socket receive buffer size (in bytes) - void UpdateRcvKernelSize(const int rcvKernelSize); - - /// Set linger duration (in milliseconds) - /// @param duration linger duration (in milliseconds) - void UpdateLinger(const int duration); - - /// Set socket rate logging interval (in seconds) - /// @param rateLogging Socket rate logging interval (in seconds) - void UpdateRateLogging(const int rateLogging); - - /// Set start of the port range for automatic binding - /// @param minPort start of the port range - void UpdatePortRangeMin(const int minPort); - - /// Set end of the port range for automatic binding - /// @param maxPort end of the port range - void UpdatePortRangeMax(const int maxPort); - - /// Set automatic binding (pick random port if bind fails) - /// @param autobind true/false, true to enable automatic binding - void UpdateAutoBind(const bool autobind); + bool GetAutoBind() const { return fAutoBind; } /// Set channel name /// @param name Arbitrary channel name - void UpdateName(const std::string& name); + void UpdateName(const std::string& name) { fName = name; Invalidate(); } + + /// Set socket type + /// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/) + void UpdateType(const std::string& type) { fType = type; Invalidate(); } + + /// Set socket method + /// @param method Socket method (bind/connect) + void UpdateMethod(const std::string& method) { fMethod = method; Invalidate(); } + + /// Set socket address + /// @param Socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") + void UpdateAddress(const std::string& address) { fAddress = address; Invalidate(); } + + /// Set channel transport + /// @param transport transport string ("default", "zeromq" or "shmem") + void UpdateTransport(const std::string& transport) { fTransportType = fair::mq::TransportType(transport); Invalidate(); } + + /// Set socket send buffer size + /// @param sndBufSize Socket send buffer size (in number of messages) + void UpdateSndBufSize(const int sndBufSize) { fSndBufSize = sndBufSize; Invalidate(); } + + /// Set socket receive buffer size + /// @param rcvBufSize Socket receive buffer size (in number of messages) + void UpdateRcvBufSize(const int rcvBufSize) { fRcvBufSize = rcvBufSize; Invalidate(); } + + /// Set socket kernel transmit send buffer size (in bytes) + /// @param sndKernelSize Socket send buffer size (in bytes) + void UpdateSndKernelSize(const int sndKernelSize) { fSndKernelSize = sndKernelSize; Invalidate(); } + + /// Set socket kernel transmit receive buffer size (in bytes) + /// @param rcvKernelSize Socket receive buffer size (in bytes) + void UpdateRcvKernelSize(const int rcvKernelSize) { fRcvKernelSize = rcvKernelSize; Invalidate(); } + + /// Set linger duration (in milliseconds) + /// @param duration linger duration (in milliseconds) + void UpdateLinger(const int duration) { fLinger = duration; Invalidate(); } + + /// Set socket rate logging interval (in seconds) + /// @param rateLogging Socket rate logging interval (in seconds) + void UpdateRateLogging(const int rateLogging) { fRateLogging = rateLogging; Invalidate(); } + + /// Set start of the port range for automatic binding + /// @param minPort start of the port range + void UpdatePortRangeMin(const int minPort) { fPortRangeMin = minPort; Invalidate(); } + + /// Set end of the port range for automatic binding + /// @param maxPort end of the port range + void UpdatePortRangeMax(const int maxPort) { fPortRangeMax = maxPort; Invalidate(); } + + /// Set automatic binding (pick random port if bind fails) + /// @param autobind true/false, true to enable automatic binding + void UpdateAutoBind(const bool autobind) { fAutoBind = autobind; Invalidate(); } /// Checks if the configured channel settings are valid (checks the validity parameter, without running full validation (as oposed to ValidateChannel())) /// @return true if channel settings are valid, false otherwise. diff --git a/fairmq/Transports.h b/fairmq/Transports.h index a0b74d42..c4b3d155 100644 --- a/fairmq/Transports.h +++ b/fairmq/Transports.h @@ -10,8 +10,10 @@ #define FAIR_MQ_TRANSPORTS_H #include +#include #include +#include #include #include @@ -28,6 +30,8 @@ enum class Transport OFI }; +struct TransportError : std::runtime_error { using std::runtime_error::runtime_error; }; + } /* namespace mq */ } /* namespace fair */ @@ -58,6 +62,18 @@ static std::unordered_map TransportNames { { Transport::OFI, "ofi" } }; +inline std::string TransportName(Transport transport) +{ + return TransportNames[transport]; +} + +inline Transport TransportType(const std::string& transport) +try { + return TransportTypes.at(transport); +} catch (std::out_of_range&) { + throw TransportError(tools::ToString("Unknown transport provided: ", transport)); +} + } /* namespace mq */ } /* namespace fair */ diff --git a/fairmq/shmem/Socket.h b/fairmq/shmem/Socket.h index 044177fb..dcf68b8a 100644 --- a/fairmq/shmem/Socket.h +++ b/fairmq/shmem/Socket.h @@ -97,11 +97,6 @@ class Socket final : public fair::mq::Socket // LOG(error) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno); // } // } - - if (type == "sub" || type == "pub") { - LOG(error) << "PUB/SUB socket type is not supported for shared memory transport"; - throw SocketError("PUB/SUB socket type is not supported for shared memory transport"); - } LOG(debug) << "Created socket " << GetId(); } diff --git a/test/channel/_channel.cxx b/test/channel/_channel.cxx index dd3ecf95..d13475e6 100644 --- a/test/channel/_channel.cxx +++ b/test/channel/_channel.cxx @@ -88,7 +88,7 @@ TEST(Channel, Validation) channel2.UpdateName("Kanal"); ASSERT_EQ(channel2.GetName(), "Kanal"); - channel2.ResetChannel(); + channel2.Invalidate(); ASSERT_EQ(channel2.IsValid(), false); ASSERT_EQ(channel2.Validate(), true); }