Add orthogonal OK/ERROR states.

Replace state check mutex with atomic.

Update DDS example documentation.
This commit is contained in:
Alexey Rybalchenko
2015-08-24 17:35:30 +02:00
committed by Mohammad Al-Turany
parent a7ab33a10e
commit fbf7dbf2ba
38 changed files with 838 additions and 615 deletions

View File

@@ -18,6 +18,8 @@
#include "FairMQExample3Processor.h"
#include "FairMQLogger.h"
using namespace std;
FairMQExample3Processor::FairMQExample3Processor()
: fTaskIndex(0)
{
@@ -25,29 +27,37 @@ FairMQExample3Processor::FairMQExample3Processor()
void FairMQExample3Processor::CustomCleanup(void *data, void *object)
{
delete (std::string*)object;
delete (string*)object;
}
void FairMQExample3Processor::Run()
{
// Check if we are still in the RUNNING state
while (CheckCurrentState(RUNNING))
{
FairMQMessage* input = fTransportFactory->CreateMessage();
fChannels.at("data-in").at(0).Receive(input);
// Create empty message to hold the input
unique_ptr<FairMQMessage> input(fTransportFactory->CreateMessage());
LOG(INFO) << "Received data, processing...";
// Receive the message (blocks until received or interrupted (e.g. by state change)).
// Returns size of the received message or -1 if interrupted.
if (fChannels.at("data-in").at(0).Receive(input) > 0)
{
LOG(INFO) << "Received data, processing...";
std::string* text = new std::string(static_cast<char*>(input->GetData()), input->GetSize());
*text += " (modified by " + fId + std::to_string(fTaskIndex) + ")";
// Modify the received string
string* text = new string(static_cast<char*>(input->GetData()), input->GetSize());
*text += " (modified by " + fId + to_string(fTaskIndex) + ")";
delete input;
// Create output message
unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text));
FairMQMessage* msg = fTransportFactory->CreateMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text);
fChannels.at("data-out").at(0).Send(msg);
// Send out the output message
fChannels.at("data-out").at(0).Send(msg);
}
}
}
void FairMQExample3Processor::SetProperty(const int key, const std::string& value)
void FairMQExample3Processor::SetProperty(const int key, const string& value)
{
switch (key)
{
@@ -57,7 +67,7 @@ void FairMQExample3Processor::SetProperty(const int key, const std::string& valu
}
}
std::string FairMQExample3Processor::GetProperty(const int key, const std::string& default_ /*= ""*/)
string FairMQExample3Processor::GetProperty(const int key, const string& default_ /*= ""*/)
{
switch (key)
{

View File

@@ -18,6 +18,8 @@
#include "FairMQExample3Sampler.h"
#include "FairMQLogger.h"
using namespace std;
FairMQExample3Sampler::FairMQExample3Sampler()
: fText()
{
@@ -25,18 +27,19 @@ FairMQExample3Sampler::FairMQExample3Sampler()
void FairMQExample3Sampler::CustomCleanup(void *data, void *object)
{
delete (std::string*)object;
delete (string*)object;
}
void FairMQExample3Sampler::Run()
{
// Check if we are still in the RUNNING state
while (CheckCurrentState(RUNNING))
{
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
std::string* text = new std::string(fText);
string* text = new string(fText);
FairMQMessage* msg = fTransportFactory->CreateMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text);
unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text));
LOG(INFO) << "Sending \"" << fText << "\"";
@@ -48,7 +51,7 @@ FairMQExample3Sampler::~FairMQExample3Sampler()
{
}
void FairMQExample3Sampler::SetProperty(const int key, const std::string& value)
void FairMQExample3Sampler::SetProperty(const int key, const string& value)
{
switch (key)
{
@@ -61,7 +64,7 @@ void FairMQExample3Sampler::SetProperty(const int key, const std::string& value)
}
}
std::string FairMQExample3Sampler::GetProperty(const int key, const std::string& default_ /*= ""*/)
string FairMQExample3Sampler::GetProperty(const int key, const string& default_ /*= ""*/)
{
switch (key)
{

View File

@@ -28,15 +28,14 @@ void FairMQExample3Sink::Run()
{
while (CheckCurrentState(RUNNING))
{
FairMQMessage* msg = fTransportFactory->CreateMessage();
unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
fChannels.at("data-in").at(0).Receive(msg);
LOG(INFO) << "Received message: \""
<< string(static_cast<char*>(msg->GetData()), msg->GetSize())
<< "\"";
delete msg;
if (fChannels.at("data-in").at(0).Receive(msg) > 0)
{
LOG(INFO) << "Received message: \""
<< string(static_cast<char*>(msg->GetData()), msg->GetSize())
<< "\"";
}
}
}

View File

@@ -3,6 +3,119 @@ Example 3: DDS
This example demonstrates usage of the Dynamic Deployment System ([DDS](http://dds.gsi.de/)) to dynamically deploy and configure a topology of devices. The topology is similar to those of Example 2, but now it can be easily distributed on different computing nodes without the need for manual reconfiguration of the devices.
The description below outlines the minimal steps needed to run the example. For more detailed DDS documentation please visit [DDS Website](http://dds.gsi.de/).
The description below outlines the minimal steps needed to run the example with DDS. For more details please refer to DDS documentation on [DDS Website](http://dds.gsi.de/).
The topology run by DDS is defined in `ex3-dds-topology.xml` and the hosts to run it on are configured in `ex3-dds-hosts.cfg`. The topology starts one Sampler, one Sink and a group of 10 Processors.
##### 1. The devices that bind their sockets need to advertise their bound addresses to DDS by writing a property.
In our example Sampler and Sink bind their sockets. The bound addresses are available after the initial validation. The following code takes the address value and gives it to DDS:
```C++
sampler.ChangeState("INIT_DEVICE");
sampler.WaitForInitialValidation();
dds::key_value::CKeyValue ddsKeyValue;
ddsKeyValue.putValue("SamplerOutputAddress", sampler.fChannels.at("data-out").at(0).GetAddress());
sampler.WaitForEndOfState("INIT_DEVICE");
```
Same approach for the Sink.
##### 2. The devices that connect their sockets need to read the addresses from DDS.
The Processors in our example need the addresses of Sampler and Sink. They receive these from DDS via properties (sent in the step above):
```C++
dds::key_value::CKeyValue ddsKeyValue;
// Sampler properties
dds::key_value::CKeyValue::valuesMap_t samplerValues;
{
mutex keyMutex;
condition_variable keyCondition;
LOG(INFO) << "Subscribing and waiting for sampler output address.";
ddsKeyValue.subscribe([&keyCondition](const string& /*_key*/, const string& /*_value*/) { keyCondition.notify_all(); });
ddsKeyValue.getValues("SamplerOutputAddress", &samplerValues);
while (samplerValues.empty())
{
unique_lock<mutex> lock(keyMutex);
keyCondition.wait_until(lock, chrono::system_clock::now() + chrono::milliseconds(1000));
ddsKeyValue.getValues("SamplerOutputAddress", &samplerValues);
}
}
// Sink properties
// ... same as above, but for sinkValues ...
processor.fChannels.at("data-in").at(0).UpdateAddress(samplerValues.begin()->second);
processor.fChannels.at("data-out").at(0).UpdateAddress(sinkValues.begin()->second);
```
After this step each device will have the necessary connection information.
##### 3. Write DDS hosts file that contains a list of worker nodes to run the topology on (When deploying using the SSH plug-in).
We run this example on the local machine for simplicity. The file below defines one worker `wn0` with 12 DDS Agents (thus able to accept 12 tasks). The parameters for each worker node are:
- user-chosen worker ID (must be unique)
- a host name with or without a login, in a form: login@host.fqdn
- additional SSH params (can be empty)
- a remote working directory
- number of DDS Agents for this worker
```bash
@bash_begin@
echo "DBG: SSH ENV Script"
#source setup.sh
@bash_end@
wn0, username@localhost, , /tmp/, 12
```
##### 4. Write DDS topology file that describes which tasks (processes) to run and their topology and configuration.
Take a look at `ex3-dds-topology.xml`. It consists of a definition part (properties, tasks, collections and more) and execution part (main). In our example Sampler, Processor and Sink tasks are defines, containing their executables and exchanged properties. The `<main>` of the topology uses the defined tasks. Besides one Sampler and one Sink task, a group containing Processor task is defined. The group has a multiplicity of 10, meaninig 10 Processors will be executed. Each of the Processors will receive the properties with Sampler and Sink addresses.
##### 5. Start DDS server.
The DDS server is started with:
```bash
dds-server start -s
```
##### 6. Submit DDS Agents (configured in the hosts file).
Agents are submitted with:
```bash
dds-submit --rms ssh --ssh-rms-cfg ex3-dds-hosts.cfg
```
The `--rms` option defines a destination resource management system. The `--ssh-rms-cfg` specifies an SSH plug-in resource definition file.
##### 7. Set the topology file.
Point DDS to the topology file:
```bash
dds-topology --set ex3-dds-topology.xml
```
##### 8. Activate the topology.
```bash
dds-topology --activate
```
##### 9. Run
After activation, agents will execute the defined tasks on the worker nodes. Output of the tasks will be stored in the directory that was specified in the hosts file.
##### 10. Stop DDS server/topology.
The execution of tasks can be stopped with:
```bash
dds-topology --stop
```
Or by stopping the DDS server:
```bash
dds-server stop
```
For a more complete DDS documentation please refer to [DDS Website](http://dds.gsi.de/).

View File

@@ -3,4 +3,4 @@ echo "DBG: SSH ENV Script"
#source setup.sh
@bash_end@
worker, username@localhost, , /tmp/, 12
wn0, username@localhost, , /tmp/, 12

View File

@@ -83,13 +83,20 @@ int main(int argc, char** argv)
FairMQ::tools::getHostIPs(IPs);
stringstream ss;
// Check if ib0 (infiniband) interface is available, otherwise try eth0 or wlan0.
if (IPs.count("ib0")) {
ss << "tcp://" << IPs["ib0"] << ":1";
} else if (IPs.count("eth0")) {
ss << "tcp://" << IPs["eth0"] << ":1";
} else if (IPs.count("wlan0")) {
ss << "tcp://" << IPs["wlan0"] << ":1";
} else {
if (IPs.count("ib0"))
{
ss << "tcp://" << IPs["ib0"] << ":1";
}
else if (IPs.count("eth0"))
{
ss << "tcp://" << IPs["eth0"] << ":1";
}
else if (IPs.count("wlan0"))
{
ss << "tcp://" << IPs["wlan0"] << ":1";
}
else
{
LOG(INFO) << ss.str();
LOG(ERROR) << "Could not find ib0, eth0 or wlan0";
exit(EXIT_FAILURE);

View File

@@ -74,13 +74,20 @@ int main(int argc, char** argv)
FairMQ::tools::getHostIPs(IPs);
stringstream ss;
// Check if ib0 (infiniband) interface is available, otherwise try eth0 or wlan0.
if (IPs.count("ib0")) {
ss << "tcp://" << IPs["ib0"] << ":1";
} else if (IPs.count("eth0")) {
ss << "tcp://" << IPs["eth0"] << ":1";
} else if (IPs.count("wlan0")) {
ss << "tcp://" << IPs["wlan0"] << ":1";
} else {
if (IPs.count("ib0"))
{
ss << "tcp://" << IPs["ib0"] << ":1";
}
else if (IPs.count("eth0"))
{
ss << "tcp://" << IPs["eth0"] << ":1";
}
else if (IPs.count("wlan0"))
{
ss << "tcp://" << IPs["wlan0"] << ":1";
}
else
{
LOG(INFO) << ss.str();
LOG(ERROR) << "Could not find ib0, eth0 or wlan0";
exit(EXIT_FAILURE);