diff --git a/examples/MQ/1-sampler-sink/CMakeLists.txt b/examples/MQ/1-sampler-sink/CMakeLists.txt index feae4ecd..4f0d8b24 100644 --- a/examples/MQ/1-sampler-sink/CMakeLists.txt +++ b/examples/MQ/1-sampler-sink/CMakeLists.txt @@ -7,6 +7,7 @@ ################################################################################ configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/1-sampler-sink/ex1-sampler-sink.json ${CMAKE_BINARY_DIR}/bin/config/ex1-sampler-sink.json) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/1-sampler-sink/startMQEx1.sh.in ${CMAKE_BINARY_DIR}/bin/startMQEx1.sh) Set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq diff --git a/examples/MQ/1-sampler-sink/README.md b/examples/MQ/1-sampler-sink/README.md index 9108a03e..c5410e30 100644 --- a/examples/MQ/1-sampler-sink/README.md +++ b/examples/MQ/1-sampler-sink/README.md @@ -5,4 +5,6 @@ A simple topology of two devices - **Sampler** and **Sink**. **Sampler** sends d `runExample1Sampler.cxx` and `runExample1Sink.cxx` configure and run the devices in their main function. -The executables take two required command line parameters: `--id` and `--config-json-file`. The value of `--id` should be a unique identifier and the value for `--config-json-file` a path to a config file. The config file for this example is `ex1-sampler-sink.json` and it contains configuration for the communication channels of the devices. The mapping between a specific device and the configuration (which can contain multiple devices) is done based on the **id**. +The executables take two required command line parameters: `--id` and `--mq-config`. The value of `--id` should be a unique identifier and the value for `--mq-config` a path to a config file. The config file for this example is `ex1-sampler-sink.json` and it contains configuration for the communication channels of the devices. The mapping between a specific device and the configuration (which can contain multiple devices) is done based on the **id**. + +For this and the following example, all the commands needed to start the device are contained in the startFairMQExN.sh script (that can also be used for starting the example). diff --git a/examples/MQ/1-sampler-sink/runExample1Sampler.cxx b/examples/MQ/1-sampler-sink/runExample1Sampler.cxx index 7b83a9c6..726ed93d 100644 --- a/examples/MQ/1-sampler-sink/runExample1Sampler.cxx +++ b/examples/MQ/1-sampler-sink/runExample1Sampler.cxx @@ -45,18 +45,7 @@ int main(int argc, char** argv) return 0; } - std::string filename = config.GetValue("config-json-file"); - std::string id = config.GetValue("id"); - - config.UserParser(filename, id); - - sampler.fChannels = config.GetFairMQMap(); - - LOG(INFO) << "PID: " << getpid(); - - sampler.SetTransport(config.GetValue("transport")); - - sampler.SetProperty(FairMQExample1Sampler::Id, id); + sampler.SetConfig(config); sampler.SetProperty(FairMQExample1Sampler::Text, text); sampler.ChangeState("INIT_DEVICE"); diff --git a/examples/MQ/1-sampler-sink/runExample1Sink.cxx b/examples/MQ/1-sampler-sink/runExample1Sink.cxx index 02e7e6f6..32b22d6f 100644 --- a/examples/MQ/1-sampler-sink/runExample1Sink.cxx +++ b/examples/MQ/1-sampler-sink/runExample1Sink.cxx @@ -33,18 +33,7 @@ int main(int argc, char** argv) return 0; } - std::string filename = config.GetValue("config-json-file"); - std::string id = config.GetValue("id"); - - config.UserParser(filename, id); - - sink.fChannels = config.GetFairMQMap(); - - LOG(INFO) << "PID: " << getpid(); - - sink.SetTransport(config.GetValue("transport")); - - sink.SetProperty(FairMQExample1Sink::Id, id); + sink.SetConfig(config); sink.ChangeState("INIT_DEVICE"); sink.WaitForEndOfState("INIT_DEVICE"); diff --git a/examples/MQ/1-sampler-sink/startMQEx1.sh.in b/examples/MQ/1-sampler-sink/startMQEx1.sh.in new file mode 100755 index 00000000..3db4d525 --- /dev/null +++ b/examples/MQ/1-sampler-sink/startMQEx1.sh.in @@ -0,0 +1,12 @@ +#!/bin/bash +ex1config="@CMAKE_BINARY_DIR@/bin/config/ex1-sampler-sink.json" + +SAMPLER="ex1-sampler" +SAMPLER+=" --id sampler1" +SAMPLER+=" --mq-config $ex1config" +xterm -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SAMPLER & + +SINK="ex1-sink" +SINK+=" --id sink1" +SINK+=" --mq-config $ex1config" +xterm -geometry 80x23+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SINK & \ No newline at end of file diff --git a/examples/MQ/2-sampler-processor-sink/CMakeLists.txt b/examples/MQ/2-sampler-processor-sink/CMakeLists.txt index 8a015465..b3227646 100644 --- a/examples/MQ/2-sampler-processor-sink/CMakeLists.txt +++ b/examples/MQ/2-sampler-processor-sink/CMakeLists.txt @@ -7,6 +7,7 @@ ################################################################################ configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/2-sampler-processor-sink/ex2-sampler-processor-sink.json ${CMAKE_BINARY_DIR}/bin/config/ex2-sampler-processor-sink.json) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/2-sampler-processor-sink/startMQEx2.sh.in ${CMAKE_BINARY_DIR}/bin/startMQEx2.sh) Set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq diff --git a/examples/MQ/2-sampler-processor-sink/README.md b/examples/MQ/2-sampler-processor-sink/README.md index 7835e919..06fc4640 100644 --- a/examples/MQ/2-sampler-processor-sink/README.md +++ b/examples/MQ/2-sampler-processor-sink/README.md @@ -3,6 +3,8 @@ Example 2: Sampler -> Processor -> Sink A simple topology of three devices - **Sampler**, **Processor** and **Sink**. **Sampler** sends data to one or more **Processor**s, who modify the data and send it to one **Sink**. Transport with the **PUSH-PULL** pattern. +For this example both processor devices share same configuration, and can therefore use same setting from the JSON file. But since their ID still has to be unique, additional command line argument must be used to allow them to share configuration. This parameter is `--config-key`. + In this example the Sampler is configured to **bind** its output and the Sink is configured to also **bind** its input. This allows us run any number of processors with the same configuration, because they all connect to same Sampler and Sink addresses. Furthermore, it allows adding of processors dynamically during run-time. The PUSH and PULL sockets will handle the data distribution to/from the new devices according to their distribution strategies ([Round-robin output for PUSH](http://api.zeromq.org/4-0:zmq-socket#toc14) and [Fair-queued input for PULL](http://api.zeromq.org/4-0:zmq-socket#toc15)). The Sampler sends out a simple text string (its content configurable with `--text` command line parameter, defaul is "Hello"). Each Processor modifies the string by appending its ID to it and send it to the Sink. diff --git a/examples/MQ/2-sampler-processor-sink/ex2-sampler-processor-sink.json b/examples/MQ/2-sampler-processor-sink/ex2-sampler-processor-sink.json index 1c096fcd..7ff94836 100644 --- a/examples/MQ/2-sampler-processor-sink/ex2-sampler-processor-sink.json +++ b/examples/MQ/2-sampler-processor-sink/ex2-sampler-processor-sink.json @@ -1,8 +1,8 @@ { "fairMQOptions": { - "device": - { + "devices": + [{ "id": "sampler1", "channel": { @@ -18,12 +18,10 @@ } } }, - - "device": { - "id": "processor1", - "channel": - { + "key": "processor", + "channels": + [{ "name": "data1", "socket": { @@ -35,7 +33,6 @@ "rateLogging": "0" } }, - "channel": { "name": "data2", "socket": @@ -47,41 +44,8 @@ "rcvBufSize": "1000", "rateLogging": "0" } - } + }] }, - - "device": - { - "id": "processor2", - "channel": - { - "name": "data1", - "socket": - { - "type": "pull", - "method": "connect", - "address": "tcp://localhost:5555", - "sndBufSize": "1000", - "rcvBufSize": "1000", - "rateLogging": "0" - } - }, - "channel": - { - "name": "data2", - "socket": - { - "type": "push", - "method": "connect", - "address": "tcp://localhost:5556", - "sndBufSize": "1000", - "rcvBufSize": "1000", - "rateLogging": "0" - } - } - }, - - "device": { "id": "sink1", "channel": @@ -97,6 +61,6 @@ "rateLogging": "0" } } - } + }] } } diff --git a/examples/MQ/2-sampler-processor-sink/runExample2Processor.cxx b/examples/MQ/2-sampler-processor-sink/runExample2Processor.cxx index 3afd08e5..470d5184 100644 --- a/examples/MQ/2-sampler-processor-sink/runExample2Processor.cxx +++ b/examples/MQ/2-sampler-processor-sink/runExample2Processor.cxx @@ -33,18 +33,7 @@ int main(int argc, char** argv) return 0; } - std::string filename = config.GetValue("config-json-file"); - std::string id = config.GetValue("id"); - - config.UserParser(filename, id); - - processor.fChannels = config.GetFairMQMap(); - - LOG(INFO) << "PID: " << getpid(); - - processor.SetTransport(config.GetValue("transport")); - - processor.SetProperty(FairMQExample2Processor::Id, id); + processor.SetConfig(config); processor.ChangeState("INIT_DEVICE"); processor.WaitForEndOfState("INIT_DEVICE"); diff --git a/examples/MQ/2-sampler-processor-sink/runExample2Sampler.cxx b/examples/MQ/2-sampler-processor-sink/runExample2Sampler.cxx index b699bab3..6ae3773a 100644 --- a/examples/MQ/2-sampler-processor-sink/runExample2Sampler.cxx +++ b/examples/MQ/2-sampler-processor-sink/runExample2Sampler.cxx @@ -45,18 +45,7 @@ int main(int argc, char** argv) return 0; } - std::string filename = config.GetValue("config-json-file"); - std::string id = config.GetValue("id"); - - config.UserParser(filename, id); - - sampler.fChannels = config.GetFairMQMap(); - - LOG(INFO) << "PID: " << getpid(); - - sampler.SetTransport(config.GetValue("transport")); - - sampler.SetProperty(FairMQExample2Sampler::Id, id); + sampler.SetConfig(config); sampler.SetProperty(FairMQExample2Sampler::Text, text); sampler.ChangeState("INIT_DEVICE"); diff --git a/examples/MQ/2-sampler-processor-sink/runExample2Sink.cxx b/examples/MQ/2-sampler-processor-sink/runExample2Sink.cxx index 16b9566f..594b5500 100644 --- a/examples/MQ/2-sampler-processor-sink/runExample2Sink.cxx +++ b/examples/MQ/2-sampler-processor-sink/runExample2Sink.cxx @@ -33,18 +33,7 @@ int main(int argc, char** argv) return 0; } - std::string filename = config.GetValue("config-json-file"); - std::string id = config.GetValue("id"); - - config.UserParser(filename, id); - - sink.fChannels = config.GetFairMQMap(); - - LOG(INFO) << "PID: " << getpid(); - - sink.SetTransport(config.GetValue("transport")); - - sink.SetProperty(FairMQExample2Sink::Id, id); + sink.SetConfig(config); sink.ChangeState("INIT_DEVICE"); sink.WaitForEndOfState("INIT_DEVICE"); diff --git a/examples/MQ/2-sampler-processor-sink/startMQEx2.sh.in b/examples/MQ/2-sampler-processor-sink/startMQEx2.sh.in new file mode 100755 index 00000000..0defbd39 --- /dev/null +++ b/examples/MQ/2-sampler-processor-sink/startMQEx2.sh.in @@ -0,0 +1,24 @@ +#!/bin/bash +ex2config="@CMAKE_BINARY_DIR@/bin/config/ex2-sampler-processor-sink.json" + +SAMPLER="ex2-sampler" +SAMPLER+=" --id sampler1" +SAMPLER+=" --mq-config $ex2config" +xterm -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SAMPLER & + +PROCESSOR1="ex2-processor" +PROCESSOR1+=" --id processor1" +PROCESSOR1+=" --mq-config $ex2config" +PROCESSOR1+=" --config-key processor" +xterm -geometry 80x23+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/$PROCESSOR1 & + +PROCESSOR2="ex2-processor" +PROCESSOR2+=" --id processor2" +PROCESSOR2+=" --mq-config $ex2config" +PROCESSOR2+=" --config-key processor" +xterm -geometry 80x23+500+330 -hold -e @CMAKE_BINARY_DIR@/bin/$PROCESSOR2 & + +SINK="ex2-sink" +SINK+=" --id sink1" +SINK+=" --mq-config $ex2config" +xterm -geometry 80x23+1000+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SINK & diff --git a/examples/MQ/3-dds/CMakeLists.txt b/examples/MQ/3-dds/CMakeLists.txt index f1dc9e28..5af34b9d 100644 --- a/examples/MQ/3-dds/CMakeLists.txt +++ b/examples/MQ/3-dds/CMakeLists.txt @@ -8,8 +8,7 @@ configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/3-dds/ex3-dds-topology.xml ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-topology.xml) configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/3-dds/ex3-dds-hosts.cfg ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-hosts.cfg COPYONLY) - -add_definitions(-DENABLE_DDS) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/3-dds/ex3-dds.json ${CMAKE_BINARY_DIR}/bin/config/ex3-dds.json COPYONLY) Set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq @@ -18,6 +17,7 @@ Set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq/devices ${CMAKE_SOURCE_DIR}/fairmq/tools ${CMAKE_SOURCE_DIR}/fairmq/options + ${CMAKE_SOURCE_DIR}/fairmq/deployment ${CMAKE_SOURCE_DIR}/examples/MQ/3-dds ${CMAKE_CURRENT_BINARY_DIR} ) @@ -69,7 +69,6 @@ Set(Exe_Names ex3-sampler ex3-processor ex3-sink - ex3-command-ui ) Set(Exe_Source @@ -77,7 +76,6 @@ Set(Exe_Source runExample3Sampler.cxx runExample3Processor.cxx runExample3Sink.cxx - runDDSCommandUI.cxx ) list(LENGTH Exe_Names _length) diff --git a/examples/MQ/3-dds/README.md b/examples/MQ/3-dds/README.md index 802921b1..55e38625 100644 --- a/examples/MQ/3-dds/README.md +++ b/examples/MQ/3-dds/README.md @@ -1,7 +1,7 @@ Example 3: DDS =============== -This example demonstrates usage of the Dynamic Deployment System ([DDS](http://dds.gsi.de/)) to dynamically deploy and configure a topology of devices. The topology is similar to those of Example 2, but now it can be easily distributed on different computing nodes without the need for manual reconfiguration of the devices. +This example demonstrates usage of the Dynamic Deployment System ([DDS](http://dds.gsi.de/)) to dynamically deploy and configure a topology of devices. The topology is similar to those of Example 2, but now it can be easily distributed on different computing nodes without the need for manual socket reconfiguration of the devices. This example is compiled only if the DDS is found by CMake. Custom DDS installation location can be given to CMake like this: @@ -11,52 +11,17 @@ cmake -DDDS_PATH="/path/to/dds/install/dir/" .. The description below outlines the minimal steps needed to run the example with DDS. For more details please refer to DDS documentation on [DDS Website](http://dds.gsi.de/). -##### 1. The devices that bind their sockets need to advertise their bound addresses to DDS by writing a property. +##### 1. After beginning the initialization, the device handles the socket addresses and ports distribution via DDS. -In our example Sampler and Sink bind their sockets. The bound addresses are available after the initial validation. The following code takes the address value and gives it to DDS: +The binding channels give their bound addresses to devices interested in connecting to it and connecting sockets wait to receive these addresses. This match happens via the properties specified in the JSON file, which replace addresses in the DDS run. This is done behind the scenes after the initialization has been started and can be called with a single method call: ```C++ sampler.ChangeState("INIT_DEVICE"); -sampler.WaitForInitialValidation(); - -CKeyValue ddsKeyValue; -ddsKeyValue.putValue("SamplerOutputAddress", sampler.fChannels.at("data-out").at(0).GetAddress()); - +HandleConfigViaDDS(sampler); sampler.WaitForEndOfState("INIT_DEVICE"); ``` -Same approach for the Sink. - -##### 2. The devices that connect their sockets need to read the addresses from DDS. - -The Processors in our example need the addresses of Sampler and Sink. They receive these from DDS via properties (sent in the step above): - -```C++ -CKeyValue ddsKeyValue; -// Sampler properties -CKeyValue::valuesMap_t samplerValues; -{ - mutex keyMutex; - condition_variable keyCondition; - - LOG(INFO) << "Subscribing and waiting for sampler output address."; - ddsKeyValue.subscribe([&keyCondition](const string& /*_key*/, const string& /*_value*/) { keyCondition.notify_all(); }); - ddsKeyValue.getValues("SamplerOutputAddress", &samplerValues); - while (samplerValues.empty()) - { - unique_lock lock(keyMutex); - keyCondition.wait_until(lock, chrono::system_clock::now() + chrono::milliseconds(1000)); - ddsKeyValue.getValues("SamplerOutputAddress", &samplerValues); - } -} -// Sink properties -// ... same as above, but for sinkValues ... - -processor.fChannels.at("data-in").at(0).UpdateAddress(samplerValues.begin()->second); -processor.fChannels.at("data-out").at(0).UpdateAddress(sinkValues.begin()->second); -``` - -After this step each device will have the necessary connection information. +In most cases a device will land on a random node and all the addresses and ports are configured dynamicaly. The JSON file does not contain any address information for a DDS run. Instead, addresses are exchanged between the devices dynamically based on the provided property names. E.g. here a processor communicates with the sampler via the *data1* channel. Sampler (binding) communicates its address to the processor(s) (connecting) via the "samplerAddr" property (see `ex3-dds.json` file). ##### 3. Write DDS hosts file that contains a list of worker nodes to run the topology on (When deploying using the SSH plug-in). @@ -80,6 +45,8 @@ wn0, username@localhost, , /tmp/, 12 Take a look at `ex3-dds-topology.xml`. It consists of a definition part (properties, tasks, collections and more) and execution part (main). In our example Sampler, Processor and Sink tasks are defines, containing their executables and exchanged properties. The `
` of the topology uses the defined tasks. Besides one Sampler and one Sink task, a group containing Processor task is defined. The group has a multiplicity of 10, meaninig 10 Processors will be executed. Each of the Processors will receive the properties with Sampler and Sink addresses. +If `eth0` network interface (default for binding) is not available on your system, specify another one in the topology file for each task. For example: `--network-interface lo0`. + ##### 5. Start DDS server. The DDS server is started with: @@ -115,7 +82,7 @@ After activation, agents will execute the defined tasks on the worker nodes. Out ##### 10. (optional) Use example command UI to check state of the devices -This example includes a simple utility to send command to devices and receive replies from them. The code in `runDDSCommandUI.cxx` (compiled as ex3-dds-command-ui) uses the DDS intercom library to send "check-state" string to all devices, to which they reply with their ID and state they are in. This can be used as an example of sending/receiving commands or other information to devices. +This example includes a simple utility to send commands to devices and receive replies from them. The code in `runDDSCommandUI.cxx` (compiled as ex3-dds-command-ui) uses the DDS intercom library to send "check-state" string to all devices, to which they reply with their ID and state they are in. The utility also allows requesting state changes from devices. This can be used as an example of sending/receiving commands or other information to devices. To see it in action, start the ex3-dds-command-ui while the topology is running. diff --git a/examples/MQ/3-dds/ex3-dds-hosts.cfg b/examples/MQ/3-dds/ex3-dds-hosts.cfg index b6f56488..b9739086 100644 --- a/examples/MQ/3-dds/ex3-dds-hosts.cfg +++ b/examples/MQ/3-dds/ex3-dds-hosts.cfg @@ -2,6 +2,6 @@ # source setup.sh @bash_end@ -sampler, username@localhost, , /tmp/, 1 -processor, username@localhost, , /tmp/, 10 -sink, username@localhost, , /tmp/, 1 +sampler, username@localhost, , /path/to/dds-work-dir/, 1 +processor, username@localhost, , /path/to/dds-work-dir/, 10 +sink, username@localhost, , /path/to/dds-work-dir/, 1 diff --git a/examples/MQ/3-dds/ex3-dds-topology.xml b/examples/MQ/3-dds/ex3-dds-topology.xml index 4ec6ec7e..8d75d96a 100644 --- a/examples/MQ/3-dds/ex3-dds-topology.xml +++ b/examples/MQ/3-dds/ex3-dds-topology.xml @@ -1,7 +1,7 @@ - - + + @@ -16,27 +16,27 @@ - @CMAKE_BINARY_DIR@/bin/ex3-sampler --id sampler0 --log-color false + @CMAKE_BINARY_DIR@/bin/ex3-sampler --id sampler --mq-config @CMAKE_BINARY_DIR@/bin/config/ex3-dds.json SamplerWorker - SamplerAddress + samplerAddr - @CMAKE_BINARY_DIR@/bin/ex3-processor --id processor%taskIndex% --log-color false + @CMAKE_BINARY_DIR@/bin/ex3-processor --id processor_%taskIndex% --config-key processor --mq-config @CMAKE_BINARY_DIR@/bin/config/ex3-dds.json ProcessorWorker - SamplerAddress - SinkAddress + samplerAddr + sinkAddr - @CMAKE_BINARY_DIR@/bin/ex3-sink --id sink0 --log-color false + @CMAKE_BINARY_DIR@/bin/ex3-sink --id sink --mq-config @CMAKE_BINARY_DIR@/bin/config/ex3-dds.json SinkWorker - SinkAddress + sinkAddr diff --git a/examples/MQ/3-dds/ex3-dds.json b/examples/MQ/3-dds/ex3-dds.json new file mode 100644 index 00000000..1f0ccffc --- /dev/null +++ b/examples/MQ/3-dds/ex3-dds.json @@ -0,0 +1,42 @@ +{ + "fairMQOptions": + { + "devices": + [{ + "id": "sampler", + "channel": + { + "name": "data1", + "property": "samplerAddr", + "type": "push", + "method": "bind" + } + }, + { + "key": "processor", + "channels": + [{ + "name": "data1", + "property": "samplerAddr", + "type": "pull", + "method": "connect" + }, + { + "name": "data2", + "property": "sinkAddr", + "type": "push", + "method": "connect" + }] + }, + { + "id": "sink", + "channel": + { + "name": "data2", + "property": "sinkAddr", + "type": "pull", + "method": "bind" + } + }] + } +} diff --git a/examples/MQ/3-dds/runDDSCommandUI.cxx b/examples/MQ/3-dds/runDDSCommandUI.cxx deleted file mode 100644 index 2fd84d3d..00000000 --- a/examples/MQ/3-dds/runDDSCommandUI.cxx +++ /dev/null @@ -1,43 +0,0 @@ -#include "dds_intercom.h" // DDS - -// STD -#include -#include -#include -#include -#include - -using namespace std; -using namespace dds::intercom_api; - -int main(int argc, char* argv[]) -{ - try - { - CCustomCmd ddsCustomCmd; - - ddsCustomCmd.subscribe([](const string& command, const string& condition, uint64_t senderId) - { - cout << "Received: \"" << command << "\"" << endl; - }); - - while (true) - { - int result = ddsCustomCmd.send("check-state", ""); - - if (result == 1) - { - cerr << "Error sending custom command" << endl; - } - - this_thread::sleep_for(chrono::seconds(1)); - } - } - catch (exception& e) - { - cerr << "Error: " << e.what() << endl; - return EXIT_FAILURE; - } - - return EXIT_SUCCESS; -} \ No newline at end of file diff --git a/examples/MQ/3-dds/runExample3Processor.cxx b/examples/MQ/3-dds/runExample3Processor.cxx index b777abd3..46b5c364 100644 --- a/examples/MQ/3-dds/runExample3Processor.cxx +++ b/examples/MQ/3-dds/runExample3Processor.cxx @@ -12,23 +12,13 @@ * @author D. Klein, A. Rybalchenko */ -#include -#include -#include - -#include "boost/program_options.hpp" -#include // for DDS - #include "FairMQLogger.h" +#include "FairMQDDSTools.h" #include "FairMQProgOptions.h" #include "FairMQExample3Processor.h" -#include "FairMQTools.h" - -#include "dds_intercom.h" // DDS using namespace std; using namespace boost::program_options; -using namespace dds::intercom_api; int main(int argc, char** argv) { @@ -44,96 +34,16 @@ int main(int argc, char** argv) return 0; } - string id = config.GetValue("id"); - - LOG(INFO) << "PID: " << getpid(); - - processor.SetTransport(config.GetValue("transport")); - - processor.SetProperty(FairMQExample3Processor::Id, id); - - // configure data output channel - FairMQChannel dataInChannel("pull", "connect", ""); - dataInChannel.UpdateRateLogging(0); - processor.fChannels["data1"].push_back(dataInChannel); - - // configure data output channel - FairMQChannel dataOutChannel("push", "connect", ""); - dataOutChannel.UpdateRateLogging(0); - processor.fChannels["data2"].push_back(dataOutChannel); - - // Waiting for DDS properties - CKeyValue ddsKeyValue; - // Sampler properties - CKeyValue::valuesMap_t samplerValues; - { - mutex keyMutex; - condition_variable keyCondition; - - LOG(INFO) << "Subscribing and waiting for sampler output address."; - ddsKeyValue.subscribe([&keyCondition](const string& /*_key*/, const string& /*_value*/) { keyCondition.notify_all(); }); - ddsKeyValue.getValues("SamplerAddress", &samplerValues); - while (samplerValues.empty()) - { - unique_lock lock(keyMutex); - keyCondition.wait_until(lock, chrono::system_clock::now() + chrono::milliseconds(1000)); - ddsKeyValue.getValues("SamplerAddress", &samplerValues); - } - } - // Sink properties - CKeyValue::valuesMap_t sinkValues; - { - mutex keyMutex; - condition_variable keyCondition; - - LOG(INFO) << "Subscribing and waiting for sink input address."; - ddsKeyValue.subscribe([&keyCondition](const string& /*_key*/, const string& /*_value*/) { keyCondition.notify_all(); }); - ddsKeyValue.getValues("SinkAddress", &sinkValues); - while (sinkValues.empty()) - { - unique_lock lock(keyMutex); - keyCondition.wait_until(lock, chrono::system_clock::now() + chrono::milliseconds(1000)); - ddsKeyValue.getValues("SinkAddress", &sinkValues); - } - } - - processor.fChannels.at("data1").at(0).UpdateAddress(samplerValues.begin()->second); - processor.fChannels.at("data2").at(0).UpdateAddress(sinkValues.begin()->second); + processor.SetConfig(config); processor.ChangeState("INIT_DEVICE"); + HandleConfigViaDDS(processor); processor.WaitForEndOfState("INIT_DEVICE"); processor.ChangeState("INIT_TASK"); processor.WaitForEndOfState("INIT_TASK"); - CCustomCmd ddsCustomCmd; - - // Subscribe on custom commands - ddsCustomCmd.subscribe([&](const string& command, const string& condition, uint64_t senderId) - { - LOG(INFO) << "Received custom command: " << command; - if (command == "check-state") - { - ddsCustomCmd.send(id + ": " + processor.GetCurrentStateName(), to_string(senderId)); - } - else - { - LOG(WARN) << "Received unknown command: " << command; - LOG(WARN) << "Origin: " << senderId; - LOG(WARN) << "Destination: " << condition; - } - }); - - processor.ChangeState("RUN"); - processor.WaitForEndOfState("RUN"); - - processor.ChangeState("RESET_TASK"); - processor.WaitForEndOfState("RESET_TASK"); - - processor.ChangeState("RESET_DEVICE"); - processor.WaitForEndOfState("RESET_DEVICE"); - - processor.ChangeState("END"); + runDDSStateHandler(processor); } catch (exception& e) { diff --git a/examples/MQ/3-dds/runExample3Sampler.cxx b/examples/MQ/3-dds/runExample3Sampler.cxx index f2b315e6..62527515 100644 --- a/examples/MQ/3-dds/runExample3Sampler.cxx +++ b/examples/MQ/3-dds/runExample3Sampler.cxx @@ -12,24 +12,13 @@ * @author D. Klein, A. Rybalchenko */ -#include -#include -#include -#include - -#include "boost/program_options.hpp" -#include // for DDS - #include "FairMQLogger.h" +#include "FairMQDDSTools.h" #include "FairMQProgOptions.h" #include "FairMQExample3Sampler.h" -#include "FairMQTools.h" - -#include "dds_intercom.h" // DDS using namespace std; using namespace boost::program_options; -using namespace dds::intercom_api; int main(int argc, char** argv) { @@ -40,94 +29,21 @@ int main(int argc, char** argv) try { - string interfaceName; // name of the network interface to use for communication. - - options_description samplerOptions("Sampler options"); - samplerOptions.add_options() - ("network-interface", value(&interfaceName)->default_value("eth0"), "Name of the network interface to use (e.g. eth0, ib0, wlan0, en0...)"); - - config.AddToCmdLineOptions(samplerOptions); - if (config.ParseAll(argc, argv)) { return 0; } - string id = config.GetValue("id"); - - LOG(INFO) << "PID: " << getpid(); - - sampler.SetTransport(config.GetValue("transport")); - - sampler.SetProperty(FairMQExample3Sampler::Id, id); - - // configure data output channel - FairMQChannel dataOutChannel("push", "bind", ""); - dataOutChannel.UpdateRateLogging(0); - sampler.fChannels["data1"].push_back(dataOutChannel); - - // Get the IP of the current host and store it for binding. - map IPs; - FairMQ::tools::getHostIPs(IPs); - stringstream ss; - // Check if ib0 (infiniband) interface is available, otherwise try eth0 or wlan0. - if (IPs.count(interfaceName)) - { - ss << "tcp://" << IPs[interfaceName] << ":1"; - } - else - { - LOG(INFO) << ss.str(); - LOG(ERROR) << "Could not find provided network interface: \"" << interfaceName << "\"!, exiting."; - exit(EXIT_FAILURE); - } - string initialOutputAddress = ss.str(); - - // Configure the found host IP for the channel. - // TCP port will be chosen randomly during the initialization (binding). - sampler.fChannels.at("data1").at(0).UpdateAddress(initialOutputAddress); + sampler.SetConfig(config); sampler.ChangeState("INIT_DEVICE"); - sampler.WaitForInitialValidation(); - - // Advertise the bound addresses via DDS property - LOG(INFO) << "Giving sampler output address to DDS."; - CKeyValue ddsKeyValue; - ddsKeyValue.putValue("SamplerAddress", sampler.fChannels.at("data1").at(0).GetAddress()); - + HandleConfigViaDDS(sampler); sampler.WaitForEndOfState("INIT_DEVICE"); sampler.ChangeState("INIT_TASK"); sampler.WaitForEndOfState("INIT_TASK"); - CCustomCmd ddsCustomCmd; - - // Subscribe on custom commands - ddsCustomCmd.subscribe([&](const string& command, const string& condition, uint64_t senderId) - { - LOG(INFO) << "Received custom command: " << command; - if (command == "check-state") - { - ddsCustomCmd.send(id + ": " + sampler.GetCurrentStateName(), to_string(senderId)); - } - else - { - LOG(WARN) << "Received unknown command: " << command; - LOG(WARN) << "Origin: " << senderId; - LOG(WARN) << "Destination: " << condition; - } - }); - - sampler.ChangeState("RUN"); - sampler.WaitForEndOfState("RUN"); - - sampler.ChangeState("RESET_TASK"); - sampler.WaitForEndOfState("RESET_TASK"); - - sampler.ChangeState("RESET_DEVICE"); - sampler.WaitForEndOfState("RESET_DEVICE"); - - sampler.ChangeState("END"); + runDDSStateHandler(sampler); } catch (exception& e) { diff --git a/examples/MQ/3-dds/runExample3Sink.cxx b/examples/MQ/3-dds/runExample3Sink.cxx index 3f6459fd..aa374d87 100644 --- a/examples/MQ/3-dds/runExample3Sink.cxx +++ b/examples/MQ/3-dds/runExample3Sink.cxx @@ -12,24 +12,13 @@ * @author D. Klein, A. Rybalchenko */ -#include -#include -#include -#include - -#include "boost/program_options.hpp" -#include // for DDS - #include "FairMQLogger.h" +#include "FairMQDDSTools.h" #include "FairMQProgOptions.h" #include "FairMQExample3Sink.h" -#include "FairMQTools.h" - -#include "dds_intercom.h" // DDS using namespace std; using namespace boost::program_options; -using namespace dds::intercom_api; int main(int argc, char** argv) { @@ -40,94 +29,21 @@ int main(int argc, char** argv) try { - string interfaceName; // name of the network interface to use for communication. - - options_description sinkOptions("Sink options"); - sinkOptions.add_options() - ("network-interface", value(&interfaceName)->default_value("eth0"), "Name of the network interface to use (e.g. eth0, ib0, wlan0, en0...)"); - - config.AddToCmdLineOptions(sinkOptions); - if (config.ParseAll(argc, argv)) { return 0; } - string id = config.GetValue("id"); - - LOG(INFO) << "PID: " << getpid(); - - sink.SetTransport(config.GetValue("transport")); - - sink.SetProperty(FairMQExample3Sink::Id, id); - - // configure data output channel - FairMQChannel dataInChannel("pull", "bind", ""); - dataInChannel.UpdateRateLogging(0); - sink.fChannels["data2"].push_back(dataInChannel); - - // Get the IP of the current host and store it for binding. - map IPs; - FairMQ::tools::getHostIPs(IPs); - stringstream ss; - // Check if ib0 (infiniband) interface is available, otherwise try eth0 or wlan0. - if (IPs.count(interfaceName)) - { - ss << "tcp://" << IPs[interfaceName] << ":1"; - } - else - { - LOG(INFO) << ss.str(); - LOG(ERROR) << "Could not find provided network interface: \"" << interfaceName << "\"!, exiting."; - exit(EXIT_FAILURE); - } - string initialInputAddress = ss.str(); - - // Configure the found host IP for the channel. - // TCP port will be chosen randomly during the initialization (binding). - sink.fChannels.at("data2").at(0).UpdateAddress(initialInputAddress); + sink.SetConfig(config); sink.ChangeState("INIT_DEVICE"); - sink.WaitForInitialValidation(); - - // Advertise the bound address via DDS property - LOG(INFO) << "Giving sink input address to DDS."; - CKeyValue ddsKeyValue; - ddsKeyValue.putValue("SinkAddress", sink.fChannels.at("data2").at(0).GetAddress()); - + HandleConfigViaDDS(sink); sink.WaitForEndOfState("INIT_DEVICE"); sink.ChangeState("INIT_TASK"); sink.WaitForEndOfState("INIT_TASK"); - CCustomCmd ddsCustomCmd; - - // Subscribe on custom commands - ddsCustomCmd.subscribe([&](const string& command, const string& condition, uint64_t senderId) - { - LOG(INFO) << "Received custom command: " << command; - if (command == "check-state") - { - ddsCustomCmd.send(id + ": " + sink.GetCurrentStateName(), to_string(senderId)); - } - else - { - LOG(WARN) << "Received unknown command: " << command; - LOG(WARN) << "Origin: " << senderId; - LOG(WARN) << "Destination: " << condition; - } - }); - - sink.ChangeState("RUN"); - sink.WaitForEndOfState("RUN"); - - sink.ChangeState("RESET_TASK"); - sink.WaitForEndOfState("RESET_TASK"); - - sink.ChangeState("RESET_DEVICE"); - sink.WaitForEndOfState("RESET_DEVICE"); - - sink.ChangeState("END"); + runDDSStateHandler(sink); } catch (exception& e) { diff --git a/examples/MQ/4-copypush/CMakeLists.txt b/examples/MQ/4-copypush/CMakeLists.txt index e15022ed..0a4a70ec 100644 --- a/examples/MQ/4-copypush/CMakeLists.txt +++ b/examples/MQ/4-copypush/CMakeLists.txt @@ -7,6 +7,7 @@ ################################################################################ configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/4-copypush/ex4-copypush.json ${CMAKE_BINARY_DIR}/bin/config/ex4-copypush.json) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/4-copypush/startMQEx4.sh.in ${CMAKE_BINARY_DIR}/bin/startMQEx4.sh) Set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq diff --git a/examples/MQ/4-copypush/ex4-copypush.json b/examples/MQ/4-copypush/ex4-copypush.json index 8ebc2474..608d4a7c 100644 --- a/examples/MQ/4-copypush/ex4-copypush.json +++ b/examples/MQ/4-copypush/ex4-copypush.json @@ -1,14 +1,14 @@ { "fairMQOptions": { - "device": - { + "devices": + [{ "id": "sampler1", "channel": { "name": "data", - "socket": - { + "sockets": + [{ "type": "push", "method": "bind", "address": "tcp://*:5555", @@ -16,7 +16,6 @@ "rcvBufSize": "1000", "rateLogging": "0" }, - "socket": { "type": "push", "method": "bind", @@ -24,11 +23,9 @@ "sndBufSize": "1000", "rcvBufSize": "1000", "rateLogging": "0" - } + }] } }, - - "device": { "id": "sink1", "channel": @@ -45,8 +42,6 @@ } } }, - - "device": { "id": "sink2", "channel": @@ -62,7 +57,7 @@ "rateLogging": "0" } } - } + }] } } diff --git a/examples/MQ/4-copypush/runExample4Sampler.cxx b/examples/MQ/4-copypush/runExample4Sampler.cxx index 6fa941c6..0dcd55fb 100644 --- a/examples/MQ/4-copypush/runExample4Sampler.cxx +++ b/examples/MQ/4-copypush/runExample4Sampler.cxx @@ -33,18 +33,7 @@ int main(int argc, char** argv) return 0; } - std::string filename = config.GetValue("config-json-file"); - std::string id = config.GetValue("id"); - - config.UserParser(filename, id); - - sampler.fChannels = config.GetFairMQMap(); - - LOG(INFO) << "PID: " << getpid(); - - sampler.SetTransport(config.GetValue("transport")); - - sampler.SetProperty(FairMQExample4Sampler::Id, id); + sampler.SetConfig(config); sampler.ChangeState("INIT_DEVICE"); sampler.WaitForEndOfState("INIT_DEVICE"); diff --git a/examples/MQ/4-copypush/runExample4Sink.cxx b/examples/MQ/4-copypush/runExample4Sink.cxx index e3809451..c8a76b53 100644 --- a/examples/MQ/4-copypush/runExample4Sink.cxx +++ b/examples/MQ/4-copypush/runExample4Sink.cxx @@ -33,18 +33,7 @@ int main(int argc, char** argv) return 0; } - std::string filename = config.GetValue("config-json-file"); - std::string id = config.GetValue("id"); - - config.UserParser(filename, id); - - sink.fChannels = config.GetFairMQMap(); - - LOG(INFO) << "PID: " << getpid(); - - sink.SetTransport(config.GetValue("transport")); - - sink.SetProperty(FairMQExample4Sink::Id, id); + sink.SetConfig(config); sink.ChangeState("INIT_DEVICE"); sink.WaitForEndOfState("INIT_DEVICE"); diff --git a/examples/MQ/4-copypush/startMQEx4.sh.in b/examples/MQ/4-copypush/startMQEx4.sh.in new file mode 100755 index 00000000..17aa5ad2 --- /dev/null +++ b/examples/MQ/4-copypush/startMQEx4.sh.in @@ -0,0 +1,17 @@ +#!/bin/bash +ex4config="@CMAKE_BINARY_DIR@/bin/config/ex4-copypush.json" + +SAMPLER="ex4-sampler" +SAMPLER+=" --id sampler1" +SAMPLER+=" --mq-config $ex4config" +xterm -geometry 80x23+0+165 -hold -e @CMAKE_BINARY_DIR@/bin/$SAMPLER & + +SINK1="ex4-sink" +SINK1+=" --id sink1" +SINK1+=" --mq-config $ex4config" +xterm -geometry 80x23+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SINK1 & + +SINK2="ex4-sink" +SINK2+=" --id sink2" +SINK2+=" --mq-config $ex4config" +xterm -geometry 80x23+500+330 -hold -e @CMAKE_BINARY_DIR@/bin/$SINK2 & diff --git a/examples/MQ/5-req-rep/CMakeLists.txt b/examples/MQ/5-req-rep/CMakeLists.txt index 83f98b56..2108a596 100644 --- a/examples/MQ/5-req-rep/CMakeLists.txt +++ b/examples/MQ/5-req-rep/CMakeLists.txt @@ -7,6 +7,7 @@ ################################################################################ configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/5-req-rep/ex5-req-rep.json ${CMAKE_BINARY_DIR}/bin/config/ex5-req-rep.json) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/5-req-rep/startMQEx5.sh.in ${CMAKE_BINARY_DIR}/bin/startMQEx5.sh) Set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq diff --git a/examples/MQ/5-req-rep/ex5-req-rep.json b/examples/MQ/5-req-rep/ex5-req-rep.json index d01b4903..e2006a65 100644 --- a/examples/MQ/5-req-rep/ex5-req-rep.json +++ b/examples/MQ/5-req-rep/ex5-req-rep.json @@ -1,8 +1,8 @@ { "fairMQOptions": { - "device": - { + "devices": + [{ "id": "client", "channel": { @@ -18,8 +18,6 @@ } } }, - - "device": { "id": "server", "channel": @@ -35,7 +33,7 @@ "rateLogging": "0" } } - } + }] } } diff --git a/examples/MQ/5-req-rep/runExample5Client.cxx b/examples/MQ/5-req-rep/runExample5Client.cxx index e82dfcba..345c90d5 100644 --- a/examples/MQ/5-req-rep/runExample5Client.cxx +++ b/examples/MQ/5-req-rep/runExample5Client.cxx @@ -46,18 +46,7 @@ int main(int argc, char** argv) return 0; } - string filename = config.GetValue("config-json-file"); - string id = config.GetValue("id"); - - config.UserParser(filename, id); - - client.fChannels = config.GetFairMQMap(); - - LOG(INFO) << "PID: " << getpid(); - - client.SetTransport(config.GetValue("transport")); - - client.SetProperty(FairMQExample5Client::Id, id); + client.SetConfig(config); client.SetProperty(FairMQExample5Client::Text, text); client.ChangeState("INIT_DEVICE"); diff --git a/examples/MQ/5-req-rep/runExample5Server.cxx b/examples/MQ/5-req-rep/runExample5Server.cxx index 2137e2b1..4079fb3e 100644 --- a/examples/MQ/5-req-rep/runExample5Server.cxx +++ b/examples/MQ/5-req-rep/runExample5Server.cxx @@ -14,15 +14,12 @@ #include -#include "boost/program_options.hpp" - #include "FairMQLogger.h" #include "FairMQParser.h" #include "FairMQProgOptions.h" #include "FairMQExample5Server.h" using namespace std; -using namespace boost::program_options; int main(int argc, char** argv) { @@ -38,18 +35,7 @@ int main(int argc, char** argv) return 0; } - string filename = config.GetValue("config-json-file"); - string id = config.GetValue("id"); - - config.UserParser(filename, id); - - server.fChannels = config.GetFairMQMap(); - - LOG(INFO) << "PID: " << getpid(); - - server.SetTransport(config.GetValue("transport")); - - server.SetProperty(FairMQExample5Server::Id, id); + server.SetConfig(config); server.ChangeState("INIT_DEVICE"); server.WaitForEndOfState("INIT_DEVICE"); diff --git a/examples/MQ/5-req-rep/startMQEx5.sh.in b/examples/MQ/5-req-rep/startMQEx5.sh.in new file mode 100755 index 00000000..edbbde2e --- /dev/null +++ b/examples/MQ/5-req-rep/startMQEx5.sh.in @@ -0,0 +1,12 @@ +#!/bin/bash +ex5config="@CMAKE_BINARY_DIR@/bin/config/ex5-req-rep.json" + +CLIENT="ex5-client" +CLIENT+=" --id client" +CLIENT+=" --mq-config $ex5config" +xterm -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/$CLIENT & + +SERVER="ex5-server" +SERVER+=" --id server" +SERVER+=" --mq-config $ex5config" +xterm -geometry 80x23+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SERVER & diff --git a/examples/MQ/6-multiple-channels/CMakeLists.txt b/examples/MQ/6-multiple-channels/CMakeLists.txt index b6f36adf..5d35ba86 100644 --- a/examples/MQ/6-multiple-channels/CMakeLists.txt +++ b/examples/MQ/6-multiple-channels/CMakeLists.txt @@ -7,6 +7,7 @@ ################################################################################ configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/6-multiple-channels/ex6-multiple-channels.json ${CMAKE_BINARY_DIR}/bin/config/ex6-multiple-channels.json) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/6-multiple-channels/startMQEx6.sh.in ${CMAKE_BINARY_DIR}/bin/startMQEx6.sh) Set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq diff --git a/examples/MQ/6-multiple-channels/runExample6Broadcaster.cxx b/examples/MQ/6-multiple-channels/runExample6Broadcaster.cxx index 9d702bf8..5976bb57 100644 --- a/examples/MQ/6-multiple-channels/runExample6Broadcaster.cxx +++ b/examples/MQ/6-multiple-channels/runExample6Broadcaster.cxx @@ -33,18 +33,7 @@ int main(int argc, char** argv) return 0; } - std::string filename = config.GetValue("config-json-file"); - std::string id = config.GetValue("id"); - - config.UserParser(filename, id); - - broadcaster.fChannels = config.GetFairMQMap(); - - LOG(INFO) << "PID: " << getpid(); - - broadcaster.SetTransport(config.GetValue("transport")); - - broadcaster.SetProperty(FairMQExample6Broadcaster::Id, id); + broadcaster.SetConfig(config); broadcaster.ChangeState("INIT_DEVICE"); broadcaster.WaitForEndOfState("INIT_DEVICE"); diff --git a/examples/MQ/6-multiple-channels/runExample6Sampler.cxx b/examples/MQ/6-multiple-channels/runExample6Sampler.cxx index 27f7db44..e23051b1 100644 --- a/examples/MQ/6-multiple-channels/runExample6Sampler.cxx +++ b/examples/MQ/6-multiple-channels/runExample6Sampler.cxx @@ -45,18 +45,7 @@ int main(int argc, char** argv) return 0; } - std::string filename = config.GetValue("config-json-file"); - std::string id = config.GetValue("id"); - - config.UserParser(filename, id); - - sampler.fChannels = config.GetFairMQMap(); - - LOG(INFO) << "PID: " << getpid(); - - sampler.SetTransport(config.GetValue("transport")); - - sampler.SetProperty(FairMQExample6Sampler::Id, id); + sampler.SetConfig(config); sampler.SetProperty(FairMQExample6Sampler::Text, text); sampler.ChangeState("INIT_DEVICE"); diff --git a/examples/MQ/6-multiple-channels/runExample6Sink.cxx b/examples/MQ/6-multiple-channels/runExample6Sink.cxx index 66ab1d61..3e6b2d2e 100644 --- a/examples/MQ/6-multiple-channels/runExample6Sink.cxx +++ b/examples/MQ/6-multiple-channels/runExample6Sink.cxx @@ -37,18 +37,7 @@ int main(int argc, char** argv) return 0; } - std::string filename = config.GetValue("config-json-file"); - std::string id = config.GetValue("id"); - - config.UserParser(filename, id); - - sink.fChannels = config.GetFairMQMap(); - - LOG(INFO) << "PID: " << getpid(); - - sink.SetTransport(config.GetValue("transport")); - - sink.SetProperty(FairMQExample6Sink::Id, id); + sink.SetConfig(config); sink.ChangeState("INIT_DEVICE"); sink.WaitForEndOfState("INIT_DEVICE"); diff --git a/examples/MQ/6-multiple-channels/startMQEx6.sh.in b/examples/MQ/6-multiple-channels/startMQEx6.sh.in new file mode 100755 index 00000000..caf4a54c --- /dev/null +++ b/examples/MQ/6-multiple-channels/startMQEx6.sh.in @@ -0,0 +1,17 @@ +#!/bin/bash +ex6config="@CMAKE_BINARY_DIR@/bin/config/ex6-multiple-channels.json" + +SAMPLER="ex6-sampler" +SAMPLER+=" --id sampler1" +SAMPLER+=" --mq-config $ex6config" +xterm -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SAMPLER & + +SINK="ex6-sink" +SINK+=" --id sink1" +SINK+=" --mq-config $ex6config" +xterm -geometry 80x23+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SINK & + +BROADCASTER="ex6-broadcaster" +BROADCASTER+=" --id broadcaster1" +BROADCASTER+=" --mq-config $ex6config" +xterm -geometry 80x23+250+330 -hold -e @CMAKE_BINARY_DIR@/bin/$BROADCASTER & diff --git a/examples/MQ/8-multipart/CMakeLists.txt b/examples/MQ/8-multipart/CMakeLists.txt index b3883d9b..4de95c9c 100644 --- a/examples/MQ/8-multipart/CMakeLists.txt +++ b/examples/MQ/8-multipart/CMakeLists.txt @@ -7,6 +7,7 @@ ################################################################################ configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/8-multipart/ex8-multipart.json ${CMAKE_BINARY_DIR}/bin/config/ex8-multipart.json) +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/8-multipart/startMQEx8.sh.in ${CMAKE_BINARY_DIR}/bin/startMQEx8.sh) Set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq diff --git a/examples/MQ/8-multipart/runExample8Sampler.cxx b/examples/MQ/8-multipart/runExample8Sampler.cxx index 3c22e01d..ababbccb 100644 --- a/examples/MQ/8-multipart/runExample8Sampler.cxx +++ b/examples/MQ/8-multipart/runExample8Sampler.cxx @@ -14,15 +14,11 @@ #include -#include "boost/program_options.hpp" - #include "FairMQLogger.h" #include "FairMQParser.h" #include "FairMQProgOptions.h" #include "FairMQExample8Sampler.h" -using namespace boost::program_options; - int main(int argc, char** argv) { FairMQExample8Sampler sampler; @@ -37,18 +33,7 @@ int main(int argc, char** argv) return 0; } - std::string filename = config.GetValue("config-json-file"); - std::string id = config.GetValue("id"); - - config.UserParser(filename, id); - - sampler.fChannels = config.GetFairMQMap(); - - LOG(INFO) << "PID: " << getpid(); - - sampler.SetTransport(config.GetValue("transport")); - - sampler.SetProperty(FairMQExample8Sampler::Id, id); + sampler.SetConfig(config); sampler.ChangeState("INIT_DEVICE"); sampler.WaitForEndOfState("INIT_DEVICE"); diff --git a/examples/MQ/8-multipart/runExample8Sink.cxx b/examples/MQ/8-multipart/runExample8Sink.cxx index a28bd071..1ed32988 100644 --- a/examples/MQ/8-multipart/runExample8Sink.cxx +++ b/examples/MQ/8-multipart/runExample8Sink.cxx @@ -33,18 +33,7 @@ int main(int argc, char** argv) return 0; } - std::string filename = config.GetValue("config-json-file"); - std::string id = config.GetValue("id"); - - config.UserParser(filename, id); - - sink.fChannels = config.GetFairMQMap(); - - LOG(INFO) << "PID: " << getpid(); - - sink.SetTransport(config.GetValue("transport")); - - sink.SetProperty(FairMQExample8Sink::Id, id); + sink.SetConfig(config); sink.ChangeState("INIT_DEVICE"); sink.WaitForEndOfState("INIT_DEVICE"); diff --git a/examples/MQ/8-multipart/startMQEx8.sh.in b/examples/MQ/8-multipart/startMQEx8.sh.in new file mode 100755 index 00000000..c6620056 --- /dev/null +++ b/examples/MQ/8-multipart/startMQEx8.sh.in @@ -0,0 +1,12 @@ +#!/bin/bash +ex8config="@CMAKE_BINARY_DIR@/bin/config/ex8-multipart.json" + +SAMPLER="ex8-sampler" +SAMPLER+=" --id sampler1" +SAMPLER+=" --mq-config $ex8config" +xterm -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SAMPLER & + +SINK="ex8-sink" +SINK+=" --id sink1" +SINK+=" --mq-config $ex8config" +xterm -geometry 80x23+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SINK & \ No newline at end of file