From 9288a2c3d5cbc2d8cff9d4db5924a1e318e8c170 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 19 May 2017 09:41:45 +0200 Subject: [PATCH] Add tests for poller. --- fairmq/test/CMakeLists.txt | 3 + fairmq/test/helper/devices/TestPollIn.cxx | 133 +++++++++++++++++++++ fairmq/test/helper/devices/TestPollOut.cxx | 42 +++++++ fairmq/test/helper/runTestDevice.cxx | 12 ++ fairmq/test/protocols/_poller.cxx | 83 +++++++++++++ fairmq/test/protocols/config.json.in | 126 +++++++++++++++++++ 6 files changed, 399 insertions(+) create mode 100644 fairmq/test/helper/devices/TestPollIn.cxx create mode 100644 fairmq/test/helper/devices/TestPollOut.cxx create mode 100644 fairmq/test/protocols/_poller.cxx diff --git a/fairmq/test/CMakeLists.txt b/fairmq/test/CMakeLists.txt index 021eaa7a..45b559dc 100644 --- a/fairmq/test/CMakeLists.txt +++ b/fairmq/test/CMakeLists.txt @@ -18,6 +18,8 @@ include(GTestHelper) add_testhelper(runTestDevice SOURCES helper/runTestDevice.cxx + helper/devices/TestPollIn.cxx + helper/devices/TestPollOut.cxx helper/devices/TestPub.cxx helper/devices/TestPull.cxx helper/devices/TestPush.cxx @@ -38,6 +40,7 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/protocols/runner.cxx.in ${CMAKE_CURRE add_testsuite(FairMQ.Protocols SOURCES ${CMAKE_CURRENT_BINARY_DIR}/protocols/runner.cxx + protocols/_poller.cxx protocols/_pub_sub.cxx protocols/_push_pull.cxx protocols/_req_rep.cxx diff --git a/fairmq/test/helper/devices/TestPollIn.cxx b/fairmq/test/helper/devices/TestPollIn.cxx new file mode 100644 index 00000000..f4880c33 --- /dev/null +++ b/fairmq/test/helper/devices/TestPollIn.cxx @@ -0,0 +1,133 @@ +/******************************************************************************** + * Copyright (C) 2015-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include +#include +#include + +namespace fair +{ +namespace mq +{ +namespace test +{ + +using namespace std; + +class PollIn : public FairMQDevice +{ + public: + PollIn() + : fPollType(0) + {} + + protected: + auto Init() -> void override + { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + + auto Reset() -> void override + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + auto InitTask() -> void override + { + fPollType = fConfig->GetValue("poll-type"); + } + + auto Run() -> void override + { + vector chans; + + chans.push_back(&fChannels.at("data1").at(0)); + chans.push_back(&fChannels.at("data2").at(0)); + + FairMQPollerPtr poller = nullptr; + + if (fPollType == 0) + { + poller = NewPoller(chans); + } + else if (fPollType == 1) + { + poller = NewPoller("data1", "data2"); + } + else + { + LOG(ERROR) << "wrong poll type provided: " << fPollType; + } + + bool arrived1 = false; + bool arrived2 = false; + bool bothArrived = false; + + FairMQMessagePtr msg1(NewMessage()); + FairMQMessagePtr msg2(NewMessage()); + + while (!bothArrived) + { + poller->Poll(100); + + if (fPollType == 0) + { + if (poller->CheckInput(0)) + { + LOG(DEBUG) << "CheckInput(0) triggered"; + if (Receive(msg1, "data1", 0) >= 0) + { + arrived1 = true; + } + } + + if (poller->CheckInput(1)) + { + LOG(DEBUG) << "CheckInput(1) triggered"; + if (Receive(msg2, "data2", 0) >= 0) + { + arrived2 = true; + } + } + } + else if (fPollType == 1) + { + if (poller->CheckInput("data1", 0)) + { + LOG(DEBUG) << "CheckInput(\"data1\", 0) triggered"; + if (Receive(msg1, "data1", 0) >= 0) + { + arrived1 = true; + } + } + + if (poller->CheckInput("data2", 0)) + { + LOG(DEBUG) << "CheckInput(\"data2\", 0) triggered"; + if (Receive(msg2, "data2", 0) >= 0) + { + arrived2 = true; + } + } + } + + if (arrived1 && arrived2) + { + bothArrived = true; + LOG(INFO) << "POLL test successfull"; + } + } + }; + + private: + int fPollType; +}; + +} // namespace test +} // namespace mq +} // namespace fair diff --git a/fairmq/test/helper/devices/TestPollOut.cxx b/fairmq/test/helper/devices/TestPollOut.cxx new file mode 100644 index 00000000..7a79a218 --- /dev/null +++ b/fairmq/test/helper/devices/TestPollOut.cxx @@ -0,0 +1,42 @@ +/******************************************************************************** + * Copyright (C) 2015-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include + +namespace fair +{ +namespace mq +{ +namespace test +{ + +class PollOut : public FairMQDevice +{ + protected: + auto Init() -> void override + { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + } + + auto Reset() -> void override + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + auto Run() -> void override + { + auto msg1 = FairMQMessagePtr{NewMessage()}; + auto msg2 = FairMQMessagePtr{NewMessage()}; + Send(msg1, "data1"); + Send(msg2, "data2"); + }; +}; + +} // namespace test +} // namespace mq +} // namespace fair diff --git a/fairmq/test/helper/runTestDevice.cxx b/fairmq/test/helper/runTestDevice.cxx index a2fd360c..560c5d06 100644 --- a/fairmq/test/helper/runTestDevice.cxx +++ b/fairmq/test/helper/runTestDevice.cxx @@ -6,6 +6,8 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ +#include "devices/TestPollIn.cxx" +#include "devices/TestPollOut.cxx" #include "devices/TestPub.cxx" #include "devices/TestPull.cxx" #include "devices/TestPush.cxx" @@ -22,6 +24,8 @@ namespace bpo = boost::program_options; auto addCustomOptions(bpo::options_description& options) -> void { + options.add_options() + ("poll-type", bpo::value()->default_value(0), "Poll type switch(0 - vector of (sub-)channels, 1 - vector of channel names)"); } auto getDevice(const FairMQProgOptions& config) -> FairMQDevicePtr @@ -58,6 +62,14 @@ auto getDevice(const FairMQProgOptions& config) -> FairMQDevicePtr { return new TransferTimeout; } + else if (0 == id.find("pollout_")) + { + return new PollOut; + } + else if (0 == id.find("pollin_")) + { + return new PollIn; + } else { cerr << "Don't know id '" << id << "'" << endl; diff --git a/fairmq/test/protocols/_poller.cxx b/fairmq/test/protocols/_poller.cxx new file mode 100644 index 00000000..2f9d8961 --- /dev/null +++ b/fairmq/test/protocols/_poller.cxx @@ -0,0 +1,83 @@ +/******************************************************************************** + * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "runner.h" +#include +#include // std::stringstream +#include + +namespace +{ + +using namespace std; +using namespace fair::mq::test; + +auto RunPoller(string transport, int pollType) -> void +{ + auto pollout = execute_result{"", 0}; + thread poll_out_thread([&]() { + stringstream cmd; + cmd << runTestDevice + << " --id pollout_"<< transport + << " --control static --verbosity DEBUG --log-color false" + << " --mq-config \"" << mqConfig << "\""; + pollout = execute(cmd.str(), "[POLLOUT]"); + }); + + auto pollin = execute_result{"", 0}; + thread poll_in_thread([&]() { + stringstream cmd; + cmd << runTestDevice + << " --id pollin_" << transport + << " --control static --verbosity DEBUG --log-color false" + << " --mq-config \"" << mqConfig << "\" --poll-type " << pollType; + pollin = execute(cmd.str(), "[POLLIN]"); + }); + + poll_out_thread.join(); + poll_in_thread.join(); + cerr << pollout.error_out << pollin.error_out; + + exit(pollout.exit_code + pollin.exit_code); +} + +TEST(Poller, ZeroMQ_subchannel) +{ + EXPECT_EXIT(RunPoller("zeromq", 0), ::testing::ExitedWithCode(0), "POLL test successfull"); +} + +#ifdef NANOMSG_FOUND +TEST(Poller, Nanomsg_subchannel) +{ + EXPECT_EXIT(RunPoller("nanomsg", 0), ::testing::ExitedWithCode(0), "POLL test successfull"); +} +#endif /* NANOMSG_FOUND */ + +TEST(Poller, ShMem_subchannel) +{ + EXPECT_EXIT(RunPoller("shmem", 0), ::testing::ExitedWithCode(0), "POLL test successfull"); +} + +TEST(Poller, ZeroMQ_channel) +{ + EXPECT_EXIT(RunPoller("zeromq", 1), ::testing::ExitedWithCode(0), "POLL test successfull"); +} + +#ifdef NANOMSG_FOUND +TEST(Poller, Nanomsg_channel) +{ + EXPECT_EXIT(RunPoller("nanomsg", 1), ::testing::ExitedWithCode(0), "POLL test successfull"); +} +#endif /* NANOMSG_FOUND */ + +TEST(Poller, ShMem_channel) +{ + EXPECT_EXIT(RunPoller("shmem", 1), ::testing::ExitedWithCode(0), "POLL test successfull"); +} + +} // namespace diff --git a/fairmq/test/protocols/config.json.in b/fairmq/test/protocols/config.json.in index f3f16234..3a90777d 100644 --- a/fairmq/test/protocols/config.json.in +++ b/fairmq/test/protocols/config.json.in @@ -384,6 +384,132 @@ "type": "push" } ] + }, + { + "id": "pollout_zeromq", + "channels": [ + { + "address": "tcp://127.0.0.1:6000", + "method": "bind", + "name": "data1", + "rateLogging": 0, + "transport": "zeromq", + "type": "push" + }, + { + "address": "tcp://127.0.0.1:6001", + "method": "bind", + "name": "data2", + "rateLogging": 0, + "transport": "zeromq", + "type": "push" + } + ] + }, + { + "id": "pollin_zeromq", + "channels": [ + { + "address": "tcp://127.0.0.1:6000", + "method": "connect", + "name": "data1", + "rateLogging": 0, + "transport": "zeromq", + "type": "pull" + }, + { + "address": "tcp://127.0.0.1:6001", + "method": "connect", + "name": "data2", + "rateLogging": 0, + "transport": "zeromq", + "type": "pull" + } + ] + }, + { + "id": "pollout_nanomsg", + "channels": [ + { + "address": "tcp://127.0.0.1:6002", + "method": "bind", + "name": "data1", + "rateLogging": 0, + "transport": "nanomsg", + "type": "push" + }, + { + "address": "tcp://127.0.0.1:6003", + "method": "bind", + "name": "data2", + "rateLogging": 0, + "transport": "nanomsg", + "type": "push" + } + ] + }, + { + "id": "pollin_nanomsg", + "channels": [ + { + "address": "tcp://127.0.0.1:6002", + "method": "connect", + "name": "data1", + "rateLogging": 0, + "transport": "nanomsg", + "type": "pull" + }, + { + "address": "tcp://127.0.0.1:6003", + "method": "connect", + "name": "data2", + "rateLogging": 0, + "transport": "nanomsg", + "type": "pull" + } + ] + }, + { + "id": "pollout_shmem", + "channels": [ + { + "address": "tcp://127.0.0.1:6004", + "method": "bind", + "name": "data1", + "rateLogging": 0, + "transport": "shmem", + "type": "push" + }, + { + "address": "tcp://127.0.0.1:6005", + "method": "bind", + "name": "data2", + "rateLogging": 0, + "transport": "shmem", + "type": "push" + } + ] + }, + { + "id": "pollin_shmem", + "channels": [ + { + "address": "tcp://127.0.0.1:6004", + "method": "connect", + "name": "data1", + "rateLogging": 0, + "transport": "shmem", + "type": "pull" + }, + { + "address": "tcp://127.0.0.1:6005", + "method": "connect", + "name": "data2", + "rateLogging": 0, + "transport": "shmem", + "type": "pull" + } + ] } ] }