mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 00:31:14 +00:00
add more single and multithreaded unit tests
This commit is contained in:
parent
3205e0c378
commit
b5dab60a82
|
@ -42,8 +42,9 @@ add_testsuite(FairMQ.Protocols
|
||||||
protocols/_push_pull.cxx
|
protocols/_push_pull.cxx
|
||||||
protocols/_req_rep.cxx
|
protocols/_req_rep.cxx
|
||||||
protocols/_transfer_timeout.cxx
|
protocols/_transfer_timeout.cxx
|
||||||
|
protocols/_push_pull_multipart.cxx
|
||||||
|
|
||||||
LINKS PStreams
|
LINKS PStreams FairMQ
|
||||||
DEPENDS testhelper_runTestDevice
|
DEPENDS testhelper_runTestDevice
|
||||||
INCLUDES ${CMAKE_CURRENT_SOURCE_DIR}/protocols
|
INCLUDES ${CMAKE_CURRENT_SOURCE_DIR}/protocols
|
||||||
TIMEOUT 30
|
TIMEOUT 30
|
||||||
|
|
|
@ -42,18 +42,18 @@ auto RunPushPull(string transport) -> void
|
||||||
exit(push.exit_code + pull.exit_code);
|
exit(push.exit_code + pull.exit_code);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(PushPull, ZeroMQ)
|
TEST(PushPull, MP_ZeroMQ__tcp____SingleMsg)
|
||||||
{
|
{
|
||||||
EXPECT_EXIT(RunPushPull("zeromq"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
|
EXPECT_EXIT(RunPushPull("zeromq"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(PushPull, ShMem)
|
TEST(PushPull, MP_ShMem___tcp____SingleMsg)
|
||||||
{
|
{
|
||||||
EXPECT_EXIT(RunPushPull("shmem"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
|
EXPECT_EXIT(RunPushPull("shmem"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef NANOMSG_FOUND
|
#ifdef NANOMSG_FOUND
|
||||||
TEST(PushPull, Nanomsg)
|
TEST(PushPull, MP_Nanomsg_tcp____SingleMsg)
|
||||||
{
|
{
|
||||||
EXPECT_EXIT(RunPushPull("nanomsg"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
|
EXPECT_EXIT(RunPushPull("nanomsg"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
|
||||||
}
|
}
|
||||||
|
|
160
fairmq/test/protocols/_push_pull_multipart.cxx
Normal file
160
fairmq/test/protocols/_push_pull_multipart.cxx
Normal file
|
@ -0,0 +1,160 @@
|
||||||
|
/********************************************************************************
|
||||||
|
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||||
|
* *
|
||||||
|
* This software is distributed under the terms of the *
|
||||||
|
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||||
|
* copied verbatim in the file "LICENSE" *
|
||||||
|
********************************************************************************/
|
||||||
|
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
#include <FairMQChannel.h>
|
||||||
|
#include <FairMQParts.h>
|
||||||
|
#include <FairMQLogger.h>
|
||||||
|
#include <FairMQTransportFactory.h>
|
||||||
|
#include <algorithm>
|
||||||
|
#include <memory>
|
||||||
|
#include <sstream>
|
||||||
|
#include <string>
|
||||||
|
#include <thread>
|
||||||
|
|
||||||
|
namespace
|
||||||
|
{
|
||||||
|
|
||||||
|
using namespace std;
|
||||||
|
|
||||||
|
auto RunSingleThreadedMultipart(string transport, string address) -> void {
|
||||||
|
auto factory = FairMQTransportFactory::CreateTransportFactory(transport);
|
||||||
|
auto push = FairMQChannel{"Push", "push", factory};
|
||||||
|
ASSERT_TRUE(push.Bind(address));
|
||||||
|
auto pull = FairMQChannel{"Pull", "pull", factory};
|
||||||
|
pull.Connect(address);
|
||||||
|
|
||||||
|
// TODO validate that fTransportFactory is not nullptr
|
||||||
|
// TODO validate that fSocket is not nullptr
|
||||||
|
ASSERT_TRUE(push.ValidateChannel());
|
||||||
|
ASSERT_TRUE(pull.ValidateChannel());
|
||||||
|
|
||||||
|
{
|
||||||
|
auto sentMsg = FairMQParts{};
|
||||||
|
sentMsg.AddPart(push.NewSimpleMessage("1"));
|
||||||
|
sentMsg.AddPart(push.NewSimpleMessage("2"));
|
||||||
|
sentMsg.AddPart(push.NewSimpleMessage("3"));
|
||||||
|
|
||||||
|
ASSERT_GE(push.Send(sentMsg), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto receivedMsg = FairMQParts{};
|
||||||
|
ASSERT_GE(pull.Receive(receivedMsg), 0);
|
||||||
|
|
||||||
|
stringstream out;
|
||||||
|
for_each(receivedMsg.cbegin(), receivedMsg.cend(), [&out](const FairMQMessagePtr& part) {
|
||||||
|
out << string{static_cast<char*>(part->GetData()), part->GetSize()};
|
||||||
|
});
|
||||||
|
ASSERT_EQ(out.str(), "123");
|
||||||
|
}
|
||||||
|
|
||||||
|
auto RunMultiThreadedMultipart(string transport, string address) -> void
|
||||||
|
{
|
||||||
|
auto factory = FairMQTransportFactory::CreateTransportFactory(transport);
|
||||||
|
auto push = FairMQChannel{"Push", "push", factory};
|
||||||
|
ASSERT_TRUE(push.Bind(address));
|
||||||
|
auto pull = FairMQChannel{"Pull", "pull", factory};
|
||||||
|
pull.Connect(address);
|
||||||
|
|
||||||
|
auto pusher = thread{[&push](){
|
||||||
|
ASSERT_TRUE(push.ValidateChannel());
|
||||||
|
|
||||||
|
auto sentMsg = FairMQParts{};
|
||||||
|
sentMsg.AddPart(push.NewSimpleMessage("1"));
|
||||||
|
sentMsg.AddPart(push.NewSimpleMessage("2"));
|
||||||
|
sentMsg.AddPart(push.NewSimpleMessage("3"));
|
||||||
|
|
||||||
|
ASSERT_GE(push.Send(sentMsg), 0);
|
||||||
|
}};
|
||||||
|
|
||||||
|
auto puller = thread{[&pull](){
|
||||||
|
ASSERT_TRUE(pull.ValidateChannel());
|
||||||
|
|
||||||
|
auto receivedMsg = FairMQParts{};
|
||||||
|
ASSERT_GE(pull.Receive(receivedMsg), 0);
|
||||||
|
|
||||||
|
stringstream out;
|
||||||
|
for_each(receivedMsg.cbegin(), receivedMsg.cend(), [&out](const FairMQMessagePtr& part) {
|
||||||
|
out << string{static_cast<char*>(part->GetData()), part->GetSize()};
|
||||||
|
});
|
||||||
|
ASSERT_EQ(out.str(), "123");
|
||||||
|
}};
|
||||||
|
|
||||||
|
pusher.join();
|
||||||
|
puller.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(PushPull, ST_ZeroMQ__inproc_Multipart)
|
||||||
|
{
|
||||||
|
RunSingleThreadedMultipart("zeromq", "inproc://test");
|
||||||
|
}
|
||||||
|
|
||||||
|
//TEST(PushPull, ST_Shmem___inproc_Multipart)
|
||||||
|
//{
|
||||||
|
//RunSingleThreadedMultipart("shmem", "inproc://test");
|
||||||
|
//}
|
||||||
|
|
||||||
|
#ifdef NANOMSG_FOUND
|
||||||
|
TEST(PushPull, ST_Nanomsg_inproc_Multipart)
|
||||||
|
{
|
||||||
|
RunSingleThreadedMultipart("nanomsg", "inproc://test");
|
||||||
|
}
|
||||||
|
#endif /* NANOMSG_FOUND */
|
||||||
|
|
||||||
|
TEST(PushPull, ST_ZeroMQ__ipc____Multipart)
|
||||||
|
{
|
||||||
|
RunSingleThreadedMultipart("zeromq", "ipc://test");
|
||||||
|
}
|
||||||
|
|
||||||
|
//TEST(PushPull, ST_Shmen___ipc____Multipart)
|
||||||
|
//{
|
||||||
|
//RunSingleThreadedMultipart("shmem", "ipc://test");
|
||||||
|
//}
|
||||||
|
|
||||||
|
#ifdef NANOMSG_FOUND
|
||||||
|
TEST(PushPull, ST_Nanomsg_ipc____Multipart)
|
||||||
|
{
|
||||||
|
RunSingleThreadedMultipart("nanomsg", "ipc://test");
|
||||||
|
}
|
||||||
|
#endif /* NANOMSG_FOUND */
|
||||||
|
|
||||||
|
TEST(PushPull, MT_ZeroMQ__inproc_Multipart)
|
||||||
|
{
|
||||||
|
RunMultiThreadedMultipart("zeromq", "inproc://test");
|
||||||
|
}
|
||||||
|
|
||||||
|
//TEST(PushPull, MT_Shmem___inproc_Multipart)
|
||||||
|
//{
|
||||||
|
//RunMultiThreadedMultipart("shmem", "inproc://test");
|
||||||
|
//}
|
||||||
|
|
||||||
|
#ifdef NANOMSG_FOUND
|
||||||
|
TEST(PushPull, MT_Nanomsg_inproc_Multipart)
|
||||||
|
{
|
||||||
|
RunMultiThreadedMultipart("nanomsg", "inproc://test");
|
||||||
|
}
|
||||||
|
#endif /* NANOMSG_FOUND */
|
||||||
|
|
||||||
|
TEST(PushPull, MT_ZeroMQ__ipc____Multipart)
|
||||||
|
{
|
||||||
|
RunMultiThreadedMultipart("zeromq", "ipc://test");
|
||||||
|
}
|
||||||
|
|
||||||
|
//TEST(PushPull, MT_Shmem___ipc____Multipart)
|
||||||
|
//{
|
||||||
|
//RunMultiThreadedMultipart("shmem", "ipc://test");
|
||||||
|
//}
|
||||||
|
|
||||||
|
#ifdef NANOMSG_FOUND
|
||||||
|
TEST(PushPull, MT_Nanomsg_ipc____Multipart)
|
||||||
|
{
|
||||||
|
RunMultiThreadedMultipart("nanomsg", "ipc://test");
|
||||||
|
}
|
||||||
|
#endif /* NANOMSG_FOUND */
|
||||||
|
|
||||||
|
} // namespace
|
Loading…
Reference in New Issue
Block a user