From 023d88d0efb8a0cdd5ec497e9b1e3b093d68f1a2 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 7 Oct 2015 15:50:10 +0200 Subject: [PATCH] Add FairMQ Example 6 - Working with multiple channels --- fairmq/CMakeLists.txt | 1 + fairmq/devices/FairMQMerger.cxx | 4 +- .../1-sampler-sink/runExample1Sink.cxx | 4 - .../runExample2Processor.cxx | 4 - .../runExample2Sink.cxx | 4 - .../4-copypush/runExample4Sampler.cxx | 4 - .../examples/4-copypush/runExample4Sink.cxx | 4 - .../6-multiple-channels/CMakeLists.txt | 82 +++++++++++++ .../FairMQExample6Broadcaster.cxx | 49 ++++++++ .../FairMQExample6Broadcaster.h | 32 +++++ .../FairMQExample6Sampler.cxx | 116 ++++++++++++++++++ .../FairMQExample6Sampler.h | 46 +++++++ .../FairMQExample6Sink.cxx | 61 +++++++++ .../6-multiple-channels/FairMQExample6Sink.h | 30 +++++ fairmq/examples/6-multiple-channels/README.md | 8 ++ .../ex6-multiple-channels.json | 85 +++++++++++++ .../runExample6Broadcaster.cxx | 79 ++++++++++++ .../runExample6Sampler.cxx | 92 ++++++++++++++ .../6-multiple-channels/runExample6Sink.cxx | 83 +++++++++++++ fairmq/examples/README.md | 9 ++ 20 files changed, 776 insertions(+), 21 deletions(-) create mode 100644 fairmq/examples/6-multiple-channels/CMakeLists.txt create mode 100644 fairmq/examples/6-multiple-channels/FairMQExample6Broadcaster.cxx create mode 100644 fairmq/examples/6-multiple-channels/FairMQExample6Broadcaster.h create mode 100644 fairmq/examples/6-multiple-channels/FairMQExample6Sampler.cxx create mode 100644 fairmq/examples/6-multiple-channels/FairMQExample6Sampler.h create mode 100644 fairmq/examples/6-multiple-channels/FairMQExample6Sink.cxx create mode 100644 fairmq/examples/6-multiple-channels/FairMQExample6Sink.h create mode 100644 fairmq/examples/6-multiple-channels/README.md create mode 100644 fairmq/examples/6-multiple-channels/ex6-multiple-channels.json create mode 100644 fairmq/examples/6-multiple-channels/runExample6Broadcaster.cxx create mode 100644 fairmq/examples/6-multiple-channels/runExample6Sampler.cxx create mode 100644 fairmq/examples/6-multiple-channels/runExample6Sink.cxx diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index a3803d70..079f71db 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -17,6 +17,7 @@ If(DDS_FOUND) EndIf(DDS_FOUND) add_subdirectory(examples/4-copypush) add_subdirectory(examples/5-req-rep) +add_subdirectory(examples/6-multiple-channels) add_subdirectory(test) diff --git a/fairmq/devices/FairMQMerger.cxx b/fairmq/devices/FairMQMerger.cxx index dc6ff908..8c79c03d 100644 --- a/fairmq/devices/FairMQMerger.cxx +++ b/fairmq/devices/FairMQMerger.cxx @@ -39,6 +39,8 @@ void FairMQMerger::Run() dataInChannels.at(i) = &(fChannels.at("data-in").at(i)); } + int numInputs = fChannels.at("data-in").size(); + while (CheckCurrentState(RUNNING)) { std::unique_ptr msg(fTransportFactory->CreateMessage()); @@ -46,7 +48,7 @@ void FairMQMerger::Run() poller->Poll(100); // Loop over the data input channels. - for (int i = 0; i < fChannels.at("data-in").size(); ++i) + for (int i = 0; i < numInputs; ++i) { // Check if the channel has data ready to be received. if (poller->CheckInput(i)) diff --git a/fairmq/examples/1-sampler-sink/runExample1Sink.cxx b/fairmq/examples/1-sampler-sink/runExample1Sink.cxx index 3cc5f85c..4080cf4b 100644 --- a/fairmq/examples/1-sampler-sink/runExample1Sink.cxx +++ b/fairmq/examples/1-sampler-sink/runExample1Sink.cxx @@ -14,8 +14,6 @@ #include -#include "boost/program_options.hpp" - #include "FairMQLogger.h" #include "FairMQParser.h" #include "FairMQProgOptions.h" @@ -27,8 +25,6 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using namespace boost::program_options; - int main(int argc, char** argv) { FairMQExample1Sink sink; diff --git a/fairmq/examples/2-sampler-processor-sink/runExample2Processor.cxx b/fairmq/examples/2-sampler-processor-sink/runExample2Processor.cxx index 0b390172..3e72f0de 100644 --- a/fairmq/examples/2-sampler-processor-sink/runExample2Processor.cxx +++ b/fairmq/examples/2-sampler-processor-sink/runExample2Processor.cxx @@ -14,8 +14,6 @@ #include -#include "boost/program_options.hpp" - #include "FairMQLogger.h" #include "FairMQParser.h" #include "FairMQProgOptions.h" @@ -27,8 +25,6 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using namespace boost::program_options; - int main(int argc, char** argv) { FairMQExample2Processor processor; diff --git a/fairmq/examples/2-sampler-processor-sink/runExample2Sink.cxx b/fairmq/examples/2-sampler-processor-sink/runExample2Sink.cxx index 312d703b..1f71b156 100644 --- a/fairmq/examples/2-sampler-processor-sink/runExample2Sink.cxx +++ b/fairmq/examples/2-sampler-processor-sink/runExample2Sink.cxx @@ -14,8 +14,6 @@ #include -#include "boost/program_options.hpp" - #include "FairMQLogger.h" #include "FairMQParser.h" #include "FairMQProgOptions.h" @@ -27,8 +25,6 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using namespace boost::program_options; - int main(int argc, char** argv) { FairMQExample2Sink sink; diff --git a/fairmq/examples/4-copypush/runExample4Sampler.cxx b/fairmq/examples/4-copypush/runExample4Sampler.cxx index 70ccacd4..ddfd7002 100644 --- a/fairmq/examples/4-copypush/runExample4Sampler.cxx +++ b/fairmq/examples/4-copypush/runExample4Sampler.cxx @@ -14,8 +14,6 @@ #include -#include "boost/program_options.hpp" - #include "FairMQLogger.h" #include "FairMQParser.h" #include "FairMQProgOptions.h" @@ -27,8 +25,6 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using namespace boost::program_options; - int main(int argc, char** argv) { FairMQExample4Sampler sampler; diff --git a/fairmq/examples/4-copypush/runExample4Sink.cxx b/fairmq/examples/4-copypush/runExample4Sink.cxx index 1714caf4..9c5cab08 100644 --- a/fairmq/examples/4-copypush/runExample4Sink.cxx +++ b/fairmq/examples/4-copypush/runExample4Sink.cxx @@ -14,8 +14,6 @@ #include -#include "boost/program_options.hpp" - #include "FairMQLogger.h" #include "FairMQParser.h" #include "FairMQProgOptions.h" @@ -27,8 +25,6 @@ #include "FairMQTransportFactoryZMQ.h" #endif -using namespace boost::program_options; - int main(int argc, char** argv) { FairMQExample4Sink sink; diff --git a/fairmq/examples/6-multiple-channels/CMakeLists.txt b/fairmq/examples/6-multiple-channels/CMakeLists.txt new file mode 100644 index 00000000..0d886452 --- /dev/null +++ b/fairmq/examples/6-multiple-channels/CMakeLists.txt @@ -0,0 +1,82 @@ + ################################################################################ + # Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH # + # # + # This software is distributed under the terms of the # + # GNU Lesser General Public Licence version 3 (LGPL) version 3, # + # copied verbatim in the file "LICENSE" # + ################################################################################ + +configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/6-multiple-channels/ex6-multiple-channels.json ${CMAKE_BINARY_DIR}/bin/config/ex6-multiple-channels.json) + +Set(INCLUDE_DIRECTORIES + ${CMAKE_SOURCE_DIR}/fairmq + ${CMAKE_SOURCE_DIR}/fairmq/devices + ${CMAKE_SOURCE_DIR}/fairmq/tools + ${CMAKE_SOURCE_DIR}/fairmq/options + ${CMAKE_SOURCE_DIR}/fairmq/examples/6-multiple-channels + ${CMAKE_CURRENT_BINARY_DIR} +) + +Set(SYSTEM_INCLUDE_DIRECTORIES + ${Boost_INCLUDE_DIR} +) + +If(NANOMSG_FOUND) + Set(INCLUDE_DIRECTORIES + ${INCLUDE_DIRECTORIES} + ${CMAKE_SOURCE_DIR}/fairmq/nanomsg + ) +Else(NANOMSG_FOUND) + Set(INCLUDE_DIRECTORIES + ${INCLUDE_DIRECTORIES} + ${CMAKE_SOURCE_DIR}/fairmq/zeromq + ) +EndIf(NANOMSG_FOUND) + +Include_Directories(${INCLUDE_DIRECTORIES}) +Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES}) + +Set(LINK_DIRECTORIES + ${Boost_LIBRARY_DIRS} +) + +Link_Directories(${LINK_DIRECTORIES}) + +Set(SRCS + "FairMQExample6Sampler.cxx" + "FairMQExample6Sink.cxx" + "FairMQExample6Broadcaster.cxx" +) + +Set(DEPENDENCIES + ${DEPENDENCIES} + FairMQ +) + +Set(LIBRARY_NAME FairMQExample6) + +GENERATE_LIBRARY() + +Set(Exe_Names + ex6-sampler + ex6-sink + ex6-broadcaster +) + +Set(Exe_Source + runExample6Sampler.cxx + runExample6Sink.cxx + runExample6Broadcaster.cxx +) + +list(LENGTH Exe_Names _length) +math(EXPR _length ${_length}-1) + +ForEach(_file RANGE 0 ${_length}) + list(GET Exe_Names ${_file} _name) + list(GET Exe_Source ${_file} _src) + set(EXE_NAME ${_name}) + set(SRCS ${_src}) + set(DEPENDENCIES FairMQExample6) + GENERATE_EXECUTABLE() +EndForEach(_file RANGE 0 ${_length}) diff --git a/fairmq/examples/6-multiple-channels/FairMQExample6Broadcaster.cxx b/fairmq/examples/6-multiple-channels/FairMQExample6Broadcaster.cxx new file mode 100644 index 00000000..5130d8f1 --- /dev/null +++ b/fairmq/examples/6-multiple-channels/FairMQExample6Broadcaster.cxx @@ -0,0 +1,49 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQExample6Broadcaster.cpp + * + * @since 2014-10-10 + * @author A. Rybalchenko + */ + +#include // unique_ptr +#include + +#include + +#include "FairMQExample6Broadcaster.h" +#include "FairMQLogger.h" + +using namespace std; + +FairMQExample6Broadcaster::FairMQExample6Broadcaster() +{ +} + +void FairMQExample6Broadcaster::CustomCleanup(void *data, void *object) +{ + delete (string*)object; +} + +void FairMQExample6Broadcaster::Run() +{ + while (CheckCurrentState(RUNNING)) + { + boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); + + string* text = new string("OK"); + unique_ptr msg(fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); + LOG(INFO) << "Sending \"" << "OK" << "\""; + fChannels.at("broadcast-out").at(0).Send(msg); + } +} + +FairMQExample6Broadcaster::~FairMQExample6Broadcaster() +{ +} diff --git a/fairmq/examples/6-multiple-channels/FairMQExample6Broadcaster.h b/fairmq/examples/6-multiple-channels/FairMQExample6Broadcaster.h new file mode 100644 index 00000000..cbd5198d --- /dev/null +++ b/fairmq/examples/6-multiple-channels/FairMQExample6Broadcaster.h @@ -0,0 +1,32 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQExample6Broadcaster.h + * + * @since 2014-10-10 + * @author A. Rybalchenko + */ + +#ifndef FAIRMQEXAMPLE6BROADCASTER_H_ +#define FAIRMQEXAMPLE6BROADCASTER_H_ + +#include "FairMQDevice.h" + +class FairMQExample6Broadcaster : public FairMQDevice +{ + public: + FairMQExample6Broadcaster(); + virtual ~FairMQExample6Broadcaster(); + + static void CustomCleanup(void* data, void* hint); + + protected: + virtual void Run(); +}; + +#endif /* FAIRMQEXAMPLE6BROADCASTER_H_ */ diff --git a/fairmq/examples/6-multiple-channels/FairMQExample6Sampler.cxx b/fairmq/examples/6-multiple-channels/FairMQExample6Sampler.cxx new file mode 100644 index 00000000..d54b074d --- /dev/null +++ b/fairmq/examples/6-multiple-channels/FairMQExample6Sampler.cxx @@ -0,0 +1,116 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQExample6Sampler.cpp + * + * @since 2014-10-10 + * @author A. Rybalchenko + */ + +#include // unique_ptr + +#include + +#include "FairMQExample6Sampler.h" +#include "FairMQPoller.h" +#include "FairMQLogger.h" + +using namespace std; + +FairMQExample6Sampler::FairMQExample6Sampler() + : fText() +{ +} + +void FairMQExample6Sampler::CustomCleanup(void *data, void *object) +{ + delete (string*)object; +} + +void FairMQExample6Sampler::Run() +{ + std::unique_ptr poller(fTransportFactory->CreatePoller(fChannels, { "data-out", "broadcast-in" })); + + while (CheckCurrentState(RUNNING)) + { + poller->Poll(-1); + + if (poller->CheckInput("broadcast-in", 0)) + { + unique_ptr msg(fTransportFactory->CreateMessage()); + + if (fChannels.at("broadcast-in").at(0).Receive(msg) > 0) + { + LOG(INFO) << "Received broadcast: \"" + << string(static_cast(msg->GetData()), msg->GetSize()) + << "\""; + } + } // if (poller->CheckInput("broadcast-in", 0)) + + if (poller->CheckOutput("data-out", 0)) + { + boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); + + string* text = new string(fText); + + unique_ptr msg(fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); + + LOG(INFO) << "Sending \"" << fText << "\""; + + fChannels.at("data-out").at(0).Send(msg); + } // if (poller->CheckOutput("data-out", 0)) + } // while (CheckCurrentState(RUNNING)) +} + +FairMQExample6Sampler::~FairMQExample6Sampler() +{ +} + +void FairMQExample6Sampler::SetProperty(const int key, const string& value) +{ + switch (key) + { + case Text: + fText = value; + break; + default: + FairMQDevice::SetProperty(key, value); + break; + } +} + +string FairMQExample6Sampler::GetProperty(const int key, const string& default_ /*= ""*/) +{ + switch (key) + { + case Text: + return fText; + break; + default: + return FairMQDevice::GetProperty(key, default_); + } +} + +void FairMQExample6Sampler::SetProperty(const int key, const int value) +{ + switch (key) + { + default: + FairMQDevice::SetProperty(key, value); + break; + } +} + +int FairMQExample6Sampler::GetProperty(const int key, const int default_ /*= 0*/) +{ + switch (key) + { + default: + return FairMQDevice::GetProperty(key, default_); + } +} diff --git a/fairmq/examples/6-multiple-channels/FairMQExample6Sampler.h b/fairmq/examples/6-multiple-channels/FairMQExample6Sampler.h new file mode 100644 index 00000000..bbdfd9ba --- /dev/null +++ b/fairmq/examples/6-multiple-channels/FairMQExample6Sampler.h @@ -0,0 +1,46 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQExample6Sampler.h + * + * @since 2014-10-10 + * @author A. Rybalchenko + */ + +#ifndef FAIRMQEXAMPLE6SAMPLER_H_ +#define FAIRMQEXAMPLE6SAMPLER_H_ + +#include + +#include "FairMQDevice.h" + +class FairMQExample6Sampler : public FairMQDevice +{ + public: + enum + { + Text = FairMQDevice::Last, + Last + }; + FairMQExample6Sampler(); + virtual ~FairMQExample6Sampler(); + + static void CustomCleanup(void* data, void* hint); + + virtual void SetProperty(const int key, const std::string& value); + virtual std::string GetProperty(const int key, const std::string& default_ = ""); + virtual void SetProperty(const int key, const int value); + virtual int GetProperty(const int key, const int default_ = 0); + + protected: + std::string fText; + + virtual void Run(); +}; + +#endif /* FAIRMQEXAMPLE6SAMPLER_H_ */ diff --git a/fairmq/examples/6-multiple-channels/FairMQExample6Sink.cxx b/fairmq/examples/6-multiple-channels/FairMQExample6Sink.cxx new file mode 100644 index 00000000..f28ee077 --- /dev/null +++ b/fairmq/examples/6-multiple-channels/FairMQExample6Sink.cxx @@ -0,0 +1,61 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQExample6Sink.cxx + * + * @since 2014-10-10 + * @author A. Rybalchenko + */ + +#include "FairMQExample6Sink.h" +#include "FairMQPoller.h" +#include "FairMQLogger.h" + +using namespace std; + +FairMQExample6Sink::FairMQExample6Sink() +{ +} + +void FairMQExample6Sink::Run() +{ + std::unique_ptr poller(fTransportFactory->CreatePoller(fChannels, { "data-in", "broadcast-in" })); + + while (CheckCurrentState(RUNNING)) + { + poller->Poll(-1); + + if (poller->CheckInput("broadcast-in", 0)) + { + unique_ptr msg(fTransportFactory->CreateMessage()); + + if (fChannels.at("broadcast-in").at(0).Receive(msg) > 0) + { + LOG(INFO) << "Received broadcast: \"" + << string(static_cast(msg->GetData()), msg->GetSize()) + << "\""; + } + } + + if (poller->CheckInput("data-in", 0)) + { + unique_ptr msg(fTransportFactory->CreateMessage()); + + if (fChannels.at("data-in").at(0).Receive(msg) > 0) + { + LOG(INFO) << "Received message: \"" + << string(static_cast(msg->GetData()), msg->GetSize()) + << "\""; + } + } + } +} + +FairMQExample6Sink::~FairMQExample6Sink() +{ +} diff --git a/fairmq/examples/6-multiple-channels/FairMQExample6Sink.h b/fairmq/examples/6-multiple-channels/FairMQExample6Sink.h new file mode 100644 index 00000000..c0b5b5ed --- /dev/null +++ b/fairmq/examples/6-multiple-channels/FairMQExample6Sink.h @@ -0,0 +1,30 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQExample6Sink.h + * + * @since 2014-10-10 + * @author A. Rybalchenko + */ + +#ifndef FAIRMQEXAMPLE6SINK_H_ +#define FAIRMQEXAMPLE6SINK_H_ + +#include "FairMQDevice.h" + +class FairMQExample6Sink : public FairMQDevice +{ + public: + FairMQExample6Sink(); + virtual ~FairMQExample6Sink(); + + protected: + virtual void Run(); +}; + +#endif /* FAIRMQEXAMPLE6SINK_H_ */ diff --git a/fairmq/examples/6-multiple-channels/README.md b/fairmq/examples/6-multiple-channels/README.md new file mode 100644 index 00000000..ab3a8173 --- /dev/null +++ b/fairmq/examples/6-multiple-channels/README.md @@ -0,0 +1,8 @@ +Example 6: Multiple Channels +=============== + +This example demonstrates how to work with multiple channels and multiplex between them. + +A topology of three devices - **Sampler**, **Sink** and **Broadcaster**. The Sampler sends data to the Sink via the PUSH-PULL pattern. The Broadcaster device sends a message to both Sampler and Sink containing a string "OK" every second. The Broadcaster sends the message via PUB pattern. Both Sampler and Sink, besides doing their PUSH-PULL job, listen via SUB to the Broadcaster. + +The multiplexing between their data channels and the broadcast channels happens with `FairMQPoller`. diff --git a/fairmq/examples/6-multiple-channels/ex6-multiple-channels.json b/fairmq/examples/6-multiple-channels/ex6-multiple-channels.json new file mode 100644 index 00000000..7479f2bd --- /dev/null +++ b/fairmq/examples/6-multiple-channels/ex6-multiple-channels.json @@ -0,0 +1,85 @@ +{ + "fairMQOptions": + { + "device": + { + "id": "sampler1", + "channel": + { + "name": "data-out", + "socket": + { + "type": "push", + "method": "bind", + "address": "tcp://*:5555", + "sndBufSize": "1000", + "rcvBufSize": "1000", + "rateLogging": "0" + } + }, + "channel": + { + "name": "broadcast-in", + "socket": + { + "type": "sub", + "method": "connect", + "address": "tcp://localhost:5005", + "sndBufSize": "1000", + "rcvBufSize": "1000", + "rateLogging": "0" + } + } + }, + + "device": + { + "id": "sink1", + "channel": + { + "name": "data-in", + "socket": + { + "type": "pull", + "method": "connect", + "address": "tcp://localhost:5555", + "sndBufSize": "1000", + "rcvBufSize": "1000", + "rateLogging": "0" + } + }, + "channel": + { + "name": "broadcast-in", + "socket": + { + "type": "sub", + "method": "connect", + "address": "tcp://localhost:5005", + "sndBufSize": "1000", + "rcvBufSize": "1000", + "rateLogging": "0" + } + } + }, + + "device": + { + "id": "broadcaster1", + "channel": + { + "name": "broadcast-out", + "socket": + { + "type": "pub", + "method": "bind", + "address": "tcp://*:5005", + "sndBufSize": "1000", + "rcvBufSize": "1000", + "rateLogging": "0" + } + } + } + } +} + diff --git a/fairmq/examples/6-multiple-channels/runExample6Broadcaster.cxx b/fairmq/examples/6-multiple-channels/runExample6Broadcaster.cxx new file mode 100644 index 00000000..0f719a28 --- /dev/null +++ b/fairmq/examples/6-multiple-channels/runExample6Broadcaster.cxx @@ -0,0 +1,79 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * runExample6Broadcaster.cxx + * + * @since 2013-04-23 + * @author A. Rybalchenko + */ + +#include + +#include "FairMQLogger.h" +#include "FairMQParser.h" +#include "FairMQProgOptions.h" +#include "FairMQExample6Broadcaster.h" + +#ifdef NANOMSG +#include "FairMQTransportFactoryNN.h" +#else +#include "FairMQTransportFactoryZMQ.h" +#endif + +int main(int argc, char** argv) +{ + FairMQExample6Broadcaster broadcaster; + broadcaster.CatchSignals(); + + FairMQProgOptions config; + + try + { + if (config.ParseAll(argc, argv)) + { + return 0; + } + + std::string filename = config.GetValue("config-json-file"); + std::string id = config.GetValue("id"); + + config.UserParser(filename, id); + + broadcaster.fChannels = config.GetFairMQMap(); + + LOG(INFO) << "PID: " << getpid(); + +#ifdef NANOMSG + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif + + broadcaster.SetTransport(transportFactory); + + broadcaster.SetProperty(FairMQExample6Broadcaster::Id, id); + + broadcaster.ChangeState("INIT_DEVICE"); + broadcaster.WaitForEndOfState("INIT_DEVICE"); + + broadcaster.ChangeState("INIT_TASK"); + broadcaster.WaitForEndOfState("INIT_TASK"); + + broadcaster.ChangeState("RUN"); + broadcaster.InteractiveStateLoop(); + } + catch (std::exception& e) + { + LOG(ERROR) << e.what(); + LOG(INFO) << "Command line options are the following: "; + config.PrintHelp(); + return 1; + } + + return 0; +} diff --git a/fairmq/examples/6-multiple-channels/runExample6Sampler.cxx b/fairmq/examples/6-multiple-channels/runExample6Sampler.cxx new file mode 100644 index 00000000..11759ebc --- /dev/null +++ b/fairmq/examples/6-multiple-channels/runExample6Sampler.cxx @@ -0,0 +1,92 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * runExample6Sampler.cxx + * + * @since 2013-04-23 + * @author D. Klein, A. Rybalchenko + */ + +#include + +#include "boost/program_options.hpp" + +#include "FairMQLogger.h" +#include "FairMQParser.h" +#include "FairMQProgOptions.h" +#include "FairMQExample6Sampler.h" + +#ifdef NANOMSG +#include "FairMQTransportFactoryNN.h" +#else +#include "FairMQTransportFactoryZMQ.h" +#endif + +using namespace boost::program_options; + +int main(int argc, char** argv) +{ + FairMQExample6Sampler sampler; + sampler.CatchSignals(); + + FairMQProgOptions config; + + try + { + std::string text; + + options_description samplerOptions("Sampler options"); + samplerOptions.add_options() + ("text", value(&text)->default_value("Hello"), "Text to send out"); + + config.AddToCmdLineOptions(samplerOptions); + + if (config.ParseAll(argc, argv)) + { + return 0; + } + + std::string filename = config.GetValue("config-json-file"); + std::string id = config.GetValue("id"); + + config.UserParser(filename, id); + + sampler.fChannels = config.GetFairMQMap(); + + LOG(INFO) << "PID: " << getpid(); + +#ifdef NANOMSG + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif + + sampler.SetTransport(transportFactory); + + sampler.SetProperty(FairMQExample6Sampler::Id, id); + sampler.SetProperty(FairMQExample6Sampler::Text, text); + + sampler.ChangeState("INIT_DEVICE"); + sampler.WaitForEndOfState("INIT_DEVICE"); + + sampler.ChangeState("INIT_TASK"); + sampler.WaitForEndOfState("INIT_TASK"); + + sampler.ChangeState("RUN"); + sampler.InteractiveStateLoop(); + } + catch (std::exception& e) + { + LOG(ERROR) << e.what(); + LOG(INFO) << "Command line options are the following: "; + config.PrintHelp(); + return 1; + } + + return 0; +} diff --git a/fairmq/examples/6-multiple-channels/runExample6Sink.cxx b/fairmq/examples/6-multiple-channels/runExample6Sink.cxx new file mode 100644 index 00000000..4c8b1447 --- /dev/null +++ b/fairmq/examples/6-multiple-channels/runExample6Sink.cxx @@ -0,0 +1,83 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * runExample6Sink.cxx + * + * @since 2013-04-23 + * @author D. Klein, A. Rybalchenko + */ + +#include + +#include "boost/program_options.hpp" + +#include "FairMQLogger.h" +#include "FairMQParser.h" +#include "FairMQProgOptions.h" +#include "FairMQExample6Sink.h" + +#ifdef NANOMSG +#include "FairMQTransportFactoryNN.h" +#else +#include "FairMQTransportFactoryZMQ.h" +#endif + +using namespace boost::program_options; + +int main(int argc, char** argv) +{ + FairMQExample6Sink sink; + sink.CatchSignals(); + + FairMQProgOptions config; + + try + { + if (config.ParseAll(argc, argv)) + { + return 0; + } + + std::string filename = config.GetValue("config-json-file"); + std::string id = config.GetValue("id"); + + config.UserParser(filename, id); + + sink.fChannels = config.GetFairMQMap(); + + LOG(INFO) << "PID: " << getpid(); + +#ifdef NANOMSG + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif + + sink.SetTransport(transportFactory); + + sink.SetProperty(FairMQExample6Sink::Id, id); + + sink.ChangeState("INIT_DEVICE"); + sink.WaitForEndOfState("INIT_DEVICE"); + + sink.ChangeState("INIT_TASK"); + sink.WaitForEndOfState("INIT_TASK"); + + sink.ChangeState("RUN"); + sink.InteractiveStateLoop(); + } + catch (std::exception& e) + { + LOG(ERROR) << e.what(); + LOG(INFO) << "Command line options are the following: "; + config.PrintHelp(); + return 1; + } + + return 0; +} diff --git a/fairmq/examples/README.md b/fairmq/examples/README.md index 6e201e71..c69b4fc2 100644 --- a/fairmq/examples/README.md +++ b/fairmq/examples/README.md @@ -7,18 +7,27 @@ Example 1: Sampler -> Sink -------------------------- A simple topology of two devices - **Sampler** and **Sink**. **Sampler** sends data to **Sink** with the **PUSH-PULL** pattern. + Example 2: Sampler -> Processor -> Sink --------------------------------------- A simple topology of three devices - **Sampler**, **Processor** and **Sink**. **Sampler** sends data to one or more **Processor**s, who modify the data and send it to one **Sink**. Transport with the **PUSH-PULL** pattern. + Example 3: DDS -------------- This example demonstrates usage of the Dynamic Deployment System ([DDS](http://dds.gsi.de/)) to dynamically deploy and configure a topology of devices. The topology is similar to those of Example 2, but now it can be easily distributed on different computing nodes without the need for manual reconfiguration of the devices. + Example 4: Copy & Push ---------------------- A topology consisting of one **Sampler** and two **Sink**s. The **Sampler** uses the `Copy` method to send the same data to both sinks with the **PUSH_PULL** pattern. In countrary to the **PUB-PATTERN** pattern, this insures that all receivers are connected and no data is lost, but requires additional sockets. + Example 5: Request & Reply -------------------------- This topology contains two devices that communicate with each other via the **REQUEST-REPLY** pettern. Bidirectional communication via a single socket. + + +Example 6: Multiple Channels +---------------------------- +This example demonstrates how to work with multiple channels and multiplex between them.