FairMQ/examples/MQ/3-dds
Alexey Rybalchenko e3cf68331a FairMQ: Extend Multipart and messaging API
- Extend the multipart API to allow sending vectors of messages or helper
   thin wrapper FairMQParts. See example in examples/MQ/8-multipart.
 - NewMessage() can be used in devices instead of
   fTransportFactory->CreateMessage().
   Possible arguments remain unchanged (no args, size or data+size).
 - Send()/Receive() methods can be used in devices instead of
   fChannels.at("chan").at(i).Send()/Receive():
   Send(msg, "chan", i = 0), Receive(msg, "chan", i = 0).
 - Use the new methods in MQ examples and tests.
 - No breaking changes, but FAIRMQ_INTERFACE_VERSION is incremented to 3
   to allow to check for new methods.
2018-05-02 13:51:55 +02:00
..
CMakeLists.txt Add FlatBuffers & MessagePack examples 2018-05-02 13:51:55 +02:00
ex3-dds-hosts.cfg FairMQ: Extend Multipart and messaging API 2018-05-02 13:51:55 +02:00
ex3-dds-topology.xml Add a acknowledgement channel to Tutorial 3... 2018-05-02 13:51:55 +02:00
FairMQExample3Processor.cxx FairMQ: Extend Multipart and messaging API 2018-05-02 13:51:55 +02:00
FairMQExample3Processor.h Extend DDS Example to use command interface 2018-05-02 13:51:55 +02:00
FairMQExample3Sampler.cxx FairMQ: Extend Multipart and messaging API 2018-05-02 13:51:55 +02:00
FairMQExample3Sampler.h Extend DDS Example to use command interface 2018-05-02 13:51:55 +02:00
FairMQExample3Sink.cxx FairMQ: Extend Multipart and messaging API 2018-05-02 13:51:55 +02:00
FairMQExample3Sink.h Rename /example to /examples and move MQ examples in it 2018-05-02 13:51:55 +02:00
README.md Extend DDS Example to use command interface 2018-05-02 13:51:55 +02:00
runDDSCommandUI.cxx Extend DDS Example to use command interface 2018-05-02 13:51:55 +02:00
runExample3Processor.cxx FairMQ: Extend Multipart and messaging API 2018-05-02 13:51:55 +02:00
runExample3Sampler.cxx FairMQ: Extend Multipart and messaging API 2018-05-02 13:51:55 +02:00
runExample3Sink.cxx FairMQ: Extend Multipart and messaging API 2018-05-02 13:51:55 +02:00

Example 3: DDS

This example demonstrates usage of the Dynamic Deployment System (DDS) to dynamically deploy and configure a topology of devices. The topology is similar to those of Example 2, but now it can be easily distributed on different computing nodes without the need for manual reconfiguration of the devices.

This example is compiled only if the DDS is found by CMake. Custom DDS installation location can be given to CMake like this:

cmake -DDDS_PATH="/path/to/dds/install/dir/" ..

The description below outlines the minimal steps needed to run the example with DDS. For more details please refer to DDS documentation on DDS Website.

1. The devices that bind their sockets need to advertise their bound addresses to DDS by writing a property.

In our example Sampler and Sink bind their sockets. The bound addresses are available after the initial validation. The following code takes the address value and gives it to DDS:

sampler.ChangeState("INIT_DEVICE");
sampler.WaitForInitialValidation();

dds::key_value::CKeyValue ddsKeyValue;
ddsKeyValue.putValue("SamplerOutputAddress", sampler.fChannels.at("data-out").at(0).GetAddress());

sampler.WaitForEndOfState("INIT_DEVICE");

Same approach for the Sink.

2. The devices that connect their sockets need to read the addresses from DDS.

The Processors in our example need the addresses of Sampler and Sink. They receive these from DDS via properties (sent in the step above):

dds::key_value::CKeyValue ddsKeyValue;
// Sampler properties
dds::key_value::CKeyValue::valuesMap_t samplerValues;
{
    mutex keyMutex;
    condition_variable keyCondition;

    LOG(INFO) << "Subscribing and waiting for sampler output address.";
    ddsKeyValue.subscribe([&keyCondition](const string& /*_key*/, const string& /*_value*/) { keyCondition.notify_all(); });
    ddsKeyValue.getValues("SamplerOutputAddress", &samplerValues);
    while (samplerValues.empty())
    {
        unique_lock<mutex> lock(keyMutex);
        keyCondition.wait_until(lock, chrono::system_clock::now() + chrono::milliseconds(1000));
        ddsKeyValue.getValues("SamplerOutputAddress", &samplerValues);
    }
}
// Sink properties
// ... same as above, but for sinkValues ...

processor.fChannels.at("data-in").at(0).UpdateAddress(samplerValues.begin()->second);
processor.fChannels.at("data-out").at(0).UpdateAddress(sinkValues.begin()->second);

After this step each device will have the necessary connection information.

3. Write DDS hosts file that contains a list of worker nodes to run the topology on (When deploying using the SSH plug-in).

We run this example on the local machine for simplicity. The file below defines one worker wn0 with 12 DDS Agents (thus able to accept 12 tasks). The parameters for each worker node are:

  • user-chosen worker ID (must be unique)
  • a host name with or without a login, in a form: login@host.fqdn (password-less SSH access to these hosts must be possible)
  • additional SSH params (can be empty)
  • a remote working directory (most exist on the worker nodes)
  • number of DDS Agents for this worker
@bash_begin@
echo "DBG: SSH ENV Script"
#source setup.sh
@bash_end@

wn0, username@localhost, , /tmp/, 12
4. Write DDS topology file that describes which tasks (processes) to run and their topology and configuration.

Take a look at ex3-dds-topology.xml. It consists of a definition part (properties, tasks, collections and more) and execution part (main). In our example Sampler, Processor and Sink tasks are defines, containing their executables and exchanged properties. The <main> of the topology uses the defined tasks. Besides one Sampler and one Sink task, a group containing Processor task is defined. The group has a multiplicity of 10, meaninig 10 Processors will be executed. Each of the Processors will receive the properties with Sampler and Sink addresses.

5. Start DDS server.

The DDS server is started with:

dds-server start -s
6. Submit DDS Agents (configured in the hosts file).

Agents are submitted with:

dds-submit --rms ssh --ssh-rms-cfg ex3-dds-hosts.cfg

The --rms option defines a destination resource management system. The --ssh-rms-cfg specifies an SSH plug-in resource definition file.

7. Set the topology file.

Point DDS to the topology file:

dds-topology --set ex3-dds-topology.xml
8. Activate the topology.
dds-topology --activate
9. Run

After activation, agents will execute the defined tasks on the worker nodes. Output of the tasks will be stored in the directory that was specified in the hosts file.

10. (optional) Use example command UI to check state of the devices

This example includes a simple utility to send command to devices and receive replies from them. The code in runDDSCommandUI.cxx (compiled as ex3-dds-command-ui) uses the DDSCustomCmd library to send "check-state" string to all devices, to which they reply with their ID and state they are in. This can be used as an example of sending/receiving commands or other information to devices.

To see it in action, start the ex3-dds-command-ui while the topology is running.

11. Stop DDS server/topology.

The execution of tasks can be stopped with:

dds-topology --stop

Or by stopping the DDS server:

dds-server stop

For a more complete DDS documentation please refer to DDS Website.