add mlock/zero options to unmanaged region

This commit is contained in:
Alexey Rybalchenko 2021-05-07 14:11:58 +02:00
parent c85d6e079c
commit 857aa84fa3
9 changed files with 87 additions and 40 deletions

View File

@ -39,9 +39,9 @@ void Sampler::InitTask()
<< ", flags: " << info.flags; << ", flags: " << info.flags;
}); });
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel...
0, 0, // ... and this sub-channel
10000000, 10000000, // region size
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport [this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
lock_guard<mutex> lock(fMtx); lock_guard<mutex> lock(fMtx);
fNumUnackedMsgs -= blocks.size(); fNumUnackedMsgs -= blocks.size();
@ -49,7 +49,10 @@ void Sampler::InitTask()
if (fMaxIterations > 0) { if (fMaxIterations > 0) {
LOG(info) << "Received " << blocks.size() << " acks"; LOG(info) << "Received " << blocks.size() << " acks";
} }
} },
"", // path, if a region is backed by a file
0, // flags that are passed for region creation
fair::mq::RegionConfig{true, true} // additional config: { call mlock on the region, zero the region memory }
)); ));
fRegion->SetLinger(fLinger); fRegion->SetLinger(fLinger);
} }

View File

@ -92,8 +92,8 @@ class FairMQTransportFactory
/// @param path optional parameter to pass to the underlying transport /// @param path optional parameter to pass to the underlying transport
/// @param flags optional parameter to pass to the underlying transport /// @param flags optional parameter to pass to the underlying transport
/// @return pointer to UnmanagedRegion /// @return pointer to UnmanagedRegion
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0; virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0; virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
/// @brief Create new UnmanagedRegion /// @brief Create new UnmanagedRegion
/// @param size size of the region /// @param size size of the region
/// @param userFlags flags to be stored with the region, have no effect on the transport, but can be retrieved from the region by the user /// @param userFlags flags to be stored with the region, have no effect on the transport, but can be retrieved from the region by the user
@ -101,8 +101,8 @@ class FairMQTransportFactory
/// @param path optional parameter to pass to the underlying transport /// @param path optional parameter to pass to the underlying transport
/// @param flags optional parameter to pass to the underlying transport /// @param flags optional parameter to pass to the underlying transport
/// @return pointer to UnmanagedRegion /// @return pointer to UnmanagedRegion
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0; virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0; virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0;
/// @brief Subscribe to region events (creation, destruction, ...) /// @brief Subscribe to region events (creation, destruction, ...)
/// @param callback the callback that is called when a region event occurs /// @param callback the callback that is called when a region event occurs

View File

@ -108,6 +108,19 @@ inline std::ostream& operator<<(std::ostream& os, const FairMQRegionEvent& event
namespace fair::mq namespace fair::mq
{ {
struct RegionConfig {
bool lock;
bool zero;
RegionConfig()
: lock(false), zero(false)
{}
RegionConfig(bool l, bool z)
: lock(l), zero(z)
{}
};
using RegionCallback = FairMQRegionCallback; using RegionCallback = FairMQRegionCallback;
using RegionBulkCallback = FairMQRegionBulkCallback; using RegionBulkCallback = FairMQRegionBulkCallback;
using RegionEventCallback = FairMQRegionEventCallback; using RegionEventCallback = FairMQRegionEventCallback;

View File

@ -92,22 +92,22 @@ auto TransportFactory::CreatePoller(const unordered_map<string, vector<FairMQCha
// return PollerPtr{new Poller(channelsMap, channelList)}; // return PollerPtr{new Poller(channelsMap, channelList)};
} }
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
{ {
throw runtime_error{"Not yet implemented UMR."}; throw runtime_error{"Not yet implemented UMR."};
} }
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
{ {
throw runtime_error{"Not yet implemented UMR."}; throw runtime_error{"Not yet implemented UMR."};
} }
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
{ {
throw runtime_error{"Not yet implemented UMR."}; throw runtime_error{"Not yet implemented UMR."};
} }
auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr
{ {
throw runtime_error{"Not yet implemented UMR."}; throw runtime_error{"Not yet implemented UMR."};
} }

View File

@ -277,8 +277,9 @@ class Manager
const int64_t userFlags, const int64_t userFlags,
RegionCallback callback, RegionCallback callback,
RegionBulkCallback bulkCallback, RegionBulkCallback bulkCallback,
const std::string& path = "", const std::string& path,
int flags = 0) int flags,
fair::mq::RegionConfig cfg)
{ {
using namespace boost::interprocess; using namespace boost::interprocess;
try { try {
@ -311,6 +312,19 @@ class Manager
auto r = fRegions.emplace(id, std::make_unique<Region>(fShmId, id, size, false, callback, bulkCallback, path, flags)); auto r = fRegions.emplace(id, std::make_unique<Region>(fShmId, id, size, false, callback, bulkCallback, path, flags));
// LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'"; // LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
if (cfg.lock) {
LOG(debug) << "Locking region " << id << "...";
if (mlock(r.first->second->fRegion.get_address(), r.first->second->fRegion.get_size()) == -1) {
LOG(error) << "Could not lock region " << id << ". Code: " << errno << ", reason: " << strerror(errno);
}
LOG(debug) << "Successfully locked region " << id << ".";
}
if (cfg.zero) {
LOG(debug) << "Zeroing free memory of region " << id << "...";
memset(r.first->second->fRegion.get_address(), 0x00, r.first->second->fRegion.get_size());
LOG(debug) << "Successfully zeroed free memory of region " << id << ".";
}
fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc)); fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc));
r.first->second->InitializeQueues(); r.first->second->InitializeQueues();

