mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-16 10:01:47 +00:00
Compare commits
8 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
8cfc04721e | ||
|
e9318dd234 | ||
|
c8fc5ad33f | ||
|
59e32437a2 | ||
|
a3afadb824 | ||
|
9992811822 | ||
|
4218c185a4 | ||
|
5a49c5b9b1 |
@@ -84,7 +84,7 @@ class FairMQTransportFactory
|
|||||||
/// @param path optional parameter to pass to the underlying transport
|
/// @param path optional parameter to pass to the underlying transport
|
||||||
/// @param flags optional parameter to pass to the underlying transport
|
/// @param flags optional parameter to pass to the underlying transport
|
||||||
/// @return pointer to UnmanagedRegion
|
/// @return pointer to UnmanagedRegion
|
||||||
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const = 0;
|
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
|
||||||
/// @brief Create new UnmanagedRegion
|
/// @brief Create new UnmanagedRegion
|
||||||
/// @param size size of the region
|
/// @param size size of the region
|
||||||
/// @param userFlags flags to be stored with the region, have no effect on the transport, but can be retrieved from the region by the user
|
/// @param userFlags flags to be stored with the region, have no effect on the transport, but can be retrieved from the region by the user
|
||||||
@@ -92,11 +92,14 @@ class FairMQTransportFactory
|
|||||||
/// @param path optional parameter to pass to the underlying transport
|
/// @param path optional parameter to pass to the underlying transport
|
||||||
/// @param flags optional parameter to pass to the underlying transport
|
/// @param flags optional parameter to pass to the underlying transport
|
||||||
/// @return pointer to UnmanagedRegion
|
/// @return pointer to UnmanagedRegion
|
||||||
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const = 0;
|
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
|
||||||
|
|
||||||
/// @brief Subscribe to region events (creation, destruction, ...)
|
/// @brief Subscribe to region events (creation, destruction, ...)
|
||||||
/// @param callback the callback that is called when a region event occurs
|
/// @param callback the callback that is called when a region event occurs
|
||||||
virtual void SubscribeToRegionEvents(FairMQRegionEventCallback callback) = 0;
|
virtual void SubscribeToRegionEvents(FairMQRegionEventCallback callback) = 0;
|
||||||
|
/// @brief Check if there is an active subscription to region events
|
||||||
|
/// @return true/false
|
||||||
|
virtual bool SubscribedToRegionEvents() = 0;
|
||||||
/// @brief Unsubscribe from region events
|
/// @brief Unsubscribe from region events
|
||||||
virtual void UnsubscribeFromRegionEvents() = 0;
|
virtual void UnsubscribeFromRegionEvents() = 0;
|
||||||
|
|
||||||
|
@@ -14,13 +14,33 @@
|
|||||||
#include <functional> // std::function
|
#include <functional> // std::function
|
||||||
#include <ostream> // std::ostream
|
#include <ostream> // std::ostream
|
||||||
|
|
||||||
|
class FairMQTransportFactory;
|
||||||
|
|
||||||
enum class FairMQRegionEvent : int
|
enum class FairMQRegionEvent : int
|
||||||
{
|
{
|
||||||
created,
|
created,
|
||||||
destroyed
|
destroyed,
|
||||||
|
local_only
|
||||||
};
|
};
|
||||||
|
|
||||||
struct FairMQRegionInfo {
|
struct FairMQRegionInfo
|
||||||
|
{
|
||||||
|
FairMQRegionInfo()
|
||||||
|
: id(0)
|
||||||
|
, ptr(nullptr)
|
||||||
|
, size(0)
|
||||||
|
, flags(0)
|
||||||
|
, event(FairMQRegionEvent::created)
|
||||||
|
{}
|
||||||
|
|
||||||
|
FairMQRegionInfo(uint64_t _id, void* _ptr, size_t _size, int64_t _flags, FairMQRegionEvent _event)
|
||||||
|
: id(_id)
|
||||||
|
, ptr(_ptr)
|
||||||
|
, size(_size)
|
||||||
|
, flags(_flags)
|
||||||
|
, event (_event)
|
||||||
|
{}
|
||||||
|
|
||||||
uint64_t id; // id of the region
|
uint64_t id; // id of the region
|
||||||
void* ptr; // pointer to the start of the region
|
void* ptr; // pointer to the start of the region
|
||||||
size_t size; // region size
|
size_t size; // region size
|
||||||
@@ -34,20 +54,35 @@ using FairMQRegionEventCallback = std::function<void(FairMQRegionInfo)>;
|
|||||||
class FairMQUnmanagedRegion
|
class FairMQUnmanagedRegion
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
FairMQUnmanagedRegion() {}
|
||||||
|
FairMQUnmanagedRegion(FairMQTransportFactory* factory): fTransport(factory) {}
|
||||||
|
|
||||||
virtual void* GetData() const = 0;
|
virtual void* GetData() const = 0;
|
||||||
virtual size_t GetSize() const = 0;
|
virtual size_t GetSize() const = 0;
|
||||||
|
virtual uint64_t GetId() const = 0;
|
||||||
|
|
||||||
|
FairMQTransportFactory* GetTransport() { return fTransport; }
|
||||||
|
void SetTransport(FairMQTransportFactory* transport) { fTransport = transport; }
|
||||||
|
|
||||||
virtual ~FairMQUnmanagedRegion() {};
|
virtual ~FairMQUnmanagedRegion() {};
|
||||||
|
|
||||||
|
private:
|
||||||
|
FairMQTransportFactory* fTransport{nullptr};
|
||||||
};
|
};
|
||||||
|
|
||||||
using FairMQUnmanagedRegionPtr = std::unique_ptr<FairMQUnmanagedRegion>;
|
using FairMQUnmanagedRegionPtr = std::unique_ptr<FairMQUnmanagedRegion>;
|
||||||
|
|
||||||
inline std::ostream& operator<<(std::ostream& os, const FairMQRegionEvent& event)
|
inline std::ostream& operator<<(std::ostream& os, const FairMQRegionEvent& event)
|
||||||
{
|
{
|
||||||
if (event == FairMQRegionEvent::created) {
|
switch (event) {
|
||||||
return os << "created";
|
case FairMQRegionEvent::created:
|
||||||
} else {
|
return os << "created";
|
||||||
return os << "destroyed";
|
case FairMQRegionEvent::destroyed:
|
||||||
|
return os << "destroyed";
|
||||||
|
case FairMQRegionEvent::local_only:
|
||||||
|
return os << "local_only";
|
||||||
|
default:
|
||||||
|
return os << "unrecognized event";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -19,6 +19,7 @@ fair::mq::Transport FairMQTransportFactoryNN::fTransportType = fair::mq::Transpo
|
|||||||
|
|
||||||
FairMQTransportFactoryNN::FairMQTransportFactoryNN(const string& id, const fair::mq::ProgOptions* /*config*/)
|
FairMQTransportFactoryNN::FairMQTransportFactoryNN(const string& id, const fair::mq::ProgOptions* /*config*/)
|
||||||
: FairMQTransportFactory(id)
|
: FairMQTransportFactory(id)
|
||||||
|
, fRegionCounter(0)
|
||||||
{
|
{
|
||||||
LOG(debug) << "Transport: Using nanomsg library";
|
LOG(debug) << "Transport: Using nanomsg library";
|
||||||
}
|
}
|
||||||
@@ -65,14 +66,14 @@ FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const unordered_map<strin
|
|||||||
return unique_ptr<FairMQPoller>(new FairMQPollerNN(channelsMap, channelList));
|
return unique_ptr<FairMQPoller>(new FairMQPollerNN(channelsMap, channelList));
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const
|
FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */)
|
||||||
{
|
{
|
||||||
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionNN(size, callback, path, flags));
|
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionNN(++fRegionCounter, size, callback, path, flags, this));
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const
|
FairMQUnmanagedRegionPtr FairMQTransportFactoryNN::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */)
|
||||||
{
|
{
|
||||||
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionNN(size, userFlags, callback, path, flags));
|
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionNN(size, userFlags, callback, path, flags, this));
|
||||||
}
|
}
|
||||||
|
|
||||||
fair::mq::Transport FairMQTransportFactoryNN::GetType() const
|
fair::mq::Transport FairMQTransportFactoryNN::GetType() const
|
||||||
|
@@ -36,10 +36,11 @@ class FairMQTransportFactoryNN final : public FairMQTransportFactory
|
|||||||
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const override;
|
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const override;
|
||||||
FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
|
FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
|
||||||
|
|
||||||
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) const override;
|
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) override;
|
||||||
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override;
|
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override;
|
||||||
|
|
||||||
void SubscribeToRegionEvents(FairMQRegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for nanomsg"; }
|
void SubscribeToRegionEvents(FairMQRegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for nanomsg"; }
|
||||||
|
bool SubscribedToRegionEvents() override { LOG(error) << "Region event subscriptions not yet implemented for nanomsg"; return false; }
|
||||||
void UnsubscribeFromRegionEvents() override { LOG(error) << "UnsubscribeFromRegionEvents not yet implemented for nanomsg"; }
|
void UnsubscribeFromRegionEvents() override { LOG(error) << "UnsubscribeFromRegionEvents not yet implemented for nanomsg"; }
|
||||||
std::vector<FairMQRegionInfo> GetRegionInfo() override { LOG(error) << "GetRegionInfo not yet implemented for nanomsg, returning empty vector"; return std::vector<FairMQRegionInfo>(); }
|
std::vector<FairMQRegionInfo> GetRegionInfo() override { LOG(error) << "GetRegionInfo not yet implemented for nanomsg, returning empty vector"; return std::vector<FairMQRegionInfo>(); }
|
||||||
|
|
||||||
@@ -51,6 +52,7 @@ class FairMQTransportFactoryNN final : public FairMQTransportFactory
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
static fair::mq::Transport fTransportType;
|
static fair::mq::Transport fTransportType;
|
||||||
|
uint64_t fRegionCounter;
|
||||||
mutable std::vector<FairMQSocket*> fSockets;
|
mutable std::vector<FairMQSocket*> fSockets;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@@ -11,15 +11,19 @@
|
|||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
FairMQUnmanagedRegionNN::FairMQUnmanagedRegionNN(const size_t size, FairMQRegionCallback callback, const std::string& /*path = "" */, int /*flags = 0 */)
|
FairMQUnmanagedRegionNN::FairMQUnmanagedRegionNN(uint64_t id, const size_t size, FairMQRegionCallback callback, const std::string& /*path = "" */, int /*flags = 0 */, FairMQTransportFactory* factory /* = nullptr */)
|
||||||
: fBuffer(malloc(size))
|
: FairMQUnmanagedRegion(factory)
|
||||||
|
, fId(id)
|
||||||
|
, fBuffer(malloc(size))
|
||||||
, fSize(size)
|
, fSize(size)
|
||||||
, fCallback(callback)
|
, fCallback(callback)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQUnmanagedRegionNN::FairMQUnmanagedRegionNN(const size_t size, const int64_t /*userFlags*/, FairMQRegionCallback callback, const std::string& /*path = "" */, int /*flags = 0 */)
|
FairMQUnmanagedRegionNN::FairMQUnmanagedRegionNN(uint64_t id, const size_t size, const int64_t /*userFlags*/, FairMQRegionCallback callback, const std::string& /*path = "" */, int /*flags = 0 */, FairMQTransportFactory* factory /* = nullptr */)
|
||||||
: fBuffer(malloc(size))
|
: FairMQUnmanagedRegion(factory)
|
||||||
|
, fId(id)
|
||||||
|
, fBuffer(malloc(size))
|
||||||
, fSize(size)
|
, fSize(size)
|
||||||
, fCallback(callback)
|
, fCallback(callback)
|
||||||
{
|
{
|
||||||
@@ -35,6 +39,12 @@ size_t FairMQUnmanagedRegionNN::GetSize() const
|
|||||||
return fSize;
|
return fSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint64_t FairMQUnmanagedRegionNN::GetId() const
|
||||||
|
{
|
||||||
|
return fId;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
FairMQUnmanagedRegionNN::~FairMQUnmanagedRegionNN()
|
FairMQUnmanagedRegionNN::~FairMQUnmanagedRegionNN()
|
||||||
{
|
{
|
||||||
LOG(debug) << "destroying region";
|
LOG(debug) << "destroying region";
|
||||||
|
@@ -19,18 +19,20 @@ class FairMQUnmanagedRegionNN final : public FairMQUnmanagedRegion
|
|||||||
friend class FairMQSocketNN;
|
friend class FairMQSocketNN;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
FairMQUnmanagedRegionNN(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0);
|
FairMQUnmanagedRegionNN(uint64_t id, const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0, FairMQTransportFactory* factory = nullptr);
|
||||||
FairMQUnmanagedRegionNN(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path = "", int flags = 0);
|
FairMQUnmanagedRegionNN(uint64_t id, const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path = "", int flags = 0, FairMQTransportFactory* factory = nullptr);
|
||||||
|
|
||||||
FairMQUnmanagedRegionNN(const FairMQUnmanagedRegionNN&) = delete;
|
FairMQUnmanagedRegionNN(const FairMQUnmanagedRegionNN&) = delete;
|
||||||
FairMQUnmanagedRegionNN operator=(const FairMQUnmanagedRegionNN&) = delete;
|
FairMQUnmanagedRegionNN operator=(const FairMQUnmanagedRegionNN&) = delete;
|
||||||
|
|
||||||
virtual void* GetData() const override;
|
void* GetData() const override;
|
||||||
virtual size_t GetSize() const override;
|
size_t GetSize() const override;
|
||||||
|
uint64_t GetId() const override;
|
||||||
|
|
||||||
virtual ~FairMQUnmanagedRegionNN();
|
virtual ~FairMQUnmanagedRegionNN();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
uint64_t fId;
|
||||||
void* fBuffer;
|
void* fBuffer;
|
||||||
size_t fSize;
|
size_t fSize;
|
||||||
FairMQRegionCallback fCallback;
|
FairMQRegionCallback fCallback;
|
||||||
|
@@ -85,12 +85,12 @@ auto TransportFactory::CreatePoller(const unordered_map<string, vector<FairMQCha
|
|||||||
// return PollerPtr{new Poller(channelsMap, channelList)};
|
// return PollerPtr{new Poller(channelsMap, channelList)};
|
||||||
}
|
}
|
||||||
|
|
||||||
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) const -> UnmanagedRegionPtr
|
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
|
||||||
{
|
{
|
||||||
throw runtime_error{"Not yet implemented UMR."};
|
throw runtime_error{"Not yet implemented UMR."};
|
||||||
}
|
}
|
||||||
|
|
||||||
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) const -> UnmanagedRegionPtr
|
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
|
||||||
{
|
{
|
||||||
throw runtime_error{"Not yet implemented UMR."};
|
throw runtime_error{"Not yet implemented UMR."};
|
||||||
}
|
}
|
||||||
|
@@ -46,10 +46,11 @@ class TransportFactory final : public FairMQTransportFactory
|
|||||||
auto CreatePoller(const std::vector<FairMQChannel*>& channels) const -> PollerPtr override;
|
auto CreatePoller(const std::vector<FairMQChannel*>& channels) const -> PollerPtr override;
|
||||||
auto CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const -> PollerPtr override;
|
auto CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const -> PollerPtr override;
|
||||||
|
|
||||||
auto CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const -> UnmanagedRegionPtr override;
|
auto CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) -> UnmanagedRegionPtr override;
|
||||||
auto CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const -> UnmanagedRegionPtr override;
|
auto CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) -> UnmanagedRegionPtr override;
|
||||||
|
|
||||||
void SubscribeToRegionEvents(RegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for OFI"; }
|
void SubscribeToRegionEvents(RegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for OFI"; }
|
||||||
|
bool SubscribedToRegionEvents() override { LOG(error) << "Region event subscriptions not yet implemented for OFI"; return false; }
|
||||||
void UnsubscribeFromRegionEvents() override { LOG(error) << "UnsubscribeFromRegionEvents not yet implemented for OFI"; }
|
void UnsubscribeFromRegionEvents() override { LOG(error) << "UnsubscribeFromRegionEvents not yet implemented for OFI"; }
|
||||||
std::vector<FairMQRegionInfo> GetRegionInfo() override { LOG(error) << "GetRegionInfo not yet implemented for OFI, returning empty vector"; return std::vector<FairMQRegionInfo>(); }
|
std::vector<FairMQRegionInfo> GetRegionInfo() override { LOG(error) << "GetRegionInfo not yet implemented for OFI, returning empty vector"; return std::vector<FairMQRegionInfo>(); }
|
||||||
|
|
||||||
|
@@ -183,7 +183,6 @@ Region* Manager::GetRegionUnsafe(const uint64_t id)
|
|||||||
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
|
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
|
||||||
|
|
||||||
auto r = fRegions.emplace(id, tools::make_unique<Region>(*this, id, 0, true, nullptr, path, flags));
|
auto r = fRegions.emplace(id, tools::make_unique<Region>(*this, id, 0, true, nullptr, path, flags));
|
||||||
r.first->second->StartSendingAcks();
|
|
||||||
return r.first->second.get();
|
return r.first->second.get();
|
||||||
} catch (bie& e) {
|
} catch (bie& e) {
|
||||||
LOG(warn) << "Could not get remote region for id: " << id;
|
LOG(warn) << "Could not get remote region for id: " << id;
|
||||||
@@ -234,31 +233,42 @@ vector<fair::mq::RegionInfo> Manager::GetRegionInfoUnsafe()
|
|||||||
|
|
||||||
void Manager::SubscribeToRegionEvents(RegionEventCallback callback)
|
void Manager::SubscribeToRegionEvents(RegionEventCallback callback)
|
||||||
{
|
{
|
||||||
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
|
|
||||||
if (fRegionEventThread.joinable()) {
|
if (fRegionEventThread.joinable()) {
|
||||||
fRegionEventsSubscriptionActive.store(false);
|
LOG(debug) << "Already subscribed. Overwriting previous subscription.";
|
||||||
|
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
|
||||||
|
fRegionEventsSubscriptionActive = false;
|
||||||
|
lock.unlock();
|
||||||
|
fRegionEventsCV.notify_all();
|
||||||
fRegionEventThread.join();
|
fRegionEventThread.join();
|
||||||
}
|
}
|
||||||
|
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
|
||||||
fRegionEventCallback = callback;
|
fRegionEventCallback = callback;
|
||||||
fRegionEventsSubscriptionActive.store(true);
|
fRegionEventsSubscriptionActive = true;
|
||||||
fRegionEventThread = thread(&Manager::RegionEventsSubscription, this);
|
fRegionEventThread = thread(&Manager::RegionEventsSubscription, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool Manager::SubscribedToRegionEvents()
|
||||||
|
{
|
||||||
|
return fRegionEventThread.joinable();
|
||||||
|
}
|
||||||
|
|
||||||
void Manager::UnsubscribeFromRegionEvents()
|
void Manager::UnsubscribeFromRegionEvents()
|
||||||
{
|
{
|
||||||
if (fRegionEventThread.joinable()) {
|
if (fRegionEventThread.joinable()) {
|
||||||
fRegionEventsSubscriptionActive.store(false);
|
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
|
||||||
|
fRegionEventsSubscriptionActive = false;
|
||||||
|
lock.unlock();
|
||||||
fRegionEventsCV.notify_all();
|
fRegionEventsCV.notify_all();
|
||||||
fRegionEventThread.join();
|
fRegionEventThread.join();
|
||||||
|
lock.lock();
|
||||||
|
fRegionEventCallback = nullptr;
|
||||||
}
|
}
|
||||||
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
|
|
||||||
fRegionEventCallback = nullptr;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void Manager::RegionEventsSubscription()
|
void Manager::RegionEventsSubscription()
|
||||||
{
|
{
|
||||||
while (fRegionEventsSubscriptionActive.load()) {
|
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
|
||||||
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
|
while (fRegionEventsSubscriptionActive) {
|
||||||
auto infos = GetRegionInfoUnsafe();
|
auto infos = GetRegionInfoUnsafe();
|
||||||
for (const auto& i : infos) {
|
for (const auto& i : infos) {
|
||||||
auto el = fObservedRegionEvents.find(i.id);
|
auto el = fObservedRegionEvents.find(i.id);
|
||||||
@@ -299,11 +309,7 @@ Manager::~Manager()
|
|||||||
{
|
{
|
||||||
bool lastRemoved = false;
|
bool lastRemoved = false;
|
||||||
|
|
||||||
if (fRegionEventThread.joinable()) {
|
UnsubscribeFromRegionEvents();
|
||||||
fRegionEventsSubscriptionActive.store(false);
|
|
||||||
fRegionEventsCV.notify_all();
|
|
||||||
fRegionEventThread.join();
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
|
bipc::scoped_lock<bipc::named_mutex> lock(fShmMtx);
|
||||||
|
@@ -78,6 +78,7 @@ class Manager
|
|||||||
std::vector<fair::mq::RegionInfo> GetRegionInfo();
|
std::vector<fair::mq::RegionInfo> GetRegionInfo();
|
||||||
std::vector<fair::mq::RegionInfo> GetRegionInfoUnsafe();
|
std::vector<fair::mq::RegionInfo> GetRegionInfoUnsafe();
|
||||||
void SubscribeToRegionEvents(RegionEventCallback callback);
|
void SubscribeToRegionEvents(RegionEventCallback callback);
|
||||||
|
bool SubscribedToRegionEvents();
|
||||||
void UnsubscribeFromRegionEvents();
|
void UnsubscribeFromRegionEvents();
|
||||||
void RegionEventsSubscription();
|
void RegionEventsSubscription();
|
||||||
|
|
||||||
@@ -94,7 +95,7 @@ class Manager
|
|||||||
|
|
||||||
boost::interprocess::named_condition fRegionEventsCV;
|
boost::interprocess::named_condition fRegionEventsCV;
|
||||||
std::thread fRegionEventThread;
|
std::thread fRegionEventThread;
|
||||||
std::atomic<bool> fRegionEventsSubscriptionActive;
|
bool fRegionEventsSubscriptionActive;
|
||||||
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
|
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
|
||||||
std::unordered_map<uint64_t, RegionEvent> fObservedRegionEvents;
|
std::unordered_map<uint64_t, RegionEvent> fObservedRegionEvents;
|
||||||
|
|
||||||
|
@@ -19,6 +19,7 @@
|
|||||||
|
|
||||||
#include <cerrno>
|
#include <cerrno>
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
|
#include <ios>
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
@@ -49,7 +50,17 @@ Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, Region
|
|||||||
if (path != "") {
|
if (path != "") {
|
||||||
fName = string(path + fName);
|
fName = string(path + fName);
|
||||||
|
|
||||||
fFile = fopen(fName.c_str(), fRemote ? "r+" : "w+");
|
if (!fRemote) {
|
||||||
|
// create a file
|
||||||
|
filebuf fbuf;
|
||||||
|
if (fbuf.open(fName, ios_base::in | ios_base::out | ios_base::trunc | ios_base::binary)) {
|
||||||
|
// set the size
|
||||||
|
fbuf.pubseekoff(size - 1, ios_base::beg);
|
||||||
|
fbuf.sputc(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fFile = fopen(fName.c_str(), "r+");
|
||||||
|
|
||||||
if (!fFile) {
|
if (!fFile) {
|
||||||
LOG(error) << "Failed to initialize file: " << fName;
|
LOG(error) << "Failed to initialize file: " << fName;
|
||||||
@@ -70,6 +81,7 @@ Region::Region(Manager& manager, uint64_t id, uint64_t size, bool remote, Region
|
|||||||
}
|
}
|
||||||
|
|
||||||
InitializeQueues();
|
InitializeQueues();
|
||||||
|
StartSendingAcks();
|
||||||
LOG(debug) << "shmem: initialized region: " << fName;
|
LOG(debug) << "shmem: initialized region: " << fName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -159,14 +159,14 @@ PollerPtr TransportFactory::CreatePoller(const unordered_map<string, vector<Fair
|
|||||||
return tools::make_unique<Poller>(channelsMap, channelList);
|
return tools::make_unique<Poller>(channelsMap, channelList);
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegionPtr TransportFactory::CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const
|
UnmanagedRegionPtr TransportFactory::CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */)
|
||||||
{
|
{
|
||||||
return tools::make_unique<UnmanagedRegion>(*fManager, size, callback, path, flags);
|
return tools::make_unique<UnmanagedRegion>(*fManager, size, callback, path, flags, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegionPtr TransportFactory::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const
|
UnmanagedRegionPtr TransportFactory::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */)
|
||||||
{
|
{
|
||||||
return tools::make_unique<UnmanagedRegion>(*fManager, size, userFlags, callback, path, flags);
|
return tools::make_unique<UnmanagedRegion>(*fManager, size, userFlags, callback, path, flags, this);
|
||||||
}
|
}
|
||||||
|
|
||||||
void TransportFactory::SubscribeToRegionEvents(RegionEventCallback callback)
|
void TransportFactory::SubscribeToRegionEvents(RegionEventCallback callback)
|
||||||
@@ -174,6 +174,11 @@ void TransportFactory::SubscribeToRegionEvents(RegionEventCallback callback)
|
|||||||
fManager->SubscribeToRegionEvents(callback);
|
fManager->SubscribeToRegionEvents(callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool TransportFactory::SubscribedToRegionEvents()
|
||||||
|
{
|
||||||
|
return fManager->SubscribedToRegionEvents();
|
||||||
|
}
|
||||||
|
|
||||||
void TransportFactory::UnsubscribeFromRegionEvents()
|
void TransportFactory::UnsubscribeFromRegionEvents()
|
||||||
{
|
{
|
||||||
fManager->UnsubscribeFromRegionEvents();
|
fManager->UnsubscribeFromRegionEvents();
|
||||||
|
@@ -49,10 +49,11 @@ class TransportFactory final : public fair::mq::TransportFactory
|
|||||||
PollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const override;
|
PollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const override;
|
||||||
PollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
|
PollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
|
||||||
|
|
||||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override;
|
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override;
|
||||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override;
|
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override;
|
||||||
|
|
||||||
void SubscribeToRegionEvents(RegionEventCallback callback) override;
|
void SubscribeToRegionEvents(RegionEventCallback callback) override;
|
||||||
|
bool SubscribedToRegionEvents() override;
|
||||||
void UnsubscribeFromRegionEvents() override;
|
void UnsubscribeFromRegionEvents() override;
|
||||||
std::vector<fair::mq::RegionInfo> GetRegionInfo() override;
|
std::vector<fair::mq::RegionInfo> GetRegionInfo() override;
|
||||||
|
|
||||||
|
@@ -36,12 +36,13 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
|
|||||||
friend class Socket;
|
friend class Socket;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
UnmanagedRegion(Manager& manager, const size_t size, RegionCallback callback, const std::string& path = "", int flags = 0)
|
UnmanagedRegion(Manager& manager, const size_t size, RegionCallback callback, const std::string& path = "", int flags = 0, FairMQTransportFactory* factory = nullptr)
|
||||||
: UnmanagedRegion(manager, size, 0, callback, path, flags)
|
: UnmanagedRegion(manager, size, 0, callback, path, flags, factory)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
UnmanagedRegion(Manager& manager, const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0)
|
UnmanagedRegion(Manager& manager, const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0, FairMQTransportFactory* factory = nullptr)
|
||||||
: fManager(manager)
|
: FairMQUnmanagedRegion(factory)
|
||||||
|
, fManager(manager)
|
||||||
, fRegion(nullptr)
|
, fRegion(nullptr)
|
||||||
, fRegionId(0)
|
, fRegionId(0)
|
||||||
{
|
{
|
||||||
@@ -52,6 +53,7 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
|
|||||||
|
|
||||||
void* GetData() const override { return fRegion->get_address(); }
|
void* GetData() const override { return fRegion->get_address(); }
|
||||||
size_t GetSize() const override { return fRegion->get_size(); }
|
size_t GetSize() const override { return fRegion->get_size(); }
|
||||||
|
uint64_t GetId() const override { return fRegionId; }
|
||||||
|
|
||||||
~UnmanagedRegion() override { fManager.RemoveRegion(fRegionId); }
|
~UnmanagedRegion() override { fManager.RemoveRegion(fRegionId); }
|
||||||
|
|
||||||
|
@@ -9,6 +9,8 @@
|
|||||||
#include "FairMQTransportFactoryZMQ.h"
|
#include "FairMQTransportFactoryZMQ.h"
|
||||||
#include <zmq.h>
|
#include <zmq.h>
|
||||||
|
|
||||||
|
#include <algorithm> // find_if
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
fair::mq::Transport FairMQTransportFactoryZMQ::fTransportType = fair::mq::Transport::ZMQ;
|
fair::mq::Transport FairMQTransportFactoryZMQ::fTransportType = fair::mq::Transport::ZMQ;
|
||||||
@@ -16,6 +18,7 @@ fair::mq::Transport FairMQTransportFactoryZMQ::fTransportType = fair::mq::Transp
|
|||||||
FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ(const string& id, const fair::mq::ProgOptions* config)
|
FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ(const string& id, const fair::mq::ProgOptions* config)
|
||||||
: FairMQTransportFactory(id)
|
: FairMQTransportFactory(id)
|
||||||
, fContext(zmq_ctx_new())
|
, fContext(zmq_ctx_new())
|
||||||
|
, fRegionCounter(0)
|
||||||
{
|
{
|
||||||
int major, minor, patch;
|
int major, minor, patch;
|
||||||
zmq_version(&major, &minor, &patch);
|
zmq_version(&major, &minor, &patch);
|
||||||
@@ -47,6 +50,7 @@ FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ(const string& id, const fai
|
|||||||
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
|
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fRegionEvents.emplace(0, nullptr, 0, 0, fair::mq::RegionEvent::local_only);
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage()
|
FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage()
|
||||||
@@ -80,7 +84,7 @@ FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const vector<FairMQChann
|
|||||||
return unique_ptr<FairMQPoller>(new FairMQPollerZMQ(channels));
|
return unique_ptr<FairMQPoller>(new FairMQPollerZMQ(channels));
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const std::vector<FairMQChannel*>& channels) const
|
FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const vector<FairMQChannel*>& channels) const
|
||||||
{
|
{
|
||||||
return unique_ptr<FairMQPoller>(new FairMQPollerZMQ(channels));
|
return unique_ptr<FairMQPoller>(new FairMQPollerZMQ(channels));
|
||||||
}
|
}
|
||||||
@@ -90,14 +94,98 @@ FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const unordered_map<stri
|
|||||||
return unique_ptr<FairMQPoller>(new FairMQPollerZMQ(channelsMap, channelList));
|
return unique_ptr<FairMQPoller>(new FairMQPollerZMQ(channelsMap, channelList));
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const
|
FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const string& path /* = "" */, int flags /* = 0 */)
|
||||||
{
|
{
|
||||||
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionZMQ(size, callback, path, flags));
|
return CreateUnmanagedRegion(size, 0, callback, path, flags);
|
||||||
}
|
}
|
||||||
|
|
||||||
FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path /* = "" */, int flags /* = 0 */) const
|
FairMQUnmanagedRegionPtr FairMQTransportFactoryZMQ::CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const string& path /* = "" */, int flags /* = 0 */)
|
||||||
{
|
{
|
||||||
return unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionZMQ(size, userFlags, callback, path, flags));
|
unique_ptr<FairMQUnmanagedRegion> ptr = nullptr;
|
||||||
|
{
|
||||||
|
lock_guard<mutex> lock(fMtx);
|
||||||
|
|
||||||
|
++fRegionCounter;
|
||||||
|
ptr = unique_ptr<FairMQUnmanagedRegion>(new FairMQUnmanagedRegionZMQ(fRegionCounter, size, userFlags, callback, path, flags, this));
|
||||||
|
auto zPtr = static_cast<FairMQUnmanagedRegionZMQ*>(ptr.get());
|
||||||
|
fRegionInfos.emplace_back(zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), fair::mq::RegionEvent::created);
|
||||||
|
fRegionEvents.emplace(zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), fair::mq::RegionEvent::created);
|
||||||
|
}
|
||||||
|
fRegionEventsCV.notify_one();
|
||||||
|
return ptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQTransportFactoryZMQ::SubscribeToRegionEvents(FairMQRegionEventCallback callback)
|
||||||
|
{
|
||||||
|
if (fRegionEventThread.joinable()) {
|
||||||
|
LOG(debug) << "Already subscribed. Overwriting previous subscription.";
|
||||||
|
{
|
||||||
|
lock_guard<mutex> lock(fMtx);
|
||||||
|
fRegionEventsSubscriptionActive = false;
|
||||||
|
}
|
||||||
|
fRegionEventsCV.notify_one();
|
||||||
|
fRegionEventThread.join();
|
||||||
|
}
|
||||||
|
lock_guard<mutex> lock(fMtx);
|
||||||
|
fRegionEventCallback = callback;
|
||||||
|
fRegionEventsSubscriptionActive = true;
|
||||||
|
fRegionEventThread = thread(&FairMQTransportFactoryZMQ::RegionEventsSubscription, this);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool FairMQTransportFactoryZMQ::SubscribedToRegionEvents()
|
||||||
|
{
|
||||||
|
return fRegionEventThread.joinable();
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQTransportFactoryZMQ::UnsubscribeFromRegionEvents()
|
||||||
|
{
|
||||||
|
if (fRegionEventThread.joinable()) {
|
||||||
|
unique_lock<mutex> lock(fMtx);
|
||||||
|
fRegionEventsSubscriptionActive = false;
|
||||||
|
lock.unlock();
|
||||||
|
fRegionEventsCV.notify_one();
|
||||||
|
fRegionEventThread.join();
|
||||||
|
lock.lock();
|
||||||
|
fRegionEventCallback = nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQTransportFactoryZMQ::RegionEventsSubscription()
|
||||||
|
{
|
||||||
|
unique_lock<mutex> lock(fMtx);
|
||||||
|
while (fRegionEventsSubscriptionActive) {
|
||||||
|
|
||||||
|
while (!fRegionEvents.empty()) {
|
||||||
|
auto i = fRegionEvents.front();
|
||||||
|
fRegionEventCallback(i);
|
||||||
|
fRegionEvents.pop();
|
||||||
|
}
|
||||||
|
fRegionEventsCV.wait(lock, [&]() { return !fRegionEventsSubscriptionActive || !fRegionEvents.empty(); });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
vector<fair::mq::RegionInfo> FairMQTransportFactoryZMQ::GetRegionInfo()
|
||||||
|
{
|
||||||
|
lock_guard<mutex> lock(fMtx);
|
||||||
|
return fRegionInfos;
|
||||||
|
}
|
||||||
|
|
||||||
|
void FairMQTransportFactoryZMQ::RemoveRegion(uint64_t id)
|
||||||
|
{
|
||||||
|
{
|
||||||
|
lock_guard<mutex> lock(fMtx);
|
||||||
|
auto it = find_if(fRegionInfos.begin(), fRegionInfos.end(), [id](const fair::mq::RegionInfo& i) {
|
||||||
|
return i.id == id;
|
||||||
|
});
|
||||||
|
if (it != fRegionInfos.end()) {
|
||||||
|
fRegionEvents.push(*it);
|
||||||
|
fRegionEvents.back().event = fair::mq::RegionEvent::destroyed;
|
||||||
|
fRegionInfos.erase(it);
|
||||||
|
} else {
|
||||||
|
LOG(error) << "RemoveRegion: given id (" << id << ") not found.";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fRegionEventsCV.notify_one();
|
||||||
}
|
}
|
||||||
|
|
||||||
fair::mq::Transport FairMQTransportFactoryZMQ::GetType() const
|
fair::mq::Transport FairMQTransportFactoryZMQ::GetType() const
|
||||||
@@ -108,23 +196,19 @@ fair::mq::Transport FairMQTransportFactoryZMQ::GetType() const
|
|||||||
FairMQTransportFactoryZMQ::~FairMQTransportFactoryZMQ()
|
FairMQTransportFactoryZMQ::~FairMQTransportFactoryZMQ()
|
||||||
{
|
{
|
||||||
LOG(debug) << "Destroying ZeroMQ transport...";
|
LOG(debug) << "Destroying ZeroMQ transport...";
|
||||||
if (fContext)
|
|
||||||
{
|
UnsubscribeFromRegionEvents();
|
||||||
if (zmq_ctx_term(fContext) != 0)
|
|
||||||
{
|
if (fContext) {
|
||||||
if (errno == EINTR)
|
if (zmq_ctx_term(fContext) != 0) {
|
||||||
{
|
if (errno == EINTR) {
|
||||||
LOG(error) << " failed closing context, reason: " << zmq_strerror(errno);
|
LOG(error) << " failed closing context, reason: " << zmq_strerror(errno);
|
||||||
}
|
} else {
|
||||||
else
|
|
||||||
{
|
|
||||||
fContext = nullptr;
|
fContext = nullptr;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
} else {
|
||||||
else
|
|
||||||
{
|
|
||||||
LOG(error) << "context not available for shutdown";
|
LOG(error) << "context not available for shutdown";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -15,9 +15,6 @@
|
|||||||
#ifndef FAIRMQTRANSPORTFACTORYZMQ_H_
|
#ifndef FAIRMQTRANSPORTFACTORYZMQ_H_
|
||||||
#define FAIRMQTRANSPORTFACTORYZMQ_H_
|
#define FAIRMQTRANSPORTFACTORYZMQ_H_
|
||||||
|
|
||||||
#include <vector>
|
|
||||||
#include <string>
|
|
||||||
|
|
||||||
#include "FairMQTransportFactory.h"
|
#include "FairMQTransportFactory.h"
|
||||||
#include "FairMQMessageZMQ.h"
|
#include "FairMQMessageZMQ.h"
|
||||||
#include "FairMQSocketZMQ.h"
|
#include "FairMQSocketZMQ.h"
|
||||||
@@ -25,6 +22,14 @@
|
|||||||
#include "FairMQUnmanagedRegionZMQ.h"
|
#include "FairMQUnmanagedRegionZMQ.h"
|
||||||
#include <fairmq/ProgOptions.h>
|
#include <fairmq/ProgOptions.h>
|
||||||
|
|
||||||
|
#include <condition_variable>
|
||||||
|
#include <functional>
|
||||||
|
#include <mutex>
|
||||||
|
#include <queue>
|
||||||
|
#include <string>
|
||||||
|
#include <thread>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
class FairMQTransportFactoryZMQ final : public FairMQTransportFactory
|
class FairMQTransportFactoryZMQ final : public FairMQTransportFactory
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@@ -32,8 +37,6 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory
|
|||||||
FairMQTransportFactoryZMQ(const FairMQTransportFactoryZMQ&) = delete;
|
FairMQTransportFactoryZMQ(const FairMQTransportFactoryZMQ&) = delete;
|
||||||
FairMQTransportFactoryZMQ operator=(const FairMQTransportFactoryZMQ&) = delete;
|
FairMQTransportFactoryZMQ operator=(const FairMQTransportFactoryZMQ&) = delete;
|
||||||
|
|
||||||
~FairMQTransportFactoryZMQ() override;
|
|
||||||
|
|
||||||
FairMQMessagePtr CreateMessage() override;
|
FairMQMessagePtr CreateMessage() override;
|
||||||
FairMQMessagePtr CreateMessage(const size_t size) override;
|
FairMQMessagePtr CreateMessage(const size_t size) override;
|
||||||
FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override;
|
FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override;
|
||||||
@@ -45,12 +48,15 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory
|
|||||||
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const override;
|
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel*>& channels) const override;
|
||||||
FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
|
FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
|
||||||
|
|
||||||
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) const override;
|
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) override;
|
||||||
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) const override;
|
FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path = "", int flags = 0) override;
|
||||||
|
|
||||||
void SubscribeToRegionEvents(FairMQRegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for ZeroMQ"; }
|
void SubscribeToRegionEvents(FairMQRegionEventCallback callback) override;
|
||||||
void UnsubscribeFromRegionEvents() override { LOG(error) << "UnsubscribeFromRegionEvents not yet implemented for ZeroMQ"; }
|
bool SubscribedToRegionEvents() override;
|
||||||
std::vector<FairMQRegionInfo> GetRegionInfo() override { LOG(error) << "GetRegionInfo not yet implemented for ZeroMQ, returning empty vector"; return std::vector<FairMQRegionInfo>(); }
|
void UnsubscribeFromRegionEvents() override;
|
||||||
|
void RegionEventsSubscription();
|
||||||
|
std::vector<fair::mq::RegionInfo> GetRegionInfo() override;
|
||||||
|
void RemoveRegion(uint64_t id);
|
||||||
|
|
||||||
fair::mq::Transport GetType() const override;
|
fair::mq::Transport GetType() const override;
|
||||||
|
|
||||||
@@ -58,9 +64,20 @@ class FairMQTransportFactoryZMQ final : public FairMQTransportFactory
|
|||||||
void Resume() override { FairMQSocketZMQ::Resume(); }
|
void Resume() override { FairMQSocketZMQ::Resume(); }
|
||||||
void Reset() override {}
|
void Reset() override {}
|
||||||
|
|
||||||
|
~FairMQTransportFactoryZMQ() override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
static fair::mq::Transport fTransportType;
|
static fair::mq::Transport fTransportType;
|
||||||
void* fContext;
|
void* fContext;
|
||||||
|
|
||||||
|
std::mutex fMtx;
|
||||||
|
uint64_t fRegionCounter;
|
||||||
|
std::condition_variable fRegionEventsCV;
|
||||||
|
std::vector<fair::mq::RegionInfo> fRegionInfos;
|
||||||
|
std::queue<fair::mq::RegionInfo> fRegionEvents;
|
||||||
|
std::thread fRegionEventThread;
|
||||||
|
std::function<void(fair::mq::RegionInfo)> fRegionEventCallback;
|
||||||
|
bool fRegionEventsSubscriptionActive;
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif /* FAIRMQTRANSPORTFACTORYZMQ_H_ */
|
#endif /* FAIRMQTRANSPORTFACTORYZMQ_H_ */
|
||||||
|
@@ -7,23 +7,17 @@
|
|||||||
********************************************************************************/
|
********************************************************************************/
|
||||||
|
|
||||||
#include "FairMQUnmanagedRegionZMQ.h"
|
#include "FairMQUnmanagedRegionZMQ.h"
|
||||||
|
#include "FairMQTransportFactoryZMQ.h"
|
||||||
#include "FairMQLogger.h"
|
#include "FairMQLogger.h"
|
||||||
|
|
||||||
using namespace std;
|
FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(uint64_t id, const size_t size, int64_t userFlags, FairMQRegionCallback callback, const std::string& /* path = "" */, int /* flags = 0 */, FairMQTransportFactory* factory /* = nullptr */)
|
||||||
|
: FairMQUnmanagedRegion(factory)
|
||||||
FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(const size_t size, FairMQRegionCallback callback, const std::string& /* path = "" */, int /* flags = 0 */)
|
, fId(id)
|
||||||
: fBuffer(malloc(size))
|
, fBuffer(malloc(size))
|
||||||
, fSize(size)
|
, fSize(size)
|
||||||
|
, fUserFlags(userFlags)
|
||||||
, fCallback(callback)
|
, fCallback(callback)
|
||||||
{
|
{}
|
||||||
}
|
|
||||||
|
|
||||||
FairMQUnmanagedRegionZMQ::FairMQUnmanagedRegionZMQ(const size_t size, int64_t /* userFlags */, FairMQRegionCallback callback, const std::string& /* path = "" */, int /* flags = 0 */)
|
|
||||||
: fBuffer(malloc(size))
|
|
||||||
, fSize(size)
|
|
||||||
, fCallback(callback)
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
void* FairMQUnmanagedRegionZMQ::GetData() const
|
void* FairMQUnmanagedRegionZMQ::GetData() const
|
||||||
{
|
{
|
||||||
@@ -38,5 +32,6 @@ size_t FairMQUnmanagedRegionZMQ::GetSize() const
|
|||||||
FairMQUnmanagedRegionZMQ::~FairMQUnmanagedRegionZMQ()
|
FairMQUnmanagedRegionZMQ::~FairMQUnmanagedRegionZMQ()
|
||||||
{
|
{
|
||||||
LOG(debug) << "destroying region";
|
LOG(debug) << "destroying region";
|
||||||
|
static_cast<FairMQTransportFactoryZMQ*>(GetTransport())->RemoveRegion(fId);
|
||||||
free(fBuffer);
|
free(fBuffer);
|
||||||
}
|
}
|
||||||
|
@@ -13,6 +13,7 @@
|
|||||||
|
|
||||||
#include <cstddef> // size_t
|
#include <cstddef> // size_t
|
||||||
#include <string>
|
#include <string>
|
||||||
|
class FairMQTransportFactory;
|
||||||
|
|
||||||
class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion
|
class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion
|
||||||
{
|
{
|
||||||
@@ -20,19 +21,23 @@ class FairMQUnmanagedRegionZMQ final : public FairMQUnmanagedRegion
|
|||||||
friend class FairMQMessageZMQ;
|
friend class FairMQMessageZMQ;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
FairMQUnmanagedRegionZMQ(const size_t size, FairMQRegionCallback callback, const std::string& path = "", int flags = 0);
|
FairMQUnmanagedRegionZMQ(uint64_t id, const size_t size, int64_t userFlags, FairMQRegionCallback callback, const std::string& /* path = "" */, int /* flags = 0 */, FairMQTransportFactory* factory = nullptr);
|
||||||
FairMQUnmanagedRegionZMQ(const size_t size, const int64_t userFlags, FairMQRegionCallback callback, const std::string& path = "", int flags = 0);
|
|
||||||
FairMQUnmanagedRegionZMQ(const FairMQUnmanagedRegionZMQ&) = delete;
|
FairMQUnmanagedRegionZMQ(const FairMQUnmanagedRegionZMQ&) = delete;
|
||||||
FairMQUnmanagedRegionZMQ operator=(const FairMQUnmanagedRegionZMQ&) = delete;
|
FairMQUnmanagedRegionZMQ operator=(const FairMQUnmanagedRegionZMQ&) = delete;
|
||||||
|
|
||||||
virtual void* GetData() const override;
|
virtual void* GetData() const override;
|
||||||
virtual size_t GetSize() const override;
|
virtual size_t GetSize() const override;
|
||||||
|
uint64_t GetId() const override { return fId; }
|
||||||
|
int64_t GetUserFlags() const { return fUserFlags; }
|
||||||
|
|
||||||
virtual ~FairMQUnmanagedRegionZMQ();
|
virtual ~FairMQUnmanagedRegionZMQ();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
uint64_t fId;
|
||||||
void* fBuffer;
|
void* fBuffer;
|
||||||
size_t fSize;
|
size_t fSize;
|
||||||
|
int64_t fUserFlags;
|
||||||
FairMQRegionCallback fCallback;
|
FairMQRegionCallback fCallback;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@@ -96,6 +96,19 @@ add_testsuite(Message
|
|||||||
${definitions}
|
${definitions}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
add_testsuite(Region
|
||||||
|
SOURCES
|
||||||
|
${CMAKE_CURRENT_BINARY_DIR}/runner.cxx
|
||||||
|
region/_region.cxx
|
||||||
|
|
||||||
|
LINKS FairMQ
|
||||||
|
INCLUDES ${CMAKE_CURRENT_SOURCE_DIR}
|
||||||
|
${CMAKE_CURRENT_SOURCE_DIR}/region
|
||||||
|
${CMAKE_CURRENT_BINARY_DIR}
|
||||||
|
TIMEOUT 5
|
||||||
|
${definitions}
|
||||||
|
)
|
||||||
|
|
||||||
add_testsuite(Device
|
add_testsuite(Device
|
||||||
SOURCES
|
SOURCES
|
||||||
${CMAKE_CURRENT_BINARY_DIR}/runner.cxx
|
${CMAKE_CURRENT_BINARY_DIR}/runner.cxx
|
||||||
|
103
test/region/_region.cxx
Normal file
103
test/region/_region.cxx
Normal file
@@ -0,0 +1,103 @@
|
|||||||
|
/********************************************************************************
|
||||||
|
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||||
|
* *
|
||||||
|
* This software is distributed under the terms of the *
|
||||||
|
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||||
|
* copied verbatim in the file "LICENSE" *
|
||||||
|
********************************************************************************/
|
||||||
|
|
||||||
|
#include <FairMQLogger.h>
|
||||||
|
#include <FairMQTransportFactory.h>
|
||||||
|
#include <fairmq/ProgOptions.h>
|
||||||
|
#include <fairmq/Tools.h>
|
||||||
|
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
|
||||||
|
#include <string>
|
||||||
|
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
|
||||||
|
using namespace std;
|
||||||
|
|
||||||
|
void RegionEventSubscriptions(const string& transport)
|
||||||
|
{
|
||||||
|
size_t session{fair::mq::tools::UuidHash()};
|
||||||
|
|
||||||
|
fair::mq::ProgOptions config;
|
||||||
|
config.SetProperty<string>("session", to_string(session));
|
||||||
|
|
||||||
|
auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config);
|
||||||
|
|
||||||
|
constexpr int size1 = 1000000;
|
||||||
|
constexpr int size2 = 5000000;
|
||||||
|
constexpr int64_t userFlags = 12345;
|
||||||
|
fair::mq::tools::SharedSemaphore blocker;
|
||||||
|
|
||||||
|
{
|
||||||
|
auto region1 = factory->CreateUnmanagedRegion(size1, [](void*, size_t, void*) {});
|
||||||
|
void* ptr1 = region1->GetData();
|
||||||
|
uint64_t id1 = region1->GetId();
|
||||||
|
ASSERT_EQ(region1->GetSize(), size1);
|
||||||
|
|
||||||
|
auto region2 = factory->CreateUnmanagedRegion(size2, userFlags, [](void*, size_t, void*) {});
|
||||||
|
void* ptr2 = region2->GetData();
|
||||||
|
uint64_t id2 = region2->GetId();
|
||||||
|
ASSERT_EQ(region2->GetSize(), size2);
|
||||||
|
|
||||||
|
ASSERT_EQ(factory->SubscribedToRegionEvents(), false);
|
||||||
|
factory->SubscribeToRegionEvents([&](FairMQRegionInfo info) {
|
||||||
|
LOG(warn) << ">>>" << info.event;
|
||||||
|
LOG(warn) << "id: " << info.id;
|
||||||
|
LOG(warn) << "ptr: " << info.ptr;
|
||||||
|
LOG(warn) << "size: " << info.size;
|
||||||
|
LOG(warn) << "flags: " << info.flags;
|
||||||
|
if (info.event == FairMQRegionEvent::created) {
|
||||||
|
if (info.id == id1) {
|
||||||
|
ASSERT_EQ(info.size, size1);
|
||||||
|
ASSERT_EQ(info.ptr, ptr1);
|
||||||
|
blocker.Signal();
|
||||||
|
} else if (info.id == id2) {
|
||||||
|
ASSERT_EQ(info.size, size2);
|
||||||
|
ASSERT_EQ(info.ptr, ptr2);
|
||||||
|
ASSERT_EQ(info.flags, userFlags);
|
||||||
|
blocker.Signal();
|
||||||
|
}
|
||||||
|
} else if (info.event == FairMQRegionEvent::destroyed) {
|
||||||
|
if (info.id == id1) {
|
||||||
|
blocker.Signal();
|
||||||
|
} else if (info.id == id2) {
|
||||||
|
blocker.Signal();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
ASSERT_EQ(factory->SubscribedToRegionEvents(), true);
|
||||||
|
|
||||||
|
LOG(info) << "waiting for blockers...";
|
||||||
|
blocker.Wait();
|
||||||
|
LOG(info) << "1 done.";
|
||||||
|
blocker.Wait();
|
||||||
|
LOG(info) << "2 done.";
|
||||||
|
}
|
||||||
|
|
||||||
|
blocker.Wait();
|
||||||
|
LOG(info) << "3 done.";
|
||||||
|
blocker.Wait();
|
||||||
|
LOG(info) << "4 done.";
|
||||||
|
LOG(info) << "All done.";
|
||||||
|
|
||||||
|
factory->UnsubscribeFromRegionEvents();
|
||||||
|
ASSERT_EQ(factory->SubscribedToRegionEvents(), false);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(EventSubscriptions, zeromq)
|
||||||
|
{
|
||||||
|
RegionEventSubscriptions("zeromq");
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(EventSubscriptions, shmem)
|
||||||
|
{
|
||||||
|
RegionEventSubscriptions("shmem");
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace
|
Reference in New Issue
Block a user