/******************************************************************************** * Copyright (C) 2017-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace { using namespace std; using namespace fair::mq; auto AsStringView(Message const& msg) -> string_view { return {static_cast(msg.GetData()), msg.GetSize()}; } auto RunPushPullWithMsgResize(string const & transport, string const & _address) -> void { ProgOptions config; config.SetProperty("session", tools::Uuid()); auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config)); Channel push{"Push", "push", factory}; Channel pull{"Pull", "pull", factory}; auto const address(tools::ToString(_address, "_", transport)); push.Bind(address); pull.Connect(address); { size_t const size{6}; auto outMsg(push.NewMessage(size)); ASSERT_EQ(outMsg->GetSize(), size); memcpy(outMsg->GetData(), "ABCDEF", size); ASSERT_TRUE(outMsg->SetUsedSize(5)); ASSERT_TRUE(outMsg->SetUsedSize(5)); ASSERT_FALSE(outMsg->SetUsedSize(7)); ASSERT_EQ(outMsg->GetSize(), 5); // check if the data is still intact ASSERT_EQ(AsStringView(*outMsg)[0], 'A'); ASSERT_EQ(AsStringView(*outMsg)[1], 'B'); ASSERT_EQ(AsStringView(*outMsg)[2], 'C'); ASSERT_EQ(AsStringView(*outMsg)[3], 'D'); ASSERT_EQ(AsStringView(*outMsg)[4], 'E'); ASSERT_TRUE(outMsg->SetUsedSize(2)); ASSERT_EQ(outMsg->GetSize(), 2); ASSERT_EQ(AsStringView(*outMsg)[0], 'A'); ASSERT_EQ(AsStringView(*outMsg)[1], 'B'); auto msgCopy(push.NewMessage()); msgCopy->Copy(*outMsg); ASSERT_EQ(msgCopy->GetSize(), 2); ASSERT_EQ(push.Send(outMsg), 2); auto inMsg(pull.NewMessage()); ASSERT_EQ(pull.Receive(inMsg), 2); ASSERT_EQ(inMsg->GetSize(), 2); ASSERT_EQ(AsStringView(*inMsg)[0], 'A'); ASSERT_EQ(AsStringView(*inMsg)[1], 'B'); } { size_t const size{1000}; auto outMsg(push.NewMessage(size)); ASSERT_TRUE(outMsg->SetUsedSize(0)); ASSERT_EQ(outMsg->GetSize(), 0); auto msgCopy(push.NewMessage()); msgCopy->Copy(*outMsg); ASSERT_EQ(msgCopy->GetSize(), 0); ASSERT_EQ(push.Send(outMsg), 0); auto inMsg(pull.NewMessage()); ASSERT_EQ(pull.Receive(inMsg), 0); ASSERT_EQ(inMsg->GetSize(), 0); } } auto RunMsgRebuild(const string& transport) -> void { ProgOptions config; config.SetProperty("session", tools::Uuid()); auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config)); size_t const msgSize{100}; string const expectedStr{"asdf"}; auto msg(factory->CreateMessage()); EXPECT_EQ(msg->GetSize(), 0); msg->Rebuild(msgSize); EXPECT_EQ(msg->GetSize(), msgSize); auto str(make_unique(expectedStr)); void* data(str->data()); auto const size(str->length()); msg->Rebuild( data, size, [](void* /*data*/, void* obj) { delete static_cast(obj); }, // NOLINT str.release()); EXPECT_NE(msg->GetSize(), msgSize); EXPECT_EQ(msg->GetSize(), expectedStr.length()); EXPECT_EQ(AsStringView(*msg), expectedStr); } auto CheckMsgAlignment(Message const& msg, fair::mq::Alignment alignment) -> bool { assert(static_cast(alignment) > 0); // NOLINT return (reinterpret_cast(msg.GetData()) % static_cast(alignment)) == 0; // NOLINT } auto RunPushPullWithAlignment(string const& transport, string const& _address) -> void { ProgOptions config; config.SetProperty("session", tools::Uuid()); auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config)); Channel push{"Push", "push", factory}; Channel pull{"Pull", "pull", factory}; auto const address(tools::ToString(_address, "_", transport)); push.Bind(address); pull.Connect(address); { Alignment const align{64}; for (size_t const size : {100, 32}) { auto outMsg(push.NewMessage(size, align)); auto inMsg(pull.NewMessage(align)); ASSERT_TRUE(CheckMsgAlignment(*outMsg, align)); ASSERT_EQ(push.Send(outMsg), size); ASSERT_EQ(pull.Receive(inMsg), size); ASSERT_TRUE(CheckMsgAlignment(*inMsg, align)); } } { Alignment const align{0}; size_t const size{100}; auto outMsg(push.NewMessage(size, align)); auto inMsg(pull.NewMessage(align)); ASSERT_EQ(push.Send(outMsg), size); ASSERT_EQ(pull.Receive(inMsg), size); } size_t const size25{25}; size_t const size50{50}; Alignment const align64{64}; auto msg1(push.NewMessage(size25)); msg1->Rebuild(size50, align64); ASSERT_TRUE(CheckMsgAlignment(*msg1, align64)); Alignment const align32{32}; auto msg2(push.NewMessage(size25, align64)); msg2->Rebuild(size50, align32); ASSERT_TRUE(CheckMsgAlignment(*msg2, align32)); auto msgCopy(push.NewMessage()); msgCopy->Copy(*msg2); ASSERT_TRUE(CheckMsgAlignment(*msgCopy, align32)); } auto EmptyMessage(string const& transport, string const& _address) -> void { ProgOptions config; config.SetProperty("session", tools::Uuid()); auto factory(TransportFactory::CreateTransportFactory(transport, tools::Uuid(), &config)); Channel push{"Push", "push", factory}; Channel pull{"Pull", "pull", factory}; auto const address(tools::ToString(_address, "_", transport)); push.Bind(address); pull.Connect(address); { auto outMsg(push.NewMessage()); ASSERT_EQ(outMsg->GetData(), nullptr); ASSERT_EQ(push.Send(outMsg), 0); auto inMsg(pull.NewMessage()); ASSERT_EQ(pull.Receive(inMsg), 0); ASSERT_EQ(inMsg->GetData(), nullptr); } { auto outMsg(push.NewMessage(0)); ASSERT_EQ(outMsg->GetSize(), 0); size_t const size100{100}; outMsg->Rebuild(size100); ASSERT_EQ(outMsg->GetSize(), size100); outMsg->Rebuild(0); ASSERT_EQ(outMsg->GetSize(), 0); auto msgCopy(push.NewMessage()); msgCopy->Copy(*outMsg); ASSERT_EQ(msgCopy->GetSize(), 0); ASSERT_EQ(push.Send(outMsg), 0); ASSERT_EQ(push.Send(msgCopy), 0); auto inMsg(pull.NewMessage()); auto inMsg2(pull.NewMessage()); ASSERT_EQ(pull.Receive(inMsg), 0); ASSERT_EQ(pull.Receive(inMsg2), 0); ASSERT_EQ(inMsg->GetSize(), 0); ASSERT_EQ(inMsg2->GetSize(), 0); } } TEST(Resize, zeromq) // NOLINT { RunPushPullWithMsgResize("zeromq", "ipc://test_message_resize"); } TEST(Resize, shmem) // NOLINT { RunPushPullWithMsgResize("shmem", "ipc://test_message_resize"); } TEST(Rebuild, zeromq) // NOLINT { RunMsgRebuild("zeromq"); } TEST(Rebuild, shmem) // NOLINT { RunMsgRebuild("shmem"); } TEST(Alignment, shmem) // NOLINT { RunPushPullWithAlignment("shmem", "ipc://test_message_alignment"); } TEST(Alignment, zeromq) // NOLINT { RunPushPullWithAlignment("zeromq", "ipc://test_message_alignment"); } TEST(EmptyMessage, zeromq) // NOLINT { EmptyMessage("zeromq", "ipc://test_empty_message"); } TEST(EmptyMessage, shmem) // NOLINT { EmptyMessage("shmem", "ipc://test_empty_message"); } } // namespace