From dbdabd23a4213f322e8d5da9b40e9c8be0e77914 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Sat, 16 May 2020 14:57:36 +0200 Subject: [PATCH] Zmq: remove global (static) state, refactor --- fairmq/CMakeLists.txt | 1 + fairmq/zeromq/Context.h | 195 ++++++++++++++++++++ fairmq/zeromq/FairMQMessageZMQ.cxx | 9 +- fairmq/zeromq/FairMQSocketZMQ.cxx | 32 +--- fairmq/zeromq/FairMQSocketZMQ.h | 18 +- fairmq/zeromq/FairMQTransportFactoryZMQ.cxx | 138 +------------- fairmq/zeromq/FairMQTransportFactoryZMQ.h | 36 ++-- fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx | 14 +- fairmq/zeromq/FairMQUnmanagedRegionZMQ.h | 11 +- 9 files changed, 254 insertions(+), 200 deletions(-) create mode 100644 fairmq/zeromq/Context.h diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 6dfab539..380a01b3 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -190,6 +190,7 @@ if(BUILD_FAIRMQ) shmem/Common.h shmem/Manager.h shmem/Region.h + zeromq/Context.h zeromq/FairMQMessageZMQ.h zeromq/FairMQPollerZMQ.h zeromq/FairMQUnmanagedRegionZMQ.h diff --git a/fairmq/zeromq/Context.h b/fairmq/zeromq/Context.h new file mode 100644 index 00000000..f9dcc847 --- /dev/null +++ b/fairmq/zeromq/Context.h @@ -0,0 +1,195 @@ +/******************************************************************************** + * Copyright (C) 2020 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_ZMQ_CONTEXT_H_ +#define FAIR_MQ_ZMQ_CONTEXT_H_ + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace fair +{ +namespace mq +{ +namespace zmq +{ + +struct ContextError : std::runtime_error { using std::runtime_error::runtime_error; }; + +class Context +{ + public: + Context(int numIoThreads) + : fZmqCtx(zmq_ctx_new()) + , fInterrupted(false) + , fRegionCounter(1) + { + if (!fZmqCtx) { + throw ContextError(tools::ToString("failed creating context, reason: ", zmq_strerror(errno))); + } + + if (zmq_ctx_set(fZmqCtx, ZMQ_MAX_SOCKETS, 10000) != 0) { + LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno); + throw ContextError(tools::ToString("failed configuring context, reason: ", zmq_strerror(errno))); + } + + if (zmq_ctx_set(fZmqCtx, ZMQ_IO_THREADS, numIoThreads) != 0) { + LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno); + throw ContextError(tools::ToString("failed configuring context, reason: ", zmq_strerror(errno))); + } + + fRegionEvents.emplace(0, nullptr, 0, 0, fair::mq::RegionEvent::local_only); + } + + Context(const Context&) = delete; + Context operator=(const Context&) = delete; + + void SubscribeToRegionEvents(FairMQRegionEventCallback callback) + { + if (fRegionEventThread.joinable()) { + LOG(debug) << "Already subscribed. Overwriting previous subscription."; + { + std::lock_guard lock(fMtx); + fRegionEventsSubscriptionActive = false; + } + fRegionEventsCV.notify_one(); + fRegionEventThread.join(); + } + std::lock_guard lock(fMtx); + fRegionEventCallback = callback; + fRegionEventsSubscriptionActive = true; + fRegionEventThread = std::thread(&Context::RegionEventsSubscription, this); + } + + bool SubscribedToRegionEvents() const { return fRegionEventThread.joinable(); } + + void UnsubscribeFromRegionEvents() + { + if (fRegionEventThread.joinable()) { + std::unique_lock lock(fMtx); + fRegionEventsSubscriptionActive = false; + lock.unlock(); + fRegionEventsCV.notify_one(); + fRegionEventThread.join(); + lock.lock(); + fRegionEventCallback = nullptr; + } + } + + void RegionEventsSubscription() + { + std::unique_lock lock(fMtx); + while (fRegionEventsSubscriptionActive) { + + while (!fRegionEvents.empty()) { + auto i = fRegionEvents.front(); + fRegionEventCallback(i); + fRegionEvents.pop(); + } + fRegionEventsCV.wait(lock, [&]() { return !fRegionEventsSubscriptionActive || !fRegionEvents.empty(); }); + } + } + + std::vector GetRegionInfo() const + { + std::lock_guard lock(fMtx); + return fRegionInfos; + } + + uint64_t RegionCount() const + { + std::lock_guard lock(fMtx); + return fRegionCounter; + } + + void AddRegion(uint64_t id, void* ptr, size_t size, int64_t userFlags, fair::mq::RegionEvent event) + { + { + std::lock_guard lock(fMtx); + ++fRegionCounter; + fRegionInfos.emplace_back(id, ptr, size, userFlags, event); + fRegionEvents.emplace(id, ptr, size, userFlags, event); + } + fRegionEventsCV.notify_one(); + } + + void RemoveRegion(uint64_t id) + { + { + std::lock_guard lock(fMtx); + auto it = find_if(fRegionInfos.begin(), fRegionInfos.end(), [id](const fair::mq::RegionInfo& i) { + return i.id == id; + }); + if (it != fRegionInfos.end()) { + fRegionEvents.push(*it); + fRegionEvents.back().event = fair::mq::RegionEvent::destroyed; + fRegionInfos.erase(it); + } else { + LOG(error) << "RemoveRegion: given id (" << id << ") not found."; + } + } + fRegionEventsCV.notify_one(); + } + + void Interrupt() { fInterrupted.store(true); } + void Resume() { fInterrupted.store(false); } + void Reset() {} + bool Interrupted() { return fInterrupted.load(); } + + void* GetZmqCtx() { return fZmqCtx; } + + ~Context() + { + UnsubscribeFromRegionEvents(); + + if (fZmqCtx) { + if (zmq_ctx_term(fZmqCtx) != 0) { + if (errno == EINTR) { + LOG(error) << " failed closing context, reason: " << zmq_strerror(errno); + } else { + fZmqCtx = nullptr; + return; + } + } + } else { + LOG(error) << "context not available for shutdown"; + } + } + + private: + void* fZmqCtx; + mutable std::mutex fMtx; + std::atomic fInterrupted; + + uint64_t fRegionCounter; + std::condition_variable fRegionEventsCV; + std::vector fRegionInfos; + std::queue fRegionEvents; + std::thread fRegionEventThread; + std::function fRegionEventCallback; + bool fRegionEventsSubscriptionActive; +}; + +} // namespace zmq +} // namespace mq +} // namespace fair + +#endif /* FAIR_MQ_ZMQ_CONTEXT_H_ */ diff --git a/fairmq/zeromq/FairMQMessageZMQ.cxx b/fairmq/zeromq/FairMQMessageZMQ.cxx index 07b89e56..7ca522b0 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.cxx +++ b/fairmq/zeromq/FairMQMessageZMQ.cxx @@ -17,14 +17,13 @@ #include "FairMQLogger.h" #include #include "FairMQUnmanagedRegionZMQ.h" -#include #include using namespace std; FairMQMessageZMQ::FairMQMessageZMQ(FairMQTransportFactory* factory) - : FairMQMessage{factory} + : FairMQMessage(factory) , fUsedSizeModified(false) , fUsedSize() , fMsg(fair::mq::tools::make_unique()) @@ -37,7 +36,7 @@ FairMQMessageZMQ::FairMQMessageZMQ(FairMQTransportFactory* factory) } FairMQMessageZMQ::FairMQMessageZMQ(const size_t size, FairMQTransportFactory* factory) - : FairMQMessage{factory} + : FairMQMessage(factory) , fUsedSizeModified(false) , fUsedSize(size) , fMsg(fair::mq::tools::make_unique()) @@ -50,7 +49,7 @@ FairMQMessageZMQ::FairMQMessageZMQ(const size_t size, FairMQTransportFactory* fa } FairMQMessageZMQ::FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint, FairMQTransportFactory* factory) - : FairMQMessage{factory} + : FairMQMessage(factory) , fUsedSizeModified(false) , fUsedSize() , fMsg(fair::mq::tools::make_unique()) @@ -63,7 +62,7 @@ FairMQMessageZMQ::FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn } FairMQMessageZMQ::FairMQMessageZMQ(FairMQUnmanagedRegionPtr& region, void* data, const size_t size, void* hint, FairMQTransportFactory* factory) - : FairMQMessage{factory} + : FairMQMessage(factory) , fUsedSizeModified(false) , fUsedSize() , fMsg(fair::mq::tools::make_unique()) diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index c14ec2d6..cd6dac4e 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -13,16 +13,13 @@ #include -#include - using namespace std; using namespace fair::mq; -atomic FairMQSocketZMQ::fInterrupted(false); - -FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const string& id /*= ""*/, void* context, FairMQTransportFactory* fac) - : FairMQSocket{fac} - , fSocket(nullptr) +FairMQSocketZMQ::FairMQSocketZMQ(fair::mq::zmq::Context& ctx, const string& type, const string& name, const string& id /*= ""*/, FairMQTransportFactory* factory) + : FairMQSocket(factory) + , fCtx(ctx) + , fSocket(zmq_socket(fCtx.GetZmqCtx(), GetConstant(type))) , fId(id + "." + name + "." + type) , fBytesTx(0) , fBytesRx(0) @@ -31,9 +28,6 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const s , fSndTimeout(100) , fRcvTimeout(100) { - assert(context); - fSocket = zmq_socket(context, GetConstant(type)); - if (fSocket == nullptr) { LOG(error) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno); @@ -122,7 +116,7 @@ int FairMQSocketZMQ::Send(FairMQMessagePtr& msg, const int timeout) return nbytes; } else if (zmq_errno() == EAGAIN) { - if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { + if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { if (timeout > 0) { elapsed += fSndTimeout; if (elapsed >= timeout) { @@ -161,7 +155,7 @@ int FairMQSocketZMQ::Receive(FairMQMessagePtr& msg, const int timeout) ++fMessagesRx; return nbytes; } else if (zmq_errno() == EAGAIN) { - if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { + if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { if (timeout > 0) { elapsed += fRcvTimeout; if (elapsed >= timeout) { @@ -213,7 +207,7 @@ int64_t FairMQSocketZMQ::Send(vector& msgVec, const int timeou } else { // according to ZMQ docs, this can only occur for the first part if (zmq_errno() == EAGAIN) { - if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { + if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { if (timeout > 0) { elapsed += fSndTimeout; if (elapsed >= timeout) { @@ -278,7 +272,7 @@ int64_t FairMQSocketZMQ::Receive(vector& msgVec, const int tim msgVec.push_back(move(part)); totalSize += nbytes; } else if (zmq_errno() == EAGAIN) { - if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { + if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) { if (timeout > 0) { elapsed += fRcvTimeout; if (elapsed >= timeout) { @@ -329,16 +323,6 @@ void FairMQSocketZMQ::Close() fSocket = nullptr; } -void FairMQSocketZMQ::Interrupt() -{ - fInterrupted = true; -} - -void FairMQSocketZMQ::Resume() -{ - fInterrupted = false; -} - void* FairMQSocketZMQ::GetSocket() const { return fSocket; diff --git a/fairmq/zeromq/FairMQSocketZMQ.h b/fairmq/zeromq/FairMQSocketZMQ.h index ba84d2aa..13e5b9c2 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.h +++ b/fairmq/zeromq/FairMQSocketZMQ.h @@ -9,18 +9,20 @@ #ifndef FAIRMQSOCKETZMQ_H_ #define FAIRMQSOCKETZMQ_H_ -#include - -#include // unique_ptr - +#include #include "FairMQSocket.h" #include "FairMQMessage.h" + +#include +#include // unique_ptr + class FairMQTransportFactory; class FairMQSocketZMQ final : public FairMQSocket { public: - FairMQSocketZMQ(const std::string& type, const std::string& name, const std::string& id = "", void* context = nullptr, FairMQTransportFactory* factory = nullptr); + FairMQSocketZMQ(fair::mq::zmq::Context& ctx, const std::string& type, const std::string& name, const std::string& id = "", FairMQTransportFactory* factory = nullptr); + FairMQSocketZMQ(const FairMQSocketZMQ&) = delete; FairMQSocketZMQ operator=(const FairMQSocketZMQ&) = delete; @@ -38,9 +40,6 @@ class FairMQSocketZMQ final : public FairMQSocket void Close() override; - static void Interrupt(); - static void Resume(); - void SetOption(const std::string& option, const void* value, size_t valueSize) override; void GetOption(const std::string& option, void* value, size_t* valueSize) override; @@ -65,6 +64,7 @@ class FairMQSocketZMQ final : public FairMQSocket ~FairMQSocketZMQ() override; private: + fair::mq::zmq::Context& fCtx; void* fSocket; std::string fId; std::atomic fBytesTx; @@ -72,8 +72,6 @@ class FairMQSocketZMQ final : public FairMQSocket std::atomic fMessagesTx; std::atomic fMessagesRx; - static std::atomic fInterrupted; - int fSndTimeout; int fRcvTimeout; }; diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx index d15c0791..e3a613f7 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx @@ -7,6 +7,7 @@ ********************************************************************************/ #include "FairMQTransportFactoryZMQ.h" +#include #include #include // find_if @@ -15,40 +16,18 @@ using namespace std; FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ(const string& id, const fair::mq::ProgOptions* config) : FairMQTransportFactory(id) - , fContext(zmq_ctx_new()) - , fRegionCounter(0) + , fCtx(nullptr) { int major, minor, patch; zmq_version(&major, &minor, &patch); LOG(debug) << "Transport: Using ZeroMQ library, version: " << major << "." << minor << "." << patch; - if (!fContext) - { - LOG(error) << "failed creating context, reason: " << zmq_strerror(errno); - exit(EXIT_FAILURE); - } - - if (zmq_ctx_set(fContext, ZMQ_MAX_SOCKETS, 10000) != 0) - { - LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno); - } - - int numIoThreads = 1; - if (config) - { - numIoThreads = config->GetProperty("io-threads", numIoThreads); - } - else - { + if (config) { + fCtx = fair::mq::tools::make_unique(config->GetProperty("io-threads", 1)); + } else { LOG(debug) << "fair::mq::ProgOptions not available! Using defaults."; + fCtx = fair::mq::tools::make_unique(1); } - - if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0) - { - LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno); - } - - fRegionEvents.emplace(0, nullptr, 0, 0, fair::mq::RegionEvent::local_only); } FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage() @@ -73,8 +52,7 @@ FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(FairMQUnmanagedRegionP FairMQSocketPtr FairMQTransportFactoryZMQ::CreateSocket(const string& type, const string& name) { - assert(fContext); - return unique_ptr(new FairMQSocketZMQ(type, name, GetId(), fContext, this)); + return unique_ptr(new FairMQSocketZMQ(*fCtx, type, name, GetId(), this)); } FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const vector& channels) const @@ -135,109 +113,13 @@ FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion( const string& path /* = "" */, int flags /* = 0 */) { - unique_ptr ptr = nullptr; - { - lock_guard lock(fMtx); - - ++fRegionCounter; - ptr = unique_ptr(new FairMQUnmanagedRegionZMQ(fRegionCounter, size, userFlags, callback, bulkCallback, path, flags, this)); - auto zPtr = static_cast(ptr.get()); - fRegionInfos.emplace_back(zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), fair::mq::RegionEvent::created); - fRegionEvents.emplace(zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), fair::mq::RegionEvent::created); - } - fRegionEventsCV.notify_one(); + unique_ptr ptr = unique_ptr(new FairMQUnmanagedRegionZMQ(*fCtx, size, userFlags, callback, bulkCallback, path, flags, this)); + auto zPtr = static_cast(ptr.get()); + fCtx->AddRegion(zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), fair::mq::RegionEvent::created); return ptr; } -void FairMQTransportFactoryZMQ::SubscribeToRegionEvents(FairMQRegionEventCallback callback) -{ - if (fRegionEventThread.joinable()) { - LOG(debug) << "Already subscribed. Overwriting previous subscription."; - { - lock_guard lock(fMtx); - fRegionEventsSubscriptionActive = false; - } - fRegionEventsCV.notify_one(); - fRegionEventThread.join(); - } - lock_guard lock(fMtx); - fRegionEventCallback = callback; - fRegionEventsSubscriptionActive = true; - fRegionEventThread = thread(&FairMQTransportFactoryZMQ::RegionEventsSubscription, this); -} - -bool FairMQTransportFactoryZMQ::SubscribedToRegionEvents() -{ - return fRegionEventThread.joinable(); -} - -void FairMQTransportFactoryZMQ::UnsubscribeFromRegionEvents() -{ - if (fRegionEventThread.joinable()) { - unique_lock lock(fMtx); - fRegionEventsSubscriptionActive = false; - lock.unlock(); - fRegionEventsCV.notify_one(); - fRegionEventThread.join(); - lock.lock(); - fRegionEventCallback = nullptr; - } -} - -void FairMQTransportFactoryZMQ::RegionEventsSubscription() -{ - unique_lock lock(fMtx); - while (fRegionEventsSubscriptionActive) { - - while (!fRegionEvents.empty()) { - auto i = fRegionEvents.front(); - fRegionEventCallback(i); - fRegionEvents.pop(); - } - fRegionEventsCV.wait(lock, [&]() { return !fRegionEventsSubscriptionActive || !fRegionEvents.empty(); }); - } -} - -vector FairMQTransportFactoryZMQ::GetRegionInfo() -{ - lock_guard lock(fMtx); - return fRegionInfos; -} - -void FairMQTransportFactoryZMQ::RemoveRegion(uint64_t id) -{ - { - lock_guard lock(fMtx); - auto it = find_if(fRegionInfos.begin(), fRegionInfos.end(), [id](const fair::mq::RegionInfo& i) { - return i.id == id; - }); - if (it != fRegionInfos.end()) { - fRegionEvents.push(*it); - fRegionEvents.back().event = fair::mq::RegionEvent::destroyed; - fRegionInfos.erase(it); - } else { - LOG(error) << "RemoveRegion: given id (" << id << ") not found."; - } - } - fRegionEventsCV.notify_one(); -} - FairMQTransportFactoryZMQ::~FairMQTransportFactoryZMQ() { LOG(debug) << "Destroying ZeroMQ transport..."; - - UnsubscribeFromRegionEvents(); - - if (fContext) { - if (zmq_ctx_term(fContext) != 0) { - if (errno == EINTR) { - LOG(error) << " failed closing context, reason: " << zmq_strerror(errno); - } else { - fContext = nullptr; - return; - } - } - } else { - LOG(error) << "context not available for shutdown"; - } } diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index bc125463..f2920ebf 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -15,19 +15,16 @@ #ifndef FAIRMQTRANSPORTFACTORYZMQ_H_ #define FAIRMQTRANSPORTFACTORYZMQ_H_ +#include +#include #include "FairMQTransportFactory.h" #include "FairMQMessageZMQ.h" #include "FairMQSocketZMQ.h" #include "FairMQPollerZMQ.h" #include "FairMQUnmanagedRegionZMQ.h" -#include -#include -#include -#include -#include +#include // unique_ptr #include -#include #include class FairMQTransportFactoryZMQ final : public FairMQTransportFactory @@ -54,32 +51,21 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override; FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, FairMQRegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0); - void SubscribeToRegionEvents(FairMQRegionEventCallback callback) override; - bool SubscribedToRegionEvents() override; - void UnsubscribeFromRegionEvents() override; - void RegionEventsSubscription(); - std::vector GetRegionInfo() override; - void RemoveRegion(uint64_t id); + void SubscribeToRegionEvents(FairMQRegionEventCallback callback) override { fCtx->SubscribeToRegionEvents(callback); } + bool SubscribedToRegionEvents() override { return fCtx->SubscribedToRegionEvents(); } + void UnsubscribeFromRegionEvents() override { fCtx->UnsubscribeFromRegionEvents(); } + std::vector GetRegionInfo() override { return fCtx->GetRegionInfo(); } fair::mq::Transport GetType() const override { return fair::mq::Transport::ZMQ; } - void Interrupt() override { FairMQSocketZMQ::Interrupt(); } - void Resume() override { FairMQSocketZMQ::Resume(); } - void Reset() override {} + void Interrupt() override { fCtx->Interrupt(); } + void Resume() override { fCtx->Resume(); } + void Reset() override { fCtx->Reset(); } ~FairMQTransportFactoryZMQ() override; private: - void* fContext; - - std::mutex fMtx; - uint64_t fRegionCounter; - std::condition_variable fRegionEventsCV; - std::vector fRegionInfos; - std::queue fRegionEvents; - std::thread fRegionEventThread; - std::function fRegionEventCallback; - bool fRegionEventsSubscriptionActive; + std::unique_ptr fCtx; }; #endif /* FAIRMQTRANSPORTFACTORYZMQ_H_ */ diff --git a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx b/fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx index 66a4ed4d..7f0d8c70 100644 --- a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx +++ b/fairmq/zeromq/FairMQUnmanagedRegionZMQ.cxx @@ -7,19 +7,19 @@ ********************************************************************************/ #include "FairMQUnmanagedRegionZMQ.h" -#include "FairMQTransportFactoryZMQ.h" #include "FairMQLogger.h" -FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(uint64_t id, - const size_t size, +FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(fair::mq::zmq::Context& ctx, + size_t size, int64_t userFlags, FairMQRegionCallback callback, FairMQRegionBulkCallback bulkCallback, const std::string& /* path = "" */, int /* flags = 0 */, - FairMQTransportFactory* factory /* = nullptr */) + FairMQTransportFactory* factory) : FairMQUnmanagedRegion(factory) - , fId(id) + , fCtx(ctx) + , fId(fCtx.RegionCount()) , fBuffer(malloc(size)) , fSize(size) , fUserFlags(userFlags) @@ -39,7 +39,7 @@ size_t FairMQUnmanagedRegionZMQ::GetSize() const FairMQUnmanagedRegionZMQ::~FairMQUnmanagedRegionZMQ() { - LOG(debug) << "destroying region"; - static_cast(GetTransport())->RemoveRegion(fId); + LOG(debug) << "destroying region " << fId; + fCtx.RemoveRegion(fId); free(fBuffer); } diff --git a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h b/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h index cda32214..e6639dcf 100644 --- a/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h +++ b/fairmq/zeromq/FairMQUnmanagedRegionZMQ.h @@ -9,6 +9,7 @@ #ifndef FAIRMQUNMANAGEDREGIONZMQ_H_ #define FAIRMQUNMANAGEDREGIONZMQ_H_ +#include #include "FairMQUnmanagedRegion.h" #include // size_t @@ -21,7 +22,14 @@ class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion friend class FairMQMessageZMQ; public: - FairMQUnmanagedRegionZMQ(uint64_t id, const size_t size, int64_t userFlags, FairMQRegionCallback callback, FairMQRegionBulkCallback bulkCallback, const std::string& /* path = "" */, int /* flags = 0 */, FairMQTransportFactory* factory = nullptr); + FairMQUnmanagedRegionZMQ(fair::mq::zmq::Context& ctx, + size_t size, + int64_t userFlags, + FairMQRegionCallback callback, + FairMQRegionBulkCallback bulkCallback, + const std::string& path = "", + int flags = 0, + FairMQTransportFactory* factory = nullptr); FairMQUnmanagedRegionZMQ(const FairMQUnmanagedRegionZMQ&) = delete; FairMQUnmanagedRegionZMQ operator=(const FairMQUnmanagedRegionZMQ&) = delete; @@ -34,6 +42,7 @@ class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion virtual ~FairMQUnmanagedRegionZMQ(); private: + fair::mq::zmq::Context& fCtx; uint64_t fId; void* fBuffer; size_t fSize;