diff --git a/examples/MQ/1-sampler-sink/runExample1Sampler.cxx b/examples/MQ/1-sampler-sink/runExample1Sampler.cxx index a9c36e0f..27473638 100644 --- a/examples/MQ/1-sampler-sink/runExample1Sampler.cxx +++ b/examples/MQ/1-sampler-sink/runExample1Sampler.cxx @@ -17,7 +17,7 @@ void addCustomOptions(bpo::options_description& options) ("text", bpo::value()->default_value("Hello"), "Text to send out"); } -FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new FairMQExample1Sampler(); } diff --git a/examples/MQ/1-sampler-sink/runExample1Sink.cxx b/examples/MQ/1-sampler-sink/runExample1Sink.cxx index 58c67da4..46656b02 100644 --- a/examples/MQ/1-sampler-sink/runExample1Sink.cxx +++ b/examples/MQ/1-sampler-sink/runExample1Sink.cxx @@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/) { } -FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new FairMQExample1Sink(); } diff --git a/examples/MQ/2-sampler-processor-sink/runExample2Processor.cxx b/examples/MQ/2-sampler-processor-sink/runExample2Processor.cxx index 8ea57503..5f17ffe5 100644 --- a/examples/MQ/2-sampler-processor-sink/runExample2Processor.cxx +++ b/examples/MQ/2-sampler-processor-sink/runExample2Processor.cxx @@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/) { } -FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new FairMQExample2Processor(); } diff --git a/examples/MQ/2-sampler-processor-sink/runExample2Sampler.cxx b/examples/MQ/2-sampler-processor-sink/runExample2Sampler.cxx index 3f8bf7f9..c82d076b 100644 --- a/examples/MQ/2-sampler-processor-sink/runExample2Sampler.cxx +++ b/examples/MQ/2-sampler-processor-sink/runExample2Sampler.cxx @@ -17,7 +17,7 @@ void addCustomOptions(bpo::options_description& options) ("text", bpo::value()->default_value("Hello"), "Text to send out"); } -FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new FairMQExample2Sampler(); } diff --git a/examples/MQ/2-sampler-processor-sink/runExample2Sink.cxx b/examples/MQ/2-sampler-processor-sink/runExample2Sink.cxx index 8a684d25..b688fbca 100644 --- a/examples/MQ/2-sampler-processor-sink/runExample2Sink.cxx +++ b/examples/MQ/2-sampler-processor-sink/runExample2Sink.cxx @@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/) { } -FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new FairMQExample2Sink(); } diff --git a/examples/MQ/3-dds/runExample3Processor.cxx b/examples/MQ/3-dds/runExample3Processor.cxx index 7787a7ae..44bf0808 100644 --- a/examples/MQ/3-dds/runExample3Processor.cxx +++ b/examples/MQ/3-dds/runExample3Processor.cxx @@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/) { } -FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +FairMQDevicePtr getDevice(const FairMQProgOptions& config) { return new FairMQExample3Processor(); } diff --git a/examples/MQ/3-dds/runExample3Sampler.cxx b/examples/MQ/3-dds/runExample3Sampler.cxx index ad8d8f5e..4067eeea 100644 --- a/examples/MQ/3-dds/runExample3Sampler.cxx +++ b/examples/MQ/3-dds/runExample3Sampler.cxx @@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/) { } -FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new FairMQExample3Sampler(); } diff --git a/examples/MQ/3-dds/runExample3Sink.cxx b/examples/MQ/3-dds/runExample3Sink.cxx index 3e8e7677..651ccb75 100644 --- a/examples/MQ/3-dds/runExample3Sink.cxx +++ b/examples/MQ/3-dds/runExample3Sink.cxx @@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/) { } -FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new FairMQExample3Sink(); } diff --git a/examples/MQ/4-copypush/runExample4Sampler.cxx b/examples/MQ/4-copypush/runExample4Sampler.cxx index 39d7e156..58856d88 100644 --- a/examples/MQ/4-copypush/runExample4Sampler.cxx +++ b/examples/MQ/4-copypush/runExample4Sampler.cxx @@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/) { } -FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new FairMQExample4Sampler(); } diff --git a/examples/MQ/4-copypush/runExample4Sink.cxx b/examples/MQ/4-copypush/runExample4Sink.cxx index 28e3aee6..1179aa0a 100644 --- a/examples/MQ/4-copypush/runExample4Sink.cxx +++ b/examples/MQ/4-copypush/runExample4Sink.cxx @@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/) { } -FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new FairMQExample4Sink(); } diff --git a/examples/MQ/5-req-rep/runExample5Client.cxx b/examples/MQ/5-req-rep/runExample5Client.cxx index f2c8f7a9..8c4b4fbe 100644 --- a/examples/MQ/5-req-rep/runExample5Client.cxx +++ b/examples/MQ/5-req-rep/runExample5Client.cxx @@ -17,7 +17,7 @@ void addCustomOptions(bpo::options_description& options) ("text", bpo::value()->default_value("Hello"), "Text to send out"); } -FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new FairMQExample5Client(); } diff --git a/examples/MQ/5-req-rep/runExample5Server.cxx b/examples/MQ/5-req-rep/runExample5Server.cxx index db348f70..c990b001 100644 --- a/examples/MQ/5-req-rep/runExample5Server.cxx +++ b/examples/MQ/5-req-rep/runExample5Server.cxx @@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/) { } -FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new FairMQExample5Server(); } diff --git a/examples/MQ/6-multiple-channels/FairMQExample6Sink.cxx b/examples/MQ/6-multiple-channels/FairMQExample6Sink.cxx index a7454e86..87803ddf 100644 --- a/examples/MQ/6-multiple-channels/FairMQExample6Sink.cxx +++ b/examples/MQ/6-multiple-channels/FairMQExample6Sink.cxx @@ -22,36 +22,22 @@ using namespace std; FairMQExample6Sink::FairMQExample6Sink() { + OnData("broadcast", &FairMQExample6Sink::HandleBroadcast); + OnData("data", &FairMQExample6Sink::HandleData); } -void FairMQExample6Sink::Run() +bool FairMQExample6Sink::HandleBroadcast(FairMQMessagePtr& msg, int /*index*/) { - std::unique_ptr poller(fTransportFactory->CreatePoller(fChannels, { "data", "broadcast" })); + LOG(INFO) << "Received broadcast: \"" << string(static_cast(msg->GetData()), msg->GetSize()) << "\""; - while (CheckCurrentState(RUNNING)) - { - poller->Poll(100); + return true; +} - if (poller->CheckInput("broadcast", 0)) - { - FairMQMessagePtr msg(NewMessage()); +bool FairMQExample6Sink::HandleData(FairMQMessagePtr& msg, int /*index*/) +{ + LOG(INFO) << "Received message: \"" << string(static_cast(msg->GetData()), msg->GetSize()) << "\""; - if (Receive(msg, "broadcast") > 0) - { - LOG(INFO) << "Received broadcast: \"" << string(static_cast(msg->GetData()), msg->GetSize()) << "\""; - } - } - - if (poller->CheckInput("data", 0)) - { - FairMQMessagePtr msg(NewMessage()); - - if (Receive(msg, "data") > 0) - { - LOG(INFO) << "Received message: \"" << string(static_cast(msg->GetData()), msg->GetSize()) << "\""; - } - } - } + return true; } FairMQExample6Sink::~FairMQExample6Sink() diff --git a/examples/MQ/6-multiple-channels/FairMQExample6Sink.h b/examples/MQ/6-multiple-channels/FairMQExample6Sink.h index c0b5b5ed..e6a77f72 100644 --- a/examples/MQ/6-multiple-channels/FairMQExample6Sink.h +++ b/examples/MQ/6-multiple-channels/FairMQExample6Sink.h @@ -24,7 +24,8 @@ class FairMQExample6Sink : public FairMQDevice virtual ~FairMQExample6Sink(); protected: - virtual void Run(); + bool HandleBroadcast(FairMQMessagePtr&, int); + bool HandleData(FairMQMessagePtr&, int); }; #endif /* FAIRMQEXAMPLE6SINK_H_ */ diff --git a/examples/MQ/6-multiple-channels/ex6-multiple-channels.json b/examples/MQ/6-multiple-channels/ex6-multiple-channels.json index d59e34da..e93c84cb 100644 --- a/examples/MQ/6-multiple-channels/ex6-multiple-channels.json +++ b/examples/MQ/6-multiple-channels/ex6-multiple-channels.json @@ -1,11 +1,11 @@ { "fairMQOptions": { - "device": - { + "devices": + [{ "id": "sampler1", - "channel": - { + "channels": + [{ "name": "data", "socket": { @@ -17,7 +17,6 @@ "rateLogging": "0" } }, - "channel": { "name": "broadcast", "socket": @@ -29,14 +28,12 @@ "rcvBufSize": "1000", "rateLogging": "0" } - } + }] }, - - "device": { "id": "sink1", - "channel": - { + "channels": + [{ "name": "data", "socket": { @@ -48,7 +45,6 @@ "rateLogging": "0" } }, - "channel": { "name": "broadcast", "socket": @@ -60,10 +56,8 @@ "rcvBufSize": "1000", "rateLogging": "0" } - } + }] }, - - "device": { "id": "broadcaster1", "channel": @@ -79,7 +73,7 @@ "rateLogging": "0" } } - } + }] } } diff --git a/examples/MQ/6-multiple-channels/runExample6Broadcaster.cxx b/examples/MQ/6-multiple-channels/runExample6Broadcaster.cxx index 4d72eb2a..3292df6c 100644 --- a/examples/MQ/6-multiple-channels/runExample6Broadcaster.cxx +++ b/examples/MQ/6-multiple-channels/runExample6Broadcaster.cxx @@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/) { } -FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new FairMQExample6Broadcaster(); } diff --git a/examples/MQ/6-multiple-channels/runExample6Sampler.cxx b/examples/MQ/6-multiple-channels/runExample6Sampler.cxx index f96ddf11..2298ac36 100644 --- a/examples/MQ/6-multiple-channels/runExample6Sampler.cxx +++ b/examples/MQ/6-multiple-channels/runExample6Sampler.cxx @@ -17,7 +17,7 @@ void addCustomOptions(bpo::options_description& options) ("text", bpo::value()->default_value("Hello"), "Text to send out"); } -FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new FairMQExample6Sampler(); } diff --git a/examples/MQ/6-multiple-channels/runExample6Sink.cxx b/examples/MQ/6-multiple-channels/runExample6Sink.cxx index 6afe3e1d..2b67c6fc 100644 --- a/examples/MQ/6-multiple-channels/runExample6Sink.cxx +++ b/examples/MQ/6-multiple-channels/runExample6Sink.cxx @@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/) { } -FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new FairMQExample6Sink(); } diff --git a/examples/MQ/8-multipart/ex8-multipart.json b/examples/MQ/8-multipart/ex8-multipart.json index 075aae96..3638e614 100644 --- a/examples/MQ/8-multipart/ex8-multipart.json +++ b/examples/MQ/8-multipart/ex8-multipart.json @@ -1,8 +1,8 @@ { "fairMQOptions": { - "device": - { + "devices": + [{ "id": "sampler1", "channel": { @@ -18,8 +18,6 @@ } } }, - - "device": { "id": "sink1", "channel": @@ -35,6 +33,6 @@ "rateLogging": "0" } } - } + }] } } diff --git a/examples/MQ/8-multipart/runExample8Sampler.cxx b/examples/MQ/8-multipart/runExample8Sampler.cxx index e24bcb5e..1966286f 100644 --- a/examples/MQ/8-multipart/runExample8Sampler.cxx +++ b/examples/MQ/8-multipart/runExample8Sampler.cxx @@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/) { } -FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new FairMQExample8Sampler(); } diff --git a/examples/MQ/8-multipart/runExample8Sink.cxx b/examples/MQ/8-multipart/runExample8Sink.cxx index bf81cfed..dc317120 100644 --- a/examples/MQ/8-multipart/runExample8Sink.cxx +++ b/examples/MQ/8-multipart/runExample8Sink.cxx @@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/) { } -FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) { return new FairMQExample8Sink(); }