diff --git a/examples/MQ/5-req-rep/CMakeLists.txt b/examples/MQ/5-req-rep/CMakeLists.txt index 6c414d6d..a02fdc3e 100644 --- a/examples/MQ/5-req-rep/CMakeLists.txt +++ b/examples/MQ/5-req-rep/CMakeLists.txt @@ -6,6 +6,8 @@ # copied verbatim in the file "LICENSE" # ################################################################################ +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/5-req-rep/ex5-req-rep.json ${CMAKE_BINARY_DIR}/bin/config/ex5-req-rep.json) + Set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq ${CMAKE_SOURCE_DIR}/fairmq/devices diff --git a/examples/MQ/5-req-rep/ex5-req-rep.json b/examples/MQ/5-req-rep/ex5-req-rep.json new file mode 100644 index 00000000..d01b4903 --- /dev/null +++ b/examples/MQ/5-req-rep/ex5-req-rep.json @@ -0,0 +1,41 @@ +{ + "fairMQOptions": + { + "device": + { + "id": "client", + "channel": + { + "name": "data", + "socket": + { + "type": "req", + "method": "connect", + "address": "tcp://localhost:5005", + "sndBufSize": "1000", + "rcvBufSize": "1000", + "rateLogging": "0" + } + } + }, + + "device": + { + "id": "server", + "channel": + { + "name": "data", + "socket": + { + "type": "rep", + "method": "bind", + "address": "tcp://*:5005", + "sndBufSize": "1000", + "rcvBufSize": "1000", + "rateLogging": "0" + } + } + } + } +} + diff --git a/examples/MQ/5-req-rep/runExample5Client.cxx b/examples/MQ/5-req-rep/runExample5Client.cxx index 087861ac..0010c0bc 100644 --- a/examples/MQ/5-req-rep/runExample5Client.cxx +++ b/examples/MQ/5-req-rep/runExample5Client.cxx @@ -17,6 +17,8 @@ #include "boost/program_options.hpp" #include "FairMQLogger.h" +#include "FairMQParser.h" +#include "FairMQProgOptions.h" #include "FairMQExample5Client.h" #ifdef NANOMSG @@ -26,89 +28,66 @@ #endif using namespace std; - -typedef struct DeviceOptions -{ - DeviceOptions() : - text() {} - - string text; -} DeviceOptions_t; - -inline bool parse_cmd_line(int _argc, char* _argv[], DeviceOptions* _options) -{ - if (_options == NULL) - throw runtime_error("Internal error: options' container is empty."); - - namespace bpo = boost::program_options; - bpo::options_description desc("Options"); - desc.add_options() - ("text,t", bpo::value()->default_value("something"), "Text to send to server") - ("help", "Print help messages"); - - bpo::variables_map vm; - bpo::store(bpo::parse_command_line(_argc, _argv, desc), vm); - - if (vm.count("help")) - { - LOG(INFO) << "EPN" << endl << desc; - return false; - } - - bpo::notify(vm); - - if ( vm.count("text") ) - _options->text = vm["text"].as(); - - return true; -} +using namespace boost::program_options; int main(int argc, char** argv) { FairMQExample5Client client; client.CatchSignals(); - DeviceOptions_t options; + FairMQProgOptions config; + try { - if (!parse_cmd_line(argc, argv, &options)) + string text; + + options_description clientOptions("Client options"); + clientOptions.add_options() + ("text", value(&text)->default_value("Hello"), "Text to send out"); + + config.AddToCmdLineOptions(clientOptions); + + if (config.ParseAll(argc, argv)) + { return 0; + } + + string filename = config.GetValue("config-json-file"); + string id = config.GetValue("id"); + + config.UserParser(filename, id); + + client.fChannels = config.GetFairMQMap(); + + LOG(INFO) << "PID: " << getpid(); + +#ifdef NANOMSG + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); +#else + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); +#endif + + client.SetTransport(transportFactory); + + client.SetProperty(FairMQExample5Client::Id, id); + client.SetProperty(FairMQExample5Client::Text, text); + + client.ChangeState("INIT_DEVICE"); + client.WaitForEndOfState("INIT_DEVICE"); + + client.ChangeState("INIT_TASK"); + client.WaitForEndOfState("INIT_TASK"); + + client.ChangeState("RUN"); + client.InteractiveStateLoop(); } catch (exception& e) { LOG(ERROR) << e.what(); + LOG(INFO) << "Command line options are the following: "; + config.PrintHelp(); return 1; } - LOG(INFO) << "PID: " << getpid(); - -#ifdef NANOMSG - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); -#else - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); -#endif - - client.SetTransport(transportFactory); - - client.SetProperty(FairMQExample5Client::Id, "client"); - client.SetProperty(FairMQExample5Client::Text, options.text); - client.SetProperty(FairMQExample5Client::NumIoThreads, 1); - - FairMQChannel requestChannel("req", "connect", "tcp://localhost:5005"); - requestChannel.UpdateSndBufSize(10000); - requestChannel.UpdateRcvBufSize(10000); - requestChannel.UpdateRateLogging(1); - - client.fChannels["data"].push_back(requestChannel); - - client.ChangeState("INIT_DEVICE"); - client.WaitForEndOfState("INIT_DEVICE"); - - client.ChangeState("INIT_TASK"); - client.WaitForEndOfState("INIT_TASK"); - - client.ChangeState("RUN"); - client.InteractiveStateLoop(); - return 0; } diff --git a/examples/MQ/5-req-rep/runExample5Server.cxx b/examples/MQ/5-req-rep/runExample5Server.cxx index 541d7216..0ec33cdf 100644 --- a/examples/MQ/5-req-rep/runExample5Server.cxx +++ b/examples/MQ/5-req-rep/runExample5Server.cxx @@ -14,7 +14,11 @@ #include +#include "boost/program_options.hpp" + #include "FairMQLogger.h" +#include "FairMQParser.h" +#include "FairMQProgOptions.h" #include "FairMQExample5Server.h" #ifdef NANOMSG @@ -24,40 +28,57 @@ #endif using namespace std; +using namespace boost::program_options; int main(int argc, char** argv) { FairMQExample5Server server; server.CatchSignals(); - LOG(INFO) << "PID: " << getpid(); + FairMQProgOptions config; + + try + { + if (config.ParseAll(argc, argv)) + { + return 0; + } + + string filename = config.GetValue("config-json-file"); + string id = config.GetValue("id"); + + config.UserParser(filename, id); + + server.fChannels = config.GetFairMQMap(); + + LOG(INFO) << "PID: " << getpid(); #ifdef NANOMSG - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); #else - FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); #endif - server.SetTransport(transportFactory); + server.SetTransport(transportFactory); - server.SetProperty(FairMQExample5Server::Id, "server"); - server.SetProperty(FairMQExample5Server::NumIoThreads, 1); + server.SetProperty(FairMQExample5Server::Id, id); - FairMQChannel replyChannel("rep", "bind", "tcp://*:5005"); - replyChannel.UpdateSndBufSize(10000); - replyChannel.UpdateRcvBufSize(10000); - replyChannel.UpdateRateLogging(1); + server.ChangeState("INIT_DEVICE"); + server.WaitForEndOfState("INIT_DEVICE"); - server.fChannels["data"].push_back(replyChannel); + server.ChangeState("INIT_TASK"); + server.WaitForEndOfState("INIT_TASK"); - server.ChangeState("INIT_DEVICE"); - server.WaitForEndOfState("INIT_DEVICE"); - - server.ChangeState("INIT_TASK"); - server.WaitForEndOfState("INIT_TASK"); - - server.ChangeState("RUN"); - server.InteractiveStateLoop(); + server.ChangeState("RUN"); + server.InteractiveStateLoop(); + } + catch (exception& e) + { + LOG(ERROR) << e.what(); + LOG(INFO) << "Command line options are the following: "; + config.PrintHelp(); + return 1; + } return 0; }