Update device configuration

- Move general config files out of example directory to fairmq/run.
 - Use FairMQProgOptions for MQ example 5.
 - Add SendPartAsync() for non-blocking send of a message part.
This commit is contained in:
Alexey Rybalchenko 2015-11-11 11:06:11 +01:00
parent 837490cc38
commit 383a220333
7 changed files with 75 additions and 4 deletions

View File

@ -6,7 +6,11 @@
# copied verbatim in the file "LICENSE" #
################################################################################
configure_file(${CMAKE_SOURCE_DIR}/fairmq/options/ProgOptionTest/macro/bsampler-sink.json ${CMAKE_BINARY_DIR}/bin/config/bsampler-sink.json)
configure_file(${CMAKE_SOURCE_DIR}/fairmq/run/startBenchmark.sh.in ${CMAKE_BINARY_DIR}/bin/startBenchmark.sh)
configure_file(${CMAKE_SOURCE_DIR}/fairmq/run/benchmark.json ${CMAKE_BINARY_DIR}/bin/config/benchmark.json)
# following scripts are only for protobuf tests and are not essential part of FairMQ
# configure_file(${CMAKE_SOURCE_DIR}/examples/advanced/Tutorial3/MQ/run/startBin.sh.in ${CMAKE_BINARY_DIR}/bin/startBin.sh)
# configure_file(${CMAKE_SOURCE_DIR}/examples/advanced/Tutorial3/MQ/run/startProto.sh.in ${CMAKE_BINARY_DIR}/bin/startProto.sh)
add_subdirectory(logger)
add_subdirectory(test)

View File

@ -420,6 +420,11 @@ int FairMQChannel::SendPart(const unique_ptr<FairMQMessage>& msg) const
return fSocket->Send(msg.get(), fSndMoreFlag);
}
int FairMQChannel::SendPartAsync(const unique_ptr<FairMQMessage>& msg) const
{
return fSocket->Send(msg.get(), fSndMoreFlag|fNoBlockFlag);
}
// int FairMQChannel::SendParts(initializer_list<unique_ptr<FairMQMessage>> partsList) const
// {
// int totalSize = 0;

View File

@ -116,12 +116,20 @@ class FairMQChannel
/// Queues the current message as a part of a multi-part message
/// @details SendPart method queues the provided message as a part of a multi-part message.
/// The actual transfer over the network is initiated once final part has been queued with the Send() or SendAsync methods.
/// The actual transfer over the network is initiated once final part has been queued with the Send() or SendAsync() methods.
///
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @return Returns the number of bytes that have been queued. -2 If queueing was not possible. In case of errors, returns -1.
int SendPart(const std::unique_ptr<FairMQMessage>& msg) const;
/// Queues the current message as a part of a multi-part message without blocking
/// @details SendPart method queues the provided message as a part of a multi-part message without blocking.
/// The actual transfer over the network is initiated once final part has been queued with the Send() or SendAsync() methods.
///
/// @param msg Constant reference of unique_ptr to a FairMQMessage
/// @return Returns the number of bytes that have been queued. -2 If queueing was not possible. In case of errors, returns -1.
int SendPartAsync(const std::unique_ptr<FairMQMessage>& msg) const;
// /// Sends the messages provided as arguments as a multi-part message.
// ///
// /// @param partsList Initializer list of FairMQMessages

18
fairmq/prototest/startBin.sh.in Executable file
View File

@ -0,0 +1,18 @@
#!/bin/bash
if(@NANOMSG_FOUND@); then
buffSize="50000000" # nanomsg buffer size is in bytes
else
buffSize="1000" # zeromq high-water mark is in messages
fi
SAMPLER="binsampler"
SAMPLER+=" --id 101"
SAMPLER+=" --event-size 10000"
SAMPLER+=" --output-socket-type push --output-buff-size $buffSize --output-method bind --output-address tcp://*:5565"
xterm -e @CMAKE_BINARY_DIR@/bin/$SAMPLER &
SINK="binsink"
SINK+=" --id 201"
SINK+=" --input-socket-type pull --input-buff-size $buffSize --input-method connect --input-address tcp://localhost:5565"
xterm -e @CMAKE_BINARY_DIR@/bin/$SINK &

View File

@ -0,0 +1,18 @@
#!/bin/bash
if(@NANOMSG_FOUND@); then
buffSize="50000000" # nanomsg buffer size is in bytes
else
buffSize="1000" # zeromq high-water mark is in messages
fi
SAMPLER="protosampler"
SAMPLER+=" --id 101"
SAMPLER+=" --event-size 10000"
SAMPLER+=" --output-socket-type push --output-buff-size $buffSize --output-method bind --output-address tcp://*:5565"
xterm -e @CMAKE_BINARY_DIR@/bin/$SAMPLER &
SINK="protosink"
SINK+=" --id 201"
SINK+=" --input-socket-type pull --input-buff-size $buffSize --input-method connect --input-address tcp://localhost:5565"
xterm -e @CMAKE_BINARY_DIR@/bin/$SINK &

View File

@ -9,7 +9,7 @@
"name": "data-out",
"socket":
{
"type": "pub",
"type": "push",
"method": "bind",
"address": "tcp://*:5555",
"sndBufSize": "1000",
@ -26,7 +26,7 @@
"name": "data-in",
"socket":
{
"type": "sub",
"type": "pull",
"method": "connect",
"address": "tcp://localhost:5555",
"sndBufSize": "1000",

18
fairmq/run/startBenchmark.sh.in Executable file
View File

@ -0,0 +1,18 @@
#!/bin/bash
if(@NANOMSG_FOUND@); then
buffSize="500000000" # nanomsg buffer size is in bytes
else
buffSize="10000" # zeromq high-water mark is in messages
fi
SAMPLER="bsampler"
SAMPLER+=" --id bsampler1"
SAMPLER+=" --event-size 10000"
SAMPLER+=" --config-json-file @CMAKE_BINARY_DIR@/bin/config/benchmark.json"
xterm -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SAMPLER &
SINK="sink"
SINK+=" --id sink1"
SINK+=" --config-json-file @CMAKE_BINARY_DIR@/bin/config/benchmark.json"
xterm -geometry 80x23+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SINK &