Implement region events for zmq

This commit is contained in:
Alexey Rybalchenko
2020-05-06 15:45:06 +02:00
parent 4218c185a4
commit 9992811822
15 changed files with 196 additions and 71 deletions

View File

@@ -9,6 +9,8 @@
#include "FairMQTransportFactoryZMQ.h"
#include <zmq.h>
#include <algorithm> // find_if
using namespace std;
fair::mq::Transport FairMQTransportFactoryZMQ::fTransportType = fair::mq::Transport::ZMQ;
@@ -16,6 +18,7 @@ fair::mq::Transport FairMQTransportFactoryZMQ::fTransportType = fair::mq::Transp
FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ(const string& id, const fair::mq::ProgOptions* config)
: FairMQTransportFactory(id)
, fContext(zmq_ctx_new())
, fRegionCounter(0)
{
int major, minor, patch;
zmq_version(&major, &minor, &patch);
@@ -80,7 +83,7 @@ FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const vector<FairMQChann
return unique_ptr<FairMQPoller>(new FairMQPollerZMQ(channels));
}
FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const std::vector<FairMQChannel*>& channels) const
FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const vector<FairMQChannel*>& channels) const
{
return unique_ptr<FairMQPoller>(new FairMQPollerZMQ(channels));
}
@@ -90,14 +93,93 @@ FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const unordered_map<stri
return unique_ptr<FairMQPoller>(new FairMQPollerZMQ(channelsMap, channelList));
}
FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const
FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const string& path /* = "" */, int flags /* = 0 */)
{
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionZMQ(size, callback, path, flags));
return CreateUnmanagedRegion(size, 0, callback, path, flags);
}
FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const
FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const string& path /* = "" */, int flags /* = 0 */)
{
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionZMQ(size, userFlags, callback, path, flags));
unique_ptr<FairMQUnmanagedRegion> ptr = nullptr;
{
lock_guard<mutex> lock(fMtx);
++fRegionCounter;
ptr = unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionZMQ(fRegionCounter, size, userFlags, callback, path, flags, this));
auto zPtr = static_cast<FairMQUnmanagedRegionZMQ*>(ptr.get());
fRegionInfos.emplace_back(zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), fair::mq::RegionEvent::created);
fRegionEvents.emplace(zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), fair::mq::RegionEvent::created);
}
fRegionEventsCV.notify_one();
return ptr;
}
void FairMQTransportFactoryZMQ::SubscribeToRegionEvents(FairMQRegionEventCallback callback)
{
if (fRegionEventThread.joinable()) {
LOG(debug) << "Already subscribed. Overwriting previous subscription.";
{
lock_guard<mutex> lock(fMtx);
fRegionEventsSubscriptionActive = false;
}
fRegionEventsCV.notify_one();
fRegionEventThread.join();
}
lock_guard<mutex> lock(fMtx);
fRegionEventCallback = callback;
fRegionEventsSubscriptionActive = true;
fRegionEventThread = thread(&FairMQTransportFactoryZMQ::RegionEventsSubscription, this);
}
void FairMQTransportFactoryZMQ::UnsubscribeFromRegionEvents()
{
if (fRegionEventThread.joinable()) {
unique_lock<mutex> lock(fMtx);
fRegionEventsSubscriptionActive = false;
lock.unlock();
fRegionEventsCV.notify_one();
fRegionEventThread.join();
lock.lock();
fRegionEventCallback = nullptr;
}
}
void FairMQTransportFactoryZMQ::RegionEventsSubscription()
{
unique_lock<mutex> lock(fMtx);
while (fRegionEventsSubscriptionActive) {
while (!fRegionEvents.empty()) {
auto i = fRegionEvents.front();
fRegionEventCallback(i);
fRegionEvents.pop();
}
fRegionEventsCV.wait(lock, [&]() { return !fRegionEventsSubscriptionActive || !fRegionEvents.empty(); });
}
}
vector<fair::mq::RegionInfo> FairMQTransportFactoryZMQ::GetRegionInfo()
{
lock_guard<mutex> lock(fMtx);
return fRegionInfos;
}
void FairMQTransportFactoryZMQ::RemoveRegion(uint64_t id)
{
{
lock_guard<mutex> lock(fMtx);
auto it = find_if(fRegionInfos.begin(), fRegionInfos.end(), [id](const fair::mq::RegionInfo& i) {
return i.id == id;
});
if (it != fRegionInfos.end()) {
fRegionEvents.push(*it);
fRegionEvents.back().event = fair::mq::RegionEvent::destroyed;
fRegionInfos.erase(it);
} else {
LOG(error) << "RemoveRegion: given id (" << id << ") not found.";
}
}
fRegionEventsCV.notify_one();
}
fair::mq::Transport FairMQTransportFactoryZMQ::GetType() const
@@ -108,23 +190,19 @@ fair::mq::Transport FairMQTransportFactoryZMQ::GetType() const
FairMQTransportFactoryZMQ::~FairMQTransportFactoryZMQ()
{
LOG(debug) << "Destroying ZeroMQ transport...";
if (fContext)
{
if (zmq_ctx_term(fContext) != 0)
{
if (errno == EINTR)
{
UnsubscribeFromRegionEvents();
if (fContext) {
if (zmq_ctx_term(fContext) != 0) {
if (errno == EINTR) {
LOG(error) << " failed closing context, reason: " << zmq_strerror(errno);
}
else
{
} else {
fContext = nullptr;
return;
}
}
}
else
{
} else {
LOG(error) << "context not available for shutdown";
}
}

View File

@@ -15,9 +15,6 @@
#ifndef FAIRMQTRANSPORTFACTORYZMQ_H_
#define FAIRMQTRANSPORTFACTORYZMQ_H_
#include <vector>
#include <string>
#include "FairMQTransportFactory.h"
#include "FairMQMessageZMQ.h"
#include "FairMQSocketZMQ.h"
@@ -25,6 +22,14 @@
#include "FairMQUnmanagedRegionZMQ.h"
#include <fairmq/ProgOptions.h>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>
class FairMQTransportFactoryZMQ final : public FairMQTransportFactory
{
public:
@@ -32,8 +37,6 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory
FairMQTransportFactoryZMQ(const FairMQTransportFactoryZMQ&) = delete;
FairMQTransportFactoryZMQ operator=(const FairMQTransportFactoryZMQ&) = delete;
~FairMQTransportFactoryZMQ() override;
FairMQMessagePtr CreateMessage() override;
FairMQMessagePtr CreateMessage(const size_t size) override;
FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override;
@@ -45,12 +48,14 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const override;
FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) const override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) override;
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) override;
void SubscribeToRegionEvents(FairMQRegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for ZeroMQ"; }
void UnsubscribeFromRegionEvents() override { LOG(error) << "UnsubscribeFromRegionEvents not yet implemented for ZeroMQ"; }
std::vector<FairMQRegionInfo> GetRegionInfo() override { LOG(error) << "GetRegionInfo not yet implemented for ZeroMQ, returning empty vector"; return std::vector<FairMQRegionInfo>(); }
void SubscribeToRegionEvents(FairMQRegionEventCallback callback) override;
void UnsubscribeFromRegionEvents() override;
void RegionEventsSubscription();
std::vector<fair::mq::RegionInfo> GetRegionInfo() override;
void RemoveRegion(uint64_t id);
fair::mq::Transport GetType() const override;
@@ -58,9 +63,20 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory
void Resume() override { FairMQSocketZMQ::Resume(); }
void Reset() override {}
~FairMQTransportFactoryZMQ() override;
private:
static fair::mq::Transport fTransportType;
void* fContext;
std::mutex fMtx;
uint64_t fRegionCounter;
std::condition_variable fRegionEventsCV;
std::vector<fair::mq::RegionInfo> fRegionInfos;
std::queue<fair::mq::RegionInfo> fRegionEvents;
std::thread fRegionEventThread;
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
bool fRegionEventsSubscriptionActive;
};
#endif /* FAIRMQTRANSPORTFACTORYZMQ_H_ */

View File

@@ -7,23 +7,17 @@
********************************************************************************/
#include "FairMQUnmanagedRegionZMQ.h"
#include "FairMQTransportFactoryZMQ.h"
#include "FairMQLogger.h"
using namespace std;
FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(const size_t size, FairMQRegionCallback callback, const std::string& /* path = "" */, int /* flags = 0 */)
: fBuffer(malloc(size))
FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(uint64_t id, const size_t size, int64_t userFlags, FairMQRegionCallback callback, const std::string& /* path = "" */, int /* flags = 0 */, FairMQTransportFactory* factory /* = nullptr */)
: FairMQUnmanagedRegion(factory)
, fId(id)
, fBuffer(malloc(size))
, fSize(size)
, fUserFlags(userFlags)
, fCallback(callback)
{
}
FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(const size_t size, int64_t /* userFlags */, FairMQRegionCallback callback, const std::string& /* path = "" */, int /* flags = 0 */)
: fBuffer(malloc(size))
, fSize(size)
, fCallback(callback)
{
}
{}
void* FairMQUnmanagedRegionZMQ::GetData() const
{
@@ -38,5 +32,6 @@ size_t FairMQUnmanagedRegionZMQ::GetSize() const
FairMQUnmanagedRegionZMQ::~FairMQUnmanagedRegionZMQ()
{
LOG(debug) << "destroying region";
static_cast<FairMQTransportFactoryZMQ*>(GetTransport())->RemoveRegion(fId);
free(fBuffer);
}

View File

@@ -13,6 +13,7 @@
#include <cstddef> // size_t
#include <string>
class FairMQTransportFactory;
class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion
{
@@ -20,19 +21,23 @@ class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion
friend class FairMQMessageZMQ;
public:
FairMQUnmanagedRegionZMQ(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0);
FairMQUnmanagedRegionZMQ(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path = "", int flags = 0);
FairMQUnmanagedRegionZMQ(uint64_t id, const size_t size, int64_t userFlags, FairMQRegionCallback callback, const std::string& /* path = "" */, int /* flags = 0 */, FairMQTransportFactory* factory = nullptr);
FairMQUnmanagedRegionZMQ(const FairMQUnmanagedRegionZMQ&) = delete;
FairMQUnmanagedRegionZMQ operator=(const FairMQUnmanagedRegionZMQ&) = delete;
uint64_t GetId() const { return fId; }
virtual void* GetData() const override;
virtual size_t GetSize() const override;
int64_t GetUserFlags() const { return fUserFlags; }
virtual ~FairMQUnmanagedRegionZMQ();
private:
uint64_t fId;
void* fBuffer;
size_t fSize;
int64_t fUserFlags;
FairMQRegionCallback fCallback;
};