Add test for externally (outside the session) created shmem region

This commit is contained in:
Alexey Rybalchenko 2022-10-04 12:23:00 +02:00
parent 42ce691f57
commit 16275db125

View File

@ -6,6 +6,11 @@
* copied verbatim in the file "LICENSE" *
********************************************************************************/
#include <fairmq/shmem/Common.h>
#include <fairmq/shmem/Monitor.h>
#include <fairmq/shmem/Segment.h>
#include <fairmq/shmem/UnmanagedRegion.h>
#include <fairmq/TransportFactory.h>
#include <fairmq/ProgOptions.h>
#include <fairmq/tools/Unique.h>
@ -16,8 +21,12 @@
#include <gtest/gtest.h>
#include <cstdint>
#include <map>
#include <memory> // make_unique
#include <string>
#include <utility> // pair
#include <vector> // pair
namespace
{
@ -25,6 +34,34 @@ namespace
using namespace std;
using namespace fair::mq;
struct ShmOwner
{
ShmOwner(const string& sessionId,
const vector<pair<uint16_t, size_t>>& segments,
const vector<pair<uint16_t, size_t>>& regions)
: fShmId(fair::mq::shmem::makeShmIdStr(sessionId))
{
LOG(info) << "ShmOwner: creating segments";
for (auto [id, size] : segments) {
fSegments.emplace(id, fair::mq::shmem::Segment(fShmId, id, size, fair::mq::shmem::rbTreeBestFit));
}
LOG(info) << "ShmOwner: creating regions";
for (auto [id, size] : regions) {
fRegions.emplace(id, make_unique<fair::mq::shmem::UnmanagedRegion>(fShmId, id, size));
}
}
~ShmOwner()
{
LOG(info) << "ShmOwner: cleaning up";
fair::mq::shmem::Monitor::Cleanup(fair::mq::shmem::ShmId{fShmId});
}
string fShmId;
map<uint16_t, fair::mq::shmem::Segment> fSegments;
map<uint16_t, unique_ptr<fair::mq::shmem::UnmanagedRegion>> fRegions;
};
void RegionsSizeMismatch()
{
size_t session = tools::UuidHash();
@ -108,31 +145,69 @@ void RegionsCache(const string& transport, const string& address)
}
}
void RegionEventSubscriptions(const string& transport)
void RegionEventSubscriptions(const string& transport, bool external)
{
fair::Logger::SetConsoleSeverity(fair::Severity::debug);
unique_ptr<ShmOwner> shmOwner = nullptr;
size_t session{tools::UuidHash()};
constexpr int sSize = 100000000;
constexpr int r1Size = 1000000;
constexpr int r2Size = 5000000;
constexpr uint16_t sId = 0;
constexpr uint16_t r1id = 100;
constexpr uint16_t r2id = 101;
if (external) {
shmOwner = make_unique<ShmOwner>(
to_string(session),
vector<pair<uint16_t, size_t>>{ { sId, sSize } },
vector<pair<uint16_t, size_t>>{ { r1id, r1Size }, { r2id, r2Size } }
);
}
ProgOptions config;
config.SetProperty<string>("session", to_string(session));
config.SetProperty<size_t>("shm-segment-size", 100000000);
config.SetProperty<size_t>("shm-segment-size", sSize);
if (external) {
config.SetProperty<bool>("shm-no-cleanup", true);
config.SetProperty<bool>("shm-monitor", false);
}
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
constexpr int size1 = 1000000;
constexpr int size2 = 5000000;
constexpr int64_t userFlags = 12345;
tools::Semaphore blocker;
{
auto region1 = factory->CreateUnmanagedRegion(size1, [](void*, size_t, void*) {});
fair::mq::RegionConfig r1Cfg;
if (external) {
r1Cfg.id = r1id;
r1Cfg.removeOnDestruction = false;
}
auto region1 = factory->CreateUnmanagedRegion(r1Size, [](void*, size_t, void*) {}, r1Cfg);
void* ptr1 = region1->GetData();
uint64_t id1 = region1->GetId();
ASSERT_EQ(region1->GetSize(), size1);
if (external) {
ASSERT_EQ(id1, r1id);
}
ASSERT_EQ(region1->GetSize(), r1Size);
auto region2 = factory->CreateUnmanagedRegion(size2, userFlags, [](void*, size_t, void*) {});
fair::mq::RegionConfig r2Cfg;
r2Cfg.userFlags = userFlags;
if (external) {
r2Cfg.id = r2id;
r2Cfg.removeOnDestruction = false;
}
auto region2 = factory->CreateUnmanagedRegion(r2Size, [](void*, size_t, void*) {}, r2Cfg);
void* ptr2 = region2->GetData();
uint64_t id2 = region2->GetId();
ASSERT_EQ(region2->GetSize(), size2);
if (external) {
ASSERT_EQ(id2, r2id);
}
ASSERT_EQ(region2->GetSize(), r2Size);
ASSERT_EQ(factory->SubscribedToRegionEvents(), false);
factory->SubscribeToRegionEvents([&, id1, id2, ptr1, ptr2](RegionInfo info) {
@ -144,13 +219,15 @@ void RegionEventSubscriptions(const string& transport)
<< ", flags: " << info.flags;
if (info.event == RegionEvent::created) {
if (info.id == id1) {
ASSERT_EQ(info.size, size1);
ASSERT_EQ(info.size, r1Size);
ASSERT_EQ(info.ptr, ptr1);
blocker.Signal();
} else if (info.id == id2) {
ASSERT_EQ(info.size, size2);
ASSERT_EQ(info.size, r2Size);
ASSERT_EQ(info.ptr, ptr2);
ASSERT_EQ(info.flags, userFlags);
if (!external) {
ASSERT_EQ(info.flags, userFlags);
}
blocker.Signal();
}
} else if (info.event == RegionEvent::destroyed) {
@ -170,10 +247,12 @@ void RegionEventSubscriptions(const string& transport)
LOG(info) << "2 done.";
}
blocker.Wait();
LOG(info) << "3 done.";
blocker.Wait();
LOG(info) << "4 done.";
if (!external) {
blocker.Wait();
LOG(info) << "3 done.";
blocker.Wait();
LOG(info) << "4 done.";
}
LOG(info) << "All done.";
factory->UnsubscribeFromRegionEvents();
@ -185,9 +264,13 @@ void RegionCallbacks(const string& transport, const string& _address)
size_t session(tools::UuidHash());
std::string address(tools::ToString(_address, "_", transport));
constexpr size_t sSize = 100000000;
constexpr size_t r1Size = 2000000;
constexpr size_t r2Size = 3000000;
ProgOptions config;
config.SetProperty<string>("session", to_string(session));
config.SetProperty<size_t>("shm-segment-size", 100000000);
config.SetProperty<size_t>("shm-segment-size", sSize);
auto factory = TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config);
@ -206,7 +289,7 @@ void RegionCallbacks(const string& transport, const string& _address)
void* ptr2 = nullptr;
size_t size2 = 200;
auto region1 = factory->CreateUnmanagedRegion(2000000, [&](void* ptr, size_t size, void* hint) {
auto region1 = factory->CreateUnmanagedRegion(r1Size, [&](void* ptr, size_t size, void* hint) {
ASSERT_EQ(ptr, ptr1);
ASSERT_EQ(size, size1);
ASSERT_EQ(hint, intPtr1.get());
@ -215,7 +298,7 @@ void RegionCallbacks(const string& transport, const string& _address)
});
ptr1 = region1->GetData();
auto region2 = factory->CreateUnmanagedRegion(3000000, [&](const std::vector<RegionBlock>& blocks) {
auto region2 = factory->CreateUnmanagedRegion(r2Size, [&](const std::vector<RegionBlock>& blocks) {
ASSERT_EQ(blocks.size(), 1);
ASSERT_EQ(blocks.at(0).ptr, ptr2);
ASSERT_EQ(blocks.at(0).size, size2);
@ -263,12 +346,12 @@ TEST(Cache, shmem)
TEST(EventSubscriptions, zeromq)
{
RegionEventSubscriptions("zeromq");
RegionEventSubscriptions("zeromq", false);
}
TEST(EventSubscriptions, shmem)
{
RegionEventSubscriptions("shmem");
RegionEventSubscriptions("shmem", false);
}
TEST(Callbacks, zeromq)
@ -281,4 +364,9 @@ TEST(Callbacks, shmem)
RegionCallbacks("shmem", "ipc://test_region_callbacks");
}
TEST(EventSubscriptionsExternalRegion, shmem)
{
RegionEventSubscriptions("shmem", true);
}
} // namespace