mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 00:31:14 +00:00
Expose fair::mq::shmem::Monitor::Cleanup() API
This commit is contained in:
parent
beb510ded8
commit
539e5602a6
|
@ -170,6 +170,7 @@ if(BUILD_FAIRMQ)
|
|||
PluginManager.h
|
||||
PluginServices.h
|
||||
runFairMQDevice.h
|
||||
shmem/Monitor.h
|
||||
)
|
||||
|
||||
set(FAIRMQ_PRIVATE_HEADER_FILES
|
||||
|
@ -234,6 +235,7 @@ if(BUILD_FAIRMQ)
|
|||
plugins/config/Config.cxx
|
||||
plugins/Control.cxx
|
||||
MemoryResources.cxx
|
||||
shmem/Monitor.cxx
|
||||
)
|
||||
|
||||
if(BUILD_OFI_TRANSPORT)
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
#include "Common.h"
|
||||
#include "Region.h"
|
||||
#include "Monitor.h"
|
||||
|
||||
#include <FairMQLogger.h>
|
||||
#include <FairMQMessage.h>
|
||||
|
@ -57,10 +58,8 @@ class Manager
|
|||
Manager(std::string id, std::string deviceId, size_t size, bool throwOnBadAlloc)
|
||||
: fShmId(std::move(id))
|
||||
, fDeviceId(std::move(deviceId))
|
||||
, fSegmentName("fmq_" + fShmId + "_main")
|
||||
, fManagementSegmentName("fmq_" + fShmId + "_mng")
|
||||
, fSegment(boost::interprocess::open_or_create, fSegmentName.c_str(), size)
|
||||
, fManagementSegment(boost::interprocess::open_or_create, fManagementSegmentName.c_str(), 655360)
|
||||
, fSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_main").c_str(), size)
|
||||
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 655360)
|
||||
, fShmVoidAlloc(fManagementSegment.get_segment_manager())
|
||||
, fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str())
|
||||
, fRegionEventsCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str())
|
||||
|
@ -350,22 +349,6 @@ class Manager
|
|||
void IncrementMsgCounter() { ++fMsgCounter; }
|
||||
void DecrementMsgCounter() { --fMsgCounter; }
|
||||
|
||||
void RemoveSegments()
|
||||
{
|
||||
using namespace boost::interprocess;
|
||||
if (shared_memory_object::remove(fSegmentName.c_str())) {
|
||||
LOG(debug) << "Removed '" << fSegmentName << "' segment after the device has stopped.";
|
||||
} else {
|
||||
LOG(debug) << "Did not remove " << fSegmentName << " segment after the device stopped. Already removed?";
|
||||
}
|
||||
|
||||
if (shared_memory_object::remove(fManagementSegmentName.c_str())) {
|
||||
LOG(debug) << "Removed '" << fManagementSegmentName << "' segment after the device has stopped.";
|
||||
} else {
|
||||
LOG(debug) << "Did not remove '" << fManagementSegmentName << "' segment after the device stopped. Already removed?";
|
||||
}
|
||||
}
|
||||
|
||||
void SendHeartbeats()
|
||||
{
|
||||
std::string controlQueueName("fmq_" + fShmId + "_cq");
|
||||
|
@ -411,8 +394,6 @@ class Manager
|
|||
|
||||
if (fDeviceCounter->fCount == 0) {
|
||||
LOG(debug) << "Last segment user, removing segment.";
|
||||
|
||||
RemoveSegments();
|
||||
lastRemoved = true;
|
||||
} else {
|
||||
LOG(debug) << "Other segment users present (" << fDeviceCounter->fCount << "), skipping removal.";
|
||||
|
@ -422,16 +403,13 @@ class Manager
|
|||
}
|
||||
|
||||
if (lastRemoved) {
|
||||
named_mutex::remove(std::string("fmq_" + fShmId + "_mtx").c_str());
|
||||
named_condition::remove(std::string("fmq_" + fShmId + "_cv").c_str());
|
||||
Monitor::Cleanup(ShmId{fShmId});
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
std::string fShmId;
|
||||
std::string fDeviceId;
|
||||
std::string fSegmentName;
|
||||
std::string fManagementSegmentName;
|
||||
boost::interprocess::managed_shared_memory fSegment;
|
||||
boost::interprocess::managed_shared_memory fManagementSegment;
|
||||
VoidAlloc fShmVoidAlloc;
|
||||
|
|
|
@ -64,7 +64,6 @@ Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool
|
|||
, fHeartbeatTriggered(false)
|
||||
, fLastHeartbeat(chrono::high_resolution_clock::now())
|
||||
, fSignalThread()
|
||||
, fManagementSegment(bipc::open_or_create, fManagementSegmentName.c_str(), 65536)
|
||||
, fDeviceHeartbeats()
|
||||
{
|
||||
if (!fViewOnly) {
|
||||
|
@ -203,7 +202,7 @@ void Monitor::Interactive()
|
|||
case 'x':
|
||||
cout << "\n[x] --> closing shared memory:" << endl;
|
||||
if (!fViewOnly) {
|
||||
Cleanup(fShmId);
|
||||
Cleanup(ShmId{fShmId});
|
||||
} else {
|
||||
cout << "cannot close because in view only mode" << endl;
|
||||
}
|
||||
|
@ -288,7 +287,7 @@ void Monitor::CheckSegment()
|
|||
|
||||
if (fHeartbeatTriggered && duration > fTimeoutInMS) {
|
||||
cout << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning..." << endl;
|
||||
Cleanup(fShmId);
|
||||
Cleanup(ShmId{fShmId});
|
||||
fHeartbeatTriggered = false;
|
||||
if (fSelfDestruct) {
|
||||
cout << "\nself destructing" << endl;
|
||||
|
@ -321,7 +320,7 @@ void Monitor::CheckSegment()
|
|||
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
|
||||
|
||||
if (fIsDaemon && duration > fTimeoutInMS * 2) {
|
||||
Cleanup(fShmId);
|
||||
Cleanup(ShmId{fShmId});
|
||||
fHeartbeatTriggered = false;
|
||||
if (fSelfDestruct) {
|
||||
cout << "\nself destructing" << endl;
|
||||
|
@ -395,51 +394,52 @@ void Monitor::PrintHelp()
|
|||
void Monitor::RemoveObject(const string& name)
|
||||
{
|
||||
if (bipc::shared_memory_object::remove(name.c_str())) {
|
||||
cout << "Successfully removed \"" << name << "\"." << endl;
|
||||
cout << "Successfully removed '" << name << "'." << endl;
|
||||
} else {
|
||||
cout << "Did not remove \"" << name << "\". Already removed?" << endl;
|
||||
cout << "Did not remove '" << name << "'. Already removed?" << endl;
|
||||
}
|
||||
}
|
||||
|
||||
void Monitor::RemoveFileMapping(const string& name)
|
||||
{
|
||||
if (bipc::file_mapping::remove(name.c_str())) {
|
||||
cout << "Successfully removed \"" << name << "\"." << endl;
|
||||
cout << "Successfully removed '" << name << "'." << endl;
|
||||
} else {
|
||||
cout << "Did not remove \"" << name << "\". Already removed?" << endl;
|
||||
cout << "Did not remove '" << name << "'. Already removed?" << endl;
|
||||
}
|
||||
}
|
||||
|
||||
void Monitor::RemoveQueue(const string& name)
|
||||
{
|
||||
if (bipc::message_queue::remove(name.c_str())) {
|
||||
cout << "Successfully removed \"" << name << "\"." << endl;
|
||||
cout << "Successfully removed '" << name << "'." << endl;
|
||||
} else {
|
||||
cout << "Did not remove \"" << name << "\". Already removed?" << endl;
|
||||
cout << "Did not remove '" << name << "'. Already removed?" << endl;
|
||||
}
|
||||
}
|
||||
|
||||
void Monitor::RemoveMutex(const string& name)
|
||||
{
|
||||
if (bipc::named_mutex::remove(name.c_str())) {
|
||||
cout << "Successfully removed \"" << name << "\"." << endl;
|
||||
cout << "Successfully removed '" << name << "'." << endl;
|
||||
} else {
|
||||
cout << "Did not remove \"" << name << "\". Already removed?" << endl;
|
||||
cout << "Did not remove '" << name << "'. Already removed?" << endl;
|
||||
}
|
||||
}
|
||||
|
||||
void Monitor::RemoveCondition(const string& name)
|
||||
{
|
||||
if (bipc::named_condition::remove(name.c_str())) {
|
||||
cout << "Successfully removed \"" << name << "\"." << endl;
|
||||
cout << "Successfully removed '" << name << "'." << endl;
|
||||
} else {
|
||||
cout << "Did not remove \"" << name << "\". Already removed?" << endl;
|
||||
cout << "Did not remove '" << name << "'. Already removed?" << endl;
|
||||
}
|
||||
}
|
||||
|
||||
void Monitor::Cleanup(const string& shmId)
|
||||
void Monitor::Cleanup(const ShmId& shmId)
|
||||
{
|
||||
string managementSegmentName("fmq_" + shmId + "_mng");
|
||||
cout << "Cleaning up for shared memory id '" << shmId.shmId << "'..." << endl;
|
||||
string managementSegmentName("fmq_" + shmId.shmId + "_mng");
|
||||
try {
|
||||
bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str());
|
||||
RegionCounter* rc = managementSegment.find<RegionCounter>(bipc::unique_instance).first;
|
||||
|
@ -454,20 +454,20 @@ void Monitor::Cleanup(const string& shmId)
|
|||
RegionInfo ri = m->at(i);
|
||||
string path = ri.fPath.c_str();
|
||||
int flags = ri.fFlags;
|
||||
cout << "Found RegionInfo with path: '" << path << "', flags: " << flags << "'." << endl;
|
||||
cout << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << ri.fDestroyed << "." << endl;
|
||||
if (path != "") {
|
||||
RemoveFileMapping(tools::ToString(path, "fmq_" + shmId + "_rg_" + to_string(i)));
|
||||
RemoveFileMapping(tools::ToString(path, "fmq_" + shmId.shmId + "_rg_" + to_string(i)));
|
||||
} else {
|
||||
RemoveObject("fmq_" + shmId + "_rg_" + to_string(i));
|
||||
RemoveObject("fmq_" + shmId.shmId + "_rg_" + to_string(i));
|
||||
}
|
||||
} else {
|
||||
RemoveObject("fmq_" + shmId + "_rg_" + to_string(i));
|
||||
RemoveObject("fmq_" + shmId.shmId + "_rg_" + to_string(i));
|
||||
}
|
||||
|
||||
RemoveQueue(string("fmq_" + shmId + "_rgq_" + to_string(i)));
|
||||
RemoveQueue(string("fmq_" + shmId.shmId + "_rgq_" + to_string(i)));
|
||||
}
|
||||
} else {
|
||||
cout << "No region counter found. no regions to cleanup." << endl;
|
||||
cout << "No region counter found. No regions to cleanup." << endl;
|
||||
}
|
||||
|
||||
RemoveObject(managementSegmentName.c_str());
|
||||
|
@ -477,20 +477,41 @@ void Monitor::Cleanup(const string& shmId)
|
|||
cout << "Could not locate element in the region map, out of range: " << oor.what() << endl;
|
||||
}
|
||||
|
||||
RemoveObject("fmq_" + shmId + "_main");
|
||||
RemoveMutex("fmq_" + shmId + "_mtx");
|
||||
RemoveCondition("fmq_" + shmId + "_cv");
|
||||
RemoveObject("fmq_" + shmId.shmId + "_main");
|
||||
RemoveMutex("fmq_" + shmId.shmId + "_mtx");
|
||||
RemoveCondition("fmq_" + shmId.shmId + "_cv");
|
||||
|
||||
cout << endl;
|
||||
}
|
||||
|
||||
void Monitor::Cleanup(const SessionId& sessionId)
|
||||
{
|
||||
ShmId shmId{buildShmIdFromSessionIdAndUserId(sessionId.sessionId)};
|
||||
cout << "Cleanup called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'" << endl;
|
||||
Cleanup(shmId);
|
||||
}
|
||||
|
||||
void Monitor::CleanupFull(const ShmId& shmId)
|
||||
{
|
||||
Cleanup(shmId);
|
||||
RemoveMutex("fmq_" + shmId.shmId + "_ms");
|
||||
RemoveQueue("fmq_" + shmId.shmId + "_cq");
|
||||
}
|
||||
|
||||
void Monitor::CleanupFull(const SessionId& sessionId)
|
||||
{
|
||||
ShmId shmId{buildShmIdFromSessionIdAndUserId(sessionId.sessionId)};
|
||||
cout << "Cleanup called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'" << endl;
|
||||
CleanupFull(shmId);
|
||||
}
|
||||
|
||||
Monitor::~Monitor()
|
||||
{
|
||||
if (fSignalThread.joinable()) {
|
||||
fSignalThread.join();
|
||||
}
|
||||
if (fCleanOnExit) {
|
||||
Cleanup(fShmId);
|
||||
Cleanup(ShmId{fShmId});
|
||||
}
|
||||
if (!fViewOnly) {
|
||||
RemoveMutex("fmq_" + fShmId + "_ms");
|
||||
|
|
|
@ -8,8 +8,6 @@
|
|||
#ifndef FAIR_MQ_SHMEM_MONITOR_H_
|
||||
#define FAIR_MQ_SHMEM_MONITOR_H_
|
||||
|
||||
#include <boost/interprocess/managed_shared_memory.hpp>
|
||||
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
#include <atomic>
|
||||
|
@ -24,6 +22,18 @@ namespace mq
|
|||
namespace shmem
|
||||
{
|
||||
|
||||
struct SessionId
|
||||
{
|
||||
std::string sessionId;
|
||||
explicit operator std::string() const { return sessionId; }
|
||||
};
|
||||
|
||||
struct ShmId
|
||||
{
|
||||
std::string shmId;
|
||||
explicit operator std::string() const { return shmId; }
|
||||
};
|
||||
|
||||
class Monitor
|
||||
{
|
||||
public:
|
||||
|
@ -37,7 +47,19 @@ class Monitor
|
|||
void CatchSignals();
|
||||
void Run();
|
||||
|
||||
static void Cleanup(const std::string& shmId);
|
||||
/// @brief Cleanup all shared memory artifacts created by devices
|
||||
/// @param shmId shared memory id
|
||||
static void Cleanup(const ShmId& shmId);
|
||||
/// @brief Cleanup all shared memory artifacts created by devices
|
||||
/// @param sessionId session id
|
||||
static void Cleanup(const SessionId& sessionId);
|
||||
/// @brief Cleanup all shared memory artifacts created by devices and monitors
|
||||
/// @param shmId shared memory id
|
||||
static void CleanupFull(const ShmId& shmId);
|
||||
/// @brief Cleanup all shared memory artifacts created by devices and monitors
|
||||
/// @param sessionId session id
|
||||
static void CleanupFull(const SessionId& sessionId);
|
||||
|
||||
static void RemoveObject(const std::string&);
|
||||
static void RemoveFileMapping(const std::string&);
|
||||
static void RemoveQueue(const std::string&);
|
||||
|
@ -70,7 +92,6 @@ class Monitor
|
|||
std::atomic<bool> fHeartbeatTriggered;
|
||||
std::chrono::high_resolution_clock::time_point fLastHeartbeat;
|
||||
std::thread fSignalThread;
|
||||
boost::interprocess::managed_shared_memory fManagementSegment;
|
||||
std::unordered_map<std::string, std::chrono::high_resolution_clock::time_point> fDeviceHeartbeats;
|
||||
};
|
||||
|
||||
|
|
|
@ -6,6 +6,23 @@ The transport manages shared memory via boost::interprocess library. The transfe
|
|||
|
||||
Devices track and clean up shared memory on shutdown. For more information on the current shared memory segment and additional cleanup options, see the following section.
|
||||
|
||||
# Shared Memory objects / files
|
||||
|
||||
FairMQ Shared Memory currently uses the following names to register shared memory on the system:
|
||||
|
||||
| name | info | created by | used by |
|
||||
| ------------------------- | ---------------------------------------------- | ------------------ | ------------------------------ |
|
||||
| `fmq_<shmId>_main` | main segment (user data) | one of the devices | devices |
|
||||
| `fmq_<shmId>_mng` | management segment (management data) | one of the devices | devices |
|
||||
| `fmq_<shmId>_mtx` | mutex | one of the devices | devices |
|
||||
| `fmq_<shmId>_cv` | condition variable | one of the devices | devices with unmanaged regions |
|
||||
| `fmq_<shmId>_rg_<index>` | unmanaged region(s) | one of the devices | devices with unmanaged regions |
|
||||
| `fmq_<shmId>_rgq_<index>` | unmanaged region queue(s) | one of the devices | devices with unmanaged regions |
|
||||
| `fmq_<shmId>_ms` | shmmonitor status | shmmonitor | devices, shmmonitor |
|
||||
| `fmq_<shmId>_cq` | message queue between transport and shmmonitor | shmmonitor | devices, shmmonitor |
|
||||
|
||||
The shmId is generated from the session id and the user id.
|
||||
|
||||
## Shared memory monitor
|
||||
|
||||
The shared memory monitor tool, supplied with the shared memory transport, can be used to monitor shared memory use and automatically clean up shared memory in case of device crashes.
|
||||
|
@ -25,13 +42,3 @@ Without the `--self-destruct` option, the monitor will run continuously, moitori
|
|||
A possible further implementation would be to run the monitor with `--self-destruct` with each topology.
|
||||
|
||||
The Monitor class can also be used independently of the supplied executable (built from `runMonitor.cxx`), allowing integration at any level. For example, invoking the monitor could be a functionality that a device offers.
|
||||
|
||||
FairMQ Shared Memory currently uses following names to register shared memory on the system:
|
||||
|
||||
`fmq_<shmId>_main` - main segment name, used for user data (the shmId is generated out of session id and user id).
|
||||
`fmq_<shmId>_mng` - management segment name, used for storing management data.
|
||||
`fmq_<shmId>_cq` - message queue for communicating between shm transport and shm monitor (exists independent of above segments).
|
||||
`fmq_<shmId>_mtx` - boost::interprocess::named_mutex for management purposes (exists independent of above segments).
|
||||
`fmq_<shmId>_ms` - shmmonitor status used to signal if it is active or not (exists independent of above segments).
|
||||
`fmq_<shmId>_rg_<index>` - names of unmanaged regions.
|
||||
`fmq_<shmId>_rgq_<index>` - names of queues for the unmanaged regions.
|
||||
|
|
|
@ -232,11 +232,11 @@ struct Region
|
|||
}
|
||||
|
||||
if (boost::interprocess::shared_memory_object::remove(fName.c_str())) {
|
||||
LOG(debug) << "Region " << fName << " destroyed.";
|
||||
LOG(debug) << "Region '" << fName << "' destroyed.";
|
||||
}
|
||||
|
||||
if (boost::interprocess::file_mapping::remove(fName.c_str())) {
|
||||
LOG(debug) << "File mapping " << fName << " destroyed.";
|
||||
LOG(debug) << "File mapping '" << fName << "' destroyed.";
|
||||
}
|
||||
|
||||
if (fFile) {
|
||||
|
@ -244,14 +244,14 @@ struct Region
|
|||
}
|
||||
|
||||
if (boost::interprocess::message_queue::remove(fQueueName.c_str())) {
|
||||
LOG(debug) << "Region queue " << fQueueName << " destroyed.";
|
||||
LOG(debug) << "Region queue '" << fQueueName << "' destroyed.";
|
||||
}
|
||||
} else {
|
||||
// LOG(debug) << "shmem: region '" << fName << "' is remote, no cleanup necessary.";
|
||||
LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary";
|
||||
}
|
||||
|
||||
LOG(debug) << "Region " << fName << " (" << (fRemote ? "remote" : "local") << ") destructed.";
|
||||
LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed.";
|
||||
}
|
||||
|
||||
bool fRemote;
|
||||
|
|
|
@ -111,9 +111,7 @@ int main(int argc, char** argv)
|
|||
}
|
||||
|
||||
if (cleanup) {
|
||||
cout << "Cleaning up \"" << shmId << "\"..." << endl;
|
||||
Monitor::Cleanup(shmId);
|
||||
Monitor::RemoveQueue("fmq_" + shmId + "_cq");
|
||||
Monitor::CleanupFull(ShmId{shmId});
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user