diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 3fe5723d..97ee5268 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -6,7 +6,11 @@ # copied verbatim in the file "LICENSE" # ################################################################################ -configure_file(${CMAKE_SOURCE_DIR}/fairmq/options/ProgOptionTest/macro/bsampler-sink.json ${CMAKE_BINARY_DIR}/bin/config/bsampler-sink.json) +configure_file(${CMAKE_SOURCE_DIR}/fairmq/run/startBenchmark.sh.in ${CMAKE_BINARY_DIR}/bin/startBenchmark.sh) +configure_file(${CMAKE_SOURCE_DIR}/fairmq/run/benchmark.json ${CMAKE_BINARY_DIR}/bin/config/benchmark.json) +# following scripts are only for protobuf tests and are not essential part of FairMQ +# configure_file(${CMAKE_SOURCE_DIR}/examples/advanced/Tutorial3/MQ/run/startBin.sh.in ${CMAKE_BINARY_DIR}/bin/startBin.sh) +# configure_file(${CMAKE_SOURCE_DIR}/examples/advanced/Tutorial3/MQ/run/startProto.sh.in ${CMAKE_BINARY_DIR}/bin/startProto.sh) add_subdirectory(logger) add_subdirectory(test) diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index 719971fe..6a6e9c8a 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -420,6 +420,11 @@ int FairMQChannel::SendPart(const unique_ptr& msg) const return fSocket->Send(msg.get(), fSndMoreFlag); } +int FairMQChannel::SendPartAsync(const unique_ptr& msg) const +{ + return fSocket->Send(msg.get(), fSndMoreFlag|fNoBlockFlag); +} + // int FairMQChannel::SendParts(initializer_list> partsList) const // { // int totalSize = 0; diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 46ba00b2..7620059e 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -116,12 +116,20 @@ class FairMQChannel /// Queues the current message as a part of a multi-part message /// @details SendPart method queues the provided message as a part of a multi-part message. - /// The actual transfer over the network is initiated once final part has been queued with the Send() or SendAsync methods. + /// The actual transfer over the network is initiated once final part has been queued with the Send() or SendAsync() methods. /// /// @param msg Constant reference of unique_ptr to a FairMQMessage /// @return Returns the number of bytes that have been queued. -2 If queueing was not possible. In case of errors, returns -1. int SendPart(const std::unique_ptr& msg) const; + /// Queues the current message as a part of a multi-part message without blocking + /// @details SendPart method queues the provided message as a part of a multi-part message without blocking. + /// The actual transfer over the network is initiated once final part has been queued with the Send() or SendAsync() methods. + /// + /// @param msg Constant reference of unique_ptr to a FairMQMessage + /// @return Returns the number of bytes that have been queued. -2 If queueing was not possible. In case of errors, returns -1. + int SendPartAsync(const std::unique_ptr& msg) const; + // /// Sends the messages provided as arguments as a multi-part message. // /// // /// @param partsList Initializer list of FairMQMessages diff --git a/fairmq/prototest/startBin.sh.in b/fairmq/prototest/startBin.sh.in new file mode 100755 index 00000000..d8313f22 --- /dev/null +++ b/fairmq/prototest/startBin.sh.in @@ -0,0 +1,18 @@ +#!/bin/bash + +if(@NANOMSG_FOUND@); then + buffSize="50000000" # nanomsg buffer size is in bytes +else + buffSize="1000" # zeromq high-water mark is in messages +fi + +SAMPLER="binsampler" +SAMPLER+=" --id 101" +SAMPLER+=" --event-size 10000" +SAMPLER+=" --output-socket-type push --output-buff-size $buffSize --output-method bind --output-address tcp://*:5565" +xterm -e @CMAKE_BINARY_DIR@/bin/$SAMPLER & + +SINK="binsink" +SINK+=" --id 201" +SINK+=" --input-socket-type pull --input-buff-size $buffSize --input-method connect --input-address tcp://localhost:5565" +xterm -e @CMAKE_BINARY_DIR@/bin/$SINK & diff --git a/fairmq/prototest/startProto.sh.in b/fairmq/prototest/startProto.sh.in new file mode 100755 index 00000000..0e25ba18 --- /dev/null +++ b/fairmq/prototest/startProto.sh.in @@ -0,0 +1,18 @@ +#!/bin/bash + +if(@NANOMSG_FOUND@); then + buffSize="50000000" # nanomsg buffer size is in bytes +else + buffSize="1000" # zeromq high-water mark is in messages +fi + +SAMPLER="protosampler" +SAMPLER+=" --id 101" +SAMPLER+=" --event-size 10000" +SAMPLER+=" --output-socket-type push --output-buff-size $buffSize --output-method bind --output-address tcp://*:5565" +xterm -e @CMAKE_BINARY_DIR@/bin/$SAMPLER & + +SINK="protosink" +SINK+=" --id 201" +SINK+=" --input-socket-type pull --input-buff-size $buffSize --input-method connect --input-address tcp://localhost:5565" +xterm -e @CMAKE_BINARY_DIR@/bin/$SINK & diff --git a/fairmq/options/ProgOptionTest/macro/bsampler-sink.json b/fairmq/run/benchmark.json similarity index 92% rename from fairmq/options/ProgOptionTest/macro/bsampler-sink.json rename to fairmq/run/benchmark.json index 8b265bc1..971412c0 100644 --- a/fairmq/options/ProgOptionTest/macro/bsampler-sink.json +++ b/fairmq/run/benchmark.json @@ -9,7 +9,7 @@ "name": "data-out", "socket": { - "type": "pub", + "type": "push", "method": "bind", "address": "tcp://*:5555", "sndBufSize": "1000", @@ -26,7 +26,7 @@ "name": "data-in", "socket": { - "type": "sub", + "type": "pull", "method": "connect", "address": "tcp://localhost:5555", "sndBufSize": "1000", diff --git a/fairmq/run/startBenchmark.sh.in b/fairmq/run/startBenchmark.sh.in new file mode 100755 index 00000000..8adb8e30 --- /dev/null +++ b/fairmq/run/startBenchmark.sh.in @@ -0,0 +1,18 @@ +#!/bin/bash + +if(@NANOMSG_FOUND@); then + buffSize="500000000" # nanomsg buffer size is in bytes +else + buffSize="10000" # zeromq high-water mark is in messages +fi + +SAMPLER="bsampler" +SAMPLER+=" --id bsampler1" +SAMPLER+=" --event-size 10000" +SAMPLER+=" --config-json-file @CMAKE_BINARY_DIR@/bin/config/benchmark.json" +xterm -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SAMPLER & + +SINK="sink" +SINK+=" --id sink1" +SINK+=" --config-json-file @CMAKE_BINARY_DIR@/bin/config/benchmark.json" +xterm -geometry 80x23+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SINK &