Shm: Add Monitor::GetDebugInfo()

This commit is contained in:
Alexey Rybalchenko 2020-08-18 12:06:21 +02:00
parent b63f31d0e0
commit 72a45f78b3
6 changed files with 104 additions and 33 deletions

View File

@ -63,6 +63,7 @@ struct RegionInfo
using Uint64RegionInfoPairAlloc = boost::interprocess::allocator<std::pair<const uint64_t, RegionInfo>, SegmentManager>;
using Uint64RegionInfoMap = boost::interprocess::map<uint64_t, RegionInfo, std::less<uint64_t>, Uint64RegionInfoPairAlloc>;
using Uint64RegionInfoHashMap = boost::unordered_map<uint64_t, RegionInfo, boost::hash<uint64_t>, std::equal_to<uint64_t>, Uint64RegionInfoPairAlloc>;
struct DeviceCounter
{
@ -115,9 +116,9 @@ struct MsgDebug
uint64_t fCreationTime;
};
using Uint64MsgDebugPairAlloc = boost::interprocess::allocator<std::pair<const size_t, MsgDebug>, SegmentManager>;
using Uint64MsgDebugHashMap = boost::unordered_map<size_t, MsgDebug, boost::hash<size_t>, std::equal_to<size_t>, Uint64MsgDebugPairAlloc>;
using Uint64MsgDebugMap = boost::interprocess::map<size_t, MsgDebug, std::less<size_t>, Uint64MsgDebugPairAlloc>;
using SizetMsgDebugPairAlloc = boost::interprocess::allocator<std::pair<const size_t, MsgDebug>, SegmentManager>;
using SizetMsgDebugHashMap = boost::unordered_map<size_t, MsgDebug, boost::hash<size_t>, std::equal_to<size_t>, SizetMsgDebugPairAlloc>;
using SizetMsgDebugMap = boost::interprocess::map<size_t, MsgDebug, std::less<size_t>, SizetMsgDebugPairAlloc>;
#endif
struct RegionBlock

View File

