diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 6b55ac6b..a3803d70 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -7,54 +7,34 @@ ################################################################################ configure_file(${CMAKE_SOURCE_DIR}/fairmq/options/ProgOptionTest/macro/bsampler-sink.json ${CMAKE_BINARY_DIR}/bin/config/bsampler-sink.json) -configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/1-sampler-sink/ex1-sampler-sink.json ${CMAKE_BINARY_DIR}/bin/config/ex1-sampler-sink.json) -configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/2-sampler-processor-sink/ex2-sampler-processor-sink.json ${CMAKE_BINARY_DIR}/bin/config/ex2-sampler-processor-sink.json) -configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds/ex3-devices.json ${CMAKE_BINARY_DIR}/bin/config/ex3-devices.json) -configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds/ex3-dds-topology.xml ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-topology.xml) -configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds/ex3-dds-hosts.cfg ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-hosts.cfg COPYONLY) -configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/4-copypush/ex4-copypush.json ${CMAKE_BINARY_DIR}/bin/config/ex4-copypush.json) - -configure_file(${CMAKE_SOURCE_DIR}/fairmq/test/test-fairmq-push-pull.sh.in ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-push-pull.sh) -configure_file(${CMAKE_SOURCE_DIR}/fairmq/test/test-fairmq-pub-sub.sh.in ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-pub-sub.sh) -configure_file(${CMAKE_SOURCE_DIR}/fairmq/test/test-fairmq-req-rep.sh.in ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-req-rep.sh) add_subdirectory(logger) +add_subdirectory(examples/1-sampler-sink) +add_subdirectory(examples/2-sampler-processor-sink) +If(DDS_FOUND) + add_subdirectory(examples/3-dds) +EndIf(DDS_FOUND) +add_subdirectory(examples/4-copypush) +add_subdirectory(examples/5-req-rep) + +add_subdirectory(test) + Set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq ${CMAKE_SOURCE_DIR}/fairmq/devices ${CMAKE_SOURCE_DIR}/fairmq/tools ${CMAKE_SOURCE_DIR}/fairmq/options ${CMAKE_SOURCE_DIR}/fairmq/logger - ${CMAKE_SOURCE_DIR}/fairmq/examples/1-sampler-sink - ${CMAKE_SOURCE_DIR}/fairmq/examples/2-sampler-processor-sink - ${CMAKE_SOURCE_DIR}/fairmq/examples/4-copypush - ${CMAKE_SOURCE_DIR}/fairmq/examples/5-req-rep - ${CMAKE_SOURCE_DIR}/fairmq/test/push-pull - ${CMAKE_SOURCE_DIR}/fairmq/test/pub-sub - ${CMAKE_SOURCE_DIR}/fairmq/test/req-rep + ${CMAKE_SOURCE_DIR}/fairmq/zeromq ${CMAKE_CURRENT_BINARY_DIR} ) -if(DDS_FOUND) - add_definitions(-DENABLE_DDS) - Set(INCLUDE_DIRECTORIES - ${INCLUDE_DIRECTORIES} - ${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds - ) -endif(DDS_FOUND) - Set(SYSTEM_INCLUDE_DIRECTORIES ${Boost_INCLUDE_DIR} + ${ZMQ_INCLUDE_DIR} ) -If(DDS_FOUND) - Set(SYSTEM_INCLUDE_DIRECTORIES - ${SYSTEM_INCLUDE_DIRECTORIES} - ${DDS_INCLUDE_DIR} - ) -EndIf(DDS_FOUND) - If(PROTOBUF_FOUND) Set(INCLUDE_DIRECTORIES ${INCLUDE_DIRECTORIES} @@ -74,16 +54,7 @@ If(NANOMSG_FOUND) ) Set(SYSTEM_INCLUDE_DIRECTORIES ${SYSTEM_INCLUDE_DIRECTORIES} - ${NANOMSG_LIBRARY_SHARED} - ) -Else(NANOMSG_FOUND) - Set(INCLUDE_DIRECTORIES - ${INCLUDE_DIRECTORIES} - ${CMAKE_SOURCE_DIR}/fairmq/zeromq - ) - Set(SYSTEM_INCLUDE_DIRECTORIES - ${SYSTEM_INCLUDE_DIRECTORIES} - ${ZMQ_INCLUDE_DIR} + ${NANOMSG_INCLUDE_DIR} ) EndIf(NANOMSG_FOUND) @@ -94,16 +65,15 @@ Set(LINK_DIRECTORIES ${Boost_LIBRARY_DIRS} ) -if(DDS_FOUND) - set(LINK_DIRECTORIES - ${LINK_DIRECTORIES} - ${DDS_LIBRARY_DIR} - ) -endif(DDS_FOUND) - Link_Directories(${LINK_DIRECTORIES}) -set(SRCS +Set(SRCS + "zeromq/FairMQTransportFactoryZMQ.cxx" + "zeromq/FairMQMessageZMQ.cxx" + "zeromq/FairMQSocketZMQ.cxx" + "zeromq/FairMQPollerZMQ.cxx" + "zeromq/FairMQContextZMQ.cxx" + "FairMQLogger.cxx" "FairMQConfigurable.cxx" "FairMQStateMachine.cxx" @@ -124,39 +94,9 @@ set(SRCS "options/FairProgOptions.cxx" "options/FairMQProgOptions.cxx" "options/FairMQParser.cxx" - - "examples/1-sampler-sink/FairMQExample1Sampler.cxx" - "examples/1-sampler-sink/FairMQExample1Sink.cxx" - "examples/2-sampler-processor-sink/FairMQExample2Sampler.cxx" - "examples/2-sampler-processor-sink/FairMQExample2Processor.cxx" - "examples/2-sampler-processor-sink/FairMQExample2Sink.cxx" - "examples/4-copypush/FairMQExample4Sampler.cxx" - "examples/4-copypush/FairMQExample4Sink.cxx" - "examples/5-req-rep/FairMQExample5Client.cxx" - "examples/5-req-rep/FairMQExample5Server.cxx" - - "test/push-pull/FairMQTestPush.cxx" - "test/push-pull/FairMQTestPull.cxx" - "test/pub-sub/FairMQTestPub.cxx" - "test/pub-sub/FairMQTestSub.cxx" - "test/req-rep/FairMQTestReq.cxx" - "test/req-rep/FairMQTestRep.cxx" ) -if(DDS_FOUND) - set(SRCS - ${SRCS} - "examples/3-dds/FairMQExample3Sampler.cxx" - "examples/3-dds/FairMQExample3Processor.cxx" - "examples/3-dds/FairMQExample3Sink.cxx" - ) - set(DEPENDENCIES - ${DEPENDENCIES} - dds-key-value-lib - ) -endif(DDS_FOUND) - -if(PROTOBUF_FOUND) +If(PROTOBUF_FOUND) # following source files are only for protobuf tests and are not essential part of FairMQ # add_custom_command( # OUTPUT @@ -175,38 +115,25 @@ if(PROTOBUF_FOUND) # "prototest/FairMQBinSink.cxx" # "prototest/FairMQProtoSink.cxx" # ) - set(DEPENDENCIES + Set(DEPENDENCIES ${DEPENDENCIES} ${PROTOBUF_LIBRARY} ) -endif(PROTOBUF_FOUND) +Endif(PROTOBUF_FOUND) -if(NANOMSG_FOUND) - set(SRCS +If(NANOMSG_FOUND) + Set(SRCS ${SRCS} "nanomsg/FairMQTransportFactoryNN.cxx" "nanomsg/FairMQMessageNN.cxx" "nanomsg/FairMQSocketNN.cxx" "nanomsg/FairMQPollerNN.cxx" ) - set(DEPENDENCIES + Set(DEPENDENCIES ${DEPENDENCIES} ${NANOMSG_LIBRARY_SHARED} ) -else(NANOMSG_FOUND) - set(SRCS - ${SRCS} - "zeromq/FairMQTransportFactoryZMQ.cxx" - "zeromq/FairMQMessageZMQ.cxx" - "zeromq/FairMQSocketZMQ.cxx" - "zeromq/FairMQPollerZMQ.cxx" - "zeromq/FairMQContextZMQ.cxx" - ) - set(DEPENDENCIES - ${DEPENDENCIES} - ${ZMQ_LIBRARY_SHARED} - ) -endif(NANOMSG_FOUND) +EndIf(NANOMSG_FOUND) # to copy src that are header-only files (e.g. c++ template) for FairRoot external installation # manual install (globbing add not recommended) @@ -217,10 +144,11 @@ Set(FAIRMQHEADERS devices/GenericFileSink.h tools/FairMQTools.h ) -install(FILES ${FAIRMQHEADERS} DESTINATION include) +Install(FILES ${FAIRMQHEADERS} DESTINATION include) -set(DEPENDENCIES +Set(DEPENDENCIES ${DEPENDENCIES} + ${ZMQ_LIBRARY_SHARED} boost_thread fairmq_logger boost_timer @@ -232,43 +160,19 @@ set(DEPENDENCIES boost_exception ) -set(LIBRARY_NAME FairMQ) +Set(LIBRARY_NAME FairMQ) GENERATE_LIBRARY() -set(Exe_Names +Set(Exe_Names bsampler sink buffer splitter merger proxy - ex1-sampler - ex1-sink - ex2-sampler - ex2-processor - ex2-sink - ex4-sampler - ex4-sink - ex5-client - ex5-server - test-fairmq-push - test-fairmq-pull - test-fairmq-pub - test-fairmq-sub - test-fairmq-req - test-fairmq-rep ) -if(DDS_FOUND) - set(Exe_Names - ${Exe_Names} - ex3-sampler-dds - ex3-processor-dds - ex3-sink-dds - ) -endif(DDS_FOUND) - # following executables are only for protobuf tests and are not essential part of FairMQ # if(PROTOBUF_FOUND) # set(Exe_Names @@ -280,39 +184,15 @@ endif(DDS_FOUND) # ) # endif(PROTOBUF_FOUND) -set(Exe_Source +Set(Exe_Source run/runBenchmarkSampler.cxx run/runSink.cxx run/runBuffer.cxx run/runSplitter.cxx run/runMerger.cxx run/runProxy.cxx - examples/1-sampler-sink/runExample1Sampler.cxx - examples/1-sampler-sink/runExample1Sink.cxx - examples/2-sampler-processor-sink/runExample2Sampler.cxx - examples/2-sampler-processor-sink/runExample2Processor.cxx - examples/2-sampler-processor-sink/runExample2Sink.cxx - examples/4-copypush/runExample4Sampler.cxx - examples/4-copypush/runExample4Sink.cxx - examples/5-req-rep/runExample5Client.cxx - examples/5-req-rep/runExample5Server.cxx - test/push-pull/runTestPush.cxx - test/push-pull/runTestPull.cxx - test/pub-sub/runTestPub.cxx - test/pub-sub/runTestSub.cxx - test/req-rep/runTestReq.cxx - test/req-rep/runTestRep.cxx ) -if(DDS_FOUND) - set(Exe_Source - ${Exe_Source} - examples/3-dds/runExample3Sampler.cxx - examples/3-dds/runExample3Processor.cxx - examples/3-dds/runExample3Sink.cxx - ) -endif(DDS_FOUND) - # following source files are only for protobuf tests and are not essential part of FairMQ # if(PROTOBUF_FOUND) # set(Exe_Source @@ -330,20 +210,8 @@ math(EXPR _length ${_length}-1) ForEach(_file RANGE 0 ${_length}) list(GET Exe_Names ${_file} _name) list(GET Exe_Source ${_file} _src) - set(EXE_NAME ${_name}) - set(SRCS ${_src}) - set(DEPENDENCIES FairMQ) + Set(EXE_NAME ${_name}) + Set(SRCS ${_src}) + Set(DEPENDENCIES FairMQ) GENERATE_EXECUTABLE() EndForEach(_file RANGE 0 ${_length}) - -add_test(NAME run_fairmq_push_pull COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-push-pull.sh) -set_tests_properties(run_fairmq_push_pull PROPERTIES TIMEOUT "30") -set_tests_properties(run_fairmq_push_pull PROPERTIES PASS_REGULAR_EXPRESSION "PUSH-PULL test successfull") - -add_test(NAME run_fairmq_pub_sub COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-pub-sub.sh) -set_tests_properties(run_fairmq_pub_sub PROPERTIES TIMEOUT "30") -set_tests_properties(run_fairmq_pub_sub PROPERTIES PASS_REGULAR_EXPRESSION "PUB-SUB test successfull") - -add_test(NAME run_fairmq_req_rep COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-req-rep.sh) -set_tests_properties(run_fairmq_req_rep PROPERTIES TIMEOUT "30") -set_tests_properties(run_fairmq_req_rep PROPERTIES PASS_REGULAR_EXPRESSION "REQ-REP test successfull") diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 71543ef4..280d87d4 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -32,48 +32,111 @@ class FairMQChannel friend class FairMQDevice; public: + /// Default constructor FairMQChannel(); + + /// Constructor + /// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/) + /// @param method Socket method (bind/connect) + /// @param address Network address to bind/connect to (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") FairMQChannel(const std::string& type, const std::string& method, const std::string& address); + + /// Default destructor virtual ~FairMQChannel(); + /// Get socket type + /// @return Returns socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/) std::string GetType() const; + /// Get socket method + /// @return Returns socket method (bind/connect) std::string GetMethod() const; + /// Get socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") + /// @return Returns socket type (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") std::string GetAddress() const; + /// Get socket send buffer size (in number of messages) + /// @return Returns socket send buffer size (in number of messages) int GetSndBufSize() const; + /// Get socket receive buffer size (in number of messages) + /// @return Returns socket receive buffer size (in number of messages) int GetRcvBufSize() const; + /// Get socket rate logging setting (1/0) + /// @return Returns socket rate logging setting (1/0) int GetRateLogging() const; + /// Set socket type + /// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/) void UpdateType(const std::string& type); + /// Set socket method + /// @param method Socket method (bind/connect) void UpdateMethod(const std::string& method); + /// Set socket address + /// @param address Socket address (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") void UpdateAddress(const std::string& address); + /// Set socket send buffer size + /// @param sndBufSize Socket send buffer size (in number of messages) void UpdateSndBufSize(const int sndBufSize); + /// Set socket receive buffer size + /// @param rcvBufSize Socket receive buffer size (in number of messages) void UpdateRcvBufSize(const int rcvBufSize); + /// Set socket rate logging setting + /// @param rateLogging Socket rate logging setting (1/0) void UpdateRateLogging(const int rateLogging); + /// Checks if the configured channel settings are valid (checks the validity parameter, without running full validation (as oposed to ValidateChannel())) + /// @return true if channel settings are valid, false otherwise. bool IsValid() const; + /// Validates channel configuration + /// @return true if channel settings are valid, false otherwise. bool ValidateChannel(); - bool InitCommandInterface(FairMQTransportFactory* factory); + /// Resets the channel (requires validation to be used again). void ResetChannel(); FairMQSocket* fSocket; - // Wrappers for the socket methods to simplify the usage of channels + /// Sends a message to the socket queue. + /// @details Send method attempts to send a message by + /// putting it in the output queue. If the queue is full or queueing is not possible + /// for some other reason (e.g. no peers connected for a binding socket), the method blocks. + /// + /// @param msg Constant reference of unique_ptr to a FairMQMessage + /// @return Returns the number of bytes that have been queued. In case of errors, returns -1. int Send(const std::unique_ptr& msg) const; - /// \brief Sends a message in non-blocking mode. - /// \details SendAsync method attempts to send the message without blocking by + /// Sends a message in non-blocking mode. + /// @details SendAsync method attempts to send a message without blocking by /// putting it in the queue. If the queue is full or queueing is not possible /// for some other reason (e.g. no peers connected for a binding socket), the method returns 0. /// - /// \param msg Constant reference of unique_ptr to a FairMQMessage - /// \return Returns the number of bytes that have been queued. If queueing failed due to + /// @param msg Constant reference of unique_ptr to a FairMQMessage + /// @return Returns the number of bytes that have been queued. If queueing failed due to /// full queue or no connected peers (when binding), returns 0. In case of errors, returns -1. int SendAsync(const std::unique_ptr& msg) const; + + /// Queues the current message as a part of a multi-part message + /// @details SendPart method queues the provided message as a part of a multi-part message. + /// The actual transfer over the network is initiated once final part has been queued with the Send() or SendAsync methods. + /// + /// @param msg Constant reference of unique_ptr to a FairMQMessage + /// @return Returns the number of bytes that have been queued. In case of errors, returns -1. int SendPart(const std::unique_ptr& msg) const; + /// Receives a message from the socket queue. + /// @details Receive method attempts to receive a message from the input queue. + /// If the queue is empty the method blocks. + /// + /// @param msg Constant reference of unique_ptr to a FairMQMessage + /// @return Returns the number of bytes that have been received. In case of errors, returns -1. int Receive(const std::unique_ptr& msg) const; + + /// Receives a message in non-blocking mode. + /// @details ReceiveAsync method attempts to receive a message without blocking from the input queue. + /// If the queue is empty the method returns 0. + /// + /// @param msg Constant reference of unique_ptr to a FairMQMessage + /// @return Returns the number of bytes that have been received. If queue is empty, returns 0. + /// In case of errors, returns -1. int ReceiveAsync(const std::unique_ptr& msg) const; // DEPRECATED socket method wrappers with raw pointers and flag checks @@ -82,8 +145,8 @@ class FairMQChannel int Receive(FairMQMessage* msg, const std::string& flag = "") const; int Receive(FairMQMessage* msg, const int flags) const; - /// \brief Checks if the socket is expecting to receive another part of a multipart message. - /// \return Return true if the socket expects another part of a multipart message and false otherwise. + /// Checks if the socket is expecting to receive another part of a multipart message. + /// @return Return true if the socket expects another part of a multipart message and false otherwise. bool ExpectsAnotherPart() const; private: @@ -105,6 +168,8 @@ class FairMQChannel int fNoBlockFlag; int fSndMoreFlag; + bool InitCommandInterface(FairMQTransportFactory* factory); + bool HandleUnblock() const; // use static mutex to make the class easily copyable diff --git a/fairmq/FairMQConfigurable.cxx b/fairmq/FairMQConfigurable.cxx index 9178142e..29f61e9d 100644 --- a/fairmq/FairMQConfigurable.cxx +++ b/fairmq/FairMQConfigurable.cxx @@ -45,6 +45,12 @@ int FairMQConfigurable::GetProperty(const int key, const int default_ /*= 0*/) return default_; } +string FairMQConfigurable::GetPropertyDescription(const int key) +{ + LOG(ERROR) << "Reached end of the property list. The description of the requested property " << key << " was not found."; + return "0"; +} + FairMQConfigurable::~FairMQConfigurable() { } diff --git a/fairmq/FairMQConfigurable.h b/fairmq/FairMQConfigurable.h index d35bbe99..3bc9e12f 100644 --- a/fairmq/FairMQConfigurable.h +++ b/fairmq/FairMQConfigurable.h @@ -31,6 +31,8 @@ class FairMQConfigurable virtual std::string GetProperty(const int key, const std::string& default_ = ""); virtual void SetProperty(const int key, const int value); virtual int GetProperty(const int key, const int default_ = 0); + + virtual std::string GetPropertyDescription(const int key); }; #endif /* FAIRMQCONFIGURABLE_H_ */ diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index ede6f276..5bb39f30 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -397,6 +397,37 @@ string FairMQDevice::GetProperty(const int key, const string& default_ /*= ""*/) } } +string FairMQDevice::GetPropertyDescription(const int key) +{ + switch (key) + { + case Id: + return "Id: Device ID"; + case NumIoThreads: + return "NumIoThreads: Number of I/O Threads (size of the 0MQ thread pool to handle I/O operations. If your application is using only the inproc transport for messaging you may set this to zero, otherwise set it to at least one.)"; + case MaxInitializationTime: + return "MaxInitializationTime: Timeout for retrying validation and initialization of the channels."; + case PortRangeMin: + return "PortRangeMin: Minumum value for the port range (when binding to dynamic port)."; + case PortRangeMax: + return "PortRangeMax: Maximum value for the port range (when binding to dynamic port)."; + case LogIntervalInMs: + return "LogIntervalInMs: Time between socket rates logging outputs."; + default: + return FairMQConfigurable::GetPropertyDescription(key); + } +} + +void FairMQDevice::ListProperties() +{ + LOG(INFO) << "Properties of FairMQDevice:"; + for (int p = FairMQConfigurable::Last; p < FairMQDevice::Last; ++p) + { + LOG(INFO) << " " << GetPropertyDescription(p); + } + LOG(INFO) << "---------------------------"; +} + // Method for getting properties represented as an integer. int FairMQDevice::GetProperty(const int key, const int default_ /*= 0*/) { diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 9384ef63..e89befc6 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -29,12 +29,14 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable { + friend class FairMQChannel; + public: enum { Id = FairMQConfigurable::Last, - MaxInitializationTime, NumIoThreads, + MaxInitializationTime, PortRangeMin, PortRangeMax, LogIntervalInMs, @@ -60,6 +62,13 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable virtual void SetProperty(const int key, const int value); virtual int GetProperty(const int key, const int default_ = 0); + /// Get property description for a given property name + /// @param key Property name/key + /// @return String with the property description + virtual std::string GetPropertyDescription(const int key); + /// Print all properties of this and the parent class to LOG(INFO) + virtual void ListProperties(); + virtual void SetTransport(FairMQTransportFactory* factory); static bool SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs); diff --git a/fairmq/README.md b/fairmq/README.md index c3b1acdc..30a0c541 100644 --- a/fairmq/README.md +++ b/fairmq/README.md @@ -4,9 +4,7 @@ The standard FairRoot is running all the different analysis tasks within one pro ## Devices -The components encapsulating the tasks are called **devices** and derive from the common base class `FairMQDevice`. FairMQ provides ready to use devices to organize the dataflow between the components (without touching the contents of a message), providing functionality like merging and splitting of the data stream (see subdirectory `devices`). - -A number of devices to handle the data from the Tutorial3 detector of FairRoot are provided as an example and can be found in `FairRoot/base/MQ` directory. The implementation of the tasks run by these devices can be found `FairRoot/example/Tutorial3`. The implementation includes sending raw binary data as well as serializing the data with either [Boost Serialization](http://www.boost.org/doc/libs/release/libs/serialization/), [Google Protocol Buffers](https://developers.google.com/protocol-buffers/) or [Root TMessage](http://root.cern.ch/root/html/TMessage.html). Following the examples you can implement your own devices to transport arbitrary data. +The components encapsulating the tasks are called **devices** and derive from the common base class `FairMQDevice`. FairMQ provides ready to use devices to organize the dataflow between the components (without touching the contents of a message), providing functionality like merging and splitting of the data stream (see subdirectory `devices`). ## Topology @@ -18,6 +16,10 @@ Example of a simple FairMQ topology: Topology configuration is currently happening via setup scripts. This is very rudimentary and a much more flexible system is now in development. For now, example setup scripts can be found in directory `FairRoot/example/Tutorial3/` along with some additional documentation. +## Communication Patterns + +FairMQ devices communicate via the communication patterns offered by ZeroMQ (or nanomsg): PUSH-PULL, PUB-SUB, REQ-REP, PAIR, [more info here](http://api.zeromq.org/4-0:zmq-socket). + ## Messages Devices transport data between each other in form of `FairMQMessage`s. These can be filled with arbitrary content and transport either raw data or serialized data as described above. Message can be initialized in three different ways: @@ -33,3 +35,8 @@ The communication layer is available through an interface. Two interface impleme ![FairMQ transport interface](../docs/images/fairmq-transport-interface.png?raw=true "FairMQ transport interface") +## Examples + +A collection of simple examples in `examples` directory demonstrates some common usage patterns of FairMQ. + +A number of devices to handle the data from the Tutorial3 FairTestDetector of FairRoot are provided as an example and can be found in `FairRoot/base/MQ` directory. The implementation of the tasks run by these devices can be found `FairRoot/example/Tutorial3`. The implementation includes sending raw binary data as well as serializing the data with either [Boost Serialization](http://www.boost.org/doc/libs/release/libs/serialization/), [Google Protocol Buffers](https://developers.google.com/protocol-buffers/) or [Root TMessage](http://root.cern.ch/root/html/TMessage.html). Following the examples you can implement your own devices to transport arbitrary data. diff --git a/fairmq/devices/FairMQBenchmarkSampler.cxx b/fairmq/devices/FairMQBenchmarkSampler.cxx index ca127183..3adb9fed 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.cxx +++ b/fairmq/devices/FairMQBenchmarkSampler.cxx @@ -130,3 +130,26 @@ int FairMQBenchmarkSampler::GetProperty(const int key, const int default_ /*= 0* return FairMQDevice::GetProperty(key, default_); } } + +string FairMQBenchmarkSampler::GetPropertyDescription(const int key) +{ + switch (key) + { + case EventSize: + return "EventSize: Size of the transfered message buffer."; + case EventRate: + return "EventRate: Upper limit for the message rate."; + default: + return FairMQDevice::GetPropertyDescription(key); + } +} + +void FairMQBenchmarkSampler::ListProperties() +{ + LOG(INFO) << "Properties of FairMQBenchmarkSampler:"; + for (int p = FairMQConfigurable::Last; p < FairMQBenchmarkSampler::Last; ++p) + { + LOG(INFO) << " " << GetPropertyDescription(p); + } + LOG(INFO) << "---------------------------"; +} diff --git a/fairmq/devices/FairMQBenchmarkSampler.h b/fairmq/devices/FairMQBenchmarkSampler.h index b1953c85..72f50bfb 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.h +++ b/fairmq/devices/FairMQBenchmarkSampler.h @@ -44,6 +44,9 @@ class FairMQBenchmarkSampler : public FairMQDevice virtual void SetProperty(const int key, const int value); virtual int GetProperty(const int key, const int default_ = 0); + virtual std::string GetPropertyDescription(const int key); + virtual void ListProperties(); + protected: int fEventSize; int fEventRate; diff --git a/fairmq/examples/1-sampler-sink/CMakeLists.txt b/fairmq/examples/1-sampler-sink/CMakeLists.txt new file mode 100644 index 00000000..81b1338b --- /dev/null +++ b/fairmq/examples/1-sampler-sink/CMakeLists.txt @@ -0,0 +1,79 @@ + ################################################################################ + # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # + # # + # This software is distributed under the terms of the # + # GNU Lesser General Public Licence version 3 (LGPL) version 3, # + # copied verbatim in the file "LICENSE" # + ################################################################################ + +configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/1-sampler-sink/ex1-sampler-sink.json ${CMAKE_BINARY_DIR}/bin/config/ex1-sampler-sink.json) + +Set(INCLUDE_DIRECTORIES + ${CMAKE_SOURCE_DIR}/fairmq + ${CMAKE_SOURCE_DIR}/fairmq/devices + ${CMAKE_SOURCE_DIR}/fairmq/tools + ${CMAKE_SOURCE_DIR}/fairmq/options + ${CMAKE_SOURCE_DIR}/fairmq/examples/1-sampler-sink + ${CMAKE_CURRENT_BINARY_DIR} +) + +Set(SYSTEM_INCLUDE_DIRECTORIES + ${Boost_INCLUDE_DIR} +) + +If(NANOMSG_FOUND) + Set(INCLUDE_DIRECTORIES + ${INCLUDE_DIRECTORIES} + ${CMAKE_SOURCE_DIR}/fairmq/nanomsg + ) +Else(NANOMSG_FOUND) + Set(INCLUDE_DIRECTORIES + ${INCLUDE_DIRECTORIES} + ${CMAKE_SOURCE_DIR}/fairmq/zeromq + ) +EndIf(NANOMSG_FOUND) + +Include_Directories(${INCLUDE_DIRECTORIES}) +Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) + +Set(LINK_DIRECTORIES + ${Boost_LIBRARY_DIRS} +) + +Link_Directories(${LINK_DIRECTORIES}) + +Set(SRCS + "FairMQExample1Sampler.cxx" + "FairMQExample1Sink.cxx" +) + +Set(DEPENDENCIES + ${DEPENDENCIES} + FairMQ +) + +Set(LIBRARY_NAME FairMQExample1) + +GENERATE_LIBRARY() + +Set(Exe_Names + ex1-sampler + ex1-sink +) + +Set(Exe_Source + runExample1Sampler.cxx + runExample1Sink.cxx +) + +list(LENGTH Exe_Names _length) +math(EXPR _length ${_length}-1) + +ForEach(_file RANGE 0 ${_length}) + list(GET Exe_Names ${_file} _name) + list(GET Exe_Source ${_file} _src) + set(EXE_NAME ${_name}) + set(SRCS ${_src}) + set(DEPENDENCIES FairMQExample1) + GENERATE_EXECUTABLE() +EndForEach(_file RANGE 0 ${_length}) diff --git a/fairmq/examples/1-sampler-sink/FairMQExample1Sampler.cxx b/fairmq/examples/1-sampler-sink/FairMQExample1Sampler.cxx index 72cab457..e03e5475 100644 --- a/fairmq/examples/1-sampler-sink/FairMQExample1Sampler.cxx +++ b/fairmq/examples/1-sampler-sink/FairMQExample1Sampler.cxx @@ -15,7 +15,6 @@ #include // unique_ptr #include -#include #include "FairMQExample1Sampler.h" #include "FairMQLogger.h" diff --git a/fairmq/examples/1-sampler-sink/FairMQExample1Sink.cxx b/fairmq/examples/1-sampler-sink/FairMQExample1Sink.cxx index e4547950..dd923eeb 100644 --- a/fairmq/examples/1-sampler-sink/FairMQExample1Sink.cxx +++ b/fairmq/examples/1-sampler-sink/FairMQExample1Sink.cxx @@ -12,9 +12,6 @@ * @author A. Rybalchenko */ -#include -#include - #include "FairMQExample1Sink.h" #include "FairMQLogger.h" diff --git a/fairmq/examples/1-sampler-sink/README.md b/fairmq/examples/1-sampler-sink/README.md index f7f2975e..9108a03e 100644 --- a/fairmq/examples/1-sampler-sink/README.md +++ b/fairmq/examples/1-sampler-sink/README.md @@ -5,4 +5,4 @@ A simple topology of two devices - **Sampler** and **Sink**. **Sampler** sends d `runExample1Sampler.cxx` and `runExample1Sink.cxx` configure and run the devices in their main function. -The executables take two required command line parameters: `--id` and `--config-json-file`. The value of `--id` should be a unique identifier and the value for `-config-json-file` a path to a config file. The config file for this example is `ex1-sampler-sink.json` and it contains configuration for the communication channels of the devices. The mapping between a specific device and the configuration (which can contain multiple devices) is done based on the **id**. +The executables take two required command line parameters: `--id` and `--config-json-file`. The value of `--id` should be a unique identifier and the value for `--config-json-file` a path to a config file. The config file for this example is `ex1-sampler-sink.json` and it contains configuration for the communication channels of the devices. The mapping between a specific device and the configuration (which can contain multiple devices) is done based on the **id**. diff --git a/fairmq/examples/2-sampler-processor-sink/CMakeLists.txt b/fairmq/examples/2-sampler-processor-sink/CMakeLists.txt new file mode 100644 index 00000000..ff5ec3e5 --- /dev/null +++ b/fairmq/examples/2-sampler-processor-sink/CMakeLists.txt @@ -0,0 +1,82 @@ + ################################################################################ + # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # + # # + # This software is distributed under the terms of the # + # GNU Lesser General Public Licence version 3 (LGPL) version 3, # + # copied verbatim in the file "LICENSE" # + ################################################################################ + +configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/2-sampler-processor-sink/ex2-sampler-processor-sink.json ${CMAKE_BINARY_DIR}/bin/config/ex2-sampler-processor-sink.json) + +Set(INCLUDE_DIRECTORIES + ${CMAKE_SOURCE_DIR}/fairmq + ${CMAKE_SOURCE_DIR}/fairmq/devices + ${CMAKE_SOURCE_DIR}/fairmq/tools + ${CMAKE_SOURCE_DIR}/fairmq/options + ${CMAKE_SOURCE_DIR}/fairmq/examples/2-sampler-processor-sink + ${CMAKE_CURRENT_BINARY_DIR} +) + +Set(SYSTEM_INCLUDE_DIRECTORIES + ${Boost_INCLUDE_DIR} +) + +If(NANOMSG_FOUND) + Set(INCLUDE_DIRECTORIES + ${INCLUDE_DIRECTORIES} + ${CMAKE_SOURCE_DIR}/fairmq/nanomsg + ) +Else(NANOMSG_FOUND) + Set(INCLUDE_DIRECTORIES + ${INCLUDE_DIRECTORIES} + ${CMAKE_SOURCE_DIR}/fairmq/zeromq + ) +EndIf(NANOMSG_FOUND) + +Include_Directories(${INCLUDE_DIRECTORIES}) +Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) + +Set(LINK_DIRECTORIES + ${Boost_LIBRARY_DIRS} +) + +Link_Directories(${LINK_DIRECTORIES}) + +Set(SRCS + "FairMQExample2Sampler.cxx" + "FairMQExample2Processor.cxx" + "FairMQExample2Sink.cxx" +) + +Set(DEPENDENCIES + ${DEPENDENCIES} + FairMQ +) + +Set(LIBRARY_NAME FairMQExample2) + +GENERATE_LIBRARY() + +Set(Exe_Names + ex2-sampler + ex2-processor + ex2-sink +) + +Set(Exe_Source + runExample2Sampler.cxx + runExample2Processor.cxx + runExample2Sink.cxx +) + +list(LENGTH Exe_Names _length) +math(EXPR _length ${_length}-1) + +ForEach(_file RANGE 0 ${_length}) + list(GET Exe_Names ${_file} _name) + list(GET Exe_Source ${_file} _src) + Set(EXE_NAME ${_name}) + Set(SRCS ${_src}) + Set(DEPENDENCIES FairMQExample2) + GENERATE_EXECUTABLE() +EndForEach(_file RANGE 0 ${_length}) diff --git a/fairmq/examples/3-dds/CMakeLists.txt b/fairmq/examples/3-dds/CMakeLists.txt new file mode 100644 index 00000000..569d63b1 --- /dev/null +++ b/fairmq/examples/3-dds/CMakeLists.txt @@ -0,0 +1,93 @@ + ################################################################################ + # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # + # # + # This software is distributed under the terms of the # + # GNU Lesser General Public Licence version 3 (LGPL) version 3, # + # copied verbatim in the file "LICENSE" # + ################################################################################ + +configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds/ex3-devices.json ${CMAKE_BINARY_DIR}/bin/config/ex3-devices.json) +configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds/ex3-dds-topology.xml ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-topology.xml) +configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds/ex3-dds-hosts.cfg ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-hosts.cfg COPYONLY) + +add_definitions(-DENABLE_DDS) + +Set(INCLUDE_DIRECTORIES + ${CMAKE_SOURCE_DIR}/fairmq + ${CMAKE_SOURCE_DIR}/fairmq/devices + ${CMAKE_SOURCE_DIR}/fairmq/tools + ${CMAKE_SOURCE_DIR}/fairmq/options + ${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds + ${CMAKE_CURRENT_BINARY_DIR} +) + +Set(SYSTEM_INCLUDE_DIRECTORIES + ${SYSTEM_INCLUDE_DIRECTORIES} + ${DDS_INCLUDE_DIR} +) + +If(NANOMSG_FOUND) + Set(INCLUDE_DIRECTORIES + ${INCLUDE_DIRECTORIES} + ${CMAKE_SOURCE_DIR}/fairmq/nanomsg + ) +Else(NANOMSG_FOUND) + Set(INCLUDE_DIRECTORIES + ${INCLUDE_DIRECTORIES} + ${CMAKE_SOURCE_DIR}/fairmq/zeromq + ) +EndIf(NANOMSG_FOUND) + +Include_Directories(${INCLUDE_DIRECTORIES}) +Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) + +Set(LINK_DIRECTORIES + ${LINK_DIRECTORIES} + ${Boost_LIBRARY_DIRS} + ${DDS_LIBRARY_DIR} +) + +Link_Directories(${LINK_DIRECTORIES}) + +Set(SRCS + ${SRCS} + "FairMQExample3Sampler.cxx" + "FairMQExample3Processor.cxx" + "FairMQExample3Sink.cxx" +) + +Set(DEPENDENCIES + ${DEPENDENCIES} + FairMQ + dds-key-value-lib +) + +set(LIBRARY_NAME FairMQExample3) + +GENERATE_LIBRARY() + +Set(Exe_Names + ${Exe_Names} + ex3-sampler-dds + ex3-processor-dds + ex3-sink-dds +) + +Set(Exe_Source + ${Exe_Source} + runExample3Sampler.cxx + runExample3Processor.cxx + runExample3Sink.cxx +) + +list(LENGTH Exe_Names _length) +math(EXPR _length ${_length}-1) + +ForEach(_file RANGE 0 ${_length}) + list(GET Exe_Names ${_file} _name) + list(GET Exe_Source ${_file} _src) + set(EXE_NAME ${_name}) + set(SRCS ${_src}) + set(DEPENDENCIES FairMQExample3) + GENERATE_EXECUTABLE() +EndForEach(_file RANGE 0 ${_length}) diff --git a/fairmq/examples/3-dds/README.md b/fairmq/examples/3-dds/README.md index 1a71027a..0f02b67f 100644 --- a/fairmq/examples/3-dds/README.md +++ b/fairmq/examples/3-dds/README.md @@ -3,6 +3,12 @@ Example 3: DDS This example demonstrates usage of the Dynamic Deployment System ([DDS](http://dds.gsi.de/)) to dynamically deploy and configure a topology of devices. The topology is similar to those of Example 2, but now it can be easily distributed on different computing nodes without the need for manual reconfiguration of the devices. +This example is compiled only if the DDS is found by CMake. Custom DDS installation location can be given to CMake like this: + +```bash +cmake -DDDS_PATH="/path/to/dds/install/dir/" .. +``` + The description below outlines the minimal steps needed to run the example with DDS. For more details please refer to DDS documentation on [DDS Website](http://dds.gsi.de/). ##### 1. The devices that bind their sockets need to advertise their bound addresses to DDS by writing a property. @@ -56,9 +62,9 @@ After this step each device will have the necessary connection information. We run this example on the local machine for simplicity. The file below defines one worker `wn0` with 12 DDS Agents (thus able to accept 12 tasks). The parameters for each worker node are: - user-chosen worker ID (must be unique) - - a host name with or without a login, in a form: login@host.fqdn + - a host name with or without a login, in a form: login@host.fqdn (password-less SSH access to these hosts must be possible) - additional SSH params (can be empty) - - a remote working directory + - a remote working directory (most exist on the worker nodes) - number of DDS Agents for this worker ```bash diff --git a/fairmq/examples/3-dds/runExample3Processor.cxx b/fairmq/examples/3-dds/runExample3Processor.cxx index 2f2097cb..c3ab7a73 100644 --- a/fairmq/examples/3-dds/runExample3Processor.cxx +++ b/fairmq/examples/3-dds/runExample3Processor.cxx @@ -46,11 +46,11 @@ int main(int argc, char** argv) { int ddsTaskIndex = 0; - options_description samplerOptions("Processor options"); - samplerOptions.add_options() - ("index", value(&ddsTaskIndex)->default_value(0), "Store DDS task index"); + options_description processorOptions("Processor options"); + processorOptions.add_options() + ("index", value(&ddsTaskIndex)->default_value(0), "DDS task index"); - config.AddToCmdLineOptions(samplerOptions); + config.AddToCmdLineOptions(processorOptions); if (config.ParseAll(argc, argv)) { diff --git a/fairmq/examples/3-dds/runExample3Sampler.cxx b/fairmq/examples/3-dds/runExample3Sampler.cxx index 373ca2f2..e13731a7 100644 --- a/fairmq/examples/3-dds/runExample3Sampler.cxx +++ b/fairmq/examples/3-dds/runExample3Sampler.cxx @@ -45,11 +45,13 @@ int main(int argc, char** argv) try { - std::string text; + std::string text; // text to be sent for processing. + std::string interfaceName; // name of the network interface to use for communication. options_description samplerOptions("Sampler options"); samplerOptions.add_options() - ("text", value(&text)->default_value("Hello"), "Text to send out"); + ("text", value(&text)->default_value("Hello"), "Text to send out") + ("network-interface", value(&interfaceName)->default_value("eth0"), "Name of the network interface to use (e.g. eth0, ib0, wlan0, en0...)"); config.AddToCmdLineOptions(samplerOptions); @@ -83,22 +85,14 @@ int main(int argc, char** argv) FairMQ::tools::getHostIPs(IPs); stringstream ss; // Check if ib0 (infiniband) interface is available, otherwise try eth0 or wlan0. - if (IPs.count("ib0")) + if (IPs.count(interfaceName)) { - ss << "tcp://" << IPs["ib0"] << ":1"; - } - else if (IPs.count("eth0")) - { - ss << "tcp://" << IPs["eth0"] << ":1"; - } - else if (IPs.count("wlan0")) - { - ss << "tcp://" << IPs["wlan0"] << ":1"; + ss << "tcp://" << IPs[interfaceName] << ":1"; } else { LOG(INFO) << ss.str(); - LOG(ERROR) << "Could not find ib0, eth0 or wlan0"; + LOG(ERROR) << "Could not find provided network interface: \"" << interfaceName << "\"!, exiting."; exit(EXIT_FAILURE); } string initialOutputAddress = ss.str(); diff --git a/fairmq/examples/3-dds/runExample3Sink.cxx b/fairmq/examples/3-dds/runExample3Sink.cxx index c2f26d69..e4ede527 100644 --- a/fairmq/examples/3-dds/runExample3Sink.cxx +++ b/fairmq/examples/3-dds/runExample3Sink.cxx @@ -45,6 +45,14 @@ int main(int argc, char** argv) try { + std::string interfaceName; // name of the network interface to use for communication. + + options_description sinkOptions("Sink options"); + sinkOptions.add_options() + ("network-interface", value(&interfaceName)->default_value("eth0"), "Name of the network interface to use (e.g. eth0, ib0, wlan0, en0...)"); + + config.AddToCmdLineOptions(sinkOptions); + if (config.ParseAll(argc, argv)) { return 0; @@ -74,22 +82,14 @@ int main(int argc, char** argv) FairMQ::tools::getHostIPs(IPs); stringstream ss; // Check if ib0 (infiniband) interface is available, otherwise try eth0 or wlan0. - if (IPs.count("ib0")) + if (IPs.count(interfaceName)) { - ss << "tcp://" << IPs["ib0"] << ":1"; - } - else if (IPs.count("eth0")) - { - ss << "tcp://" << IPs["eth0"] << ":1"; - } - else if (IPs.count("wlan0")) - { - ss << "tcp://" << IPs["wlan0"] << ":1"; + ss << "tcp://" << IPs[interfaceName] << ":1"; } else { LOG(INFO) << ss.str(); - LOG(ERROR) << "Could not find ib0, eth0 or wlan0"; + LOG(ERROR) << "Could not find provided network interface: \"" << interfaceName << "\"!, exiting."; exit(EXIT_FAILURE); } string initialInputAddress = ss.str(); diff --git a/fairmq/examples/4-copypush/CMakeLists.txt b/fairmq/examples/4-copypush/CMakeLists.txt new file mode 100644 index 00000000..3918e133 --- /dev/null +++ b/fairmq/examples/4-copypush/CMakeLists.txt @@ -0,0 +1,79 @@ + ################################################################################ + # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # + # # + # This software is distributed under the terms of the # + # GNU Lesser General Public Licence version 3 (LGPL) version 3, # + # copied verbatim in the file "LICENSE" # + ################################################################################ + +configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/4-copypush/ex4-copypush.json ${CMAKE_BINARY_DIR}/bin/config/ex4-copypush.json) + +Set(INCLUDE_DIRECTORIES + ${CMAKE_SOURCE_DIR}/fairmq + ${CMAKE_SOURCE_DIR}/fairmq/devices + ${CMAKE_SOURCE_DIR}/fairmq/tools + ${CMAKE_SOURCE_DIR}/fairmq/options + ${CMAKE_SOURCE_DIR}/fairmq/examples/4-copypush + ${CMAKE_CURRENT_BINARY_DIR} +) + +Set(SYSTEM_INCLUDE_DIRECTORIES + ${Boost_INCLUDE_DIR} +) + +If(NANOMSG_FOUND) + Set(INCLUDE_DIRECTORIES + ${INCLUDE_DIRECTORIES} + ${CMAKE_SOURCE_DIR}/fairmq/nanomsg + ) +Else(NANOMSG_FOUND) + Set(INCLUDE_DIRECTORIES + ${INCLUDE_DIRECTORIES} + ${CMAKE_SOURCE_DIR}/fairmq/zeromq + ) +EndIf(NANOMSG_FOUND) + +Include_Directories(${INCLUDE_DIRECTORIES}) +Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) + +Set(LINK_DIRECTORIES + ${Boost_LIBRARY_DIRS} +) + +Link_Directories(${LINK_DIRECTORIES}) + +Set(SRCS + "FairMQExample4Sampler.cxx" + "FairMQExample4Sink.cxx" +) + +Set(DEPENDENCIES + ${DEPENDENCIES} + FairMQ +) + +Set(LIBRARY_NAME FairMQExample4) + +GENERATE_LIBRARY() + +Set(Exe_Names + ex4-sampler + ex4-sink +) + +Set(Exe_Source + runExample4Sampler.cxx + runExample4Sink.cxx +) + +list(LENGTH Exe_Names _length) +math(EXPR _length ${_length}-1) + +ForEach(_file RANGE 0 ${_length}) + list(GET Exe_Names ${_file} _name) + list(GET Exe_Source ${_file} _src) + Set(EXE_NAME ${_name}) + Set(SRCS ${_src}) + Set(DEPENDENCIES FairMQExample4) + GENERATE_EXECUTABLE() +EndForEach(_file RANGE 0 ${_length}) diff --git a/fairmq/examples/5-req-rep/CMakeLists.txt b/fairmq/examples/5-req-rep/CMakeLists.txt new file mode 100644 index 00000000..95593e83 --- /dev/null +++ b/fairmq/examples/5-req-rep/CMakeLists.txt @@ -0,0 +1,77 @@ + ################################################################################ + # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # + # # + # This software is distributed under the terms of the # + # GNU Lesser General Public Licence version 3 (LGPL) version 3, # + # copied verbatim in the file "LICENSE" # + ################################################################################ + +Set(INCLUDE_DIRECTORIES + ${CMAKE_SOURCE_DIR}/fairmq + ${CMAKE_SOURCE_DIR}/fairmq/devices + ${CMAKE_SOURCE_DIR}/fairmq/tools + ${CMAKE_SOURCE_DIR}/fairmq/options + ${CMAKE_SOURCE_DIR}/fairmq/examples/5-req-rep + ${CMAKE_CURRENT_BINARY_DIR} +) + +Set(SYSTEM_INCLUDE_DIRECTORIES + ${Boost_INCLUDE_DIR} +) + +If(NANOMSG_FOUND) + Set(INCLUDE_DIRECTORIES + ${INCLUDE_DIRECTORIES} + ${CMAKE_SOURCE_DIR}/fairmq/nanomsg + ) +Else(NANOMSG_FOUND) + Set(INCLUDE_DIRECTORIES + ${INCLUDE_DIRECTORIES} + ${CMAKE_SOURCE_DIR}/fairmq/zeromq + ) +EndIf(NANOMSG_FOUND) + +Include_Directories(${INCLUDE_DIRECTORIES}) +Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) + +Set(LINK_DIRECTORIES + ${Boost_LIBRARY_DIRS} +) + +Link_Directories(${LINK_DIRECTORIES}) + +set(SRCS + "FairMQExample5Client.cxx" + "FairMQExample5Server.cxx" +) + +set(DEPENDENCIES + ${DEPENDENCIES} + FairMQ +) + +set(LIBRARY_NAME FairMQExample5) + +GENERATE_LIBRARY() + +set(Exe_Names + ex5-client + ex5-server +) + +set(Exe_Source + runExample5Client.cxx + runExample5Server.cxx +) + +list(LENGTH Exe_Names _length) +math(EXPR _length ${_length}-1) + +ForEach(_file RANGE 0 ${_length}) + list(GET Exe_Names ${_file} _name) + list(GET Exe_Source ${_file} _src) + set(EXE_NAME ${_name}) + set(SRCS ${_src}) + set(DEPENDENCIES FairMQExample5) + GENERATE_EXECUTABLE() +EndForEach(_file RANGE 0 ${_length}) diff --git a/fairmq/test/CMakeLists.txt b/fairmq/test/CMakeLists.txt new file mode 100644 index 00000000..2cebdadd --- /dev/null +++ b/fairmq/test/CMakeLists.txt @@ -0,0 +1,107 @@ + ################################################################################ + # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # + # # + # This software is distributed under the terms of the # + # GNU Lesser General Public Licence version 3 (LGPL) version 3, # + # copied verbatim in the file "LICENSE" # + ################################################################################ + +configure_file(${CMAKE_SOURCE_DIR}/fairmq/test/test-fairmq-push-pull.sh.in ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-push-pull.sh) +configure_file(${CMAKE_SOURCE_DIR}/fairmq/test/test-fairmq-pub-sub.sh.in ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-pub-sub.sh) +configure_file(${CMAKE_SOURCE_DIR}/fairmq/test/test-fairmq-req-rep.sh.in ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-req-rep.sh) + +Set(INCLUDE_DIRECTORIES + ${CMAKE_SOURCE_DIR}/fairmq + ${CMAKE_SOURCE_DIR}/fairmq/devices + ${CMAKE_SOURCE_DIR}/fairmq/tools + ${CMAKE_SOURCE_DIR}/fairmq/options + ${CMAKE_SOURCE_DIR}/fairmq/test/push-pull + ${CMAKE_SOURCE_DIR}/fairmq/test/pub-sub + ${CMAKE_SOURCE_DIR}/fairmq/test/req-rep + ${CMAKE_CURRENT_BINARY_DIR} +) + +Set(SYSTEM_INCLUDE_DIRECTORIES + ${Boost_INCLUDE_DIR} +) + +If(NANOMSG_FOUND) + Set(INCLUDE_DIRECTORIES + ${INCLUDE_DIRECTORIES} + ${CMAKE_SOURCE_DIR}/fairmq/nanomsg + ) +Else(NANOMSG_FOUND) + Set(INCLUDE_DIRECTORIES + ${INCLUDE_DIRECTORIES} + ${CMAKE_SOURCE_DIR}/fairmq/zeromq + ) +EndIf(NANOMSG_FOUND) + +Include_Directories(${INCLUDE_DIRECTORIES}) +Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) + +Set(LINK_DIRECTORIES + ${Boost_LIBRARY_DIRS} +) + +Link_Directories(${LINK_DIRECTORIES}) + +set(SRCS + "push-pull/FairMQTestPush.cxx" + "push-pull/FairMQTestPull.cxx" + "pub-sub/FairMQTestPub.cxx" + "pub-sub/FairMQTestSub.cxx" + "req-rep/FairMQTestReq.cxx" + "req-rep/FairMQTestRep.cxx" +) + +set(DEPENDENCIES + ${DEPENDENCIES} + FairMQ +) + +set(LIBRARY_NAME FairMQTest) + +GENERATE_LIBRARY() + +set(Exe_Names + test-fairmq-push + test-fairmq-pull + test-fairmq-pub + test-fairmq-sub + test-fairmq-req + test-fairmq-rep +) + +set(Exe_Source + push-pull/runTestPush.cxx + push-pull/runTestPull.cxx + pub-sub/runTestPub.cxx + pub-sub/runTestSub.cxx + req-rep/runTestReq.cxx + req-rep/runTestRep.cxx +) + +list(LENGTH Exe_Names _length) +math(EXPR _length ${_length}-1) + +ForEach(_file RANGE 0 ${_length}) + list(GET Exe_Names ${_file} _name) + list(GET Exe_Source ${_file} _src) + set(EXE_NAME ${_name}) + set(SRCS ${_src}) + set(DEPENDENCIES FairMQTest) + GENERATE_EXECUTABLE() +EndForEach(_file RANGE 0 ${_length}) + +add_test(NAME run_fairmq_push_pull COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-push-pull.sh) +set_tests_properties(run_fairmq_push_pull PROPERTIES TIMEOUT "30") +set_tests_properties(run_fairmq_push_pull PROPERTIES PASS_REGULAR_EXPRESSION "PUSH-PULL test successfull") + +add_test(NAME run_fairmq_pub_sub COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-pub-sub.sh) +set_tests_properties(run_fairmq_pub_sub PROPERTIES TIMEOUT "30") +set_tests_properties(run_fairmq_pub_sub PROPERTIES PASS_REGULAR_EXPRESSION "PUB-SUB test successfull") + +add_test(NAME run_fairmq_req_rep COMMAND ${CMAKE_BINARY_DIR}/fairmq/test/test-fairmq-req-rep.sh) +set_tests_properties(run_fairmq_req_rep PROPERTIES TIMEOUT "30") +set_tests_properties(run_fairmq_req_rep PROPERTIES PASS_REGULAR_EXPRESSION "REQ-REP test successfull") diff --git a/fairmq/zeromq/FairMQContextZMQ.cxx b/fairmq/zeromq/FairMQContextZMQ.cxx index ed8fe795..6ece7d63 100644 --- a/fairmq/zeromq/FairMQContextZMQ.cxx +++ b/fairmq/zeromq/FairMQContextZMQ.cxx @@ -14,6 +14,8 @@ #include +#include + #include "FairMQLogger.h" #include "FairMQContextZMQ.h" diff --git a/fairmq/zeromq/FairMQContextZMQ.h b/fairmq/zeromq/FairMQContextZMQ.h index 6dc3b8e5..9532e0ff 100644 --- a/fairmq/zeromq/FairMQContextZMQ.h +++ b/fairmq/zeromq/FairMQContextZMQ.h @@ -15,12 +15,9 @@ #ifndef FAIRMQCONTEXTZMQ_H_ #define FAIRMQCONTEXTZMQ_H_ -#include - class FairMQContextZMQ { public: - /// Constructor FairMQContextZMQ(int numIoThreads); diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index 1ca8c4aa..ecf17092 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -14,6 +14,8 @@ #include +#include + #include "FairMQSocketZMQ.h" #include "FairMQLogger.h" diff --git a/fairmq/zeromq/FairMQSocketZMQ.h b/fairmq/zeromq/FairMQSocketZMQ.h index 45c187ef..88ed4f97 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.h +++ b/fairmq/zeromq/FairMQSocketZMQ.h @@ -17,8 +17,6 @@ #include -#include - #include "FairMQSocket.h" #include "FairMQContextZMQ.h"