diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 380a01b3..71c8d523 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -233,11 +233,6 @@ if(BUILD_FAIRMQ) SuboptParser.cxx plugins/config/Config.cxx plugins/Control.cxx - zeromq/FairMQMessageZMQ.cxx - zeromq/FairMQPollerZMQ.cxx - zeromq/FairMQUnmanagedRegionZMQ.cxx - zeromq/FairMQSocketZMQ.cxx - zeromq/FairMQTransportFactoryZMQ.cxx MemoryResources.cxx ) diff --git a/fairmq/zeromq/FairMQMessageZMQ.cxx b/fairmq/zeromq/FairMQMessageZMQ.cxx deleted file mode 100644 index 7ca522b0..00000000 --- a/fairmq/zeromq/FairMQMessageZMQ.cxx +++ /dev/null @@ -1,247 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQMessageZMQ.cxx - * - * @since 2012-12-05 - * @author D. Klein, A. Rybalchenko, N. Winckler - */ - - -#include "FairMQMessageZMQ.h" -#include "FairMQLogger.h" -#include -#include "FairMQUnmanagedRegionZMQ.h" - -#include - -using namespace std; - -FairMQMessageZMQ::FairMQMessageZMQ(FairMQTransportFactory* factory) - : FairMQMessage(factory) - , fUsedSizeModified(false) - , fUsedSize() - , fMsg(fair::mq::tools::make_unique()) - , fViewMsg(nullptr) -{ - if (zmq_msg_init(fMsg.get()) != 0) - { - LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno); - } -} - -FairMQMessageZMQ::FairMQMessageZMQ(const size_t size, FairMQTransportFactory* factory) - : FairMQMessage(factory) - , fUsedSizeModified(false) - , fUsedSize(size) - , fMsg(fair::mq::tools::make_unique()) - , fViewMsg(nullptr) -{ - if (zmq_msg_init_size(fMsg.get(), size) != 0) - { - LOG(error) << "failed initializing message with size, reason: " << zmq_strerror(errno); - } -} - -FairMQMessageZMQ::FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint, FairMQTransportFactory* factory) - : FairMQMessage(factory) - , fUsedSizeModified(false) - , fUsedSize() - , fMsg(fair::mq::tools::make_unique()) - , fViewMsg(nullptr) -{ - if (zmq_msg_init_data(fMsg.get(), data, size, ffn, hint) != 0) - { - LOG(error) << "failed initializing message with data, reason: " << zmq_strerror(errno); - } -} - -FairMQMessageZMQ::FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint, FairMQTransportFactory* factory) - : FairMQMessage(factory) - , fUsedSizeModified(false) - , fUsedSize() - , fMsg(fair::mq::tools::make_unique()) - , fViewMsg(nullptr) -{ - // FIXME: make this zero-copy: - // simply taking over the provided buffer can casue premature delete, since region could be destroyed before the message is sent out. - // Needs lifetime extension for the ZMQ region. - if (zmq_msg_init_size(fMsg.get(), size) != 0) - { - LOG(error) << "failed initializing message with size, reason: " << zmq_strerror(errno); - } - - memcpy(zmq_msg_data(fMsg.get()), data, size); - // call region callback - auto ptr = static_cast(region.get()); - if (ptr->fBulkCallback) { - ptr->fBulkCallback({{data, size, hint}}); - } else if (ptr->fCallback) { - ptr->fCallback(data, size, hint); - } - - // if (zmq_msg_init_data(fMsg.get(), data, size, [](void*, void*){}, nullptr) != 0) - // { - // LOG(error) << "failed initializing message with data, reason: " << zmq_strerror(errno); - // } -} - -void FairMQMessageZMQ::Rebuild() -{ - CloseMessage(); - fMsg = fair::mq::tools::make_unique(); - if (zmq_msg_init(fMsg.get()) != 0) - { - LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno); - } -} - -void FairMQMessageZMQ::Rebuild(const size_t size) -{ - CloseMessage(); - fMsg = fair::mq::tools::make_unique(); - if (zmq_msg_init_size(fMsg.get(), size) != 0) - { - LOG(error) << "failed initializing message with size, reason: " << zmq_strerror(errno); - } -} - -void FairMQMessageZMQ::Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) -{ - CloseMessage(); - fMsg = fair::mq::tools::make_unique(); - if (zmq_msg_init_data(fMsg.get(), data, size, ffn, hint) != 0) - { - LOG(error) << "failed initializing message with data, reason: " << zmq_strerror(errno); - } -} - -zmq_msg_t* FairMQMessageZMQ::GetMessage() const -{ - if (!fViewMsg) - { - return fMsg.get(); - } - else - { - return fViewMsg.get(); - } -} - -void* FairMQMessageZMQ::GetData() const -{ - if (!fViewMsg) - { - return zmq_msg_data(fMsg.get()); - } - else - { - return zmq_msg_data(fViewMsg.get()); - } -} - -size_t FairMQMessageZMQ::GetSize() const -{ - if (fUsedSizeModified) - { - return fUsedSize; - } - else - { - return zmq_msg_size(fMsg.get()); - } -} - -// To emulate shrinking, a new message is created with the new size (ViewMsg), that points to the original buffer with the new size. -// Once the "view message" is transfered, the original is destroyed. -// Used size is applied only once in ApplyUsedSize, which is called by the socket before sending. -// This function just updates the desired size until the actual "resizing" happens. -bool FairMQMessageZMQ::SetUsedSize(const size_t size) -{ - if (size <= zmq_msg_size(fMsg.get())) - { - fUsedSize = size; - fUsedSizeModified = true; - return true; - } - else - { - LOG(error) << "cannot set used size higher than original."; - return false; - } -} - -void FairMQMessageZMQ::ApplyUsedSize() -{ - // Apply only once (before actual send). - // The check is needed because a send could fail and can be reattempted by the user, in this case we do not want to modify buffer again. - if (fUsedSizeModified && !fViewMsg) - { - fViewMsg = fair::mq::tools::make_unique(); - void* ptr = zmq_msg_data(fMsg.get()); - if (zmq_msg_init_data(fViewMsg.get(), - ptr, - fUsedSize, - [](void* /* data */, void* obj) - { - zmq_msg_close(static_cast(obj)); - delete static_cast(obj); - }, - fMsg.release()) != 0) - { - LOG(error) << "failed initializing view message, reason: " << zmq_strerror(errno); - } - } -} - -void FairMQMessageZMQ::Copy(const FairMQMessage& msg) -{ - const FairMQMessageZMQ& zMsg = static_cast(msg); - // Shares the message buffer between msg and this fMsg. - if (zmq_msg_copy(fMsg.get(), zMsg.GetMessage()) != 0) - { - LOG(error) << "failed copying message, reason: " << zmq_strerror(errno); - return; - } - - // if the target message has been resized, apply same to this message also - if (zMsg.fUsedSizeModified) - { - fUsedSizeModified = true; - fUsedSize = zMsg.fUsedSize; - } -} - -void FairMQMessageZMQ::CloseMessage() -{ - if (!fViewMsg) - { - if (zmq_msg_close(fMsg.get()) != 0) - { - LOG(error) << "failed closing message, reason: " << zmq_strerror(errno); - } - // reset the message object to allow reuse in Rebuild - fMsg.reset(nullptr); - } - else - { - if (zmq_msg_close(fViewMsg.get()) != 0) - { - LOG(error) << "failed closing message, reason: " << zmq_strerror(errno); - } - // reset the message object to allow reuse in Rebuild - fViewMsg.reset(nullptr); - } - fUsedSizeModified = false; - fUsedSize = 0; -} - -FairMQMessageZMQ::~FairMQMessageZMQ() -{ - CloseMessage(); -} diff --git a/fairmq/zeromq/FairMQMessageZMQ.h b/fairmq/zeromq/FairMQMessageZMQ.h index 2a87f4be..09436cff 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.h +++ b/fairmq/zeromq/FairMQMessageZMQ.h @@ -5,24 +5,23 @@ * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * FairMQMessageZMQ.h - * - * @since 2014-01-17 - * @author A. Rybalchenko - */ #ifndef FAIRMQMESSAGEZMQ_H_ #define FAIRMQMESSAGEZMQ_H_ -#include -#include -#include +#include +#include "FairMQUnmanagedRegionZMQ.h" +#include +#include +#include #include -#include "FairMQMessage.h" -#include "FairMQUnmanagedRegion.h" +#include +#include +#include +#include + class FairMQTransportFactory; class FairMQSocketZMQ; @@ -32,35 +31,204 @@ class FairMQMessageZMQ final : public FairMQMessage friend class FairMQSocketZMQ; public: - FairMQMessageZMQ(FairMQTransportFactory* = nullptr); - FairMQMessageZMQ(const size_t size, FairMQTransportFactory* = nullptr); - FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* = nullptr); - FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, FairMQTransportFactory* = nullptr); + FairMQMessageZMQ(FairMQTransportFactory* factory = nullptr) + : FairMQMessage(factory) + , fUsedSizeModified(false) + , fUsedSize() + , fMsg(fair::mq::tools::make_unique()) + , fViewMsg(nullptr) + { + if (zmq_msg_init(fMsg.get()) != 0) { + LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno); + } + } - void Rebuild() override; - void Rebuild(const size_t size) override; - void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override; + FairMQMessageZMQ(const size_t size, FairMQTransportFactory* factory = nullptr) + : FairMQMessage(factory) + , fUsedSizeModified(false) + , fUsedSize(size) + , fMsg(fair::mq::tools::make_unique()) + , fViewMsg(nullptr) + { + if (zmq_msg_init_size(fMsg.get(), size) != 0) { + LOG(error) << "failed initializing message with size, reason: " << zmq_strerror(errno); + } + } - void* GetData() const override; - size_t GetSize() const override; + FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr) + : FairMQMessage(factory) + , fUsedSizeModified(false) + , fUsedSize() + , fMsg(fair::mq::tools::make_unique()) + , fViewMsg(nullptr) + { + if (zmq_msg_init_data(fMsg.get(), data, size, ffn, hint) != 0) { + LOG(error) << "failed initializing message with data, reason: " << zmq_strerror(errno); + } + } - bool SetUsedSize(const size_t size) override; - void ApplyUsedSize(); + FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, FairMQTransportFactory* factory = nullptr) + : FairMQMessage(factory) + , fUsedSizeModified(false) + , fUsedSize() + , fMsg(fair::mq::tools::make_unique()) + , fViewMsg(nullptr) + { + // FIXME: make this zero-copy: + // simply taking over the provided buffer can casue premature delete, since region could be + // destroyed before the message is sent out. Needs lifetime extension for the ZMQ region. + if (zmq_msg_init_size(fMsg.get(), size) != 0) { + LOG(error) << "failed initializing message with size, reason: " << zmq_strerror(errno); + } + + std::memcpy(zmq_msg_data(fMsg.get()), data, size); + // call region callback + auto ptr = static_cast(region.get()); + if (ptr->fBulkCallback) { + ptr->fBulkCallback({{data, size, hint}}); + } else if (ptr->fCallback) { + ptr->fCallback(data, size, hint); + } + + // if (zmq_msg_init_data(fMsg.get(), data, size, [](void*, void*){}, nullptr) != 0) + // { + // LOG(error) << "failed initializing message with data, reason: " << + // zmq_strerror(errno); + // } + } + + void Rebuild() override + { + CloseMessage(); + fMsg = fair::mq::tools::make_unique(); + if (zmq_msg_init(fMsg.get()) != 0) { + LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno); + } + } + + void Rebuild(const size_t size) override + { + CloseMessage(); + fMsg = fair::mq::tools::make_unique(); + if (zmq_msg_init_size(fMsg.get(), size) != 0) { + LOG(error) << "failed initializing message with size, reason: " << zmq_strerror(errno); + } + } + + void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override + { + CloseMessage(); + fMsg = fair::mq::tools::make_unique(); + if (zmq_msg_init_data(fMsg.get(), data, size, ffn, hint) != 0) { + LOG(error) << "failed initializing message with data, reason: " << zmq_strerror(errno); + } + } + + void* GetData() const override + { + if (!fViewMsg) { + return zmq_msg_data(fMsg.get()); + } else { + return zmq_msg_data(fViewMsg.get()); + } + } + + size_t GetSize() const override + { + if (fUsedSizeModified) { + return fUsedSize; + } else { + return zmq_msg_size(fMsg.get()); + } + } + + // To emulate shrinking, a new message is created with the new size (ViewMsg), that points to + // the original buffer with the new size. Once the "view message" is transfered, the original is + // destroyed. Used size is applied only once in ApplyUsedSize, which is called by the socket + // before sending. This function just updates the desired size until the actual "resizing" + // happens. + bool SetUsedSize(const size_t size) override + { + if (size <= zmq_msg_size(fMsg.get())) { + fUsedSize = size; + fUsedSizeModified = true; + return true; + } else { + LOG(error) << "cannot set used size higher than original."; + return false; + } + } + + void ApplyUsedSize() + { + // Apply only once (before actual send). + // The check is needed because a send could fail and can be reattempted by the user, in this + // case we do not want to modify buffer again. + if (fUsedSizeModified && !fViewMsg) { + fViewMsg = fair::mq::tools::make_unique(); + void* ptr = zmq_msg_data(fMsg.get()); + if (zmq_msg_init_data(fViewMsg.get(), ptr, fUsedSize, [](void* /* data */, void* obj) { + zmq_msg_close(static_cast(obj)); + delete static_cast(obj); + }, fMsg.release()) != 0) { + LOG(error) << "failed initializing view message, reason: " << zmq_strerror(errno); + } + } + } fair::mq::Transport GetType() const override { return fair::mq::Transport::ZMQ; } - void Copy(const FairMQMessage& msg) override; + void Copy(const FairMQMessage& msg) override + { + const FairMQMessageZMQ& zMsg = static_cast(msg); + // Shares the message buffer between msg and this fMsg. + if (zmq_msg_copy(fMsg.get(), zMsg.GetMessage()) != 0) { + LOG(error) << "failed copying message, reason: " << zmq_strerror(errno); + return; + } - ~FairMQMessageZMQ() override; + // if the target message has been resized, apply same to this message also + if (zMsg.fUsedSizeModified) { + fUsedSizeModified = true; + fUsedSize = zMsg.fUsedSize; + } + } + + ~FairMQMessageZMQ() override { CloseMessage(); } private: bool fUsedSizeModified; size_t fUsedSize; std::unique_ptr fMsg; - std::unique_ptr fViewMsg; // view on a subset of fMsg (treating it as user buffer) + std::unique_ptr fViewMsg; // view on a subset of fMsg (treating it as user buffer) - zmq_msg_t* GetMessage() const; - void CloseMessage(); + zmq_msg_t* GetMessage() const + { + if (!fViewMsg) { + return fMsg.get(); + } else { + return fViewMsg.get(); + } + } + + void CloseMessage() + { + if (!fViewMsg) { + if (zmq_msg_close(fMsg.get()) != 0) { + LOG(error) << "failed closing message, reason: " << zmq_strerror(errno); + } + // reset the message object to allow reuse in Rebuild + fMsg.reset(nullptr); + } else { + if (zmq_msg_close(fViewMsg.get()) != 0) { + LOG(error) << "failed closing message, reason: " << zmq_strerror(errno); + } + // reset the message object to allow reuse in Rebuild + fViewMsg.reset(nullptr); + } + fUsedSizeModified = false; + fUsedSize = 0; + } }; #endif /* FAIRMQMESSAGEZMQ_H_ */ diff --git a/fairmq/zeromq/FairMQPollerZMQ.cxx b/fairmq/zeromq/FairMQPollerZMQ.cxx deleted file mode 100644 index 820d1ff0..00000000 --- a/fairmq/zeromq/FairMQPollerZMQ.cxx +++ /dev/null @@ -1,211 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQPollerZMQ.cxx - * - * @since 2014-01-23 - * @author A. Rybalchenko - */ - -#include - -#include "FairMQPollerZMQ.h" -#include "FairMQSocketZMQ.h" -#include "FairMQLogger.h" - -using namespace std; - -FairMQPollerZMQ::FairMQPollerZMQ(const vector& channels) - : fItems() - , fNumItems(0) - , fOffsetMap() -{ - fNumItems = channels.size(); - fItems = new zmq_pollitem_t[fNumItems]; - - for (int i = 0; i < fNumItems; ++i) - { - fItems[i].socket = static_cast(&(channels.at(i).GetSocket()))->GetSocket(); - fItems[i].fd = 0; - fItems[i].revents = 0; - - int type = 0; - size_t size = sizeof(type); - zmq_getsockopt(static_cast(&(channels.at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); - - SetItemEvents(fItems[i], type); - } -} - - -FairMQPollerZMQ::FairMQPollerZMQ(const std::vector& channels) - : fItems() - , fNumItems(0) - , fOffsetMap() -{ - fNumItems = channels.size(); - fItems = new zmq_pollitem_t[fNumItems]; - - for (int i = 0; i < fNumItems; ++i) - { - fItems[i].socket = static_cast(&(channels.at(i)->GetSocket()))->GetSocket(); - fItems[i].fd = 0; - fItems[i].revents = 0; - - int type = 0; - size_t size = sizeof(type); - zmq_getsockopt(static_cast(&(channels.at(i)->GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); - - SetItemEvents(fItems[i], type); - } -} - -FairMQPollerZMQ::FairMQPollerZMQ(const unordered_map>& channelsMap, const vector& channelList) - : fItems() - , fNumItems(0) - , fOffsetMap() -{ - try - { - int offset = 0; - // calculate offsets and the total size of the poll item set - for (string channel : channelList) - { - fOffsetMap[channel] = offset; - offset += channelsMap.at(channel).size(); - fNumItems += channelsMap.at(channel).size(); - } - - fItems = new zmq_pollitem_t[fNumItems]; - - int index = 0; - for (string channel : channelList) - { - for (unsigned int i = 0; i < channelsMap.at(channel).size(); ++i) - { - index = fOffsetMap[channel] + i; - - fItems[index].socket = static_cast(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(); - fItems[index].fd = 0; - fItems[index].revents = 0; - - int type = 0; - size_t size = sizeof(type); - zmq_getsockopt(static_cast(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); - - SetItemEvents(fItems[index], type); - } - } - } - catch (const std::out_of_range& oor) - { - LOG(error) << "at least one of the provided channel keys for poller initialization is invalid"; - LOG(error) << "out of range error: " << oor.what() << '\n'; - throw std::out_of_range("invalid channel during poller initialization"); - } -} - -void FairMQPollerZMQ::SetItemEvents(zmq_pollitem_t& item, const int type) -{ - if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) - { - item.events = ZMQ_POLLIN|ZMQ_POLLOUT; - } - else if (type == ZMQ_PUSH || type == ZMQ_PUB || type == ZMQ_XPUB) - { - item.events = ZMQ_POLLOUT; - } - else if (type == ZMQ_PULL || type == ZMQ_SUB || type == ZMQ_XSUB) - { - item.events = ZMQ_POLLIN; - } - else - { - LOG(error) << "invalid poller configuration, exiting."; - exit(EXIT_FAILURE); - } -} - -void FairMQPollerZMQ::Poll(const int timeout) -{ - if (zmq_poll(fItems, fNumItems, timeout) < 0) - { - if (errno == ETERM) - { - LOG(debug) << "polling exited, reason: " << zmq_strerror(errno); - } - else - { - LOG(error) << "polling failed, reason: " << zmq_strerror(errno); - throw std::runtime_error("polling failed"); - } - } -} - -bool FairMQPollerZMQ::CheckInput(const int index) -{ - if (fItems[index].revents & ZMQ_POLLIN) - { - return true; - } - - return false; -} - -bool FairMQPollerZMQ::CheckOutput(const int index) -{ - if (fItems[index].revents & ZMQ_POLLOUT) - { - return true; - } - - return false; -} - -bool FairMQPollerZMQ::CheckInput(const string& channelKey, const int index) -{ - try - { - if (fItems[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLIN) - { - return true; - } - - return false; - } - catch (const std::out_of_range& oor) - { - LOG(error) << "invalid channel key: \"" << channelKey << "\""; - LOG(error) << "out of range error: " << oor.what() << '\n'; - exit(EXIT_FAILURE); - } -} - -bool FairMQPollerZMQ::CheckOutput(const string& channelKey, const int index) -{ - try - { - if (fItems[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLOUT) - { - return true; - } - - return false; - } - catch (const std::out_of_range& oor) - { - LOG(error) << "invalid channel key: \"" << channelKey << "\""; - LOG(error) << "out of range error: " << oor.what() << '\n'; - exit(EXIT_FAILURE); - } -} - -FairMQPollerZMQ::~FairMQPollerZMQ() -{ - delete[] fItems; -} diff --git a/fairmq/zeromq/FairMQPollerZMQ.h b/fairmq/zeromq/FairMQPollerZMQ.h index 1677850c..cbb12092 100644 --- a/fairmq/zeromq/FairMQPollerZMQ.h +++ b/fairmq/zeromq/FairMQPollerZMQ.h @@ -5,49 +5,179 @@ * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * FairMQPollerZMQ.h - * - * @since 2014-01-23 - * @author A. Rybalchenko - */ #ifndef FAIRMQPOLLERZMQ_H_ #define FAIRMQPOLLERZMQ_H_ -#include +#include +#include +#include #include - +#include #include -#include "FairMQPoller.h" -#include "FairMQChannel.h" -#include "FairMQTransportFactoryZMQ.h" - -class FairMQChannel; - class FairMQPollerZMQ final : public FairMQPoller { - friend class FairMQChannel; - friend class FairMQTransportFactoryZMQ; - public: - FairMQPollerZMQ(const std::vector& channels); - FairMQPollerZMQ(const std::vector& channels); - FairMQPollerZMQ(const std::unordered_map>& channelsMap, const std::vector& channelList); + FairMQPollerZMQ(const std::vector& channels) + : fItems() + , fNumItems(0) + , fOffsetMap() + { + fNumItems = channels.size(); + fItems = new zmq_pollitem_t[fNumItems]; // TODO: fix me + + for (int i = 0; i < fNumItems; ++i) { + fItems[i].socket = static_cast(&(channels.at(i).GetSocket()))->GetSocket(); + fItems[i].fd = 0; + fItems[i].revents = 0; + + int type = 0; + size_t size = sizeof(type); + zmq_getsockopt(static_cast(&(channels.at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); + + SetItemEvents(fItems[i], type); + } + } + + FairMQPollerZMQ(const std::vector& channels) + : fItems() + , fNumItems(0) + , fOffsetMap() + { + fNumItems = channels.size(); + fItems = new zmq_pollitem_t[fNumItems]; + + for (int i = 0; i < fNumItems; ++i) { + fItems[i].socket = static_cast(&(channels.at(i)->GetSocket()))->GetSocket(); + fItems[i].fd = 0; + fItems[i].revents = 0; + + int type = 0; + size_t size = sizeof(type); + zmq_getsockopt(static_cast(&(channels.at(i)->GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); + + SetItemEvents(fItems[i], type); + } + } + + FairMQPollerZMQ(const std::unordered_map>& channelsMap, const std::vector& channelList) + : fItems() + , fNumItems(0) + , fOffsetMap() + { + try { + int offset = 0; + // calculate offsets and the total size of the poll item set + for (std::string channel : channelList) { + fOffsetMap[channel] = offset; + offset += channelsMap.at(channel).size(); + fNumItems += channelsMap.at(channel).size(); + } + + fItems = new zmq_pollitem_t[fNumItems]; + + int index = 0; + for (std::string channel : channelList) { + for (unsigned int i = 0; i < channelsMap.at(channel).size(); ++i) { + index = fOffsetMap[channel] + i; + + fItems[index].socket = static_cast(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(); + fItems[index].fd = 0; + fItems[index].revents = 0; + + int type = 0; + size_t size = sizeof(type); + zmq_getsockopt(static_cast(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); + + SetItemEvents(fItems[index], type); + } + } + } catch (const std::out_of_range& oor) { + LOG(error) << "at least one of the provided channel keys for poller initialization is invalid"; + LOG(error) << "out of range error: " << oor.what() << '\n'; + throw std::out_of_range("invalid channel during poller initialization"); + } + } FairMQPollerZMQ(const FairMQPollerZMQ&) = delete; FairMQPollerZMQ operator=(const FairMQPollerZMQ&) = delete; - void SetItemEvents(zmq_pollitem_t& item, const int type); + void SetItemEvents(zmq_pollitem_t& item, const int type) + { + if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) { + item.events = ZMQ_POLLIN | ZMQ_POLLOUT; + } else if (type == ZMQ_PUSH || type == ZMQ_PUB || type == ZMQ_XPUB) { + item.events = ZMQ_POLLOUT; + } else if (type == ZMQ_PULL || type == ZMQ_SUB || type == ZMQ_XSUB) { + item.events = ZMQ_POLLIN; + } else { + LOG(error) << "invalid poller configuration, exiting."; + exit(EXIT_FAILURE); + } + } - void Poll(const int timeout) override; - bool CheckInput(const int index) override; - bool CheckOutput(const int index) override; - bool CheckInput(const std::string& channelKey, const int index) override; - bool CheckOutput(const std::string& channelKey, const int index) override; + void Poll(const int timeout) override + { + if (zmq_poll(fItems, fNumItems, timeout) < 0) { + if (errno == ETERM) { + LOG(debug) << "polling exited, reason: " << zmq_strerror(errno); + } else { + LOG(error) << "polling failed, reason: " << zmq_strerror(errno); + throw std::runtime_error("polling failed"); + } + } + } - ~FairMQPollerZMQ() override; + bool CheckInput(const int index) override + { + if (fItems[index].revents & ZMQ_POLLIN) { + return true; + } + + return false; + } + + bool CheckOutput(const int index) override + { + if (fItems[index].revents & ZMQ_POLLOUT) { + return true; + } + + return false; + } + + bool CheckInput(const std::string& channelKey, const int index) override + { + try { + if (fItems[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLIN) { + return true; + } + + return false; + } catch (const std::out_of_range& oor) { + LOG(error) << "invalid channel key: \"" << channelKey << "\""; + LOG(error) << "out of range error: " << oor.what() << '\n'; + exit(EXIT_FAILURE); + } + } + + bool CheckOutput(const std::string& channelKey, const int index) override + { + try { + if (fItems[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLOUT) { + return true; + } + + return false; + } catch (const std::out_of_range& oor) { + LOG(error) << "invalid channel key: \"" << channelKey << "\""; + LOG(error) << "out of range error: " << oor.what() << '\n'; + exit(EXIT_FAILURE); + } + } + + ~FairMQPollerZMQ() override { delete[] fItems; } private: zmq_pollitem_t* fItems; diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx deleted file mode 100644 index cd6dac4e..00000000 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ /dev/null @@ -1,482 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014-2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#include "FairMQSocketZMQ.h" -#include "FairMQMessageZMQ.h" -#include "FairMQLogger.h" -#include - -#include - -using namespace std; -using namespace fair::mq; - -FairMQSocketZMQ::FairMQSocketZMQ(fair::mq::zmq::Context& ctx, const string& type, const string& name, const string& id /*= ""*/, FairMQTransportFactory* factory) - : FairMQSocket(factory) - , fCtx(ctx) - , fSocket(zmq_socket(fCtx.GetZmqCtx(), GetConstant(type))) - , fId(id + "." + name + "." + type) - , fBytesTx(0) - , fBytesRx(0) - , fMessagesTx(0) - , fMessagesRx(0) - , fSndTimeout(100) - , fRcvTimeout(100) -{ - if (fSocket == nullptr) - { - LOG(error) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno); - exit(EXIT_FAILURE); - } - - if (zmq_setsockopt(fSocket, ZMQ_IDENTITY, fId.c_str(), fId.length()) != 0) - { - LOG(error) << "Failed setting ZMQ_IDENTITY socket option, reason: " << zmq_strerror(errno); - } - - // Tell socket to try and send/receive outstanding messages for milliseconds before terminating. - // Default value for ZeroMQ is -1, which is to wait forever. - int linger = 1000; - if (zmq_setsockopt(fSocket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) - { - LOG(error) << "Failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno); - } - - if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) - { - LOG(error) << "Failed setting ZMQ_SNDTIMEO socket option, reason: " << zmq_strerror(errno); - } - - if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) - { - LOG(error) << "Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno); - } - - if (type == "sub") - { - if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nullptr, 0) != 0) - { - LOG(error) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno); - } - } - - LOG(debug) << "Created socket " << GetId(); -} - -bool FairMQSocketZMQ::Bind(const string& address) -{ - // LOG(info) << "bind socket " << fId << " on " << address; - - if (zmq_bind(fSocket, address.c_str()) != 0) - { - if (errno == EADDRINUSE) { - // do not print error in this case, this is handled by FairMQDevice in case no connection could be established after trying a number of random ports from a range. - return false; - } - LOG(error) << "Failed binding socket " << fId << ", address: " << address << ", reason: " << zmq_strerror(errno); - return false; - } - - return true; -} - -bool FairMQSocketZMQ::Connect(const string& address) -{ - // LOG(info) << "connect socket " << fId << " on " << address; - - if (zmq_connect(fSocket, address.c_str()) != 0) - { - LOG(error) << "Failed connecting socket " << fId << ", address: " << address << ", reason: " << zmq_strerror(errno); - return false; - } - - return true; -} - -int FairMQSocketZMQ::Send(FairMQMessagePtr& msg, const int timeout) -{ - int flags = 0; - if (timeout == 0) { - flags = ZMQ_DONTWAIT; - } - int elapsed = 0; - - static_cast(msg.get())->ApplyUsedSize(); - - while (true) { - int nbytes = zmq_msg_send(static_cast(msg.get())->GetMessage(), fSocket, flags); - if (nbytes >= 0) { - fBytesTx += nbytes; - ++fMessagesTx; - - return nbytes; - } else if (zmq_errno() == EAGAIN) { - if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { - if (timeout > 0) { - elapsed += fSndTimeout; - if (elapsed >= timeout) { - return -2; - } - } - continue; - } else { - return -2; - } - } else if (zmq_errno() == ETERM) { - LOG(info) << "terminating socket " << fId; - return -1; - } else if (zmq_errno() == EINTR) { - LOG(debug) << "Send interrupted by system call"; - return nbytes; - } else { - LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); - return nbytes; - } - } -} - -int FairMQSocketZMQ::Receive(FairMQMessagePtr& msg, const int timeout) -{ - int flags = 0; - if (timeout == 0) { - flags = ZMQ_DONTWAIT; - } - int elapsed = 0; - - while (true) { - int nbytes = zmq_msg_recv(static_cast(msg.get())->GetMessage(), fSocket, flags); - if (nbytes >= 0) { - fBytesRx += nbytes; - ++fMessagesRx; - return nbytes; - } else if (zmq_errno() == EAGAIN) { - if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { - if (timeout > 0) { - elapsed += fRcvTimeout; - if (elapsed >= timeout) { - return -2; - } - } - continue; - } else { - return -2; - } - } else if (zmq_errno() == ETERM) { - LOG(info) << "terminating socket " << fId; - return -1; - } else if (zmq_errno() == EINTR) { - LOG(debug) << "Receive interrupted by system call"; - return nbytes; - } else { - LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno); - return nbytes; - } - } -} - -int64_t FairMQSocketZMQ::Send(vector& msgVec, const int timeout) -{ - int flags = 0; - if (timeout == 0) { - flags = ZMQ_DONTWAIT; - } - - const unsigned int vecSize = msgVec.size(); - - // Sending vector typicaly handles more then one part - if (vecSize > 1) { - int elapsed = 0; - - while (true) { - int64_t totalSize = 0; - bool repeat = false; - - for (unsigned int i = 0; i < vecSize; ++i) { - static_cast(msgVec[i].get())->ApplyUsedSize(); - - int nbytes = zmq_msg_send(static_cast(msgVec[i].get())->GetMessage(), - fSocket, - (i < vecSize - 1) ? ZMQ_SNDMORE|flags : flags); - if (nbytes >= 0) { - totalSize += nbytes; - } else { - // according to ZMQ docs, this can only occur for the first part - if (zmq_errno() == EAGAIN) { - if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { - if (timeout > 0) { - elapsed += fSndTimeout; - if (elapsed >= timeout) { - return -2; - } - } - repeat = true; - break; - } else { - return -2; - } - } - if (zmq_errno() == ETERM) { - LOG(info) << "terminating socket " << fId; - return -1; - } else if (zmq_errno() == EINTR) { - LOG(debug) << "Receive interrupted by system call"; - return nbytes; - } else { - LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); - return nbytes; - } - } - } - - if (repeat) { - continue; - } - - // store statistics on how many messages have been sent (handle all parts as a single message) - ++fMessagesTx; - fBytesTx += totalSize; - return totalSize; - } - } // If there's only one part, send it as a regular message - else if (vecSize == 1) { - return Send(msgVec.back(), timeout); - } else { // if the vector is empty, something might be wrong - LOG(warn) << "Will not send empty vector"; - return -1; - } -} - -int64_t FairMQSocketZMQ::Receive(vector& msgVec, const int timeout) -{ - int flags = 0; - if (timeout == 0) { - flags = ZMQ_DONTWAIT; - } - int elapsed = 0; - - while (true) { - int64_t totalSize = 0; - int64_t more = 0; - bool repeat = false; - - do { - unique_ptr part(new FairMQMessageZMQ(GetTransport())); - - int nbytes = zmq_msg_recv(static_cast(part.get())->GetMessage(), fSocket, flags); - if (nbytes >= 0) { - msgVec.push_back(move(part)); - totalSize += nbytes; - } else if (zmq_errno() == EAGAIN) { - if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { - if (timeout > 0) { - elapsed += fRcvTimeout; - if (elapsed >= timeout) { - return -2; - } - } - repeat = true; - break; - } else { - return -2; - } - } else if (zmq_errno() == EINTR) { - LOG(debug) << "Receive interrupted by system call"; - return nbytes; - } else { - return nbytes; - } - - size_t moreSize = sizeof(more); - zmq_getsockopt(fSocket, ZMQ_RCVMORE, &more, &moreSize); - } while (more); - - if (repeat) { - continue; - } - - // store statistics on how many messages have been received (handle all parts as a single message) - ++fMessagesRx; - fBytesRx += totalSize; - return totalSize; - } -} - -void FairMQSocketZMQ::Close() -{ - // LOG(debug) << "Closing socket " << fId; - - if (fSocket == nullptr) - { - return; - } - - if (zmq_close(fSocket) != 0) - { - LOG(error) << "Failed closing socket " << fId << ", reason: " << zmq_strerror(errno); - } - - fSocket = nullptr; -} - -void* FairMQSocketZMQ::GetSocket() const -{ - return fSocket; -} - -void FairMQSocketZMQ::SetOption(const string& option, const void* value, size_t valueSize) -{ - if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0) - { - LOG(error) << "Failed setting socket option, reason: " << zmq_strerror(errno); - } -} - -void FairMQSocketZMQ::GetOption(const string& option, void* value, size_t* valueSize) -{ - if (zmq_getsockopt(fSocket, GetConstant(option), value, valueSize) < 0) - { - LOG(error) << "Failed getting socket option, reason: " << zmq_strerror(errno); - } -} - -void FairMQSocketZMQ::SetLinger(const int value) -{ - if (zmq_setsockopt(fSocket, ZMQ_LINGER, &value, sizeof(value)) < 0) { - throw SocketError(tools::ToString("failed setting ZMQ_LINGER, reason: ", zmq_strerror(errno))); - } -} - -int FairMQSocketZMQ::GetLinger() const -{ - int value = 0; - size_t valueSize = sizeof(value); - if (zmq_getsockopt(fSocket, ZMQ_LINGER, &value, &valueSize) < 0) { - throw SocketError(tools::ToString("failed getting ZMQ_LINGER, reason: ", zmq_strerror(errno))); - } - return value; -} - -void FairMQSocketZMQ::SetSndBufSize(const int value) -{ - if (zmq_setsockopt(fSocket, ZMQ_SNDHWM, &value, sizeof(value)) < 0) { - throw SocketError(tools::ToString("failed setting ZMQ_SNDHWM, reason: ", zmq_strerror(errno))); - } -} - -int FairMQSocketZMQ::GetSndBufSize() const -{ - int value = 0; - size_t valueSize = sizeof(value); - if (zmq_getsockopt(fSocket, ZMQ_SNDHWM, &value, &valueSize) < 0) { - throw SocketError(tools::ToString("failed getting ZMQ_SNDHWM, reason: ", zmq_strerror(errno))); - } - return value; -} - -void FairMQSocketZMQ::SetRcvBufSize(const int value) -{ - if (zmq_setsockopt(fSocket, ZMQ_RCVHWM, &value, sizeof(value)) < 0) { - throw SocketError(tools::ToString("failed setting ZMQ_RCVHWM, reason: ", zmq_strerror(errno))); - } -} - -int FairMQSocketZMQ::GetRcvBufSize() const -{ - int value = 0; - size_t valueSize = sizeof(value); - if (zmq_getsockopt(fSocket, ZMQ_RCVHWM, &value, &valueSize) < 0) { - throw SocketError(tools::ToString("failed getting ZMQ_RCVHWM, reason: ", zmq_strerror(errno))); - } - return value; -} - -void FairMQSocketZMQ::SetSndKernelSize(const int value) -{ - if (zmq_setsockopt(fSocket, ZMQ_SNDBUF, &value, sizeof(value)) < 0) { - throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno))); - } -} - -int FairMQSocketZMQ::GetSndKernelSize() const -{ - int value = 0; - size_t valueSize = sizeof(value); - if (zmq_getsockopt(fSocket, ZMQ_SNDBUF, &value, &valueSize) < 0) { - throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno))); - } - return value; -} - -void FairMQSocketZMQ::SetRcvKernelSize(const int value) -{ - if (zmq_setsockopt(fSocket, ZMQ_RCVBUF, &value, sizeof(value)) < 0) { - throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno))); - } -} - -int FairMQSocketZMQ::GetRcvKernelSize() const -{ - int value = 0; - size_t valueSize = sizeof(value); - if (zmq_getsockopt(fSocket, ZMQ_RCVBUF, &value, &valueSize) < 0) { - throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno))); - } - return value; -} - -unsigned long FairMQSocketZMQ::GetBytesTx() const -{ - return fBytesTx; -} - -unsigned long FairMQSocketZMQ::GetBytesRx() const -{ - return fBytesRx; -} - -unsigned long FairMQSocketZMQ::GetMessagesTx() const -{ - return fMessagesTx; -} - -unsigned long FairMQSocketZMQ::GetMessagesRx() const -{ - return fMessagesRx; -} - -int FairMQSocketZMQ::GetConstant(const string& constant) -{ - if (constant == "") return 0; - if (constant == "sub") return ZMQ_SUB; - if (constant == "pub") return ZMQ_PUB; - if (constant == "xsub") return ZMQ_XSUB; - if (constant == "xpub") return ZMQ_XPUB; - if (constant == "push") return ZMQ_PUSH; - if (constant == "pull") return ZMQ_PULL; - if (constant == "req") return ZMQ_REQ; - if (constant == "rep") return ZMQ_REP; - if (constant == "dealer") return ZMQ_DEALER; - if (constant == "router") return ZMQ_ROUTER; - if (constant == "pair") return ZMQ_PAIR; - - if (constant == "snd-hwm") return ZMQ_SNDHWM; - if (constant == "rcv-hwm") return ZMQ_RCVHWM; - if (constant == "snd-size") return ZMQ_SNDBUF; - if (constant == "rcv-size") return ZMQ_RCVBUF; - if (constant == "snd-more") return ZMQ_SNDMORE; - if (constant == "rcv-more") return ZMQ_RCVMORE; - - if (constant == "linger") return ZMQ_LINGER; - - return -1; -} - -FairMQSocketZMQ::~FairMQSocketZMQ() -{ - Close(); -} diff --git a/fairmq/zeromq/FairMQSocketZMQ.h b/fairmq/zeromq/FairMQSocketZMQ.h index 13e5b9c2..dc52fc93 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.h +++ b/fairmq/zeromq/FairMQSocketZMQ.h @@ -10,8 +10,13 @@ #define FAIRMQSOCKETZMQ_H_ #include -#include "FairMQSocket.h" -#include "FairMQMessage.h" +#include +#include +#include +#include +#include "FairMQMessageZMQ.h" + +#include #include #include // unique_ptr @@ -21,47 +26,440 @@ class FairMQTransportFactory; class FairMQSocketZMQ final : public FairMQSocket { public: - FairMQSocketZMQ(fair::mq::zmq::Context& ctx, const std::string& type, const std::string& name, const std::string& id = "", FairMQTransportFactory* factory = nullptr); + FairMQSocketZMQ(fair::mq::zmq::Context& ctx, const std::string& type, const std::string& name, const std::string& id = "", FairMQTransportFactory* factory = nullptr) + : FairMQSocket(factory) + , fCtx(ctx) + , fSocket(zmq_socket(fCtx.GetZmqCtx(), GetConstant(type))) + , fId(id + "." + name + "." + type) + , fBytesTx(0) + , fBytesRx(0) + , fMessagesTx(0) + , fMessagesRx(0) + , fSndTimeout(100) + , fRcvTimeout(100) + { + if (fSocket == nullptr) + { + LOG(error) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno); + exit(EXIT_FAILURE); + } + + if (zmq_setsockopt(fSocket, ZMQ_IDENTITY, fId.c_str(), fId.length()) != 0) + { + LOG(error) << "Failed setting ZMQ_IDENTITY socket option, reason: " << zmq_strerror(errno); + } + + // Tell socket to try and send/receive outstanding messages for milliseconds before terminating. + // Default value for ZeroMQ is -1, which is to wait forever. + int linger = 1000; + if (zmq_setsockopt(fSocket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) + { + LOG(error) << "Failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno); + } + + if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) + { + LOG(error) << "Failed setting ZMQ_SNDTIMEO socket option, reason: " << zmq_strerror(errno); + } + + if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) + { + LOG(error) << "Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno); + } + + if (type == "sub") + { + if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, nullptr, 0) != 0) + { + LOG(error) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno); + } + } + + LOG(debug) << "Created socket " << GetId(); + } FairMQSocketZMQ(const FairMQSocketZMQ&) = delete; FairMQSocketZMQ operator=(const FairMQSocketZMQ&) = delete; std::string GetId() const override { return fId; } - bool Bind(const std::string& address) override; - bool Connect(const std::string& address) override; + bool Bind(const std::string& address) override + { + // LOG(info) << "bind socket " << fId << " on " << address; - int Send(FairMQMessagePtr& msg, const int timeout = -1) override; - int Receive(FairMQMessagePtr& msg, const int timeout = -1) override; - int64_t Send(std::vector>& msgVec, const int timeout = -1) override; - int64_t Receive(std::vector>& msgVec, const int timeout = -1) override; + if (zmq_bind(fSocket, address.c_str()) != 0) + { + if (errno == EADDRINUSE) { + // do not print error in this case, this is handled by FairMQDevice in case no connection could be established after trying a number of random ports from a range. + return false; + } + LOG(error) << "Failed binding socket " << fId << ", address: " << address << ", reason: " << zmq_strerror(errno); + return false; + } - void* GetSocket() const; + return true; + } + bool Connect(const std::string& address) override + { + // LOG(info) << "connect socket " << fId << " on " << address; - void Close() override; + if (zmq_connect(fSocket, address.c_str()) != 0) + { + LOG(error) << "Failed connecting socket " << fId << ", address: " << address << ", reason: " << zmq_strerror(errno); + return false; + } - void SetOption(const std::string& option, const void* value, size_t valueSize) override; - void GetOption(const std::string& option, void* value, size_t* valueSize) override; + return true; + } - void SetLinger(const int value) override; - int GetLinger() const override; - void SetSndBufSize(const int value) override; - int GetSndBufSize() const override; - void SetRcvBufSize(const int value) override; - int GetRcvBufSize() const override; - void SetSndKernelSize(const int value) override; - int GetSndKernelSize() const override; - void SetRcvKernelSize(const int value) override; - int GetRcvKernelSize() const override; + int Send(FairMQMessagePtr& msg, const int timeout = -1) override + { + int flags = 0; + if (timeout == 0) { + flags = ZMQ_DONTWAIT; + } + int elapsed = 0; - unsigned long GetBytesTx() const override; - unsigned long GetBytesRx() const override; - unsigned long GetMessagesTx() const override; - unsigned long GetMessagesRx() const override; + static_cast(msg.get())->ApplyUsedSize(); - static int GetConstant(const std::string& constant); + while (true) { + int nbytes = zmq_msg_send(static_cast(msg.get())->GetMessage(), fSocket, flags); + if (nbytes >= 0) { + fBytesTx += nbytes; + ++fMessagesTx; - ~FairMQSocketZMQ() override; + return nbytes; + } else if (zmq_errno() == EAGAIN) { + if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { + if (timeout > 0) { + elapsed += fSndTimeout; + if (elapsed >= timeout) { + return -2; + } + } + continue; + } else { + return -2; + } + } else if (zmq_errno() == ETERM) { + LOG(info) << "terminating socket " << fId; + return -1; + } else if (zmq_errno() == EINTR) { + LOG(debug) << "Send interrupted by system call"; + return nbytes; + } else { + LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); + return nbytes; + } + } + } + int Receive(FairMQMessagePtr& msg, const int timeout = -1) override + { + int flags = 0; + if (timeout == 0) { + flags = ZMQ_DONTWAIT; + } + int elapsed = 0; + + while (true) { + int nbytes = zmq_msg_recv(static_cast(msg.get())->GetMessage(), fSocket, flags); + if (nbytes >= 0) { + fBytesRx += nbytes; + ++fMessagesRx; + return nbytes; + } else if (zmq_errno() == EAGAIN) { + if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { + if (timeout > 0) { + elapsed += fRcvTimeout; + if (elapsed >= timeout) { + return -2; + } + } + continue; + } else { + return -2; + } + } else if (zmq_errno() == ETERM) { + LOG(info) << "terminating socket " << fId; + return -1; + } else if (zmq_errno() == EINTR) { + LOG(debug) << "Receive interrupted by system call"; + return nbytes; + } else { + LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno); + return nbytes; + } + } + } + int64_t Send(std::vector>& msgVec, const int timeout = -1) override + { + int flags = 0; + if (timeout == 0) { + flags = ZMQ_DONTWAIT; + } + + const unsigned int vecSize = msgVec.size(); + + // Sending vector typicaly handles more then one part + if (vecSize > 1) { + int elapsed = 0; + + while (true) { + int64_t totalSize = 0; + bool repeat = false; + + for (unsigned int i = 0; i < vecSize; ++i) { + static_cast(msgVec[i].get())->ApplyUsedSize(); + + int nbytes = zmq_msg_send(static_cast(msgVec[i].get())->GetMessage(), + fSocket, + (i < vecSize - 1) ? ZMQ_SNDMORE|flags : flags); + if (nbytes >= 0) { + totalSize += nbytes; + } else { + // according to ZMQ docs, this can only occur for the first part + if (zmq_errno() == EAGAIN) { + if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { + if (timeout > 0) { + elapsed += fSndTimeout; + if (elapsed >= timeout) { + return -2; + } + } + repeat = true; + break; + } else { + return -2; + } + } + if (zmq_errno() == ETERM) { + LOG(info) << "terminating socket " << fId; + return -1; + } else if (zmq_errno() == EINTR) { + LOG(debug) << "Receive interrupted by system call"; + return nbytes; + } else { + LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); + return nbytes; + } + } + } + + if (repeat) { + continue; + } + + // store statistics on how many messages have been sent (handle all parts as a single message) + ++fMessagesTx; + fBytesTx += totalSize; + return totalSize; + } + } // If there's only one part, send it as a regular message + else if (vecSize == 1) { + return Send(msgVec.back(), timeout); + } else { // if the vector is empty, something might be wrong + LOG(warn) << "Will not send empty vector"; + return -1; + } + } + int64_t Receive(std::vector>& msgVec, const int timeout = -1) override + { + int flags = 0; + if (timeout == 0) { + flags = ZMQ_DONTWAIT; + } + int elapsed = 0; + + while (true) { + int64_t totalSize = 0; + int64_t more = 0; + bool repeat = false; + + do { + std::unique_ptr part(new FairMQMessageZMQ(GetTransport())); + + int nbytes = zmq_msg_recv(static_cast(part.get())->GetMessage(), fSocket, flags); + if (nbytes >= 0) { + msgVec.push_back(move(part)); + totalSize += nbytes; + } else if (zmq_errno() == EAGAIN) { + if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { + if (timeout > 0) { + elapsed += fRcvTimeout; + if (elapsed >= timeout) { + return -2; + } + } + repeat = true; + break; + } else { + return -2; + } + } else if (zmq_errno() == EINTR) { + LOG(debug) << "Receive interrupted by system call"; + return nbytes; + } else { + return nbytes; + } + + size_t moreSize = sizeof(more); + zmq_getsockopt(fSocket, ZMQ_RCVMORE, &more, &moreSize); + } while (more); + + if (repeat) { + continue; + } + + // store statistics on how many messages have been received (handle all parts as a single message) + ++fMessagesRx; + fBytesRx += totalSize; + return totalSize; + } + } + + void* GetSocket() const { return fSocket; } + + void Close() override + { + // LOG(debug) << "Closing socket " << fId; + + if (fSocket == nullptr) + { + return; + } + + if (zmq_close(fSocket) != 0) + { + LOG(error) << "Failed closing socket " << fId << ", reason: " << zmq_strerror(errno); + } + + fSocket = nullptr; + } + + void SetOption(const std::string& option, const void* value, size_t valueSize) override + { + if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0) + { + LOG(error) << "Failed setting socket option, reason: " << zmq_strerror(errno); + } + } + void GetOption(const std::string& option, void* value, size_t* valueSize) override + { + if (zmq_getsockopt(fSocket, GetConstant(option), value, valueSize) < 0) + { + LOG(error) << "Failed getting socket option, reason: " << zmq_strerror(errno); + } + } + + void SetLinger(const int value) override + { + if (zmq_setsockopt(fSocket, ZMQ_LINGER, &value, sizeof(value)) < 0) { + throw fair::mq::SocketError(fair::mq::tools::ToString("failed setting ZMQ_LINGER, reason: ", zmq_strerror(errno))); + } + } + int GetLinger() const override + { + int value = 0; + size_t valueSize = sizeof(value); + if (zmq_getsockopt(fSocket, ZMQ_LINGER, &value, &valueSize) < 0) { + throw fair::mq::SocketError(fair::mq::tools::ToString("failed getting ZMQ_LINGER, reason: ", zmq_strerror(errno))); + } + return value; + } + void SetSndBufSize(const int value) override + { + if (zmq_setsockopt(fSocket, ZMQ_SNDHWM, &value, sizeof(value)) < 0) { + throw fair::mq::SocketError(fair::mq::tools::ToString("failed setting ZMQ_SNDHWM, reason: ", zmq_strerror(errno))); + } + } + int GetSndBufSize() const override + { + int value = 0; + size_t valueSize = sizeof(value); + if (zmq_getsockopt(fSocket, ZMQ_SNDHWM, &value, &valueSize) < 0) { + throw fair::mq::SocketError(fair::mq::tools::ToString("failed getting ZMQ_SNDHWM, reason: ", zmq_strerror(errno))); + } + return value; + } + void SetRcvBufSize(const int value) override + { + if (zmq_setsockopt(fSocket, ZMQ_RCVHWM, &value, sizeof(value)) < 0) { + throw fair::mq::SocketError(fair::mq::tools::ToString("failed setting ZMQ_RCVHWM, reason: ", zmq_strerror(errno))); + } + } + int GetRcvBufSize() const override + { + int value = 0; + size_t valueSize = sizeof(value); + if (zmq_getsockopt(fSocket, ZMQ_RCVHWM, &value, &valueSize) < 0) { + throw fair::mq::SocketError(fair::mq::tools::ToString("failed getting ZMQ_RCVHWM, reason: ", zmq_strerror(errno))); + } + return value; + } + void SetSndKernelSize(const int value) override + { + if (zmq_setsockopt(fSocket, ZMQ_SNDBUF, &value, sizeof(value)) < 0) { + throw fair::mq::SocketError(fair::mq::tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno))); + } + } + int GetSndKernelSize() const override + { + int value = 0; + size_t valueSize = sizeof(value); + if (zmq_getsockopt(fSocket, ZMQ_SNDBUF, &value, &valueSize) < 0) { + throw fair::mq::SocketError(fair::mq::tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno))); + } + return value; + } + void SetRcvKernelSize(const int value) override + { + if (zmq_setsockopt(fSocket, ZMQ_RCVBUF, &value, sizeof(value)) < 0) { + throw fair::mq::SocketError(fair::mq::tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno))); + } + } + int GetRcvKernelSize() const override + { + int value = 0; + size_t valueSize = sizeof(value); + if (zmq_getsockopt(fSocket, ZMQ_RCVBUF, &value, &valueSize) < 0) { + throw fair::mq::SocketError(fair::mq::tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno))); + } + return value; + } + + unsigned long GetBytesTx() const override { return fBytesTx; } + unsigned long GetBytesRx() const override { return fBytesRx; } + unsigned long GetMessagesTx() const override { return fMessagesTx; } + unsigned long GetMessagesRx() const override { return fMessagesRx; } + + static int GetConstant(const std::string& constant) + { + if (constant == "") return 0; + if (constant == "sub") return ZMQ_SUB; + if (constant == "pub") return ZMQ_PUB; + if (constant == "xsub") return ZMQ_XSUB; + if (constant == "xpub") return ZMQ_XPUB; + if (constant == "push") return ZMQ_PUSH; + if (constant == "pull") return ZMQ_PULL; + if (constant == "req") return ZMQ_REQ; + if (constant == "rep") return ZMQ_REP; + if (constant == "dealer") return ZMQ_DEALER; + if (constant == "router") return ZMQ_ROUTER; + if (constant == "pair") return ZMQ_PAIR; + + if (constant == "snd-hwm") return ZMQ_SNDHWM; + if (constant == "rcv-hwm") return ZMQ_RCVHWM; + if (constant == "snd-size") return ZMQ_SNDBUF; + if (constant == "rcv-size") return ZMQ_RCVBUF; + if (constant == "snd-more") return ZMQ_SNDMORE; + if (constant == "rcv-more") return ZMQ_RCVMORE; + + if (constant == "linger") return ZMQ_LINGER; + + return -1; + } + + ~FairMQSocketZMQ() override { Close(); } private: fair::mq::zmq::Context& fCtx; diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx deleted file mode 100644 index e3a613f7..00000000 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx +++ /dev/null @@ -1,125 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#include "FairMQTransportFactoryZMQ.h" -#include -#include - -#include // find_if - -using namespace std; - -FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ(const string& id, const fair::mq::ProgOptions* config) - : FairMQTransportFactory(id) - , fCtx(nullptr) -{ - int major, minor, patch; - zmq_version(&major, &minor, &patch); - LOG(debug) << "Transport: Using ZeroMQ library, version: " << major << "." << minor << "." << patch; - - if (config) { - fCtx = fair::mq::tools::make_unique(config->GetProperty("io-threads", 1)); - } else { - LOG(debug) << "fair::mq::ProgOptions not available! Using defaults."; - fCtx = fair::mq::tools::make_unique(1); - } -} - -FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage() -{ - return unique_ptr(new FairMQMessageZMQ(this)); -} - -FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(const size_t size) -{ - return unique_ptr(new FairMQMessageZMQ(size, this)); -} - -FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) -{ - return unique_ptr(new FairMQMessageZMQ(data, size, ffn, hint, this)); -} - -FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint) -{ - return unique_ptr(new FairMQMessageZMQ(region, data, size, hint, this)); -} - -FairMQSocketPtr FairMQTransportFactoryZMQ::CreateSocket(const string& type, const string& name) -{ - return unique_ptr(new FairMQSocketZMQ(*fCtx, type, name, GetId(), this)); -} - -FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const vector& channels) const -{ - return unique_ptr(new FairMQPollerZMQ(channels)); -} - -FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const vector& channels) const -{ - return unique_ptr(new FairMQPollerZMQ(channels)); -} - -FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const unordered_map>& channelsMap, const vector& channelList) const -{ - return unique_ptr(new FairMQPollerZMQ(channelsMap, channelList)); -} - -FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion( - const size_t size, - FairMQRegionCallback callback, - const string& path /* = "" */, - int flags /* = 0 */) -{ - return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags); -} -FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion( - const size_t size, - FairMQRegionBulkCallback bulkCallback, - const string& path /* = "" */, - int flags /* = 0 */) -{ - return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags); -} -FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion( - const size_t size, - const int64_t userFlags, - FairMQRegionCallback callback, - const string& path /* = "" */, - int flags /* = 0 */) -{ - return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags); -} -FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion( - const size_t size, - const int64_t userFlags, - FairMQRegionBulkCallback bulkCallback, - const string& path /* = "" */, - int flags /* = 0 */) -{ - return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags); -} - -FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion( - const size_t size, - const int64_t userFlags, - FairMQRegionCallback callback, - FairMQRegionBulkCallback bulkCallback, - const string& path /* = "" */, - int flags /* = 0 */) -{ - unique_ptr ptr = unique_ptr(new FairMQUnmanagedRegionZMQ(*fCtx, size, userFlags, callback, bulkCallback, path, flags, this)); - auto zPtr = static_cast(ptr.get()); - fCtx->AddRegion(zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), fair::mq::RegionEvent::created); - return ptr; -} - -FairMQTransportFactoryZMQ::~FairMQTransportFactoryZMQ() -{ - LOG(debug) << "Destroying ZeroMQ transport..."; -} diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index f2920ebf..81ea868b 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -16,8 +16,9 @@ #define FAIRMQTRANSPORTFACTORYZMQ_H_ #include +#include #include -#include "FairMQTransportFactory.h" +#include #include "FairMQMessageZMQ.h" #include "FairMQSocketZMQ.h" #include "FairMQPollerZMQ.h" @@ -30,26 +31,47 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory { public: - FairMQTransportFactoryZMQ(const std::string& id = "", const fair::mq::ProgOptions* config = nullptr); + FairMQTransportFactoryZMQ(const std::string& id = "", const fair::mq::ProgOptions* config = nullptr) + : FairMQTransportFactory(id) + , fCtx(nullptr) + { + int major, minor, patch; + zmq_version(&major, &minor, &patch); + LOG(debug) << "Transport: Using ZeroMQ library, version: " << major << "." << minor << "." << patch; + + if (config) { + fCtx = fair::mq::tools::make_unique(config->GetProperty("io-threads", 1)); + } else { + LOG(debug) << "fair::mq::ProgOptions not available! Using defaults."; + fCtx = fair::mq::tools::make_unique(1); + } + } + FairMQTransportFactoryZMQ(const FairMQTransportFactoryZMQ&) = delete; FairMQTransportFactoryZMQ operator=(const FairMQTransportFactoryZMQ&) = delete; - FairMQMessagePtr CreateMessage() override; - FairMQMessagePtr CreateMessage(const size_t size) override; - FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override; - FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) override; + FairMQMessagePtr CreateMessage() override { return fair::mq::tools::make_unique(this); } + FairMQMessagePtr CreateMessage(const size_t size) override { return fair::mq::tools::make_unique(size, this); } + FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override { return fair::mq::tools::make_unique(data, size, ffn, hint, this); } + FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) override { return fair::mq::tools::make_unique(region, data, size, hint, this); } - FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) override; + FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) override { return fair::mq::tools::make_unique(*fCtx, type, name, GetId(), this); } - FairMQPollerPtr CreatePoller(const std::vector& channels) const override; - FairMQPollerPtr CreatePoller(const std::vector& channels) const override; - FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; + FairMQPollerPtr CreatePoller(const std::vector& channels) const override { return fair::mq::tools::make_unique(channels); } + FairMQPollerPtr CreatePoller(const std::vector& channels) const override { return fair::mq::tools::make_unique(channels); } + FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override { return fair::mq::tools::make_unique(channelsMap, channelList); } - FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) override; - FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override; - FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) override; - FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override; - FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, FairMQRegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0); + FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) override { return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags); } + FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override { return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags); } + FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) override { return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags); } + FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override { return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags); } + FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, FairMQRegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) + { + auto ptr = std::unique_ptr(new FairMQUnmanagedRegionZMQ(*fCtx, size, userFlags, callback, bulkCallback, path, flags, this)); + auto zPtr = static_cast(ptr.get()); + fCtx->AddRegion(zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), fair::mq::RegionEvent::created); + return ptr; + } void SubscribeToRegionEvents(FairMQRegionEventCallback callback) override { fCtx->SubscribeToRegionEvents(callback); } bool SubscribedToRegionEvents() override { return fCtx->SubscribedToRegionEvents(); } @@ -62,7 +84,7 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory void Resume() override { fCtx->Resume(); } void Reset() override { fCtx->Reset(); } - ~FairMQTransportFactoryZMQ() override; + ~FairMQTransportFactoryZMQ() override { LOG(debug) << "Destroying ZeroMQ transport..."; } private: std::unique_ptr fCtx; diff --git a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx b/fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx deleted file mode 100644 index 7f0d8c70..00000000 --- a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx +++ /dev/null @@ -1,45 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#include "FairMQUnmanagedRegionZMQ.h" -#include "FairMQLogger.h" - -FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(fair::mq::zmq::Context& ctx, - size_t size, - int64_t userFlags, - FairMQRegionCallback callback, - FairMQRegionBulkCallback bulkCallback, - const std::string& /* path = "" */, - int /* flags = 0 */, - FairMQTransportFactory* factory) - : FairMQUnmanagedRegion(factory) - , fCtx(ctx) - , fId(fCtx.RegionCount()) - , fBuffer(malloc(size)) - , fSize(size) - , fUserFlags(userFlags) - , fCallback(callback) - , fBulkCallback(bulkCallback) -{} - -void* FairMQUnmanagedRegionZMQ::GetData() const -{ - return fBuffer; -} - -size_t FairMQUnmanagedRegionZMQ::GetSize() const -{ - return fSize; -} - -FairMQUnmanagedRegionZMQ::~FairMQUnmanagedRegionZMQ() -{ - LOG(debug) << "destroying region " << fId; - fCtx.RemoveRegion(fId); - free(fBuffer); -} diff --git a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h b/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h index e6639dcf..f0e4a7a9 100644 --- a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h +++ b/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h @@ -10,10 +10,12 @@ #define FAIRMQUNMANAGEDREGIONZMQ_H_ #include -#include "FairMQUnmanagedRegion.h" +#include +#include #include // size_t #include + class FairMQTransportFactory; class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion @@ -27,19 +29,33 @@ class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion int64_t userFlags, FairMQRegionCallback callback, FairMQRegionBulkCallback bulkCallback, - const std::string& path = "", - int flags = 0, - FairMQTransportFactory* factory = nullptr); + const std::string& /* path = "" */, + int /* flags = 0 */, + FairMQTransportFactory* factory = nullptr) + : FairMQUnmanagedRegion(factory) + , fCtx(ctx) + , fId(fCtx.RegionCount()) + , fBuffer(malloc(size)) + , fSize(size) + , fUserFlags(userFlags) + , fCallback(callback) + , fBulkCallback(bulkCallback) + {} FairMQUnmanagedRegionZMQ(const FairMQUnmanagedRegionZMQ&) = delete; FairMQUnmanagedRegionZMQ operator=(const FairMQUnmanagedRegionZMQ&) = delete; - virtual void* GetData() const override; - virtual size_t GetSize() const override; + virtual void* GetData() const override { return fBuffer; } + virtual size_t GetSize() const override { return fSize; } uint64_t GetId() const override { return fId; } int64_t GetUserFlags() const { return fUserFlags; } - virtual ~FairMQUnmanagedRegionZMQ(); + virtual ~FairMQUnmanagedRegionZMQ() + { + LOG(debug) << "destroying region " << fId; + fCtx.RemoveRegion(fId); + free(fBuffer); + } private: fair::mq::zmq::Context& fCtx;