mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-12 16:21:13 +00:00
FairMQ Examples cleanup
- Rename Tutorial3 MQ files for uniform naming. - Add search for dylib in FindDDS.cmake (OSX). - Add more detail to the DDS example readme. - MQ Example 3 (DDS): choose network interface via command line option. - Give FairMQ examples their own CMakeLists.txt for clarity. - Remove C++11 checks in Tutorial3 from the code (they are now in CMake). - Add Serializer for device properties (FairMQDevice::ListProperties()).
This commit is contained in:
parent
a75486f3ec
commit
19afacb504
|
@ -7,54 +7,34 @@
|
|||
################################################################################
|
||||
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/options/ProgOptionTest/macro/bsampler-sink.json ${CMAKE_BINARY_DIR}/bin/config/bsampler-sink.json)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/1-sampler-sink/ex1-sampler-sink.json ${CMAKE_BINARY_DIR}/bin/config/ex1-sampler-sink.json)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/2-sampler-processor-sink/ex2-sampler-processor-sink.json ${CMAKE_BINARY_DIR}/bin/config/ex2-sampler-processor-sink.json)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds/ex3-devices.json ${CMAKE_BINARY_DIR}/bin/config/ex3-devices.json)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds/ex3-dds-topology.xml ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-topology.xml)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds/ex3-dds-hosts.cfg ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-hosts.cfg COPYONLY)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/4-copypush/ex4-copypush.json ${CMAKE_BINARY_DIR}/bin/config/ex4-copypush.json)
|
||||
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/test/test-fairmq-push-pull.sh.in ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-push-pull.sh)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/test/test-fairmq-pub-sub.sh.in ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-pub-sub.sh)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/test/test-fairmq-req-rep.sh.in ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-req-rep.sh)
|
||||
|
||||
add_subdirectory(logger)
|
||||
|
||||
add_subdirectory(examples/1-sampler-sink)
|
||||
add_subdirectory(examples/2-sampler-processor-sink)
|
||||
If(DDS_FOUND)
|
||||
add_subdirectory(examples/3-dds)
|
||||
EndIf(DDS_FOUND)
|
||||
add_subdirectory(examples/4-copypush)
|
||||
add_subdirectory(examples/5-req-rep)
|
||||
|
||||
add_subdirectory(test)
|
||||
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${CMAKE_SOURCE_DIR}/fairmq
|
||||
${CMAKE_SOURCE_DIR}/fairmq/devices
|
||||
${CMAKE_SOURCE_DIR}/fairmq/tools
|
||||
${CMAKE_SOURCE_DIR}/fairmq/options
|
||||
${CMAKE_SOURCE_DIR}/fairmq/logger
|
||||
${CMAKE_SOURCE_DIR}/fairmq/examples/1-sampler-sink
|
||||
${CMAKE_SOURCE_DIR}/fairmq/examples/2-sampler-processor-sink
|
||||
${CMAKE_SOURCE_DIR}/fairmq/examples/4-copypush
|
||||
${CMAKE_SOURCE_DIR}/fairmq/examples/5-req-rep
|
||||
${CMAKE_SOURCE_DIR}/fairmq/test/push-pull
|
||||
${CMAKE_SOURCE_DIR}/fairmq/test/pub-sub
|
||||
${CMAKE_SOURCE_DIR}/fairmq/test/req-rep
|
||||
${CMAKE_SOURCE_DIR}/fairmq/zeromq
|
||||
${CMAKE_CURRENT_BINARY_DIR}
|
||||
)
|
||||
|
||||
if(DDS_FOUND)
|
||||
add_definitions(-DENABLE_DDS)
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${INCLUDE_DIRECTORIES}
|
||||
${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds
|
||||
)
|
||||
endif(DDS_FOUND)
|
||||
|
||||
Set(SYSTEM_INCLUDE_DIRECTORIES
|
||||
${Boost_INCLUDE_DIR}
|
||||
${ZMQ_INCLUDE_DIR}
|
||||
)
|
||||
|
||||
If(DDS_FOUND)
|
||||
Set(SYSTEM_INCLUDE_DIRECTORIES
|
||||
${SYSTEM_INCLUDE_DIRECTORIES}
|
||||
${DDS_INCLUDE_DIR}
|
||||
)
|
||||
EndIf(DDS_FOUND)
|
||||
|
||||
If(PROTOBUF_FOUND)
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${INCLUDE_DIRECTORIES}
|
||||
|
@ -74,16 +54,7 @@ If(NANOMSG_FOUND)
|
|||
)
|
||||
Set(SYSTEM_INCLUDE_DIRECTORIES
|
||||
${SYSTEM_INCLUDE_DIRECTORIES}
|
||||
${NANOMSG_LIBRARY_SHARED}
|
||||
)
|
||||
Else(NANOMSG_FOUND)
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${INCLUDE_DIRECTORIES}
|
||||
${CMAKE_SOURCE_DIR}/fairmq/zeromq
|
||||
)
|
||||
Set(SYSTEM_INCLUDE_DIRECTORIES
|
||||
${SYSTEM_INCLUDE_DIRECTORIES}
|
||||
${ZMQ_INCLUDE_DIR}
|
||||
${NANOMSG_INCLUDE_DIR}
|
||||
)
|
||||
EndIf(NANOMSG_FOUND)
|
||||
|
||||
|
@ -94,16 +65,15 @@ Set(LINK_DIRECTORIES
|
|||
${Boost_LIBRARY_DIRS}
|
||||
)
|
||||
|
||||
if(DDS_FOUND)
|
||||
set(LINK_DIRECTORIES
|
||||
${LINK_DIRECTORIES}
|
||||
${DDS_LIBRARY_DIR}
|
||||
)
|
||||
endif(DDS_FOUND)
|
||||
|
||||
Link_Directories(${LINK_DIRECTORIES})
|
||||
|
||||
set(SRCS
|
||||
Set(SRCS
|
||||
"zeromq/FairMQTransportFactoryZMQ.cxx"
|
||||
"zeromq/FairMQMessageZMQ.cxx"
|
||||
"zeromq/FairMQSocketZMQ.cxx"
|
||||
"zeromq/FairMQPollerZMQ.cxx"
|
||||
"zeromq/FairMQContextZMQ.cxx"
|
||||
|
||||
"FairMQLogger.cxx"
|
||||
"FairMQConfigurable.cxx"
|
||||
"FairMQStateMachine.cxx"
|
||||
|
@ -124,39 +94,9 @@ set(SRCS
|
|||
"options/FairProgOptions.cxx"
|
||||
"options/FairMQProgOptions.cxx"
|
||||
"options/FairMQParser.cxx"
|
||||
|
||||
"examples/1-sampler-sink/FairMQExample1Sampler.cxx"
|
||||
"examples/1-sampler-sink/FairMQExample1Sink.cxx"
|
||||
"examples/2-sampler-processor-sink/FairMQExample2Sampler.cxx"
|
||||
"examples/2-sampler-processor-sink/FairMQExample2Processor.cxx"
|
||||
"examples/2-sampler-processor-sink/FairMQExample2Sink.cxx"
|
||||
"examples/4-copypush/FairMQExample4Sampler.cxx"
|
||||
"examples/4-copypush/FairMQExample4Sink.cxx"
|
||||
"examples/5-req-rep/FairMQExample5Client.cxx"
|
||||
"examples/5-req-rep/FairMQExample5Server.cxx"
|
||||
|
||||
"test/push-pull/FairMQTestPush.cxx"
|
||||
"test/push-pull/FairMQTestPull.cxx"
|
||||
"test/pub-sub/FairMQTestPub.cxx"
|
||||
"test/pub-sub/FairMQTestSub.cxx"
|
||||
"test/req-rep/FairMQTestReq.cxx"
|
||||
"test/req-rep/FairMQTestRep.cxx"
|
||||
)
|
||||
|
||||
if(DDS_FOUND)
|
||||
set(SRCS
|
||||
${SRCS}
|
||||
"examples/3-dds/FairMQExample3Sampler.cxx"
|
||||
"examples/3-dds/FairMQExample3Processor.cxx"
|
||||
"examples/3-dds/FairMQExample3Sink.cxx"
|
||||
)
|
||||
set(DEPENDENCIES
|
||||
${DEPENDENCIES}
|
||||
dds-key-value-lib
|
||||
)
|
||||
endif(DDS_FOUND)
|
||||
|
||||
if(PROTOBUF_FOUND)
|
||||
If(PROTOBUF_FOUND)
|
||||
# following source files are only for protobuf tests and are not essential part of FairMQ
|
||||
# add_custom_command(
|
||||
# OUTPUT
|
||||
|
@ -175,38 +115,25 @@ if(PROTOBUF_FOUND)
|
|||
# "prototest/FairMQBinSink.cxx"
|
||||
# "prototest/FairMQProtoSink.cxx"
|
||||
# )
|
||||
set(DEPENDENCIES
|
||||
Set(DEPENDENCIES
|
||||
${DEPENDENCIES}
|
||||
${PROTOBUF_LIBRARY}
|
||||
)
|
||||
endif(PROTOBUF_FOUND)
|
||||
Endif(PROTOBUF_FOUND)
|
||||
|
||||
if(NANOMSG_FOUND)
|
||||
set(SRCS
|
||||
If(NANOMSG_FOUND)
|
||||
Set(SRCS
|
||||
${SRCS}
|
||||
"nanomsg/FairMQTransportFactoryNN.cxx"
|
||||
"nanomsg/FairMQMessageNN.cxx"
|
||||
"nanomsg/FairMQSocketNN.cxx"
|
||||
"nanomsg/FairMQPollerNN.cxx"
|
||||
)
|
||||
set(DEPENDENCIES
|
||||
Set(DEPENDENCIES
|
||||
${DEPENDENCIES}
|
||||
${NANOMSG_LIBRARY_SHARED}
|
||||
)
|
||||
else(NANOMSG_FOUND)
|
||||
set(SRCS
|
||||
${SRCS}
|
||||
"zeromq/FairMQTransportFactoryZMQ.cxx"
|
||||
"zeromq/FairMQMessageZMQ.cxx"
|
||||
"zeromq/FairMQSocketZMQ.cxx"
|
||||
"zeromq/FairMQPollerZMQ.cxx"
|
||||
"zeromq/FairMQContextZMQ.cxx"
|
||||
)
|
||||
set(DEPENDENCIES
|
||||
${DEPENDENCIES}
|
||||
${ZMQ_LIBRARY_SHARED}
|
||||
)
|
||||
endif(NANOMSG_FOUND)
|
||||
EndIf(NANOMSG_FOUND)
|
||||
|
||||
# to copy src that are header-only files (e.g. c++ template) for FairRoot external installation
|
||||
# manual install (globbing add not recommended)
|
||||
|
@ -217,10 +144,11 @@ Set(FAIRMQHEADERS
|
|||
devices/GenericFileSink.h
|
||||
tools/FairMQTools.h
|
||||
)
|
||||
install(FILES ${FAIRMQHEADERS} DESTINATION include)
|
||||
Install(FILES ${FAIRMQHEADERS} DESTINATION include)
|
||||
|
||||
set(DEPENDENCIES
|
||||
Set(DEPENDENCIES
|
||||
${DEPENDENCIES}
|
||||
${ZMQ_LIBRARY_SHARED}
|
||||
boost_thread
|
||||
fairmq_logger
|
||||
boost_timer
|
||||
|
@ -232,43 +160,19 @@ set(DEPENDENCIES
|
|||
boost_exception
|
||||
)
|
||||
|
||||
set(LIBRARY_NAME FairMQ)
|
||||
Set(LIBRARY_NAME FairMQ)
|
||||
|
||||
GENERATE_LIBRARY()
|
||||
|
||||
set(Exe_Names
|
||||
Set(Exe_Names
|
||||
bsampler
|
||||
sink
|
||||
buffer
|
||||
splitter
|
||||
merger
|
||||
proxy
|
||||
ex1-sampler
|
||||
ex1-sink
|
||||
ex2-sampler
|
||||
ex2-processor
|
||||
ex2-sink
|
||||
ex4-sampler
|
||||
ex4-sink
|
||||
ex5-client
|
||||
ex5-server
|
||||
test-fairmq-push
|
||||
test-fairmq-pull
|
||||
test-fairmq-pub
|
||||
test-fairmq-sub
|
||||
test-fairmq-req
|
||||
test-fairmq-rep
|
||||
)
|
||||
|
||||
if(DDS_FOUND)
|
||||
set(Exe_Names
|
||||
${Exe_Names}
|
||||
ex3-sampler-dds
|
||||
ex3-processor-dds
|
||||
ex3-sink-dds
|
||||
)
|
||||
endif(DDS_FOUND)
|
||||
|
||||
# following executables are only for protobuf tests and are not essential part of FairMQ
|
||||
# if(PROTOBUF_FOUND)
|
||||
# set(Exe_Names
|
||||
|
@ -280,39 +184,15 @@ endif(DDS_FOUND)
|
|||
# )
|
||||
# endif(PROTOBUF_FOUND)
|
||||
|
||||
set(Exe_Source
|
||||
Set(Exe_Source
|
||||
run/runBenchmarkSampler.cxx
|
||||
run/runSink.cxx
|
||||
run/runBuffer.cxx
|
||||
run/runSplitter.cxx
|
||||
run/runMerger.cxx
|
||||
run/runProxy.cxx
|
||||
examples/1-sampler-sink/runExample1Sampler.cxx
|
||||
examples/1-sampler-sink/runExample1Sink.cxx
|
||||
examples/2-sampler-processor-sink/runExample2Sampler.cxx
|
||||
examples/2-sampler-processor-sink/runExample2Processor.cxx
|
||||
examples/2-sampler-processor-sink/runExample2Sink.cxx
|
||||
examples/4-copypush/runExample4Sampler.cxx
|
||||
examples/4-copypush/runExample4Sink.cxx
|
||||
examples/5-req-rep/runExample5Client.cxx
|
||||
examples/5-req-rep/runExample5Server.cxx
|
||||
test/push-pull/runTestPush.cxx
|
||||
test/push-pull/runTestPull.cxx
|
||||
test/pub-sub/runTestPub.cxx
|
||||
test/pub-sub/runTestSub.cxx
|
||||
test/req-rep/runTestReq.cxx
|
||||
test/req-rep/runTestRep.cxx
|
||||
)
|
||||
|
||||
if(DDS_FOUND)
|
||||
set(Exe_Source
|
||||
${Exe_Source}
|
||||
examples/3-dds/runExample3Sampler.cxx
|
||||
examples/3-dds/runExample3Processor.cxx
|
||||
examples/3-dds/runExample3Sink.cxx
|
||||
)
|
||||
endif(DDS_FOUND)
|
||||
|
||||
# following source files are only for protobuf tests and are not essential part of FairMQ
|
||||
# if(PROTOBUF_FOUND)
|
||||
# set(Exe_Source
|
||||
|
@ -330,20 +210,8 @@ math(EXPR _length ${_length}-1)
|
|||
ForEach(_file RANGE 0 ${_length})
|
||||
list(GET Exe_Names ${_file} _name)
|
||||
list(GET Exe_Source ${_file} _src)
|
||||
set(EXE_NAME ${_name})
|
||||
set(SRCS ${_src})
|
||||
set(DEPENDENCIES FairMQ)
|
||||
Set(EXE_NAME ${_name})
|
||||
Set(SRCS ${_src})
|
||||
Set(DEPENDENCIES FairMQ)
|
||||
GENERATE_EXECUTABLE()
|
||||
EndForEach(_file RANGE 0 ${_length})
|
||||
|
||||
add_test(NAME run_fairmq_push_pull COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-push-pull.sh)
|
||||
set_tests_properties(run_fairmq_push_pull PROPERTIES TIMEOUT "30")
|
||||
set_tests_properties(run_fairmq_push_pull PROPERTIES PASS_REGULAR_EXPRESSION "PUSH-PULL test successfull")
|
||||
|
||||
add_test(NAME run_fairmq_pub_sub COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-pub-sub.sh)
|
||||
set_tests_properties(run_fairmq_pub_sub PROPERTIES TIMEOUT "30")
|
||||
set_tests_properties(run_fairmq_pub_sub PROPERTIES PASS_REGULAR_EXPRESSION "PUB-SUB test successfull")
|
||||
|
||||
add_test(NAME run_fairmq_req_rep COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-req-rep.sh)
|
||||
set_tests_properties(run_fairmq_req_rep PROPERTIES TIMEOUT "30")
|
||||
set_tests_properties(run_fairmq_req_rep PROPERTIES PASS_REGULAR_EXPRESSION "REQ-REP test successfull")
|
||||
|
|
|
@ -32,48 +32,111 @@ class FairMQChannel
|
|||
friend class FairMQDevice;
|
||||
|
||||
public:
|
||||
/// Default constructor
|
||||
FairMQChannel();
|
||||
|
||||
/// Constructor
|
||||
/// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/)
|
||||
/// @param method Socket method (bind/connect)
|
||||
/// @param address Network address to bind/connect to (e.g. "tcp://127.0.0.1:5555" or "ipc://abc")
|
||||
FairMQChannel(const std::string& type, const std::string& method, const std::string& address);
|
||||
|
||||
/// Default destructor
|
||||
virtual ~FairMQChannel();
|
||||
|
||||
/// Get socket type
|
||||
/// @return Returns socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/)
|
||||
std::string GetType() const;
|
||||
/// Get socket method
|
||||
/// @return Returns socket method (bind/connect)
|
||||
std::string GetMethod() const;
|
||||
/// Get socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc")
|
||||
/// @return Returns socket type (e.g. "tcp://127.0.0.1:5555" or "ipc://abc")
|
||||
std::string GetAddress() const;
|
||||
/// Get socket send buffer size (in number of messages)
|
||||
/// @return Returns socket send buffer size (in number of messages)
|
||||
int GetSndBufSize() const;
|
||||
/// Get socket receive buffer size (in number of messages)
|
||||
/// @return Returns socket receive buffer size (in number of messages)
|
||||
int GetRcvBufSize() const;
|
||||
/// Get socket rate logging setting (1/0)
|
||||
/// @return Returns socket rate logging setting (1/0)
|
||||
int GetRateLogging() const;
|
||||
|
||||
/// Set socket type
|
||||
/// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/)
|
||||
void UpdateType(const std::string& type);
|
||||
/// Set socket method
|
||||
/// @param method Socket method (bind/connect)
|
||||
void UpdateMethod(const std::string& method);
|
||||
/// Set socket address
|
||||
/// @param address Socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc")
|
||||
void UpdateAddress(const std::string& address);
|
||||
/// Set socket send buffer size
|
||||
/// @param sndBufSize Socket send buffer size (in number of messages)
|
||||
void UpdateSndBufSize(const int sndBufSize);
|
||||
/// Set socket receive buffer size
|
||||
/// @param rcvBufSize Socket receive buffer size (in number of messages)
|
||||
void UpdateRcvBufSize(const int rcvBufSize);
|
||||
/// Set socket rate logging setting
|
||||
/// @param rateLogging Socket rate logging setting (1/0)
|
||||
void UpdateRateLogging(const int rateLogging);
|
||||
|
||||
/// Checks if the configured channel settings are valid (checks the validity parameter, without running full validation (as oposed to ValidateChannel()))
|
||||
/// @return true if channel settings are valid, false otherwise.
|
||||
bool IsValid() const;
|
||||
|
||||
/// Validates channel configuration
|
||||
/// @return true if channel settings are valid, false otherwise.
|
||||
bool ValidateChannel();
|
||||
bool InitCommandInterface(FairMQTransportFactory* factory);
|
||||
|
||||
/// Resets the channel (requires validation to be used again).
|
||||
void ResetChannel();
|
||||
|
||||
FairMQSocket* fSocket;
|
||||
|
||||
// Wrappers for the socket methods to simplify the usage of channels
|
||||
/// Sends a message to the socket queue.
|
||||
/// @details Send method attempts to send a message by
|
||||
/// putting it in the output queue. If the queue is full or queueing is not possible
|
||||
/// for some other reason (e.g. no peers connected for a binding socket), the method blocks.
|
||||
///
|
||||
/// @param msg Constant reference of unique_ptr to a FairMQMessage
|
||||
/// @return Returns the number of bytes that have been queued. In case of errors, returns -1.
|
||||
int Send(const std::unique_ptr<FairMQMessage>& msg) const;
|
||||
|
||||
/// \brief Sends a message in non-blocking mode.
|
||||
/// \details SendAsync method attempts to send the message without blocking by
|
||||
/// Sends a message in non-blocking mode.
|
||||
/// @details SendAsync method attempts to send a message without blocking by
|
||||
/// putting it in the queue. If the queue is full or queueing is not possible
|
||||
/// for some other reason (e.g. no peers connected for a binding socket), the method returns 0.
|
||||
///
|
||||
/// \param msg Constant reference of unique_ptr to a FairMQMessage
|
||||
/// \return Returns the number of bytes that have been queued. If queueing failed due to
|
||||
/// @param msg Constant reference of unique_ptr to a FairMQMessage
|
||||
/// @return Returns the number of bytes that have been queued. If queueing failed due to
|
||||
/// full queue or no connected peers (when binding), returns 0. In case of errors, returns -1.
|
||||
int SendAsync(const std::unique_ptr<FairMQMessage>& msg) const;
|
||||
|
||||
/// Queues the current message as a part of a multi-part message
|
||||
/// @details SendPart method queues the provided message as a part of a multi-part message.
|
||||
/// The actual transfer over the network is initiated once final part has been queued with the Send() or SendAsync methods.
|
||||
///
|
||||
/// @param msg Constant reference of unique_ptr to a FairMQMessage
|
||||
/// @return Returns the number of bytes that have been queued. In case of errors, returns -1.
|
||||
int SendPart(const std::unique_ptr<FairMQMessage>& msg) const;
|
||||
|
||||
/// Receives a message from the socket queue.
|
||||
/// @details Receive method attempts to receive a message from the input queue.
|
||||
/// If the queue is empty the method blocks.
|
||||
///
|
||||
/// @param msg Constant reference of unique_ptr to a FairMQMessage
|
||||
/// @return Returns the number of bytes that have been received. In case of errors, returns -1.
|
||||
int Receive(const std::unique_ptr<FairMQMessage>& msg) const;
|
||||
|
||||
/// Receives a message in non-blocking mode.
|
||||
/// @details ReceiveAsync method attempts to receive a message without blocking from the input queue.
|
||||
/// If the queue is empty the method returns 0.
|
||||
///
|
||||
/// @param msg Constant reference of unique_ptr to a FairMQMessage
|
||||
/// @return Returns the number of bytes that have been received. If queue is empty, returns 0.
|
||||
/// In case of errors, returns -1.
|
||||
int ReceiveAsync(const std::unique_ptr<FairMQMessage>& msg) const;
|
||||
|
||||
// DEPRECATED socket method wrappers with raw pointers and flag checks
|
||||
|
@ -82,8 +145,8 @@ class FairMQChannel
|
|||
int Receive(FairMQMessage* msg, const std::string& flag = "") const;
|
||||
int Receive(FairMQMessage* msg, const int flags) const;
|
||||
|
||||
/// \brief Checks if the socket is expecting to receive another part of a multipart message.
|
||||
/// \return Return true if the socket expects another part of a multipart message and false otherwise.
|
||||
/// Checks if the socket is expecting to receive another part of a multipart message.
|
||||
/// @return Return true if the socket expects another part of a multipart message and false otherwise.
|
||||
bool ExpectsAnotherPart() const;
|
||||
|
||||
private:
|
||||
|
@ -105,6 +168,8 @@ class FairMQChannel
|
|||
int fNoBlockFlag;
|
||||
int fSndMoreFlag;
|
||||
|
||||
bool InitCommandInterface(FairMQTransportFactory* factory);
|
||||
|
||||
bool HandleUnblock() const;
|
||||
|
||||
// use static mutex to make the class easily copyable
|
||||
|
|
|
@ -45,6 +45,12 @@ int FairMQConfigurable::GetProperty(const int key, const int default_ /*= 0*/)
|
|||
return default_;
|
||||
}
|
||||
|
||||
string FairMQConfigurable::GetPropertyDescription(const int key)
|
||||
{
|
||||
LOG(ERROR) << "Reached end of the property list. The description of the requested property " << key << " was not found.";
|
||||
return "0";
|
||||
}
|
||||
|
||||
FairMQConfigurable::~FairMQConfigurable()
|
||||
{
|
||||
}
|
||||
|
|
|
@ -31,6 +31,8 @@ class FairMQConfigurable
|
|||
virtual std::string GetProperty(const int key, const std::string& default_ = "");
|
||||
virtual void SetProperty(const int key, const int value);
|
||||
virtual int GetProperty(const int key, const int default_ = 0);
|
||||
|
||||
virtual std::string GetPropertyDescription(const int key);
|
||||
};
|
||||
|
||||
#endif /* FAIRMQCONFIGURABLE_H_ */
|
||||
|
|
|
@ -397,6 +397,37 @@ string FairMQDevice::GetProperty(const int key, const string& default_ /*= ""*/)
|
|||
}
|
||||
}
|
||||
|
||||
string FairMQDevice::GetPropertyDescription(const int key)
|
||||
{
|
||||
switch (key)
|
||||
{
|
||||
case Id:
|
||||
return "Id: Device ID";
|
||||
case NumIoThreads:
|
||||
return "NumIoThreads: Number of I/O Threads (size of the 0MQ thread pool to handle I/O operations. If your application is using only the inproc transport for messaging you may set this to zero, otherwise set it to at least one.)";
|
||||
case MaxInitializationTime:
|
||||
return "MaxInitializationTime: Timeout for retrying validation and initialization of the channels.";
|
||||
case PortRangeMin:
|
||||
return "PortRangeMin: Minumum value for the port range (when binding to dynamic port).";
|
||||
case PortRangeMax:
|
||||
return "PortRangeMax: Maximum value for the port range (when binding to dynamic port).";
|
||||
case LogIntervalInMs:
|
||||
return "LogIntervalInMs: Time between socket rates logging outputs.";
|
||||
default:
|
||||
return FairMQConfigurable::GetPropertyDescription(key);
|
||||
}
|
||||
}
|
||||
|
||||
void FairMQDevice::ListProperties()
|
||||
{
|
||||
LOG(INFO) << "Properties of FairMQDevice:";
|
||||
for (int p = FairMQConfigurable::Last; p < FairMQDevice::Last; ++p)
|
||||
{
|
||||
LOG(INFO) << " " << GetPropertyDescription(p);
|
||||
}
|
||||
LOG(INFO) << "---------------------------";
|
||||
}
|
||||
|
||||
// Method for getting properties represented as an integer.
|
||||
int FairMQDevice::GetProperty(const int key, const int default_ /*= 0*/)
|
||||
{
|
||||
|
|
|
@ -29,12 +29,14 @@
|
|||
|
||||
class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
|
||||
{
|
||||
friend class FairMQChannel;
|
||||
|
||||
public:
|
||||
enum
|
||||
{
|
||||
Id = FairMQConfigurable::Last,
|
||||
MaxInitializationTime,
|
||||
NumIoThreads,
|
||||
MaxInitializationTime,
|
||||
PortRangeMin,
|
||||
PortRangeMax,
|
||||
LogIntervalInMs,
|
||||
|
@ -60,6 +62,13 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
|
|||
virtual void SetProperty(const int key, const int value);
|
||||
virtual int GetProperty(const int key, const int default_ = 0);
|
||||
|
||||
/// Get property description for a given property name
|
||||
/// @param key Property name/key
|
||||
/// @return String with the property description
|
||||
virtual std::string GetPropertyDescription(const int key);
|
||||
/// Print all properties of this and the parent class to LOG(INFO)
|
||||
virtual void ListProperties();
|
||||
|
||||
virtual void SetTransport(FairMQTransportFactory* factory);
|
||||
|
||||
static bool SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs);
|
||||
|
|
|
@ -6,8 +6,6 @@ The standard FairRoot is running all the different analysis tasks within one pro
|
|||
|
||||
The components encapsulating the tasks are called **devices** and derive from the common base class `FairMQDevice`. FairMQ provides ready to use devices to organize the dataflow between the components (without touching the contents of a message), providing functionality like merging and splitting of the data stream (see subdirectory `devices`).
|
||||
|
||||
A number of devices to handle the data from the Tutorial3 detector of FairRoot are provided as an example and can be found in `FairRoot/base/MQ` directory. The implementation of the tasks run by these devices can be found `FairRoot/example/Tutorial3`. The implementation includes sending raw binary data as well as serializing the data with either [Boost Serialization](http://www.boost.org/doc/libs/release/libs/serialization/), [Google Protocol Buffers](https://developers.google.com/protocol-buffers/) or [Root TMessage](http://root.cern.ch/root/html/TMessage.html). Following the examples you can implement your own devices to transport arbitrary data.
|
||||
|
||||
## Topology
|
||||
|
||||
Devices are arranged into **topologies** where each device has a defined number of data inputs and outputs.
|
||||
|
@ -18,6 +16,10 @@ Example of a simple FairMQ topology:
|
|||
|
||||
Topology configuration is currently happening via setup scripts. This is very rudimentary and a much more flexible system is now in development. For now, example setup scripts can be found in directory `FairRoot/example/Tutorial3/` along with some additional documentation.
|
||||
|
||||
## Communication Patterns
|
||||
|
||||
FairMQ devices communicate via the communication patterns offered by ZeroMQ (or nanomsg): PUSH-PULL, PUB-SUB, REQ-REP, PAIR, [more info here](http://api.zeromq.org/4-0:zmq-socket).
|
||||
|
||||
## Messages
|
||||
|
||||
Devices transport data between each other in form of `FairMQMessage`s. These can be filled with arbitrary content and transport either raw data or serialized data as described above. Message can be initialized in three different ways:
|
||||
|
@ -33,3 +35,8 @@ The communication layer is available through an interface. Two interface impleme
|
|||
|
||||

|
||||
|
||||
## Examples
|
||||
|
||||
A collection of simple examples in `examples` directory demonstrates some common usage patterns of FairMQ.
|
||||
|
||||
A number of devices to handle the data from the Tutorial3 FairTestDetector of FairRoot are provided as an example and can be found in `FairRoot/base/MQ` directory. The implementation of the tasks run by these devices can be found `FairRoot/example/Tutorial3`. The implementation includes sending raw binary data as well as serializing the data with either [Boost Serialization](http://www.boost.org/doc/libs/release/libs/serialization/), [Google Protocol Buffers](https://developers.google.com/protocol-buffers/) or [Root TMessage](http://root.cern.ch/root/html/TMessage.html). Following the examples you can implement your own devices to transport arbitrary data.
|
||||
|
|
|
@ -130,3 +130,26 @@ int FairMQBenchmarkSampler::GetProperty(const int key, const int default_ /*= 0*
|
|||
return FairMQDevice::GetProperty(key, default_);
|
||||
}
|
||||
}
|
||||
|
||||
string FairMQBenchmarkSampler::GetPropertyDescription(const int key)
|
||||
{
|
||||
switch (key)
|
||||
{
|
||||
case EventSize:
|
||||
return "EventSize: Size of the transfered message buffer.";
|
||||
case EventRate:
|
||||
return "EventRate: Upper limit for the message rate.";
|
||||
default:
|
||||
return FairMQDevice::GetPropertyDescription(key);
|
||||
}
|
||||
}
|
||||
|
||||
void FairMQBenchmarkSampler::ListProperties()
|
||||
{
|
||||
LOG(INFO) << "Properties of FairMQBenchmarkSampler:";
|
||||
for (int p = FairMQConfigurable::Last; p < FairMQBenchmarkSampler::Last; ++p)
|
||||
{
|
||||
LOG(INFO) << " " << GetPropertyDescription(p);
|
||||
}
|
||||
LOG(INFO) << "---------------------------";
|
||||
}
|
||||
|
|
|
@ -44,6 +44,9 @@ class FairMQBenchmarkSampler : public FairMQDevice
|
|||
virtual void SetProperty(const int key, const int value);
|
||||
virtual int GetProperty(const int key, const int default_ = 0);
|
||||
|
||||
virtual std::string GetPropertyDescription(const int key);
|
||||
virtual void ListProperties();
|
||||
|
||||
protected:
|
||||
int fEventSize;
|
||||
int fEventRate;
|
||||
|
|
79
fairmq/examples/1-sampler-sink/CMakeLists.txt
Normal file
79
fairmq/examples/1-sampler-sink/CMakeLists.txt
Normal file
|
@ -0,0 +1,79 @@
|
|||
################################################################################
|
||||
# Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||
# #
|
||||
# This software is distributed under the terms of the #
|
||||
# GNU Lesser General Public Licence version 3 (LGPL) version 3, #
|
||||
# copied verbatim in the file "LICENSE" #
|
||||
################################################################################
|
||||
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/1-sampler-sink/ex1-sampler-sink.json ${CMAKE_BINARY_DIR}/bin/config/ex1-sampler-sink.json)
|
||||
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${CMAKE_SOURCE_DIR}/fairmq
|
||||
${CMAKE_SOURCE_DIR}/fairmq/devices
|
||||
${CMAKE_SOURCE_DIR}/fairmq/tools
|
||||
${CMAKE_SOURCE_DIR}/fairmq/options
|
||||
${CMAKE_SOURCE_DIR}/fairmq/examples/1-sampler-sink
|
||||
${CMAKE_CURRENT_BINARY_DIR}
|
||||
)
|
||||
|
||||
Set(SYSTEM_INCLUDE_DIRECTORIES
|
||||
${Boost_INCLUDE_DIR}
|
||||
)
|
||||
|
||||
If(NANOMSG_FOUND)
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${INCLUDE_DIRECTORIES}
|
||||
${CMAKE_SOURCE_DIR}/fairmq/nanomsg
|
||||
)
|
||||
Else(NANOMSG_FOUND)
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${INCLUDE_DIRECTORIES}
|
||||
${CMAKE_SOURCE_DIR}/fairmq/zeromq
|
||||
)
|
||||
EndIf(NANOMSG_FOUND)
|
||||
|
||||
Include_Directories(${INCLUDE_DIRECTORIES})
|
||||
Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES})
|
||||
|
||||
Set(LINK_DIRECTORIES
|
||||
${Boost_LIBRARY_DIRS}
|
||||
)
|
||||
|
||||
Link_Directories(${LINK_DIRECTORIES})
|
||||
|
||||
Set(SRCS
|
||||
"FairMQExample1Sampler.cxx"
|
||||
"FairMQExample1Sink.cxx"
|
||||
)
|
||||
|
||||
Set(DEPENDENCIES
|
||||
${DEPENDENCIES}
|
||||
FairMQ
|
||||
)
|
||||
|
||||
Set(LIBRARY_NAME FairMQExample1)
|
||||
|
||||
GENERATE_LIBRARY()
|
||||
|
||||
Set(Exe_Names
|
||||
ex1-sampler
|
||||
ex1-sink
|
||||
)
|
||||
|
||||
Set(Exe_Source
|
||||
runExample1Sampler.cxx
|
||||
runExample1Sink.cxx
|
||||
)
|
||||
|
||||
list(LENGTH Exe_Names _length)
|
||||
math(EXPR _length ${_length}-1)
|
||||
|
||||
ForEach(_file RANGE 0 ${_length})
|
||||
list(GET Exe_Names ${_file} _name)
|
||||
list(GET Exe_Source ${_file} _src)
|
||||
set(EXE_NAME ${_name})
|
||||
set(SRCS ${_src})
|
||||
set(DEPENDENCIES FairMQExample1)
|
||||
GENERATE_EXECUTABLE()
|
||||
EndForEach(_file RANGE 0 ${_length})
|
|
@ -15,7 +15,6 @@
|
|||
#include <memory> // unique_ptr
|
||||
|
||||
#include <boost/thread.hpp>
|
||||
#include <boost/bind.hpp>
|
||||
|
||||
#include "FairMQExample1Sampler.h"
|
||||
#include "FairMQLogger.h"
|
||||
|
|
|
@ -12,9 +12,6 @@
|
|||
* @author A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include <boost/thread.hpp>
|
||||
#include <boost/bind.hpp>
|
||||
|
||||
#include "FairMQExample1Sink.h"
|
||||
#include "FairMQLogger.h"
|
||||
|
||||
|
|
|
@ -5,4 +5,4 @@ A simple topology of two devices - **Sampler** and **Sink**. **Sampler** sends d
|
|||
|
||||
`runExample1Sampler.cxx` and `runExample1Sink.cxx` configure and run the devices in their main function.
|
||||
|
||||
The executables take two required command line parameters: `--id` and `--config-json-file`. The value of `--id` should be a unique identifier and the value for `-config-json-file` a path to a config file. The config file for this example is `ex1-sampler-sink.json` and it contains configuration for the communication channels of the devices. The mapping between a specific device and the configuration (which can contain multiple devices) is done based on the **id**.
|
||||
The executables take two required command line parameters: `--id` and `--config-json-file`. The value of `--id` should be a unique identifier and the value for `--config-json-file` a path to a config file. The config file for this example is `ex1-sampler-sink.json` and it contains configuration for the communication channels of the devices. The mapping between a specific device and the configuration (which can contain multiple devices) is done based on the **id**.
|
||||
|
|
82
fairmq/examples/2-sampler-processor-sink/CMakeLists.txt
Normal file
82
fairmq/examples/2-sampler-processor-sink/CMakeLists.txt
Normal file
|
@ -0,0 +1,82 @@
|
|||
################################################################################
|
||||
# Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||
# #
|
||||
# This software is distributed under the terms of the #
|
||||
# GNU Lesser General Public Licence version 3 (LGPL) version 3, #
|
||||
# copied verbatim in the file "LICENSE" #
|
||||
################################################################################
|
||||
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/2-sampler-processor-sink/ex2-sampler-processor-sink.json ${CMAKE_BINARY_DIR}/bin/config/ex2-sampler-processor-sink.json)
|
||||
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${CMAKE_SOURCE_DIR}/fairmq
|
||||
${CMAKE_SOURCE_DIR}/fairmq/devices
|
||||
${CMAKE_SOURCE_DIR}/fairmq/tools
|
||||
${CMAKE_SOURCE_DIR}/fairmq/options
|
||||
${CMAKE_SOURCE_DIR}/fairmq/examples/2-sampler-processor-sink
|
||||
${CMAKE_CURRENT_BINARY_DIR}
|
||||
)
|
||||
|
||||
Set(SYSTEM_INCLUDE_DIRECTORIES
|
||||
${Boost_INCLUDE_DIR}
|
||||
)
|
||||
|
||||
If(NANOMSG_FOUND)
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${INCLUDE_DIRECTORIES}
|
||||
${CMAKE_SOURCE_DIR}/fairmq/nanomsg
|
||||
)
|
||||
Else(NANOMSG_FOUND)
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${INCLUDE_DIRECTORIES}
|
||||
${CMAKE_SOURCE_DIR}/fairmq/zeromq
|
||||
)
|
||||
EndIf(NANOMSG_FOUND)
|
||||
|
||||
Include_Directories(${INCLUDE_DIRECTORIES})
|
||||
Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES})
|
||||
|
||||
Set(LINK_DIRECTORIES
|
||||
${Boost_LIBRARY_DIRS}
|
||||
)
|
||||
|
||||
Link_Directories(${LINK_DIRECTORIES})
|
||||
|
||||
Set(SRCS
|
||||
"FairMQExample2Sampler.cxx"
|
||||
"FairMQExample2Processor.cxx"
|
||||
"FairMQExample2Sink.cxx"
|
||||
)
|
||||
|
||||
Set(DEPENDENCIES
|
||||
${DEPENDENCIES}
|
||||
FairMQ
|
||||
)
|
||||
|
||||
Set(LIBRARY_NAME FairMQExample2)
|
||||
|
||||
GENERATE_LIBRARY()
|
||||
|
||||
Set(Exe_Names
|
||||
ex2-sampler
|
||||
ex2-processor
|
||||
ex2-sink
|
||||
)
|
||||
|
||||
Set(Exe_Source
|
||||
runExample2Sampler.cxx
|
||||
runExample2Processor.cxx
|
||||
runExample2Sink.cxx
|
||||
)
|
||||
|
||||
list(LENGTH Exe_Names _length)
|
||||
math(EXPR _length ${_length}-1)
|
||||
|
||||
ForEach(_file RANGE 0 ${_length})
|
||||
list(GET Exe_Names ${_file} _name)
|
||||
list(GET Exe_Source ${_file} _src)
|
||||
Set(EXE_NAME ${_name})
|
||||
Set(SRCS ${_src})
|
||||
Set(DEPENDENCIES FairMQExample2)
|
||||
GENERATE_EXECUTABLE()
|
||||
EndForEach(_file RANGE 0 ${_length})
|
93
fairmq/examples/3-dds/CMakeLists.txt
Normal file
93
fairmq/examples/3-dds/CMakeLists.txt
Normal file
|
@ -0,0 +1,93 @@
|
|||
################################################################################
|
||||
# Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||
# #
|
||||
# This software is distributed under the terms of the #
|
||||
# GNU Lesser General Public Licence version 3 (LGPL) version 3, #
|
||||
# copied verbatim in the file "LICENSE" #
|
||||
################################################################################
|
||||
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds/ex3-devices.json ${CMAKE_BINARY_DIR}/bin/config/ex3-devices.json)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds/ex3-dds-topology.xml ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-topology.xml)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds/ex3-dds-hosts.cfg ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-hosts.cfg COPYONLY)
|
||||
|
||||
add_definitions(-DENABLE_DDS)
|
||||
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${CMAKE_SOURCE_DIR}/fairmq
|
||||
${CMAKE_SOURCE_DIR}/fairmq/devices
|
||||
${CMAKE_SOURCE_DIR}/fairmq/tools
|
||||
${CMAKE_SOURCE_DIR}/fairmq/options
|
||||
${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds
|
||||
${CMAKE_CURRENT_BINARY_DIR}
|
||||
)
|
||||
|
||||
Set(SYSTEM_INCLUDE_DIRECTORIES
|
||||
${SYSTEM_INCLUDE_DIRECTORIES}
|
||||
${DDS_INCLUDE_DIR}
|
||||
)
|
||||
|
||||
If(NANOMSG_FOUND)
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${INCLUDE_DIRECTORIES}
|
||||
${CMAKE_SOURCE_DIR}/fairmq/nanomsg
|
||||
)
|
||||
Else(NANOMSG_FOUND)
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${INCLUDE_DIRECTORIES}
|
||||
${CMAKE_SOURCE_DIR}/fairmq/zeromq
|
||||
)
|
||||
EndIf(NANOMSG_FOUND)
|
||||
|
||||
Include_Directories(${INCLUDE_DIRECTORIES})
|
||||
Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES})
|
||||
|
||||
Set(LINK_DIRECTORIES
|
||||
${LINK_DIRECTORIES}
|
||||
${Boost_LIBRARY_DIRS}
|
||||
${DDS_LIBRARY_DIR}
|
||||
)
|
||||
|
||||
Link_Directories(${LINK_DIRECTORIES})
|
||||
|
||||
Set(SRCS
|
||||
${SRCS}
|
||||
"FairMQExample3Sampler.cxx"
|
||||
"FairMQExample3Processor.cxx"
|
||||
"FairMQExample3Sink.cxx"
|
||||
)
|
||||
|
||||
Set(DEPENDENCIES
|
||||
${DEPENDENCIES}
|
||||
FairMQ
|
||||
dds-key-value-lib
|
||||
)
|
||||
|
||||
set(LIBRARY_NAME FairMQExample3)
|
||||
|
||||
GENERATE_LIBRARY()
|
||||
|
||||
Set(Exe_Names
|
||||
${Exe_Names}
|
||||
ex3-sampler-dds
|
||||
ex3-processor-dds
|
||||
ex3-sink-dds
|
||||
)
|
||||
|
||||
Set(Exe_Source
|
||||
${Exe_Source}
|
||||
runExample3Sampler.cxx
|
||||
runExample3Processor.cxx
|
||||
runExample3Sink.cxx
|
||||
)
|
||||
|
||||
list(LENGTH Exe_Names _length)
|
||||
math(EXPR _length ${_length}-1)
|
||||
|
||||
ForEach(_file RANGE 0 ${_length})
|
||||
list(GET Exe_Names ${_file} _name)
|
||||
list(GET Exe_Source ${_file} _src)
|
||||
set(EXE_NAME ${_name})
|
||||
set(SRCS ${_src})
|
||||
set(DEPENDENCIES FairMQExample3)
|
||||
GENERATE_EXECUTABLE()
|
||||
EndForEach(_file RANGE 0 ${_length})
|
|
@ -3,6 +3,12 @@ Example 3: DDS
|
|||
|
||||
This example demonstrates usage of the Dynamic Deployment System ([DDS](http://dds.gsi.de/)) to dynamically deploy and configure a topology of devices. The topology is similar to those of Example 2, but now it can be easily distributed on different computing nodes without the need for manual reconfiguration of the devices.
|
||||
|
||||
This example is compiled only if the DDS is found by CMake. Custom DDS installation location can be given to CMake like this:
|
||||
|
||||
```bash
|
||||
cmake -DDDS_PATH="/path/to/dds/install/dir/" ..
|
||||
```
|
||||
|
||||
The description below outlines the minimal steps needed to run the example with DDS. For more details please refer to DDS documentation on [DDS Website](http://dds.gsi.de/).
|
||||
|
||||
##### 1. The devices that bind their sockets need to advertise their bound addresses to DDS by writing a property.
|
||||
|
@ -56,9 +62,9 @@ After this step each device will have the necessary connection information.
|
|||
|
||||
We run this example on the local machine for simplicity. The file below defines one worker `wn0` with 12 DDS Agents (thus able to accept 12 tasks). The parameters for each worker node are:
|
||||
- user-chosen worker ID (must be unique)
|
||||
- a host name with or without a login, in a form: login@host.fqdn
|
||||
- a host name with or without a login, in a form: login@host.fqdn (password-less SSH access to these hosts must be possible)
|
||||
- additional SSH params (can be empty)
|
||||
- a remote working directory
|
||||
- a remote working directory (most exist on the worker nodes)
|
||||
- number of DDS Agents for this worker
|
||||
|
||||
```bash
|
||||
|
|
|
@ -46,11 +46,11 @@ int main(int argc, char** argv)
|
|||
{
|
||||
int ddsTaskIndex = 0;
|
||||
|
||||
options_description samplerOptions("Processor options");
|
||||
samplerOptions.add_options()
|
||||
("index", value<int>(&ddsTaskIndex)->default_value(0), "Store DDS task index");
|
||||
options_description processorOptions("Processor options");
|
||||
processorOptions.add_options()
|
||||
("index", value<int>(&ddsTaskIndex)->default_value(0), "DDS task index");
|
||||
|
||||
config.AddToCmdLineOptions(samplerOptions);
|
||||
config.AddToCmdLineOptions(processorOptions);
|
||||
|
||||
if (config.ParseAll(argc, argv))
|
||||
{
|
||||
|
|
|
@ -45,11 +45,13 @@ int main(int argc, char** argv)
|
|||
|
||||
try
|
||||
{
|
||||
std::string text;
|
||||
std::string text; // text to be sent for processing.
|
||||
std::string interfaceName; // name of the network interface to use for communication.
|
||||
|
||||
options_description samplerOptions("Sampler options");
|
||||
samplerOptions.add_options()
|
||||
("text", value<std::string>(&text)->default_value("Hello"), "Text to send out");
|
||||
("text", value<std::string>(&text)->default_value("Hello"), "Text to send out")
|
||||
("network-interface", value<std::string>(&interfaceName)->default_value("eth0"), "Name of the network interface to use (e.g. eth0, ib0, wlan0, en0...)");
|
||||
|
||||
config.AddToCmdLineOptions(samplerOptions);
|
||||
|
||||
|
@ -83,22 +85,14 @@ int main(int argc, char** argv)
|
|||
FairMQ::tools::getHostIPs(IPs);
|
||||
stringstream ss;
|
||||
// Check if ib0 (infiniband) interface is available, otherwise try eth0 or wlan0.
|
||||
if (IPs.count("ib0"))
|
||||
if (IPs.count(interfaceName))
|
||||
{
|
||||
ss << "tcp://" << IPs["ib0"] << ":1";
|
||||
}
|
||||
else if (IPs.count("eth0"))
|
||||
{
|
||||
ss << "tcp://" << IPs["eth0"] << ":1";
|
||||
}
|
||||
else if (IPs.count("wlan0"))
|
||||
{
|
||||
ss << "tcp://" << IPs["wlan0"] << ":1";
|
||||
ss << "tcp://" << IPs[interfaceName] << ":1";
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(INFO) << ss.str();
|
||||
LOG(ERROR) << "Could not find ib0, eth0 or wlan0";
|
||||
LOG(ERROR) << "Could not find provided network interface: \"" << interfaceName << "\"!, exiting.";
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
string initialOutputAddress = ss.str();
|
||||
|
|
|
@ -45,6 +45,14 @@ int main(int argc, char** argv)
|
|||
|
||||
try
|
||||
{
|
||||
std::string interfaceName; // name of the network interface to use for communication.
|
||||
|
||||
options_description sinkOptions("Sink options");
|
||||
sinkOptions.add_options()
|
||||
("network-interface", value<std::string>(&interfaceName)->default_value("eth0"), "Name of the network interface to use (e.g. eth0, ib0, wlan0, en0...)");
|
||||
|
||||
config.AddToCmdLineOptions(sinkOptions);
|
||||
|
||||
if (config.ParseAll(argc, argv))
|
||||
{
|
||||
return 0;
|
||||
|
@ -74,22 +82,14 @@ int main(int argc, char** argv)
|
|||
FairMQ::tools::getHostIPs(IPs);
|
||||
stringstream ss;
|
||||
// Check if ib0 (infiniband) interface is available, otherwise try eth0 or wlan0.
|
||||
if (IPs.count("ib0"))
|
||||
if (IPs.count(interfaceName))
|
||||
{
|
||||
ss << "tcp://" << IPs["ib0"] << ":1";
|
||||
}
|
||||
else if (IPs.count("eth0"))
|
||||
{
|
||||
ss << "tcp://" << IPs["eth0"] << ":1";
|
||||
}
|
||||
else if (IPs.count("wlan0"))
|
||||
{
|
||||
ss << "tcp://" << IPs["wlan0"] << ":1";
|
||||
ss << "tcp://" << IPs[interfaceName] << ":1";
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(INFO) << ss.str();
|
||||
LOG(ERROR) << "Could not find ib0, eth0 or wlan0";
|
||||
LOG(ERROR) << "Could not find provided network interface: \"" << interfaceName << "\"!, exiting.";
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
string initialInputAddress = ss.str();
|
||||
|
|
79
fairmq/examples/4-copypush/CMakeLists.txt
Normal file
79
fairmq/examples/4-copypush/CMakeLists.txt
Normal file
|
@ -0,0 +1,79 @@
|
|||
################################################################################
|
||||
# Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||
# #
|
||||
# This software is distributed under the terms of the #
|
||||
# GNU Lesser General Public Licence version 3 (LGPL) version 3, #
|
||||
# copied verbatim in the file "LICENSE" #
|
||||
################################################################################
|
||||
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/4-copypush/ex4-copypush.json ${CMAKE_BINARY_DIR}/bin/config/ex4-copypush.json)
|
||||
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${CMAKE_SOURCE_DIR}/fairmq
|
||||
${CMAKE_SOURCE_DIR}/fairmq/devices
|
||||
${CMAKE_SOURCE_DIR}/fairmq/tools
|
||||
${CMAKE_SOURCE_DIR}/fairmq/options
|
||||
${CMAKE_SOURCE_DIR}/fairmq/examples/4-copypush
|
||||
${CMAKE_CURRENT_BINARY_DIR}
|
||||
)
|
||||
|
||||
Set(SYSTEM_INCLUDE_DIRECTORIES
|
||||
${Boost_INCLUDE_DIR}
|
||||
)
|
||||
|
||||
If(NANOMSG_FOUND)
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${INCLUDE_DIRECTORIES}
|
||||
${CMAKE_SOURCE_DIR}/fairmq/nanomsg
|
||||
)
|
||||
Else(NANOMSG_FOUND)
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${INCLUDE_DIRECTORIES}
|
||||
${CMAKE_SOURCE_DIR}/fairmq/zeromq
|
||||
)
|
||||
EndIf(NANOMSG_FOUND)
|
||||
|
||||
Include_Directories(${INCLUDE_DIRECTORIES})
|
||||
Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES})
|
||||
|
||||
Set(LINK_DIRECTORIES
|
||||
${Boost_LIBRARY_DIRS}
|
||||
)
|
||||
|
||||
Link_Directories(${LINK_DIRECTORIES})
|
||||
|
||||
Set(SRCS
|
||||
"FairMQExample4Sampler.cxx"
|
||||
"FairMQExample4Sink.cxx"
|
||||
)
|
||||
|
||||
Set(DEPENDENCIES
|
||||
${DEPENDENCIES}
|
||||
FairMQ
|
||||
)
|
||||
|
||||
Set(LIBRARY_NAME FairMQExample4)
|
||||
|
||||
GENERATE_LIBRARY()
|
||||
|
||||
Set(Exe_Names
|
||||
ex4-sampler
|
||||
ex4-sink
|
||||
)
|
||||
|
||||
Set(Exe_Source
|
||||
runExample4Sampler.cxx
|
||||
runExample4Sink.cxx
|
||||
)
|
||||
|
||||
list(LENGTH Exe_Names _length)
|
||||
math(EXPR _length ${_length}-1)
|
||||
|
||||
ForEach(_file RANGE 0 ${_length})
|
||||
list(GET Exe_Names ${_file} _name)
|
||||
list(GET Exe_Source ${_file} _src)
|
||||
Set(EXE_NAME ${_name})
|
||||
Set(SRCS ${_src})
|
||||
Set(DEPENDENCIES FairMQExample4)
|
||||
GENERATE_EXECUTABLE()
|
||||
EndForEach(_file RANGE 0 ${_length})
|
77
fairmq/examples/5-req-rep/CMakeLists.txt
Normal file
77
fairmq/examples/5-req-rep/CMakeLists.txt
Normal file
|
@ -0,0 +1,77 @@
|
|||
################################################################################
|
||||
# Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||
# #
|
||||
# This software is distributed under the terms of the #
|
||||
# GNU Lesser General Public Licence version 3 (LGPL) version 3, #
|
||||
# copied verbatim in the file "LICENSE" #
|
||||
################################################################################
|
||||
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${CMAKE_SOURCE_DIR}/fairmq
|
||||
${CMAKE_SOURCE_DIR}/fairmq/devices
|
||||
${CMAKE_SOURCE_DIR}/fairmq/tools
|
||||
${CMAKE_SOURCE_DIR}/fairmq/options
|
||||
${CMAKE_SOURCE_DIR}/fairmq/examples/5-req-rep
|
||||
${CMAKE_CURRENT_BINARY_DIR}
|
||||
)
|
||||
|
||||
Set(SYSTEM_INCLUDE_DIRECTORIES
|
||||
${Boost_INCLUDE_DIR}
|
||||
)
|
||||
|
||||
If(NANOMSG_FOUND)
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${INCLUDE_DIRECTORIES}
|
||||
${CMAKE_SOURCE_DIR}/fairmq/nanomsg
|
||||
)
|
||||
Else(NANOMSG_FOUND)
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${INCLUDE_DIRECTORIES}
|
||||
${CMAKE_SOURCE_DIR}/fairmq/zeromq
|
||||
)
|
||||
EndIf(NANOMSG_FOUND)
|
||||
|
||||
Include_Directories(${INCLUDE_DIRECTORIES})
|
||||
Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES})
|
||||
|
||||
Set(LINK_DIRECTORIES
|
||||
${Boost_LIBRARY_DIRS}
|
||||
)
|
||||
|
||||
Link_Directories(${LINK_DIRECTORIES})
|
||||
|
||||
set(SRCS
|
||||
"FairMQExample5Client.cxx"
|
||||
"FairMQExample5Server.cxx"
|
||||
)
|
||||
|
||||
set(DEPENDENCIES
|
||||
${DEPENDENCIES}
|
||||
FairMQ
|
||||
)
|
||||
|
||||
set(LIBRARY_NAME FairMQExample5)
|
||||
|
||||
GENERATE_LIBRARY()
|
||||
|
||||
set(Exe_Names
|
||||
ex5-client
|
||||
ex5-server
|
||||
)
|
||||
|
||||
set(Exe_Source
|
||||
runExample5Client.cxx
|
||||
runExample5Server.cxx
|
||||
)
|
||||
|
||||
list(LENGTH Exe_Names _length)
|
||||
math(EXPR _length ${_length}-1)
|
||||
|
||||
ForEach(_file RANGE 0 ${_length})
|
||||
list(GET Exe_Names ${_file} _name)
|
||||
list(GET Exe_Source ${_file} _src)
|
||||
set(EXE_NAME ${_name})
|
||||
set(SRCS ${_src})
|
||||
set(DEPENDENCIES FairMQExample5)
|
||||
GENERATE_EXECUTABLE()
|
||||
EndForEach(_file RANGE 0 ${_length})
|
107
fairmq/test/CMakeLists.txt
Normal file
107
fairmq/test/CMakeLists.txt
Normal file
|
@ -0,0 +1,107 @@
|
|||
################################################################################
|
||||
# Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
|
||||
# #
|
||||
# This software is distributed under the terms of the #
|
||||
# GNU Lesser General Public Licence version 3 (LGPL) version 3, #
|
||||
# copied verbatim in the file "LICENSE" #
|
||||
################################################################################
|
||||
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/test/test-fairmq-push-pull.sh.in ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-push-pull.sh)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/test/test-fairmq-pub-sub.sh.in ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-pub-sub.sh)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/fairmq/test/test-fairmq-req-rep.sh.in ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-req-rep.sh)
|
||||
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${CMAKE_SOURCE_DIR}/fairmq
|
||||
${CMAKE_SOURCE_DIR}/fairmq/devices
|
||||
${CMAKE_SOURCE_DIR}/fairmq/tools
|
||||
${CMAKE_SOURCE_DIR}/fairmq/options
|
||||
${CMAKE_SOURCE_DIR}/fairmq/test/push-pull
|
||||
${CMAKE_SOURCE_DIR}/fairmq/test/pub-sub
|
||||
${CMAKE_SOURCE_DIR}/fairmq/test/req-rep
|
||||
${CMAKE_CURRENT_BINARY_DIR}
|
||||
)
|
||||
|
||||
Set(SYSTEM_INCLUDE_DIRECTORIES
|
||||
${Boost_INCLUDE_DIR}
|
||||
)
|
||||
|
||||
If(NANOMSG_FOUND)
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${INCLUDE_DIRECTORIES}
|
||||
${CMAKE_SOURCE_DIR}/fairmq/nanomsg
|
||||
)
|
||||
Else(NANOMSG_FOUND)
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${INCLUDE_DIRECTORIES}
|
||||
${CMAKE_SOURCE_DIR}/fairmq/zeromq
|
||||
)
|
||||
EndIf(NANOMSG_FOUND)
|
||||
|
||||
Include_Directories(${INCLUDE_DIRECTORIES})
|
||||
Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES})
|
||||
|
||||
Set(LINK_DIRECTORIES
|
||||
${Boost_LIBRARY_DIRS}
|
||||
)
|
||||
|
||||
Link_Directories(${LINK_DIRECTORIES})
|
||||
|
||||
set(SRCS
|
||||
"push-pull/FairMQTestPush.cxx"
|
||||
"push-pull/FairMQTestPull.cxx"
|
||||
"pub-sub/FairMQTestPub.cxx"
|
||||
"pub-sub/FairMQTestSub.cxx"
|
||||
"req-rep/FairMQTestReq.cxx"
|
||||
"req-rep/FairMQTestRep.cxx"
|
||||
)
|
||||
|
||||
set(DEPENDENCIES
|
||||
${DEPENDENCIES}
|
||||
FairMQ
|
||||
)
|
||||
|
||||
set(LIBRARY_NAME FairMQTest)
|
||||
|
||||
GENERATE_LIBRARY()
|
||||
|
||||
set(Exe_Names
|
||||
test-fairmq-push
|
||||
test-fairmq-pull
|
||||
test-fairmq-pub
|
||||
test-fairmq-sub
|
||||
test-fairmq-req
|
||||
test-fairmq-rep
|
||||
)
|
||||
|
||||
set(Exe_Source
|
||||
push-pull/runTestPush.cxx
|
||||
push-pull/runTestPull.cxx
|
||||
pub-sub/runTestPub.cxx
|
||||
pub-sub/runTestSub.cxx
|
||||
req-rep/runTestReq.cxx
|
||||
req-rep/runTestRep.cxx
|
||||
)
|
||||
|
||||
list(LENGTH Exe_Names _length)
|
||||
math(EXPR _length ${_length}-1)
|
||||
|
||||
ForEach(_file RANGE 0 ${_length})
|
||||
list(GET Exe_Names ${_file} _name)
|
||||
list(GET Exe_Source ${_file} _src)
|
||||
set(EXE_NAME ${_name})
|
||||
set(SRCS ${_src})
|
||||
set(DEPENDENCIES FairMQTest)
|
||||
GENERATE_EXECUTABLE()
|
||||
EndForEach(_file RANGE 0 ${_length})
|
||||
|
||||
add_test(NAME run_fairmq_push_pull COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-push-pull.sh)
|
||||
set_tests_properties(run_fairmq_push_pull PROPERTIES TIMEOUT "30")
|
||||
set_tests_properties(run_fairmq_push_pull PROPERTIES PASS_REGULAR_EXPRESSION "PUSH-PULL test successfull")
|
||||
|
||||
add_test(NAME run_fairmq_pub_sub COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-pub-sub.sh)
|
||||
set_tests_properties(run_fairmq_pub_sub PROPERTIES TIMEOUT "30")
|
||||
set_tests_properties(run_fairmq_pub_sub PROPERTIES PASS_REGULAR_EXPRESSION "PUB-SUB test successfull")
|
||||
|
||||
add_test(NAME run_fairmq_req_rep COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-req-rep.sh)
|
||||
set_tests_properties(run_fairmq_req_rep PROPERTIES TIMEOUT "30")
|
||||
set_tests_properties(run_fairmq_req_rep PROPERTIES PASS_REGULAR_EXPRESSION "REQ-REP test successfull")
|
|
@ -14,6 +14,8 @@
|
|||
|
||||
#include <sstream>
|
||||
|
||||
#include <zmq.h>
|
||||
|
||||
#include "FairMQLogger.h"
|
||||
#include "FairMQContextZMQ.h"
|
||||
|
||||
|
|
|
@ -15,12 +15,9 @@
|
|||
#ifndef FAIRMQCONTEXTZMQ_H_
|
||||
#define FAIRMQCONTEXTZMQ_H_
|
||||
|
||||
#include <zmq.h>
|
||||
|
||||
class FairMQContextZMQ
|
||||
{
|
||||
public:
|
||||
|
||||
/// Constructor
|
||||
FairMQContextZMQ(int numIoThreads);
|
||||
|
||||
|
|
|
@ -14,6 +14,8 @@
|
|||
|
||||
#include <sstream>
|
||||
|
||||
#include <zmq.h>
|
||||
|
||||
#include "FairMQSocketZMQ.h"
|
||||
#include "FairMQLogger.h"
|
||||
|
||||
|
|
|
@ -17,8 +17,6 @@
|
|||
|
||||
#include <boost/shared_ptr.hpp>
|
||||
|
||||
#include <zmq.h>
|
||||
|
||||
#include "FairMQSocket.h"
|
||||
#include "FairMQContextZMQ.h"
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user