diff --git a/fairmq/FairMQUnmanagedRegion.h b/fairmq/FairMQUnmanagedRegion.h index 9f21f218..b6924be0 100644 --- a/fairmq/FairMQUnmanagedRegion.h +++ b/fairmq/FairMQUnmanagedRegion.h @@ -40,7 +40,7 @@ struct FairMQRegionInfo , ptr(_ptr) , size(_size) , flags(_flags) - , event (_event) + , event(_event) {} uint64_t id; // id of the region diff --git a/fairmq/shmem/Common.h b/fairmq/shmem/Common.h index 70cfac6c..d2a3bb85 100644 --- a/fairmq/shmem/Common.h +++ b/fairmq/shmem/Common.h @@ -63,6 +63,7 @@ struct RegionInfo using Uint64RegionInfoPairAlloc = boost::interprocess::allocator, SegmentManager>; using Uint64RegionInfoMap = boost::interprocess::map, Uint64RegionInfoPairAlloc>; +using Uint64RegionInfoHashMap = boost::unordered_map, std::equal_to, Uint64RegionInfoPairAlloc>; struct DeviceCounter { @@ -115,9 +116,9 @@ struct MsgDebug uint64_t fCreationTime; }; -using Uint64MsgDebugPairAlloc = boost::interprocess::allocator, SegmentManager>; -using Uint64MsgDebugHashMap = boost::unordered_map, std::equal_to, Uint64MsgDebugPairAlloc>; -using Uint64MsgDebugMap = boost::interprocess::map, Uint64MsgDebugPairAlloc>; +using SizetMsgDebugPairAlloc = boost::interprocess::allocator, SegmentManager>; +using SizetMsgDebugHashMap = boost::unordered_map, std::equal_to, SizetMsgDebugPairAlloc>; +using SizetMsgDebugMap = boost::interprocess::map, SizetMsgDebugPairAlloc>; #endif struct RegionBlock diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index a5a829f6..7a3cb5fc 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -70,14 +70,15 @@ class Manager Manager(std::string shmId, std::string deviceId, size_t size, const ProgOptions* config) : fShmId(std::move(shmId)) , fDeviceId(std::move(deviceId)) - , fSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_main").c_str(), size) + // , fSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_main").c_str(), size) + , fSegments() , fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 6553600) , fShmVoidAlloc(fManagementSegment.get_segment_manager()) , fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str()) , fRegionEventsCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str()) , fRegionEventsSubscriptionActive(false) , fDeviceCounter(nullptr) - , fRegionInfos(nullptr) + , fShmRegions(nullptr) , fInterrupted(false) , fMsgCounter(0) #ifdef FAIRMQ_DEBUG_MODE @@ -106,28 +107,39 @@ class Manager StartMonitor(fShmId); } - LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << fSegment.get_size() << " bytes. Available are " << fSegment.get_free_memory() << " bytes."; + { + boost::interprocess::scoped_lock lock(fShmMtx); + + try { + fSegments.emplace(0, RBTreeBestFitSegment(open_only, std::string("fmq_" + fShmId + "_main").c_str())); + LOG(debug) << "opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << fSegments.at(0).get_size() << " bytes. Available are " << fSegments.at(0).get_free_memory() << " bytes."; + } catch(interprocess_exception&) { + fSegments.emplace(0, RBTreeBestFitSegment(create_only, std::string("fmq_" + fShmId + "_main").c_str(), size)); + LOG(debug) << "created shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << fSegments.at(0).get_size() << " bytes. Available are " << fSegments.at(0).get_free_memory() << " bytes."; + } + } + if (mlockSegment) { LOG(debug) << "Locking the managed segment memory pages..."; - if (mlock(fSegment.get_address(), fSegment.get_size()) == -1) { + if (mlock(fSegments.at(0).get_address(), fSegments.at(0).get_size()) == -1) { LOG(error) << "Could not lock the managed segment memory. Code: " << errno << ", reason: " << strerror(errno); } LOG(debug) << "Successfully locked the managed segment memory pages."; } if (zeroSegment) { LOG(debug) << "Zeroing the managed segment free memory..."; - fSegment.zero_free_memory(); + fSegments.at(0).zero_free_memory(); LOG(debug) << "Successfully zeroed the managed segment free memory."; } - fRegionInfos = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); + boost::interprocess::scoped_lock lock(fShmMtx); + + fShmRegions = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); #ifdef FAIRMQ_DEBUG_MODE - fMsgDebug = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); + fMsgDebug = fManagementSegment.find_or_construct(unique_instance)(fShmVoidAlloc); #endif // store info about the managed segment as region with id 0 - fRegionInfos->emplace(0, RegionInfo("", 0, 0, fShmVoidAlloc)); - - boost::interprocess::scoped_lock lock(fShmMtx); + fShmRegions->emplace(0, RegionInfo("", 0, 0, fShmVoidAlloc)); fDeviceCounter = fManagementSegment.find(unique_instance).first; @@ -161,7 +173,7 @@ class Manager Manager(const Manager&) = delete; Manager operator=(const Manager&) = delete; - RBTreeBestFitSegment& Segment() { return fSegment; } + RBTreeBestFitSegment& Segment() { return fSegments.at(0); } boost::interprocess::managed_shared_memory& ManagementSegment() { return fManagementSegment; } static void StartMonitor(const std::string& id) @@ -251,7 +263,7 @@ class Manager } // create region info - fRegionInfos->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc)); + fShmRegions->emplace(id, RegionInfo(path.c_str(), flags, userFlags, fShmVoidAlloc)); auto r = fRegions.emplace(id, tools::make_unique(fShmId, id, size, false, callback, bulkCallback, path, flags)); // LOG(debug) << "Created region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'"; @@ -286,7 +298,7 @@ class Manager } else { try { // get region info - RegionInfo regionInfo = fRegionInfos->at(id); + RegionInfo regionInfo = fShmRegions->at(id); std::string path = regionInfo.fPath.c_str(); int flags = regionInfo.fFlags; // LOG(debug) << "Located remote region with id '" << id << "', path: '" << path << "', flags: '" << flags << "'"; @@ -309,7 +321,7 @@ class Manager fRegions.erase(id); { boost::interprocess::scoped_lock lock(fShmMtx); - fRegionInfos->at(id).fDestroyed = true; + fShmRegions->at(id).fDestroyed = true; } fRegionEventsCV.notify_all(); } @@ -324,7 +336,7 @@ class Manager { std::vector result; - for (const auto& e : *fRegionInfos) { + for (const auto& e : *fShmRegions) { fair::mq::RegionInfo info; info.id = e.first; info.flags = e.second.fUserFlags; @@ -341,8 +353,8 @@ class Manager result.push_back(info); } else { if (!e.second.fDestroyed) { - info.ptr = fSegment.get_address(); - info.size = fSegment.get_size(); + info.ptr = fSegments.at(0).get_address(); + info.size = fSegments.at(0).get_size(); } else { info.ptr = nullptr; info.size = 0; @@ -492,7 +504,7 @@ class Manager std::string fShmId; std::string fDeviceId; // boost::interprocess::managed_shared_memory fSegment; - RBTreeBestFitSegment fSegment; + std::unordered_map fSegments; boost::interprocess::managed_shared_memory fManagementSegment; VoidAlloc fShmVoidAlloc; boost::interprocess::named_mutex fShmMtx; @@ -504,13 +516,13 @@ class Manager std::unordered_map fObservedRegionEvents; DeviceCounter* fDeviceCounter; - Uint64RegionInfoMap* fRegionInfos; + Uint64RegionInfoHashMap* fShmRegions; std::unordered_map> fRegions; std::atomic fInterrupted; std::atomic fMsgCounter; // TODO: find a better lifetime solution instead of the counter #ifdef FAIRMQ_DEBUG_MODE - Uint64MsgDebugMap* fMsgDebug; + SizetMsgDebugMap* fMsgDebug; MsgCounter* fShmMsgCounter; #endif diff --git a/fairmq/shmem/Monitor.cxx b/fairmq/shmem/Monitor.cxx index 4624aa1e..a57b1200 100644 --- a/fairmq/shmem/Monitor.cxx +++ b/fairmq/shmem/Monitor.cxx @@ -226,7 +226,7 @@ void Monitor::Interactive() cout << "\n[\\n] --> invalid input." << endl; break; case 'b': - PrintDebug(ShmId{fShmId}); + PrintDebugInfo(ShmId{fShmId}); break; default: cout << "\n[" << c << "] --> invalid input." << endl; @@ -387,23 +387,23 @@ void Monitor::CheckSegment() } } -void Monitor::PrintDebug(const ShmId& shmId __attribute__((unused))) +void Monitor::PrintDebugInfo(const ShmId& shmId __attribute__((unused))) { #ifdef FAIRMQ_DEBUG_MODE string managementSegmentName("fmq_" + shmId.shmId + "_mng"); try { bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str()); - boost::interprocess::named_mutex mtx(boost::interprocess::open_only, std::string("fmq_" + shmId.shmId + "_mtx").c_str()); + boost::interprocess::named_mutex mtx(boost::interprocess::open_only, string("fmq_" + shmId.shmId + "_mtx").c_str()); boost::interprocess::scoped_lock lock(mtx); - Uint64MsgDebugMap* debug = managementSegment.find(bipc::unique_instance).first; + SizetMsgDebugMap* debug = managementSegment.find(bipc::unique_instance).first; cout << endl << "found " << debug->size() << " message(s):" << endl; for (const auto& e : *debug) { - using time_point = std::chrono::system_clock::time_point; - time_point tmpt{std::chrono::duration_cast(std::chrono::nanoseconds(e.second.fCreationTime))}; - std::time_t t = std::chrono::system_clock::to_time_t(tmpt); + using time_point = chrono::system_clock::time_point; + time_point tmpt{chrono::duration_cast(chrono::nanoseconds(e.second.fCreationTime))}; + time_t t = chrono::system_clock::to_time_t(tmpt); uint64_t ms = e.second.fCreationTime % 1000000; auto tm = localtime(&t); cout << "offset: " << setw(12) << setfill(' ') << e.first @@ -420,6 +420,45 @@ void Monitor::PrintDebug(const ShmId& shmId __attribute__((unused))) #endif } +void Monitor::PrintDebugInfo(const SessionId& sessionId) +{ + ShmId shmId{buildShmIdFromSessionIdAndUserId(sessionId.sessionId)}; + PrintDebugInfo(shmId); +} + +vector Monitor::GetDebugInfo(const ShmId& shmId __attribute__((unused))) +{ + vector result; + +#ifdef FAIRMQ_DEBUG_MODE + string managementSegmentName("fmq_" + shmId.shmId + "_mng"); + try { + bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str()); + boost::interprocess::named_mutex mtx(boost::interprocess::open_only, string("fmq_" + shmId.shmId + "_mtx").c_str()); + boost::interprocess::scoped_lock lock(mtx); + + SizetMsgDebugMap* debug = managementSegment.find(bipc::unique_instance).first; + + result.reserve(debug->size()); + + for (const auto& e : *debug) { + result.emplace_back(e.first, e.second.fPid, e.second.fSize, e.second.fCreationTime); + } + } catch (bie&) { + cout << "no segment found" << endl; + } +#else + cout << "FairMQ was not compiled in debug mode (FAIRMQ_DEBUG_MODE)" << endl; +#endif + + return result; +} +vector Monitor::GetDebugInfo(const SessionId& sessionId) +{ + ShmId shmId{buildShmIdFromSessionIdAndUserId(sessionId.sessionId)}; + return GetDebugInfo(shmId); +} + void Monitor::PrintQueues() { cout << '\n'; diff --git a/fairmq/shmem/Monitor.h b/fairmq/shmem/Monitor.h index 88a2b2ca..a618b228 100644 --- a/fairmq/shmem/Monitor.h +++ b/fairmq/shmem/Monitor.h @@ -14,6 +14,7 @@ #include #include #include +#include namespace fair { @@ -34,6 +35,21 @@ struct ShmId explicit operator std::string() const { return shmId; } }; +struct BufferDebugInfo +{ + BufferDebugInfo(size_t offset, pid_t pid, size_t size, uint64_t creationTime) + : fOffset(offset) + , fPid(pid) + , fSize(size) + , fCreationTime(creationTime) + {} + + size_t fOffset; + pid_t fPid; + size_t fSize; + uint64_t fCreationTime; +}; + class Monitor { public: @@ -60,7 +76,10 @@ class Monitor /// @param sessionId session id static void CleanupFull(const SessionId& sessionId); - static void PrintDebug(const ShmId& shmId); + static void PrintDebugInfo(const ShmId& shmId); + static void PrintDebugInfo(const SessionId& shmId); + static std::vector GetDebugInfo(const ShmId& shmId); + static std::vector GetDebugInfo(const SessionId& shmId); static void RemoveObject(const std::string&); static void RemoveFileMapping(const std::string&); diff --git a/fairmq/shmem/runMonitor.cxx b/fairmq/shmem/runMonitor.cxx index fa9deb1b..8af03c7f 100644 --- a/fairmq/shmem/runMonitor.cxx +++ b/fairmq/shmem/runMonitor.cxx @@ -120,7 +120,7 @@ int main(int argc, char** argv) } if (debug) { - Monitor::PrintDebug(ShmId{shmId}); + Monitor::PrintDebugInfo(ShmId{shmId}); return 0; }