Extend DDS Example to use command interface

This commit is contained in:
Alexey Rybalchenko 2015-11-23 11:28:15 +01:00
parent 7744d8bc91
commit 90f0e3cb78
13 changed files with 181 additions and 238 deletions

View File

@ -6,7 +6,6 @@
# copied verbatim in the file "LICENSE" #
################################################################################
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/3-dds/ex3-devices.json ${CMAKE_BINARY_DIR}/bin/config/ex3-devices.json)
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/3-dds/ex3-dds-topology.xml ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-topology.xml)
configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/3-dds/ex3-dds-hosts.cfg ${CMAKE_BINARY_DIR}/bin/config/ex3-dds-hosts.cfg COPYONLY)
@ -68,6 +67,7 @@ Set(DEPENDENCIES
${DEPENDENCIES}
FairMQ
dds-key-value-lib
dds-custom-cmd-lib
)
set(LIBRARY_NAME FairMQExample3)
@ -79,6 +79,7 @@ Set(Exe_Names
ex3-sampler-dds
ex3-processor-dds
ex3-sink-dds
ex3-dds-command-ui
)
Set(Exe_Source
@ -86,6 +87,7 @@ Set(Exe_Source
runExample3Sampler.cxx
runExample3Processor.cxx
runExample3Sink.cxx
runDDSCommandUI.cxx
)
list(LENGTH Exe_Names _length)

View File

@ -21,7 +21,6 @@
using namespace std;
FairMQExample3Processor::FairMQExample3Processor()
: fTaskIndex(0)
{
}
@ -32,7 +31,6 @@ void FairMQExample3Processor::CustomCleanup(void *data, void *object)
void FairMQExample3Processor::Run()
{
// Check if we are still in the RUNNING state
while (CheckCurrentState(RUNNING))
{
// Create empty message to hold the input
@ -46,7 +44,7 @@ void FairMQExample3Processor::Run()
// Modify the received string
string* text = new string(static_cast<char*>(input->GetData()), input->GetSize());
*text += " (modified by " + fId + to_string(fTaskIndex) + ")";
*text += " (modified by " + fId + ")";
// Create output message
unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text));
@ -57,50 +55,6 @@ void FairMQExample3Processor::Run()
}
}
void FairMQExample3Processor::SetProperty(const int key, const string& value)
{
switch (key)
{
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
string FairMQExample3Processor::GetProperty(const int key, const string& default_ /*= ""*/)
{
switch (key)
{
default:
return FairMQDevice::GetProperty(key, default_);
}
}
void FairMQExample3Processor::SetProperty(const int key, const int value)
{
switch (key)
{
case TaskIndex:
fTaskIndex = value;
break;
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
int FairMQExample3Processor::GetProperty(const int key, const int default_ /*= 0*/)
{
switch (key)
{
case TaskIndex:
return fTaskIndex;
break;
default:
return FairMQDevice::GetProperty(key, default_);
}
}
FairMQExample3Processor::~FairMQExample3Processor()
{
}

View File

@ -22,25 +22,12 @@
class FairMQExample3Processor : public FairMQDevice
{
public:
enum
{
Text = FairMQDevice::Last,
TaskIndex,
Last
};
FairMQExample3Processor();
virtual ~FairMQExample3Processor();
static void CustomCleanup(void* data, void* hint);
virtual void SetProperty(const int key, const std::string& value);
virtual std::string GetProperty(const int key, const std::string& default_ = "");
virtual void SetProperty(const int key, const int value);
virtual int GetProperty(const int key, const int default_ = 0);
protected:
int fTaskIndex;
virtual void Run();
};

View File

@ -21,7 +21,6 @@
using namespace std;
FairMQExample3Sampler::FairMQExample3Sampler()
: fText()
{
}
@ -32,16 +31,15 @@ void FairMQExample3Sampler::CustomCleanup(void *data, void *object)
void FairMQExample3Sampler::Run()
{
// Check if we are still in the RUNNING state
while (CheckCurrentState(RUNNING))
{
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
string* text = new string(fText);
string* text = new string("Data");
unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text));
LOG(INFO) << "Sending \"" << fText << "\"";
LOG(INFO) << "Sending \"Data\"";
fChannels.at("data-out").at(0).Send(msg);
}
@ -50,47 +48,3 @@ void FairMQExample3Sampler::Run()
FairMQExample3Sampler::~FairMQExample3Sampler()
{
}
void FairMQExample3Sampler::SetProperty(const int key, const string& value)
{
switch (key)
{
case Text:
fText = value;
break;
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
string FairMQExample3Sampler::GetProperty(const int key, const string& default_ /*= ""*/)
{
switch (key)
{
case Text:
return fText;
break;
default:
return FairMQDevice::GetProperty(key, default_);
}
}
void FairMQExample3Sampler::SetProperty(const int key, const int value)
{
switch (key)
{
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
int FairMQExample3Sampler::GetProperty(const int key, const int default_ /*= 0*/)
{
switch (key)
{
default:
return FairMQDevice::GetProperty(key, default_);
}
}

View File

@ -22,24 +22,12 @@
class FairMQExample3Sampler : public FairMQDevice
{
public:
enum
{
Text = FairMQDevice::Last,
Last
};
FairMQExample3Sampler();
virtual ~FairMQExample3Sampler();
static void CustomCleanup(void* data, void* hint);
virtual void SetProperty(const int key, const std::string& value);
virtual std::string GetProperty(const int key, const std::string& default_ = "");
virtual void SetProperty(const int key, const int value);
virtual int GetProperty(const int key, const int default_ = 0);
protected:
std::string fText;
virtual void Run();
};

View File

@ -113,7 +113,13 @@ dds-topology --activate
After activation, agents will execute the defined tasks on the worker nodes. Output of the tasks will be stored in the directory that was specified in the hosts file.
##### 10. Stop DDS server/topology.
##### 10. (optional) Use example command UI to check state of the devices
This example includes a simple utility to send command to devices and receive replies from them. The code in `runDDSCommandUI.cxx` (compiled as ex3-dds-command-ui) uses the DDSCustomCmd library to send "check-state" string to all devices, to which they reply with their ID and state they are in. This can be used as an example of sending/receiving commands or other information to devices.
To see it in action, start the ex3-dds-command-ui while the topology is running.
##### 11. Stop DDS server/topology.
The execution of tasks can be stopped with:
```bash

View File

@ -3,4 +3,6 @@ echo "DBG: SSH ENV Script"
#source setup.sh
@bash_end@
wn0, username@localhost, , /tmp/, 12
sampler, username@localhost, , /tmp/, 1
processor, username@localhost, , /tmp/, 10
sink, username@localhost, , /tmp/, 1

View File

@ -3,15 +3,29 @@
<property id="SamplerOutputAddress" />
<property id="SinkInputAddress" />
<declrequirement id="SamplerWorker">
<hostPattern type="wnname" value="sampler"/>
</declrequirement>
<declrequirement id="ProcessorWorker">
<hostPattern type="wnname" value="processor"/>
</declrequirement>
<declrequirement id="SinkWorker">
<hostPattern type="wnname" value="sink"/>
</declrequirement>
<decltask id="Sampler">
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-sampler-dds --id sampler --config-json-file @CMAKE_BINARY_DIR@/bin/config/ex3-devices.json</exe>
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-sampler-dds --id sampler0 --log-color-format false</exe>
<requirement>SamplerWorker</requirement>
<properties>
<id access="write">SamplerOutputAddress</id>
</properties>
</decltask>
<decltask id="Processor">
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-processor-dds --id processor --index %taskIndex% --config-json-file @CMAKE_BINARY_DIR@/bin/config/ex3-devices.json</exe>
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-processor-dds --id processor%taskIndex% --log-color-format false</exe>
<requirement>ProcessorWorker</requirement>
<properties>
<id access="read">SamplerOutputAddress</id>
<id access="read">SinkInputAddress</id>
@ -19,7 +33,8 @@
</decltask>
<decltask id="Sink">
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-sink-dds --id sink --config-json-file @CMAKE_BINARY_DIR@/bin/config/ex3-devices.json</exe>
<exe reachable="true">@CMAKE_BINARY_DIR@/bin/ex3-sink-dds --id sink0 --log-color-format false</exe>
<requirement>SinkWorker</requirement>
<properties>
<id access="write">SinkInputAddress</id>
</properties>

View File

@ -1,59 +0,0 @@
{
"fairMQOptions":
{
"device":
{
"id": "sampler",
"channel":
{
"name": "data-out",
"socket":
{
"type": "push",
"method": "bind",
"address": ""
}
}
},
"device":
{
"id": "processor",
"channel":
{
"name": "data-in",
"socket":
{
"type": "pull",
"method": "connect",
"address": ""
}
},
"channel":
{
"name": "data-out",
"socket":
{
"type": "push",
"method": "connect",
"address": ""
}
}
},
"device":
{
"id": "sink",
"channel":
{
"name": "data-in",
"socket":
{
"type": "pull",
"method": "bind",
"address": ""
}
}
}
}
}

View File

@ -0,0 +1,44 @@
// DDS
#include "CustomCmd.h"
// STD
#include <iostream>
#include <exception>
#include <sstream>
#include <thread>
#include <atomic>
using namespace std;
using namespace dds;
using namespace custom_cmd;
int main(int argc, char* argv[])
{
try
{
CCustomCmd ddsCustomCmd;
ddsCustomCmd.subscribeCmd([](const string& command, const string& condition, uint64_t senderId)
{
cout << "Received: \"" << command << "\"" << endl;
});
while (true)
{
int result = ddsCustomCmd.sendCmd("check-state", "");
if (result == 1)
{
cerr << "Error sending custom command" << endl;
}
this_thread::sleep_for(chrono::seconds(1));
}
}
catch (exception& _e)
{
cerr << "Error: " << _e.what() << endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}

View File

@ -20,7 +20,6 @@
#include <boost/asio.hpp> // for DDS
#include "FairMQLogger.h"
#include "FairMQParser.h"
#include "FairMQProgOptions.h"
#include "FairMQExample3Processor.h"
#include "FairMQTools.h"
@ -31,8 +30,10 @@
#include "FairMQTransportFactoryZMQ.h"
#endif
#include "KeyValue.h" // DDS
#include "KeyValue.h" // DDS Key Value
#include "CustomCmd.h" // DDS Custom Commands
using namespace std;
using namespace boost::program_options;
int main(int argc, char** argv)
@ -44,28 +45,35 @@ int main(int argc, char** argv)
try
{
int ddsTaskIndex = 0;
options_description processorOptions("Processor options");
processorOptions.add_options()
("index", value<int>(&ddsTaskIndex)->default_value(0), "DDS task index");
config.AddToCmdLineOptions(processorOptions);
if (config.ParseAll(argc, argv))
{
return 0;
}
std::string filename = config.GetValue<std::string>("config-json-file");
std::string id = config.GetValue<std::string>("id");
config.UserParser<FairMQParser::JSON>(filename, id);
processor.fChannels = config.GetFairMQMap();
string id = config.GetValue<string>("id");
LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
processor.SetTransport(transportFactory);
processor.SetProperty(FairMQExample3Processor::Id, id);
// configure data output channel
FairMQChannel dataInChannel("pull", "connect", "");
dataInChannel.UpdateRateLogging(0);
processor.fChannels["data-in"].push_back(dataInChannel);
// configure data output channel
FairMQChannel dataOutChannel("push", "connect", "");
dataOutChannel.UpdateRateLogging(0);
processor.fChannels["data-out"].push_back(dataOutChannel);
// Waiting for DDS properties
dds::key_value::CKeyValue ddsKeyValue;
// Sampler properties
@ -104,23 +112,30 @@ int main(int argc, char** argv)
processor.fChannels.at("data-in").at(0).UpdateAddress(samplerValues.begin()->second);
processor.fChannels.at("data-out").at(0).UpdateAddress(sinkValues.begin()->second);
#ifdef NANOMSG
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
processor.SetTransport(transportFactory);
processor.SetProperty(FairMQExample3Processor::Id, id);
processor.SetProperty(FairMQExample3Processor::TaskIndex, ddsTaskIndex);
processor.ChangeState("INIT_DEVICE");
processor.WaitForEndOfState("INIT_DEVICE");
processor.ChangeState("INIT_TASK");
processor.WaitForEndOfState("INIT_TASK");
dds::custom_cmd::CCustomCmd ddsCustomCmd;
// Subscribe on custom commands
ddsCustomCmd.subscribeCmd([&](const string& command, const string& condition, uint64_t senderId)
{
LOG(INFO) << "Received custom command: " << command << " condition: " << condition << " senderId: " << senderId;
if (command == "check-state")
{
ddsCustomCmd.sendCmd(id + ": " + processor.GetCurrentStateName(), to_string(senderId));
}
else
{
LOG(WARN) << "Received unknown command: " << command;
LOG(WARN) << "Origin: " << senderId;
LOG(WARN) << "Destination: " << condition;
}
});
processor.ChangeState("RUN");
processor.WaitForEndOfState("RUN");
@ -132,7 +147,7 @@ int main(int argc, char** argv)
processor.ChangeState("END");
}
catch (std::exception& e)
catch (exception& e)
{
LOG(ERROR) << e.what();
LOG(INFO) << "Command line options are the following: ";

View File

@ -21,7 +21,6 @@
#include <boost/asio.hpp> // for DDS
#include "FairMQLogger.h"
#include "FairMQParser.h"
#include "FairMQProgOptions.h"
#include "FairMQExample3Sampler.h"
#include "FairMQTools.h"
@ -32,8 +31,10 @@
#include "FairMQTransportFactoryZMQ.h"
#endif
#include "KeyValue.h" // DDS
#include "KeyValue.h" // DDS Key Value
#include "CustomCmd.h" // DDS Custom Commands
using namespace std;
using namespace boost::program_options;
int main(int argc, char** argv)
@ -45,13 +46,11 @@ int main(int argc, char** argv)
try
{
std::string text; // text to be sent for processing.
std::string interfaceName; // name of the network interface to use for communication.
string interfaceName; // name of the network interface to use for communication.
options_description samplerOptions("Sampler options");
samplerOptions.add_options()
("text", value<std::string>(&text)->default_value("Hello"), "Text to send out")
("network-interface", value<std::string>(&interfaceName)->default_value("eth0"), "Name of the network interface to use (e.g. eth0, ib0, wlan0, en0...)");
("network-interface", value<string>(&interfaceName)->default_value("eth0"), "Name of the network interface to use (e.g. eth0, ib0, wlan0, en0...)");
config.AddToCmdLineOptions(samplerOptions);
@ -60,12 +59,7 @@ int main(int argc, char** argv)
return 0;
}
std::string filename = config.GetValue<std::string>("config-json-file");
std::string id = config.GetValue<std::string>("id");
config.UserParser<FairMQParser::JSON>(filename, id);
sampler.fChannels = config.GetFairMQMap();
string id = config.GetValue<string>("id");
LOG(INFO) << "PID: " << getpid();
@ -78,7 +72,11 @@ int main(int argc, char** argv)
sampler.SetTransport(transportFactory);
sampler.SetProperty(FairMQExample3Sampler::Id, id);
sampler.SetProperty(FairMQExample3Sampler::Text, text);
// configure data output channel
FairMQChannel dataOutChannel("push", "bind", "");
dataOutChannel.UpdateRateLogging(0);
sampler.fChannels["data-out"].push_back(dataOutChannel);
// Get the IP of the current host and store it for binding.
map<string,string> IPs;
@ -114,6 +112,24 @@ int main(int argc, char** argv)
sampler.ChangeState("INIT_TASK");
sampler.WaitForEndOfState("INIT_TASK");
dds::custom_cmd::CCustomCmd ddsCustomCmd;
// Subscribe on custom commands
ddsCustomCmd.subscribeCmd([&](const string& command, const string& condition, uint64_t senderId)
{
LOG(INFO) << "Received custom command: " << command << " condition: " << condition << " senderId: " << senderId;
if (command == "check-state")
{
ddsCustomCmd.sendCmd(id + ": " + sampler.GetCurrentStateName(), to_string(senderId));
}
else
{
LOG(WARN) << "Received unknown command: " << command;
LOG(WARN) << "Origin: " << senderId;
LOG(WARN) << "Destination: " << condition;
}
});
sampler.ChangeState("RUN");
sampler.WaitForEndOfState("RUN");
@ -125,7 +141,7 @@ int main(int argc, char** argv)
sampler.ChangeState("END");
}
catch (std::exception& e)
catch (exception& e)
{
LOG(ERROR) << e.what();
LOG(INFO) << "Command line options are the following: ";

View File

@ -21,7 +21,6 @@
#include <boost/asio.hpp> // for DDS
#include "FairMQLogger.h"
#include "FairMQParser.h"
#include "FairMQProgOptions.h"
#include "FairMQExample3Sink.h"
#include "FairMQTools.h"
@ -32,8 +31,10 @@
#include "FairMQTransportFactoryZMQ.h"
#endif
#include "KeyValue.h" // DDS
#include "KeyValue.h" // DDS Key Value
#include "CustomCmd.h" // DDS Custom Commands
using namespace std;
using namespace boost::program_options;
int main(int argc, char** argv)
@ -45,11 +46,11 @@ int main(int argc, char** argv)
try
{
std::string interfaceName; // name of the network interface to use for communication.
string interfaceName; // name of the network interface to use for communication.
options_description sinkOptions("Sink options");
sinkOptions.add_options()
("network-interface", value<std::string>(&interfaceName)->default_value("eth0"), "Name of the network interface to use (e.g. eth0, ib0, wlan0, en0...)");
("network-interface", value<string>(&interfaceName)->default_value("eth0"), "Name of the network interface to use (e.g. eth0, ib0, wlan0, en0...)");
config.AddToCmdLineOptions(sinkOptions);
@ -58,12 +59,7 @@ int main(int argc, char** argv)
return 0;
}
std::string filename = config.GetValue<std::string>("config-json-file");
std::string id = config.GetValue<std::string>("id");
config.UserParser<FairMQParser::JSON>(filename, id);
sink.fChannels = config.GetFairMQMap();
string id = config.GetValue<string>("id");
LOG(INFO) << "PID: " << getpid();
@ -77,6 +73,11 @@ int main(int argc, char** argv)
sink.SetProperty(FairMQExample3Sink::Id, id);
// configure data output channel
FairMQChannel dataInChannel("pull", "bind", "");
dataInChannel.UpdateRateLogging(0);
sink.fChannels["data-in"].push_back(dataInChannel);
// Get the IP of the current host and store it for binding.
map<string,string> IPs;
FairMQ::tools::getHostIPs(IPs);
@ -111,6 +112,24 @@ int main(int argc, char** argv)
sink.ChangeState("INIT_TASK");
sink.WaitForEndOfState("INIT_TASK");
dds::custom_cmd::CCustomCmd ddsCustomCmd;
// Subscribe on custom commands
ddsCustomCmd.subscribeCmd([&](const string& command, const string& condition, uint64_t senderId)
{
LOG(INFO) << "Received custom command: " << command << " condition: " << condition << " senderId: " << senderId;
if (command == "check-state")
{
ddsCustomCmd.sendCmd(id + ": " + sink.GetCurrentStateName(), to_string(senderId));
}
else
{
LOG(WARN) << "Received unknown command: " << command;
LOG(WARN) << "Origin: " << senderId;
LOG(WARN) << "Destination: " << condition;
}
});
sink.ChangeState("RUN");
sink.WaitForEndOfState("RUN");
@ -122,7 +141,7 @@ int main(int argc, char** argv)
sink.ChangeState("END");
}
catch (std::exception& e)
catch (exception& e)
{
LOG(ERROR) << e.what();
LOG(INFO) << "Command line options are the following: ";