shmem: configurable segment name.

This commit is contained in:
Alexey Rybalchenko 2017-04-24 13:55:10 +02:00 committed by Mohammad Al-Turany
parent 14f9084a80
commit 7b4a2ae932
2 changed files with 48 additions and 43 deletions

View File

@ -297,53 +297,56 @@ void FairMQProgOptions::InitOptionDescription()
if (fUseConfigFile) if (fUseConfigFile)
{ {
fMQOptionsInCmd.add_options() fMQOptionsInCmd.add_options()
("id", po::value<string>(), "Device ID (required argument).") ("id", po::value<string>(), "Device ID (required argument).")
("io-threads", po::value<int >()->default_value(1), "Number of I/O threads.") ("io-threads", po::value<int >()->default_value(1), "Number of I/O threads.")
("transport", po::value<string>()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').") ("transport", po::value<string>()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').")
("config", po::value<string>()->default_value("static"), "Config source ('static'/<config library filename>).") ("config", po::value<string>()->default_value("static"), "Config source ('static'/<config library filename>).")
("control", po::value<string>()->default_value("interactive"), "States control ('interactive'/'static'/<control library filename>).") ("control", po::value<string>()->default_value("interactive"), "States control ('interactive'/'static'/<control library filename>).")
("network-interface", po::value<string>()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).") ("network-interface", po::value<string>()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).")
("config-key", po::value<string>(), "Use provided value instead of device id for fetching the configuration from the config file.") ("config-key", po::value<string>(), "Use provided value instead of device id for fetching the configuration from the config file.")
("catch-signals", po::value<int >()->default_value(1), "Enable signal handling (1/0).") ("catch-signals", po::value<int >()->default_value(1), "Enable signal handling (1/0).")
("initialization-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).") ("initialization-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).")
("port-range-min", po::value<int >()->default_value(22000), "Start of the port range for dynamic initialization.") ("port-range-min", po::value<int >()->default_value(22000), "Start of the port range for dynamic initialization.")
("port-range-max", po::value<int >()->default_value(32000), "End of the port range for dynamic initialization.") ("port-range-max", po::value<int >()->default_value(32000), "End of the port range for dynamic initialization.")
("log-to-file", po::value<string>()->default_value(""), "Log output to a file.") ("log-to-file", po::value<string>()->default_value(""), "Log output to a file.")
("shm-segment-size", po::value<size_t>()->default_value(2000000000), "shmem transport: size of the shared memory segment (in bytes).") ("shm-segment-size", po::value<size_t>()->default_value(2000000000), "shmem transport: size of the shared memory segment (in bytes).")
("shm-segment-name", po::value<string>()->default_value("fairmq_shmem_main"), "shmem transport: name of the shared memory segment.")
; ;
fMQOptionsInCfg.add_options() fMQOptionsInCfg.add_options()
("id", po::value<string>()->required(), "Device ID (required argument).") ("id", po::value<string>()->required(), "Device ID (required argument).")
("io-threads", po::value<int >()->default_value(1), "Number of I/O threads.") ("io-threads", po::value<int >()->default_value(1), "Number of I/O threads.")
("transport", po::value<string>()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').") ("transport", po::value<string>()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').")
("config", po::value<string>()->default_value("static"), "Config source ('static'/<config library filename>).") ("config", po::value<string>()->default_value("static"), "Config source ('static'/<config library filename>).")
("control", po::value<string>()->default_value("interactive"), "States control ('interactive'/'static'/<control library filename>).") ("control", po::value<string>()->default_value("interactive"), "States control ('interactive'/'static'/<control library filename>).")
("network-interface", po::value<string>()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).") ("network-interface", po::value<string>()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).")
("config-key", po::value<string>(), "Use provided value instead of device id for fetching the configuration from the config file.") ("config-key", po::value<string>(), "Use provided value instead of device id for fetching the configuration from the config file.")
("catch-signals", po::value<int >()->default_value(1), "Enable signal handling (1/0).") ("catch-signals", po::value<int >()->default_value(1), "Enable signal handling (1/0).")
("initialization-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).") ("initialization-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).")
("port-range-min", po::value<int >()->default_value(22000), "Start of the port range for dynamic initialization.") ("port-range-min", po::value<int >()->default_value(22000), "Start of the port range for dynamic initialization.")
("port-range-max", po::value<int >()->default_value(32000), "End of the port range for dynamic initialization.") ("port-range-max", po::value<int >()->default_value(32000), "End of the port range for dynamic initialization.")
("log-to-file", po::value<string>()->default_value(""), "Log output to a file.") ("log-to-file", po::value<string>()->default_value(""), "Log output to a file.")
("shm-segment-size", po::value<size_t>()->default_value(2000000000), "shmem transport: size of the shared memory segment (in bytes).") ("shm-segment-size", po::value<size_t>()->default_value(2000000000), "shmem transport: size of the shared memory segment (in bytes).")
("shm-segment-name", po::value<string>()->default_value("fairmq_shmem_main"), "shmem transport: name of the shared memory segment.")
; ;
} }
else else
{ {
fMQOptionsInCmd.add_options() fMQOptionsInCmd.add_options()
("id", po::value<string>()->required(), "Device ID (required argument)") ("id", po::value<string>()->required(), "Device ID (required argument)")
("io-threads", po::value<int >()->default_value(1), "Number of I/O threads") ("io-threads", po::value<int >()->default_value(1), "Number of I/O threads")
("transport", po::value<string>()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').") ("transport", po::value<string>()->default_value("zeromq"), "Transport ('zeromq'/'nanomsg').")
("config", po::value<string>()->default_value("static"), "Config source ('static'/<config library filename>).") ("config", po::value<string>()->default_value("static"), "Config source ('static'/<config library filename>).")
("control", po::value<string>()->default_value("interactive"), "States control ('interactive'/'static'/<control library filename>).") ("control", po::value<string>()->default_value("interactive"), "States control ('interactive'/'static'/<control library filename>).")
("network-interface", po::value<string>()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).") ("network-interface", po::value<string>()->default_value("default"), "Network interface to bind on (e.g. eth0, ib0..., default will try to detect the interface of the default route).")
("config-key", po::value<string>(), "Use provided value instead of device id for fetching the configuration from the config file.") ("config-key", po::value<string>(), "Use provided value instead of device id for fetching the configuration from the config file.")
("catch-signals", po::value<int >()->default_value(1), "Enable signal handling (1/0).") ("catch-signals", po::value<int >()->default_value(1), "Enable signal handling (1/0).")
("initialization-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).") ("initialization-timeout", po::value<int >()->default_value(120), "Timeout for the initialization in seconds (when expecting dynamic initialization).")
("port-range-min", po::value<int >()->default_value(22000), "Start of the port range for dynamic initialization.") ("port-range-min", po::value<int >()->default_value(22000), "Start of the port range for dynamic initialization.")
("port-range-max", po::value<int >()->default_value(32000), "End of the port range for dynamic initialization.") ("port-range-max", po::value<int >()->default_value(32000), "End of the port range for dynamic initialization.")
("log-to-file", po::value<string>()->default_value(""), "Log output to a file.") ("log-to-file", po::value<string>()->default_value(""), "Log output to a file.")
("shm-segment-size", po::value<size_t>()->default_value(2000000000), "shmem transport: size of the shared memory segment (in bytes).") ("shm-segment-size", po::value<size_t>()->default_value(2000000000), "shmem transport: size of the shared memory segment (in bytes).")
("shm-segment-name", po::value<string>()->default_value("fairmq_shmem_main"), "shmem transport: name of the shared memory segment.")
; ;
} }

View File

@ -53,10 +53,12 @@ void FairMQTransportFactorySHM::Initialize(const FairMQProgOptions* config)
{ {
int numIoThreads = 1; int numIoThreads = 1;
size_t segmentSize = 2000000000; size_t segmentSize = 2000000000;
string segmentName = "fairmq_shmem_main";
if (config) if (config)
{ {
numIoThreads = config->GetValue<int>("io-threads"); numIoThreads = config->GetValue<int>("io-threads");
segmentSize = config->GetValue<size_t>("shm-segment-size"); segmentSize = config->GetValue<size_t>("shm-segment-size");
segmentName = config->GetValue<string>("shm-segment-name");
} }
else else
{ {
@ -77,7 +79,7 @@ void FairMQTransportFactorySHM::Initialize(const FairMQProgOptions* config)
fSendHeartbeats = true; fSendHeartbeats = true;
fHeartbeatThread = thread(&FairMQTransportFactorySHM::SendHeartbeats, this); fHeartbeatThread = thread(&FairMQTransportFactorySHM::SendHeartbeats, this);
Manager::Instance().InitializeSegment("open_or_create", "FairMQSharedMemory", segmentSize); Manager::Instance().InitializeSegment("open_or_create", segmentName, segmentSize);
LOG(DEBUG) << "shmem: created/opened shared memory segment of " << segmentSize << " bytes. Available are " << Manager::Instance().Segment()->get_free_memory() << " bytes."; LOG(DEBUG) << "shmem: created/opened shared memory segment of " << segmentSize << " bytes. Available are " << Manager::Instance().Segment()->get_free_memory() << " bytes.";
{ // mutex scope { // mutex scope
@ -202,9 +204,9 @@ void FairMQTransportFactorySHM::Terminate()
if (fDeviceCounter->count == 0) if (fDeviceCounter->count == 0)
{ {
LOG(DEBUG) << "shmem: last FairMQSharedMemory user, removing segment."; LOG(DEBUG) << "shmem: last 'fairmq_shmem_main' user, removing segment.";
if (bipc::shared_memory_object::remove("FairMQSharedMemory")) if (bipc::shared_memory_object::remove("fairmq_shmem_main"))
{ {
LOG(DEBUG) << "shmem: successfully removed shared memory segment after the device has stopped."; LOG(DEBUG) << "shmem: successfully removed shared memory segment after the device has stopped.";
} }
@ -215,7 +217,7 @@ void FairMQTransportFactorySHM::Terminate()
} }
else else
{ {
LOG(DEBUG) << "shmem: other FairMQSharedMemory users present (" << fDeviceCounter->count << "), not removing it."; LOG(DEBUG) << "shmem: other 'fairmq_shmem_main' users present (" << fDeviceCounter->count << "), not removing it.";
} }
} }
} }