Compare commits

..

4 Commits

Author SHA1 Message Date
Giulio Eulisse
fa0bf96eb2 Skip error message only for tcp 2022-02-28 19:13:00 +01:00
Alexey Rybalchenko
29827f0426 Shm: bring back thread-safety for fRegions (intra-process) 2022-02-16 23:17:04 +01:00
Alexey Rybalchenko
8efe7adf0e Shm: fix number of region events 2022-02-16 23:17:04 +01:00
Alexey Rybalchenko
b747a8787c shm: check region size when opening existing 2022-02-08 09:09:25 +01:00
4 changed files with 70 additions and 19 deletions

View File

@@ -228,6 +228,8 @@ class Manager
fShmSegments = fManagementSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc); fShmSegments = fManagementSegment.find_or_construct<Uint16SegmentInfoHashMap>(unique_instance)(fShmVoidAlloc);
fShmRegions = fManagementSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(fShmVoidAlloc); fShmRegions = fManagementSegment.find_or_construct<Uint16RegionInfoHashMap>(unique_instance)(fShmVoidAlloc);
bool createdSegment = false;
try { try {
std::string segmentName("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId)); std::string segmentName("fmq_" + fShmId + "_m_" + std::to_string(fSegmentId));
auto it = fShmSegments->find(fSegmentId); auto it = fShmSegments->find(fSegmentId);
@@ -246,6 +248,7 @@ class Manager
if (zeroSegmentOnCreation) { if (zeroSegmentOnCreation) {
ZeroSegment(fSegmentId); ZeroSegment(fSegmentId);
} }
createdSegment = true;
} else { } else {
// found segment with the given id, opening // found segment with the given id, opening
if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) { if (it->second.fAllocationAlgorithm == AllocationAlgorithm::rbtree_best_fit) {
@@ -278,7 +281,9 @@ class Manager
ZeroSegment(fSegmentId); ZeroSegment(fSegmentId);
} }
(fEventCounter->fCount)++; if (createdSegment) {
(fEventCounter->fCount)++;
}
#ifdef FAIRMQ_DEBUG_MODE #ifdef FAIRMQ_DEBUG_MODE
fMsgDebug = fManagementSegment.find_or_construct<Uint16MsgDebugMapHashMap>(unique_instance)(fShmVoidAlloc); fMsgDebug = fManagementSegment.find_or_construct<Uint16MsgDebugMapHashMap>(unique_instance)(fShmVoidAlloc);
@@ -360,7 +365,7 @@ class Manager
} }
bool Interrupted() { return fInterrupted.load(); } bool Interrupted() { return fInterrupted.load(); }
std::pair<UnmanagedRegion*, uint16_t> CreateRegion(const size_t size, std::pair<UnmanagedRegion*, uint16_t> CreateRegion(size_t size,
RegionCallback callback, RegionCallback callback,
RegionBulkCallback bulkCallback, RegionBulkCallback bulkCallback,
RegionConfig cfg) RegionConfig cfg)
@@ -370,7 +375,7 @@ class Manager
std::pair<UnmanagedRegion*, uint16_t> result; std::pair<UnmanagedRegion*, uint16_t> result;
{ {
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx); boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
if (!cfg.id.has_value()) { if (!cfg.id.has_value()) {
RegionCounter* rc = fManagementSegment.find<RegionCounter>(unique_instance).first; RegionCounter* rc = fManagementSegment.find<RegionCounter>(unique_instance).first;
@@ -390,23 +395,28 @@ class Manager
const uint16_t id = cfg.id.value(); const uint16_t id = cfg.id.value();
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg)); UnmanagedRegion* region = nullptr;
bool newRegionCreated = res.second; bool newRegionCreated = false;
UnmanagedRegion& region = *(res.first->second); {
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg));
newRegionCreated = res.second;
region = res.first->second.get();
}
// LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; // LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
if (!newRegionCreated) { if (!newRegionCreated) {
region.fRemote = false; // TODO: this should be more clear, refactor it. region->fRemote = false; // TODO: this should be more clear, refactor it.
} }
// start ack receiver only if a callback has been provided. // start ack receiver only if a callback has been provided.
if (callback || bulkCallback) { if (callback || bulkCallback) {
region.SetCallbacks(callback, bulkCallback); region->SetCallbacks(callback, bulkCallback);
region.InitializeQueues(); region->InitializeQueues();
region.StartAckSender(); region->StartAckSender();
region.StartAckReceiver(); region->StartAckReceiver();
} }
result.first = &(region); result.first = region;
result.second = id; result.second = id;
} }
fRegionsGen += 1; // signal TL cache invalidation fRegionsGen += 1; // signal TL cache invalidation
@@ -419,7 +429,7 @@ class Manager
} }
} }
UnmanagedRegion* GetRegion(const uint16_t id) UnmanagedRegion* GetRegion(uint16_t id)
{ {
// NOTE: gcc optimizations. Prevent loading tls addresses many times in the fast path // NOTE: gcc optimizations. Prevent loading tls addresses many times in the fast path
const auto &lTlCache = fTlRegionCache; const auto &lTlCache = fTlRegionCache;
@@ -439,13 +449,14 @@ class Manager
fTlRegionCache.fRegionsTLCache.clear(); fTlRegionCache.fRegionsTLCache.clear();
} }
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
auto* lRegion = GetRegionUnsafe(id, shmLock); auto* lRegion = GetRegionUnsafe(id, shmLock);
fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64)); fTlRegionCache.fRegionsTLCache.emplace_back(std::make_tuple(lRegion, id, fShmId64));
fTlRegionCache.fRegionsTLCacheGen = fRegionsGen; fTlRegionCache.fRegionsTLCacheGen = fRegionsGen;
return lRegion; return lRegion;
} }
UnmanagedRegion* GetRegionUnsafe(const uint16_t id, boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex>& lockedShmLock) UnmanagedRegion* GetRegionUnsafe(uint16_t id, boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex>& lockedShmLock)
{ {
// remote region could actually be a local one if a message originates from this device (has been sent out and returned) // remote region could actually be a local one if a message originates from this device (has been sent out and returned)
auto it = fRegions.find(id); auto it = fRegions.find(id);
@@ -479,12 +490,13 @@ class Manager
} }
} }
void RemoveRegion(const uint16_t id) void RemoveRegion(uint16_t id)
{ {
try { try {
std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
fRegions.at(id)->StopAcks(); fRegions.at(id)->StopAcks();
{ {
boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> lock(*fShmMtx); boost::interprocess::scoped_lock<boost::interprocess::interprocess_mutex> shmLock(*fShmMtx);
if (fRegions.at(id)->RemoveOnDestruction()) { if (fRegions.at(id)->RemoveOnDestruction()) {
fShmRegions->at(id).fDestroyed = true; fShmRegions->at(id).fDestroyed = true;
(fEventCounter->fCount)++; (fEventCounter->fCount)++;
@@ -800,6 +812,7 @@ class Manager
VoidAlloc fShmVoidAlloc; VoidAlloc fShmVoidAlloc;
boost::interprocess::interprocess_mutex* fShmMtx; boost::interprocess::interprocess_mutex* fShmMtx;
std::mutex fLocalRegionsMtx;
std::mutex fRegionEventsMtx; std::mutex fRegionEventsMtx;
std::condition_variable fRegionEventsCV; std::condition_variable fRegionEventsCV;
std::thread fRegionEventThread; std::thread fRegionEventThread;

View File

@@ -99,16 +99,25 @@ struct UnmanagedRegion
fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, cfg.creationFlags); fRegion = mapped_region(fFileMapping, read_write, 0, size, 0, cfg.creationFlags);
} else { } else {
try { try {
fShmemObject = shared_memory_object(open_or_create, fName.c_str(), read_write); // if opening fails, create
if (size != 0) { try {
fShmemObject = shared_memory_object(open_only, fName.c_str(), read_write);
} catch (interprocess_exception& e) {
LOG(debug) << "Could not open " << (remote ? "remote" : "local") << " shared_memory_object for region id '" << cfg.id.value() << "': " << e.what() << ", creating...";
fShmemObject = shared_memory_object(create_only, fName.c_str(), read_write);
fShmemObject.truncate(size); fShmemObject.truncate(size);
} }
} catch (interprocess_exception& e) { } catch (interprocess_exception& e) {
LOG(error) << "Failed " << (remote ? "opening" : "creating") << " shared_memory_object for region id '" << cfg.id.value() << "': " << e.what(); LOG(error) << "Failed " << (remote ? "opening" : "creating") << " shared_memory_object for region id '" << cfg.id.value() << "': " << e.what();
throw; throw;
} }
try { try {
fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, cfg.creationFlags); fRegion = mapped_region(fShmemObject, read_write, 0, 0, 0, cfg.creationFlags);
if (size != 0 && size != fRegion.get_size()) {
LOG(error) << "Created/opened region size (" << fRegion.get_size() << ") does not match configured size (" << size << ")";
throw TransportError(tools::ToString("Created/opened region size (", fRegion.get_size(), ") does not match configured size (", size, ")"));
}
} catch (interprocess_exception& e) { } catch (interprocess_exception& e) {
LOG(error) << "Failed mapping shared_memory_object for region id '" << cfg.id.value() << "': " << e.what(); LOG(error) << "Failed mapping shared_memory_object for region id '" << cfg.id.value() << "': " << e.what();
throw; throw;

View File

@@ -27,7 +27,11 @@ inline bool Bind(void* socket, const std::string& address, const std::string& id
if (errno == EADDRINUSE) { if (errno == EADDRINUSE) {
// do not print error in this case, this is handled upstream in case no // do not print error in this case, this is handled upstream in case no
// connection could be established after trying a number of random ports from a range. // connection could be established after trying a number of random ports from a range.
return false; size_t protocolPos = address.find(':');
std::string protocol = address.substr(0, protocolPos);
if (protocol == "tcp") {
return false;
}
} else if (errno == EACCES) { } else if (errno == EACCES) {
// check if TCP port 1 was given, if yes then it will be handeled upstream, print debug only // check if TCP port 1 was given, if yes then it will be handeled upstream, print debug only
size_t protocolPos = address.find(':'); size_t protocolPos = address.find(':');

View File

@@ -25,6 +25,26 @@ namespace
using namespace std; using namespace std;
using namespace fair::mq; using namespace fair::mq;
void RegionsSizeMismatch()
{
size_t session = tools::UuidHash();
ProgOptions config;
config.SetProperty<string>("session", to_string(session));
config.SetProperty<size_t>("shm-segment-size", 100000000);
auto factory = TransportFactory::CreateTransportFactory("shmem", tools::Uuid(), &config);
fair::mq::RegionConfig rCfg;
rCfg.id = 10;
UnmanagedRegionPtr region1 = nullptr;
ASSERT_NO_THROW(region1 = factory->CreateUnmanagedRegion(10000, [](void*, size_t, void*) {}, rCfg));
ASSERT_NE(region1, nullptr);
UnmanagedRegionPtr region2 = nullptr;
ASSERT_THROW(region2 = factory->CreateUnmanagedRegion(16000, [](void*, size_t, void*) {}, rCfg), fair::mq::TransportError);
ASSERT_EQ(region2, nullptr);
}
void RegionsCache(const string& transport, const string& address) void RegionsCache(const string& transport, const string& address)
{ {
size_t session1 = tools::UuidHash(); size_t session1 = tools::UuidHash();
@@ -226,6 +246,11 @@ void RegionCallbacks(const string& transport, const string& _address)
LOG(info) << "2 done."; LOG(info) << "2 done.";
} }
TEST(RegionsSizeMismatch, shmem)
{
RegionsSizeMismatch();
}
TEST(Cache, zeromq) TEST(Cache, zeromq)
{ {
RegionsCache("zeromq", "ipc://test_region_cache"); RegionsCache("zeromq", "ipc://test_region_cache");