From ac661dfd6383a39960ca66dd4561f9c17c475d95 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 4 Oct 2022 12:23:00 +0200 Subject: [PATCH] Add test for externally (outside the session) created shmem region --- test/region/_region.cxx | 128 +++++++++++++++++++++++++++++++++------- 1 file changed, 108 insertions(+), 20 deletions(-) diff --git a/test/region/_region.cxx b/test/region/_region.cxx index 59f5a92d..5d2c237b 100644 --- a/test/region/_region.cxx +++ b/test/region/_region.cxx @@ -6,6 +6,11 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ +#include +#include +#include +#include + #include #include #include @@ -16,8 +21,12 @@ #include +#include +#include #include // make_unique #include +#include // pair +#include // pair namespace { @@ -25,6 +34,34 @@ namespace using namespace std; using namespace fair::mq; +struct ShmOwner +{ + ShmOwner(const string& sessionId, + const vector>& segments, + const vector>& regions) + : fShmId(fair::mq::shmem::makeShmIdStr(sessionId)) + { + LOG(info) << "ShmOwner: creating segments"; + for (auto [id, size] : segments) { + fSegments.emplace(id, fair::mq::shmem::Segment(fShmId, id, size, fair::mq::shmem::rbTreeBestFit)); + } + LOG(info) << "ShmOwner: creating regions"; + for (auto [id, size] : regions) { + fRegions.emplace(id, make_unique(fShmId, id, size)); + } + } + + ~ShmOwner() + { + LOG(info) << "ShmOwner: cleaning up"; + fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{fShmId}); + } + + string fShmId; + map fSegments; + map> fRegions; +}; + void RegionsSizeMismatch() { size_t session = tools::UuidHash(); @@ -109,31 +146,69 @@ void RegionsCache(const string& transport, const string& _address) } } -void RegionEventSubscriptions(const string& transport) +void RegionEventSubscriptions(const string& transport, bool external) { + fair::Logger::SetConsoleSeverity(fair::Severity::debug); + + unique_ptr shmOwner = nullptr; + size_t session{tools::UuidHash()}; + constexpr int sSize = 100000000; + constexpr int r1Size = 1000000; + constexpr int r2Size = 5000000; + constexpr uint16_t sId = 0; + constexpr uint16_t r1id = 100; + constexpr uint16_t r2id = 101; + + if (external) { + shmOwner = make_unique( + to_string(session), + vector>{ { sId, sSize } }, + vector>{ { r1id, r1Size }, { r2id, r2Size } } + ); + } + ProgOptions config; config.SetProperty("session", to_string(session)); - config.SetProperty("shm-segment-size", 100000000); + config.SetProperty("shm-segment-size", sSize); + if (external) { + config.SetProperty("shm-no-cleanup", true); + config.SetProperty("shm-monitor", false); + } auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config); - constexpr int size1 = 1000000; - constexpr int size2 = 5000000; constexpr int64_t userFlags = 12345; tools::Semaphore blocker; { - auto region1 = factory->CreateUnmanagedRegion(size1, [](void*, size_t, void*) {}); + fair::mq::RegionConfig r1Cfg; + if (external) { + r1Cfg.id = r1id; + r1Cfg.removeOnDestruction = false; + } + auto region1 = factory->CreateUnmanagedRegion(r1Size, [](void*, size_t, void*) {}, r1Cfg); void* ptr1 = region1->GetData(); uint64_t id1 = region1->GetId(); - ASSERT_EQ(region1->GetSize(), size1); + if (external) { + ASSERT_EQ(id1, r1id); + } + ASSERT_EQ(region1->GetSize(), r1Size); - auto region2 = factory->CreateUnmanagedRegion(size2, userFlags, [](void*, size_t, void*) {}); + fair::mq::RegionConfig r2Cfg; + r2Cfg.userFlags = userFlags; + if (external) { + r2Cfg.id = r2id; + r2Cfg.removeOnDestruction = false; + } + auto region2 = factory->CreateUnmanagedRegion(r2Size, [](void*, size_t, void*) {}, r2Cfg); void* ptr2 = region2->GetData(); uint64_t id2 = region2->GetId(); - ASSERT_EQ(region2->GetSize(), size2); + if (external) { + ASSERT_EQ(id2, r2id); + } + ASSERT_EQ(region2->GetSize(), r2Size); ASSERT_EQ(factory->SubscribedToRegionEvents(), false); factory->SubscribeToRegionEvents([&, id1, id2, ptr1, ptr2](RegionInfo info) { @@ -145,13 +220,15 @@ void RegionEventSubscriptions(const string& transport) << ", flags: " << info.flags; if (info.event == RegionEvent::created) { if (info.id == id1) { - ASSERT_EQ(info.size, size1); + ASSERT_EQ(info.size, r1Size); ASSERT_EQ(info.ptr, ptr1); blocker.Signal(); } else if (info.id == id2) { - ASSERT_EQ(info.size, size2); + ASSERT_EQ(info.size, r2Size); ASSERT_EQ(info.ptr, ptr2); - ASSERT_EQ(info.flags, userFlags); + if (!external) { + ASSERT_EQ(info.flags, userFlags); + } blocker.Signal(); } } else if (info.event == RegionEvent::destroyed) { @@ -171,10 +248,12 @@ void RegionEventSubscriptions(const string& transport) LOG(info) << "2 done."; } - blocker.Wait(); - LOG(info) << "3 done."; - blocker.Wait(); - LOG(info) << "4 done."; + if (!external) { + blocker.Wait(); + LOG(info) << "3 done."; + blocker.Wait(); + LOG(info) << "4 done."; + } LOG(info) << "All done."; factory->UnsubscribeFromRegionEvents(); @@ -186,9 +265,13 @@ void RegionCallbacks(const string& transport, const string& _address) size_t session(tools::UuidHash()); std::string address(tools::ToString(_address, "_", transport)); + constexpr size_t sSize = 100000000; + constexpr size_t r1Size = 2000000; + constexpr size_t r2Size = 3000000; + ProgOptions config; config.SetProperty("session", to_string(session)); - config.SetProperty("shm-segment-size", 100000000); + config.SetProperty("shm-segment-size", sSize); auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config); @@ -207,7 +290,7 @@ void RegionCallbacks(const string& transport, const string& _address) void* ptr2 = nullptr; size_t size2 = 200; - auto region1 = factory->CreateUnmanagedRegion(2000000, [&](void* ptr, size_t size, void* hint) { + auto region1 = factory->CreateUnmanagedRegion(r1Size, [&](void* ptr, size_t size, void* hint) { ASSERT_EQ(ptr, ptr1); ASSERT_EQ(size, size1); ASSERT_EQ(hint, intPtr1.get()); @@ -216,7 +299,7 @@ void RegionCallbacks(const string& transport, const string& _address) }); ptr1 = region1->GetData(); - auto region2 = factory->CreateUnmanagedRegion(3000000, [&](const std::vector& blocks) { + auto region2 = factory->CreateUnmanagedRegion(r2Size, [&](const std::vector& blocks) { ASSERT_EQ(blocks.size(), 1); ASSERT_EQ(blocks.at(0).ptr, ptr2); ASSERT_EQ(blocks.at(0).size, size2); @@ -264,12 +347,12 @@ TEST(Cache, shmem) TEST(EventSubscriptions, zeromq) { - RegionEventSubscriptions("zeromq"); + RegionEventSubscriptions("zeromq", false); } TEST(EventSubscriptions, shmem) { - RegionEventSubscriptions("shmem"); + RegionEventSubscriptions("shmem", false); } TEST(Callbacks, zeromq) @@ -282,4 +365,9 @@ TEST(Callbacks, shmem) RegionCallbacks("shmem", "ipc://test_region_callbacks"); } +TEST(EventSubscriptionsExternalRegion, shmem) +{ + RegionEventSubscriptions("shmem", true); +} + } // namespace