diff --git a/fairmq/test/CMakeLists.txt b/fairmq/test/CMakeLists.txt index aee2fdd5..021eaa7a 100644 --- a/fairmq/test/CMakeLists.txt +++ b/fairmq/test/CMakeLists.txt @@ -42,8 +42,9 @@ add_testsuite(FairMQ.Protocols protocols/_push_pull.cxx protocols/_req_rep.cxx protocols/_transfer_timeout.cxx + protocols/_push_pull_multipart.cxx - LINKS PStreams + LINKS PStreams FairMQ DEPENDS testhelper_runTestDevice INCLUDES ${CMAKE_CURRENT_SOURCE_DIR}/protocols TIMEOUT 30 diff --git a/fairmq/test/protocols/_push_pull.cxx b/fairmq/test/protocols/_push_pull.cxx index 2167ad45..b4de75ac 100644 --- a/fairmq/test/protocols/_push_pull.cxx +++ b/fairmq/test/protocols/_push_pull.cxx @@ -42,18 +42,18 @@ auto RunPushPull(string transport) -> void exit(push.exit_code + pull.exit_code); } -TEST(PushPull, ZeroMQ) +TEST(PushPull, MP_ZeroMQ__tcp____SingleMsg) { EXPECT_EXIT(RunPushPull("zeromq"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull"); } -TEST(PushPull, ShMem) +TEST(PushPull, MP_ShMem___tcp____SingleMsg) { EXPECT_EXIT(RunPushPull("shmem"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull"); } #ifdef NANOMSG_FOUND -TEST(PushPull, Nanomsg) +TEST(PushPull, MP_Nanomsg_tcp____SingleMsg) { EXPECT_EXIT(RunPushPull("nanomsg"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull"); } diff --git a/fairmq/test/protocols/_push_pull_multipart.cxx b/fairmq/test/protocols/_push_pull_multipart.cxx new file mode 100644 index 00000000..46ffc1b6 --- /dev/null +++ b/fairmq/test/protocols/_push_pull_multipart.cxx @@ -0,0 +1,160 @@ +/******************************************************************************** + * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace +{ + +using namespace std; + +auto RunSingleThreadedMultipart(string transport, string address) -> void { + auto factory = FairMQTransportFactory::CreateTransportFactory(transport); + auto push = FairMQChannel{"Push", "push", factory}; + ASSERT_TRUE(push.Bind(address)); + auto pull = FairMQChannel{"Pull", "pull", factory}; + pull.Connect(address); + + // TODO validate that fTransportFactory is not nullptr + // TODO validate that fSocket is not nullptr + ASSERT_TRUE(push.ValidateChannel()); + ASSERT_TRUE(pull.ValidateChannel()); + + { + auto sentMsg = FairMQParts{}; + sentMsg.AddPart(push.NewSimpleMessage("1")); + sentMsg.AddPart(push.NewSimpleMessage("2")); + sentMsg.AddPart(push.NewSimpleMessage("3")); + + ASSERT_GE(push.Send(sentMsg), 0); + } + + auto receivedMsg = FairMQParts{}; + ASSERT_GE(pull.Receive(receivedMsg), 0); + + stringstream out; + for_each(receivedMsg.cbegin(), receivedMsg.cend(), [&out](const FairMQMessagePtr& part) { + out << string{static_cast(part->GetData()), part->GetSize()}; + }); + ASSERT_EQ(out.str(), "123"); +} + +auto RunMultiThreadedMultipart(string transport, string address) -> void +{ + auto factory = FairMQTransportFactory::CreateTransportFactory(transport); + auto push = FairMQChannel{"Push", "push", factory}; + ASSERT_TRUE(push.Bind(address)); + auto pull = FairMQChannel{"Pull", "pull", factory}; + pull.Connect(address); + + auto pusher = thread{[&push](){ + ASSERT_TRUE(push.ValidateChannel()); + + auto sentMsg = FairMQParts{}; + sentMsg.AddPart(push.NewSimpleMessage("1")); + sentMsg.AddPart(push.NewSimpleMessage("2")); + sentMsg.AddPart(push.NewSimpleMessage("3")); + + ASSERT_GE(push.Send(sentMsg), 0); + }}; + + auto puller = thread{[&pull](){ + ASSERT_TRUE(pull.ValidateChannel()); + + auto receivedMsg = FairMQParts{}; + ASSERT_GE(pull.Receive(receivedMsg), 0); + + stringstream out; + for_each(receivedMsg.cbegin(), receivedMsg.cend(), [&out](const FairMQMessagePtr& part) { + out << string{static_cast(part->GetData()), part->GetSize()}; + }); + ASSERT_EQ(out.str(), "123"); + }}; + + pusher.join(); + puller.join(); +} + +TEST(PushPull, ST_ZeroMQ__inproc_Multipart) +{ + RunSingleThreadedMultipart("zeromq", "inproc://test"); +} + +//TEST(PushPull, ST_Shmem___inproc_Multipart) +//{ + //RunSingleThreadedMultipart("shmem", "inproc://test"); +//} + +#ifdef NANOMSG_FOUND +TEST(PushPull, ST_Nanomsg_inproc_Multipart) +{ + RunSingleThreadedMultipart("nanomsg", "inproc://test"); +} +#endif /* NANOMSG_FOUND */ + +TEST(PushPull, ST_ZeroMQ__ipc____Multipart) +{ + RunSingleThreadedMultipart("zeromq", "ipc://test"); +} + +//TEST(PushPull, ST_Shmen___ipc____Multipart) +//{ + //RunSingleThreadedMultipart("shmem", "ipc://test"); +//} + +#ifdef NANOMSG_FOUND +TEST(PushPull, ST_Nanomsg_ipc____Multipart) +{ + RunSingleThreadedMultipart("nanomsg", "ipc://test"); +} +#endif /* NANOMSG_FOUND */ + +TEST(PushPull, MT_ZeroMQ__inproc_Multipart) +{ + RunMultiThreadedMultipart("zeromq", "inproc://test"); +} + +//TEST(PushPull, MT_Shmem___inproc_Multipart) +//{ + //RunMultiThreadedMultipart("shmem", "inproc://test"); +//} + +#ifdef NANOMSG_FOUND +TEST(PushPull, MT_Nanomsg_inproc_Multipart) +{ + RunMultiThreadedMultipart("nanomsg", "inproc://test"); +} +#endif /* NANOMSG_FOUND */ + +TEST(PushPull, MT_ZeroMQ__ipc____Multipart) +{ + RunMultiThreadedMultipart("zeromq", "ipc://test"); +} + +//TEST(PushPull, MT_Shmem___ipc____Multipart) +//{ + //RunMultiThreadedMultipart("shmem", "ipc://test"); +//} + +#ifdef NANOMSG_FOUND +TEST(PushPull, MT_Nanomsg_ipc____Multipart) +{ + RunMultiThreadedMultipart("nanomsg", "ipc://test"); +} +#endif /* NANOMSG_FOUND */ + +} // namespace