From f8824335a54cf5ce3e6848411ca9c563e2c64ccb Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 16 Oct 2018 12:58:40 +0200 Subject: [PATCH] Add setters/getters for socket options --- fairmq/FairMQDevice.cxx | 10 +-- fairmq/FairMQSocket.h | 11 +++ fairmq/nanomsg/FairMQSocketNN.cxx | 71 ++++++++++++++++- fairmq/nanomsg/FairMQSocketNN.h | 15 +++- fairmq/nanomsg/FairMQTransportFactoryNN.cxx | 8 +- fairmq/ofi/Socket.cxx | 78 ++++++++++++++++++ fairmq/ofi/Socket.h | 11 +++ fairmq/shmem/FairMQSocketSHM.cxx | 87 +++++++++++++++++++++ fairmq/shmem/FairMQSocketSHM.h | 11 +++ fairmq/zeromq/FairMQSocketZMQ.cxx | 87 +++++++++++++++++++++ fairmq/zeromq/FairMQSocketZMQ.h | 11 +++ 11 files changed, 386 insertions(+), 14 deletions(-) diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 67b20ba5..d8a3e804 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -296,20 +296,20 @@ bool FairMQDevice::AttachChannel(FairMQChannel& ch) } // set linger duration (how long socket should wait for outstanding transfers before shutdown) - ch.fSocket->SetOption("linger", &(ch.fLinger), sizeof(ch.fLinger)); + ch.fSocket->SetLinger(ch.fLinger); // set high water marks - ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize)); - ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize)); + ch.fSocket->SetSndBufSize(ch.fSndBufSize); + ch.fSocket->SetRcvBufSize(ch.fRcvBufSize); // set kernel transmit size (set it only if value is not the default value) if (ch.fSndKernelSize != 0) { - ch.fSocket->SetOption("snd-size", &(ch.fSndKernelSize), sizeof(ch.fSndKernelSize)); + ch.fSocket->SetSndKernelSize(ch.fSndKernelSize); } if (ch.fRcvKernelSize != 0) { - ch.fSocket->SetOption("rcv-size", &(ch.fRcvKernelSize), sizeof(ch.fRcvKernelSize)); + ch.fSocket->SetRcvKernelSize(ch.fRcvKernelSize); } // attach diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index 2fe013dc..2ec03a9f 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -43,6 +43,17 @@ class FairMQSocket virtual void SetOption(const std::string& option, const void* value, size_t valueSize) = 0; virtual void GetOption(const std::string& option, void* value, size_t* valueSize) = 0; + virtual void SetLinger(const int value) = 0; + virtual int GetLinger() const = 0; + virtual void SetSndBufSize(const int value) = 0; + virtual int GetSndBufSize() const = 0; + virtual void SetRcvBufSize(const int value) = 0; + virtual int GetRcvBufSize() const = 0; + virtual void SetSndKernelSize(const int value) = 0; + virtual int GetSndKernelSize() const = 0; + virtual void SetRcvKernelSize(const int value) = 0; + virtual int GetRcvKernelSize() const = 0; + virtual unsigned long GetBytesTx() const = 0; virtual unsigned long GetBytesRx() const = 0; virtual unsigned long GetMessagesTx() const = 0; diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index 22d5d432..3f74eef6 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -16,6 +16,7 @@ #include "FairMQMessageNN.h" #include "FairMQLogger.h" #include "FairMQUnmanagedRegionNN.h" +#include #include #include @@ -27,6 +28,7 @@ #include using namespace std; +using namespace fair::mq; atomic FairMQSocketNN::fInterrupted(false); @@ -431,7 +433,7 @@ void FairMQSocketNN::SetOption(const string& option, const void* value, size_t v int val = *(static_cast(const_cast(value))); if (val <= 0) { - LOG(warn) << "value for sndKernelSize/rcvKernelSize should be greater than 0, using defaults (128kB)."; + LOG(warn) << "value for sndKernelSize/rcvKernelSize should be greater than 0, leaving unchanged."; return; } } @@ -464,6 +466,73 @@ void FairMQSocketNN::GetOption(const string& option, void* value, size_t* valueS } } +void FairMQSocketNN::SetLinger(const int value) +{ + fLinger = value; +} + +int FairMQSocketNN::GetLinger() const +{ + return fLinger; +} + +void FairMQSocketNN::SetSndBufSize(const int /* value */) +{ + // not used in nanomsg +} + +int FairMQSocketNN::GetSndBufSize() const +{ + // not used in nanomsg + return -1; +} + +void FairMQSocketNN::SetRcvBufSize(const int /* value */) +{ + // not used in nanomsg +} + +int FairMQSocketNN::GetRcvBufSize() const +{ + // not used in nanomsg + return -1; +} + +void FairMQSocketNN::SetSndKernelSize(const int value) +{ + if (nn_setsockopt(fSocket, NN_SOL_SOCKET, NN_SNDBUF, &value, sizeof(value)) < 0) { + throw SocketError(tools::ToString("failed setting NN_SNDBUF, reason: ", nn_strerror(errno))); + } +} + +int FairMQSocketNN::GetSndKernelSize() const +{ + int value = 0; + size_t valueSize; + if (nn_getsockopt(fSocket, NN_SOL_SOCKET, NN_SNDBUF, &value, &valueSize) < 0) { + throw SocketError(tools::ToString("failed getting NN_SNDBUF, reason: ", nn_strerror(errno))); + } + return value; +} + +void FairMQSocketNN::SetRcvKernelSize(const int value) +{ + if (nn_setsockopt(fSocket, NN_SOL_SOCKET, NN_RCVBUF, &value, sizeof(value)) < 0) { + throw SocketError(tools::ToString("failed setting NN_RCVBUF, reason: ", nn_strerror(errno))); + } +} + +int FairMQSocketNN::GetRcvKernelSize() const +{ + int value = 0; + size_t valueSize; + if (nn_getsockopt(fSocket, NN_SOL_SOCKET, NN_RCVBUF, &value, &valueSize) < 0) { + throw SocketError(tools::ToString("failed getting NN_RCVBUF, reason: ", nn_strerror(errno))); + } + return value; +} + + unsigned long FairMQSocketNN::GetBytesTx() const { return fBytesTx; diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h index 00dea137..528db306 100644 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ b/fairmq/nanomsg/FairMQSocketNN.h @@ -15,12 +15,8 @@ #include "FairMQSocket.h" #include "FairMQMessage.h" -class FairMQTransportFactoryNN; - class FairMQSocketNN final : public FairMQSocket { - friend class FairMQTransportFactoryNN; - public: FairMQSocketNN(const std::string& type, const std::string& name, const std::string& id = ""); FairMQSocketNN(const FairMQSocketNN&) = delete; @@ -52,6 +48,17 @@ class FairMQSocketNN final : public FairMQSocket void SetOption(const std::string& option, const void* value, size_t valueSize) override; void GetOption(const std::string& option, void* value, size_t* valueSize) override; + void SetLinger(const int value) override; + int GetLinger() const override; + void SetSndBufSize(const int value) override; + int GetSndBufSize() const override; + void SetRcvBufSize(const int value) override; + int GetRcvBufSize() const override; + void SetSndKernelSize(const int value) override; + int GetSndKernelSize() const override; + void SetRcvKernelSize(const int value) override; + int GetRcvKernelSize() const override; + unsigned long GetBytesTx() const override; unsigned long GetBytesRx() const override; unsigned long GetMessagesTx() const override; diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index 4bb31ca3..7b488aaf 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -82,11 +82,11 @@ fair::mq::Transport FairMQTransportFactoryNN::GetType() const void FairMQTransportFactoryNN::Reset() { - auto result = max_element(fSockets.begin(), fSockets.end(), [](FairMQSocket* s1, FairMQSocket* s2) { - return static_cast(s1)->fLinger < static_cast(s2)->fLinger; + auto it = max_element(fSockets.begin(), fSockets.end(), [](FairMQSocket* s1, FairMQSocket* s2) { + return static_cast(s1)->GetLinger() < static_cast(s2)->GetLinger(); }); - if (result != fSockets.end()) { - this_thread::sleep_for(chrono::milliseconds(static_cast(*result)->fLinger)); + if (it != fSockets.end()) { + this_thread::sleep_for(chrono::milliseconds(static_cast(*it)->GetLinger())); } fSockets.clear(); } diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx index 5405319d..d07ea5ff 100644 --- a/fairmq/ofi/Socket.cxx +++ b/fairmq/ofi/Socket.cxx @@ -619,6 +619,84 @@ auto Socket::GetOption(const string& option, void* value, size_t* valueSize) -> } } +int Socket::GetLinger() const +{ + int value = 0; + size_t valueSize; + if (zmq_getsockopt(fControlSocket, ZMQ_LINGER, &value, &valueSize) < 0) { + throw SocketError(tools::ToString("failed getting ZMQ_LINGER, reason: ", zmq_strerror(errno))); + } + return value; +} + +void Socket::SetSndBufSize(const int value) +{ + if (zmq_setsockopt(fControlSocket, ZMQ_SNDHWM, &value, sizeof(value)) < 0) { + throw SocketError(tools::ToString("failed setting ZMQ_SNDHWM, reason: ", zmq_strerror(errno))); + } +} + +int Socket::GetSndBufSize() const +{ + int value = 0; + size_t valueSize; + if (zmq_getsockopt(fControlSocket, ZMQ_SNDHWM, &value, &valueSize) < 0) { + throw SocketError(tools::ToString("failed getting ZMQ_SNDHWM, reason: ", zmq_strerror(errno))); + } + return value; +} + +void Socket::SetRcvBufSize(const int value) +{ + if (zmq_setsockopt(fControlSocket, ZMQ_RCVHWM, &value, sizeof(value)) < 0) { + throw SocketError(tools::ToString("failed setting ZMQ_RCVHWM, reason: ", zmq_strerror(errno))); + } +} + +int Socket::GetRcvBufSize() const +{ + int value = 0; + size_t valueSize; + if (zmq_getsockopt(fControlSocket, ZMQ_RCVHWM, &value, &valueSize) < 0) { + throw SocketError(tools::ToString("failed getting ZMQ_RCVHWM, reason: ", zmq_strerror(errno))); + } + return value; +} + +void Socket::SetSndKernelSize(const int value) +{ + if (zmq_setsockopt(fControlSocket, ZMQ_SNDBUF, &value, sizeof(value)) < 0) { + throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno))); + } +} + +int Socket::GetSndKernelSize() const +{ + int value = 0; + size_t valueSize; + if (zmq_getsockopt(fControlSocket, ZMQ_SNDBUF, &value, &valueSize) < 0) { + throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno))); + } + return value; +} + +void Socket::SetRcvKernelSize(const int value) +{ + if (zmq_setsockopt(fControlSocket, ZMQ_RCVBUF, &value, sizeof(value)) < 0) { + throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno))); + } +} + +int Socket::GetRcvKernelSize() const +{ + int value = 0; + size_t valueSize; + if (zmq_getsockopt(fControlSocket, ZMQ_RCVBUF, &value, &valueSize) < 0) { + throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno))); + } + return value; +} + auto Socket::GetConstant(const string& constant) -> int { if (constant == "") diff --git a/fairmq/ofi/Socket.h b/fairmq/ofi/Socket.h index 34988352..d82bad1c 100644 --- a/fairmq/ofi/Socket.h +++ b/fairmq/ofi/Socket.h @@ -57,6 +57,17 @@ class Socket final : public fair::mq::Socket auto GetSocket() const -> void* override { return fControlSocket; } auto GetSocket(int nothing) const -> int override { return -1; } + void SetLinger(const int value) override; + int GetLinger() const override; + void SetSndBufSize(const int value) override; + int GetSndBufSize() const override; + void SetRcvBufSize(const int value) override; + int GetRcvBufSize() const override; + void SetSndKernelSize(const int value) override; + int GetSndKernelSize() const override; + void SetRcvKernelSize(const int value) override; + int GetRcvKernelSize() const override; + auto Close() -> void override; auto SetOption(const std::string& option, const void* value, size_t valueSize) -> void override; diff --git a/fairmq/shmem/FairMQSocketSHM.cxx b/fairmq/shmem/FairMQSocketSHM.cxx index bb80b79a..ec208214 100644 --- a/fairmq/shmem/FairMQSocketSHM.cxx +++ b/fairmq/shmem/FairMQSocketSHM.cxx @@ -11,6 +11,7 @@ #include "FairMQMessageSHM.h" #include "FairMQUnmanagedRegionSHM.h" #include "FairMQLogger.h" +#include #include @@ -18,6 +19,7 @@ using namespace std; using namespace fair::mq::shmem; +using namespace fair::mq; atomic FairMQSocketSHM::fInterrupted(false); @@ -477,6 +479,91 @@ void FairMQSocketSHM::GetOption(const string& option, void* value, size_t* value } } +void FairMQSocketSHM::SetLinger(const int value) +{ + if (zmq_setsockopt(fSocket, ZMQ_LINGER, &value, sizeof(value)) < 0) { + throw SocketError(tools::ToString("failed setting ZMQ_LINGER, reason: ", zmq_strerror(errno))); + } +} + +int FairMQSocketSHM::GetLinger() const +{ + int value = 0; + size_t valueSize; + if (zmq_getsockopt(fSocket, ZMQ_LINGER, &value, &valueSize) < 0) { + throw SocketError(tools::ToString("failed getting ZMQ_LINGER, reason: ", zmq_strerror(errno))); + } + return value; +} + +void FairMQSocketSHM::SetSndBufSize(const int value) +{ + if (zmq_setsockopt(fSocket, ZMQ_SNDHWM, &value, sizeof(value)) < 0) { + throw SocketError(tools::ToString("failed setting ZMQ_SNDHWM, reason: ", zmq_strerror(errno))); + } +} + +int FairMQSocketSHM::GetSndBufSize() const +{ + int value = 0; + size_t valueSize; + if (zmq_getsockopt(fSocket, ZMQ_SNDHWM, &value, &valueSize) < 0) { + throw SocketError(tools::ToString("failed getting ZMQ_SNDHWM, reason: ", zmq_strerror(errno))); + } + return value; +} + +void FairMQSocketSHM::SetRcvBufSize(const int value) +{ + if (zmq_setsockopt(fSocket, ZMQ_RCVHWM, &value, sizeof(value)) < 0) { + throw SocketError(tools::ToString("failed setting ZMQ_RCVHWM, reason: ", zmq_strerror(errno))); + } +} + +int FairMQSocketSHM::GetRcvBufSize() const +{ + int value = 0; + size_t valueSize; + if (zmq_getsockopt(fSocket, ZMQ_RCVHWM, &value, &valueSize) < 0) { + throw SocketError(tools::ToString("failed getting ZMQ_RCVHWM, reason: ", zmq_strerror(errno))); + } + return value; +} + +void FairMQSocketSHM::SetSndKernelSize(const int value) +{ + if (zmq_setsockopt(fSocket, ZMQ_SNDBUF, &value, sizeof(value)) < 0) { + throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno))); + } +} + +int FairMQSocketSHM::GetSndKernelSize() const +{ + int value = 0; + size_t valueSize; + if (zmq_getsockopt(fSocket, ZMQ_SNDBUF, &value, &valueSize) < 0) { + throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno))); + } + return value; +} + +void FairMQSocketSHM::SetRcvKernelSize(const int value) +{ + if (zmq_setsockopt(fSocket, ZMQ_RCVBUF, &value, sizeof(value)) < 0) { + throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno))); + } +} + +int FairMQSocketSHM::GetRcvKernelSize() const +{ + int value = 0; + size_t valueSize; + if (zmq_getsockopt(fSocket, ZMQ_RCVBUF, &value, &valueSize) < 0) { + throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno))); + } + return value; +} + unsigned long FairMQSocketSHM::GetBytesTx() const { return fBytesTx; diff --git a/fairmq/shmem/FairMQSocketSHM.h b/fairmq/shmem/FairMQSocketSHM.h index c2c2b406..584138ce 100644 --- a/fairmq/shmem/FairMQSocketSHM.h +++ b/fairmq/shmem/FairMQSocketSHM.h @@ -49,6 +49,17 @@ class FairMQSocketSHM final : public FairMQSocket void SetOption(const std::string& option, const void* value, size_t valueSize) override; void GetOption(const std::string& option, void* value, size_t* valueSize) override; + void SetLinger(const int value) override; + int GetLinger() const override; + void SetSndBufSize(const int value) override; + int GetSndBufSize() const override; + void SetRcvBufSize(const int value) override; + int GetRcvBufSize() const override; + void SetSndKernelSize(const int value) override; + int GetSndKernelSize() const override; + void SetRcvKernelSize(const int value) override; + int GetRcvKernelSize() const override; + unsigned long GetBytesTx() const override; unsigned long GetBytesRx() const override; unsigned long GetMessagesTx() const override; diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index b589f9d5..1b915247 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -9,12 +9,14 @@ #include "FairMQSocketZMQ.h" #include "FairMQMessageZMQ.h" #include "FairMQLogger.h" +#include #include #include using namespace std; +using namespace fair::mq; atomic FairMQSocketZMQ::fInterrupted(false); @@ -402,6 +404,91 @@ void FairMQSocketZMQ::GetOption(const string& option, void* value, size_t* value } } +void FairMQSocketZMQ::SetLinger(const int value) +{ + if (zmq_setsockopt(fSocket, ZMQ_LINGER, &value, sizeof(value)) < 0) { + throw SocketError(tools::ToString("failed setting ZMQ_LINGER, reason: ", zmq_strerror(errno))); + } +} + +int FairMQSocketZMQ::GetLinger() const +{ + int value = 0; + size_t valueSize; + if (zmq_getsockopt(fSocket, ZMQ_LINGER, &value, &valueSize) < 0) { + throw SocketError(tools::ToString("failed getting ZMQ_LINGER, reason: ", zmq_strerror(errno))); + } + return value; +} + +void FairMQSocketZMQ::SetSndBufSize(const int value) +{ + if (zmq_setsockopt(fSocket, ZMQ_SNDHWM, &value, sizeof(value)) < 0) { + throw SocketError(tools::ToString("failed setting ZMQ_SNDHWM, reason: ", zmq_strerror(errno))); + } +} + +int FairMQSocketZMQ::GetSndBufSize() const +{ + int value = 0; + size_t valueSize; + if (zmq_getsockopt(fSocket, ZMQ_SNDHWM, &value, &valueSize) < 0) { + throw SocketError(tools::ToString("failed getting ZMQ_SNDHWM, reason: ", zmq_strerror(errno))); + } + return value; +} + +void FairMQSocketZMQ::SetRcvBufSize(const int value) +{ + if (zmq_setsockopt(fSocket, ZMQ_RCVHWM, &value, sizeof(value)) < 0) { + throw SocketError(tools::ToString("failed setting ZMQ_RCVHWM, reason: ", zmq_strerror(errno))); + } +} + +int FairMQSocketZMQ::GetRcvBufSize() const +{ + int value = 0; + size_t valueSize; + if (zmq_getsockopt(fSocket, ZMQ_RCVHWM, &value, &valueSize) < 0) { + throw SocketError(tools::ToString("failed getting ZMQ_RCVHWM, reason: ", zmq_strerror(errno))); + } + return value; +} + +void FairMQSocketZMQ::SetSndKernelSize(const int value) +{ + if (zmq_setsockopt(fSocket, ZMQ_SNDBUF, &value, sizeof(value)) < 0) { + throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno))); + } +} + +int FairMQSocketZMQ::GetSndKernelSize() const +{ + int value = 0; + size_t valueSize; + if (zmq_getsockopt(fSocket, ZMQ_SNDBUF, &value, &valueSize) < 0) { + throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno))); + } + return value; +} + +void FairMQSocketZMQ::SetRcvKernelSize(const int value) +{ + if (zmq_setsockopt(fSocket, ZMQ_RCVBUF, &value, sizeof(value)) < 0) { + throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno))); + } +} + +int FairMQSocketZMQ::GetRcvKernelSize() const +{ + int value = 0; + size_t valueSize; + if (zmq_getsockopt(fSocket, ZMQ_RCVBUF, &value, &valueSize) < 0) { + throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno))); + } + return value; +} + unsigned long FairMQSocketZMQ::GetBytesTx() const { return fBytesTx; diff --git a/fairmq/zeromq/FairMQSocketZMQ.h b/fairmq/zeromq/FairMQSocketZMQ.h index e8b3dd79..9b809685 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.h +++ b/fairmq/zeromq/FairMQSocketZMQ.h @@ -49,6 +49,17 @@ class FairMQSocketZMQ final : public FairMQSocket void SetOption(const std::string& option, const void* value, size_t valueSize) override; void GetOption(const std::string& option, void* value, size_t* valueSize) override; + void SetLinger(const int value) override; + int GetLinger() const override; + void SetSndBufSize(const int value) override; + int GetSndBufSize() const override; + void SetRcvBufSize(const int value) override; + int GetRcvBufSize() const override; + void SetSndKernelSize(const int value) override; + int GetSndKernelSize() const override; + void SetRcvKernelSize(const int value) override; + int GetRcvKernelSize() const override; + unsigned long GetBytesTx() const override; unsigned long GetBytesRx() const override; unsigned long GetMessagesTx() const override;