mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 08:41:16 +00:00
Update DDS example to use new library and names
This commit is contained in:
parent
f02936165c
commit
c9388d4837
|
@ -57,8 +57,7 @@ Set(SRCS
|
|||
Set(DEPENDENCIES
|
||||
${DEPENDENCIES}
|
||||
FairMQ
|
||||
dds-key-value-lib
|
||||
dds-custom-cmd-lib
|
||||
dds_intercom_lib
|
||||
)
|
||||
|
||||
set(LIBRARY_NAME FairMQExample3)
|
||||
|
|
|
@ -19,7 +19,7 @@ In our example Sampler and Sink bind their sockets. The bound addresses are avai
|
|||
sampler.ChangeState("INIT_DEVICE");
|
||||
sampler.WaitForInitialValidation();
|
||||
|
||||
dds::key_value::CKeyValue ddsKeyValue;
|
||||
CKeyValue ddsKeyValue;
|
||||
ddsKeyValue.putValue("SamplerOutputAddress", sampler.fChannels.at("data-out").at(0).GetAddress());
|
||||
|
||||
sampler.WaitForEndOfState("INIT_DEVICE");
|
||||
|
@ -32,9 +32,9 @@ Same approach for the Sink.
|
|||
The Processors in our example need the addresses of Sampler and Sink. They receive these from DDS via properties (sent in the step above):
|
||||
|
||||
```C++
|
||||
dds::key_value::CKeyValue ddsKeyValue;
|
||||
CKeyValue ddsKeyValue;
|
||||
// Sampler properties
|
||||
dds::key_value::CKeyValue::valuesMap_t samplerValues;
|
||||
CKeyValue::valuesMap_t samplerValues;
|
||||
{
|
||||
mutex keyMutex;
|
||||
condition_variable keyCondition;
|
||||
|
@ -92,9 +92,9 @@ dds-server start -s
|
|||
|
||||
Agents are submitted with:
|
||||
```bash
|
||||
dds-submit --rms ssh --ssh-rms-cfg ex3-dds-hosts.cfg
|
||||
dds-submit --rms ssh --config ex3-dds-hosts.cfg
|
||||
```
|
||||
The `--rms` option defines a destination resource management system. The `--ssh-rms-cfg` specifies an SSH plug-in resource definition file.
|
||||
The `--rms` option defines a destination resource management system. The `--config` specifies an SSH plug-in resource definition file.
|
||||
|
||||
##### 7. Set the topology file.
|
||||
|
||||
|
@ -115,7 +115,7 @@ After activation, agents will execute the defined tasks on the worker nodes. Out
|
|||
|
||||
##### 10. (optional) Use example command UI to check state of the devices
|
||||
|
||||
This example includes a simple utility to send command to devices and receive replies from them. The code in `runDDSCommandUI.cxx` (compiled as ex3-dds-command-ui) uses the DDSCustomCmd library to send "check-state" string to all devices, to which they reply with their ID and state they are in. This can be used as an example of sending/receiving commands or other information to devices.
|
||||
This example includes a simple utility to send command to devices and receive replies from them. The code in `runDDSCommandUI.cxx` (compiled as ex3-dds-command-ui) uses the DDS intercom library to send "check-state" string to all devices, to which they reply with their ID and state they are in. This can be used as an example of sending/receiving commands or other information to devices.
|
||||
|
||||
To see it in action, start the ex3-dds-command-ui while the topology is running.
|
||||
|
||||
|
|
|
@ -1,8 +1,7 @@
|
|||
@bash_begin@
|
||||
echo "DBG: SSH ENV Script"
|
||||
#source setup.sh
|
||||
# source setup.sh
|
||||
@bash_end@
|
||||
|
||||
sampler, orybalch@localhost, , /tmp/, 1
|
||||
processor, orybalch@localhost, , /tmp/, 10
|
||||
sink, orybalch@localhost, , /tmp/, 1
|
||||
sampler, username@localhost, , /tmp/, 1
|
||||
processor, username@localhost, , /tmp/, 10
|
||||
sink, username@localhost, , /tmp/, 1
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
// DDS
|
||||
#include "CustomCmd.h"
|
||||
#include "dds_intercom.h" // DDS
|
||||
|
||||
// STD
|
||||
#include <iostream>
|
||||
#include <exception>
|
||||
|
@ -8,8 +8,7 @@
|
|||
#include <atomic>
|
||||
|
||||
using namespace std;
|
||||
using namespace dds;
|
||||
using namespace custom_cmd;
|
||||
using namespace dds::intercom_api;
|
||||
|
||||
int main(int argc, char* argv[])
|
||||
{
|
||||
|
@ -17,14 +16,14 @@ int main(int argc, char* argv[])
|
|||
{
|
||||
CCustomCmd ddsCustomCmd;
|
||||
|
||||
ddsCustomCmd.subscribeCmd([](const string& command, const string& condition, uint64_t senderId)
|
||||
ddsCustomCmd.subscribe([](const string& command, const string& condition, uint64_t senderId)
|
||||
{
|
||||
cout << "Received: \"" << command << "\"" << endl;
|
||||
});
|
||||
|
||||
while (true)
|
||||
{
|
||||
int result = ddsCustomCmd.sendCmd("check-state", "");
|
||||
int result = ddsCustomCmd.send("check-state", "");
|
||||
|
||||
if (result == 1)
|
||||
{
|
||||
|
@ -34,11 +33,11 @@ int main(int argc, char* argv[])
|
|||
this_thread::sleep_for(chrono::seconds(1));
|
||||
}
|
||||
}
|
||||
catch (exception& _e)
|
||||
catch (exception& e)
|
||||
{
|
||||
cerr << "Error: " << _e.what() << endl;
|
||||
cerr << "Error: " << e.what() << endl;
|
||||
return EXIT_FAILURE;
|
||||
}
|
||||
|
||||
return EXIT_SUCCESS;
|
||||
}
|
||||
}
|
|
@ -24,11 +24,11 @@
|
|||
#include "FairMQExample3Processor.h"
|
||||
#include "FairMQTools.h"
|
||||
|
||||
#include "KeyValue.h" // DDS Key Value
|
||||
#include "CustomCmd.h" // DDS Custom Commands
|
||||
#include "dds_intercom.h" // DDS
|
||||
|
||||
using namespace std;
|
||||
using namespace boost::program_options;
|
||||
using namespace dds::intercom_api;
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
|
@ -63,9 +63,9 @@ int main(int argc, char** argv)
|
|||
processor.fChannels["data2"].push_back(dataOutChannel);
|
||||
|
||||
// Waiting for DDS properties
|
||||
dds::key_value::CKeyValue ddsKeyValue;
|
||||
CKeyValue ddsKeyValue;
|
||||
// Sampler properties
|
||||
dds::key_value::CKeyValue::valuesMap_t samplerValues;
|
||||
CKeyValue::valuesMap_t samplerValues;
|
||||
{
|
||||
mutex keyMutex;
|
||||
condition_variable keyCondition;
|
||||
|
@ -81,7 +81,7 @@ int main(int argc, char** argv)
|
|||
}
|
||||
}
|
||||
// Sink properties
|
||||
dds::key_value::CKeyValue::valuesMap_t sinkValues;
|
||||
CKeyValue::valuesMap_t sinkValues;
|
||||
{
|
||||
mutex keyMutex;
|
||||
condition_variable keyCondition;
|
||||
|
@ -106,15 +106,15 @@ int main(int argc, char** argv)
|
|||
processor.ChangeState("INIT_TASK");
|
||||
processor.WaitForEndOfState("INIT_TASK");
|
||||
|
||||
dds::custom_cmd::CCustomCmd ddsCustomCmd;
|
||||
CCustomCmd ddsCustomCmd;
|
||||
|
||||
// Subscribe on custom commands
|
||||
ddsCustomCmd.subscribeCmd([&](const string& command, const string& condition, uint64_t senderId)
|
||||
ddsCustomCmd.subscribe([&](const string& command, const string& condition, uint64_t senderId)
|
||||
{
|
||||
LOG(INFO) << "Received custom command: " << command;
|
||||
if (command == "check-state")
|
||||
{
|
||||
ddsCustomCmd.sendCmd(id + ": " + processor.GetCurrentStateName(), to_string(senderId));
|
||||
ddsCustomCmd.send(id + ": " + processor.GetCurrentStateName(), to_string(senderId));
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -25,11 +25,11 @@
|
|||
#include "FairMQExample3Sampler.h"
|
||||
#include "FairMQTools.h"
|
||||
|
||||
#include "KeyValue.h" // DDS Key Value
|
||||
#include "CustomCmd.h" // DDS Custom Commands
|
||||
#include "dds_intercom.h" // DDS
|
||||
|
||||
using namespace std;
|
||||
using namespace boost::program_options;
|
||||
using namespace dds::intercom_api;
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
|
@ -92,7 +92,7 @@ int main(int argc, char** argv)
|
|||
|
||||
// Advertise the bound addresses via DDS property
|
||||
LOG(INFO) << "Giving sampler output address to DDS.";
|
||||
dds::key_value::CKeyValue ddsKeyValue;
|
||||
CKeyValue ddsKeyValue;
|
||||
ddsKeyValue.putValue("SamplerAddress", sampler.fChannels.at("data1").at(0).GetAddress());
|
||||
|
||||
sampler.WaitForEndOfState("INIT_DEVICE");
|
||||
|
@ -100,15 +100,15 @@ int main(int argc, char** argv)
|
|||
sampler.ChangeState("INIT_TASK");
|
||||
sampler.WaitForEndOfState("INIT_TASK");
|
||||
|
||||
dds::custom_cmd::CCustomCmd ddsCustomCmd;
|
||||
CCustomCmd ddsCustomCmd;
|
||||
|
||||
// Subscribe on custom commands
|
||||
ddsCustomCmd.subscribeCmd([&](const string& command, const string& condition, uint64_t senderId)
|
||||
ddsCustomCmd.subscribe([&](const string& command, const string& condition, uint64_t senderId)
|
||||
{
|
||||
LOG(INFO) << "Received custom command: " << command;
|
||||
if (command == "check-state")
|
||||
{
|
||||
ddsCustomCmd.sendCmd(id + ": " + sampler.GetCurrentStateName(), to_string(senderId));
|
||||
ddsCustomCmd.send(id + ": " + sampler.GetCurrentStateName(), to_string(senderId));
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -25,11 +25,11 @@
|
|||
#include "FairMQExample3Sink.h"
|
||||
#include "FairMQTools.h"
|
||||
|
||||
#include "KeyValue.h" // DDS Key Value
|
||||
#include "CustomCmd.h" // DDS Custom Commands
|
||||
#include "dds_intercom.h" // DDS
|
||||
|
||||
using namespace std;
|
||||
using namespace boost::program_options;
|
||||
using namespace dds::intercom_api;
|
||||
|
||||
int main(int argc, char** argv)
|
||||
{
|
||||
|
@ -92,7 +92,7 @@ int main(int argc, char** argv)
|
|||
|
||||
// Advertise the bound address via DDS property
|
||||
LOG(INFO) << "Giving sink input address to DDS.";
|
||||
dds::key_value::CKeyValue ddsKeyValue;
|
||||
CKeyValue ddsKeyValue;
|
||||
ddsKeyValue.putValue("SinkAddress", sink.fChannels.at("data2").at(0).GetAddress());
|
||||
|
||||
sink.WaitForEndOfState("INIT_DEVICE");
|
||||
|
@ -100,15 +100,15 @@ int main(int argc, char** argv)
|
|||
sink.ChangeState("INIT_TASK");
|
||||
sink.WaitForEndOfState("INIT_TASK");
|
||||
|
||||
dds::custom_cmd::CCustomCmd ddsCustomCmd;
|
||||
CCustomCmd ddsCustomCmd;
|
||||
|
||||
// Subscribe on custom commands
|
||||
ddsCustomCmd.subscribeCmd([&](const string& command, const string& condition, uint64_t senderId)
|
||||
ddsCustomCmd.subscribe([&](const string& command, const string& condition, uint64_t senderId)
|
||||
{
|
||||
LOG(INFO) << "Received custom command: " << command;
|
||||
if (command == "check-state")
|
||||
{
|
||||
ddsCustomCmd.sendCmd(id + ": " + sink.GetCurrentStateName(), to_string(senderId));
|
||||
ddsCustomCmd.send(id + ": " + sink.GetCurrentStateName(), to_string(senderId));
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue
Block a user