mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 00:31:14 +00:00
consolidate UnmanagedRegion options
This commit is contained in:
parent
acfb495411
commit
9075bd6930
|
@ -32,22 +32,20 @@ struct Sampler : fair::mq::Device
|
||||||
<< ", flags: " << info.flags;
|
<< ", flags: " << info.flags;
|
||||||
});
|
});
|
||||||
|
|
||||||
|
fair::mq::RegionConfig regionCfg;
|
||||||
|
regionCfg.linger = fLinger; // delay in ms before region destruction to collect outstanding events
|
||||||
|
regionCfg.lock = true; // mlock region after creation
|
||||||
|
regionCfg.zero = true; // zero region content after creation
|
||||||
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel...
|
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel...
|
||||||
0, // ... and this sub-channel
|
0, // ... and this sub-channel
|
||||||
10000000, // region size
|
10000000, // region size
|
||||||
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
|
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
|
||||||
std::lock_guard<std::mutex> lock(fMtx);
|
std::lock_guard<std::mutex> lock(fMtx);
|
||||||
fNumUnackedMsgs -= blocks.size();
|
fNumUnackedMsgs -= blocks.size();
|
||||||
|
|
||||||
if (fMaxIterations > 0) {
|
if (fMaxIterations > 0) {
|
||||||
LOG(info) << "Received " << blocks.size() << " acks";
|
LOG(info) << "Received " << blocks.size() << " acks";
|
||||||
}
|
}
|
||||||
},
|
}, regionCfg));
|
||||||
"", // path, if a region is backed by a file
|
|
||||||
0, // flags that are passed for region creation
|
|
||||||
fair::mq::RegionConfig{true, true} // additional config: { call mlock on the region, zero the region memory }
|
|
||||||
));
|
|
||||||
fRegion->SetLinger(fLinger);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ConditionalRun() override
|
bool ConditionalRun() override
|
||||||
|
|
|
@ -109,13 +109,15 @@ class TransportFactory
|
||||||
/// @param path optional parameter to pass to the underlying transport
|
/// @param path optional parameter to pass to the underlying transport
|
||||||
/// @param flags optional parameter to pass to the underlying transport
|
/// @param flags optional parameter to pass to the underlying transport
|
||||||
/// @return pointer to UnmanagedRegion
|
/// @return pointer to UnmanagedRegion
|
||||||
|
// [[deprecated("Use CreateUnmanagedRegion(size_t size, RegionCallback callback, RegionConfig cfg)")]]
|
||||||
virtual UnmanagedRegionPtr CreateUnmanagedRegion(size_t size,
|
virtual UnmanagedRegionPtr CreateUnmanagedRegion(size_t size,
|
||||||
RegionCallback callback = nullptr,
|
RegionCallback callback = nullptr,
|
||||||
const std::string& path = "",
|
const std::string& path = "",
|
||||||
int flags = 0,
|
int flags = 0,
|
||||||
RegionConfig cfg = RegionConfig()) = 0;
|
RegionConfig cfg = RegionConfig()) = 0;
|
||||||
|
// [[deprecated("Use CreateUnmanagedRegion(size_t size, RegionCallback callback, RegionConfig cfg)")]]
|
||||||
virtual UnmanagedRegionPtr CreateUnmanagedRegion(size_t size,
|
virtual UnmanagedRegionPtr CreateUnmanagedRegion(size_t size,
|
||||||
RegionBulkCallback callback = nullptr,
|
RegionBulkCallback bulkCallback = nullptr,
|
||||||
const std::string& path = "",
|
const std::string& path = "",
|
||||||
int flags = 0,
|
int flags = 0,
|
||||||
RegionConfig cfg = RegionConfig()) = 0;
|
RegionConfig cfg = RegionConfig()) = 0;
|
||||||
|
@ -128,19 +130,36 @@ class TransportFactory
|
||||||
/// @param path optional parameter to pass to the underlying transport
|
/// @param path optional parameter to pass to the underlying transport
|
||||||
/// @param flags optional parameter to pass to the underlying transport
|
/// @param flags optional parameter to pass to the underlying transport
|
||||||
/// @return pointer to UnmanagedRegion
|
/// @return pointer to UnmanagedRegion
|
||||||
|
// [[deprecated("Use CreateUnmanagedRegion(size_t size, RegionCallback callback, RegionConfig cfg)")]]
|
||||||
virtual UnmanagedRegionPtr CreateUnmanagedRegion(size_t size,
|
virtual UnmanagedRegionPtr CreateUnmanagedRegion(size_t size,
|
||||||
int64_t userFlags,
|
int64_t userFlags,
|
||||||
RegionCallback callback = nullptr,
|
RegionCallback callback = nullptr,
|
||||||
const std::string& path = "",
|
const std::string& path = "",
|
||||||
int flags = 0,
|
int flags = 0,
|
||||||
RegionConfig cfg = RegionConfig()) = 0;
|
RegionConfig cfg = RegionConfig()) = 0;
|
||||||
|
// [[deprecated("Use CreateUnmanagedRegion(size_t size, RegionCallback callback, RegionConfig cfg)")]]
|
||||||
virtual UnmanagedRegionPtr CreateUnmanagedRegion(size_t size,
|
virtual UnmanagedRegionPtr CreateUnmanagedRegion(size_t size,
|
||||||
int64_t userFlags,
|
int64_t userFlags,
|
||||||
RegionBulkCallback callback = nullptr,
|
RegionBulkCallback bulkCallback = nullptr,
|
||||||
const std::string& path = "",
|
const std::string& path = "",
|
||||||
int flags = 0,
|
int flags = 0,
|
||||||
RegionConfig cfg = RegionConfig()) = 0;
|
RegionConfig cfg = RegionConfig()) = 0;
|
||||||
|
|
||||||
|
|
||||||
|
/// @brief Create new UnmanagedRegion
|
||||||
|
/// @param size size of the region
|
||||||
|
/// @param callback callback to be called when a message belonging to this region is no longer needed by the transport
|
||||||
|
/// @param cfg region configuration
|
||||||
|
/// @return pointer to UnmanagedRegion
|
||||||
|
virtual UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionCallback callback, RegionConfig cfg) = 0;
|
||||||
|
|
||||||
|
/// @brief Create new UnmanagedRegion
|
||||||
|
/// @param size size of the region
|
||||||
|
/// @param bulkCallback callback to be called when message(s) belonging to this region is no longer needed by the transport
|
||||||
|
/// @param cfg region configuration
|
||||||
|
/// @return pointer to UnmanagedRegion
|
||||||
|
virtual UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionBulkCallback bulkCallback, RegionConfig cfg) = 0;
|
||||||
|
|
||||||
/// @brief Subscribe to region events (creation, destruction, ...)
|
/// @brief Subscribe to region events (creation, destruction, ...)
|
||||||
/// @param callback the callback that is called when a region event occurs
|
/// @param callback the callback that is called when a region event occurs
|
||||||
virtual void SubscribeToRegionEvents(RegionEventCallback callback) = 0;
|
virtual void SubscribeToRegionEvents(RegionEventCallback callback) = 0;
|
||||||
|
|
|
@ -9,12 +9,15 @@
|
||||||
#ifndef FAIR_MQ_UNMANAGEDREGION_H
|
#ifndef FAIR_MQ_UNMANAGEDREGION_H
|
||||||
#define FAIR_MQ_UNMANAGEDREGION_H
|
#define FAIR_MQ_UNMANAGEDREGION_H
|
||||||
|
|
||||||
|
#include <fairmq/Transports.h>
|
||||||
|
|
||||||
#include <cstddef> // size_t
|
#include <cstddef> // size_t
|
||||||
#include <cstdint> // uint32_t
|
#include <cstdint> // uint32_t
|
||||||
#include <fairmq/Transports.h>
|
|
||||||
#include <functional> // std::function
|
#include <functional> // std::function
|
||||||
#include <memory> // std::unique_ptr
|
#include <memory> // std::unique_ptr
|
||||||
#include <ostream> // std::ostream
|
#include <ostream>
|
||||||
|
#include <string>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
namespace fair::mq {
|
namespace fair::mq {
|
||||||
|
@ -119,13 +122,17 @@ struct RegionConfig
|
||||||
{
|
{
|
||||||
RegionConfig() = default;
|
RegionConfig() = default;
|
||||||
|
|
||||||
RegionConfig(bool l, bool z)
|
RegionConfig(bool _lock, bool _zero)
|
||||||
: lock(l)
|
: lock(_lock)
|
||||||
, zero(z)
|
, zero(_zero)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
bool lock = false;
|
bool lock = false; /// mlock region after creation
|
||||||
bool zero = false;
|
bool zero = false; /// zero region content after creation
|
||||||
|
int creationFlags = 0; /// flags passed to the underlying transport on region creation
|
||||||
|
int64_t userFlags = 0; /// custom flags that have no effect on the transport, but can be retrieved from the region by the user
|
||||||
|
std::string path = ""; /// file path, if the region is backed by a file
|
||||||
|
uint32_t linger = 100; /// delay in ms before region destruction to collect outstanding events
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace fair::mq
|
} // namespace fair::mq
|
||||||
|
|
|
@ -166,6 +166,22 @@ struct TransportFactory final : mq::TransportFactory
|
||||||
throw std::runtime_error("Not yet implemented UMR.");
|
throw std::runtime_error("Not yet implemented UMR.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto CreateUnmanagedRegion(std::size_t /*size*/,
|
||||||
|
RegionCallback /*callback*/,
|
||||||
|
RegionConfig /*cfg*/)
|
||||||
|
-> std::unique_ptr<mq::UnmanagedRegion> override
|
||||||
|
{
|
||||||
|
throw std::runtime_error("Not yet implemented UMR.");
|
||||||
|
}
|
||||||
|
|
||||||
|
auto CreateUnmanagedRegion(std::size_t /*size*/,
|
||||||
|
RegionBulkCallback /*callback*/,
|
||||||
|
RegionConfig /*cfg*/)
|
||||||
|
-> std::unique_ptr<mq::UnmanagedRegion> override
|
||||||
|
{
|
||||||
|
throw std::runtime_error("Not yet implemented UMR.");
|
||||||
|
}
|
||||||
|
|
||||||
auto SubscribeToRegionEvents(RegionEventCallback /*callback*/) -> void override
|
auto SubscribeToRegionEvents(RegionEventCallback /*callback*/) -> void override
|
||||||
{
|
{
|
||||||
throw std::runtime_error("Not yet implemented.");
|
throw std::runtime_error("Not yet implemented.");
|
||||||
|
|
|
@ -60,20 +60,20 @@ struct RegionInfo
|
||||||
{
|
{
|
||||||
RegionInfo(const VoidAlloc& alloc)
|
RegionInfo(const VoidAlloc& alloc)
|
||||||
: fPath("", alloc)
|
: fPath("", alloc)
|
||||||
, fFlags(0)
|
, fCreationFlags(0)
|
||||||
, fUserFlags(0)
|
, fUserFlags(0)
|
||||||
, fDestroyed(false)
|
, fDestroyed(false)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
RegionInfo(const char* path, const int flags, const uint64_t userFlags, const VoidAlloc& alloc)
|
RegionInfo(const char* path, const int flags, const uint64_t userFlags, const VoidAlloc& alloc)
|
||||||
: fPath(path, alloc)
|
: fPath(path, alloc)
|
||||||
, fFlags(flags)
|
, fCreationFlags(flags)
|
||||||
, fUserFlags(userFlags)
|
, fUserFlags(userFlags)
|
||||||
, fDestroyed(false)
|
, fDestroyed(false)
|
||||||
{}
|
{}
|
||||||
|
|
||||||
Str fPath;
|
Str fPath;
|
||||||
int fFlags;
|
int fCreationFlags;
|
||||||
uint64_t fUserFlags;
|
uint64_t fUserFlags;
|
||||||
bool fDestroyed;
|
bool fDestroyed;
|
||||||
};
|
};
|
||||||
|
|
|
@ -12,12 +12,12 @@
|
||||||
#include "Common.h"
|
#include "Common.h"
|
||||||
#include "Monitor.h"
|
#include "Monitor.h"
|
||||||
#include "Region.h"
|
#include "Region.h"
|
||||||
|
#include <fairmq/Message.h>
|
||||||
#include <fairmq/ProgOptions.h>
|
#include <fairmq/ProgOptions.h>
|
||||||
#include <fairmq/tools/Strings.h>
|
#include <fairmq/tools/Strings.h>
|
||||||
#include <fairmq/Transports.h>
|
#include <fairmq/Transports.h>
|
||||||
#include <FairMQLogger.h>
|
|
||||||
#include <FairMQMessage.h>
|
#include <fairlogger/Logger.h>
|
||||||
|
|
||||||
#include <boost/date_time/posix_time/posix_time.hpp>
|
#include <boost/date_time/posix_time/posix_time.hpp>
|
||||||
#include <boost/filesystem.hpp>
|
#include <boost/filesystem.hpp>
|
||||||
|
@ -369,19 +369,15 @@ class Manager
|
||||||
bool Interrupted() { return fInterrupted.load(); }
|
bool Interrupted() { return fInterrupted.load(); }
|
||||||
|
|
||||||
std::pair<boost::interprocess::mapped_region*, uint16_t> CreateRegion(const size_t size,
|
std::pair<boost::interprocess::mapped_region*, uint16_t> CreateRegion(const size_t size,
|
||||||
const int64_t userFlags,
|
|
||||||
RegionCallback callback,
|
RegionCallback callback,
|
||||||
RegionBulkCallback bulkCallback,
|
RegionBulkCallback bulkCallback,
|
||||||
const std::string& path,
|
RegionConfig cfg)
|
||||||
int flags,
|
|
||||||
fair::mq::RegionConfig cfg)
|
|
||||||
{
|
{
|
||||||
using namespace boost::interprocess;
|
using namespace boost::interprocess;
|
||||||
try {
|
try {
|
||||||
std::pair<mapped_region*, uint16_t> result;
|
std::pair<mapped_region*, uint16_t> result;
|
||||||
|
|
||||||
{
|
{
|
||||||
uint16_t id = 0;
|
|
||||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||||
|
|
||||||
RegionCounter* rc = fManagementSegment.find<RegionCounter>(unique_instance).first;
|
RegionCounter* rc = fManagementSegment.find<RegionCounter>(unique_instance).first;
|
||||||
|
@ -396,7 +392,7 @@ class Manager
|
||||||
LOG(debug) << "initialized region counter with: " << rc->fCount;
|
LOG(debug) << "initialized region counter with: " << rc->fCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
id = rc->fCount;
|
uint16_t id = rc->fCount;
|
||||||
|
|
||||||
auto it = fRegions.find(id);
|
auto it = fRegions.find(id);
|
||||||
if (it != fRegions.end()) {
|
if (it != fRegions.end()) {
|
||||||
|
@ -404,8 +400,8 @@ class Manager
|
||||||
return {nullptr, id};
|
return {nullptr, id};
|
||||||
}
|
}
|
||||||
|
|
||||||
auto r = fRegions.emplace(id, std::make_unique<Region>(fShmId, id, size, false, callback, bulkCallback, path, flags));
|
auto r = fRegions.emplace(id, std::make_unique<Region>(fShmId, id, size, false, callback, bulkCallback, cfg));
|
||||||
// LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
|
// LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
||||||
|
|
||||||
if (cfg.lock) {
|
if (cfg.lock) {
|
||||||
LOG(debug) << "Locking region " << id << "...";
|
LOG(debug) << "Locking region " << id << "...";
|
||||||
|
@ -421,7 +417,7 @@ class Manager
|
||||||
LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
|
LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
|
||||||
}
|
}
|
||||||
|
|
||||||
fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc));
|
fShmRegions->emplace(id, RegionInfo(cfg.path.c_str(), cfg.creationFlags, cfg.userFlags, fShmVoidAlloc));
|
||||||
|
|
||||||
r.first->second->StartReceivingAcks();
|
r.first->second->StartReceivingAcks();
|
||||||
result.first = &(r.first->second->fRegion);
|
result.first = &(r.first->second->fRegion);
|
||||||
|
@ -476,11 +472,12 @@ class Manager
|
||||||
try {
|
try {
|
||||||
// get region info
|
// get region info
|
||||||
RegionInfo regionInfo = fShmRegions->at(id);
|
RegionInfo regionInfo = fShmRegions->at(id);
|
||||||
std::string path = regionInfo.fPath.c_str();
|
RegionConfig cfg;
|
||||||
int flags = regionInfo.fFlags;
|
cfg.creationFlags = regionInfo.fCreationFlags;
|
||||||
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
|
cfg.path = regionInfo.fPath.c_str();
|
||||||
|
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
||||||
|
|
||||||
auto r = fRegions.emplace(id, std::make_unique<Region>(fShmId, id, 0, true, nullptr, nullptr, path, flags));
|
auto r = fRegions.emplace(id, std::make_unique<Region>(fShmId, id, 0, true, nullptr, nullptr, std::move(cfg)));
|
||||||
return r.first->second.get();
|
return r.first->second.get();
|
||||||
} catch (std::out_of_range& oor) {
|
} catch (std::out_of_range& oor) {
|
||||||
LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?";
|
LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?";
|
||||||
|
|
|
@ -12,9 +12,10 @@
|
||||||
#include "Manager.h"
|
#include "Manager.h"
|
||||||
#include "Region.h"
|
#include "Region.h"
|
||||||
#include "UnmanagedRegion.h"
|
#include "UnmanagedRegion.h"
|
||||||
#include <FairMQLogger.h>
|
#include <fairmq/Message.h>
|
||||||
#include <FairMQMessage.h>
|
#include <fairmq/UnmanagedRegion.h>
|
||||||
#include <FairMQUnmanagedRegion.h>
|
|
||||||
|
#include <fairlogger/Logger.h>
|
||||||
|
|
||||||
#include <boost/interprocess/mapped_region.hpp>
|
#include <boost/interprocess/mapped_region.hpp>
|
||||||
|
|
||||||
|
|
|
@ -545,7 +545,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmId, b
|
||||||
if (m != nullptr) {
|
if (m != nullptr) {
|
||||||
RegionInfo ri = m->at(i);
|
RegionInfo ri = m->at(i);
|
||||||
string path = ri.fPath.c_str();
|
string path = ri.fPath.c_str();
|
||||||
int flags = ri.fFlags;
|
int flags = ri.fCreationFlags;
|
||||||
if (verbose) {
|
if (verbose) {
|
||||||
LOG(info) << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << ri.fDestroyed << ".";
|
LOG(info) << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << ri.fDestroyed << ".";
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,11 +10,11 @@
|
||||||
#define FAIR_MQ_SHMEM_REGION_H_
|
#define FAIR_MQ_SHMEM_REGION_H_
|
||||||
|
|
||||||
#include "Common.h"
|
#include "Common.h"
|
||||||
|
#include <fairmq/UnmanagedRegion.h>
|
||||||
#include <FairMQLogger.h>
|
|
||||||
#include <FairMQUnmanagedRegion.h>
|
|
||||||
#include <fairmq/tools/Strings.h>
|
#include <fairmq/tools/Strings.h>
|
||||||
|
|
||||||
|
#include <fairlogger/Logger.h>
|
||||||
|
|
||||||
#include <boost/filesystem.hpp>
|
#include <boost/filesystem.hpp>
|
||||||
#include <boost/date_time/posix_time/posix_time.hpp>
|
#include <boost/date_time/posix_time/posix_time.hpp>
|
||||||
#include <boost/interprocess/managed_shared_memory.hpp>
|
#include <boost/interprocess/managed_shared_memory.hpp>
|
||||||
|
@ -38,9 +38,9 @@ namespace fair::mq::shmem
|
||||||
|
|
||||||
struct Region
|
struct Region
|
||||||
{
|
{
|
||||||
Region(const std::string& shmId, uint16_t id, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags)
|
Region(const std::string& shmId, uint16_t id, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, RegionConfig cfg)
|
||||||
: fRemote(remote)
|
: fRemote(remote)
|
||||||
, fLinger(100)
|
, fLinger(cfg.linger)
|
||||||
, fStopAcks(false)
|
, fStopAcks(false)
|
||||||
, fName("fmq_" + shmId + "_rg_" + std::to_string(id))
|
, fName("fmq_" + shmId + "_rg_" + std::to_string(id))
|
||||||
, fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(id))
|
, fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(id))
|
||||||
|
@ -53,8 +53,8 @@ struct Region
|
||||||
{
|
{
|
||||||
using namespace boost::interprocess;
|
using namespace boost::interprocess;
|
||||||
|
|
||||||
if (!path.empty()) {
|
if (!cfg.path.empty()) {
|
||||||
fName = std::string(path + fName);
|
fName = std::string(cfg.path + fName);
|
||||||
|
|
||||||
if (!fRemote) {
|
if (!fRemote) {
|
||||||
// create a file
|
// create a file
|
||||||
|
@ -75,7 +75,7 @@ struct Region
|
||||||
}
|
}
|
||||||
fFileMapping = file_mapping(fName.c_str(), read_write);
|
fFileMapping = file_mapping(fName.c_str(), read_write);
|
||||||
LOG(debug) << "shmem: initialized file: " << fName;
|
LOG(debug) << "shmem: initialized file: " << fName;
|
||||||
fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, flags);
|
fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, cfg.creationFlags);
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
if (fRemote) {
|
if (fRemote) {
|
||||||
|
@ -84,19 +84,18 @@ struct Region
|
||||||
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
|
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
|
||||||
fShmemObject.truncate(size);
|
fShmemObject.truncate(size);
|
||||||
}
|
}
|
||||||
} catch(interprocess_exception& e) {
|
} catch (interprocess_exception& e) {
|
||||||
LOG(error) << "Failed " << (fRemote ? "opening" : "creating") << " shared_memory_object for region id '" << id << "': " << e.what();
|
LOG(error) << "Failed " << (fRemote ? "opening" : "creating") << " shared_memory_object for region id '" << id << "': " << e.what();
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, flags);
|
fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, cfg.creationFlags);
|
||||||
} catch(interprocess_exception& e) {
|
} catch (interprocess_exception& e) {
|
||||||
LOG(error) << "Failed mapping shared_memory_object for region id '" << id << "': " << e.what();
|
LOG(error) << "Failed mapping shared_memory_object for region id '" << id << "': " << e.what();
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
InitializeQueues();
|
InitializeQueues();
|
||||||
StartSendingAcks();
|
StartSendingAcks();
|
||||||
|
|
||||||
|
|
|
@ -11,13 +11,13 @@
|
||||||
#include "Common.h"
|
#include "Common.h"
|
||||||
#include "Manager.h"
|
#include "Manager.h"
|
||||||
#include "Message.h"
|
#include "Message.h"
|
||||||
|
#include <fairmq/Socket.h>
|
||||||
#include <FairMQSocket.h>
|
#include <fairmq/Message.h>
|
||||||
#include <FairMQMessage.h>
|
|
||||||
#include <FairMQLogger.h>
|
|
||||||
#include <fairmq/tools/Strings.h>
|
#include <fairmq/tools/Strings.h>
|
||||||
#include <fairmq/zeromq/Common.h>
|
#include <fairmq/zeromq/Common.h>
|
||||||
|
|
||||||
|
#include <fairlogger/Logger.h>
|
||||||
|
|
||||||
#include <zmq.h>
|
#include <zmq.h>
|
||||||
|
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
|
|
|
@ -9,17 +9,17 @@
|
||||||
#ifndef FAIR_MQ_SHMEM_TRANSPORTFACTORY_H_
|
#ifndef FAIR_MQ_SHMEM_TRANSPORTFACTORY_H_
|
||||||
#define FAIR_MQ_SHMEM_TRANSPORTFACTORY_H_
|
#define FAIR_MQ_SHMEM_TRANSPORTFACTORY_H_
|
||||||
|
|
||||||
#include "Manager.h"
|
|
||||||
#include "Common.h"
|
#include "Common.h"
|
||||||
|
#include "Manager.h"
|
||||||
#include "Message.h"
|
#include "Message.h"
|
||||||
#include "Socket.h"
|
|
||||||
#include "Poller.h"
|
#include "Poller.h"
|
||||||
|
#include "Socket.h"
|
||||||
#include "UnmanagedRegion.h"
|
#include "UnmanagedRegion.h"
|
||||||
|
|
||||||
#include <FairMQTransportFactory.h>
|
|
||||||
#include <fairmq/ProgOptions.h>
|
#include <fairmq/ProgOptions.h>
|
||||||
#include <FairMQLogger.h>
|
|
||||||
#include <fairmq/tools/Strings.h>
|
#include <fairmq/tools/Strings.h>
|
||||||
|
#include <fairmq/TransportFactory.h>
|
||||||
|
|
||||||
|
#include <fairlogger/Logger.h>
|
||||||
|
|
||||||
#include <boost/version.hpp>
|
#include <boost/version.hpp>
|
||||||
|
|
||||||
|
@ -145,27 +145,46 @@ class TransportFactory final : public fair::mq::TransportFactory
|
||||||
|
|
||||||
UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||||
{
|
{
|
||||||
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags, cfg);
|
cfg.path = path;
|
||||||
|
cfg.creationFlags = flags;
|
||||||
|
return CreateUnmanagedRegion(size, callback, nullptr, cfg);
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||||
{
|
{
|
||||||
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags, cfg);
|
cfg.path = path;
|
||||||
|
cfg.creationFlags = flags;
|
||||||
|
return CreateUnmanagedRegion(size, nullptr, bulkCallback, cfg);
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||||
{
|
{
|
||||||
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags, cfg);
|
cfg.path = path;
|
||||||
|
cfg.userFlags = userFlags;
|
||||||
|
cfg.creationFlags = flags;
|
||||||
|
return CreateUnmanagedRegion(size, callback, nullptr, cfg);
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, int64_t userFlags, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, int64_t userFlags, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||||
{
|
{
|
||||||
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags, cfg);
|
cfg.path = path;
|
||||||
|
cfg.userFlags = userFlags;
|
||||||
|
cfg.creationFlags = flags;
|
||||||
|
return CreateUnmanagedRegion(size, nullptr, bulkCallback, cfg);
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags, fair::mq::RegionConfig cfg)
|
UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionCallback callback, RegionConfig cfg) override
|
||||||
{
|
{
|
||||||
return std::make_unique<UnmanagedRegion>(*fManager, size, userFlags, callback, bulkCallback, path, flags, this, cfg);
|
return CreateUnmanagedRegion(size, callback, nullptr, cfg);
|
||||||
|
}
|
||||||
|
UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionBulkCallback bulkCallback, RegionConfig cfg) override
|
||||||
|
{
|
||||||
|
return CreateUnmanagedRegion(size, nullptr, bulkCallback, cfg);
|
||||||
|
}
|
||||||
|
|
||||||
|
UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionCallback callback, RegionBulkCallback bulkCallback, fair::mq::RegionConfig cfg)
|
||||||
|
{
|
||||||
|
return std::make_unique<UnmanagedRegion>(*fManager, size, callback, bulkCallback, std::move(cfg), this);
|
||||||
}
|
}
|
||||||
|
|
||||||
void SubscribeToRegionEvents(RegionEventCallback callback) override { fManager->SubscribeToRegionEvents(callback); }
|
void SubscribeToRegionEvents(RegionEventCallback callback) override { fManager->SubscribeToRegionEvents(callback); }
|
||||||
|
|
|
@ -10,9 +10,9 @@
|
||||||
#define FAIR_MQ_SHMEM_UNMANAGEDREGION_H_
|
#define FAIR_MQ_SHMEM_UNMANAGEDREGION_H_
|
||||||
|
|
||||||
#include "Manager.h"
|
#include "Manager.h"
|
||||||
|
#include <fairmq/UnmanagedRegion.h>
|
||||||
|
|
||||||
#include <FairMQUnmanagedRegion.h>
|
#include <fairlogger/Logger.h>
|
||||||
#include <FairMQLogger.h>
|
|
||||||
|
|
||||||
#include <boost/interprocess/shared_memory_object.hpp>
|
#include <boost/interprocess/shared_memory_object.hpp>
|
||||||
#include <boost/interprocess/mapped_region.hpp>
|
#include <boost/interprocess/mapped_region.hpp>
|
||||||
|
@ -34,19 +34,16 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
|
||||||
public:
|
public:
|
||||||
UnmanagedRegion(Manager& manager,
|
UnmanagedRegion(Manager& manager,
|
||||||
const size_t size,
|
const size_t size,
|
||||||
const int64_t userFlags,
|
|
||||||
RegionCallback callback,
|
RegionCallback callback,
|
||||||
RegionBulkCallback bulkCallback,
|
RegionBulkCallback bulkCallback,
|
||||||
const std::string& path,
|
fair::mq::RegionConfig cfg,
|
||||||
int flags,
|
FairMQTransportFactory* factory)
|
||||||
FairMQTransportFactory* factory,
|
|
||||||
fair::mq::RegionConfig cfg)
|
|
||||||
: FairMQUnmanagedRegion(factory)
|
: FairMQUnmanagedRegion(factory)
|
||||||
, fManager(manager)
|
, fManager(manager)
|
||||||
, fRegion(nullptr)
|
, fRegion(nullptr)
|
||||||
, fRegionId(0)
|
, fRegionId(0)
|
||||||
{
|
{
|
||||||
auto result = fManager.CreateRegion(size, userFlags, callback, bulkCallback, path, flags, cfg);
|
auto result = fManager.CreateRegion(size, callback, bulkCallback, std::move(cfg));
|
||||||
fRegion = result.first;
|
fRegion = result.first;
|
||||||
fRegionId = result.second;
|
fRegionId = result.second;
|
||||||
}
|
}
|
||||||
|
|
|
@ -118,6 +118,15 @@ class TransportFactory final : public FairMQTransportFactory
|
||||||
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags, cfg);
|
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags, cfg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionCallback callback, RegionConfig cfg) override
|
||||||
|
{
|
||||||
|
return CreateUnmanagedRegion(size, cfg.userFlags, callback, nullptr, cfg.path, cfg.creationFlags, cfg);
|
||||||
|
}
|
||||||
|
UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, RegionBulkCallback bulkCallback, RegionConfig cfg) override
|
||||||
|
{
|
||||||
|
return CreateUnmanagedRegion(size, cfg.userFlags, nullptr, bulkCallback, cfg.path, cfg.creationFlags, cfg);
|
||||||
|
}
|
||||||
|
|
||||||
UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string&, int /* flags */, fair::mq::RegionConfig cfg)
|
UnmanagedRegionPtr CreateUnmanagedRegion(size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string&, int /* flags */, fair::mq::RegionConfig cfg)
|
||||||
{
|
{
|
||||||
UnmanagedRegionPtr ptr = std::make_unique<UnmanagedRegion>(*fCtx, size, userFlags, callback, bulkCallback, this, cfg);
|
UnmanagedRegionPtr ptr = std::make_unique<UnmanagedRegion>(*fCtx, size, userFlags, callback, bulkCallback, this, cfg);
|
||||||
|
|
Loading…
Reference in New Issue
Block a user