From 6e567ba4969649d505a3d15cc376495322cc0b13 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 7 May 2021 14:11:58 +0200 Subject: [PATCH] add mlock/zero options to unmanaged region --- examples/region/Sampler.cxx | 11 +++++++---- fairmq/FairMQTransportFactory.h | 8 ++++---- fairmq/FairMQUnmanagedRegion.h | 13 +++++++++++++ fairmq/ofi/TransportFactory.cxx | 8 ++++---- fairmq/shmem/Manager.h | 18 ++++++++++++++++-- fairmq/shmem/TransportFactory.h | 20 ++++++++++---------- fairmq/shmem/UnmanagedRegion.h | 9 +++++---- fairmq/zeromq/TransportFactory.h | 20 ++++++++++---------- fairmq/zeromq/UnmanagedRegion.h | 20 ++++++++++++++++++-- 9 files changed, 87 insertions(+), 40 deletions(-) diff --git a/examples/region/Sampler.cxx b/examples/region/Sampler.cxx index 47352d8a..70b81add 100644 --- a/examples/region/Sampler.cxx +++ b/examples/region/Sampler.cxx @@ -39,9 +39,9 @@ void Sampler::InitTask() << ", flags: " << info.flags; }); - fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", - 0, - 10000000, + fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel... + 0, // ... and this sub-channel + 10000000, // region size [this](const std::vector& blocks) { // callback to be called when message buffers no longer needed by transport lock_guard lock(fMtx); fNumUnackedMsgs -= blocks.size(); @@ -49,7 +49,10 @@ void Sampler::InitTask() if (fMaxIterations > 0) { LOG(info) << "Received " << blocks.size() << " acks"; } - } + }, + "", // path, if a region is backed by a file + 0, // flags that are passed for region creation + fair::mq::RegionConfig{true, true} // additional config: { call mlock on the region, zero the region memory } )); fRegion->SetLinger(fLinger); } diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index a53484df..86cacf68 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -92,8 +92,8 @@ class FairMQTransportFactory /// @param path optional parameter to pass to the underlying transport /// @param flags optional parameter to pass to the underlying transport /// @return pointer to UnmanagedRegion - virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0; - virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0; + virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0; + virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0; /// @brief Create new UnmanagedRegion /// @param size size of the region /// @param userFlags flags to be stored with the region, have no effect on the transport, but can be retrieved from the region by the user @@ -101,8 +101,8 @@ class FairMQTransportFactory /// @param path optional parameter to pass to the underlying transport /// @param flags optional parameter to pass to the underlying transport /// @return pointer to UnmanagedRegion - virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0; - virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0) = 0; + virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0; + virtual FairMQUnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, FairMQRegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) = 0; /// @brief Subscribe to region events (creation, destruction, ...) /// @param callback the callback that is called when a region event occurs diff --git a/fairmq/FairMQUnmanagedRegion.h b/fairmq/FairMQUnmanagedRegion.h index c1d37e01..2d1f6399 100644 --- a/fairmq/FairMQUnmanagedRegion.h +++ b/fairmq/FairMQUnmanagedRegion.h @@ -108,6 +108,19 @@ inline std::ostream& operator<<(std::ostream& os, const FairMQRegionEvent& event namespace fair::mq { +struct RegionConfig { + bool lock; + bool zero; + + RegionConfig() + : lock(false), zero(false) + {} + + RegionConfig(bool l, bool z) + : lock(l), zero(z) + {} +}; + using RegionCallback = FairMQRegionCallback; using RegionBulkCallback = FairMQRegionBulkCallback; using RegionEventCallback = FairMQRegionEventCallback; diff --git a/fairmq/ofi/TransportFactory.cxx b/fairmq/ofi/TransportFactory.cxx index 297f4a4a..a786de48 100644 --- a/fairmq/ofi/TransportFactory.cxx +++ b/fairmq/ofi/TransportFactory.cxx @@ -92,22 +92,22 @@ auto TransportFactory::CreatePoller(const unordered_map UnmanagedRegionPtr +auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr { throw runtime_error{"Not yet implemented UMR."}; } -auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr +auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr { throw runtime_error{"Not yet implemented UMR."}; } -auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr +auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr { throw runtime_error{"Not yet implemented UMR."}; } -auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */) -> UnmanagedRegionPtr +auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr { throw runtime_error{"Not yet implemented UMR."}; } diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index 82d8a08d..01249c19 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -277,8 +277,9 @@ class Manager const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, - const std::string& path = "", - int flags = 0) + const std::string& path, + int flags, + fair::mq::RegionConfig cfg) { using namespace boost::interprocess; try { @@ -311,6 +312,19 @@ class Manager auto r = fRegions.emplace(id, std::make_unique(fShmId, id, size, false, callback, bulkCallback, path, flags)); // LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'"; + if (cfg.lock) { + LOG(debug) << "Locking region " << id << "..."; + if (mlock(r.first->second->fRegion.get_address(), r.first->second->fRegion.get_size()) == -1) { + LOG(error) << "Could not lock region " << id << ". Code: " << errno << ", reason: " << strerror(errno); + } + LOG(debug) << "Successfully locked region " << id << "."; + } + if (cfg.zero) { + LOG(debug) << "Zeroing free memory of region " << id << "..."; + memset(r.first->second->fRegion.get_address(), 0x00, r.first->second->fRegion.get_size()); + LOG(debug) << "Successfully zeroed free memory of region " << id << "."; + } + fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc)); r.first->second->InitializeQueues(); diff --git a/fairmq/shmem/TransportFactory.h b/fairmq/shmem/TransportFactory.h index 0081f2d5..15797f2c 100644 --- a/fairmq/shmem/TransportFactory.h +++ b/fairmq/shmem/TransportFactory.h @@ -141,29 +141,29 @@ class TransportFactory final : public fair::mq::TransportFactory return std::make_unique(channelsMap, channelList); } - UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override + UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override { - return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags); + return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags, cfg); } - UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0) override + UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override { - return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags); + return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags, cfg); } - UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0) override + UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override { - return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags); + return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags, cfg); } - UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0) override + UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback bulkCallback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override { - return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags); + return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags, cfg); } - UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags) + UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags, fair::mq::RegionConfig cfg) { - return std::make_unique(*fManager, size, userFlags, callback, bulkCallback, path, flags, this); + return std::make_unique(*fManager, size, userFlags, callback, bulkCallback, path, flags, this, cfg); } void SubscribeToRegionEvents(RegionEventCallback callback) override { fManager->SubscribeToRegionEvents(callback); } diff --git a/fairmq/shmem/UnmanagedRegion.h b/fairmq/shmem/UnmanagedRegion.h index 480e1ae5..f01fc379 100644 --- a/fairmq/shmem/UnmanagedRegion.h +++ b/fairmq/shmem/UnmanagedRegion.h @@ -37,15 +37,16 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, - const std::string& path = "", - int flags = 0, - FairMQTransportFactory* factory = nullptr) + const std::string& path, + int flags, + FairMQTransportFactory* factory, + fair::mq::RegionConfig cfg) : FairMQUnmanagedRegion(factory) , fManager(manager) , fRegion(nullptr) , fRegionId(0) { - auto result = fManager.CreateRegion(size, userFlags, callback, bulkCallback, path, flags); + auto result = fManager.CreateRegion(size, userFlags, callback, bulkCallback, path, flags, cfg); fRegion = result.first; fRegionId = result.second; } diff --git a/fairmq/zeromq/TransportFactory.h b/fairmq/zeromq/TransportFactory.h index de3eb3a5..c54e400b 100644 --- a/fairmq/zeromq/TransportFactory.h +++ b/fairmq/zeromq/TransportFactory.h @@ -96,29 +96,29 @@ class TransportFactory final : public FairMQTransportFactory return std::make_unique(channelsMap, channelList); } - UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path = "", int flags = 0) override + UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionCallback callback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override { - return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags); + return CreateUnmanagedRegion(size, 0, callback, nullptr, path, flags, cfg); } - UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override + UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override { - return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags); + return CreateUnmanagedRegion(size, 0, nullptr, bulkCallback, path, flags, cfg); } - UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0) override + UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override { - return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags); + return CreateUnmanagedRegion(size, userFlags, callback, nullptr, path, flags, cfg); } - UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0) override + UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionBulkCallback bulkCallback, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) override { - return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags); + return CreateUnmanagedRegion(size, userFlags, nullptr, bulkCallback, path, flags, cfg); } - UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string&, int) + UnmanagedRegionPtr CreateUnmanagedRegion(const size_t size, const int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string&, int /* flags */, fair::mq::RegionConfig cfg) { - UnmanagedRegionPtr ptr = std::make_unique(*fCtx, size, userFlags, callback, bulkCallback, this); + UnmanagedRegionPtr ptr = std::make_unique(*fCtx, size, userFlags, callback, bulkCallback, this, cfg); auto zPtr = static_cast(ptr.get()); fCtx->AddRegion(false, zPtr->GetId(), zPtr->GetData(), zPtr->GetSize(), zPtr->GetUserFlags(), RegionEvent::created); return ptr; diff --git a/fairmq/zeromq/UnmanagedRegion.h b/fairmq/zeromq/UnmanagedRegion.h index b5a96165..aa06196b 100644 --- a/fairmq/zeromq/UnmanagedRegion.h +++ b/fairmq/zeromq/UnmanagedRegion.h @@ -16,6 +16,8 @@ #include // size_t #include +#include // mlock + namespace fair::mq::zmq { @@ -30,7 +32,8 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion int64_t userFlags, RegionCallback callback, RegionBulkCallback bulkCallback, - FairMQTransportFactory* factory = nullptr) + FairMQTransportFactory* factory, + fair::mq::RegionConfig cfg) : fair::mq::UnmanagedRegion(factory) , fCtx(ctx) , fId(fCtx.RegionCount()) @@ -39,7 +42,20 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion , fUserFlags(userFlags) , fCallback(callback) , fBulkCallback(bulkCallback) - {} + { + if (cfg.lock) { + LOG(debug) << "Locking region " << fId << "..."; + if (mlock(fBuffer, fSize) == -1) { + LOG(error) << "Could not lock region " << fId << ". Code: " << errno << ", reason: " << strerror(errno); + } + LOG(debug) << "Successfully locked region " << fId << "."; + } + if (cfg.zero) { + LOG(debug) << "Zeroing free memory of region " << fId << "..."; + memset(fBuffer, 0x00, fSize); + LOG(debug) << "Successfully zeroed free memory of region " << fId << "."; + } + } UnmanagedRegion(const UnmanagedRegion&) = delete; UnmanagedRegion operator=(const UnmanagedRegion&) = delete;