mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
add mlock/zero options to unmanaged region
This commit is contained in:
parent
4bc392be54
commit
6e567ba496
|
@ -39,9 +39,9 @@ void Sampler::InitTask()
|
|||
<< ", flags: " << info.flags;
|
||||
});
|
||||
|
||||
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data",
|
||||
0,
|
||||
10000000,
|
||||
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel...
|
||||
0, // ... and this sub-channel
|
||||
10000000, // region size
|
||||
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
fNumUnackedMsgs -= blocks.size();
|
||||
|
@ -49,7 +49,10 @@ void Sampler::InitTask()
|
|||
if (fMaxIterations > 0) {
|
||||
LOG(info) << "Received " << blocks.size() << " acks";
|
||||
}
|
||||
}
|
||||
},
|
||||
"", // path, if a region is backed by a file
|
||||
0, // flags that are passed for region creation
|
||||
fair::mq::RegionConfig{true, true} // additional config: { call mlock on the region, zero the region memory }
|
||||
));
|
||||
fRegion->SetLinger(fLinger);
|
||||
}
|
||||
|
|
|
@ -92,8 +92,8 @@ class FairMQTransportFactory
|
|||
/// @param path optional parameter to pass to the underlying transport
|
||||
/// @param flags optional parameter to pass to the underlying transport
|
||||
/// @return pointer to UnmanagedRegion
|
||||
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
|
||||
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
|
||||
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
|
||||
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
|
||||
/// @brief Create new UnmanagedRegion
|
||||
/// @param size size of the region
|
||||
/// @param userFlags flags to be stored with the region, have no effect on the transport, but can be retrieved from the region by the user
|
||||
|
@ -101,8 +101,8 @@ class FairMQTransportFactory
|
|||
/// @param path optional parameter to pass to the underlying transport
|
||||
/// @param flags optional parameter to pass to the underlying transport
|
||||
/// @return pointer to UnmanagedRegion
|
||||
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
|
||||
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0;
|
||||
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
|
||||
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
|
||||
|
||||
/// @brief Subscribe to region events (creation, destruction, ...)
|
||||
/// @param callback the callback that is called when a region event occurs
|
||||
|
|
|
@ -108,6 +108,19 @@ inline std::ostream& operator<<(std::ostream& os, const FairMQRegionEvent& event
|
|||
namespace fair::mq
|
||||
{
|
||||
|
||||
struct RegionConfig {
|
||||
bool lock;
|
||||
bool zero;
|
||||
|
||||
RegionConfig()
|
||||
: lock(false), zero(false)
|
||||
{}
|
||||
|
||||
RegionConfig(bool l, bool z)
|
||||
: lock(l), zero(z)
|
||||
{}
|
||||
};
|
||||
|
||||
using RegionCallback = FairMQRegionCallback;
|
||||
using RegionBulkCallback = FairMQRegionBulkCallback;
|
||||
using RegionEventCallback = FairMQRegionEventCallback;
|
||||
|
|
|
@ -92,22 +92,22 @@ auto TransportFactory::CreatePoller(const unordered_map<string, vector<FairMQCha
|
|||
// return PollerPtr{new Poller(channelsMap, channelList)};
|
||||
}
|
||||
|
||||
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
|
||||
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
|
||||
{
|
||||
throw runtime_error{"Not yet implemented UMR."};
|
||||
}
|
||||
|
||||
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
|
||||
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
|
||||
{
|
||||
throw runtime_error{"Not yet implemented UMR."};
|
||||
}
|
||||
|
||||
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
|
||||
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
|
||||
{
|
||||
throw runtime_error{"Not yet implemented UMR."};
|
||||
}
|
||||
|
||||
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr
|
||||
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
|
||||
{
|
||||
throw runtime_error{"Not yet implemented UMR."};
|
||||
}
|
||||
|
|
|
@ -277,8 +277,9 @@ class Manager
|
|||
const int64_t userFlags,
|
||||
RegionCallback callback,
|
||||
RegionBulkCallback bulkCallback,
|
||||
const std::string& path = "",
|
||||
int flags = 0)
|
||||
const std::string& path,
|
||||
int flags,
|
||||
fair::mq::RegionConfig cfg)
|
||||
{
|
||||
using namespace boost::interprocess;
|
||||
try {
|
||||
|
@ -311,6 +312,19 @@ class Manager
|
|||
auto r = fRegions.emplace(id, std::make_unique<Region>(fShmId, id, size, false, callback, bulkCallback, path, flags));
|
||||
// LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
|
||||
|
||||
if (cfg.lock) {
|
||||
LOG(debug) << "Locking region " << id << "...";
|
||||
if (mlock(r.first->second->fRegion.get_address(), r.first->second->fRegion.get_size()) == -1) {
|
||||
LOG(error) << "Could not lock region " << id << ". Code: " << errno << ", reason: " << strerror(errno);
|
||||
}
|
||||
LOG(debug) << "Successfully locked region " << id << ".";
|
||||
}
|
||||
if (cfg.zero) {
|
||||
LOG(debug) << "Zeroing free memory of region " << id << "...";
|
||||
memset(r.first->second->fRegion.get_address(), 0x00, r.first->second->fRegion.get_size());
|
||||
LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
|
||||
}
|
||||
|
||||
fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc));
|
||||
|
||||
r.first->second->InitializeQueues();
|
||||
|
|
|
@ -141,29 +141,29 @@ class TransportFactory final : public fair::mq::TransportFactory
|
|||
return std::make_unique<Poller>(channelsMap, channelList);
|
||||
}
|
||||
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||
{
|
||||
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags);
|
||||
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags, cfg);
|
||||
}
|
||||
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0) override
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||
{
|
||||
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags);
|
||||
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags, cfg);
|
||||
}
|
||||
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||
{
|
||||
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags);
|
||||
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags, cfg);
|
||||
}
|
||||
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0) override
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||
{
|
||||
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags);
|
||||
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags, cfg);
|
||||
}
|
||||
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags)
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags, fair::mq::RegionConfig cfg)
|
||||
{
|
||||
return std::make_unique<UnmanagedRegion>(*fManager, size, userFlags, callback, bulkCallback, path, flags, this);
|
||||
return std::make_unique<UnmanagedRegion>(*fManager, size, userFlags, callback, bulkCallback, path, flags, this, cfg);
|
||||
}
|
||||
|
||||
void SubscribeToRegionEvents(RegionEventCallback callback) override { fManager->SubscribeToRegionEvents(callback); }
|
||||
|
|
|
@ -37,15 +37,16 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
|
|||
const int64_t userFlags,
|
||||
RegionCallback callback,
|
||||
RegionBulkCallback bulkCallback,
|
||||
const std::string& path = "",
|
||||
int flags = 0,
|
||||
FairMQTransportFactory* factory = nullptr)
|
||||
const std::string& path,
|
||||
int flags,
|
||||
FairMQTransportFactory* factory,
|
||||
fair::mq::RegionConfig cfg)
|
||||
: FairMQUnmanagedRegion(factory)
|
||||
, fManager(manager)
|
||||
, fRegion(nullptr)
|
||||
, fRegionId(0)
|
||||
{
|
||||
auto result = fManager.CreateRegion(size, userFlags, callback, bulkCallback, path, flags);
|
||||
auto result = fManager.CreateRegion(size, userFlags, callback, bulkCallback, path, flags, cfg);
|
||||
fRegion = result.first;
|
||||
fRegionId = result.second;
|
||||
}
|
||||
|
|
|
@ -96,29 +96,29 @@ class TransportFactory final : public FairMQTransportFactory
|
|||
return std::make_unique<Poller>(channelsMap, channelList);
|
||||
}
|
||||
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path = "", int flags = 0) override
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||
{
|
||||
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags);
|
||||
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags, cfg);
|
||||
}
|
||||
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||
{
|
||||
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags);
|
||||
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags, cfg);
|
||||
}
|
||||
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0) override
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||
{
|
||||
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags);
|
||||
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags, cfg);
|
||||
}
|
||||
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
|
||||
{
|
||||
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags);
|
||||
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags, cfg);
|
||||
}
|
||||
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string&, int)
|
||||
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string&, int /* flags */, fair::mq::RegionConfig cfg)
|
||||
{
|
||||
UnmanagedRegionPtr ptr = std::make_unique<UnmanagedRegion>(*fCtx, size, userFlags, callback, bulkCallback, this);
|
||||
UnmanagedRegionPtr ptr = std::make_unique<UnmanagedRegion>(*fCtx, size, userFlags, callback, bulkCallback, this, cfg);
|
||||
auto zPtr = static_cast<UnmanagedRegion*>(ptr.get());
|
||||
fCtx->AddRegion(false, zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), RegionEvent::created);
|
||||
return ptr;
|
||||
|
|
|
@ -16,6 +16,8 @@
|
|||
#include <cstddef> // size_t
|
||||
#include <string>
|
||||
|
||||
#include <sys/mman.h> // mlock
|
||||
|
||||
namespace fair::mq::zmq
|
||||
{
|
||||
|
||||
|
@ -30,7 +32,8 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
|
|||
int64_t userFlags,
|
||||
RegionCallback callback,
|
||||
RegionBulkCallback bulkCallback,
|
||||
FairMQTransportFactory* factory = nullptr)
|
||||
FairMQTransportFactory* factory,
|
||||
fair::mq::RegionConfig cfg)
|
||||
: fair::mq::UnmanagedRegion(factory)
|
||||
, fCtx(ctx)
|
||||
, fId(fCtx.RegionCount())
|
||||
|
@ -39,7 +42,20 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
|
|||
, fUserFlags(userFlags)
|
||||
, fCallback(callback)
|
||||
, fBulkCallback(bulkCallback)
|
||||
{}
|
||||
{
|
||||
if (cfg.lock) {
|
||||
LOG(debug) << "Locking region " << fId << "...";
|
||||
if (mlock(fBuffer, fSize) == -1) {
|
||||
LOG(error) << "Could not lock region " << fId << ". Code: " << errno << ", reason: " << strerror(errno);
|
||||
}
|
||||
LOG(debug) << "Successfully locked region " << fId << ".";
|
||||
}
|
||||
if (cfg.zero) {
|
||||
LOG(debug) << "Zeroing free memory of region " << fId << "...";
|
||||
memset(fBuffer, 0x00, fSize);
|
||||
LOG(debug) << "Successfully zeroed free memory of region " << fId << ".";
|
||||
}
|
||||
}
|
||||
|
||||
UnmanagedRegion(const UnmanagedRegion&) = delete;
|
||||
UnmanagedRegion operator=(const UnmanagedRegion&) = delete;
|
||||
|
|
Loading…
Reference in New Issue
Block a user