refactor tests

* move to gtest
* add shmem tests
* cleanup cmake file (organize tests in suites)
This commit is contained in:
Dennis Klein 2017-03-28 14:19:08 +02:00 committed by Mohammad Al-Turany
parent 55a9d69908
commit 7e34f7042f
38 changed files with 1185 additions and 1298 deletions

View File

@ -1,137 +1,61 @@
################################################################################
# Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence version 3 (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
################################################################################
# Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence version 3 (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
configure_file(${CMAKE_SOURCE_DIR}/fairmq/test/test-fairmq-push-pull.sh.in ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-push-pull.sh)
configure_file(${CMAKE_SOURCE_DIR}/fairmq/test/test-fairmq-pub-sub.sh.in ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-pub-sub.sh)
configure_file(${CMAKE_SOURCE_DIR}/fairmq/test/test-fairmq-req-rep.sh.in ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-req-rep.sh)
find_package(GTest REQUIRED)
set(PSTREAMS_SOURCE_DIR ${CMAKE_SOURCE_DIR}/3rdparty/pstreams)
find_package(PStreams REQUIRED)
include(GTestHelper)
Set(INCLUDE_DIRECTORIES
${CMAKE_SOURCE_DIR}/fairmq
${CMAKE_SOURCE_DIR}/fairmq/zeromq
${CMAKE_SOURCE_DIR}/fairmq/nanomsg
${CMAKE_SOURCE_DIR}/fairmq/devices
${CMAKE_SOURCE_DIR}/fairmq/tools
${CMAKE_SOURCE_DIR}/fairmq/options
${CMAKE_SOURCE_DIR}/fairmq/test/
${CMAKE_SOURCE_DIR}/fairmq/test/push-pull
${CMAKE_SOURCE_DIR}/fairmq/test/pub-sub
${CMAKE_SOURCE_DIR}/fairmq/test/req-rep
${CMAKE_CURRENT_BINARY_DIR}
#############################
# FairMQ Testsuites/helpers #
#############################
add_testhelper(runTestDevice
SOURCES
helper/runTestDevice.cxx
helper/devices/TestPub.cxx
helper/devices/TestPull.cxx
helper/devices/TestPush.cxx
helper/devices/TestRep.cxx
helper/devices/TestReq.cxx
helper/devices/TestSub.cxx
helper/devices/TestTransferTimeout.cxx
LINKS FairMQ
)
Set(SYSTEM_INCLUDE_DIRECTORIES
${Boost_INCLUDE_DIR}
${ZeroMQ_INCLUDE_DIR}
set(MQ_CONFIG "${CMAKE_BINARY_DIR}/bin/testsuite_FairMQ.IOPatterns_config.json")
set(RUN_TEST_DEVICE "${CMAKE_BINARY_DIR}/bin/testhelper_runTestDevice")
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/protocols/config.json.in ${MQ_CONFIG})
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/protocols/runner.cxx.in ${CMAKE_CURRENT_BINARY_DIR}/protocols/runner.cxx)
add_testsuite(FairMQ.Protocols
SOURCES
${CMAKE_CURRENT_BINARY_DIR}/protocols/runner.cxx
protocols/_pub_sub.cxx
protocols/_push_pull.cxx
protocols/_req_rep.cxx
protocols/_transfer_timeout.cxx
LINKS PStreams
DEPENDS testhelper_runTestDevice
INCLUDES ${CMAKE_CURRENT_SOURCE_DIR}/protocols
TIMEOUT 30
)
If(NANOMSG_FOUND)
Set(SYSTEM_INCLUDE_DIRECTORIES
${SYSTEM_INCLUDE_DIRECTORIES}
${NANOMSG_INCLUDE_DIR}
)
EndIf(NANOMSG_FOUND)
Include_Directories(${INCLUDE_DIRECTORIES})
Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES})
Set(LINK_DIRECTORIES
${Boost_LIBRARY_DIRS}
##############################
# Aggregate all test targets #
##############################
add_custom_target(FairMQTests
DEPENDS
${ALL_TEST_TARGETS}
)
Link_Directories(${LINK_DIRECTORIES})
set(SRCS
"push-pull/FairMQTestPush.cxx"
"push-pull/FairMQTestPull.cxx"
"pub-sub/FairMQTestPub.cxx"
"pub-sub/FairMQTestSub.cxx"
"req-rep/FairMQTestReq.cxx"
"req-rep/FairMQTestRep.cxx"
)
set(DEPENDENCIES
${DEPENDENCIES}
FairMQ
)
set(LIBRARY_NAME FairMQTest)
GENERATE_LIBRARY()
set(Exe_Names
test-fairmq-push
test-fairmq-pull
test-fairmq-pub
test-fairmq-sub
test-fairmq-req
test-fairmq-rep
test-fairmq-transfer-timeout
)
set(Exe_Source
push-pull/runTestPush.cxx
push-pull/runTestPull.cxx
pub-sub/runTestPub.cxx
pub-sub/runTestSub.cxx
req-rep/runTestReq.cxx
req-rep/runTestRep.cxx
runTransferTimeoutTest.cxx
)
list(LENGTH Exe_Names _length)
math(EXPR _length ${_length}-1)
ForEach(_file RANGE 0 ${_length})
list(GET Exe_Names ${_file} _name)
list(GET Exe_Source ${_file} _src)
set(EXE_NAME ${_name})
set(SRCS ${_src})
set(DEPENDENCIES FairMQTest)
GENERATE_EXECUTABLE()
EndForEach(_file RANGE 0 ${_length})
add_test(NAME run_fairmq_push_pull_zmq COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-push-pull.sh zeromq)
set_tests_properties(run_fairmq_push_pull_zmq PROPERTIES TIMEOUT "30")
set_tests_properties(run_fairmq_push_pull_zmq PROPERTIES PASS_REGULAR_EXPRESSION "PUSH-PULL test successfull")
add_test(NAME run_fairmq_pub_sub_zmq COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-pub-sub.sh zeromq)
set_tests_properties(run_fairmq_pub_sub_zmq PROPERTIES TIMEOUT "30")
set_tests_properties(run_fairmq_pub_sub_zmq PROPERTIES PASS_REGULAR_EXPRESSION "PUB-SUB test successfull")
Set_Tests_Properties(run_fairmq_pub_sub_zmq PROPERTIES DEPENDS run_fairmq_push_pull_zmq)
add_test(NAME run_fairmq_req_rep_zmq COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-req-rep.sh zeromq)
set_tests_properties(run_fairmq_req_rep_zmq PROPERTIES TIMEOUT "30")
set_tests_properties(run_fairmq_req_rep_zmq PROPERTIES PASS_REGULAR_EXPRESSION "REQ-REP test successfull")
Set_Tests_Properties(run_fairmq_req_rep_zmq PROPERTIES DEPENDS run_fairmq_pub_sub_zmq)
add_test(NAME run_fairmq_transfer_timeout_zmq COMMAND ${CMAKE_BINARY_DIR}/bin/test-fairmq-transfer-timeout zeromq)
set_tests_properties(run_fairmq_transfer_timeout_zmq PROPERTIES TIMEOUT "30")
set_tests_properties(run_fairmq_transfer_timeout_zmq PROPERTIES PASS_REGULAR_EXPRESSION "Transfer timeout test successfull")
Set_Tests_Properties(run_fairmq_transfer_timeout_zmq PROPERTIES DEPENDS run_fairmq_pub_sub_zmq)
If(NANOMSG_FOUND)
add_test(NAME run_fairmq_push_pull_nn COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-push-pull.sh nanomsg)
set_tests_properties(run_fairmq_push_pull_nn PROPERTIES TIMEOUT "30")
set_tests_properties(run_fairmq_push_pull_nn PROPERTIES PASS_REGULAR_EXPRESSION "PUSH-PULL test successfull")
Set_Tests_Properties(run_fairmq_push_pull_nn PROPERTIES DEPENDS run_fairmq_transfer_timeout_zmq)
add_test(NAME run_fairmq_pub_sub_nn COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-pub-sub.sh nanomsg)
set_tests_properties(run_fairmq_pub_sub_nn PROPERTIES TIMEOUT "30")
set_tests_properties(run_fairmq_pub_sub_nn PROPERTIES PASS_REGULAR_EXPRESSION "PUB-SUB test successfull")
Set_Tests_Properties(run_fairmq_pub_sub_nn PROPERTIES DEPENDS run_fairmq_push_pull_nn)
add_test(NAME run_fairmq_req_rep_nn COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-req-rep.sh nanomsg)
set_tests_properties(run_fairmq_req_rep_nn PROPERTIES TIMEOUT "30")
set_tests_properties(run_fairmq_req_rep_nn PROPERTIES PASS_REGULAR_EXPRESSION "REQ-REP test successfull")
Set_Tests_Properties(run_fairmq_req_rep_nn PROPERTIES DEPENDS run_fairmq_pub_sub_nn)
add_test(NAME run_fairmq_transfer_timeout_nn COMMAND ${CMAKE_BINARY_DIR}/bin/test-fairmq-transfer-timeout nanomsg)
set_tests_properties(run_fairmq_transfer_timeout_nn PROPERTIES TIMEOUT "30")
set_tests_properties(run_fairmq_transfer_timeout_nn PROPERTIES PASS_REGULAR_EXPRESSION "Transfer timeout test successfull")
Set_Tests_Properties(run_fairmq_transfer_timeout_nn PROPERTIES DEPENDS run_fairmq_req_rep_nn)
EndIf(NANOMSG_FOUND)

View File

@ -0,0 +1,64 @@
/********************************************************************************
* Copyright (C) 2015-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* @file fairmq/test/helper/devices/TestPub.cxx
*/
#include <FairMQDevice.h>
#include <FairMQLogger.h>
namespace fair
{
namespace mq
{
namespace test
{
class Pub : public FairMQDevice
{
protected:
auto Run() -> void override
{
auto ready1 = FairMQMessagePtr{NewMessage()};
auto ready2 = FairMQMessagePtr{NewMessage()};
auto r1 = Receive(ready1, "control");
auto r2 = Receive(ready2, "control");
if (r1 >= 0 && r2 >= 0)
{
LOG(INFO) << "Received both ready signals, proceeding to publish data";
auto msg = FairMQMessagePtr{NewMessage()};
auto d1 = Send(msg, "data");
if (d1 < 0)
{
LOG(ERROR) << "Failed sending data: d1 = " << d1;
}
auto ack1 = FairMQMessagePtr{NewMessage()};
auto ack2 = FairMQMessagePtr{NewMessage()};
auto a1 = Receive(ack1, "control");
auto a2 = Receive(ack2, "control");
if (a1 >= 0 && a2 >= 0)
{
LOG(INFO) << "PUB-SUB test successfull";
}
else
{
LOG(ERROR) << "Failed receiving ack signal: a1 = " << a1 << ", a2 = " << a2;
}
}
else
{
LOG(ERROR) << "Failed receiving ready signal: r1 = " << r1 << ", r2 = " << r2;
}
};
};
} // namespace test
} // namespace mq
} // namespace fair

View File

@ -0,0 +1,40 @@
/********************************************************************************
* Copyright (C) 2015-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* @file fairmq/test/helper/devices/TestPull.cxx
*/
#include <FairMQDevice.h>
#include <FairMQLogger.h>
namespace fair
{
namespace mq
{
namespace test
{
using namespace std;
class Pull : public FairMQDevice
{
protected:
auto Run() -> void override
{
auto msg = FairMQMessagePtr{NewMessage()};
if (Receive(msg, "data") >= 0)
{
LOG(INFO) << "PUSH-PULL test successfull";
}
};
};
} // namespace test
} // namespace mq
} // namespace fair

View File

@ -0,0 +1,33 @@
/********************************************************************************
* Copyright (C) 2015-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* @file fairmq/test/helper/devices/TestPush.cxx
*/
#include <FairMQDevice.h>
namespace fair
{
namespace mq
{
namespace test
{
class Push : public FairMQDevice
{
protected:
auto Run() -> void override
{
auto msg = FairMQMessagePtr{NewMessage()};
Send(msg, "data");
};
};
} // namespace test
} // namespace mq
} // namespace fair

View File

@ -0,0 +1,48 @@
/********************************************************************************
* Copyright (C) 2015-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* @file fairmq/test/helper/devices/TestRep.cxx
*/
#include <FairMQDevice.h>
#include <FairMQLogger.h>
namespace fair
{
namespace mq
{
namespace test
{
class Rep : public FairMQDevice
{
protected:
auto Run() -> void override
{
auto request1 = FairMQMessagePtr{NewMessage()};
if (Receive(request1, "data") >= 0)
{
LOG(INFO) << "Received request 1";
auto reply = FairMQMessagePtr{NewMessage()};
Send(reply, "data");
}
auto request2 = FairMQMessagePtr{NewMessage()};
if (Receive(request2, "data") >= 0)
{
LOG(INFO) << "Received request 2";
auto reply = FairMQMessagePtr{NewMessage()};
Send(reply, "data");
}
LOG(INFO) << "REQ-REP test successfull";
};
};
} // namespace test
} // namespace mq
} // namespace fair

View File

@ -0,0 +1,40 @@
/********************************************************************************
* Copyright (C) 2015-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* @file fairmq/test/helper/devices/TestReq.cxx
*/
#include <FairMQDevice.h>
#include <FairMQLogger.h>
namespace fair
{
namespace mq
{
namespace test
{
class Req : public FairMQDevice
{
protected:
auto Run() -> void override
{
auto request = FairMQMessagePtr{NewMessage()};
Send(request, "data");
auto reply = FairMQMessagePtr{NewMessage()};
if (Receive(reply, "data") >= 0)
{
LOG(INFO) << "received reply";
}
};
};
} // namespace test
} // namespace mq
} // namespace fair

View File

@ -0,0 +1,56 @@
/********************************************************************************
* Copyright (C) 2015-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* @file fairmq/test/helper/devices/TestSub.cxx
*/
#include <FairMQDevice.h>
#include <FairMQLogger.h>
namespace fair
{
namespace mq
{
namespace test
{
class Sub : public FairMQDevice
{
protected:
auto Run() -> void override
{
auto ready = FairMQMessagePtr{NewMessage()};
auto r1 = Send(ready, "control");
if (r1 >= 0)
{
auto msg = FairMQMessagePtr{NewMessage()};
auto d1 = Receive(msg, "data");
if (d1 >= 0)
{
auto ack = FairMQMessagePtr{NewMessage()};
auto a1 = Send(ack, "control");
if (a1 < 0)
{
LOG(ERROR) << "Failed sending ack signal: a1 = " << a1;
}
}
else
{
LOG(ERROR) << "Failed receiving data: d1 = " << d1;
}
}
else
{
LOG(ERROR) << "Failed sending ready signal: r1 = " << r1;
}
};
};
} // namespace test
} // namespace mq
} // namespace fair

View File

@ -0,0 +1,62 @@
/********************************************************************************
* Copyright (C) 2015-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* @file fairmq/test/helper/devices/TestTransferTimeout.cxx
*/
#include <FairMQDevice.h>
#include <FairMQLogger.h>
namespace fair
{
namespace mq
{
namespace test
{
class TransferTimeout : public FairMQDevice
{
protected:
auto Run() -> void override
{
auto sendCanceling = false;
auto receiveCanceling = false;
auto msg1 = FairMQMessagePtr{NewMessage()};
auto msg2 = FairMQMessagePtr{NewMessage()};
if (Send(msg1, "data-out", 0, 100) == -2)
{
LOG(INFO) << "send canceled";
sendCanceling = true;
}
else
{
LOG(ERROR) << "send did not cancel";
}
if (Receive(msg2, "data-in", 0, 100) == -2)
{
LOG(INFO) << "receive canceled";
receiveCanceling = true;
}
else
{
LOG(ERROR) << "receive did not cancel";
}
if (sendCanceling && receiveCanceling)
{
LOG(INFO) << "Transfer timeout test successfull";
}
};
};
} // namespace test
} // namespace mq
} // namespace fair

View File

@ -0,0 +1,69 @@
/********************************************************************************
* Copyright (C) 2015-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* @file fairmq/test/helper/runTestDevice.cxx
*/
#include "devices/TestPub.cxx"
#include "devices/TestPull.cxx"
#include "devices/TestPush.cxx"
#include "devices/TestRep.cxx"
#include "devices/TestReq.cxx"
#include "devices/TestSub.cxx"
#include "devices/TestTransferTimeout.cxx"
#include <boost/program_options.hpp>
#include <iostream>
#include <runFairMQDevice.h>
#include <string>
namespace bpo = boost::program_options;
auto addCustomOptions(bpo::options_description& options) -> void
{
}
auto getDevice(const FairMQProgOptions& config) -> FairMQDevicePtr
{
using namespace std;
using namespace fair::mq::test;
auto id = config.GetValue<std::string>("id");
if (0 == id.find("pull_"))
{
return new Pull;
}
else if (0 == id.find("push_"))
{
return new Push;
}
else if (0 == id.find("sub_"))
{
return new Sub;
}
else if (0 == id.find("pub_"))
{
return new Pub;
}
else if (0 == id.find("req_"))
{
return new Req;
}
else if (0 == id.find("rep_"))
{
return new Rep;
}
else if (0 == id.find("transfer_timeout_"))
{
return new TransferTimeout;
}
else
{
cerr << "Don't know id '" << id << "'" << endl;
return nullptr;
}
}

View File

@ -0,0 +1,69 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* @file fairmq/test/protocols/_pub_sub.cxx
*/
#include "runner.h"
#include <gtest/gtest.h>
#include <sstream> // std::stringstream
#include <thread>
namespace
{
using namespace std;
using namespace fair::mq::test;
auto RunPubSub(string transport) -> void
{
auto pub = execute_result{"", 0};
thread pub_thread([&]() {
stringstream cmd;
cmd << runTestDevice << " --id pub_" << transport << " --control static --verbosity DEBUG "
<< "--log-color false --mq-config \"" << mqConfig << "\"";
pub = execute(cmd.str(), "[PUB]");
});
auto sub1 = execute_result{"", 0};
thread sub1_thread([&]() {
stringstream cmd;
cmd << runTestDevice << " --id sub_" << transport << " --control static --verbosity DEBUG "
<< "--log-color false --mq-config \"" << mqConfig << "\"";
sub1 = execute(cmd.str(), "[SUB1]");
});
auto sub2 = execute_result{"", 0};
thread sub2_thread([&]() {
stringstream cmd;
cmd << runTestDevice << " --id sub_" << transport << " --control static --verbosity DEBUG "
<< "--log-color false --mq-config \"" << mqConfig << "\"";
sub2 = execute(cmd.str(), "[SUB2]");
});
pub_thread.join();
sub1_thread.join();
sub2_thread.join();
cerr << pub.error_out << sub1.error_out << sub2.error_out;
exit(pub.exit_code + sub1.exit_code + sub2.exit_code);
}
TEST(PubSub, ZeroMQ)
{
EXPECT_EXIT(RunPubSub("zeromq"), ::testing::ExitedWithCode(0), "PUB-SUB test successfull");
}
#ifdef NANOMSG_FOUND
TEST(PubSub, Nanomsg)
{
EXPECT_EXIT(RunPubSub("nanomsg"), ::testing::ExitedWithCode(0), "PUB-SUB test successfull");
}
#endif /* NANOMSG_FOUND */
} // namespace

View File

@ -0,0 +1,65 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* @file fairmq/test/protocols/_push_pull.cxx
*/
#include "runner.h"
#include <gtest/gtest.h>
#include <sstream> // std::stringstream
#include <thread>
namespace
{
using namespace std;
using namespace fair::mq::test;
auto RunPushPull(string transport) -> void
{
auto push = execute_result{"", 100};
thread push_thread([&]() {
stringstream cmd;
cmd << runTestDevice << " --id push_" << transport << " --control static --verbosity DEBUG "
<< "--log-color false --mq-config \"" << mqConfig << "\"";
push = execute(cmd.str(), "[PUSH]");
});
auto pull = execute_result{"", 100};
thread pull_thread([&]() {
stringstream cmd;
cmd << runTestDevice << " --id pull_" << transport << " --control static --verbosity DEBUG "
<< "--log-color false --mq-config \"" << mqConfig << "\"";
pull = execute(cmd.str(), "[PULL]");
});
push_thread.join();
pull_thread.join();
cerr << push.error_out << pull.error_out;
exit(push.exit_code + pull.exit_code);
}
TEST(PushPull, ZeroMQ)
{
EXPECT_EXIT(RunPushPull("zeromq"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
}
TEST(PushPull, ShMem)
{
EXPECT_EXIT(RunPushPull("shmem"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
}
#ifdef NANOMSG_FOUND
TEST(PushPull, Nanomsg)
{
EXPECT_EXIT(RunPushPull("nanomsg"), ::testing::ExitedWithCode(0), "PUSH-PULL test successfull");
}
#endif /* NANOMSG_FOUND */
} // namespace

View File

@ -0,0 +1,74 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* @file fairmq/test/protocols/_req_rep.cxx
*/
#include "runner.h"
#include <gtest/gtest.h>
#include <sstream> // std::stringstream
#include <thread>
namespace
{
using namespace std;
using namespace fair::mq::test;
auto RunReqRep(string transport) -> void
{
auto rep = execute_result{ "", 0 };
thread rep_thread([&]() {
stringstream cmd;
cmd << runTestDevice << " --id rep_" << transport << " --control static --verbosity DEBUG "
<< "--log-color false --mq-config \"" << mqConfig << "\"";
rep = execute(cmd.str(), "[REP]");
});
auto req1 = execute_result{ "", 0 };
thread req1_thread([&]() {
stringstream cmd;
cmd << runTestDevice << " --id req_1" << transport << " --control static --verbosity DEBUG "
<< "--log-color false --mq-config \"" << mqConfig << "\"";
req1 = execute(cmd.str(), "[REQ1]");
});
auto req2 = execute_result{ "", 0 };
thread req2_thread([&]() {
stringstream cmd;
cmd << runTestDevice << " --id req_2" << transport << " --control static --verbosity DEBUG "
<< "--log-color false --mq-config \"" << mqConfig << "\"";
req2 = execute(cmd.str(), "[REQ2]");
});
rep_thread.join();
req1_thread.join();
req2_thread.join();
cerr << req1.error_out << req2.error_out << rep.error_out;
exit(req1.exit_code + req2.exit_code + rep.exit_code);
}
TEST(ReqRep, ZeroMQ)
{
EXPECT_EXIT(RunReqRep("zeromq"), ::testing::ExitedWithCode(0), "REQ-REP test successfull");
}
TEST(ReqRep, ShMem)
{
EXPECT_EXIT(RunReqRep("shmem"), ::testing::ExitedWithCode(0), "REQ-REP test successfull");
}
#ifdef NANOMSG_FOUND
TEST(ReqRep, Nanomsg)
{
EXPECT_EXIT(RunReqRep("nanomsg"), ::testing::ExitedWithCode(0), "REQ-REP test successfull");
}
#endif /* NANOMSG_FOUND */
} // namespace

View File

@ -0,0 +1,51 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* @file fairmq/test/protocols/_transfer_timeout.cxx
*/
#include "runner.h"
#include <gtest/gtest.h>
#include <sstream> // std::stringstream
namespace
{
using namespace std;
using namespace fair::mq::test;
auto RunTransferTimeout(string transport) -> void
{
stringstream cmd;
cmd << runTestDevice << " --id transfer_timeout_" << transport << " --control static --verbosity DEBUG "
<< "--log-color false --mq-config \"" << mqConfig << "\"";
auto res = execute(cmd.str());
cerr << res.error_out;
exit(res.exit_code);
}
TEST(TransferTimeout, ZeroMQ)
{
EXPECT_EXIT(RunTransferTimeout("zeromq"), ::testing::ExitedWithCode(0), "Transfer timeout test successfull");
}
TEST(TransferTimeout, ShMem)
{
EXPECT_EXIT(RunTransferTimeout("shmem"), ::testing::ExitedWithCode(0), "Transfer timeout test successfull");
}
#ifdef NANOMSG_FOUND
TEST(TransferTimeout, Nanomsg)
{
EXPECT_EXIT(RunTransferTimeout("nanomsg"), ::testing::ExitedWithCode(0), "Transfer timeout test successfull");
}
#endif /* NANOMSG_FOUND */
} // namespace

View File

@ -0,0 +1,348 @@
{
"fairMQOptions": {
"devices": [
{
"id": "push_zeromq",
"channels": [
{
"address": "tcp://127.0.0.1:5557",
"method": "bind",
"name": "data",
"rateLogging": 0,
"transport": "zeromq",
"type": "push"
}
]
},
{
"id": "pull_zeromq",
"channels": [
{
"address": "tcp://127.0.0.1:5557",
"method": "connect",
"name": "data",
"rateLogging": 0,
"transport": "zeromq",
"type": "pull"
}
]
},
{
"id": "push_nanomsg",
"channels": [
{
"address": "tcp://127.0.0.1:5757",
"method": "bind",
"name": "data",
"rateLogging": 0,
"transport": "nanomsg",
"type": "push"
}
]
},
{
"id": "pull_nanomsg",
"channels": [
{
"address": "tcp://127.0.0.1:5757",
"method": "connect",
"name": "data",
"rateLogging": 0,
"transport": "nanomsg",
"type": "pull"
}
]
},
{
"id": "push_shmem",
"channels": [
{
"address": "tcp://127.0.0.1:5857",
"method": "bind",
"name": "data",
"rateLogging": 0,
"transport": "shmem",
"type": "push"
}
]
},
{
"id": "pull_shmem",
"channels": [
{
"address": "tcp://127.0.0.1:5857",
"method": "connect",
"name": "data",
"rateLogging": 0,
"transport": "shmem",
"type": "pull"
}
]
},
{
"id": "pub_zeromq",
"channels": [
{
"address": "tcp://127.0.0.1:5556",
"method": "bind",
"name": "data",
"rateLogging": 0,
"transport": "zeromq",
"type": "pub"
},
{
"address": "tcp://127.0.0.1:5555",
"method": "bind",
"name": "control",
"rateLogging": 0,
"transport": "zeromq",
"type": "pull"
}
]
},
{
"id": "pub_nanomsg",
"channels": [
{
"address": "tcp://127.0.0.1:5756",
"method": "bind",
"name": "data",
"rateLogging": 0,
"transport": "nanomsg",
"type": "pub"
},
{
"address": "tcp://127.0.0.1:5755",
"method": "bind",
"name": "control",
"rateLogging": 0,
"transport": "nanomsg",
"type": "pull"
}
]
},
{
"id": "sub_zeromq",
"channels": [
{
"address": "tcp://127.0.0.1:5556",
"method": "connect",
"name": "data",
"rateLogging": 0,
"transport": "zeromq",
"type": "sub"
},
{
"address": "tcp://127.0.0.1:5555",
"method": "connect",
"name": "control",
"rateLogging": 0,
"transport": "zeromq",
"type": "push"
}
]
},
{
"id": "sub_nanomsg",
"channels": [
{
"address": "tcp://127.0.0.1:5756",
"method": "connect",
"name": "data",
"rateLogging": 0,
"transport": "nanomsg",
"type": "sub"
},
{
"address": "tcp://127.0.0.1:5755",
"method": "connect",
"name": "control",
"rateLogging": 0,
"transport": "nanomsg",
"type": "push"
}
]
},
{
"id": "req_1zeromq",
"channels": [
{
"address": "tcp://127.0.0.1:5558",
"method": "connect",
"name": "data",
"rateLogging": 0,
"transport": "zeromq",
"type": "req"
}
]
},
{
"id": "req_1nanomsg",
"channels": [
{
"address": "tcp://127.0.0.1:5758",
"method": "connect",
"name": "data",
"rateLogging": 0,
"transport": "nanomsg",
"type": "req"
}
]
},
{
"id": "req_2zeromq",
"channels": [
{
"address": "tcp://127.0.0.1:5558",
"method": "connect",
"name": "data",
"rateLogging": 0,
"transport": "zeromq",
"type": "req"
}
]
},
{
"id": "req_2nanomsg",
"channels": [
{
"address": "tcp://127.0.0.1:5758",
"method": "connect",
"name": "data",
"rateLogging": 0,
"transport": "nanomsg",
"type": "req"
}
]
},
{
"id": "req_1shmem",
"channels": [
{
"address": "tcp://127.0.0.1:5758",
"method": "connect",
"name": "data",
"rateLogging": 0,
"transport": "shmem",
"type": "req"
}
]
},
{
"id": "req_2shmem",
"channels": [
{
"address": "tcp://127.0.0.1:5758",
"method": "connect",
"name": "data",
"rateLogging": 0,
"transport": "shmem",
"type": "req"
}
]
},
{
"id": "rep_zeromq",
"channels": [
{
"address": "tcp://127.0.0.1:5558",
"method": "bind",
"name": "data",
"rateLogging": 0,
"transport": "zeromq",
"type": "rep"
}
]
},
{
"id": "rep_nanomsg",
"channels": [
{
"address": "tcp://127.0.0.1:5758",
"method": "bind",
"name": "data",
"rateLogging": 0,
"transport": "nanomsg",
"type": "rep"
}
]
},
{
"id": "rep_shmem",
"channels": [
{
"address": "tcp://127.0.0.1:5758",
"method": "bind",
"name": "data",
"rateLogging": 0,
"transport": "shmem",
"type": "rep"
}
]
},
{
"id": "transfer_timeout_zeromq",
"channels": [
{
"address": "tcp://127.0.0.1:5559",
"method": "bind",
"name": "data-in",
"rateLogging": 0,
"transport": "zeromq",
"type": "pull"
},
{
"address": "tcp://127.0.0.1:5560",
"method": "bind",
"name": "data-out",
"rateLogging": 0,
"transport": "zeromq",
"type": "push"
}
]
},
{
"id": "transfer_timeout_shmem",
"channels": [
{
"address": "tcp://127.0.0.1:5959",
"method": "bind",
"name": "data-in",
"rateLogging": 0,
"transport": "shmem",
"type": "pull"
},
{
"address": "tcp://127.0.0.1:5960",
"method": "bind",
"name": "data-out",
"rateLogging": 0,
"transport": "shmem",
"type": "push"
}
]
},
{
"id": "transfer_timeout_nanomsg",
"channels": [
{
"address": "tcp://127.0.0.1:5759",
"method": "bind",
"name": "data-in",
"rateLogging": 0,
"transport": "nanomsg",
"type": "pull"
},
{
"address": "tcp://127.0.0.1:5560",
"method": "bind",
"name": "data-out",
"rateLogging": 0,
"transport": "nanomsg",
"type": "push"
}
]
}
]
}
}

View File

@ -0,0 +1,66 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* @file fairmq/test/protocols/runner.cxx.in
*/
#include "runner.h"
#include <gtest/gtest.h>
#include <sstream>
#include <string>
#include <pstream.h> // redi::ipstream
#include <iostream>
namespace fair
{
namespace mq
{
namespace test
{
using namespace std;
string runTestDevice = "@RUN_TEST_DEVICE@";
string mqConfig = "@MQ_CONFIG@";
auto execute(string cmd, string log_prefix) -> execute_result
{
auto res = execute_result{"", 0};
stringstream out;
// Log cmd
out << log_prefix << cmd << endl;
// Execute command and capture stderr, add log_prefix line by line
redi::ipstream in(cmd, redi::pstreams::pstderr);
auto line = string{};
while (getline(in, line))
{
out << log_prefix << line << endl;
}
in.close();
// Capture exit code
res.exit_code = in.rdbuf()->status();
out << log_prefix << " Exit code: " << res.exit_code << endl;
// Return result
res.error_out = out.str();
return res;
}
} /* namespace test */
} /* namespace mq */
} /* namespace fair */
auto main(int argc, char** argv) -> int
{
::testing::InitGoogleTest(&argc, argv);
::testing::FLAGS_gtest_death_test_style = "threadsafe";
return RUN_ALL_TESTS();
}

View File

@ -0,0 +1,49 @@
/********************************************************************************
* Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* @file fairmq/test/protocols/runner.h
*/
#ifndef FAIR_MQ_TEST_RUNNER_H
#define FAIR_MQ_TEST_RUNNER_H
#include <string>
namespace fair
{
namespace mq
{
namespace test
{
extern std::string runTestDevice; /// Path to test device executable.
extern std::string mqConfig; /// Path to FairMQ device config file.
/**
* Result type for execute function. Holds captured stderr output and exit code.
*/
struct execute_result {
std::string error_out;
int exit_code;
};
/**
* Execute given command in forked process and capture stderr output
* and exit code.
*
* @param[in] cmd Command to execute
* @param[in] log_prefix How to prefix each captured output line with
* @return Captured error output and exit code
*/
auto execute(std::string cmd, std::string log_prefix = "") -> execute_result;
} /* namespace test */
} /* namespace mq */
} /* namespace fair */
#endif /* FAIR_MQ_TEST_RUNNER_H */

View File

@ -1,60 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQTestPub.cpp
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include "FairMQTestPub.h"
#include "FairMQLogger.h"
FairMQTestPub::FairMQTestPub()
{
}
void FairMQTestPub::Run()
{
FairMQMessagePtr ready1(NewMessage());
FairMQMessagePtr ready2(NewMessage());
int r1 = Receive(ready1, "control");
int r2 = Receive(ready2, "control");
if (r1 >= 0 && r2 >= 0)
{
LOG(INFO) << "Received both ready signals, proceeding to publish data";
FairMQMessagePtr msg(NewMessage());
int d1 = Send(msg, "data");
if (d1 < 0)
{
LOG(ERROR) << "Failed sending data: d1 = " << d1;
}
FairMQMessagePtr ack1(NewMessage());
FairMQMessagePtr ack2(NewMessage());
int a1 = Receive(ack1, "control");
int a2 = Receive(ack2, "control");
if (a1 >= 0 && a2 >= 0)
{
LOG(INFO) << "PUB-SUB test successfull";
}
else
{
LOG(ERROR) << "Failed receiving ack signal: a1 = " << a1 << ", a2 = " << a2;
}
}
else
{
LOG(ERROR) << "Failed receiving ready signal: r1 = " << r1 << ", r2 = " << r2;
}
}
FairMQTestPub::~FairMQTestPub()
{
}

View File

@ -1,30 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQTestPub.h
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#ifndef FAIRMQTESTPUB_H_
#define FAIRMQTESTPUB_H_
#include "FairMQDevice.h"
class FairMQTestPub : public FairMQDevice
{
public:
FairMQTestPub();
virtual ~FairMQTestPub();
protected:
virtual void Run();
};
#endif /* FAIRMQTESTPUB_H_ */

View File

@ -1,52 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQTestSub.cxx
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include "FairMQTestSub.h"
#include "FairMQLogger.h"
FairMQTestSub::FairMQTestSub()
{
}
void FairMQTestSub::Run()
{
FairMQMessagePtr ready(NewMessage());
int r1 = Send(ready, "control");
if (r1 >= 0)
{
FairMQMessagePtr msg(NewMessage());
int d1 = Receive(msg, "data");
if (d1 >= 0)
{
FairMQMessagePtr ack(NewMessage());
int a1 = Send(ack, "control");
if (a1 < 0)
{
LOG(ERROR) << "Failed sending ack signal: a1 = " << a1;
}
}
else
{
LOG(ERROR) << "Failed receiving data: d1 = " << d1;
}
}
else
{
LOG(ERROR) << "Failed sending ready signal: r1 = " << r1;
}
}
FairMQTestSub::~FairMQTestSub()
{
}

View File

@ -1,30 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQTestSub.h
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#ifndef FAIRMQTESTSUB_H_
#define FAIRMQTESTSUB_H_
#include "FairMQDevice.h"
class FairMQTestSub : public FairMQDevice
{
public:
FairMQTestSub();
virtual ~FairMQTestSub();
protected:
virtual void Run();
};
#endif /* FAIRMQTESTSUB_H_ */

View File

@ -1,93 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runTestPub.cxx
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include "FairMQLogger.h"
#include "FairMQTestPub.h"
#include <chrono>
#include <thread>
int main(int argc, char** argv)
{
reinit_logger(false);
FairMQTestPub testPub;
testPub.CatchSignals();
std::string transport;
if ( (argc != 2) || (argv[1] == NULL) )
{
LOG(ERROR) << "Transport for the test not specified!";
return 1;
}
if ( strncmp(argv[1],"zeromq",6) == 0 )
{
transport = "zeromq";
testPub.SetTransport(transport);
}
else if ( strncmp(argv[1],"nanomsg",7) == 0 )
{
transport = "nanomsg";
testPub.SetTransport(transport);
}
else
{
LOG(ERROR) << "Incorrect transport requested! Expected 'zeromq' or 'nanomsg', found: " << argv[1];
return 1;
}
testPub.SetProperty(FairMQTestPub::Id, "testPub");
FairMQChannel controlChannel("pull", "bind", "tcp://127.0.0.1:5555");
if (transport == "nanomsg")
{
controlChannel.UpdateAddress("tcp://127.0.0.1:5755");
}
controlChannel.UpdateRateLogging(0);
testPub.fChannels["control"].push_back(controlChannel);
FairMQChannel pubChannel("pub", "bind", "tcp://127.0.0.1:5556");
if (transport == "nanomsg")
{
pubChannel.UpdateAddress("tcp://127.0.0.1:5756");
}
pubChannel.UpdateRateLogging(0);
testPub.fChannels["data"].push_back(pubChannel);
testPub.ChangeState("INIT_DEVICE");
testPub.WaitForEndOfState("INIT_DEVICE");
testPub.ChangeState("INIT_TASK");
testPub.WaitForEndOfState("INIT_TASK");
testPub.ChangeState("RUN");
testPub.WaitForEndOfState("RUN");
// nanomsg does not implement the LINGER option. Give the sockets some time before their queues are terminated
if (transport == "nanomsg")
{
std::this_thread::sleep_for(std::chrono::seconds(1));
}
testPub.ChangeState("RESET_TASK");
testPub.WaitForEndOfState("RESET_TASK");
testPub.ChangeState("RESET_DEVICE");
testPub.WaitForEndOfState("RESET_DEVICE");
testPub.ChangeState("END");
return 0;
}

View File

@ -1,95 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runTestSub.cxx
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include "FairMQLogger.h"
#include "FairMQTestSub.h"
#include <string>
#include <chrono>
#include <thread>
int main(int argc, char** argv)
{
reinit_logger(false);
SET_LOG_CONSOLE_LEVEL(WARN);
FairMQTestSub testSub;
testSub.CatchSignals();
std::string transport;
if ( (argc != 2) || (argv[1] == NULL) )
{
LOG(ERROR) << "Transport for the test not specified!";
return 1;
}
if ( strncmp(argv[1],"zeromq",6) == 0 )
{
transport = "zeromq";
testSub.SetTransport(transport);
}
else if ( strncmp(argv[1],"nanomsg",7) == 0 )
{
transport = "nanomsg";
testSub.SetTransport(transport);
}
else
{
LOG(ERROR) << "Incorrect transport requested! Expected 'zeromq' or 'nanomsg', found: " << argv[1];
return 1;
}
testSub.SetProperty(FairMQTestSub::Id, "testSub_" + std::to_string(getpid()));
FairMQChannel controlChannel("push", "connect", "tcp://127.0.0.1:5555");
if (transport == "nanomsg")
{
controlChannel.UpdateAddress("tcp://127.0.0.1:5755");
}
controlChannel.UpdateRateLogging(0);
testSub.fChannels["control"].push_back(controlChannel);
FairMQChannel subChannel("sub", "connect", "tcp://127.0.0.1:5556");
if (transport == "nanomsg")
{
subChannel.UpdateAddress("tcp://127.0.0.1:5756");
}
subChannel.UpdateRateLogging(0);
testSub.fChannels["data"].push_back(subChannel);
testSub.ChangeState("INIT_DEVICE");
testSub.WaitForEndOfState("INIT_DEVICE");
testSub.ChangeState("INIT_TASK");
testSub.WaitForEndOfState("INIT_TASK");
testSub.ChangeState("RUN");
testSub.WaitForEndOfState("RUN");
// nanomsg does not implement the LINGER option. Give the sockets some time before their queues are terminated
if (transport == "nanomsg")
{
std::this_thread::sleep_for(std::chrono::seconds(1));
}
testSub.ChangeState("RESET_TASK");
testSub.WaitForEndOfState("RESET_TASK");
testSub.ChangeState("RESET_DEVICE");
testSub.WaitForEndOfState("RESET_DEVICE");
testSub.ChangeState("END");
return 0;
}

View File

@ -1,34 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQTestPull.cxx
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include "FairMQTestPull.h"
#include "FairMQLogger.h"
FairMQTestPull::FairMQTestPull()
{
}
void FairMQTestPull::Run()
{
FairMQMessagePtr msg(NewMessage());
if (Receive(msg, "data") >= 0)
{
LOG(INFO) << "PUSH-PULL test successfull";
}
}
FairMQTestPull::~FairMQTestPull()
{
}

View File

@ -1,30 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQTestPull.h
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#ifndef FAIRMQTESTPULL_H_
#define FAIRMQTESTPULL_H_
#include "FairMQDevice.h"
class FairMQTestPull : public FairMQDevice
{
public:
FairMQTestPull();
virtual ~FairMQTestPull();
protected:
virtual void Run();
};
#endif /* FAIRMQTESTPULL_H_ */

View File

@ -1,30 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQTestPush.cpp
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include "FairMQTestPush.h"
#include "FairMQLogger.h"
FairMQTestPush::FairMQTestPush()
{
}
void FairMQTestPush::Run()
{
FairMQMessagePtr msg(NewMessage());
Send(msg, "data");
}
FairMQTestPush::~FairMQTestPush()
{
}

View File

@ -1,30 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQTestPush.h
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#ifndef FAIRMQTESTPUSH_H_
#define FAIRMQTESTPUSH_H_
#include "FairMQDevice.h"
class FairMQTestPush : public FairMQDevice
{
public:
FairMQTestPush();
virtual ~FairMQTestPush();
protected:
virtual void Run();
};
#endif /* FAIRMQTESTPUSH_H_ */

View File

@ -1,85 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runTestPull.cxx
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include "FairMQLogger.h"
#include "FairMQTestPull.h"
#include <chrono>
#include <thread>
int main(int argc, char** argv)
{
reinit_logger(false);
FairMQTestPull testPull;
testPull.CatchSignals();
std::string transport;
if ( (argc != 2) || (argv[1] == NULL) )
{
LOG(ERROR) << "Transport for the test not specified!";
return 1;
}
if ( strncmp(argv[1],"zeromq",6) == 0 )
{
transport = "zeromq";
testPull.SetTransport(transport);
}
else if ( strncmp(argv[1],"nanomsg",7) == 0 )
{
transport = "nanomsg";
testPull.SetTransport(transport);
}
else
{
LOG(ERROR) << "Incorrect transport requested! Expected 'zeromq' or 'nanomsg', found: " << argv[1];
return 1;
}
testPull.SetProperty(FairMQTestPull::Id, "testPull");
FairMQChannel pullChannel("pull", "connect", "tcp://127.0.0.1:5557");
if (transport == "nanomsg")
{
pullChannel.UpdateAddress("tcp://127.0.0.1:5757");
}
pullChannel.UpdateRateLogging(0);
testPull.fChannels["data"].push_back(pullChannel);
testPull.ChangeState("INIT_DEVICE");
testPull.WaitForEndOfState("INIT_DEVICE");
testPull.ChangeState("INIT_TASK");
testPull.WaitForEndOfState("INIT_TASK");
testPull.ChangeState("RUN");
testPull.WaitForEndOfState("RUN");
// nanomsg does not implement the LINGER option. Give the sockets some time before their queues are terminated
if (transport == "nanomsg")
{
std::this_thread::sleep_for(std::chrono::seconds(1));
}
testPull.ChangeState("RESET_TASK");
testPull.WaitForEndOfState("RESET_TASK");
testPull.ChangeState("RESET_DEVICE");
testPull.WaitForEndOfState("RESET_DEVICE");
testPull.ChangeState("END");
return 0;
}

View File

@ -1,91 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runTestPush.cxx
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include "FairMQLogger.h"
#include "FairMQTestPush.h"
#include <chrono>
#include <thread>
int main(int argc, char** argv)
{
reinit_logger(false);
SET_LOG_CONSOLE_LEVEL(WARN);
FairMQTestPush testPush;
testPush.CatchSignals();
std::string transport;
if (argc != 2)
{
LOG(ERROR) << "Transport for the test not specified!";
return 1;
}
if (argv[1] == NULL) {
LOG(ERROR) << "Transport for the test not specified!";
return 1;
}
if ( strncmp(argv[1],"zeromq",6) == 0 )
{
transport = "zeromq";
testPush.SetTransport(transport);
}
else if ( strncmp(argv[1],"nanomsg",7) == 0 )
{
transport = "nanomsg";
testPush.SetTransport(transport);
}
else
{
LOG(ERROR) << "Incorrect transport requested! Expected 'zeromq' or 'nanomsg', found: " << argv[1];
return 1;
}
testPush.SetProperty(FairMQTestPush::Id, "testPush");
FairMQChannel pushChannel("push", "bind", "tcp://127.0.0.1:5557");
if (transport == "nanomsg")
{
pushChannel.UpdateAddress("tcp://127.0.0.1:5757");
}
pushChannel.UpdateRateLogging(0);
testPush.fChannels["data"].push_back(pushChannel);
testPush.ChangeState("INIT_DEVICE");
testPush.WaitForEndOfState("INIT_DEVICE");
testPush.ChangeState("INIT_TASK");
testPush.WaitForEndOfState("INIT_TASK");
testPush.ChangeState("RUN");
testPush.WaitForEndOfState("RUN");
// nanomsg does not implement the LINGER option. Give the sockets some time before their queues are terminated
if (transport == "nanomsg")
{
std::this_thread::sleep_for(std::chrono::seconds(1));
}
testPush.ChangeState("RESET_TASK");
testPush.WaitForEndOfState("RESET_TASK");
testPush.ChangeState("RESET_DEVICE");
testPush.WaitForEndOfState("RESET_DEVICE");
testPush.ChangeState("END");
return 0;
}

View File

@ -1,44 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQTestRep.cpp
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include "FairMQTestRep.h"
#include "FairMQLogger.h"
FairMQTestRep::FairMQTestRep()
{
}
void FairMQTestRep::Run()
{
FairMQMessagePtr request1(NewMessage());
if (Receive(request1, "data") >= 0)
{
LOG(INFO) << "Received request 1";
FairMQMessagePtr reply(NewMessage());
Send(reply, "data");
}
FairMQMessagePtr request2(NewMessage());
if (Receive(request2, "data") >= 0)
{
LOG(INFO) << "Received request 2";
FairMQMessagePtr reply(NewMessage());
Send(reply, "data");
}
LOG(INFO) << "REQ-REP test successfull";
}
FairMQTestRep::~FairMQTestRep()
{
}

View File

@ -1,30 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQTestRep.h
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#ifndef FAIRMQTESTREP_H_
#define FAIRMQTESTREP_H_
#include "FairMQDevice.h"
class FairMQTestRep : public FairMQDevice
{
public:
FairMQTestRep();
virtual ~FairMQTestRep();
protected:
virtual void Run();
};
#endif /* FAIRMQTESTREP_H_ */

View File

@ -1,36 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQTestReq.cxx
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include "FairMQTestReq.h"
#include "FairMQLogger.h"
FairMQTestReq::FairMQTestReq()
{
}
void FairMQTestReq::Run()
{
FairMQMessagePtr request(NewMessage());
Send(request, "data");
FairMQMessagePtr reply(NewMessage());
if (Receive(reply, "data") >= 0)
{
LOG(INFO) << "received reply";
}
}
FairMQTestReq::~FairMQTestReq()
{
}

View File

@ -1,30 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQTestReq.h
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#ifndef FAIRMQTESTREQ_H_
#define FAIRMQTESTREQ_H_
#include "FairMQDevice.h"
class FairMQTestReq : public FairMQDevice
{
public:
FairMQTestReq();
virtual ~FairMQTestReq();
protected:
virtual void Run();
};
#endif /* FAIRMQTESTREQ_H_ */

View File

@ -1,87 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runTestRep.cxx
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include "FairMQLogger.h"
#include "FairMQTestRep.h"
#include <iostream>
#include <chrono>
#include <thread>
int main(int argc, char** argv)
{
reinit_logger(false);
FairMQTestRep testRep;
testRep.CatchSignals();
std::string transport;
if ( (argc != 2) || (argv[1] == NULL) )
{
LOG(ERROR) << "Transport for the test not specified!";
return 1;
}
if ( strncmp(argv[1],"zeromq",6) == 0 )
{
transport = "zeromq";
testRep.SetTransport(transport);
}
else if ( strncmp(argv[1],"nanomsg",7) == 0 )
{
transport = "nanomsg";
testRep.SetTransport(transport);
}
else
{
LOG(ERROR) << "Incorrect transport requested! Expected 'zeromq' or 'nanomsg', found: " << argv[1];
return 1;
}
testRep.SetProperty(FairMQTestRep::Id, "testRep");
FairMQChannel repChannel("rep", "bind", "tcp://127.0.0.1:5558");
if (transport == "nanomsg")
{
repChannel.UpdateAddress("tcp://127.0.0.1:5758");
}
repChannel.UpdateRateLogging(0);
testRep.fChannels["data"].push_back(repChannel);
testRep.ChangeState("INIT_DEVICE");
testRep.WaitForEndOfState("INIT_DEVICE");
testRep.ChangeState("INIT_TASK");
testRep.WaitForEndOfState("INIT_TASK");
testRep.ChangeState("RUN");
testRep.WaitForEndOfState("RUN");
// nanomsg does not implement the LINGER option. Give the sockets some time before their queues are terminated
if (transport == "nanomsg")
{
std::this_thread::sleep_for(std::chrono::seconds(1));
}
testRep.ChangeState("RESET_TASK");
testRep.WaitForEndOfState("RESET_TASK");
testRep.ChangeState("RESET_DEVICE");
testRep.WaitForEndOfState("RESET_DEVICE");
testRep.ChangeState("END");
return 0;
}

View File

@ -1,87 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runTestReq.cxx
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include "FairMQLogger.h"
#include "FairMQTestReq.h"
#include <iostream>
#include <chrono>
#include <thread>
int main(int argc, char** argv)
{
reinit_logger(false);
SET_LOG_CONSOLE_LEVEL(WARN);
FairMQTestReq testReq;
testReq.CatchSignals();
std::string transport;
if ( (argc != 2) || (argv[1] == NULL) )
{
LOG(ERROR) << "Transport for the test not specified!";
return 1;
}
if ( strncmp(argv[1],"zeromq",6) == 0 )
{
transport = "zeromq";
testReq.SetTransport(transport);
}
else if ( strncmp(argv[1],"nanomsg",7) == 0 )
{
transport = "nanomsg";
testReq.SetTransport(transport);
}
else
{
LOG(ERROR) << "Incorrect transport requested! Expected 'zeromq' or 'nanomsg', found: " << argv[1];
return 1;
}
testReq.SetProperty(FairMQTestReq::Id, "testReq" + std::to_string(getpid()));
FairMQChannel reqChannel("req", "connect", "tcp://127.0.0.1:5558");
if (transport == "nanomsg")
{
reqChannel.UpdateAddress("tcp://127.0.0.1:5758");
}
reqChannel.UpdateRateLogging(0);
testReq.fChannels["data"].push_back(reqChannel);
testReq.ChangeState("INIT_DEVICE");
testReq.WaitForEndOfState("INIT_DEVICE");
testReq.ChangeState("INIT_TASK");
testReq.WaitForEndOfState("INIT_TASK");
testReq.ChangeState("RUN");
testReq.WaitForEndOfState("RUN");
// nanomsg does not implement the LINGER option. Give the sockets some time before their queues are terminated
if (transport == "nanomsg")
{
std::this_thread::sleep_for(std::chrono::seconds(1));
}
testReq.ChangeState("RESET_TASK");
testReq.WaitForEndOfState("RESET_TASK");
testReq.ChangeState("RESET_DEVICE");
testReq.WaitForEndOfState("RESET_DEVICE");
testReq.ChangeState("END");
return 0;
}

View File

@ -1,146 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runTransferTimeoutTester.cxx
*
* @since 2015-09-05
* @author A. Rybalchenko
*/
#include "FairMQLogger.h"
#include "FairMQDevice.h"
#include <chrono>
#include <thread>
class TransferTimeoutTester : public FairMQDevice
{
public:
TransferTimeoutTester() {}
virtual ~TransferTimeoutTester() {}
protected:
virtual void Run()
{
bool sendCanceling = false;
bool receiveCanceling = false;
FairMQMessagePtr msg1(NewMessage());
FairMQMessagePtr msg2(NewMessage());
if (Send(msg1, "data-out", 0, 1000) == -2)
{
LOG(INFO) << "send canceled";
sendCanceling = true;
}
else
{
LOG(ERROR) << "send did not cancel";
}
if (Receive(msg2, "data-in", 0, 1000) == -2)
{
LOG(INFO) << "receive canceled";
receiveCanceling = true;
}
else
{
LOG(ERROR) << "receive did not cancel";
}
if (sendCanceling && receiveCanceling)
{
LOG(INFO) << "Transfer timeout test successfull";
}
}
};
int main(int argc, char** argv)
{
TransferTimeoutTester timeoutTester;
timeoutTester.CatchSignals();
std::string transport;
if ( (argc != 2) || (argv[1] == NULL) )
{
LOG(ERROR) << "Transport for the test not specified!";
return 1;
}
if ( strncmp(argv[1],"zeromq",6) == 0 )
{
transport = "zeromq";
timeoutTester.SetTransport(transport);
}
else if ( strncmp(argv[1],"nanomsg",7) == 0 )
{
transport = "nanomsg";
timeoutTester.SetTransport(transport);
}
else
{
LOG(ERROR) << "Incorrect transport requested! Expected 'zeromq' or 'nanomsg', found: " << argv[1];
return 1;
}
reinit_logger(false);
timeoutTester.SetProperty(TransferTimeoutTester::Id, "timeoutTester");
FairMQChannel dataOutChannel;
dataOutChannel.UpdateType("push");
dataOutChannel.UpdateMethod("bind");
dataOutChannel.UpdateAddress("tcp://127.0.0.1:5559");
if (transport == "nanomsg")
{
dataOutChannel.UpdateAddress("tcp://127.0.0.1:5759");
}
dataOutChannel.UpdateSndBufSize(1000);
dataOutChannel.UpdateRcvBufSize(1000);
dataOutChannel.UpdateRateLogging(0);
timeoutTester.fChannels["data-out"].push_back(dataOutChannel);
FairMQChannel dataInChannel;
dataInChannel.UpdateType("pull");
dataInChannel.UpdateMethod("bind");
dataInChannel.UpdateAddress("tcp://127.0.0.1:5560");
if (transport == "nanomsg")
{
dataInChannel.UpdateAddress("tcp://127.0.0.1:5760");
}
dataInChannel.UpdateSndBufSize(1000);
dataInChannel.UpdateRcvBufSize(1000);
dataInChannel.UpdateRateLogging(0);
timeoutTester.fChannels["data-in"].push_back(dataInChannel);
timeoutTester.ChangeState("INIT_DEVICE");
timeoutTester.WaitForEndOfState("INIT_DEVICE");
timeoutTester.ChangeState("INIT_TASK");
timeoutTester.WaitForEndOfState("INIT_TASK");
timeoutTester.ChangeState("RUN");
timeoutTester.WaitForEndOfState("RUN");
// nanomsg does not implement the LINGER option. Give the sockets some time before their queues are terminated
if (transport == "nanomsg")
{
std::this_thread::sleep_for(std::chrono::seconds(1));
}
timeoutTester.ChangeState("RESET_TASK");
timeoutTester.WaitForEndOfState("RESET_TASK");
timeoutTester.ChangeState("RESET_DEVICE");
timeoutTester.WaitForEndOfState("RESET_DEVICE");
timeoutTester.ChangeState("END");
return 0;
}

View File

@ -1,18 +0,0 @@
#!/bin/bash
transport="zeromq"
if [ "$1" = "nanomsg" ]; then
transport="nanomsg"
fi
trap 'kill -TERM $PUB_PID; kill -TERM $SUB1_PID; kill -TERM $SUB2_PID; wait $PUB_PID; wait $SUB1_PID; wait $SUB2_PID;' TERM
@CMAKE_BINARY_DIR@/bin/test-fairmq-pub $transport &
PUB_PID=$!
@CMAKE_BINARY_DIR@/bin/test-fairmq-sub $transport &
SUB1_PID=$!
@CMAKE_BINARY_DIR@/bin/test-fairmq-sub $transport &
SUB2_PID=$!
wait $PUB_PID
wait $SUB1_PID
wait $SUB2_PID

View File

@ -1,15 +0,0 @@
#!/bin/bash
transport="zeromq"
if [ "$1" = "nanomsg" ]; then
transport="nanomsg"
fi
trap 'kill -TERM $PUSH_PID; kill -TERM $PULL_PID; wait $PUSH_PID; wait $PULL_PID;' TERM
@CMAKE_BINARY_DIR@/bin/test-fairmq-push $transport &
PUSH_PID=$!
@CMAKE_BINARY_DIR@/bin/test-fairmq-pull $transport &
PULL_PID=$!
wait $PUSH_PID
wait $PULL_PID

View File

@ -1,18 +0,0 @@
#!/bin/bash
transport="zeromq"
if [ "$1" = "nanomsg" ]; then
transport="nanomsg"
fi
trap 'kill -TERM $REQ1_PID; kill -TERM $REQ2_PID; kill -TERM $REP_PID; wait $REQ1_PID; wait $REQ2_PID; wait $REP_PID;' TERM
@CMAKE_BINARY_DIR@/bin/test-fairmq-req $transport &
REQ1_PID=$!
@CMAKE_BINARY_DIR@/bin/test-fairmq-req $transport &
REQ2_PID=$!
@CMAKE_BINARY_DIR@/bin/test-fairmq-rep $transport &
REP_PID=$!
wait $REQ1_PID
wait $REQ2_PID
wait $REP_PID