mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
Configuration and DDS example/tools updates
- Update DDS example command UI and extract it from example. - Unify address handling via DDS properties for dynamic deployment. - Update DDS docs with the new approach. - Allow `--config-key` to be used to access common config in JSON. - Allow common channel properties to be specified for all sockets. - Update MQ examples and Tuto3 with new config options. - Add start scripts to MQ examples for easier use.
This commit is contained in:
parent
9a6e7f7aaf
commit
86ae4c2da1
|
@ -7,6 +7,7 @@
|
|||
################################################################################
|
||||
|
||||
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/1-sampler-sink/ex1-sampler-sink.json ${CMAKE_BINARY_DIR}/bin/config/ex1-sampler-sink.json)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/1-sampler-sink/startMQEx1.sh.in ${CMAKE_BINARY_DIR}/bin/startMQEx1.sh)
|
||||
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${CMAKE_SOURCE_DIR}/fairmq
|
||||
|
|
|
@ -5,4 +5,6 @@ A simple topology of two devices - **Sampler** and **Sink**. **Sampler** sends d
|
|||
|
||||
`runExample1Sampler.cxx` and `runExample1Sink.cxx` configure and run the devices in their main function.
|
||||
|
||||
The executables take two required command line parameters: `--id` and `--config-json-file`. The value of `--id` should be a unique identifier and the value for `--config-json-file` a path to a config file. The config file for this example is `ex1-sampler-sink.json` and it contains configuration for the communication channels of the devices. The mapping between a specific device and the configuration (which can contain multiple devices) is done based on the **id**.
|
||||
The executables take two required command line parameters: `--id` and `--mq-config`. The value of `--id` should be a unique identifier and the value for `--mq-config` a path to a config file. The config file for this example is `ex1-sampler-sink.json` and it contains configuration for the communication channels of the devices. The mapping between a specific device and the configuration (which can contain multiple devices) is done based on the **id**.
|
||||
|
||||
For this and the following example, all the commands needed to start the device are contained in the startFairMQExN.sh script (that can also be used for starting the example).
|
||||
|
|
|
@ -45,18 +45,7 @@ int main(int argc, char** argv)
|
|||
return 0;
|
||||
}
|
||||
|
||||
std::string filename = config.GetValue<std::string>("config-json-file");
|
||||
std::string id = config.GetValue<std::string>("id");
|
||||
|
||||
config.UserParser<FairMQParser::JSON>(filename, id);
|
||||
|
||||
sampler.fChannels = config.GetFairMQMap();
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
sampler.SetTransport(config.GetValue<std::string>("transport"));
|
||||
|
||||
sampler.SetProperty(FairMQExample1Sampler::Id, id);
|
||||
sampler.SetConfig(config);
|
||||
sampler.SetProperty(FairMQExample1Sampler::Text, text);
|
||||
|
||||
sampler.ChangeState("INIT_DEVICE");
|
||||
|
|
|
@ -33,18 +33,7 @@ int main(int argc, char** argv)
|
|||
return 0;
|
||||
}
|
||||
|
||||
std::string filename = config.GetValue<std::string>("config-json-file");
|
||||
std::string id = config.GetValue<std::string>("id");
|
||||
|
||||
config.UserParser<FairMQParser::JSON>(filename, id);
|
||||
|
||||
sink.fChannels = config.GetFairMQMap();
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
sink.SetTransport(config.GetValue<std::string>("transport"));
|
||||
|
||||
sink.SetProperty(FairMQExample1Sink::Id, id);
|
||||
sink.SetConfig(config);
|
||||
|
||||
sink.ChangeState("INIT_DEVICE");
|
||||
sink.WaitForEndOfState("INIT_DEVICE");
|
||||
|
|
12
examples/MQ/1-sampler-sink/startMQEx1.sh.in
Executable file
12
examples/MQ/1-sampler-sink/startMQEx1.sh.in
Executable file
|
@ -0,0 +1,12 @@
|
|||
#!/bin/bash
|
||||
ex1config="@CMAKE_BINARY_DIR@/bin/config/ex1-sampler-sink.json"
|
||||
|
||||
SAMPLER="ex1-sampler"
|
||||
SAMPLER+=" --id sampler1"
|
||||
SAMPLER+=" --mq-config $ex1config"
|
||||
xterm -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SAMPLER &
|
||||
|
||||
SINK="ex1-sink"
|
||||
SINK+=" --id sink1"
|
||||
SINK+=" --mq-config $ex1config"
|
||||
xterm -geometry 80x23+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SINK &
|
|
@ -7,6 +7,7 @@
|
|||
################################################################################
|
||||
|
||||
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/2-sampler-processor-sink/ex2-sampler-processor-sink.json ${CMAKE_BINARY_DIR}/bin/config/ex2-sampler-processor-sink.json)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/2-sampler-processor-sink/startMQEx2.sh.in ${CMAKE_BINARY_DIR}/bin/startMQEx2.sh)
|
||||
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${CMAKE_SOURCE_DIR}/fairmq
|
||||
|
|
|
@ -3,6 +3,8 @@ Example 2: Sampler -> Processor -> Sink
|
|||
|
||||
A simple topology of three devices - **Sampler**, **Processor** and **Sink**. **Sampler** sends data to one or more **Processor**s, who modify the data and send it to one **Sink**. Transport with the **PUSH-PULL** pattern.
|
||||
|
||||
For this example both processor devices share same configuration, and can therefore use same setting from the JSON file. But since their ID still has to be unique, additional command line argument must be used to allow them to share configuration. This parameter is `--config-key`.
|
||||
|
||||
In this example the Sampler is configured to **bind** its output and the Sink is configured to also **bind** its input. This allows us run any number of processors with the same configuration, because they all connect to same Sampler and Sink addresses. Furthermore, it allows adding of processors dynamically during run-time. The PUSH and PULL sockets will handle the data distribution to/from the new devices according to their distribution strategies ([Round-robin output for PUSH](http://api.zeromq.org/4-0:zmq-socket#toc14) and [Fair-queued input for PULL](http://api.zeromq.org/4-0:zmq-socket#toc15)).
|
||||
|
||||
The Sampler sends out a simple text string (its content configurable with `--text` command line parameter, defaul is "Hello"). Each Processor modifies the string by appending its ID to it and send it to the Sink.
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
{
|
||||
"fairMQOptions":
|
||||
{
|
||||
"device":
|
||||
{
|
||||
"devices":
|
||||
[{
|
||||
"id": "sampler1",
|
||||
"channel":
|
||||
{
|
||||
|
@ -18,12 +18,10 @@
|
|||
}
|
||||
}
|
||||
},
|
||||
|
||||
"device":
|
||||
{
|
||||
"id": "processor1",
|
||||
"channel":
|
||||
{
|
||||
"key": "processor",
|
||||
"channels":
|
||||
[{
|
||||
"name": "data1",
|
||||
"socket":
|
||||
{
|
||||
|
@ -35,7 +33,6 @@
|
|||
"rateLogging": "0"
|
||||
}
|
||||
},
|
||||
"channel":
|
||||
{
|
||||
"name": "data2",
|
||||
"socket":
|
||||
|
@ -47,41 +44,8 @@
|
|||
"rcvBufSize": "1000",
|
||||
"rateLogging": "0"
|
||||
}
|
||||
}
|
||||
}]
|
||||
},
|
||||
|
||||
"device":
|
||||
{
|
||||
"id": "processor2",
|
||||
"channel":
|
||||
{
|
||||
"name": "data1",
|
||||
"socket":
|
||||
{
|
||||
"type": "pull",
|
||||
"method": "connect",
|
||||
"address": "tcp://localhost:5555",
|
||||
"sndBufSize": "1000",
|
||||
"rcvBufSize": "1000",
|
||||
"rateLogging": "0"
|
||||
}
|
||||
},
|
||||
"channel":
|
||||
{
|
||||
"name": "data2",
|
||||
"socket":
|
||||
{
|
||||
"type": "push",
|
||||
"method": "connect",
|
||||
"address": "tcp://localhost:5556",
|
||||
"sndBufSize": "1000",
|
||||
"rcvBufSize": "1000",
|
||||
"rateLogging": "0"
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
"device":
|
||||
{
|
||||
"id": "sink1",
|
||||
"channel":
|
||||
|
@ -97,6 +61,6 @@
|
|||
"rateLogging": "0"
|
||||
}
|
||||
}
|
||||
}
|
||||
}]
|
||||
}
|
||||
}
|
||||
|
|
|
@ -33,18 +33,7 @@ int main(int argc, char** argv)
|
|||
return 0;
|
||||
}
|
||||
|
||||
std::string filename = config.GetValue<std::string>("config-json-file");
|
||||
std::string id = config.GetValue<std::string>("id");
|
||||
|
||||
config.UserParser<FairMQParser::JSON>(filename, id);
|
||||
|
||||
processor.fChannels = config.GetFairMQMap();
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
processor.SetTransport(config.GetValue<std::string>("transport"));
|
||||
|
||||
processor.SetProperty(FairMQExample2Processor::Id, id);
|
||||
processor.SetConfig(config);
|
||||
|
||||
processor.ChangeState("INIT_DEVICE");
|
||||
processor.WaitForEndOfState("INIT_DEVICE");
|
||||
|
|
|
@ -45,18 +45,7 @@ int main(int argc, char** argv)
|
|||
return 0;
|
||||
}
|
||||
|
||||
std::string filename = config.GetValue<std::string>("config-json-file");
|
||||
std::string id = config.GetValue<std::string>("id");
|
||||
|
||||
config.UserParser<FairMQParser::JSON>(filename, id);
|
||||
|
||||
sampler.fChannels = config.GetFairMQMap();
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
sampler.SetTransport(config.GetValue<std::string>("transport"));
|
||||
|
||||
sampler.SetProperty(FairMQExample2Sampler::Id, id);
|
||||
sampler.SetConfig(config);
|
||||
sampler.SetProperty(FairMQExample2Sampler::Text, text);
|
||||
|
||||
sampler.ChangeState("INIT_DEVICE");
|
||||
|
|
|
@ -33,18 +33,7 @@ int main(int argc, char** argv)
|
|||
return 0;
|
||||
}
|
||||
|
||||
std::string filename = config.GetValue<std::string>("config-json-file");
|
||||
std::string id = config.GetValue<std::string>("id");
|
||||
|
||||
config.UserParser<FairMQParser::JSON>(filename, id);
|
||||
|
||||
sink.fChannels = config.GetFairMQMap();
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
sink.SetTransport(config.GetValue<std::string>("transport"));
|
||||
|
||||
sink.SetProperty(FairMQExample2Sink::Id, id);
|
||||
sink.SetConfig(config);
|
||||
|
||||
sink.ChangeState("INIT_DEVICE");
|
||||
sink.WaitForEndOfState("INIT_DEVICE");
|
||||
|
|
24
examples/MQ/2-sampler-processor-sink/startMQEx2.sh.in
Executable file
24
examples/MQ/2-sampler-processor-sink/startMQEx2.sh.in
Executable file
|
@ -0,0 +1,24 @@
|
|||
#!/bin/bash
|
||||
ex2config="@CMAKE_BINARY_DIR@/bin/config/ex2-sampler-processor-sink.json"
|
||||
|
||||
SAMPLER="ex2-sampler"
|
||||
SAMPLER+=" --id sampler1"
|
||||
SAMPLER+=" --mq-config $ex2config"
|
||||
xterm -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SAMPLER &
|
||||
|
||||
PROCESSOR1="ex2-processor"
|
||||
PROCESSOR1+=" --id processor1"
|
||||
PROCESSOR1+=" --mq-config $ex2config"
|
||||
PROCESSOR1+=" --config-key processor"
|
||||
xterm -geometry 80x23+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/$PROCESSOR1 &
|
||||
|
||||
PROCESSOR2="ex2-processor"
|
||||
PROCESSOR2+=" --id processor2"
|
||||
PROCESSOR2+=" --mq-config $ex2config"
|
||||
PROCESSOR2+=" --config-key processor"
|
||||
xterm -geometry 80x23+500+330 -hold -e @CMAKE_BINARY_DIR@/bin/$PROCESSOR2 &
|
||||
|
||||
SINK="ex2-sink"
|
||||
SINK+=" --id sink1"
|
||||
SINK+=" --mq-config $ex2config"
|
||||
xterm -geometry 80x23+1000+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SINK &
|
|
@ -8,8 +8,7 @@
|
|||
|
||||
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/3-dds/ex3-dds-topology.xml ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-topology.xml)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/3-dds/ex3-dds-hosts.cfg ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-hosts.cfg COPYONLY)
|
||||
|
||||
add_definitions(-DENABLE_DDS)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/3-dds/ex3-dds.json ${CMAKE_BINARY_DIR}/bin/config/ex3-dds.json COPYONLY)
|
||||
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${CMAKE_SOURCE_DIR}/fairmq
|
||||
|
@ -18,6 +17,7 @@ Set(INCLUDE_DIRECTORIES
|
|||
${CMAKE_SOURCE_DIR}/fairmq/devices
|
||||
${CMAKE_SOURCE_DIR}/fairmq/tools
|
||||
${CMAKE_SOURCE_DIR}/fairmq/options
|
||||
${CMAKE_SOURCE_DIR}/fairmq/deployment
|
||||
${CMAKE_SOURCE_DIR}/examples/MQ/3-dds
|
||||
${CMAKE_CURRENT_BINARY_DIR}
|
||||
)
|
||||
|
@ -69,7 +69,6 @@ Set(Exe_Names
|
|||
ex3-sampler
|
||||
ex3-processor
|
||||
ex3-sink
|
||||
ex3-command-ui
|
||||
)
|
||||
|
||||
Set(Exe_Source
|
||||
|
@ -77,7 +76,6 @@ Set(Exe_Source
|
|||
runExample3Sampler.cxx
|
||||
runExample3Processor.cxx
|
||||
runExample3Sink.cxx
|
||||
runDDSCommandUI.cxx
|
||||
)
|
||||
|
||||
list(LENGTH Exe_Names _length)
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
Example 3: DDS
|
||||
===============
|
||||
|
||||
This example demonstrates usage of the Dynamic Deployment System ([DDS](http://dds.gsi.de/)) to dynamically deploy and configure a topology of devices. The topology is similar to those of Example 2, but now it can be easily distributed on different computing nodes without the need for manual reconfiguration of the devices.
|
||||
This example demonstrates usage of the Dynamic Deployment System ([DDS](http://dds.gsi.de/)) to dynamically deploy and configure a topology of devices. The topology is similar to those of Example 2, but now it can be easily distributed on different computing nodes without the need for manual socket reconfiguration of the devices.
|
||||
|
||||
This example is compiled only if the DDS is found by CMake. Custom DDS installation location can be given to CMake like this:
|
||||
|
||||
|
@ -11,52 +11,17 @@ cmake -DDDS_PATH="/path/to/dds/install/dir/" ..
|
|||
|
||||
The description below outlines the minimal steps needed to run the example with DDS. For more details please refer to DDS documentation on [DDS Website](http://dds.gsi.de/).
|
||||
|
||||
##### 1. The devices that bind their sockets need to advertise their bound addresses to DDS by writing a property.
|
||||
##### 1. After beginning the initialization, the device handles the socket addresses and ports distribution via DDS.
|
||||
|
||||
In our example Sampler and Sink bind their sockets. The bound addresses are available after the initial validation. The following code takes the address value and gives it to DDS:
|
||||
The binding channels give their bound addresses to devices interested in connecting to it and connecting sockets wait to receive these addresses. This match happens via the properties specified in the JSON file, which replace addresses in the DDS run. This is done behind the scenes after the initialization has been started and can be called with a single method call:
|
||||
|
||||
```C++
|
||||
sampler.ChangeState("INIT_DEVICE");
|
||||
sampler.WaitForInitialValidation();
|
||||
|
||||
CKeyValue ddsKeyValue;
|
||||
ddsKeyValue.putValue("SamplerOutputAddress", sampler.fChannels.at("data-out").at(0).GetAddress());
|
||||
|
||||
HandleConfigViaDDS(sampler);
|
||||
sampler.WaitForEndOfState("INIT_DEVICE");
|
||||
```
|
||||
|
||||
Same approach for the Sink.
|
||||
|
||||
##### 2. The devices that connect their sockets need to read the addresses from DDS.
|
||||
|
||||
The Processors in our example need the addresses of Sampler and Sink. They receive these from DDS via properties (sent in the step above):
|
||||
|
||||
```C++
|
||||
CKeyValue ddsKeyValue;
|
||||
// Sampler properties
|
||||
CKeyValue::valuesMap_t samplerValues;
|
||||
{
|
||||
mutex keyMutex;
|
||||
condition_variable keyCondition;
|
||||
|
||||
LOG(INFO) << "Subscribing and waiting for sampler output address.";
|
||||
ddsKeyValue.subscribe([&keyCondition](const string& /*_key*/, const string& /*_value*/) { keyCondition.notify_all(); });
|
||||
ddsKeyValue.getValues("SamplerOutputAddress", &samplerValues);
|
||||
while (samplerValues.empty())
|
||||
{
|
||||
unique_lock<mutex> lock(keyMutex);
|
||||
keyCondition.wait_until(lock, chrono::system_clock::now() + chrono::milliseconds(1000));
|
||||
ddsKeyValue.getValues("SamplerOutputAddress", &samplerValues);
|
||||
}
|
||||
}
|
||||
// Sink properties
|
||||
// ... same as above, but for sinkValues ...
|
||||
|
||||
processor.fChannels.at("data-in").at(0).UpdateAddress(samplerValues.begin()->second);
|
||||
processor.fChannels.at("data-out").at(0).UpdateAddress(sinkValues.begin()->second);
|
||||
```
|
||||
|
||||
After this step each device will have the necessary connection information.
|
||||
In most cases a device will land on a random node and all the addresses and ports are configured dynamicaly. The JSON file does not contain any address information for a DDS run. Instead, addresses are exchanged between the devices dynamically based on the provided property names. E.g. here a processor communicates with the sampler via the *data1* channel. Sampler (binding) communicates its address to the processor(s) (connecting) via the "samplerAddr" property (see `ex3-dds.json` file).
|
||||
|
||||
##### 3. Write DDS hosts file that contains a list of worker nodes to run the topology on (When deploying using the SSH plug-in).
|
||||
|
||||
|
@ -80,6 +45,8 @@ wn0, username@localhost, , /tmp/, 12
|
|||
|
||||
Take a look at `ex3-dds-topology.xml`. It consists of a definition part (properties, tasks, collections and more) and execution part (main). In our example Sampler, Processor and Sink tasks are defines, containing their executables and exchanged properties. The `<main>` of the topology uses the defined tasks. Besides one Sampler and one Sink task, a group containing Processor task is defined. The group has a multiplicity of 10, meaninig 10 Processors will be executed. Each of the Processors will receive the properties with Sampler and Sink addresses.
|
||||
|
||||
If `eth0` network interface (default for binding) is not available on your system, specify another one in the topology file for each task. For example: `--network-interface lo0`.
|
||||
|
||||
##### 5. Start DDS server.
|
||||
|
||||
The DDS server is started with:
|
||||
|
@ -115,7 +82,7 @@ After activation, agents will execute the defined tasks on the worker nodes. Out
|
|||
|
||||
##### 10. (optional) Use example command UI to check state of the devices
|
||||
|
||||
This example includes a simple utility to send command to devices and receive replies from them. The code in `runDDSCommandUI.cxx` (compiled as ex3-dds-command-ui) uses the DDS intercom library to send "check-state" string to all devices, to which they reply with their ID and state they are in. This can be used as an example of sending/receiving commands or other information to devices.
|
||||
This example includes a simple utility to send commands to devices and receive replies from them. The code in `runDDSCommandUI.cxx` (compiled as ex3-dds-command-ui) uses the DDS intercom library to send "check-state" string to all devices, to which they reply with their ID and state they are in. The utility also allows requesting state changes from devices. This can be used as an example of sending/receiving commands or other information to devices.
|
||||
|
||||
To see it in action, start the ex3-dds-command-ui while the topology is running.
|
||||
|
||||
|
|
|
@ -2,6 +2,6 @@
|
|||
# source setup.sh
|
||||
@bash_end@
|
||||
|
||||
sampler, username@localhost, , /tmp/, 1
|
||||
processor, username@localhost, , /tmp/, 10
|
||||
sink, username@localhost, , /tmp/, 1
|
||||
sampler, username@localhost, , /path/to/dds-work-dir/, 1
|
||||
processor, username@localhost, , /path/to/dds-work-dir/, 10
|
||||
sink, username@localhost, , /path/to/dds-work-dir/, 1
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
<topology id="ExampleDDS">
|
||||
|
||||
<property id="SamplerAddress" />
|
||||
<property id="SinkAddress" />
|
||||
<property id="samplerAddr" />
|
||||
<property id="sinkAddr" />
|
||||
|
||||
<declrequirement id="SamplerWorker">
|
||||
<hostPattern type="wnname" value="sampler"/>
|
||||
|
@ -16,27 +16,27 @@
|
|||
</declrequirement>
|
||||
|
||||
<decltask id="Sampler">
|
||||
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-sampler --id sampler0 --log-color false</exe>
|
||||
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-sampler --id sampler --mq-config @CMAKE_BINARY_DIR@/bin/config/ex3-dds.json</exe>
|
||||
<requirement>SamplerWorker</requirement>
|
||||
<properties>
|
||||
<id access="write">SamplerAddress</id>
|
||||
<id access="write">samplerAddr</id>
|
||||
</properties>
|
||||
</decltask>
|
||||
|
||||
<decltask id="Processor">
|
||||
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-processor --id processor%taskIndex% --log-color false</exe>
|
||||
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-processor --id processor_%taskIndex% --config-key processor --mq-config @CMAKE_BINARY_DIR@/bin/config/ex3-dds.json</exe>
|
||||
<requirement>ProcessorWorker</requirement>
|
||||
<properties>
|
||||
<id access="read">SamplerAddress</id>
|
||||
<id access="read">SinkAddress</id>
|
||||
<id access="read">samplerAddr</id>
|
||||
<id access="read">sinkAddr</id>
|
||||
</properties>
|
||||
</decltask>
|
||||
|
||||
<decltask id="Sink">
|
||||
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-sink --id sink0 --log-color false</exe>
|
||||
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-sink --id sink --mq-config @CMAKE_BINARY_DIR@/bin/config/ex3-dds.json</exe>
|
||||
<requirement>SinkWorker</requirement>
|
||||
<properties>
|
||||
<id access="write">SinkAddress</id>
|
||||
<id access="write">sinkAddr</id>
|
||||
</properties>
|
||||
</decltask>
|
||||
|
||||
|
|
42
examples/MQ/3-dds/ex3-dds.json
Normal file
42
examples/MQ/3-dds/ex3-dds.json
Normal file
|
@ -0,0 +1,42 @@
|
|||
{
|
||||
"fairMQOptions":
|
||||
{
|
||||
"devices":
|
||||
[{
|
||||
"id": "sampler",
|
||||
"channel":
|
||||
{
|
||||
"name": "data1",
|
||||
"property": "samplerAddr",
|
||||
"type": "push",
|
||||
"method": "bind"
|
||||
}
|
||||
},
|
||||
{
|
||||
"key": "processor",
|
||||
"channels":
|
||||
[{
|
||||
"name": "data1",
|
||||
"property": "samplerAddr",
|
||||
"type": "pull",
|
||||
"method": "connect"
|
||||
},
|
||||
{
|
||||
"name": "data2",
|
||||
"property": "sinkAddr",
|
||||
"type": "push",
|
||||
"method": "connect"
|
||||
}]
|
||||
},
|
||||
{
|
||||
"id": "sink",
|
||||
"channel":
|
||||
{
|
||||
"name": "data2",
|
||||
"property": "sinkAddr",
|
||||
"type": "pull",
|
||||
"method": "bind"
|
||||
}
|
||||
}]
|
||||
}
|
||||
}
|
|
@ -1,43 +0,0 @@
|
|||
#include "dds_intercom.h" // DDS
|
||||
|
||||
// STD
|
||||
#include <iostream>
|
||||
#include <exception>
|
||||
#include <sstream>
|
||||
#include <thread>
|
||||
#include <atomic>
|
||||
|
||||
using namespace std;
|
||||
using namespace dds::intercom_api;
|
||||
|
||||
int main(int argc, char* argv[])
|
||||
{
|
||||
try
|
||||
{
|
||||
CCustomCmd ddsCustomCmd;
|
||||
|
||||
ddsCustomCmd.subscribe([](const string& command, const string& condition, uint64_t senderId)
|
||||
{
|
||||
cout << "Received: \"" << command << "\"" << endl;
|
||||
});
|
||||
|
||||
while (true)
|
||||
{
|
||||
int result = ddsCustomCmd.send("check-state", "");
|
||||
|
||||
if (result == 1)
|
||||
{
|
||||
cerr << "Error sending custom command" << endl;
|
||||
}
|
||||
|
||||
this_thread::sleep_for(chrono::seconds(1));
|
||||
}
|
||||
}
|
||||
catch (exception& e)
|
||||
{
|
||||
cerr << "Error: " << e.what() << endl;
|
||||
return EXIT_FAILURE;
|
||||
}
|
||||
|
||||
return EXIT_SUCCESS;
|
||||
}
|
|
@ -12,23 +12,13 @@
|
|||
* @author D. Klein, A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include <iostream>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
|
||||
#include "boost/program_options.hpp"
|
||||
#include <boost/asio.hpp> // for DDS
|
||||
|
||||
#include "FairMQLogger.h"
|
||||
#include "FairMQDDSTools.h"
|
||||
#include "FairMQProgOptions.h"
|
||||
#include "FairMQExample3Processor.h"
|
||||
#include "FairMQTools.h"
|
||||
|
||||
#include "dds_intercom.h" // DDS
|
||||
|
||||
using namespace std;
|
||||
using namespace boost::program_options;
|
||||
using namespace dds::intercom_api;
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
|
@ -44,96 +34,16 @@ int main(int argc, char** argv)
|
|||
return 0;
|
||||
}
|
||||
|
||||
string id = config.GetValue<string>("id");
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
processor.SetTransport(config.GetValue<std::string>("transport"));
|
||||
|
||||
processor.SetProperty(FairMQExample3Processor::Id, id);
|
||||
|
||||
// configure data output channel
|
||||
FairMQChannel dataInChannel("pull", "connect", "");
|
||||
dataInChannel.UpdateRateLogging(0);
|
||||
processor.fChannels["data1"].push_back(dataInChannel);
|
||||
|
||||
// configure data output channel
|
||||
FairMQChannel dataOutChannel("push", "connect", "");
|
||||
dataOutChannel.UpdateRateLogging(0);
|
||||
processor.fChannels["data2"].push_back(dataOutChannel);
|
||||
|
||||
// Waiting for DDS properties
|
||||
CKeyValue ddsKeyValue;
|
||||
// Sampler properties
|
||||
CKeyValue::valuesMap_t samplerValues;
|
||||
{
|
||||
mutex keyMutex;
|
||||
condition_variable keyCondition;
|
||||
|
||||
LOG(INFO) << "Subscribing and waiting for sampler output address.";
|
||||
ddsKeyValue.subscribe([&keyCondition](const string& /*_key*/, const string& /*_value*/) { keyCondition.notify_all(); });
|
||||
ddsKeyValue.getValues("SamplerAddress", &samplerValues);
|
||||
while (samplerValues.empty())
|
||||
{
|
||||
unique_lock<mutex> lock(keyMutex);
|
||||
keyCondition.wait_until(lock, chrono::system_clock::now() + chrono::milliseconds(1000));
|
||||
ddsKeyValue.getValues("SamplerAddress", &samplerValues);
|
||||
}
|
||||
}
|
||||
// Sink properties
|
||||
CKeyValue::valuesMap_t sinkValues;
|
||||
{
|
||||
mutex keyMutex;
|
||||
condition_variable keyCondition;
|
||||
|
||||
LOG(INFO) << "Subscribing and waiting for sink input address.";
|
||||
ddsKeyValue.subscribe([&keyCondition](const string& /*_key*/, const string& /*_value*/) { keyCondition.notify_all(); });
|
||||
ddsKeyValue.getValues("SinkAddress", &sinkValues);
|
||||
while (sinkValues.empty())
|
||||
{
|
||||
unique_lock<mutex> lock(keyMutex);
|
||||
keyCondition.wait_until(lock, chrono::system_clock::now() + chrono::milliseconds(1000));
|
||||
ddsKeyValue.getValues("SinkAddress", &sinkValues);
|
||||
}
|
||||
}
|
||||
|
||||
processor.fChannels.at("data1").at(0).UpdateAddress(samplerValues.begin()->second);
|
||||
processor.fChannels.at("data2").at(0).UpdateAddress(sinkValues.begin()->second);
|
||||
processor.SetConfig(config);
|
||||
|
||||
processor.ChangeState("INIT_DEVICE");
|
||||
HandleConfigViaDDS(processor);
|
||||
processor.WaitForEndOfState("INIT_DEVICE");
|
||||
|
||||
processor.ChangeState("INIT_TASK");
|
||||
processor.WaitForEndOfState("INIT_TASK");
|
||||
|
||||
CCustomCmd ddsCustomCmd;
|
||||
|
||||
// Subscribe on custom commands
|
||||
ddsCustomCmd.subscribe([&](const string& command, const string& condition, uint64_t senderId)
|
||||
{
|
||||
LOG(INFO) << "Received custom command: " << command;
|
||||
if (command == "check-state")
|
||||
{
|
||||
ddsCustomCmd.send(id + ": " + processor.GetCurrentStateName(), to_string(senderId));
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(WARN) << "Received unknown command: " << command;
|
||||
LOG(WARN) << "Origin: " << senderId;
|
||||
LOG(WARN) << "Destination: " << condition;
|
||||
}
|
||||
});
|
||||
|
||||
processor.ChangeState("RUN");
|
||||
processor.WaitForEndOfState("RUN");
|
||||
|
||||
processor.ChangeState("RESET_TASK");
|
||||
processor.WaitForEndOfState("RESET_TASK");
|
||||
|
||||
processor.ChangeState("RESET_DEVICE");
|
||||
processor.WaitForEndOfState("RESET_DEVICE");
|
||||
|
||||
processor.ChangeState("END");
|
||||
runDDSStateHandler(processor);
|
||||
}
|
||||
catch (exception& e)
|
||||
{
|
||||
|
|
|
@ -12,24 +12,13 @@
|
|||
* @author D. Klein, A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include <iostream>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#include <map>
|
||||
|
||||
#include "boost/program_options.hpp"
|
||||
#include <boost/asio.hpp> // for DDS
|
||||
|
||||
#include "FairMQLogger.h"
|
||||
#include "FairMQDDSTools.h"
|
||||
#include "FairMQProgOptions.h"
|
||||
#include "FairMQExample3Sampler.h"
|
||||
#include "FairMQTools.h"
|
||||
|
||||
#include "dds_intercom.h" // DDS
|
||||
|
||||
using namespace std;
|
||||
using namespace boost::program_options;
|
||||
using namespace dds::intercom_api;
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
|
@ -40,94 +29,21 @@ int main(int argc, char** argv)
|
|||
|
||||
try
|
||||
{
|
||||
string interfaceName; // name of the network interface to use for communication.
|
||||
|
||||
options_description samplerOptions("Sampler options");
|
||||
samplerOptions.add_options()
|
||||
("network-interface", value<string>(&interfaceName)->default_value("eth0"), "Name of the network interface to use (e.g. eth0, ib0, wlan0, en0...)");
|
||||
|
||||
config.AddToCmdLineOptions(samplerOptions);
|
||||
|
||||
if (config.ParseAll(argc, argv))
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
string id = config.GetValue<string>("id");
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
sampler.SetTransport(config.GetValue<std::string>("transport"));
|
||||
|
||||
sampler.SetProperty(FairMQExample3Sampler::Id, id);
|
||||
|
||||
// configure data output channel
|
||||
FairMQChannel dataOutChannel("push", "bind", "");
|
||||
dataOutChannel.UpdateRateLogging(0);
|
||||
sampler.fChannels["data1"].push_back(dataOutChannel);
|
||||
|
||||
// Get the IP of the current host and store it for binding.
|
||||
map<string,string> IPs;
|
||||
FairMQ::tools::getHostIPs(IPs);
|
||||
stringstream ss;
|
||||
// Check if ib0 (infiniband) interface is available, otherwise try eth0 or wlan0.
|
||||
if (IPs.count(interfaceName))
|
||||
{
|
||||
ss << "tcp://" << IPs[interfaceName] << ":1";
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(INFO) << ss.str();
|
||||
LOG(ERROR) << "Could not find provided network interface: \"" << interfaceName << "\"!, exiting.";
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
string initialOutputAddress = ss.str();
|
||||
|
||||
// Configure the found host IP for the channel.
|
||||
// TCP port will be chosen randomly during the initialization (binding).
|
||||
sampler.fChannels.at("data1").at(0).UpdateAddress(initialOutputAddress);
|
||||
sampler.SetConfig(config);
|
||||
|
||||
sampler.ChangeState("INIT_DEVICE");
|
||||
sampler.WaitForInitialValidation();
|
||||
|
||||
// Advertise the bound addresses via DDS property
|
||||
LOG(INFO) << "Giving sampler output address to DDS.";
|
||||
CKeyValue ddsKeyValue;
|
||||
ddsKeyValue.putValue("SamplerAddress", sampler.fChannels.at("data1").at(0).GetAddress());
|
||||
|
||||
HandleConfigViaDDS(sampler);
|
||||
sampler.WaitForEndOfState("INIT_DEVICE");
|
||||
|
||||
sampler.ChangeState("INIT_TASK");
|
||||
sampler.WaitForEndOfState("INIT_TASK");
|
||||
|
||||
CCustomCmd ddsCustomCmd;
|
||||
|
||||
// Subscribe on custom commands
|
||||
ddsCustomCmd.subscribe([&](const string& command, const string& condition, uint64_t senderId)
|
||||
{
|
||||
LOG(INFO) << "Received custom command: " << command;
|
||||
if (command == "check-state")
|
||||
{
|
||||
ddsCustomCmd.send(id + ": " + sampler.GetCurrentStateName(), to_string(senderId));
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(WARN) << "Received unknown command: " << command;
|
||||
LOG(WARN) << "Origin: " << senderId;
|
||||
LOG(WARN) << "Destination: " << condition;
|
||||
}
|
||||
});
|
||||
|
||||
sampler.ChangeState("RUN");
|
||||
sampler.WaitForEndOfState("RUN");
|
||||
|
||||
sampler.ChangeState("RESET_TASK");
|
||||
sampler.WaitForEndOfState("RESET_TASK");
|
||||
|
||||
sampler.ChangeState("RESET_DEVICE");
|
||||
sampler.WaitForEndOfState("RESET_DEVICE");
|
||||
|
||||
sampler.ChangeState("END");
|
||||
runDDSStateHandler(sampler);
|
||||
}
|
||||
catch (exception& e)
|
||||
{
|
||||
|
|
|
@ -12,24 +12,13 @@
|
|||
* @author D. Klein, A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include <iostream>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
#include <map>
|
||||
|
||||
#include "boost/program_options.hpp"
|
||||
#include <boost/asio.hpp> // for DDS
|
||||
|
||||
#include "FairMQLogger.h"
|
||||
#include "FairMQDDSTools.h"
|
||||
#include "FairMQProgOptions.h"
|
||||
#include "FairMQExample3Sink.h"
|
||||
#include "FairMQTools.h"
|
||||
|
||||
#include "dds_intercom.h" // DDS
|
||||
|
||||
using namespace std;
|
||||
using namespace boost::program_options;
|
||||
using namespace dds::intercom_api;
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
|
@ -40,94 +29,21 @@ int main(int argc, char** argv)
|
|||
|
||||
try
|
||||
{
|
||||
string interfaceName; // name of the network interface to use for communication.
|
||||
|
||||
options_description sinkOptions("Sink options");
|
||||
sinkOptions.add_options()
|
||||
("network-interface", value<string>(&interfaceName)->default_value("eth0"), "Name of the network interface to use (e.g. eth0, ib0, wlan0, en0...)");
|
||||
|
||||
config.AddToCmdLineOptions(sinkOptions);
|
||||
|
||||
if (config.ParseAll(argc, argv))
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
string id = config.GetValue<string>("id");
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
sink.SetTransport(config.GetValue<std::string>("transport"));
|
||||
|
||||
sink.SetProperty(FairMQExample3Sink::Id, id);
|
||||
|
||||
// configure data output channel
|
||||
FairMQChannel dataInChannel("pull", "bind", "");
|
||||
dataInChannel.UpdateRateLogging(0);
|
||||
sink.fChannels["data2"].push_back(dataInChannel);
|
||||
|
||||
// Get the IP of the current host and store it for binding.
|
||||
map<string,string> IPs;
|
||||
FairMQ::tools::getHostIPs(IPs);
|
||||
stringstream ss;
|
||||
// Check if ib0 (infiniband) interface is available, otherwise try eth0 or wlan0.
|
||||
if (IPs.count(interfaceName))
|
||||
{
|
||||
ss << "tcp://" << IPs[interfaceName] << ":1";
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(INFO) << ss.str();
|
||||
LOG(ERROR) << "Could not find provided network interface: \"" << interfaceName << "\"!, exiting.";
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
string initialInputAddress = ss.str();
|
||||
|
||||
// Configure the found host IP for the channel.
|
||||
// TCP port will be chosen randomly during the initialization (binding).
|
||||
sink.fChannels.at("data2").at(0).UpdateAddress(initialInputAddress);
|
||||
sink.SetConfig(config);
|
||||
|
||||
sink.ChangeState("INIT_DEVICE");
|
||||
sink.WaitForInitialValidation();
|
||||
|
||||
// Advertise the bound address via DDS property
|
||||
LOG(INFO) << "Giving sink input address to DDS.";
|
||||
CKeyValue ddsKeyValue;
|
||||
ddsKeyValue.putValue("SinkAddress", sink.fChannels.at("data2").at(0).GetAddress());
|
||||
|
||||
HandleConfigViaDDS(sink);
|
||||
sink.WaitForEndOfState("INIT_DEVICE");
|
||||
|
||||
sink.ChangeState("INIT_TASK");
|
||||
sink.WaitForEndOfState("INIT_TASK");
|
||||
|
||||
CCustomCmd ddsCustomCmd;
|
||||
|
||||
// Subscribe on custom commands
|
||||
ddsCustomCmd.subscribe([&](const string& command, const string& condition, uint64_t senderId)
|
||||
{
|
||||
LOG(INFO) << "Received custom command: " << command;
|
||||
if (command == "check-state")
|
||||
{
|
||||
ddsCustomCmd.send(id + ": " + sink.GetCurrentStateName(), to_string(senderId));
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(WARN) << "Received unknown command: " << command;
|
||||
LOG(WARN) << "Origin: " << senderId;
|
||||
LOG(WARN) << "Destination: " << condition;
|
||||
}
|
||||
});
|
||||
|
||||
sink.ChangeState("RUN");
|
||||
sink.WaitForEndOfState("RUN");
|
||||
|
||||
sink.ChangeState("RESET_TASK");
|
||||
sink.WaitForEndOfState("RESET_TASK");
|
||||
|
||||
sink.ChangeState("RESET_DEVICE");
|
||||
sink.WaitForEndOfState("RESET_DEVICE");
|
||||
|
||||
sink.ChangeState("END");
|
||||
runDDSStateHandler(sink);
|
||||
}
|
||||
catch (exception& e)
|
||||
{
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
################################################################################
|
||||
|
||||
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/4-copypush/ex4-copypush.json ${CMAKE_BINARY_DIR}/bin/config/ex4-copypush.json)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/4-copypush/startMQEx4.sh.in ${CMAKE_BINARY_DIR}/bin/startMQEx4.sh)
|
||||
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${CMAKE_SOURCE_DIR}/fairmq
|
||||
|
|
|
@ -1,14 +1,14 @@
|
|||
{
|
||||
"fairMQOptions":
|
||||
{
|
||||
"device":
|
||||
{
|
||||
"devices":
|
||||
[{
|
||||
"id": "sampler1",
|
||||
"channel":
|
||||
{
|
||||
"name": "data",
|
||||
"socket":
|
||||
{
|
||||
"sockets":
|
||||
[{
|
||||
"type": "push",
|
||||
"method": "bind",
|
||||
"address": "tcp://*:5555",
|
||||
|
@ -16,7 +16,6 @@
|
|||
"rcvBufSize": "1000",
|
||||
"rateLogging": "0"
|
||||
},
|
||||
"socket":
|
||||
{
|
||||
"type": "push",
|
||||
"method": "bind",
|
||||
|
@ -24,11 +23,9 @@
|
|||
"sndBufSize": "1000",
|
||||
"rcvBufSize": "1000",
|
||||
"rateLogging": "0"
|
||||
}
|
||||
}]
|
||||
}
|
||||
},
|
||||
|
||||
"device":
|
||||
{
|
||||
"id": "sink1",
|
||||
"channel":
|
||||
|
@ -45,8 +42,6 @@
|
|||
}
|
||||
}
|
||||
},
|
||||
|
||||
"device":
|
||||
{
|
||||
"id": "sink2",
|
||||
"channel":
|
||||
|
@ -62,7 +57,7 @@
|
|||
"rateLogging": "0"
|
||||
}
|
||||
}
|
||||
}
|
||||
}]
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -33,18 +33,7 @@ int main(int argc, char** argv)
|
|||
return 0;
|
||||
}
|
||||
|
||||
std::string filename = config.GetValue<std::string>("config-json-file");
|
||||
std::string id = config.GetValue<std::string>("id");
|
||||
|
||||
config.UserParser<FairMQParser::JSON>(filename, id);
|
||||
|
||||
sampler.fChannels = config.GetFairMQMap();
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
sampler.SetTransport(config.GetValue<std::string>("transport"));
|
||||
|
||||
sampler.SetProperty(FairMQExample4Sampler::Id, id);
|
||||
sampler.SetConfig(config);
|
||||
|
||||
sampler.ChangeState("INIT_DEVICE");
|
||||
sampler.WaitForEndOfState("INIT_DEVICE");
|
||||
|
|
|
@ -33,18 +33,7 @@ int main(int argc, char** argv)
|
|||
return 0;
|
||||
}
|
||||
|
||||
std::string filename = config.GetValue<std::string>("config-json-file");
|
||||
std::string id = config.GetValue<std::string>("id");
|
||||
|
||||
config.UserParser<FairMQParser::JSON>(filename, id);
|
||||
|
||||
sink.fChannels = config.GetFairMQMap();
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
sink.SetTransport(config.GetValue<std::string>("transport"));
|
||||
|
||||
sink.SetProperty(FairMQExample4Sink::Id, id);
|
||||
sink.SetConfig(config);
|
||||
|
||||
sink.ChangeState("INIT_DEVICE");
|
||||
sink.WaitForEndOfState("INIT_DEVICE");
|
||||
|
|
17
examples/MQ/4-copypush/startMQEx4.sh.in
Executable file
17
examples/MQ/4-copypush/startMQEx4.sh.in
Executable file
|
@ -0,0 +1,17 @@
|
|||
#!/bin/bash
|
||||
ex4config="@CMAKE_BINARY_DIR@/bin/config/ex4-copypush.json"
|
||||
|
||||
SAMPLER="ex4-sampler"
|
||||
SAMPLER+=" --id sampler1"
|
||||
SAMPLER+=" --mq-config $ex4config"
|
||||
xterm -geometry 80x23+0+165 -hold -e @CMAKE_BINARY_DIR@/bin/$SAMPLER &
|
||||
|
||||
SINK1="ex4-sink"
|
||||
SINK1+=" --id sink1"
|
||||
SINK1+=" --mq-config $ex4config"
|
||||
xterm -geometry 80x23+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SINK1 &
|
||||
|
||||
SINK2="ex4-sink"
|
||||
SINK2+=" --id sink2"
|
||||
SINK2+=" --mq-config $ex4config"
|
||||
xterm -geometry 80x23+500+330 -hold -e @CMAKE_BINARY_DIR@/bin/$SINK2 &
|
|
@ -7,6 +7,7 @@
|
|||
################################################################################
|
||||
|
||||
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/5-req-rep/ex5-req-rep.json ${CMAKE_BINARY_DIR}/bin/config/ex5-req-rep.json)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/5-req-rep/startMQEx5.sh.in ${CMAKE_BINARY_DIR}/bin/startMQEx5.sh)
|
||||
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${CMAKE_SOURCE_DIR}/fairmq
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
{
|
||||
"fairMQOptions":
|
||||
{
|
||||
"device":
|
||||
{
|
||||
"devices":
|
||||
[{
|
||||
"id": "client",
|
||||
"channel":
|
||||
{
|
||||
|
@ -18,8 +18,6 @@
|
|||
}
|
||||
}
|
||||
},
|
||||
|
||||
"device":
|
||||
{
|
||||
"id": "server",
|
||||
"channel":
|
||||
|
@ -35,7 +33,7 @@
|
|||
"rateLogging": "0"
|
||||
}
|
||||
}
|
||||
}
|
||||
}]
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -46,18 +46,7 @@ int main(int argc, char** argv)
|
|||
return 0;
|
||||
}
|
||||
|
||||
string filename = config.GetValue<string>("config-json-file");
|
||||
string id = config.GetValue<string>("id");
|
||||
|
||||
config.UserParser<FairMQParser::JSON>(filename, id);
|
||||
|
||||
client.fChannels = config.GetFairMQMap();
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
client.SetTransport(config.GetValue<std::string>("transport"));
|
||||
|
||||
client.SetProperty(FairMQExample5Client::Id, id);
|
||||
client.SetConfig(config);
|
||||
client.SetProperty(FairMQExample5Client::Text, text);
|
||||
|
||||
client.ChangeState("INIT_DEVICE");
|
||||
|
|
|
@ -14,15 +14,12 @@
|
|||
|
||||
#include <iostream>
|
||||
|
||||
#include "boost/program_options.hpp"
|
||||
|
||||
#include "FairMQLogger.h"
|
||||
#include "FairMQParser.h"
|
||||
#include "FairMQProgOptions.h"
|
||||
#include "FairMQExample5Server.h"
|
||||
|
||||
using namespace std;
|
||||
using namespace boost::program_options;
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
|
@ -38,18 +35,7 @@ int main(int argc, char** argv)
|
|||
return 0;
|
||||
}
|
||||
|
||||
string filename = config.GetValue<string>("config-json-file");
|
||||
string id = config.GetValue<string>("id");
|
||||
|
||||
config.UserParser<FairMQParser::JSON>(filename, id);
|
||||
|
||||
server.fChannels = config.GetFairMQMap();
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
server.SetTransport(config.GetValue<std::string>("transport"));
|
||||
|
||||
server.SetProperty(FairMQExample5Server::Id, id);
|
||||
server.SetConfig(config);
|
||||
|
||||
server.ChangeState("INIT_DEVICE");
|
||||
server.WaitForEndOfState("INIT_DEVICE");
|
||||
|
|
12
examples/MQ/5-req-rep/startMQEx5.sh.in
Executable file
12
examples/MQ/5-req-rep/startMQEx5.sh.in
Executable file
|
@ -0,0 +1,12 @@
|
|||
#!/bin/bash
|
||||
ex5config="@CMAKE_BINARY_DIR@/bin/config/ex5-req-rep.json"
|
||||
|
||||
CLIENT="ex5-client"
|
||||
CLIENT+=" --id client"
|
||||
CLIENT+=" --mq-config $ex5config"
|
||||
xterm -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/$CLIENT &
|
||||
|
||||
SERVER="ex5-server"
|
||||
SERVER+=" --id server"
|
||||
SERVER+=" --mq-config $ex5config"
|
||||
xterm -geometry 80x23+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SERVER &
|
|
@ -7,6 +7,7 @@
|
|||
################################################################################
|
||||
|
||||
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/6-multiple-channels/ex6-multiple-channels.json ${CMAKE_BINARY_DIR}/bin/config/ex6-multiple-channels.json)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/6-multiple-channels/startMQEx6.sh.in ${CMAKE_BINARY_DIR}/bin/startMQEx6.sh)
|
||||
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${CMAKE_SOURCE_DIR}/fairmq
|
||||
|
|
|
@ -33,18 +33,7 @@ int main(int argc, char** argv)
|
|||
return 0;
|
||||
}
|
||||
|
||||
std::string filename = config.GetValue<std::string>("config-json-file");
|
||||
std::string id = config.GetValue<std::string>("id");
|
||||
|
||||
config.UserParser<FairMQParser::JSON>(filename, id);
|
||||
|
||||
broadcaster.fChannels = config.GetFairMQMap();
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
broadcaster.SetTransport(config.GetValue<std::string>("transport"));
|
||||
|
||||
broadcaster.SetProperty(FairMQExample6Broadcaster::Id, id);
|
||||
broadcaster.SetConfig(config);
|
||||
|
||||
broadcaster.ChangeState("INIT_DEVICE");
|
||||
broadcaster.WaitForEndOfState("INIT_DEVICE");
|
||||
|
|
|
@ -45,18 +45,7 @@ int main(int argc, char** argv)
|
|||
return 0;
|
||||
}
|
||||
|
||||
std::string filename = config.GetValue<std::string>("config-json-file");
|
||||
std::string id = config.GetValue<std::string>("id");
|
||||
|
||||
config.UserParser<FairMQParser::JSON>(filename, id);
|
||||
|
||||
sampler.fChannels = config.GetFairMQMap();
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
sampler.SetTransport(config.GetValue<std::string>("transport"));
|
||||
|
||||
sampler.SetProperty(FairMQExample6Sampler::Id, id);
|
||||
sampler.SetConfig(config);
|
||||
sampler.SetProperty(FairMQExample6Sampler::Text, text);
|
||||
|
||||
sampler.ChangeState("INIT_DEVICE");
|
||||
|
|
|
@ -37,18 +37,7 @@ int main(int argc, char** argv)
|
|||
return 0;
|
||||
}
|
||||
|
||||
std::string filename = config.GetValue<std::string>("config-json-file");
|
||||
std::string id = config.GetValue<std::string>("id");
|
||||
|
||||
config.UserParser<FairMQParser::JSON>(filename, id);
|
||||
|
||||
sink.fChannels = config.GetFairMQMap();
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
sink.SetTransport(config.GetValue<std::string>("transport"));
|
||||
|
||||
sink.SetProperty(FairMQExample6Sink::Id, id);
|
||||
sink.SetConfig(config);
|
||||
|
||||
sink.ChangeState("INIT_DEVICE");
|
||||
sink.WaitForEndOfState("INIT_DEVICE");
|
||||
|
|
17
examples/MQ/6-multiple-channels/startMQEx6.sh.in
Executable file
17
examples/MQ/6-multiple-channels/startMQEx6.sh.in
Executable file
|
@ -0,0 +1,17 @@
|
|||
#!/bin/bash
|
||||
ex6config="@CMAKE_BINARY_DIR@/bin/config/ex6-multiple-channels.json"
|
||||
|
||||
SAMPLER="ex6-sampler"
|
||||
SAMPLER+=" --id sampler1"
|
||||
SAMPLER+=" --mq-config $ex6config"
|
||||
xterm -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SAMPLER &
|
||||
|
||||
SINK="ex6-sink"
|
||||
SINK+=" --id sink1"
|
||||
SINK+=" --mq-config $ex6config"
|
||||
xterm -geometry 80x23+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SINK &
|
||||
|
||||
BROADCASTER="ex6-broadcaster"
|
||||
BROADCASTER+=" --id broadcaster1"
|
||||
BROADCASTER+=" --mq-config $ex6config"
|
||||
xterm -geometry 80x23+250+330 -hold -e @CMAKE_BINARY_DIR@/bin/$BROADCASTER &
|
|
@ -7,6 +7,7 @@
|
|||
################################################################################
|
||||
|
||||
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/8-multipart/ex8-multipart.json ${CMAKE_BINARY_DIR}/bin/config/ex8-multipart.json)
|
||||
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/8-multipart/startMQEx8.sh.in ${CMAKE_BINARY_DIR}/bin/startMQEx8.sh)
|
||||
|
||||
Set(INCLUDE_DIRECTORIES
|
||||
${CMAKE_SOURCE_DIR}/fairmq
|
||||
|
|
|
@ -14,15 +14,11 @@
|
|||
|
||||
#include <iostream>
|
||||
|
||||
#include "boost/program_options.hpp"
|
||||
|
||||
#include "FairMQLogger.h"
|
||||
#include "FairMQParser.h"
|
||||
#include "FairMQProgOptions.h"
|
||||
#include "FairMQExample8Sampler.h"
|
||||
|
||||
using namespace boost::program_options;
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
FairMQExample8Sampler sampler;
|
||||
|
@ -37,18 +33,7 @@ int main(int argc, char** argv)
|
|||
return 0;
|
||||
}
|
||||
|
||||
std::string filename = config.GetValue<std::string>("config-json-file");
|
||||
std::string id = config.GetValue<std::string>("id");
|
||||
|
||||
config.UserParser<FairMQParser::JSON>(filename, id);
|
||||
|
||||
sampler.fChannels = config.GetFairMQMap();
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
sampler.SetTransport(config.GetValue<std::string>("transport"));
|
||||
|
||||
sampler.SetProperty(FairMQExample8Sampler::Id, id);
|
||||
sampler.SetConfig(config);
|
||||
|
||||
sampler.ChangeState("INIT_DEVICE");
|
||||
sampler.WaitForEndOfState("INIT_DEVICE");
|
||||
|
|
|
@ -33,18 +33,7 @@ int main(int argc, char** argv)
|
|||
return 0;
|
||||
}
|
||||
|
||||
std::string filename = config.GetValue<std::string>("config-json-file");
|
||||
std::string id = config.GetValue<std::string>("id");
|
||||
|
||||
config.UserParser<FairMQParser::JSON>(filename, id);
|
||||
|
||||
sink.fChannels = config.GetFairMQMap();
|
||||
|
||||
LOG(INFO) << "PID: " << getpid();
|
||||
|
||||
sink.SetTransport(config.GetValue<std::string>("transport"));
|
||||
|
||||
sink.SetProperty(FairMQExample8Sink::Id, id);
|
||||
sink.SetConfig(config);
|
||||
|
||||
sink.ChangeState("INIT_DEVICE");
|
||||
sink.WaitForEndOfState("INIT_DEVICE");
|
||||
|
|
12
examples/MQ/8-multipart/startMQEx8.sh.in
Executable file
12
examples/MQ/8-multipart/startMQEx8.sh.in
Executable file
|
@ -0,0 +1,12 @@
|
|||
#!/bin/bash
|
||||
ex8config="@CMAKE_BINARY_DIR@/bin/config/ex8-multipart.json"
|
||||
|
||||
SAMPLER="ex8-sampler"
|
||||
SAMPLER+=" --id sampler1"
|
||||
SAMPLER+=" --mq-config $ex8config"
|
||||
xterm -geometry 80x23+0+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SAMPLER &
|
||||
|
||||
SINK="ex8-sink"
|
||||
SINK+=" --id sink1"
|
||||
SINK+=" --mq-config $ex8config"
|
||||
xterm -geometry 80x23+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SINK &
|
Loading…
Reference in New Issue
Block a user