From 87e0ca54507a65cb6b1b30ddce64cd2b14703aaa Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Thu, 6 May 2021 22:59:19 +0200 Subject: [PATCH] add region cache test --- test/region/_region.cxx | 71 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/test/region/_region.cxx b/test/region/_region.cxx index 80708256..cccf4383 100644 --- a/test/region/_region.cxx +++ b/test/region/_region.cxx @@ -23,6 +23,67 @@ namespace using namespace std; +void RegionsCache(const string& transport, const string& address) +{ + size_t session1 = fair::mq::tools::UuidHash(); + size_t session2 = fair::mq::tools::UuidHash(); + + fair::mq::ProgOptions config1; + fair::mq::ProgOptions config2; + config1.SetProperty("session", to_string(session1)); + config2.SetProperty("session", to_string(session2)); + + auto factory1 = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config1); + auto factory2 = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config2); + + auto region1 = factory1->CreateUnmanagedRegion(1000000, [](void*, size_t, void*) {}); + auto region2 = factory2->CreateUnmanagedRegion(1000000, [](void*, size_t, void*) {}); + void* r1ptr = region1->GetData(); + void* r2ptr = region2->GetData(); + + FairMQChannel push1("Push1", "push", factory1); + FairMQChannel pull1("Pull1", "pull", factory1); + push1.Bind(address + to_string(1)); + pull1.Connect(address + to_string(1)); + FairMQChannel push2("Push2", "push", factory2); + FairMQChannel pull2("Pull2", "pull", factory2); + push2.Bind(address + to_string(2)); + pull2.Connect(address + to_string(2)); + + { + static_cast(r1ptr)[0] = 97; // a + static_cast(static_cast(r1ptr) + 100)[0] = 98; // b + static_cast(r2ptr)[0] = 99; // c + static_cast(static_cast(r2ptr) + 100)[0] = 100; // d + + FairMQMessagePtr m1(push1.NewMessage(region1, r1ptr, 100, nullptr)); + FairMQMessagePtr m2(push1.NewMessage(region1, static_cast(r1ptr) + 100, 100, nullptr)); + push1.Send(m1); + push1.Send(m2); + + FairMQMessagePtr m3(push2.NewMessage(region2, r2ptr, 100, nullptr)); + FairMQMessagePtr m4(push2.NewMessage(region2, static_cast(r2ptr) + 100, 100, nullptr)); + push2.Send(m3); + push2.Send(m4); + } + + { + FairMQMessagePtr m1(pull1.NewMessage()); + FairMQMessagePtr m2(pull1.NewMessage()); + ASSERT_EQ(pull1.Receive(m1), 100); + ASSERT_EQ(pull1.Receive(m2), 100); + ASSERT_EQ(static_cast(m1->GetData())[0], 'a'); + ASSERT_EQ(static_cast(m2->GetData())[0], 'b'); + + FairMQMessagePtr m3(pull2.NewMessage()); + FairMQMessagePtr m4(pull2.NewMessage()); + ASSERT_EQ(pull2.Receive(m3), 100); + ASSERT_EQ(pull2.Receive(m4), 100); + ASSERT_EQ(static_cast(m3->GetData())[0], 'c'); + ASSERT_EQ(static_cast(m4->GetData())[0], 'd'); + } +} + void RegionEventSubscriptions(const string& transport) { size_t session{fair::mq::tools::UuidHash()}; @@ -160,6 +221,16 @@ void RegionCallbacks(const string& transport, const string& _address) LOG(info) << "2 done."; } +TEST(Cache, zeromq) +{ + RegionsCache("zeromq", "ipc://test_region_cache"); +} + +TEST(Cache, shmem) +{ + RegionsCache("shmem", "ipc://test_region_cache"); +} + TEST(EventSubscriptions, zeromq) { RegionEventSubscriptions("zeromq");