mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-17 02:21:47 +00:00
Compare commits
4 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
fa0bf96eb2 | ||
|
29827f0426 | ||
|
8efe7adf0e | ||
|
b747a8787c |
@@ -228,6 +228,8 @@ class Manager
|
|||||||
fShmSegments = fManagementSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc);
|
fShmSegments = fManagementSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc);
|
||||||
fShmRegions = fManagementSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(fShmVoidAlloc);
|
fShmRegions = fManagementSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(fShmVoidAlloc);
|
||||||
|
|
||||||
|
bool createdSegment = false;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
std::string segmentName("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId));
|
std::string segmentName("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId));
|
||||||
auto it = fShmSegments->find(fSegmentId);
|
auto it = fShmSegments->find(fSegmentId);
|
||||||
@@ -246,6 +248,7 @@ class Manager
|
|||||||
if (zeroSegmentOnCreation) {
|
if (zeroSegmentOnCreation) {
|
||||||
ZeroSegment(fSegmentId);
|
ZeroSegment(fSegmentId);
|
||||||
}
|
}
|
||||||
|
createdSegment = true;
|
||||||
} else {
|
} else {
|
||||||
// found segment with the given id, opening
|
// found segment with the given id, opening
|
||||||
if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
|
if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
|
||||||
@@ -278,7 +281,9 @@ class Manager
|
|||||||
ZeroSegment(fSegmentId);
|
ZeroSegment(fSegmentId);
|
||||||
}
|
}
|
||||||
|
|
||||||
(fEventCounter->fCount)++;
|
if (createdSegment) {
|
||||||
|
(fEventCounter->fCount)++;
|
||||||
|
}
|
||||||
|
|
||||||
#ifdef FAIRMQ_DEBUG_MODE
|
#ifdef FAIRMQ_DEBUG_MODE
|
||||||
fMsgDebug = fManagementSegment.find_or_construct<Uint16MsgDebugMapHashMap>(unique_instance)(fShmVoidAlloc);
|
fMsgDebug = fManagementSegment.find_or_construct<Uint16MsgDebugMapHashMap>(unique_instance)(fShmVoidAlloc);
|
||||||
@@ -360,7 +365,7 @@ class Manager
|
|||||||
}
|
}
|
||||||
bool Interrupted() { return fInterrupted.load(); }
|
bool Interrupted() { return fInterrupted.load(); }
|
||||||
|
|
||||||
std::pair<UnmanagedRegion*, uint16_t> CreateRegion(const size_t size,
|
std::pair<UnmanagedRegion*, uint16_t> CreateRegion(size_t size,
|
||||||
RegionCallback callback,
|
RegionCallback callback,
|
||||||
RegionBulkCallback bulkCallback,
|
RegionBulkCallback bulkCallback,
|
||||||
RegionConfig cfg)
|
RegionConfig cfg)
|
||||||
@@ -370,7 +375,7 @@ class Manager
|
|||||||
std::pair<UnmanagedRegion*, uint16_t> result;
|
std::pair<UnmanagedRegion*, uint16_t> result;
|
||||||
|
|
||||||
{
|
{
|
||||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
||||||
|
|
||||||
if (!cfg.id.has_value()) {
|
if (!cfg.id.has_value()) {
|
||||||
RegionCounter* rc = fManagementSegment.find<RegionCounter>(unique_instance).first;
|
RegionCounter* rc = fManagementSegment.find<RegionCounter>(unique_instance).first;
|
||||||
@@ -390,23 +395,28 @@ class Manager
|
|||||||
|
|
||||||
const uint16_t id = cfg.id.value();
|
const uint16_t id = cfg.id.value();
|
||||||
|
|
||||||
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg));
|
UnmanagedRegion* region = nullptr;
|
||||||
bool newRegionCreated = res.second;
|
bool newRegionCreated = false;
|
||||||
UnmanagedRegion& region = *(res.first->second);
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
||||||
|
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg));
|
||||||
|
newRegionCreated = res.second;
|
||||||
|
region = res.first->second.get();
|
||||||
|
}
|
||||||
// LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
// LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
|
||||||
|
|
||||||
if (!newRegionCreated) {
|
if (!newRegionCreated) {
|
||||||
region.fRemote = false; // TODO: this should be more clear, refactor it.
|
region->fRemote = false; // TODO: this should be more clear, refactor it.
|
||||||
}
|
}
|
||||||
|
|
||||||
// start ack receiver only if a callback has been provided.
|
// start ack receiver only if a callback has been provided.
|
||||||
if (callback || bulkCallback) {
|
if (callback || bulkCallback) {
|
||||||
region.SetCallbacks(callback, bulkCallback);
|
region->SetCallbacks(callback, bulkCallback);
|
||||||
region.InitializeQueues();
|
region->InitializeQueues();
|
||||||
region.StartAckSender();
|
region->StartAckSender();
|
||||||
region.StartAckReceiver();
|
region->StartAckReceiver();
|
||||||
}
|
}
|
||||||
result.first = &(region);
|
result.first = region;
|
||||||
result.second = id;
|
result.second = id;
|
||||||
}
|
}
|
||||||
fRegionsGen += 1; // signal TL cache invalidation
|
fRegionsGen += 1; // signal TL cache invalidation
|
||||||
@@ -419,7 +429,7 @@ class Manager
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegion* GetRegion(const uint16_t id)
|
UnmanagedRegion* GetRegion(uint16_t id)
|
||||||
{
|
{
|
||||||
// NOTE: gcc optimizations. Prevent loading tls addresses many times in the fast path
|
// NOTE: gcc optimizations. Prevent loading tls addresses many times in the fast path
|
||||||
const auto &lTlCache = fTlRegionCache;
|
const auto &lTlCache = fTlRegionCache;
|
||||||
@@ -439,13 +449,14 @@ class Manager
|
|||||||
fTlRegionCache.fRegionsTLCache.clear();
|
fTlRegionCache.fRegionsTLCache.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
||||||
auto* lRegion = GetRegionUnsafe(id, shmLock);
|
auto* lRegion = GetRegionUnsafe(id, shmLock);
|
||||||
fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64));
|
fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64));
|
||||||
fTlRegionCache.fRegionsTLCacheGen = fRegionsGen;
|
fTlRegionCache.fRegionsTLCacheGen = fRegionsGen;
|
||||||
return lRegion;
|
return lRegion;
|
||||||
}
|
}
|
||||||
|
|
||||||
UnmanagedRegion* GetRegionUnsafe(const uint16_t id, boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex>& lockedShmLock)
|
UnmanagedRegion* GetRegionUnsafe(uint16_t id, boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex>& lockedShmLock)
|
||||||
{
|
{
|
||||||
// remote region could actually be a local one if a message originates from this device (has been sent out and returned)
|
// remote region could actually be a local one if a message originates from this device (has been sent out and returned)
|
||||||
auto it = fRegions.find(id);
|
auto it = fRegions.find(id);
|
||||||
@@ -479,12 +490,13 @@ class Manager
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void RemoveRegion(const uint16_t id)
|
void RemoveRegion(uint16_t id)
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
|
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
|
||||||
fRegions.at(id)->StopAcks();
|
fRegions.at(id)->StopAcks();
|
||||||
{
|
{
|
||||||
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx);
|
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
|
||||||
if (fRegions.at(id)->RemoveOnDestruction()) {
|
if (fRegions.at(id)->RemoveOnDestruction()) {
|
||||||
fShmRegions->at(id).fDestroyed = true;
|
fShmRegions->at(id).fDestroyed = true;
|
||||||
(fEventCounter->fCount)++;
|
(fEventCounter->fCount)++;
|
||||||
@@ -800,6 +812,7 @@ class Manager
|
|||||||
VoidAlloc fShmVoidAlloc;
|
VoidAlloc fShmVoidAlloc;
|
||||||
boost::interprocess::interprocess_mutex* fShmMtx;
|
boost::interprocess::interprocess_mutex* fShmMtx;
|
||||||
|
|
||||||
|
std::mutex fLocalRegionsMtx;
|
||||||
std::mutex fRegionEventsMtx;
|
std::mutex fRegionEventsMtx;
|
||||||
std::condition_variable fRegionEventsCV;
|
std::condition_variable fRegionEventsCV;
|
||||||
std::thread fRegionEventThread;
|
std::thread fRegionEventThread;
|
||||||
|
@@ -99,16 +99,25 @@ struct UnmanagedRegion
|
|||||||
fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, cfg.creationFlags);
|
fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, cfg.creationFlags);
|
||||||
} else {
|
} else {
|
||||||
try {
|
try {
|
||||||
fShmemObject = shared_memory_object(open_or_create, fName.c_str(), read_write);
|
// if opening fails, create
|
||||||
if (size != 0) {
|
try {
|
||||||
|
fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write);
|
||||||
|
} catch (interprocess_exception& e) {
|
||||||
|
LOG(debug) << "Could not open " << (remote ? "remote" : "local") << " shared_memory_object for region id '" << cfg.id.value() << "': " << e.what() << ", creating...";
|
||||||
|
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
|
||||||
fShmemObject.truncate(size);
|
fShmemObject.truncate(size);
|
||||||
}
|
}
|
||||||
} catch (interprocess_exception& e) {
|
} catch (interprocess_exception& e) {
|
||||||
LOG(error) << "Failed " << (remote ? "opening" : "creating") << " shared_memory_object for region id '" << cfg.id.value() << "': " << e.what();
|
LOG(error) << "Failed " << (remote ? "opening" : "creating") << " shared_memory_object for region id '" << cfg.id.value() << "': " << e.what();
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, cfg.creationFlags);
|
fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, cfg.creationFlags);
|
||||||
|
if (size != 0 && size != fRegion.get_size()) {
|
||||||
|
LOG(error) << "Created/opened region size (" << fRegion.get_size() << ") does not match configured size (" << size << ")";
|
||||||
|
throw TransportError(tools::ToString("Created/opened region size (", fRegion.get_size(), ") does not match configured size (", size, ")"));
|
||||||
|
}
|
||||||
} catch (interprocess_exception& e) {
|
} catch (interprocess_exception& e) {
|
||||||
LOG(error) << "Failed mapping shared_memory_object for region id '" << cfg.id.value() << "': " << e.what();
|
LOG(error) << "Failed mapping shared_memory_object for region id '" << cfg.id.value() << "': " << e.what();
|
||||||
throw;
|
throw;
|
||||||
|
@@ -27,7 +27,11 @@ inline bool Bind(void* socket, const std::string& address, const std::string& id
|
|||||||
if (errno == EADDRINUSE) {
|
if (errno == EADDRINUSE) {
|
||||||
// do not print error in this case, this is handled upstream in case no
|
// do not print error in this case, this is handled upstream in case no
|
||||||
// connection could be established after trying a number of random ports from a range.
|
// connection could be established after trying a number of random ports from a range.
|
||||||
return false;
|
size_t protocolPos = address.find(':');
|
||||||
|
std::string protocol = address.substr(0, protocolPos);
|
||||||
|
if (protocol == "tcp") {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
} else if (errno == EACCES) {
|
} else if (errno == EACCES) {
|
||||||
// check if TCP port 1 was given, if yes then it will be handeled upstream, print debug only
|
// check if TCP port 1 was given, if yes then it will be handeled upstream, print debug only
|
||||||
size_t protocolPos = address.find(':');
|
size_t protocolPos = address.find(':');
|
||||||
|
@@ -25,6 +25,26 @@ namespace
|
|||||||
using namespace std;
|
using namespace std;
|
||||||
using namespace fair::mq;
|
using namespace fair::mq;
|
||||||
|
|
||||||
|
void RegionsSizeMismatch()
|
||||||
|
{
|
||||||
|
size_t session = tools::UuidHash();
|
||||||
|
|
||||||
|
ProgOptions config;
|
||||||
|
config.SetProperty<string>("session", to_string(session));
|
||||||
|
config.SetProperty<size_t>("shm-segment-size", 100000000);
|
||||||
|
|
||||||
|
auto factory = TransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config);
|
||||||
|
|
||||||
|
fair::mq::RegionConfig rCfg;
|
||||||
|
rCfg.id = 10;
|
||||||
|
UnmanagedRegionPtr region1 = nullptr;
|
||||||
|
ASSERT_NO_THROW(region1 = factory->CreateUnmanagedRegion(10000, [](void*, size_t, void*) {}, rCfg));
|
||||||
|
ASSERT_NE(region1, nullptr);
|
||||||
|
UnmanagedRegionPtr region2 = nullptr;
|
||||||
|
ASSERT_THROW(region2 = factory->CreateUnmanagedRegion(16000, [](void*, size_t, void*) {}, rCfg), fair::mq::TransportError);
|
||||||
|
ASSERT_EQ(region2, nullptr);
|
||||||
|
}
|
||||||
|
|
||||||
void RegionsCache(const string& transport, const string& address)
|
void RegionsCache(const string& transport, const string& address)
|
||||||
{
|
{
|
||||||
size_t session1 = tools::UuidHash();
|
size_t session1 = tools::UuidHash();
|
||||||
@@ -226,6 +246,11 @@ void RegionCallbacks(const string& transport, const string& _address)
|
|||||||
LOG(info) << "2 done.";
|
LOG(info) << "2 done.";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(RegionsSizeMismatch, shmem)
|
||||||
|
{
|
||||||
|
RegionsSizeMismatch();
|
||||||
|
}
|
||||||
|
|
||||||
TEST(Cache, zeromq)
|
TEST(Cache, zeromq)
|
||||||
{
|
{
|
||||||
RegionsCache("zeromq", "ipc://test_region_cache");
|
RegionsCache("zeromq", "ipc://test_region_cache");
|
||||||
|
Reference in New Issue
Block a user