From 361fb0cba53fa9bf97af07e88b5b5358a7e7a66f Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Sat, 16 May 2020 16:44:18 +0200 Subject: [PATCH] Zmq: refactor to use namespaces --- fairmq/CMakeLists.txt | 10 +- fairmq/FairMQTransportFactory.cxx | 4 +- fairmq/zeromq/Context.h | 18 +-- fairmq/zeromq/FairMQTransportFactoryZMQ.h | 93 ------------ .../zeromq/{FairMQMessageZMQ.h => Message.h} | 71 ++++++---- fairmq/zeromq/{FairMQPollerZMQ.h => Poller.h} | 49 ++++--- fairmq/zeromq/{FairMQSocketZMQ.h => Socket.h} | 89 +++++++----- fairmq/zeromq/TransportFactory.h | 134 ++++++++++++++++++ ...UnmanagedRegionZMQ.h => UnmanagedRegion.h} | 45 +++--- 9 files changed, 304 insertions(+), 209 deletions(-) delete mode 100644 fairmq/zeromq/FairMQTransportFactoryZMQ.h rename fairmq/zeromq/{FairMQMessageZMQ.h => Message.h} (80%) rename fairmq/zeromq/{FairMQPollerZMQ.h => Poller.h} (79%) rename fairmq/zeromq/{FairMQSocketZMQ.h => Socket.h} (83%) create mode 100644 fairmq/zeromq/TransportFactory.h rename fairmq/zeromq/{FairMQUnmanagedRegionZMQ.h => UnmanagedRegion.h} (67%) diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 71c8d523..6050903a 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -191,11 +191,11 @@ if(BUILD_FAIRMQ) shmem/Manager.h shmem/Region.h zeromq/Context.h - zeromq/FairMQMessageZMQ.h - zeromq/FairMQPollerZMQ.h - zeromq/FairMQUnmanagedRegionZMQ.h - zeromq/FairMQSocketZMQ.h - zeromq/FairMQTransportFactoryZMQ.h + zeromq/Message.h + zeromq/Poller.h + zeromq/UnmanagedRegion.h + zeromq/Socket.h + zeromq/TransportFactory.h ) if(BUILD_OFI_TRANSPORT) diff --git a/fairmq/FairMQTransportFactory.cxx b/fairmq/FairMQTransportFactory.cxx index 29ea87ef..9c5b4dc2 100644 --- a/fairmq/FairMQTransportFactory.cxx +++ b/fairmq/FairMQTransportFactory.cxx @@ -8,7 +8,7 @@ #include #include -#include +#include #ifdef BUILD_OFI_TRANSPORT #include #endif @@ -36,7 +36,7 @@ auto FairMQTransportFactory::CreateTransportFactory(const string& type, } if (type == "zeromq") { - return make_shared(finalId, config); + return make_shared(finalId, config); } else if (type == "shmem") { return make_shared(finalId, config); } diff --git a/fairmq/zeromq/Context.h b/fairmq/zeromq/Context.h index f9dcc847..bcf639cf 100644 --- a/fairmq/zeromq/Context.h +++ b/fairmq/zeromq/Context.h @@ -56,13 +56,13 @@ class Context throw ContextError(tools::ToString("failed configuring context, reason: ", zmq_strerror(errno))); } - fRegionEvents.emplace(0, nullptr, 0, 0, fair::mq::RegionEvent::local_only); + fRegionEvents.emplace(0, nullptr, 0, 0, RegionEvent::local_only); } Context(const Context&) = delete; Context operator=(const Context&) = delete; - void SubscribeToRegionEvents(FairMQRegionEventCallback callback) + void SubscribeToRegionEvents(RegionEventCallback callback) { if (fRegionEventThread.joinable()) { LOG(debug) << "Already subscribed. Overwriting previous subscription."; @@ -108,7 +108,7 @@ class Context } } - std::vector GetRegionInfo() const + std::vector GetRegionInfo() const { std::lock_guard lock(fMtx); return fRegionInfos; @@ -120,7 +120,7 @@ class Context return fRegionCounter; } - void AddRegion(uint64_t id, void* ptr, size_t size, int64_t userFlags, fair::mq::RegionEvent event) + void AddRegion(uint64_t id, void* ptr, size_t size, int64_t userFlags, RegionEvent event) { { std::lock_guard lock(fMtx); @@ -135,12 +135,12 @@ class Context { { std::lock_guard lock(fMtx); - auto it = find_if(fRegionInfos.begin(), fRegionInfos.end(), [id](const fair::mq::RegionInfo& i) { + auto it = find_if(fRegionInfos.begin(), fRegionInfos.end(), [id](const RegionInfo& i) { return i.id == id; }); if (it != fRegionInfos.end()) { fRegionEvents.push(*it); - fRegionEvents.back().event = fair::mq::RegionEvent::destroyed; + fRegionEvents.back().event = RegionEvent::destroyed; fRegionInfos.erase(it); } else { LOG(error) << "RemoveRegion: given id (" << id << ") not found."; @@ -181,10 +181,10 @@ class Context uint64_t fRegionCounter; std::condition_variable fRegionEventsCV; - std::vector fRegionInfos; - std::queue fRegionEvents; + std::vector fRegionInfos; + std::queue fRegionEvents; std::thread fRegionEventThread; - std::function fRegionEventCallback; + std::function fRegionEventCallback; bool fRegionEventsSubscriptionActive; }; diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h deleted file mode 100644 index 81ea868b..00000000 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ /dev/null @@ -1,93 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQTransportFactoryZMQ.h - * - * @since 2014-01-20 - * @author: A. Rybalchenko - */ - -#ifndef FAIRMQTRANSPORTFACTORYZMQ_H_ -#define FAIRMQTRANSPORTFACTORYZMQ_H_ - -#include -#include -#include -#include -#include "FairMQMessageZMQ.h" -#include "FairMQSocketZMQ.h" -#include "FairMQPollerZMQ.h" -#include "FairMQUnmanagedRegionZMQ.h" - -#include // unique_ptr -#include -#include - -class FairMQTransportFactoryZMQ final : public FairMQTransportFactory -{ - public: - FairMQTransportFactoryZMQ(const std::string& id = "", const fair::mq::ProgOptions* config = nullptr) - : FairMQTransportFactory(id) - , fCtx(nullptr) - { - int major, minor, patch; - zmq_version(&major, &minor, &patch); - LOG(debug) << "Transport: Using ZeroMQ library, version: " << major << "." << minor << "." << patch; - - if (config) { - fCtx = fair::mq::tools::make_unique(config->GetProperty("io-threads", 1)); - } else { - LOG(debug) << "fair::mq::ProgOptions not available! Using defaults."; - fCtx = fair::mq::tools::make_unique(1); - } - } - - FairMQTransportFactoryZMQ(const FairMQTransportFactoryZMQ&) = delete; - FairMQTransportFactoryZMQ operator=(const FairMQTransportFactoryZMQ&) = delete; - - FairMQMessagePtr CreateMessage() override { return fair::mq::tools::make_unique(this); } - FairMQMessagePtr CreateMessage(const size_t size) override { return fair::mq::tools::make_unique(size, this); } - FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override { return fair::mq::tools::make_unique(data, size, ffn, hint, this); } - FairMQMessagePtr CreateMessage(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) override { return fair::mq::tools::make_unique(region, data, size, hint, this); } - - FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) override { return fair::mq::tools::make_unique(*fCtx, type, name, GetId(), this); } - - FairMQPollerPtr CreatePoller(const std::vector& channels) const override { return fair::mq::tools::make_unique(channels); } - FairMQPollerPtr CreatePoller(const std::vector& channels) const override { return fair::mq::tools::make_unique(channels); } - FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override { return fair::mq::tools::make_unique(channelsMap, channelList); } - - FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) override { return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags); } - FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override { return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags); } - FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) override { return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags); } - FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override { return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags); } - FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, FairMQRegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) - { - auto ptr = std::unique_ptr(new FairMQUnmanagedRegionZMQ(*fCtx, size, userFlags, callback, bulkCallback, path, flags, this)); - auto zPtr = static_cast(ptr.get()); - fCtx->AddRegion(zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), fair::mq::RegionEvent::created); - return ptr; - } - - void SubscribeToRegionEvents(FairMQRegionEventCallback callback) override { fCtx->SubscribeToRegionEvents(callback); } - bool SubscribedToRegionEvents() override { return fCtx->SubscribedToRegionEvents(); } - void UnsubscribeFromRegionEvents() override { fCtx->UnsubscribeFromRegionEvents(); } - std::vector GetRegionInfo() override { return fCtx->GetRegionInfo(); } - - fair::mq::Transport GetType() const override { return fair::mq::Transport::ZMQ; } - - void Interrupt() override { fCtx->Interrupt(); } - void Resume() override { fCtx->Resume(); } - void Reset() override { fCtx->Reset(); } - - ~FairMQTransportFactoryZMQ() override { LOG(debug) << "Destroying ZeroMQ transport..."; } - - private: - std::unique_ptr fCtx; -}; - -#endif /* FAIRMQTRANSPORTFACTORYZMQ_H_ */ diff --git a/fairmq/zeromq/FairMQMessageZMQ.h b/fairmq/zeromq/Message.h similarity index 80% rename from fairmq/zeromq/FairMQMessageZMQ.h rename to fairmq/zeromq/Message.h index 09436cff..a8cc1cd6 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.h +++ b/fairmq/zeromq/Message.h @@ -6,11 +6,11 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#ifndef FAIRMQMESSAGEZMQ_H_ -#define FAIRMQMESSAGEZMQ_H_ +#ifndef FAIR_MQ_ZMQ_MESSAGE_H +#define FAIR_MQ_ZMQ_MESSAGE_H #include -#include "FairMQUnmanagedRegionZMQ.h" +#include #include #include #include @@ -22,20 +22,25 @@ #include #include -class FairMQTransportFactory; - -class FairMQSocketZMQ; - -class FairMQMessageZMQ final : public FairMQMessage +namespace fair { - friend class FairMQSocketZMQ; +namespace mq +{ +namespace zmq +{ + +class Socket; + +class Message final : public fair::mq::Message +{ + friend class Socket; public: - FairMQMessageZMQ(FairMQTransportFactory* factory = nullptr) - : FairMQMessage(factory) + Message(FairMQTransportFactory* factory = nullptr) + : fair::mq::Message(factory) , fUsedSizeModified(false) , fUsedSize() - , fMsg(fair::mq::tools::make_unique()) + , fMsg(tools::make_unique()) , fViewMsg(nullptr) { if (zmq_msg_init(fMsg.get()) != 0) { @@ -43,11 +48,11 @@ class FairMQMessageZMQ final : public FairMQMessage } } - FairMQMessageZMQ(const size_t size, FairMQTransportFactory* factory = nullptr) - : FairMQMessage(factory) + Message(const size_t size, FairMQTransportFactory* factory = nullptr) + : fair::mq::Message(factory) , fUsedSizeModified(false) , fUsedSize(size) - , fMsg(fair::mq::tools::make_unique()) + , fMsg(tools::make_unique()) , fViewMsg(nullptr) { if (zmq_msg_init_size(fMsg.get(), size) != 0) { @@ -55,11 +60,11 @@ class FairMQMessageZMQ final : public FairMQMessage } } - FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr) - : FairMQMessage(factory) + Message(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr, FairMQTransportFactory* factory = nullptr) + : fair::mq::Message(factory) , fUsedSizeModified(false) , fUsedSize() - , fMsg(fair::mq::tools::make_unique()) + , fMsg(tools::make_unique()) , fViewMsg(nullptr) { if (zmq_msg_init_data(fMsg.get(), data, size, ffn, hint) != 0) { @@ -67,11 +72,11 @@ class FairMQMessageZMQ final : public FairMQMessage } } - FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, FairMQTransportFactory* factory = nullptr) - : FairMQMessage(factory) + Message(UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0, FairMQTransportFactory* factory = nullptr) + : fair::mq::Message(factory) , fUsedSizeModified(false) , fUsedSize() - , fMsg(fair::mq::tools::make_unique()) + , fMsg(tools::make_unique()) , fViewMsg(nullptr) { // FIXME: make this zero-copy: @@ -83,7 +88,7 @@ class FairMQMessageZMQ final : public FairMQMessage std::memcpy(zmq_msg_data(fMsg.get()), data, size); // call region callback - auto ptr = static_cast(region.get()); + auto ptr = static_cast(region.get()); if (ptr->fBulkCallback) { ptr->fBulkCallback({{data, size, hint}}); } else if (ptr->fCallback) { @@ -100,7 +105,7 @@ class FairMQMessageZMQ final : public FairMQMessage void Rebuild() override { CloseMessage(); - fMsg = fair::mq::tools::make_unique(); + fMsg = tools::make_unique(); if (zmq_msg_init(fMsg.get()) != 0) { LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno); } @@ -109,7 +114,7 @@ class FairMQMessageZMQ final : public FairMQMessage void Rebuild(const size_t size) override { CloseMessage(); - fMsg = fair::mq::tools::make_unique(); + fMsg = tools::make_unique(); if (zmq_msg_init_size(fMsg.get(), size) != 0) { LOG(error) << "failed initializing message with size, reason: " << zmq_strerror(errno); } @@ -118,7 +123,7 @@ class FairMQMessageZMQ final : public FairMQMessage void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override { CloseMessage(); - fMsg = fair::mq::tools::make_unique(); + fMsg = tools::make_unique(); if (zmq_msg_init_data(fMsg.get(), data, size, ffn, hint) != 0) { LOG(error) << "failed initializing message with data, reason: " << zmq_strerror(errno); } @@ -165,7 +170,7 @@ class FairMQMessageZMQ final : public FairMQMessage // The check is needed because a send could fail and can be reattempted by the user, in this // case we do not want to modify buffer again. if (fUsedSizeModified && !fViewMsg) { - fViewMsg = fair::mq::tools::make_unique(); + fViewMsg = tools::make_unique(); void* ptr = zmq_msg_data(fMsg.get()); if (zmq_msg_init_data(fViewMsg.get(), ptr, fUsedSize, [](void* /* data */, void* obj) { zmq_msg_close(static_cast(obj)); @@ -176,11 +181,11 @@ class FairMQMessageZMQ final : public FairMQMessage } } - fair::mq::Transport GetType() const override { return fair::mq::Transport::ZMQ; } + Transport GetType() const override { return Transport::ZMQ; } - void Copy(const FairMQMessage& msg) override + void Copy(const fair::mq::Message& msg) override { - const FairMQMessageZMQ& zMsg = static_cast(msg); + const Message& zMsg = static_cast(msg); // Shares the message buffer between msg and this fMsg. if (zmq_msg_copy(fMsg.get(), zMsg.GetMessage()) != 0) { LOG(error) << "failed copying message, reason: " << zmq_strerror(errno); @@ -194,7 +199,7 @@ class FairMQMessageZMQ final : public FairMQMessage } } - ~FairMQMessageZMQ() override { CloseMessage(); } + ~Message() override { CloseMessage(); } private: bool fUsedSizeModified; @@ -231,4 +236,8 @@ class FairMQMessageZMQ final : public FairMQMessage } }; -#endif /* FAIRMQMESSAGEZMQ_H_ */ +} // namespace zmq +} // namespace mq +} // namespace fair + +#endif /* FAIR_MQ_ZMQ_MESSAGE_H */ diff --git a/fairmq/zeromq/FairMQPollerZMQ.h b/fairmq/zeromq/Poller.h similarity index 79% rename from fairmq/zeromq/FairMQPollerZMQ.h rename to fairmq/zeromq/Poller.h index cbb12092..566208f2 100644 --- a/fairmq/zeromq/FairMQPollerZMQ.h +++ b/fairmq/zeromq/Poller.h @@ -6,20 +6,29 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#ifndef FAIRMQPOLLERZMQ_H_ -#define FAIRMQPOLLERZMQ_H_ +#ifndef FAIR_MQ_ZMQ_POLLER_H +#define FAIR_MQ_ZMQ_POLLER_H #include #include #include -#include -#include + #include -class FairMQPollerZMQ final : public FairMQPoller +#include +#include + +namespace fair +{ +namespace mq +{ +namespace zmq +{ + +class Poller final : public fair::mq::Poller { public: - FairMQPollerZMQ(const std::vector& channels) + Poller(const std::vector& channels) : fItems() , fNumItems(0) , fOffsetMap() @@ -28,19 +37,19 @@ class FairMQPollerZMQ final : public FairMQPoller fItems = new zmq_pollitem_t[fNumItems]; // TODO: fix me for (int i = 0; i < fNumItems; ++i) { - fItems[i].socket = static_cast(&(channels.at(i).GetSocket()))->GetSocket(); + fItems[i].socket = static_cast(&(channels.at(i).GetSocket()))->GetSocket(); fItems[i].fd = 0; fItems[i].revents = 0; int type = 0; size_t size = sizeof(type); - zmq_getsockopt(static_cast(&(channels.at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); + zmq_getsockopt(static_cast(&(channels.at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); SetItemEvents(fItems[i], type); } } - FairMQPollerZMQ(const std::vector& channels) + Poller(const std::vector& channels) : fItems() , fNumItems(0) , fOffsetMap() @@ -49,19 +58,19 @@ class FairMQPollerZMQ final : public FairMQPoller fItems = new zmq_pollitem_t[fNumItems]; for (int i = 0; i < fNumItems; ++i) { - fItems[i].socket = static_cast(&(channels.at(i)->GetSocket()))->GetSocket(); + fItems[i].socket = static_cast(&(channels.at(i)->GetSocket()))->GetSocket(); fItems[i].fd = 0; fItems[i].revents = 0; int type = 0; size_t size = sizeof(type); - zmq_getsockopt(static_cast(&(channels.at(i)->GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); + zmq_getsockopt(static_cast(&(channels.at(i)->GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); SetItemEvents(fItems[i], type); } } - FairMQPollerZMQ(const std::unordered_map>& channelsMap, const std::vector& channelList) + Poller(const std::unordered_map>& channelsMap, const std::vector& channelList) : fItems() , fNumItems(0) , fOffsetMap() @@ -82,13 +91,13 @@ class FairMQPollerZMQ final : public FairMQPoller for (unsigned int i = 0; i < channelsMap.at(channel).size(); ++i) { index = fOffsetMap[channel] + i; - fItems[index].socket = static_cast(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(); + fItems[index].socket = static_cast(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(); fItems[index].fd = 0; fItems[index].revents = 0; int type = 0; size_t size = sizeof(type); - zmq_getsockopt(static_cast(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); + zmq_getsockopt(static_cast(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); SetItemEvents(fItems[index], type); } @@ -100,8 +109,8 @@ class FairMQPollerZMQ final : public FairMQPoller } } - FairMQPollerZMQ(const FairMQPollerZMQ&) = delete; - FairMQPollerZMQ operator=(const FairMQPollerZMQ&) = delete; + Poller(const Poller&) = delete; + Poller operator=(const Poller&) = delete; void SetItemEvents(zmq_pollitem_t& item, const int type) { @@ -177,7 +186,7 @@ class FairMQPollerZMQ final : public FairMQPoller } } - ~FairMQPollerZMQ() override { delete[] fItems; } + ~Poller() override { delete[] fItems; } private: zmq_pollitem_t* fItems; @@ -186,4 +195,8 @@ class FairMQPollerZMQ final : public FairMQPoller std::unordered_map fOffsetMap; }; -#endif /* FAIRMQPOLLERZMQ_H_ */ +} // namespace zmq +} // namespace mq +} // namespace fair + +#endif /* FAIR_MQ_ZMQ_POLLER_H */ diff --git a/fairmq/zeromq/FairMQSocketZMQ.h b/fairmq/zeromq/Socket.h similarity index 83% rename from fairmq/zeromq/FairMQSocketZMQ.h rename to fairmq/zeromq/Socket.h index dc52fc93..32b7f20c 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.h +++ b/fairmq/zeromq/Socket.h @@ -6,28 +6,33 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#ifndef FAIRMQSOCKETZMQ_H_ -#define FAIRMQSOCKETZMQ_H_ +#ifndef FAIR_MQ_ZMQ_SOCKET_H +#define FAIR_MQ_ZMQ_SOCKET_H #include +#include #include #include #include #include -#include "FairMQMessageZMQ.h" #include #include #include // unique_ptr -class FairMQTransportFactory; +namespace fair +{ +namespace mq +{ +namespace zmq +{ -class FairMQSocketZMQ final : public FairMQSocket +class Socket final : public fair::mq::Socket { public: - FairMQSocketZMQ(fair::mq::zmq::Context& ctx, const std::string& type, const std::string& name, const std::string& id = "", FairMQTransportFactory* factory = nullptr) - : FairMQSocket(factory) + Socket(Context& ctx, const std::string& type, const std::string& name, const std::string& id = "", FairMQTransportFactory* factory = nullptr) + : fair::mq::Socket(factory) , fCtx(ctx) , fSocket(zmq_socket(fCtx.GetZmqCtx(), GetConstant(type))) , fId(id + "." + name + "." + type) @@ -78,8 +83,8 @@ class FairMQSocketZMQ final : public FairMQSocket LOG(debug) << "Created socket " << GetId(); } - FairMQSocketZMQ(const FairMQSocketZMQ&) = delete; - FairMQSocketZMQ operator=(const FairMQSocketZMQ&) = delete; + Socket(const Socket&) = delete; + Socket operator=(const Socket&) = delete; std::string GetId() const override { return fId; } @@ -99,6 +104,7 @@ class FairMQSocketZMQ final : public FairMQSocket return true; } + bool Connect(const std::string& address) override { // LOG(info) << "connect socket " << fId << " on " << address; @@ -112,7 +118,7 @@ class FairMQSocketZMQ final : public FairMQSocket return true; } - int Send(FairMQMessagePtr& msg, const int timeout = -1) override + int Send(MessagePtr& msg, const int timeout = -1) override { int flags = 0; if (timeout == 0) { @@ -120,10 +126,10 @@ class FairMQSocketZMQ final : public FairMQSocket } int elapsed = 0; - static_cast(msg.get())->ApplyUsedSize(); + static_cast(msg.get())->ApplyUsedSize(); while (true) { - int nbytes = zmq_msg_send(static_cast(msg.get())->GetMessage(), fSocket, flags); + int nbytes = zmq_msg_send(static_cast(msg.get())->GetMessage(), fSocket, flags); if (nbytes >= 0) { fBytesTx += nbytes; ++fMessagesTx; @@ -153,7 +159,8 @@ class FairMQSocketZMQ final : public FairMQSocket } } } - int Receive(FairMQMessagePtr& msg, const int timeout = -1) override + + int Receive(MessagePtr& msg, const int timeout = -1) override { int flags = 0; if (timeout == 0) { @@ -162,7 +169,7 @@ class FairMQSocketZMQ final : public FairMQSocket int elapsed = 0; while (true) { - int nbytes = zmq_msg_recv(static_cast(msg.get())->GetMessage(), fSocket, flags); + int nbytes = zmq_msg_recv(static_cast(msg.get())->GetMessage(), fSocket, flags); if (nbytes >= 0) { fBytesRx += nbytes; ++fMessagesRx; @@ -191,7 +198,8 @@ class FairMQSocketZMQ final : public FairMQSocket } } } - int64_t Send(std::vector>& msgVec, const int timeout = -1) override + + int64_t Send(std::vector>& msgVec, const int timeout = -1) override { int flags = 0; if (timeout == 0) { @@ -209,9 +217,9 @@ class FairMQSocketZMQ final : public FairMQSocket bool repeat = false; for (unsigned int i = 0; i < vecSize; ++i) { - static_cast(msgVec[i].get())->ApplyUsedSize(); + static_cast(msgVec[i].get())->ApplyUsedSize(); - int nbytes = zmq_msg_send(static_cast(msgVec[i].get())->GetMessage(), + int nbytes = zmq_msg_send(static_cast(msgVec[i].get())->GetMessage(), fSocket, (i < vecSize - 1) ? ZMQ_SNDMORE|flags : flags); if (nbytes >= 0) { @@ -262,7 +270,8 @@ class FairMQSocketZMQ final : public FairMQSocket return -1; } } - int64_t Receive(std::vector>& msgVec, const int timeout = -1) override + + int64_t Receive(std::vector>& msgVec, const int timeout = -1) override { int flags = 0; if (timeout == 0) { @@ -276,9 +285,9 @@ class FairMQSocketZMQ final : public FairMQSocket bool repeat = false; do { - std::unique_ptr part(new FairMQMessageZMQ(GetTransport())); + FairMQMessagePtr part = tools::make_unique(GetTransport()); - int nbytes = zmq_msg_recv(static_cast(part.get())->GetMessage(), fSocket, flags); + int nbytes = zmq_msg_recv(static_cast(part.get())->GetMessage(), fSocket, flags); if (nbytes >= 0) { msgVec.push_back(move(part)); totalSize += nbytes; @@ -343,6 +352,7 @@ class FairMQSocketZMQ final : public FairMQSocket LOG(error) << "Failed setting socket option, reason: " << zmq_strerror(errno); } } + void GetOption(const std::string& option, void* value, size_t* valueSize) override { if (zmq_getsockopt(fSocket, GetConstant(option), value, valueSize) < 0) @@ -354,75 +364,84 @@ class FairMQSocketZMQ final : public FairMQSocket void SetLinger(const int value) override { if (zmq_setsockopt(fSocket, ZMQ_LINGER, &value, sizeof(value)) < 0) { - throw fair::mq::SocketError(fair::mq::tools::ToString("failed setting ZMQ_LINGER, reason: ", zmq_strerror(errno))); + throw SocketError(tools::ToString("failed setting ZMQ_LINGER, reason: ", zmq_strerror(errno))); } } + int GetLinger() const override { int value = 0; size_t valueSize = sizeof(value); if (zmq_getsockopt(fSocket, ZMQ_LINGER, &value, &valueSize) < 0) { - throw fair::mq::SocketError(fair::mq::tools::ToString("failed getting ZMQ_LINGER, reason: ", zmq_strerror(errno))); + throw SocketError(tools::ToString("failed getting ZMQ_LINGER, reason: ", zmq_strerror(errno))); } return value; } + void SetSndBufSize(const int value) override { if (zmq_setsockopt(fSocket, ZMQ_SNDHWM, &value, sizeof(value)) < 0) { - throw fair::mq::SocketError(fair::mq::tools::ToString("failed setting ZMQ_SNDHWM, reason: ", zmq_strerror(errno))); + throw SocketError(tools::ToString("failed setting ZMQ_SNDHWM, reason: ", zmq_strerror(errno))); } } + int GetSndBufSize() const override { int value = 0; size_t valueSize = sizeof(value); if (zmq_getsockopt(fSocket, ZMQ_SNDHWM, &value, &valueSize) < 0) { - throw fair::mq::SocketError(fair::mq::tools::ToString("failed getting ZMQ_SNDHWM, reason: ", zmq_strerror(errno))); + throw SocketError(tools::ToString("failed getting ZMQ_SNDHWM, reason: ", zmq_strerror(errno))); } return value; } + void SetRcvBufSize(const int value) override { if (zmq_setsockopt(fSocket, ZMQ_RCVHWM, &value, sizeof(value)) < 0) { - throw fair::mq::SocketError(fair::mq::tools::ToString("failed setting ZMQ_RCVHWM, reason: ", zmq_strerror(errno))); + throw SocketError(tools::ToString("failed setting ZMQ_RCVHWM, reason: ", zmq_strerror(errno))); } } + int GetRcvBufSize() const override { int value = 0; size_t valueSize = sizeof(value); if (zmq_getsockopt(fSocket, ZMQ_RCVHWM, &value, &valueSize) < 0) { - throw fair::mq::SocketError(fair::mq::tools::ToString("failed getting ZMQ_RCVHWM, reason: ", zmq_strerror(errno))); + throw SocketError(tools::ToString("failed getting ZMQ_RCVHWM, reason: ", zmq_strerror(errno))); } return value; } + void SetSndKernelSize(const int value) override { if (zmq_setsockopt(fSocket, ZMQ_SNDBUF, &value, sizeof(value)) < 0) { - throw fair::mq::SocketError(fair::mq::tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno))); + throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno))); } } + int GetSndKernelSize() const override { int value = 0; size_t valueSize = sizeof(value); if (zmq_getsockopt(fSocket, ZMQ_SNDBUF, &value, &valueSize) < 0) { - throw fair::mq::SocketError(fair::mq::tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno))); + throw SocketError(tools::ToString("failed getting ZMQ_SNDBUF, reason: ", zmq_strerror(errno))); } return value; } + void SetRcvKernelSize(const int value) override { if (zmq_setsockopt(fSocket, ZMQ_RCVBUF, &value, sizeof(value)) < 0) { - throw fair::mq::SocketError(fair::mq::tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno))); + throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno))); } } + int GetRcvKernelSize() const override { int value = 0; size_t valueSize = sizeof(value); if (zmq_getsockopt(fSocket, ZMQ_RCVBUF, &value, &valueSize) < 0) { - throw fair::mq::SocketError(fair::mq::tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno))); + throw SocketError(tools::ToString("failed getting ZMQ_RCVBUF, reason: ", zmq_strerror(errno))); } return value; } @@ -459,10 +478,10 @@ class FairMQSocketZMQ final : public FairMQSocket return -1; } - ~FairMQSocketZMQ() override { Close(); } + ~Socket() override { Close(); } private: - fair::mq::zmq::Context& fCtx; + Context& fCtx; void* fSocket; std::string fId; std::atomic fBytesTx; @@ -474,4 +493,8 @@ class FairMQSocketZMQ final : public FairMQSocket int fRcvTimeout; }; -#endif /* FAIRMQSOCKETZMQ_H_ */ +} // namespace zmq +} // namespace mq +} // namespace fair + +#endif /* FAIR_MQ_ZMQ_SOCKET_H */ diff --git a/fairmq/zeromq/TransportFactory.h b/fairmq/zeromq/TransportFactory.h new file mode 100644 index 00000000..df11c932 --- /dev/null +++ b/fairmq/zeromq/TransportFactory.h @@ -0,0 +1,134 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_ZMQ_TRANSPORTFACTORY_H +#define FAIR_MQ_ZMQ_TRANSPORTFACTORY_H + +#include +#include +#include +#include +#include +#include +#include +#include + +#include // unique_ptr +#include +#include + +namespace fair +{ +namespace mq +{ +namespace zmq +{ + +class TransportFactory final : public FairMQTransportFactory +{ + public: + TransportFactory(const std::string& id = "", const ProgOptions* config = nullptr) + : FairMQTransportFactory(id) + , fCtx(nullptr) + { + int major, minor, patch; + zmq_version(&major, &minor, &patch); + LOG(debug) << "Transport: Using ZeroMQ library, version: " << major << "." << minor << "." << patch; + + if (config) { + fCtx = tools::make_unique(config->GetProperty("io-threads", 1)); + } else { + LOG(debug) << "fair::mq::ProgOptions not available! Using defaults."; + fCtx = tools::make_unique(1); + } + } + + TransportFactory(const TransportFactory&) = delete; + TransportFactory operator=(const TransportFactory&) = delete; + + MessagePtr CreateMessage() override + { + return tools::make_unique(this); + } + MessagePtr CreateMessage(const size_t size) override + { + return tools::make_unique(size, this); + } + MessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override + { + return tools::make_unique(data, size, ffn, hint, this); + } + MessagePtr CreateMessage(UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0) override + { + return tools::make_unique(region, data, size, hint, this); + } + + SocketPtr CreateSocket(const std::string& type, const std::string& name) override + { + return tools::make_unique(*fCtx, type, name, GetId(), this); + } + + PollerPtr CreatePoller(const std::vector& channels) const override + { + return tools::make_unique(channels); + } + PollerPtr CreatePoller(const std::vector& channels) const override + { + return tools::make_unique(channels); + } + PollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override + { + return tools::make_unique(channelsMap, channelList); + } + + UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path = "", int flags = 0) override + { + return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags); + } + UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override + { + return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags); + } + UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0) override + { + return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags); + } + UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override + { + return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags); + } + UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) + { + UnmanagedRegionPtr ptr = tools::make_unique(*fCtx, size, userFlags, callback, bulkCallback, path, flags, this); + auto zPtr = static_cast(ptr.get()); + fCtx->AddRegion(zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), RegionEvent::created); + return ptr; + } + + void SubscribeToRegionEvents(RegionEventCallback callback) override { fCtx->SubscribeToRegionEvents(callback); } + bool SubscribedToRegionEvents() override { return fCtx->SubscribedToRegionEvents(); } + void UnsubscribeFromRegionEvents() override { fCtx->UnsubscribeFromRegionEvents(); } + std::vector GetRegionInfo() override { return fCtx->GetRegionInfo(); } + + Transport GetType() const override { return Transport::ZMQ; } + + void Interrupt() override { fCtx->Interrupt(); } + void Resume() override { fCtx->Resume(); } + void Reset() override { fCtx->Reset(); } + + ~TransportFactory() override { LOG(debug) << "Destroying ZeroMQ transport..."; } + + private: + std::unique_ptr fCtx; +}; + +} // namespace zmq +} // namespace mq +} // namespace fair + +#endif /* FAIR_MQ_ZMQ_TRANSPORTFACTORY_H */ diff --git a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h b/fairmq/zeromq/UnmanagedRegion.h similarity index 67% rename from fairmq/zeromq/FairMQUnmanagedRegionZMQ.h rename to fairmq/zeromq/UnmanagedRegion.h index f0e4a7a9..95cb2a38 100644 --- a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h +++ b/fairmq/zeromq/UnmanagedRegion.h @@ -6,8 +6,8 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#ifndef FAIRMQUNMANAGEDREGIONZMQ_H_ -#define FAIRMQUNMANAGEDREGIONZMQ_H_ +#ifndef FAIR_MQ_ZMQ_UNMANAGEDREGION_H +#define FAIR_MQ_ZMQ_UNMANAGEDREGION_H #include #include @@ -16,23 +16,28 @@ #include // size_t #include -class FairMQTransportFactory; - -class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion +namespace fair { - friend class FairMQSocketZMQ; - friend class FairMQMessageZMQ; +namespace mq +{ +namespace zmq +{ + +class UnmanagedRegion final : public fair::mq::UnmanagedRegion +{ + friend class Socket; + friend class Message; public: - FairMQUnmanagedRegionZMQ(fair::mq::zmq::Context& ctx, + UnmanagedRegion(Context& ctx, size_t size, int64_t userFlags, - FairMQRegionCallback callback, - FairMQRegionBulkCallback bulkCallback, + RegionCallback callback, + RegionBulkCallback bulkCallback, const std::string& /* path = "" */, int /* flags = 0 */, FairMQTransportFactory* factory = nullptr) - : FairMQUnmanagedRegion(factory) + : fair::mq::UnmanagedRegion(factory) , fCtx(ctx) , fId(fCtx.RegionCount()) , fBuffer(malloc(size)) @@ -42,15 +47,15 @@ class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion , fBulkCallback(bulkCallback) {} - FairMQUnmanagedRegionZMQ(const FairMQUnmanagedRegionZMQ&) = delete; - FairMQUnmanagedRegionZMQ operator=(const FairMQUnmanagedRegionZMQ&) = delete; + UnmanagedRegion(const UnmanagedRegion&) = delete; + UnmanagedRegion operator=(const UnmanagedRegion&) = delete; virtual void* GetData() const override { return fBuffer; } virtual size_t GetSize() const override { return fSize; } uint64_t GetId() const override { return fId; } int64_t GetUserFlags() const { return fUserFlags; } - virtual ~FairMQUnmanagedRegionZMQ() + virtual ~UnmanagedRegion() { LOG(debug) << "destroying region " << fId; fCtx.RemoveRegion(fId); @@ -58,13 +63,17 @@ class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion } private: - fair::mq::zmq::Context& fCtx; + Context& fCtx; uint64_t fId; void* fBuffer; size_t fSize; int64_t fUserFlags; - FairMQRegionCallback fCallback; - FairMQRegionBulkCallback fBulkCallback; + RegionCallback fCallback; + RegionBulkCallback fBulkCallback; }; -#endif /* FAIRMQUNMANAGEDREGIONZMQ_H_ */ +} // namespace zmq +} // namespace mq +} // namespace fair + +#endif /* FAIR_MQ_ZMQ_UNMANAGEDREGION_H */