diff --git a/fairmq/plugins/config/Config.cxx b/fairmq/plugins/config/Config.cxx index 99fa3429..5fa15b93 100644 --- a/fairmq/plugins/config/Config.cxx +++ b/fairmq/plugins/config/Config.cxx @@ -69,6 +69,8 @@ Plugin::ProgOptions ConfigPluginProgramOptions() ("max-run-time", po::value()->default_value(0), "Maximum runtime for the Running state handler, after which state will change to Ready (in seconds, 0 for no limit).") ("print-channels", po::value()->implicit_value(true), "Print registered channel endpoints in a machine-readable format (::)") ("shm-segment-size", po::value()->default_value(2ULL << 30), "Shared memory: size of the shared memory segment (in bytes).") + ("shm-mlock-segment", po::value()->default_value(false), "Shared memory: mlock the shared memory segment after initialization.") + ("shm-zero-segment", po::value()->default_value(false), "Shared memory: zero the shared memory segment memory after initialization.") ("shm-throw-bad-alloc", po::value()->default_value(true), "Throw a fair::mq::MessageBadAlloc if cannot allocate a message (retry if false).") ("shm-monitor", po::value()->default_value(true), "Shared memory: run monitor daemon.") ("ofi-size-hint", po::value()->default_value(0), "EXPERIMENTAL: OFI size hint for the allocator.") diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index 6978ed30..ebf0d7fd 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -21,6 +21,7 @@ #include #include +#include #include #include @@ -43,6 +44,8 @@ #include // pair #include +#include // mlock + namespace fair { namespace mq @@ -55,8 +58,8 @@ struct SharedMemoryError : std::runtime_error { using std::runtime_error::runtim class Manager { public: - Manager(std::string id, std::string deviceId, size_t size, bool throwOnBadAlloc) - : fShmId(std::move(id)) + Manager(std::string shmId, std::string deviceId, size_t size, const ProgOptions* config) + : fShmId(std::move(shmId)) , fDeviceId(std::move(deviceId)) , fSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_main").c_str(), size) , fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 655360) @@ -70,10 +73,37 @@ class Manager , fMsgCounter(0) , fHeartbeatThread() , fSendHeartbeats(true) - , fThrowOnBadAlloc(throwOnBadAlloc) + , fThrowOnBadAlloc(true) { using namespace boost::interprocess; + + bool mlockSegment = false; + bool zeroSegment = false; + bool autolaunchMonitor = false; + if (config) { + mlockSegment = config->GetProperty("shm-mlock-segment", mlockSegment); + zeroSegment = config->GetProperty("shm-zero-segment", zeroSegment); + autolaunchMonitor = config->GetProperty("shm-monitor", autolaunchMonitor); + fThrowOnBadAlloc = config->GetProperty("shm-throw-bad-alloc", fThrowOnBadAlloc); + } else { + LOG(debug) << "ProgOptions not available! Using defaults."; + } + + if (autolaunchMonitor) { + StartMonitor(fShmId); + } + LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << fSegment.get_size() << " bytes. Available are " << fSegment.get_free_memory() << " bytes."; + if (mlockSegment) { + LOG(debug) << "Locking the memory pages behind the managed segment..."; + mlock(fSegment.get_address(), fSegment.get_size()); + LOG(debug) << "Successfully locked the memory pages."; + } + if (zeroSegment) { + LOG(debug) << "Zeroing the free memory of the managed segment..."; + fSegment.zero_free_memory(); + LOG(debug) << "Successfully zeroed the free memory of the managed segment"; + } fRegionInfos = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); // store info about the managed segment as region with id 0 diff --git a/fairmq/shmem/TransportFactory.h b/fairmq/shmem/TransportFactory.h index 3808e6a0..ed69a373 100644 --- a/fairmq/shmem/TransportFactory.h +++ b/fairmq/shmem/TransportFactory.h @@ -58,14 +58,10 @@ class TransportFactory final : public fair::mq::TransportFactory int numIoThreads = 1; std::string sessionName = "default"; size_t segmentSize = 2ULL << 30; - bool autolaunchMonitor = false; - bool throwOnBadAlloc = true; if (config) { numIoThreads = config->GetProperty("io-threads", numIoThreads); sessionName = config->GetProperty("session", sessionName); segmentSize = config->GetProperty("shm-segment-size", segmentSize); - autolaunchMonitor = config->GetProperty("shm-monitor", autolaunchMonitor); - throwOnBadAlloc = config->GetProperty("shm-throw-bad-alloc", throwOnBadAlloc); } else { LOG(debug) << "ProgOptions not available! Using defaults."; } @@ -83,11 +79,7 @@ class TransportFactory final : public fair::mq::TransportFactory LOG(error) << "failed configuring context, reason: " << zmq_strerror(errno); } - if (autolaunchMonitor) { - Manager::StartMonitor(fShmId); - } - - fManager = tools::make_unique(fShmId, fDeviceId, segmentSize, throwOnBadAlloc); + fManager = tools::make_unique(fShmId, fDeviceId, segmentSize, config); } catch (boost::interprocess::interprocess_exception& e) { LOG(error) << "Could not initialize shared memory transport: " << e.what(); throw std::runtime_error(tools::ToString("Could not initialize shared memory transport: ", e.what()));