Shm: add options to zero and/or mlock the segment

This commit is contained in:
Alexey Rybalchenko 2020-07-13 15:18:45 +02:00 committed by Dennis Klein
parent bf909f94dc
commit beb7766fca
3 changed files with 36 additions and 12 deletions

View File

@ -69,6 +69,8 @@ Plugin::ProgOptions ConfigPluginProgramOptions()
("max-run-time", po::value<uint64_t >()->default_value(0), "Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit).") ("max-run-time", po::value<uint64_t >()->default_value(0), "Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit).")
("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)") ("print-channels", po::value<bool >()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (<channel name>:<min num subchannels>:<max num subchannels>)")
("shm-segment-size", po::value<size_t >()->default_value(2ULL << 30), "Shared memory: size of the shared memory segment (in bytes).") ("shm-segment-size", po::value<size_t >()->default_value(2ULL << 30), "Shared memory: size of the shared memory segment (in bytes).")
("shm-mlock-segment", po::value<bool >()->default_value(false), "Shared memory: mlock the shared memory segment after initialization.")
("shm-zero-segment", po::value<bool >()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization.")
("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Throw a fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).") ("shm-throw-bad-alloc", po::value<bool >()->default_value(true), "Throw a fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).")
("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.") ("shm-monitor", po::value<bool >()->default_value(true), "Shared memory: run monitor daemon.")
("ofi-size-hint", po::value<size_t >()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.") ("ofi-size-hint", po::value<size_t >()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.")

View File

@ -21,6 +21,7 @@
#include <FairMQLogger.h> #include <FairMQLogger.h>
#include <FairMQMessage.h> #include <FairMQMessage.h>
#include <fairmq/ProgOptions.h>
#include <fairmq/tools/CppSTL.h> #include <fairmq/tools/CppSTL.h>
#include <fairmq/tools/Strings.h> #include <fairmq/tools/Strings.h>
@ -43,6 +44,8 @@
#include <utility> // pair #include <utility> // pair
#include <vector> #include <vector>
#include <sys/mman.h> // mlock
namespace fair namespace fair
{ {
namespace mq namespace mq
@ -55,8 +58,8 @@ struct SharedMemoryError : std::runtime_error { using std::runtime_error::runtim
class Manager class Manager
{ {
public: public:
Manager(std::string id, std::string deviceId, size_t size, bool throwOnBadAlloc) Manager(std::string shmId, std::string deviceId, size_t size, const ProgOptions* config)
: fShmId(std::move(id)) : fShmId(std::move(shmId))
, fDeviceId(std::move(deviceId)) , fDeviceId(std::move(deviceId))
, fSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_main").c_str(), size) , fSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_main").c_str(), size)
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 655360) , fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 655360)
@ -70,10 +73,37 @@ class Manager
, fMsgCounter(0) , fMsgCounter(0)
, fHeartbeatThread() , fHeartbeatThread()
, fSendHeartbeats(true) , fSendHeartbeats(true)
, fThrowOnBadAlloc(throwOnBadAlloc) , fThrowOnBadAlloc(true)
{ {
using namespace boost::interprocess; using namespace boost::interprocess;
bool mlockSegment = false;
bool zeroSegment = false;
bool autolaunchMonitor = false;
if (config) {
mlockSegment = config->GetProperty<bool>("shm-mlock-segment", mlockSegment);
zeroSegment = config->GetProperty<bool>("shm-zero-segment", zeroSegment);
autolaunchMonitor = config->GetProperty<bool>("shm-monitor", autolaunchMonitor);
fThrowOnBadAlloc = config->GetProperty<bool>("shm-throw-bad-alloc", fThrowOnBadAlloc);
} else {
LOG(debug) << "ProgOptions not available! Using defaults.";
}
if (autolaunchMonitor) {
StartMonitor(fShmId);
}
LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << fSegment.get_size() << " bytes. Available are " << fSegment.get_free_memory() << " bytes."; LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << fSegment.get_size() << " bytes. Available are " << fSegment.get_free_memory() << " bytes.";
if (mlockSegment) {
LOG(debug) << "Locking the memory pages behind the managed segment...";
mlock(fSegment.get_address(), fSegment.get_size());
LOG(debug) << "Successfully locked the memory pages.";
}
if (zeroSegment) {
LOG(debug) << "Zeroing the free memory of the managed segment...";
fSegment.zero_free_memory();
LOG(debug) << "Successfully zeroed the free memory of the managed segment";
}
fRegionInfos = fManagementSegment.find_or_construct<Uint64RegionInfoMap>(unique_instance)(fShmVoidAlloc); fRegionInfos = fManagementSegment.find_or_construct<Uint64RegionInfoMap>(unique_instance)(fShmVoidAlloc);
// store info about the managed segment as region with id 0 // store info about the managed segment as region with id 0

View File

@ -58,14 +58,10 @@ class TransportFactory final : public fair::mq::TransportFactory
int numIoThreads = 1; int numIoThreads = 1;
std::string sessionName = "default"; std::string sessionName = "default";
size_t segmentSize = 2ULL << 30; size_t segmentSize = 2ULL << 30;
bool autolaunchMonitor = false;
bool throwOnBadAlloc = true;
if (config) { if (config) {
numIoThreads = config->GetProperty<int>("io-threads", numIoThreads); numIoThreads = config->GetProperty<int>("io-threads", numIoThreads);
sessionName = config->GetProperty<std::string>("session", sessionName); sessionName = config->GetProperty<std::string>("session", sessionName);
segmentSize = config->GetProperty<size_t>("shm-segment-size", segmentSize); segmentSize = config->GetProperty<size_t>("shm-segment-size", segmentSize);
autolaunchMonitor = config->GetProperty<bool>("shm-monitor", autolaunchMonitor);
throwOnBadAlloc = config->GetProperty<bool>("shm-throw-bad-alloc", throwOnBadAlloc);
} else { } else {
LOG(debug) << "ProgOptions not available! Using defaults."; LOG(debug) << "ProgOptions not available! Using defaults.";
} }
@ -83,11 +79,7 @@ class TransportFactory final : public fair::mq::TransportFactory
LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno); LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno);
} }
if (autolaunchMonitor) { fManager = tools::make_unique<Manager>(fShmId, fDeviceId, segmentSize, config);
Manager::StartMonitor(fShmId);
}
fManager = tools::make_unique<Manager>(fShmId, fDeviceId, segmentSize, throwOnBadAlloc);
} catch (boost::interprocess::interprocess_exception& e) { } catch (boost::interprocess::interprocess_exception& e) {
LOG(error) << "Could not initialize shared memory transport: " << e.what(); LOG(error) << "Could not initialize shared memory transport: " << e.what();
throw std::runtime_error(tools::ToString("Could not initialize shared memory transport: ", e.what())); throw std::runtime_error(tools::ToString("Could not initialize shared memory transport: ", e.what()));