mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
Include empty parts in multipart example/test
This commit is contained in:
parent
88dbcbe4fd
commit
dd191551ca
|
@ -32,19 +32,19 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-multipart.sh.in ${CMA
|
||||||
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-multipart.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh)
|
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/test-ex-multipart.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh)
|
||||||
|
|
||||||
add_test(NAME Example.Multipart.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh zeromq)
|
add_test(NAME Example.Multipart.zeromq COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh zeromq)
|
||||||
set_tests_properties(Example.Multipart.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 5 parts")
|
set_tests_properties(Example.Multipart.zeromq PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 7 parts")
|
||||||
|
|
||||||
if(BUILD_NANOMSG_TRANSPORT)
|
if(BUILD_NANOMSG_TRANSPORT)
|
||||||
add_test(NAME Example.Multipart.nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh nanomsg)
|
add_test(NAME Example.Multipart.nanomsg COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh nanomsg)
|
||||||
set_tests_properties(Example.Multipart.nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 5 parts")
|
set_tests_properties(Example.Multipart.nanomsg PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 7 parts")
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
add_test(NAME Example.Multipart.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh shmem)
|
add_test(NAME Example.Multipart.shmem COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh shmem)
|
||||||
set_tests_properties(Example.Multipart.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 5 parts")
|
set_tests_properties(Example.Multipart.shmem PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 7 parts")
|
||||||
|
|
||||||
if(BUILD_OFI_TRANSPORT)
|
if(BUILD_OFI_TRANSPORT)
|
||||||
add_test(NAME Example.Multipart.ofi COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh ofi)
|
add_test(NAME Example.Multipart.ofi COMMAND ${CMAKE_CURRENT_BINARY_DIR}/test-ex-multipart.sh ofi)
|
||||||
set_tests_properties(Example.Multipart.ofi PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 5 parts")
|
set_tests_properties(Example.Multipart.ofi PROPERTIES TIMEOUT "30" RUN_SERIAL true PASS_REGULAR_EXPRESSION "Received message with 7 parts")
|
||||||
endif()
|
endif()
|
||||||
|
|
||||||
# install
|
# install
|
||||||
|
|
|
@ -63,6 +63,10 @@ bool Sampler::ConditionalRun()
|
||||||
|
|
||||||
assert(parts.Size() == 6);
|
assert(parts.Size() == 6);
|
||||||
|
|
||||||
|
parts.AddPart(NewMessage(100));
|
||||||
|
|
||||||
|
assert(parts.Size() == 7);
|
||||||
|
|
||||||
LOG(info) << "Sending body of size: " << parts.At(1)->GetSize();
|
LOG(info) << "Sending body of size: " << parts.At(1)->GetSize();
|
||||||
|
|
||||||
Send(parts, "data");
|
Send(parts, "data");
|
||||||
|
|
|
@ -22,13 +22,24 @@ namespace example_multipart
|
||||||
|
|
||||||
bool Sink::HandleData(FairMQParts& parts, int /*index*/)
|
bool Sink::HandleData(FairMQParts& parts, int /*index*/)
|
||||||
{
|
{
|
||||||
|
LOG(info) << "Received message with " << parts.Size() << " parts";
|
||||||
|
|
||||||
Header header;
|
Header header;
|
||||||
header.stopFlag = (static_cast<Header*>(parts.At(0)->GetData()))->stopFlag;
|
header.stopFlag = (static_cast<Header*>(parts.At(0)->GetData()))->stopFlag;
|
||||||
|
|
||||||
LOG(info) << "Received message with " << parts.Size() << " parts";
|
LOG(info) << "Received part 1 (header) with stopFlag: " << header.stopFlag;
|
||||||
|
LOG(info) << "Received part 2 of size: " << parts.At(1)->GetSize();
|
||||||
LOG(info) << "Received header with stopFlag: " << header.stopFlag;
|
assert(parts.At(1)->GetSize() == 1000);
|
||||||
LOG(info) << "Received body of size: " << parts.At(1)->GetSize();
|
LOG(info) << "Received part 3 of size: " << parts.At(2)->GetSize();
|
||||||
|
assert(parts.At(2)->GetSize() == 500);
|
||||||
|
LOG(info) << "Received part 4 of size: " << parts.At(3)->GetSize();
|
||||||
|
assert(parts.At(3)->GetSize() == 600);
|
||||||
|
LOG(info) << "Received part 5 of size: " << parts.At(4)->GetSize();
|
||||||
|
assert(parts.At(4)->GetSize() == 700);
|
||||||
|
LOG(info) << "Received part 6 of size: " << parts.At(5)->GetSize();
|
||||||
|
assert(parts.At(5)->GetSize() == 0);
|
||||||
|
LOG(info) << "Received part 7 of size: " << parts.At(6)->GetSize();
|
||||||
|
assert(parts.At(6)->GetSize() == 100);
|
||||||
|
|
||||||
if (header.stopFlag == 1) {
|
if (header.stopFlag == 1) {
|
||||||
LOG(info) << "stopFlag is 1, going IDLE";
|
LOG(info) << "stopFlag is 1, going IDLE";
|
||||||
|
|
|
@ -2,12 +2,24 @@
|
||||||
|
|
||||||
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
|
export FAIRMQ_PATH=@FAIRMQ_BIN_DIR@
|
||||||
|
|
||||||
|
transport="zeromq"
|
||||||
|
|
||||||
|
if [[ $1 =~ ^[a-z]+$ ]]; then
|
||||||
|
transport=$1
|
||||||
|
fi
|
||||||
|
|
||||||
|
SESSION="$(@CMAKE_BINARY_DIR@/fairmq/fairmq-uuid-gen -h)"
|
||||||
|
|
||||||
SAMPLER="fairmq-ex-multipart-sampler"
|
SAMPLER="fairmq-ex-multipart-sampler"
|
||||||
SAMPLER+=" --id sampler1"
|
SAMPLER+=" --id sampler1"
|
||||||
|
SAMPLER+=" --transport $transport"
|
||||||
|
SAMPLER+=" --session $SESSION"
|
||||||
SAMPLER+=" --channel-config name=data,type=push,method=connect,rateLogging=0,address=tcp://127.0.0.1:5555"
|
SAMPLER+=" --channel-config name=data,type=push,method=connect,rateLogging=0,address=tcp://127.0.0.1:5555"
|
||||||
xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
|
xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
|
||||||
|
|
||||||
SINK="fairmq-ex-multipart-sink"
|
SINK="fairmq-ex-multipart-sink"
|
||||||
SINK+=" --id sink1"
|
SINK+=" --id sink1"
|
||||||
|
SINK+=" --transport $transport"
|
||||||
|
SINK+=" --session $SESSION"
|
||||||
SINK+=" --channel-config name=data,type=pull,method=bind,rateLogging=0,address=tcp://127.0.0.1:5555"
|
SINK+=" --channel-config name=data,type=pull,method=bind,rateLogging=0,address=tcp://127.0.0.1:5555"
|
||||||
xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$SINK &
|
xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$SINK &
|
||||||
|
|
Loading…
Reference in New Issue
Block a user