@ -70,14 +70,15 @@ class Manager
Manager(std::string shmId, std::string deviceId, size_t size, const ProgOptions* config)
: fShmId(std::move(shmId))
, fDeviceId(std::move(deviceId))
, fSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_main").c_str(), size)
// , fSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_main").c_str(), size)
, fSegments()
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600)
, fShmVoidAlloc(fManagementSegment.get_segment_manager())
, fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str())
, fRegionEventsCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str())
, fRegionEventsSubscriptionActive(false)
, fDeviceCounter(nullptr)
, fRegionInfos(nullptr)
, fShmRegions(nullptr)
, fInterrupted(false)
, fMsgCounter(0)
#ifdef FAIRMQ_DEBUG_MODE
@ -106,28 +107,39 @@ class Manager
StartMonitor(fShmId);
}
LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << fSegment.get_size() << " bytes. Available are " << fSegment.get_free_memory() << " bytes.";
{
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
try {
fSegments.emplace(0, RBTreeBestFitSegment(open_only, std::string("fmq_" + fShmId + "_main").c_str()));
LOG(debug) << "opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << fSegments.at(0).get_size() << " bytes. Available are " << fSegments.at(0).get_free_memory() << " bytes.";
} catch(interprocess_exception&) {
fSegments.emplace(0, RBTreeBestFitSegment(create_only, std::string("fmq_" + fShmId + "_main").c_str(), size));
LOG(debug) << "created shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << fSegments.at(0).get_size() << " bytes. Available are " << fSegments.at(0).get_free_memory() << " bytes.";
}
}
if (mlockSegment) {
LOG(debug) << "Locking the managed segment memory pages...";
if (mlock(fSegment.get_address(), fSegment.get_size()) == -1) {
if (mlock(fSegments.at(0).get_address(), fSegments.at(0).get_size()) == -1) {
LOG(error) << "Could not lock the managed segment memory. Code: " << errno << ", reason: " << strerror(errno);
}
LOG(debug) << "Successfully locked the managed segment memory pages.";
}
if (zeroSegment) {
LOG(debug) << "Zeroing the managed segment free memory...";
fSegment.zero_free_memory();
fSegments.at(0).zero_free_memory();
LOG(debug) << "Successfully zeroed the managed segment free memory.";
}
fRegionInfos = fManagementSegment.find_or_construct<Uint64RegionInfoMap>(unique_instance)(fShmVoidAlloc);
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
fShmRegions = fManagementSegment.find_or_construct<Uint64RegionInfoHashMap>(unique_instance)(fShmVoidAlloc);
#ifdef FAIRMQ_DEBUG_MODE
fMsgDebug = fManagementSegment.find_or_construct<Uint64MsgDebugMap>(unique_instance)(fShmVoidAlloc);
fMsgDebug = fManagementSegment.find_or_construct<SizetMsgDebugMap>(unique_instance)(fShmVoidAlloc);
#endif
// store info about the managed segment as region with id 0
fRegionInfos->emplace(0, RegionInfo("", 0, 0, fShmVoidAlloc));
boost::interprocess::scoped_lock<named_mutex> lock(fShmMtx);
fShmRegions->emplace(0, RegionInfo("", 0, 0, fShmVoidAlloc));
fDeviceCounter = fManagementSegment.find<DeviceCounter>(unique_instance).first;
@ -161,7 +173,7 @@ class Manager
Manager(const Manager&) = delete;
Manager operator=(const Manager&) = delete;
RBTreeBestFitSegment& Segment() { return fSegment; }
RBTreeBestFitSegment& Segment() { return fSegments.at(0); }
boost::interprocess::managed_shared_memory& ManagementSegment() { return fManagementSegment; }
static void StartMonitor(const std::string& id)
@ -251,7 +263,7 @@ class Manager
}
// create region info
fRegionInfos->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc));
fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc));
auto r = fRegions.emplace(id, tools::make_unique<Region>(fShmId, id, size, false, callback, bulkCallback, path, flags));
// LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
@ -286,7 +298,7 @@ class Manager
} else {
try {
// get region info
RegionInfo regionInfo = fRegionInfos->at(id);
RegionInfo regionInfo = fShmRegions->at(id);
std::string path = regionInfo.fPath.c_str();
int flags = regionInfo.fFlags;
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'";
@ -309,7 +321,7 @@ class Manager
fRegions.erase(id);
{
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
fRegionInfos->at(id).fDestroyed = true;
fShmRegions->at(id).fDestroyed = true;
}
fRegionEventsCV.notify_all();
}
@ -324,7 +336,7 @@ class Manager
{
std::vector<fair::mq::RegionInfo> result;
for (const auto& e : *fRegionInfos) {
for (const auto& e : *fShmRegions) {
fair::mq::RegionInfo info;
info.id = e.first;
info.flags = e.second.fUserFlags;
@ -341,8 +353,8 @@ class Manager
result.push_back(info);
} else {
if (!e.second.fDestroyed) {
info.ptr = fSegment.get_address();
info.size = fSegment.get_size();
info.ptr = fSegments.at(0).get_address();
info.size = fSegments.at(0).get_size();
} else {
info.ptr = nullptr;
info.size = 0;
@ -492,7 +504,7 @@ class Manager
std::string fShmId;
std::string fDeviceId;
// boost::interprocess::managed_shared_memory fSegment;
RBTreeBestFitSegment fSegment;
std::unordered_map<uint64_t, RBTreeBestFitSegment> fSegments;
boost::interprocess::managed_shared_memory fManagementSegment;
VoidAlloc fShmVoidAlloc;
boost::interprocess::named_mutex fShmMtx;
@ -504,13 +516,13 @@ class Manager
std::unordered_map<uint64_t, RegionEvent> fObservedRegionEvents;
DeviceCounter* fDeviceCounter;
Uint64RegionInfoMap* fRegionInfos;
Uint64RegionInfoHashMap* fShmRegions;
std::unordered_map<uint64_t, std::unique_ptr<Region>> fRegions;
std::atomic<bool> fInterrupted;
std::atomic<int32_t> fMsgCounter; // TODO: find a better lifetime solution instead of the counter
#ifdef FAIRMQ_DEBUG_MODE
Uint64MsgDebugMap* fMsgDebug;
SizetMsgDebugMap* fMsgDebug;
MsgCounter* fShmMsgCounter;
#endif

View File

@ -226,7 +226,7 @@ void Monitor::Interactive()
cout << "\n[\\n] --> invalid input." << endl;
break;
case 'b':
PrintDebug(ShmId{fShmId});
PrintDebugInfo(ShmId{fShmId});
break;
default:
cout << "\n[" << c << "] --> invalid input." << endl;
@ -387,23 +387,23 @@ void Monitor::CheckSegment()
}
}
void Monitor::PrintDebug(const ShmId& shmId __attribute__((unused)))
void Monitor::PrintDebugInfo(const ShmId& shmId __attribute__((unused)))
{
#ifdef FAIRMQ_DEBUG_MODE
string managementSegmentName("fmq_" + shmId.shmId + "_mng");
try {
bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str());
boost::interprocess::named_mutex mtx(boost::interprocess::open_only, std::string("fmq_" + shmId.shmId + "_mtx").c_str());
boost::interprocess::named_mutex mtx(boost::interprocess::open_only, string("fmq_" + shmId.shmId + "_mtx").c_str());
boost::interprocess::scoped_lock<bipc::named_mutex> lock(mtx);
Uint64MsgDebugMap* debug = managementSegment.find<Uint64MsgDebugMap>(bipc::unique_instance).first;
SizetMsgDebugMap* debug = managementSegment.find<SizetMsgDebugMap>(bipc::unique_instance).first;
cout << endl << "found " << debug->size() << " message(s):" << endl;
for (const auto& e : *debug) {
using time_point = std::chrono::system_clock::time_point;
time_point tmpt{std::chrono::duration_cast<time_point::duration>(std::chrono::nanoseconds(e.second.fCreationTime))};
std::time_t t = std::chrono::system_clock::to_time_t(tmpt);
using time_point = chrono::system_clock::time_point;
time_point tmpt{chrono::duration_cast<time_point::duration>(chrono::nanoseconds(e.second.fCreationTime))};
time_t t = chrono::system_clock::to_time_t(tmpt);
uint64_t ms = e.second.fCreationTime % 1000000;
auto tm = localtime(&t);
cout << "offset: " << setw(12) << setfill(' ') << e.first
@ -420,6 +420,45 @@ void Monitor::PrintDebug(const ShmId& shmId __attribute__((unused)))
#endif
}
void Monitor::PrintDebugInfo(const SessionId& sessionId)
{
ShmId shmId{buildShmIdFromSessionIdAndUserId(sessionId.sessionId)};
PrintDebugInfo(shmId);
}
vector<BufferDebugInfo> Monitor::GetDebugInfo(const ShmId& shmId __attribute__((unused)))
{
vector<BufferDebugInfo> result;
#ifdef FAIRMQ_DEBUG_MODE
string managementSegmentName("fmq_" + shmId.shmId + "_mng");
try {
bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str());
boost::interprocess::named_mutex mtx(boost::interprocess::open_only, string("fmq_" + shmId.shmId + "_mtx").c_str());
boost::interprocess::scoped_lock<bipc::named_mutex> lock(mtx);
SizetMsgDebugMap* debug = managementSegment.find<SizetMsgDebugMap>(bipc::unique_instance).first;
result.reserve(debug->size());
for (const auto& e : *debug) {
result.emplace_back(e.first, e.second.fPid, e.second.fSize, e.second.fCreationTime);
}
} catch (bie&) {
cout << "no segment found" << endl;
}
#else
cout << "FairMQ was not compiled in debug mode (FAIRMQ_DEBUG_MODE)" << endl;
#endif
return result;
}
vector<BufferDebugInfo> Monitor::GetDebugInfo(const SessionId& sessionId)
{
ShmId shmId{buildShmIdFromSessionIdAndUserId(sessionId.sessionId)};
return GetDebugInfo(shmId);
}
void Monitor::PrintQueues()
{
cout << '\n';

View File

@ -14,6 +14,7 @@
#include <string>
#include <stdexcept>
#include <unordered_map>
#include <vector>
namespace fair
{
@ -34,6 +35,21 @@ struct ShmId
explicit operator std::string() const { return shmId; }
};
struct BufferDebugInfo
{
BufferDebugInfo(size_t offset, pid_t pid, size_t size, uint64_t creationTime)
: fOffset(offset)
, fPid(pid)
, fSize(size)
, fCreationTime(creationTime)
{}
size_t fOffset;
pid_t fPid;
size_t fSize;
uint64_t fCreationTime;
};
class Monitor
{
public:
@ -60,7 +76,10 @@ class Monitor
/// @param sessionId session id
static void CleanupFull(const SessionId& sessionId);
static void PrintDebug(const ShmId& shmId);
static void PrintDebugInfo(const ShmId& shmId);
static void PrintDebugInfo(const SessionId& shmId);
static std::vector<BufferDebugInfo> GetDebugInfo(const ShmId& shmId);
static std::vector<BufferDebugInfo> GetDebugInfo(const SessionId& shmId);
static void RemoveObject(const std::string&);
static void RemoveFileMapping(const std::string&);

View File

@ -120,7 +120,7 @@ int main(int argc, char** argv)
}
if (debug) {
Monitor::PrintDebug(ShmId{shmId});
Monitor::PrintDebugInfo(ShmId{shmId});
return 0;
}