View File

@ -141,29 +141,29 @@ class TransportFactory final : public fair::mq::TransportFactory
return std::make_unique<Poller>(channelsMap, channelList); return std::make_unique<Poller>(channelsMap, channelList);
} }
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{ {
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags); return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags, cfg);
} }
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0) override UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{ {
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags); return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags, cfg);
} }
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{ {
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags); return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags, cfg);
} }
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0) override UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{ {
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags); return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags, cfg);
} }
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags) UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags, fair::mq::RegionConfig cfg)
{ {
return std::make_unique<UnmanagedRegion>(*fManager, size, userFlags, callback, bulkCallback, path, flags, this); return std::make_unique<UnmanagedRegion>(*fManager, size, userFlags, callback, bulkCallback, path, flags, this, cfg);
} }
void SubscribeToRegionEvents(RegionEventCallback callback) override { fManager->SubscribeToRegionEvents(callback); } void SubscribeToRegionEvents(RegionEventCallback callback) override { fManager->SubscribeToRegionEvents(callback); }

View File

@ -37,15 +37,16 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
const int64_t userFlags, const int64_t userFlags,
RegionCallback callback, RegionCallback callback,
RegionBulkCallback bulkCallback, RegionBulkCallback bulkCallback,
const std::string& path = "", const std::string& path,
int flags = 0, int flags,
FairMQTransportFactory* factory = nullptr) FairMQTransportFactory* factory,
fair::mq::RegionConfig cfg)
: FairMQUnmanagedRegion(factory) : FairMQUnmanagedRegion(factory)
, fManager(manager) , fManager(manager)
, fRegion(nullptr) , fRegion(nullptr)
, fRegionId(0) , fRegionId(0)
{ {
auto result = fManager.CreateRegion(size, userFlags, callback, bulkCallback, path, flags); auto result = fManager.CreateRegion(size, userFlags, callback, bulkCallback, path, flags, cfg);
fRegion = result.first; fRegion = result.first;
fRegionId = result.second; fRegionId = result.second;
} }

View File

@ -96,29 +96,29 @@ class TransportFactory final : public FairMQTransportFactory
return std::make_unique<Poller>(channelsMap, channelList); return std::make_unique<Poller>(channelsMap, channelList);
} }
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path = "", int flags = 0) override UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{ {
return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags); return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags, cfg);
} }
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{ {
return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags); return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags, cfg);
} }
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0) override UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{ {
return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags); return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags, cfg);
} }
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override
{ {
return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags); return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags, cfg);
} }
UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string&, int) UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string&, int /* flags */, fair::mq::RegionConfig cfg)
{ {
UnmanagedRegionPtr ptr = std::make_unique<UnmanagedRegion>(*fCtx, size, userFlags, callback, bulkCallback, this); UnmanagedRegionPtr ptr = std::make_unique<UnmanagedRegion>(*fCtx, size, userFlags, callback, bulkCallback, this, cfg);
auto zPtr = static_cast<UnmanagedRegion*>(ptr.get()); auto zPtr = static_cast<UnmanagedRegion*>(ptr.get());
fCtx->AddRegion(false, zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), RegionEvent::created); fCtx->AddRegion(false, zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), RegionEvent::created);
return ptr; return ptr;

View File

@ -16,6 +16,8 @@
#include <cstddef> // size_t #include <cstddef> // size_t
#include <string> #include <string>
#include <sys/mman.h> // mlock
namespace fair::mq::zmq namespace fair::mq::zmq
{ {
@ -30,7 +32,8 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
int64_t userFlags, int64_t userFlags,
RegionCallback callback, RegionCallback callback,
RegionBulkCallback bulkCallback, RegionBulkCallback bulkCallback,
FairMQTransportFactory* factory = nullptr) FairMQTransportFactory* factory,
fair::mq::RegionConfig cfg)
: fair::mq::UnmanagedRegion(factory) : fair::mq::UnmanagedRegion(factory)
, fCtx(ctx) , fCtx(ctx)
, fId(fCtx.RegionCount()) , fId(fCtx.RegionCount())
@ -39,7 +42,20 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
, fUserFlags(userFlags) , fUserFlags(userFlags)
, fCallback(callback) , fCallback(callback)
, fBulkCallback(bulkCallback) , fBulkCallback(bulkCallback)
{} {
if (cfg.lock) {
LOG(debug) << "Locking region " << fId << "...";
if (mlock(fBuffer, fSize) == -1) {
LOG(error) << "Could not lock region " << fId << ". Code: " << errno << ", reason: " << strerror(errno);
}
LOG(debug) << "Successfully locked region " << fId << ".";
}
if (cfg.zero) {
LOG(debug) << "Zeroing free memory of region " << fId << "...";
memset(fBuffer, 0x00, fSize);
LOG(debug) << "Successfully zeroed free memory of region " << fId << ".";
}
}
UnmanagedRegion(const UnmanagedRegion&) = delete; UnmanagedRegion(const UnmanagedRegion&) = delete;
UnmanagedRegion operator=(const UnmanagedRegion&) = delete; UnmanagedRegion operator=(const UnmanagedRegion&) = delete;