mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 17:41:45 +00:00
Compare commits
7 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
d105960444 | ||
|
3aae5bae58 | ||
|
9031029d2c | ||
|
d478e050ba | ||
|
06b2b9b01f | ||
|
b3fa4f6e7e | ||
|
da5cb34416 |
86
.zenodo.json
Normal file
86
.zenodo.json
Normal file
@@ -0,0 +1,86 @@
|
|||||||
|
{
|
||||||
|
"creators": [
|
||||||
|
{
|
||||||
|
"name": "Al-Turany, Mohammad"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"orcid": "0000-0003-3787-1910",
|
||||||
|
"name": "Klein, Dennis"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Kollegger, Thorsten"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Rybalchenko, Alexey"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "Winckler, Nicolas"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"contributors": [
|
||||||
|
{
|
||||||
|
"type": "Other",
|
||||||
|
"name": "Aphecetche, Laurent"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "Other",
|
||||||
|
"name": "Binet, Sebastien"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "Other",
|
||||||
|
"name": "Eulisse, Giulio"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "Other",
|
||||||
|
"name": "Karabowicz, Radoslaw"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "Other",
|
||||||
|
"name": "Kretz, Matthias"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "Other",
|
||||||
|
"name": "Krzewicki, Mikolaj"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "Other",
|
||||||
|
"name": "Lebedev, Andrey"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "Other",
|
||||||
|
"name": "Mrnjavac, Teo"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "Other",
|
||||||
|
"name": "Neskovic, Gvozden"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "Other",
|
||||||
|
"name": "Richter, Matthias"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "Other",
|
||||||
|
"orcid": "0000-0002-5321-8404",
|
||||||
|
"name": "Tacke, Christian"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "Other",
|
||||||
|
"name": "Uhlig, Florian"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "Other",
|
||||||
|
"name": "Wenzel, Sandro"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"description": "<p>C++ Message Queuing Library and Framework</p>",
|
||||||
|
"related_identifiers": [
|
||||||
|
{
|
||||||
|
"identifier": "https://github.com/FairRootGroup/FairMQ/",
|
||||||
|
"relation": "isSupplementTo",
|
||||||
|
"resource_type": "software",
|
||||||
|
"scheme": "url"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"title": "FairMQ",
|
||||||
|
"license": "LGPL-3.0-only"
|
||||||
|
}
|
2
AUTHORS
2
AUTHORS
@@ -1,5 +1,5 @@
|
|||||||
Al-Turany, Mohammad
|
Al-Turany, Mohammad
|
||||||
Klein, Dennis
|
Klein, Dennis [https://orcid.org/0000-0003-3787-1910]
|
||||||
Kollegger, Thorsten
|
Kollegger, Thorsten
|
||||||
Rybalchenko, Alexey
|
Rybalchenko, Alexey
|
||||||
Winckler, Nicolas
|
Winckler, Nicolas
|
||||||
|
@@ -3,11 +3,11 @@ Binet, Sebastien
|
|||||||
Eulisse, Giulio
|
Eulisse, Giulio
|
||||||
Karabowicz, Radoslaw
|
Karabowicz, Radoslaw
|
||||||
Kretz, Matthias <kretz@kde.org>
|
Kretz, Matthias <kretz@kde.org>
|
||||||
Krzewicki, Mikolaj
|
Krzewicki, Mikolaj
|
||||||
Lebedev, Andrey
|
Lebedev, Andrey
|
||||||
Mrnjavac, Teo <teo.m@cern.ch>
|
Mrnjavac, Teo <teo.m@cern.ch>
|
||||||
Neskovic, Gvozden
|
Neskovic, Gvozden
|
||||||
Richter, Matthias
|
Richter, Matthias
|
||||||
Tacke, Christian
|
Tacke, Christian [https://orcid.org/0000-0002-5321-8404]
|
||||||
Uhlig, Florian
|
Uhlig, Florian
|
||||||
Wenzel, Sandro
|
Wenzel, Sandro
|
||||||
|
@@ -18,7 +18,8 @@
|
|||||||
{
|
{
|
||||||
"@type": "Person",
|
"@type": "Person",
|
||||||
"givenName": "Dennis",
|
"givenName": "Dennis",
|
||||||
"familyName": "Klein"
|
"familyName": "Klein",
|
||||||
|
"@id": "https://orcid.org/0000-0003-3787-1910"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"@type": "Person",
|
"@type": "Person",
|
||||||
@@ -92,7 +93,8 @@
|
|||||||
{
|
{
|
||||||
"@type": "Person",
|
"@type": "Person",
|
||||||
"givenName": "Christian",
|
"givenName": "Christian",
|
||||||
"familyName": "Tacke"
|
"familyName": "Tacke",
|
||||||
|
"@id": "https://orcid.org/0000-0002-5321-8404"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"@type": "Person",
|
"@type": "Person",
|
||||||
|
@@ -76,6 +76,8 @@ struct Sampler : fair::mq::Device
|
|||||||
|
|
||||||
void ResetTask() override
|
void ResetTask() override
|
||||||
{
|
{
|
||||||
|
// give some time for acks to be received
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(250));
|
||||||
fRegion.reset();
|
fRegion.reset();
|
||||||
{
|
{
|
||||||
std::lock_guard<std::mutex> lock(fMtx);
|
std::lock_guard<std::mutex> lock(fMtx);
|
||||||
|
@@ -397,12 +397,10 @@ class Manager
|
|||||||
|
|
||||||
UnmanagedRegion* region = nullptr;
|
UnmanagedRegion* region = nullptr;
|
||||||
bool newRegionCreated = false;
|
bool newRegionCreated = false;
|
||||||
{
|
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
||||||
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg));
|
||||||
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg));
|
newRegionCreated = res.second;
|
||||||
newRegionCreated = res.second;
|
region = res.first->second.get();
|
||||||
region = res.first->second.get();
|
|
||||||
}
|
|
||||||
// LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
// LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
||||||
|
|
||||||
if (!newRegionCreated) {
|
if (!newRegionCreated) {
|
||||||
@@ -429,7 +427,7 @@ class Manager
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegion* GetRegion(uint16_t id)
|
UnmanagedRegion* GetRegionFromCache(uint16_t id)
|
||||||
{
|
{
|
||||||
// NOTE: gcc optimizations. Prevent loading tls addresses many times in the fast path
|
// NOTE: gcc optimizations. Prevent loading tls addresses many times in the fast path
|
||||||
const auto &lTlCache = fTlRegionCache;
|
const auto &lTlCache = fTlRegionCache;
|
||||||
@@ -443,41 +441,40 @@ class Manager
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
|
||||||
// slow path: check invalidation
|
// slow path: check invalidation
|
||||||
if (lTlCacheGen != fRegionsGen) {
|
if (lTlCacheGen != fRegionsGen) {
|
||||||
fTlRegionCache.fRegionsTLCache.clear();
|
fTlRegionCache.fRegionsTLCache.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
auto* lRegion = GetRegion(id);
|
||||||
auto* lRegion = GetRegionUnsafe(id, shmLock);
|
|
||||||
fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64));
|
fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64));
|
||||||
fTlRegionCache.fRegionsTLCacheGen = fRegionsGen;
|
fTlRegionCache.fRegionsTLCacheGen = fRegionsGen;
|
||||||
return lRegion;
|
return lRegion;
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegion* GetRegionUnsafe(uint16_t id, boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex>& lockedShmLock)
|
UnmanagedRegion* GetRegion(uint16_t id)
|
||||||
{
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
||||||
// remote region could actually be a local one if a message originates from this device (has been sent out and returned)
|
// remote region could actually be a local one if a message originates from this device (has been sent out and returned)
|
||||||
auto it = fRegions.find(id);
|
auto it = fRegions.find(id);
|
||||||
if (it != fRegions.end()) {
|
if (it != fRegions.end()) {
|
||||||
return it->second.get();
|
return it->second.get();
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
// get region info
|
|
||||||
RegionInfo regionInfo = fShmRegions->at(id);
|
|
||||||
// safe to unlock now - no shm container accessed after this
|
|
||||||
lockedShmLock.unlock();
|
|
||||||
RegionConfig cfg;
|
RegionConfig cfg;
|
||||||
cfg.id = id;
|
// get region info
|
||||||
cfg.creationFlags = regionInfo.fCreationFlags;
|
{
|
||||||
cfg.path = regionInfo.fPath.c_str();
|
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
||||||
|
RegionInfo regionInfo = fShmRegions->at(id);
|
||||||
|
cfg.id = id;
|
||||||
|
cfg.creationFlags = regionInfo.fCreationFlags;
|
||||||
|
cfg.path = regionInfo.fPath.c_str();
|
||||||
|
}
|
||||||
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
// LOG(debug) << "Located remote region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
||||||
|
|
||||||
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, true, std::move(cfg)));
|
auto r = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, 0, true, std::move(cfg)));
|
||||||
r.first->second->InitializeQueues();
|
r.first->second->InitializeQueues();
|
||||||
r.first->second->StartAckSender();
|
r.first->second->StartAckSender();
|
||||||
lockedShmLock.lock();
|
|
||||||
return r.first->second.get();
|
return r.first->second.get();
|
||||||
} catch (std::out_of_range& oor) {
|
} catch (std::out_of_range& oor) {
|
||||||
LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?";
|
LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?";
|
||||||
@@ -493,10 +490,10 @@ class Manager
|
|||||||
void RemoveRegion(uint16_t id)
|
void RemoveRegion(uint16_t id)
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
|
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
||||||
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
||||||
fRegions.at(id)->StopAcks();
|
fRegions.at(id)->StopAcks();
|
||||||
{
|
{
|
||||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
|
||||||
if (fRegions.at(id)->RemoveOnDestruction()) {
|
if (fRegions.at(id)->RemoveOnDestruction()) {
|
||||||
fShmRegions->at(id).fDestroyed = true;
|
fShmRegions->at(id).fDestroyed = true;
|
||||||
(fEventCounter->fCount)++;
|
(fEventCounter->fCount)++;
|
||||||
@@ -512,44 +509,73 @@ class Manager
|
|||||||
std::vector<fair::mq::RegionInfo> GetRegionInfo()
|
std::vector<fair::mq::RegionInfo> GetRegionInfo()
|
||||||
{
|
{
|
||||||
std::vector<fair::mq::RegionInfo> result;
|
std::vector<fair::mq::RegionInfo> result;
|
||||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
std::map<uint64_t, RegionConfig> regionCfgs;
|
||||||
|
|
||||||
for (const auto& e : *fShmSegments) {
|
{
|
||||||
// make sure any segments in the session are found
|
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
||||||
GetSegment(e.first);
|
|
||||||
try {
|
for (const auto& [segmentId, segmentInfo] : *fShmSegments) {
|
||||||
|
// make sure any segments in the session are found
|
||||||
|
GetSegment(segmentId);
|
||||||
|
try {
|
||||||
|
fair::mq::RegionInfo info;
|
||||||
|
info.managed = true;
|
||||||
|
info.id = segmentId;
|
||||||
|
info.event = RegionEvent::created;
|
||||||
|
info.ptr = boost::apply_visitor(SegmentAddress(), fSegments.at(segmentId));
|
||||||
|
info.size = boost::apply_visitor(SegmentSize(), fSegments.at(segmentId));
|
||||||
|
result.push_back(info);
|
||||||
|
} catch (const std::out_of_range& oor) {
|
||||||
|
LOG(error) << "could not find segment with id " << segmentId;
|
||||||
|
LOG(error) << oor.what();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const auto& [regionId, regionInfo] : *fShmRegions) {
|
||||||
fair::mq::RegionInfo info;
|
fair::mq::RegionInfo info;
|
||||||
info.managed = true;
|
info.managed = false;
|
||||||
info.id = e.first;
|
info.id = regionId;
|
||||||
info.event = RegionEvent::created;
|
info.flags = regionInfo.fUserFlags;
|
||||||
info.ptr = boost::apply_visitor(SegmentAddress(), fSegments.at(e.first));
|
info.event = regionInfo.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
|
||||||
info.size = boost::apply_visitor(SegmentSize(), fSegments.at(e.first));
|
if (info.event == RegionEvent::created) {
|
||||||
|
RegionConfig cfg;
|
||||||
|
cfg.id = info.id;
|
||||||
|
cfg.creationFlags = regionInfo.fCreationFlags;
|
||||||
|
cfg.path = regionInfo.fPath.c_str();
|
||||||
|
regionCfgs.emplace(info.id, cfg);
|
||||||
|
// fill the ptr+size info after shmLock is released, to avoid constructing local region under it
|
||||||
|
} else {
|
||||||
|
info.ptr = nullptr;
|
||||||
|
info.size = 0;
|
||||||
|
}
|
||||||
result.push_back(info);
|
result.push_back(info);
|
||||||
} catch (const std::out_of_range& oor) {
|
|
||||||
LOG(error) << "could not find segment with id " << e.first;
|
|
||||||
LOG(error) << oor.what();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const auto& e : *fShmRegions) {
|
// do another iteration outside of shm lock, to fill ptr+size of unmanaged regions
|
||||||
fair::mq::RegionInfo info;
|
for (auto& info : result) {
|
||||||
info.managed = false;
|
if (!info.managed && info.event == RegionEvent::created) {
|
||||||
info.id = e.first;
|
auto cfgIt = regionCfgs.find(info.id);
|
||||||
info.flags = e.second.fUserFlags;
|
if (cfgIt != regionCfgs.end()) {
|
||||||
info.event = e.second.fDestroyed ? RegionEvent::destroyed : RegionEvent::created;
|
UnmanagedRegion* region = nullptr;
|
||||||
if (info.event == RegionEvent::created) {
|
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
||||||
auto region = GetRegionUnsafe(info.id, shmLock);
|
auto it = fRegions.find(info.id);
|
||||||
if (region) {
|
if (it != fRegions.end()) {
|
||||||
|
region = it->second.get();
|
||||||
|
} else {
|
||||||
|
auto r = fRegions.emplace(cfgIt->first, std::make_unique<UnmanagedRegion>(fShmId, 0, true, cfgIt->second));
|
||||||
|
region = r.first->second.get();
|
||||||
|
region->InitializeQueues();
|
||||||
|
region->StartAckSender();
|
||||||
|
}
|
||||||
|
|
||||||
info.ptr = region->GetData();
|
info.ptr = region->GetData();
|
||||||
info.size = region->GetSize();
|
info.size = region->GetSize();
|
||||||
} else {
|
} else {
|
||||||
throw std::runtime_error(tools::ToString("GetRegionInfo() could not get region with id '", info.id, "'"));
|
info.ptr = nullptr;
|
||||||
|
info.size = 0;
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
info.ptr = nullptr;
|
|
||||||
info.size = 0;
|
|
||||||
}
|
}
|
||||||
result.push_back(info);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
|
@@ -195,7 +195,7 @@ class Message final : public fair::mq::Message
|
|||||||
fLocalPtr = nullptr;
|
fLocalPtr = nullptr;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
fRegionPtr = fManager.GetRegion(fMeta.fRegionId);
|
fRegionPtr = fManager.GetRegionFromCache(fMeta.fRegionId);
|
||||||
if (fRegionPtr) {
|
if (fRegionPtr) {
|
||||||
fLocalPtr = reinterpret_cast<char*>(fRegionPtr->GetData()) + fMeta.fHandle;
|
fLocalPtr = reinterpret_cast<char*>(fRegionPtr->GetData()) + fMeta.fHandle;
|
||||||
} else {
|
} else {
|
||||||
@@ -365,7 +365,7 @@ class Message final : public fair::mq::Message
|
|||||||
void ReleaseUnmanagedRegionBlock()
|
void ReleaseUnmanagedRegionBlock()
|
||||||
{
|
{
|
||||||
if (!fRegionPtr) {
|
if (!fRegionPtr) {
|
||||||
fRegionPtr = fManager.GetRegion(fMeta.fRegionId);
|
fRegionPtr = fManager.GetRegionFromCache(fMeta.fRegionId);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fRegionPtr) {
|
if (fRegionPtr) {
|
||||||
|
Reference in New Issue
Block a user