diff --git a/examples/MQ/1-sampler-sink/CMakeLists.txt b/examples/MQ/1-sampler-sink/CMakeLists.txt index 3fa2919b..a392cddf 100644 --- a/examples/MQ/1-sampler-sink/CMakeLists.txt +++ b/examples/MQ/1-sampler-sink/CMakeLists.txt @@ -6,9 +6,9 @@ # copied verbatim in the file "LICENSE" # ################################################################################ -configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/1-sampler-sink/ex1-sampler-sink.json +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/1-sampler-sink/ex1-sampler-sink.json ${CMAKE_BINARY_DIR}/bin/config/ex1-sampler-sink.json) -configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/1-sampler-sink/startMQEx1.sh.in +configure_file(${CMAKE_SOURCE_DIR}/examples/MQ/1-sampler-sink/startMQEx1.sh.in ${CMAKE_BINARY_DIR}/bin/examples/MQ/1-sampler-sink/startMQEx1.sh) Set(INCLUDE_DIRECTORIES diff --git a/examples/MQ/1-sampler-sink/FairMQExample1Sampler.cxx b/examples/MQ/1-sampler-sink/FairMQExample1Sampler.cxx index 669b6d88..963f96cd 100644 --- a/examples/MQ/1-sampler-sink/FairMQExample1Sampler.cxx +++ b/examples/MQ/1-sampler-sink/FairMQExample1Sampler.cxx @@ -12,12 +12,12 @@ * @author A. Rybalchenko */ -#include // unique_ptr - -#include +#include // this_thread::sleep_for +#include #include "FairMQExample1Sampler.h" #include "FairMQLogger.h" +#include "FairMQProgOptions.h" // device->fConfig using namespace std; @@ -26,71 +26,40 @@ FairMQExample1Sampler::FairMQExample1Sampler() { } -void FairMQExample1Sampler::CustomCleanup(void* /*data*/, void *object) +void FairMQExample1Sampler::InitTask() { - delete static_cast(object); + // Get the fText value from the command line option (via fConfig) + fText = fConfig->GetValue("text"); } -void FairMQExample1Sampler::Run() +bool FairMQExample1Sampler::ConditionalRun() { - while (CheckCurrentState(RUNNING)) + std::this_thread::sleep_for(std::chrono::seconds(1)); + + // create a copy of the data with new(), that will be deleted after the transfer is complete + string* text = new string(fText); + + // create message object with a pointer to the data buffer, + // its size, + // custom deletion function (called when transfer is done), + // and pointer to the object managing the data buffer + FairMQMessagePtr msg(NewMessage(const_cast(text->c_str()), + text->length(), + [](void* /*data*/, void* object) { delete static_cast(object); }, + text)); + + LOG(INFO) << "Sending \"" << fText << "\""; + + // in case of error or transfer interruption, return false to go to IDLE state + // successfull transfer will return number of bytes transfered (can be 0 if sending an empty message). + if (Send(msg, "data") < 0) { - boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); - - string* text = new string(fText); - - unique_ptr msg(NewMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); - - LOG(INFO) << "Sending \"" << fText << "\""; - - Send(msg, "data"); + return false; } + + return true; } FairMQExample1Sampler::~FairMQExample1Sampler() { } - -void FairMQExample1Sampler::SetProperty(const int key, const string& value) -{ - switch (key) - { - case Text: - fText = value; - break; - default: - FairMQDevice::SetProperty(key, value); - break; - } -} - -string FairMQExample1Sampler::GetProperty(const int key, const string& default_ /*= ""*/) -{ - switch (key) - { - case Text: - return fText; - break; - default: - return FairMQDevice::GetProperty(key, default_); - } -} - -void FairMQExample1Sampler::SetProperty(const int key, const int value) -{ - switch (key) - { - default: - FairMQDevice::SetProperty(key, value); - break; - } -} - -int FairMQExample1Sampler::GetProperty(const int key, const int default_ /*= 0*/) -{ - switch (key) - { - default: - return FairMQDevice::GetProperty(key, default_); - } -} diff --git a/examples/MQ/1-sampler-sink/FairMQExample1Sampler.h b/examples/MQ/1-sampler-sink/FairMQExample1Sampler.h index fd6665c3..7aebce39 100644 --- a/examples/MQ/1-sampler-sink/FairMQExample1Sampler.h +++ b/examples/MQ/1-sampler-sink/FairMQExample1Sampler.h @@ -22,25 +22,14 @@ class FairMQExample1Sampler : public FairMQDevice { public: - enum - { - Text = FairMQDevice::Last, - Last - }; FairMQExample1Sampler(); virtual ~FairMQExample1Sampler(); - static void CustomCleanup(void* data, void* hint); - - virtual void SetProperty(const int key, const std::string& value); - virtual std::string GetProperty(const int key, const std::string& default_ = ""); - virtual void SetProperty(const int key, const int value); - virtual int GetProperty(const int key, const int default_ = 0); - protected: std::string fText; - virtual void Run(); + virtual void InitTask(); + virtual bool ConditionalRun(); }; #endif /* FAIRMQEXAMPLE1SAMPLER_H_ */ diff --git a/examples/MQ/1-sampler-sink/FairMQExample1Sink.cxx b/examples/MQ/1-sampler-sink/FairMQExample1Sink.cxx index 24783c46..5aaa47e9 100644 --- a/examples/MQ/1-sampler-sink/FairMQExample1Sink.cxx +++ b/examples/MQ/1-sampler-sink/FairMQExample1Sink.cxx @@ -19,21 +19,17 @@ using namespace std; FairMQExample1Sink::FairMQExample1Sink() { + // register a handler for data arriving on "data" channel + OnData("data", &FairMQExample1Sink::HandleData); } -void FairMQExample1Sink::Run() +// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0) +bool FairMQExample1Sink::HandleData(FairMQMessagePtr& msg, int /*index*/) { - while (CheckCurrentState(RUNNING)) - { - unique_ptr msg(NewMessage()); + LOG(INFO) << "Received: \"" << string(static_cast(msg->GetData()), msg->GetSize()) << "\""; - if (Receive(msg, "data") >= 0) - { - LOG(INFO) << "Received message: \"" - << string(static_cast(msg->GetData()), msg->GetSize()) - << "\""; - } - } + // return true if want to be called again (otherwise go to IDLE state) + return true; } FairMQExample1Sink::~FairMQExample1Sink() diff --git a/examples/MQ/1-sampler-sink/FairMQExample1Sink.h b/examples/MQ/1-sampler-sink/FairMQExample1Sink.h index 34847903..bd6f35ea 100644 --- a/examples/MQ/1-sampler-sink/FairMQExample1Sink.h +++ b/examples/MQ/1-sampler-sink/FairMQExample1Sink.h @@ -24,7 +24,7 @@ class FairMQExample1Sink : public FairMQDevice virtual ~FairMQExample1Sink(); protected: - virtual void Run(); + bool HandleData(FairMQMessagePtr&, int); }; #endif /* FAIRMQEXAMPLE1SINK_H_ */ diff --git a/examples/MQ/1-sampler-sink/runExample1Sampler.cxx b/examples/MQ/1-sampler-sink/runExample1Sampler.cxx index 7f83e569..a9c36e0f 100644 --- a/examples/MQ/1-sampler-sink/runExample1Sampler.cxx +++ b/examples/MQ/1-sampler-sink/runExample1Sampler.cxx @@ -5,55 +5,19 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * runExample1Sampler.cxx - * - * @since 2013-04-23 - * @author D. Klein, A. Rybalchenko - */ -#include "boost/program_options.hpp" - -#include "FairMQLogger.h" -#include "FairMQProgOptions.h" +#include "runFairMQDevice.h" #include "FairMQExample1Sampler.h" -using namespace boost::program_options; +namespace bpo = boost::program_options; -int main(int argc, char** argv) +void addCustomOptions(bpo::options_description& options) { - try - { - std::string text; - - options_description samplerOptions("Sampler options"); - samplerOptions.add_options() - ("text", value(&text)->default_value("Hello"), "Text to send out"); - - FairMQProgOptions config; - config.AddToCmdLineOptions(samplerOptions); - config.ParseAll(argc, argv); - - FairMQExample1Sampler sampler; - sampler.CatchSignals(); - sampler.SetConfig(config); - sampler.SetProperty(FairMQExample1Sampler::Text, text); - - sampler.ChangeState("INIT_DEVICE"); - sampler.WaitForEndOfState("INIT_DEVICE"); - - sampler.ChangeState("INIT_TASK"); - sampler.WaitForEndOfState("INIT_TASK"); - - sampler.ChangeState("RUN"); - sampler.InteractiveStateLoop(); - } - catch (std::exception& e) - { - LOG(ERROR) << "Unhandled Exception reached the top of main: " - << e.what() << ", application will now exit"; - return 1; - } - - return 0; + options.add_options() + ("text", bpo::value()->default_value("Hello"), "Text to send out"); +} + +FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +{ + return new FairMQExample1Sampler(); } diff --git a/examples/MQ/1-sampler-sink/runExample1Sink.cxx b/examples/MQ/1-sampler-sink/runExample1Sink.cxx index a9403d0a..58c67da4 100644 --- a/examples/MQ/1-sampler-sink/runExample1Sink.cxx +++ b/examples/MQ/1-sampler-sink/runExample1Sink.cxx @@ -5,45 +5,17 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * runExample1Sink.cxx - * - * @since 2013-04-23 - * @author D. Klein, A. Rybalchenko - */ -#include - -#include "FairMQLogger.h" -#include "FairMQProgOptions.h" +#include "runFairMQDevice.h" #include "FairMQExample1Sink.h" -int main(int argc, char** argv) +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& /*options*/) { - try - { - FairMQProgOptions config; - config.ParseAll(argc, argv); - - FairMQExample1Sink sink; - sink.CatchSignals(); - sink.SetConfig(config); - - sink.ChangeState("INIT_DEVICE"); - sink.WaitForEndOfState("INIT_DEVICE"); - - sink.ChangeState("INIT_TASK"); - sink.WaitForEndOfState("INIT_TASK"); - - sink.ChangeState("RUN"); - sink.InteractiveStateLoop(); - } - catch (std::exception& e) - { - LOG(ERROR) << "Unhandled Exception reached the top of main: " - << e.what() << ", application will now exit"; - return 1; - } - - return 0; +} + +FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +{ + return new FairMQExample1Sink(); } diff --git a/examples/MQ/2-sampler-processor-sink/FairMQExample2Processor.cxx b/examples/MQ/2-sampler-processor-sink/FairMQExample2Processor.cxx index 95612c50..952f9a0d 100644 --- a/examples/MQ/2-sampler-processor-sink/FairMQExample2Processor.cxx +++ b/examples/MQ/2-sampler-processor-sink/FairMQExample2Processor.cxx @@ -5,56 +5,43 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * FairMQExample2Processor.cpp - * - * @since 2014-10-10 - * @author A. Rybalchenko - */ - -#include -#include #include "FairMQExample2Processor.h" #include "FairMQLogger.h" +#include + using namespace std; FairMQExample2Processor::FairMQExample2Processor() - : fText() { + OnData("data1", &FairMQExample2Processor::HandleData); } -void FairMQExample2Processor::CustomCleanup(void* /*data*/, void *object) +bool FairMQExample2Processor::HandleData(FairMQMessagePtr& msg, int /*index*/) { - delete static_cast(object); -} + LOG(INFO) << "Received data, processing..."; -void FairMQExample2Processor::Run() -{ - // Check if we are still in the RUNNING state - while (CheckCurrentState(RUNNING)) + // Modify the received string + string* text = new std::string(static_cast(msg->GetData()), msg->GetSize()); + *text += " (modified by " + fId + ")"; + + // create message object with a pointer to the data buffer, + // its size, + // custom deletion function (called when transfer is done), + // and pointer to the object managing the data buffer + FairMQMessagePtr msg2(NewMessage(const_cast(text->c_str()), + text->length(), + [](void* /*data*/, void* object) { delete static_cast(object); }, + text)); + + // Send out the output message + if (Send(msg2, "data2") < 0) { - // Create empty message to hold the input - unique_ptr input(NewMessage()); - - // Receive the message (blocks until received or interrupted (e.g. by state change)). - // Returns size of the received message or -1 if interrupted. - if (Receive(input, "data1") >= 0) - { - LOG(INFO) << "Received data, processing..."; - - // Modify the received string - string* text = new string(static_cast(input->GetData()), input->GetSize()); - *text += " (modified by " + fId + ")"; - - // Create output message - unique_ptr msg(NewMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); - - // Send out the output message - Send(msg, "data2"); - } + return false; } + + return true; } FairMQExample2Processor::~FairMQExample2Processor() diff --git a/examples/MQ/2-sampler-processor-sink/FairMQExample2Processor.h b/examples/MQ/2-sampler-processor-sink/FairMQExample2Processor.h index 9f0f1157..62d5a377 100644 --- a/examples/MQ/2-sampler-processor-sink/FairMQExample2Processor.h +++ b/examples/MQ/2-sampler-processor-sink/FairMQExample2Processor.h @@ -5,37 +5,20 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * FairMQExample2Processor.h - * - * @since 2014-10-10 - * @author A. Rybalchenko - */ #ifndef FAIRMQEXAMPLE2PROCESSOR_H_ #define FAIRMQEXAMPLE2PROCESSOR_H_ -#include - #include "FairMQDevice.h" class FairMQExample2Processor : public FairMQDevice { public: - enum - { - Text = FairMQDevice::Last, - Last - }; FairMQExample2Processor(); virtual ~FairMQExample2Processor(); - static void CustomCleanup(void* data, void* hint); - protected: - std::string fText; - - virtual void Run(); + bool HandleData(FairMQMessagePtr&, int); }; #endif /* FAIRMQEXAMPLE2PROCESSOR_H_ */ diff --git a/examples/MQ/2-sampler-processor-sink/FairMQExample2Sampler.cxx b/examples/MQ/2-sampler-processor-sink/FairMQExample2Sampler.cxx index 0ebc3822..b53b16dd 100644 --- a/examples/MQ/2-sampler-processor-sink/FairMQExample2Sampler.cxx +++ b/examples/MQ/2-sampler-processor-sink/FairMQExample2Sampler.cxx @@ -12,11 +12,12 @@ * @author A. Rybalchenko */ -#include -#include +#include // this_thread::sleep_for +#include #include "FairMQExample2Sampler.h" #include "FairMQLogger.h" +#include "FairMQProgOptions.h" // device->fConfig using namespace std; @@ -25,72 +26,40 @@ FairMQExample2Sampler::FairMQExample2Sampler() { } -void FairMQExample2Sampler::CustomCleanup(void* /*data*/, void *object) +void FairMQExample2Sampler::InitTask() { - delete static_cast(object); + // Get the fText value from the command line option (via fConfig) + fText = fConfig->GetValue("text"); } -void FairMQExample2Sampler::Run() +bool FairMQExample2Sampler::ConditionalRun() { - // Check if we are still in the RUNNING state - while (CheckCurrentState(RUNNING)) + std::this_thread::sleep_for(std::chrono::seconds(1)); + + // create a copy of the data with new(), that will be deleted after the transfer is complete + string* text = new string(fText); + + // create message object with a pointer to the data buffer, + // its size, + // custom deletion function (called when transfer is done), + // and pointer to the object managing the data buffer + FairMQMessagePtr msg(NewMessage(const_cast(text->c_str()), + text->length(), + [](void* /*data*/, void* object) { delete static_cast(object); }, + text)); + + LOG(INFO) << "Sending \"" << fText << "\""; + + // in case of error or transfer interruption, return false to go to IDLE state + // successfull transfer will return number of bytes transfered (can be 0 if sending an empty message). + if (Send(msg, "data1") < 0) { - boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); - - string* text = new string(fText); - - unique_ptr msg(NewMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); - - LOG(INFO) << "Sending \"" << fText << "\""; - - Send(msg, "data1"); + return false; } + + return true; } FairMQExample2Sampler::~FairMQExample2Sampler() { } - -void FairMQExample2Sampler::SetProperty(const int key, const string& value) -{ - switch (key) - { - case Text: - fText = value; - break; - default: - FairMQDevice::SetProperty(key, value); - break; - } -} - -string FairMQExample2Sampler::GetProperty(const int key, const string& default_ /*= ""*/) -{ - switch (key) - { - case Text: - return fText; - break; - default: - return FairMQDevice::GetProperty(key, default_); - } -} - -void FairMQExample2Sampler::SetProperty(const int key, const int value) -{ - switch (key) - { - default: - FairMQDevice::SetProperty(key, value); - break; - } -} - -int FairMQExample2Sampler::GetProperty(const int key, const int default_ /*= 0*/) -{ - switch (key) - { - default: - return FairMQDevice::GetProperty(key, default_); - } -} diff --git a/examples/MQ/2-sampler-processor-sink/FairMQExample2Sampler.h b/examples/MQ/2-sampler-processor-sink/FairMQExample2Sampler.h index 71dc3fca..82ba081c 100644 --- a/examples/MQ/2-sampler-processor-sink/FairMQExample2Sampler.h +++ b/examples/MQ/2-sampler-processor-sink/FairMQExample2Sampler.h @@ -22,25 +22,14 @@ class FairMQExample2Sampler : public FairMQDevice { public: - enum - { - Text = FairMQDevice::Last, - Last - }; FairMQExample2Sampler(); virtual ~FairMQExample2Sampler(); - static void CustomCleanup(void* data, void* hint); - - virtual void SetProperty(const int key, const std::string& value); - virtual std::string GetProperty(const int key, const std::string& default_ = ""); - virtual void SetProperty(const int key, const int value); - virtual int GetProperty(const int key, const int default_ = 0); - protected: std::string fText; - virtual void Run(); + virtual void InitTask(); + virtual bool ConditionalRun(); }; #endif /* FAIRMQEXAMPLE2SAMPLER_H_ */ diff --git a/examples/MQ/2-sampler-processor-sink/FairMQExample2Sink.cxx b/examples/MQ/2-sampler-processor-sink/FairMQExample2Sink.cxx index bfc3a8d2..74ef614c 100644 --- a/examples/MQ/2-sampler-processor-sink/FairMQExample2Sink.cxx +++ b/examples/MQ/2-sampler-processor-sink/FairMQExample2Sink.cxx @@ -12,9 +12,6 @@ * @author A. Rybalchenko */ -#include -#include - #include "FairMQExample2Sink.h" #include "FairMQLogger.h" @@ -22,21 +19,17 @@ using namespace std; FairMQExample2Sink::FairMQExample2Sink() { + // register a handler for data arriving on "data2" channel + OnData("data2", &FairMQExample2Sink::HandleData); } -void FairMQExample2Sink::Run() +// handler is called whenever a message arrives on "data2", with a reference to the message and a sub-channel index (here 0) +bool FairMQExample2Sink::HandleData(FairMQMessagePtr& msg, int /*index*/) { - while (CheckCurrentState(RUNNING)) - { - unique_ptr msg(NewMessage()); + LOG(INFO) << "Received: \"" << string(static_cast(msg->GetData()), msg->GetSize()) << "\""; - if (Receive(msg, "data2") >= 0) - { - LOG(INFO) << "Received message: \"" - << string(static_cast(msg->GetData()), msg->GetSize()) - << "\""; - } - } + // return true if want to be called again (otherwise go to IDLE state) + return true; } FairMQExample2Sink::~FairMQExample2Sink() diff --git a/examples/MQ/2-sampler-processor-sink/FairMQExample2Sink.h b/examples/MQ/2-sampler-processor-sink/FairMQExample2Sink.h index 228501a0..437d7a3c 100644 --- a/examples/MQ/2-sampler-processor-sink/FairMQExample2Sink.h +++ b/examples/MQ/2-sampler-processor-sink/FairMQExample2Sink.h @@ -24,7 +24,7 @@ class FairMQExample2Sink : public FairMQDevice virtual ~FairMQExample2Sink(); protected: - virtual void Run(); + bool HandleData(FairMQMessagePtr&, int); }; #endif /* FAIRMQEXAMPLE2SINK_H_ */ diff --git a/examples/MQ/2-sampler-processor-sink/runExample2Processor.cxx b/examples/MQ/2-sampler-processor-sink/runExample2Processor.cxx index 37937bed..8ea57503 100644 --- a/examples/MQ/2-sampler-processor-sink/runExample2Processor.cxx +++ b/examples/MQ/2-sampler-processor-sink/runExample2Processor.cxx @@ -5,44 +5,17 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * runExample2Processor.cxx - * - * @since 2013-04-23 - * @author D. Klein, A. Rybalchenko - */ -#include "FairMQLogger.h" - -#include "FairMQProgOptions.h" +#include "runFairMQDevice.h" #include "FairMQExample2Processor.h" -int main(int argc, char** argv) +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& /*options*/) { - try - { - FairMQProgOptions config; - config.ParseAll(argc, argv); - - FairMQExample2Processor processor; - processor.CatchSignals(); - processor.SetConfig(config); - - processor.ChangeState("INIT_DEVICE"); - processor.WaitForEndOfState("INIT_DEVICE"); - - processor.ChangeState("INIT_TASK"); - processor.WaitForEndOfState("INIT_TASK"); - - processor.ChangeState("RUN"); - processor.InteractiveStateLoop(); - } - catch (std::exception& e) - { - LOG(ERROR) << "Unhandled Exception reached the top of main: " - << e.what() << ", application will now exit"; - return 1; - } - - return 0; +} + +FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +{ + return new FairMQExample2Processor(); } diff --git a/examples/MQ/2-sampler-processor-sink/runExample2Sampler.cxx b/examples/MQ/2-sampler-processor-sink/runExample2Sampler.cxx index 58676066..3f8bf7f9 100644 --- a/examples/MQ/2-sampler-processor-sink/runExample2Sampler.cxx +++ b/examples/MQ/2-sampler-processor-sink/runExample2Sampler.cxx @@ -5,56 +5,19 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * runExample2Sampler.cxx - * - * @since 2013-04-23 - * @author D. Klein, A. Rybalchenko - */ -#include "boost/program_options.hpp" - -#include "FairMQLogger.h" - -#include "FairMQProgOptions.h" +#include "runFairMQDevice.h" #include "FairMQExample2Sampler.h" -using namespace boost::program_options; +namespace bpo = boost::program_options; -int main(int argc, char** argv) +void addCustomOptions(bpo::options_description& options) { - try - { - std::string text; - - options_description samplerOptions("Sampler options"); - samplerOptions.add_options() - ("text", value(&text)->default_value("Hello"), "Text to send out"); - - FairMQProgOptions config; - config.AddToCmdLineOptions(samplerOptions); - config.ParseAll(argc, argv); - - FairMQExample2Sampler sampler; - sampler.CatchSignals(); - sampler.SetConfig(config); - sampler.SetProperty(FairMQExample2Sampler::Text, text); - - sampler.ChangeState("INIT_DEVICE"); - sampler.WaitForEndOfState("INIT_DEVICE"); - - sampler.ChangeState("INIT_TASK"); - sampler.WaitForEndOfState("INIT_TASK"); - - sampler.ChangeState("RUN"); - sampler.InteractiveStateLoop(); - } - catch (std::exception& e) - { - LOG(ERROR) << "Unhandled Exception reached the top of main: " - << e.what() << ", application will now exit"; - return 1; - } - - return 0; + options.add_options() + ("text", bpo::value()->default_value("Hello"), "Text to send out"); +} + +FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +{ + return new FairMQExample2Sampler(); } diff --git a/examples/MQ/2-sampler-processor-sink/runExample2Sink.cxx b/examples/MQ/2-sampler-processor-sink/runExample2Sink.cxx index 69d9eaaa..8a684d25 100644 --- a/examples/MQ/2-sampler-processor-sink/runExample2Sink.cxx +++ b/examples/MQ/2-sampler-processor-sink/runExample2Sink.cxx @@ -5,44 +5,17 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * runExample2Sink.cxx - * - * @since 2013-04-23 - * @author D. Klein, A. Rybalchenko - */ -#include "FairMQLogger.h" - -#include "FairMQProgOptions.h" +#include "runFairMQDevice.h" #include "FairMQExample2Sink.h" -int main(int argc, char** argv) +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& /*options*/) { - try - { - FairMQProgOptions config; - config.ParseAll(argc, argv); - - FairMQExample2Sink sink; - sink.CatchSignals(); - sink.SetConfig(config); - - sink.ChangeState("INIT_DEVICE"); - sink.WaitForEndOfState("INIT_DEVICE"); - - sink.ChangeState("INIT_TASK"); - sink.WaitForEndOfState("INIT_TASK"); - - sink.ChangeState("RUN"); - sink.InteractiveStateLoop(); - } - catch (std::exception& e) - { - LOG(ERROR) << "Unhandled Exception reached the top of main: " - << e.what() << ", application will now exit"; - return 1; - } - - return 0; +} + +FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +{ + return new FairMQExample2Sink(); } diff --git a/examples/MQ/3-dds/CMakeLists.txt b/examples/MQ/3-dds/CMakeLists.txt index 9e022950..0114d380 100644 --- a/examples/MQ/3-dds/CMakeLists.txt +++ b/examples/MQ/3-dds/CMakeLists.txt @@ -56,7 +56,7 @@ Set(DEPENDENCIES ${DEPENDENCIES} FairMQ ${DDS_INTERCOM_LIBRARY_SHARED} - ${DDS_PROTOCOL_LIBRARY_SHARED} # also link the two DDS dependency libraries to avoid linking issues on some osx systems + ${DDS_PROTOCOL_LIBRARY_SHARED} ${DDS_USER_DEFAULTS_LIBRARY_SHARED} ) diff --git a/examples/MQ/3-dds/FairMQExample3Processor.cxx b/examples/MQ/3-dds/FairMQExample3Processor.cxx index 8255fcc1..a2ae9718 100644 --- a/examples/MQ/3-dds/FairMQExample3Processor.cxx +++ b/examples/MQ/3-dds/FairMQExample3Processor.cxx @@ -5,15 +5,6 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * FairMQExample3Processor.cpp - * - * @since 2014-10-10 - * @author A. Rybalchenko - */ - -#include -#include #include "FairMQExample3Processor.h" #include "FairMQLogger.h" @@ -22,37 +13,33 @@ using namespace std; FairMQExample3Processor::FairMQExample3Processor() { + OnData("data1", &FairMQExample3Processor::HandleData); } -void FairMQExample3Processor::CustomCleanup(void *data, void *object) +bool FairMQExample3Processor::HandleData(FairMQMessagePtr& msg, int /*index*/) { - delete static_cast(object); -} + LOG(INFO) << "Received data, processing..."; -void FairMQExample3Processor::Run() -{ - while (CheckCurrentState(RUNNING)) + // Modify the received string + string* text = new std::string(static_cast(msg->GetData()), msg->GetSize()); + *text += " (modified by " + fId + ")"; + + // create message object with a pointer to the data buffer, + // its size, + // custom deletion function (called when transfer is done), + // and pointer to the object managing the data buffer + FairMQMessagePtr msg2(NewMessage(const_cast(text->c_str()), + text->length(), + [](void* /*data*/, void* object) { delete static_cast(object); }, + text)); + + // Send out the output message + if (Send(msg2, "data2") < 0) { - // Create empty message to hold the input - unique_ptr input(NewMessage()); - - // Receive the message (blocks until received or interrupted (e.g. by state change)). - // Returns size of the received message or -1 if interrupted. - if (Receive(input, "data1") >= 0) - { - LOG(INFO) << "Received data, processing..."; - - // Modify the received string - string* text = new string(static_cast(input->GetData()), input->GetSize()); - *text += " (modified by " + fId + ")"; - - // Create output message - unique_ptr msg(NewMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); - - // Send out the output message - Send(msg, "data2"); - } + return false; } + + return true; } FairMQExample3Processor::~FairMQExample3Processor() diff --git a/examples/MQ/3-dds/FairMQExample3Processor.h b/examples/MQ/3-dds/FairMQExample3Processor.h index 7d9fdd35..89a7ee13 100644 --- a/examples/MQ/3-dds/FairMQExample3Processor.h +++ b/examples/MQ/3-dds/FairMQExample3Processor.h @@ -5,17 +5,9 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * FairMQExample3Processor.h - * - * @since 2014-10-10 - * @author A. Rybalchenko - */ -#ifndef FAIRMQEXAMPLE3PROCESSOR_H_ -#define FAIRMQEXAMPLE3PROCESSOR_H_ - -#include +#ifndef FAIRMQEXAMPLE2PROCESSOR_H_ +#define FAIRMQEXAMPLE2PROCESSOR_H_ #include "FairMQDevice.h" @@ -25,10 +17,8 @@ class FairMQExample3Processor : public FairMQDevice FairMQExample3Processor(); virtual ~FairMQExample3Processor(); - static void CustomCleanup(void* data, void* hint); - protected: - virtual void Run(); + bool HandleData(FairMQMessagePtr&, int); }; #endif /* FAIRMQEXAMPLE3PROCESSOR_H_ */ diff --git a/examples/MQ/3-dds/FairMQExample3Sampler.cxx b/examples/MQ/3-dds/FairMQExample3Sampler.cxx index 9fef81a0..3060054d 100644 --- a/examples/MQ/3-dds/FairMQExample3Sampler.cxx +++ b/examples/MQ/3-dds/FairMQExample3Sampler.cxx @@ -12,11 +12,12 @@ * @author A. Rybalchenko */ -#include -#include +#include // this_thread::sleep_for +#include #include "FairMQExample3Sampler.h" #include "FairMQLogger.h" +#include "FairMQProgOptions.h" // device->fConfig using namespace std; @@ -24,25 +25,24 @@ FairMQExample3Sampler::FairMQExample3Sampler() { } -void FairMQExample3Sampler::CustomCleanup(void *data, void *object) +bool FairMQExample3Sampler::ConditionalRun() { - delete static_cast(object); -} + std::this_thread::sleep_for(std::chrono::seconds(1)); -void FairMQExample3Sampler::Run() -{ - while (CheckCurrentState(RUNNING)) + // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place). + // Should only be used for small data because of the cost of an additional copy + FairMQMessagePtr msg(NewSimpleMessage("Data")); + + LOG(INFO) << "Sending \"Data\""; + + // in case of error or transfer interruption, return false to go to IDLE state + // successfull transfer will return number of bytes transfered (can be 0 if sending an empty message). + if (Send(msg, "data1") < 0) { - boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); - - string* text = new string("Data"); - - unique_ptr msg(NewMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); - - LOG(INFO) << "Sending \"Data\""; - - Send(msg, "data1"); + return false; } + + return true; } FairMQExample3Sampler::~FairMQExample3Sampler() diff --git a/examples/MQ/3-dds/FairMQExample3Sampler.h b/examples/MQ/3-dds/FairMQExample3Sampler.h index e49c82b7..c6908e4e 100644 --- a/examples/MQ/3-dds/FairMQExample3Sampler.h +++ b/examples/MQ/3-dds/FairMQExample3Sampler.h @@ -15,8 +15,6 @@ #ifndef FAIRMQEXAMPLE3SAMPLER_H_ #define FAIRMQEXAMPLE3SAMPLER_H_ -#include - #include "FairMQDevice.h" class FairMQExample3Sampler : public FairMQDevice @@ -25,10 +23,8 @@ class FairMQExample3Sampler : public FairMQDevice FairMQExample3Sampler(); virtual ~FairMQExample3Sampler(); - static void CustomCleanup(void* data, void* hint); - protected: - virtual void Run(); + virtual bool ConditionalRun(); }; #endif /* FAIRMQEXAMPLE3SAMPLER_H_ */ diff --git a/examples/MQ/3-dds/FairMQExample3Sink.cxx b/examples/MQ/3-dds/FairMQExample3Sink.cxx index 322fe58a..5d09bec3 100644 --- a/examples/MQ/3-dds/FairMQExample3Sink.cxx +++ b/examples/MQ/3-dds/FairMQExample3Sink.cxx @@ -12,9 +12,6 @@ * @author A. Rybalchenko */ -#include -#include - #include "FairMQExample3Sink.h" #include "FairMQLogger.h" @@ -22,21 +19,17 @@ using namespace std; FairMQExample3Sink::FairMQExample3Sink() { + // register a handler for data arriving on "data2" channel + OnData("data2", &FairMQExample3Sink::HandleData); } -void FairMQExample3Sink::Run() +// handler is called whenever a message arrives on "data2", with a reference to the message and a sub-channel index (here 0) +bool FairMQExample3Sink::HandleData(FairMQMessagePtr& msg, int /*index*/) { - while (CheckCurrentState(RUNNING)) - { - unique_ptr msg(NewMessage()); + LOG(INFO) << "Received: \"" << string(static_cast(msg->GetData()), msg->GetSize()) << "\""; - if (Receive(msg, "data2") >= 0) - { - LOG(INFO) << "Received message: \"" - << string(static_cast(msg->GetData()), msg->GetSize()) - << "\""; - } - } + // return true if want to be called again (otherwise go to IDLE state) + return true; } FairMQExample3Sink::~FairMQExample3Sink() diff --git a/examples/MQ/3-dds/FairMQExample3Sink.h b/examples/MQ/3-dds/FairMQExample3Sink.h index b4006dc2..85c16480 100644 --- a/examples/MQ/3-dds/FairMQExample3Sink.h +++ b/examples/MQ/3-dds/FairMQExample3Sink.h @@ -24,7 +24,7 @@ class FairMQExample3Sink : public FairMQDevice virtual ~FairMQExample3Sink(); protected: - virtual void Run(); + bool HandleData(FairMQMessagePtr&, int); }; #endif /* FAIRMQEXAMPLE3SINK_H_ */ diff --git a/examples/MQ/3-dds/runExample3Processor.cxx b/examples/MQ/3-dds/runExample3Processor.cxx index 98b2b891..7787a7ae 100644 --- a/examples/MQ/3-dds/runExample3Processor.cxx +++ b/examples/MQ/3-dds/runExample3Processor.cxx @@ -5,34 +5,17 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * runExample2Processor.cxx - * - * @since 2013-04-23 - * @author D. Klein, A. Rybalchenko - */ -#include "FairMQLogger.h" -#include "FairMQProgOptions.h" +#include "runFairMQDevice.h" #include "FairMQExample3Processor.h" -#include "runSimpleMQStateMachine.h" -int main(int argc, char** argv) +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& /*options*/) { - try - { - FairMQProgOptions config; - config.ParseAll(argc, argv); - - FairMQExample3Processor processor; - runStateMachine(processor, config); - } - catch (std::exception& e) - { - LOG(ERROR) << "Unhandled Exception reached the top of main: " - << e.what() << ", application will now exit"; - return 1; - } - - return 0; +} + +FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +{ + return new FairMQExample3Processor(); } diff --git a/examples/MQ/3-dds/runExample3Sampler.cxx b/examples/MQ/3-dds/runExample3Sampler.cxx index c301de7a..ad8d8f5e 100644 --- a/examples/MQ/3-dds/runExample3Sampler.cxx +++ b/examples/MQ/3-dds/runExample3Sampler.cxx @@ -5,34 +5,17 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * runExample2Sampler.cxx - * - * @since 2013-04-23 - * @author D. Klein, A. Rybalchenko - */ -#include "FairMQLogger.h" -#include "FairMQProgOptions.h" +#include "runFairMQDevice.h" #include "FairMQExample3Sampler.h" -#include "runSimpleMQStateMachine.h" -int main(int argc, char** argv) +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& /*options*/) { - try - { - FairMQProgOptions config; - config.ParseAll(argc, argv); - - FairMQExample3Sampler sampler; - runStateMachine(sampler, config); - } - catch (std::exception& e) - { - LOG(ERROR) << "Unhandled Exception reached the top of main: " - << e.what() << ", application will now exit"; - return 1; - } - - return 0; +} + +FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +{ + return new FairMQExample3Sampler(); } diff --git a/examples/MQ/3-dds/runExample3Sink.cxx b/examples/MQ/3-dds/runExample3Sink.cxx index 041605e6..3e8e7677 100644 --- a/examples/MQ/3-dds/runExample3Sink.cxx +++ b/examples/MQ/3-dds/runExample3Sink.cxx @@ -5,34 +5,17 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * runExample2Sink.cxx - * - * @since 2013-04-23 - * @author D. Klein, A. Rybalchenko - */ -#include "FairMQLogger.h" -#include "FairMQProgOptions.h" +#include "runFairMQDevice.h" #include "FairMQExample3Sink.h" -#include "runSimpleMQStateMachine.h" -int main(int argc, char** argv) +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& /*options*/) { - try - { - FairMQProgOptions config; - config.ParseAll(argc, argv); - - FairMQExample3Sink sink; - runStateMachine(sink, config); - } - catch (std::exception& e) - { - LOG(ERROR) << "Unhandled Exception reached the top of main: " - << e.what() << ", application will now exit"; - return 1; - } - - return 0; +} + +FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +{ + return new FairMQExample3Sink(); } diff --git a/examples/MQ/4-copypush/FairMQExample4Sampler.cxx b/examples/MQ/4-copypush/FairMQExample4Sampler.cxx index 71c50be1..c833bc05 100644 --- a/examples/MQ/4-copypush/FairMQExample4Sampler.cxx +++ b/examples/MQ/4-copypush/FairMQExample4Sampler.cxx @@ -12,52 +12,39 @@ * @author A. Rybalchenko */ -#include // unique_ptr - -#include -#include +#include // this_thread::sleep_for +#include #include "FairMQExample4Sampler.h" #include "FairMQLogger.h" FairMQExample4Sampler::FairMQExample4Sampler() + : fNumDataChannels(0) + , fCounter(0) { } -void FairMQExample4Sampler::Run() +void FairMQExample4Sampler::InitTask() { - uint64_t counter = 0; + fNumDataChannels = fChannels.at("data").size(); +} - while (CheckCurrentState(RUNNING)) +bool FairMQExample4Sampler::ConditionalRun() +{ + std::this_thread::sleep_for(std::chrono::seconds(1)); + + // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place). + // Should only be used for small data because of the cost of an additional copy + FairMQMessagePtr msg(NewSimpleMessage(fCounter++)); + + for (int i = 0; i < fNumDataChannels - 1; ++i) { - boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); - - uint64_t* number = new uint64_t(counter); - - std::unique_ptr msg(NewMessage(number, // data pointer - sizeof(uint64_t), // data size - [](void* data, void* /*hint*/){ delete static_cast(data); } // callback to deallocate after the transfer - )); - - LOG(INFO) << "Sending \"" << counter << "\""; - - if (fChannels.at("data").size() > 1) - { - for (unsigned int i = 1; i < fChannels.at("data").size(); ++i) - { - std::unique_ptr msgCopy(NewMessage()); - msgCopy->Copy(msg); - Send(msgCopy, "data", i); - } - Send(msg, "data"); - } - else - { - Send(msg, "data"); - } - - ++counter; + FairMQMessagePtr msgCopy(NewMessage()); + msgCopy->Copy(msg); + Send(msgCopy, "data", i); } + Send(msg, "data", fNumDataChannels - 1); + return true; } FairMQExample4Sampler::~FairMQExample4Sampler() diff --git a/examples/MQ/4-copypush/FairMQExample4Sampler.h b/examples/MQ/4-copypush/FairMQExample4Sampler.h index 2f7ff53e..fd6b72cc 100644 --- a/examples/MQ/4-copypush/FairMQExample4Sampler.h +++ b/examples/MQ/4-copypush/FairMQExample4Sampler.h @@ -15,10 +15,10 @@ #ifndef FAIRMQEXAMPLE4SAMPLER_H_ #define FAIRMQEXAMPLE4SAMPLER_H_ -#include - #include "FairMQDevice.h" +#include // uint64_t + class FairMQExample4Sampler : public FairMQDevice { public: @@ -26,7 +26,11 @@ class FairMQExample4Sampler : public FairMQDevice virtual ~FairMQExample4Sampler(); protected: - virtual void Run(); + virtual void InitTask(); + virtual bool ConditionalRun(); + + int fNumDataChannels; + uint64_t fCounter; }; #endif /* FAIRMQEXAMPLE4SAMPLER_H_ */ diff --git a/examples/MQ/4-copypush/FairMQExample4Sink.cxx b/examples/MQ/4-copypush/FairMQExample4Sink.cxx index f0704951..bf5e44e6 100644 --- a/examples/MQ/4-copypush/FairMQExample4Sink.cxx +++ b/examples/MQ/4-copypush/FairMQExample4Sink.cxx @@ -12,29 +12,22 @@ * @author A. Rybalchenko */ -#include - -#include -#include - #include "FairMQExample4Sink.h" #include "FairMQLogger.h" +#include // uint64_t + FairMQExample4Sink::FairMQExample4Sink() { + OnData("data", &FairMQExample4Sink::HandleData); } -void FairMQExample4Sink::Run() +bool FairMQExample4Sink::HandleData(FairMQMessagePtr& msg, int /*index*/) { - while (CheckCurrentState(RUNNING)) - { - std::unique_ptr msg(NewMessage()); + LOG(INFO) << "Received message: \"" << *(static_cast(msg->GetData())) << "\""; - if (Receive(msg, "data") >= 0) - { - LOG(INFO) << "Received message: \"" << *(static_cast(msg->GetData())) << "\""; - } - } + // return true if want to be called again (otherwise go to IDLE state) + return true; } FairMQExample4Sink::~FairMQExample4Sink() diff --git a/examples/MQ/4-copypush/FairMQExample4Sink.h b/examples/MQ/4-copypush/FairMQExample4Sink.h index 799ca84f..e7058b7b 100644 --- a/examples/MQ/4-copypush/FairMQExample4Sink.h +++ b/examples/MQ/4-copypush/FairMQExample4Sink.h @@ -24,7 +24,7 @@ class FairMQExample4Sink : public FairMQDevice virtual ~FairMQExample4Sink(); protected: - virtual void Run(); + bool HandleData(FairMQMessagePtr&, int); }; #endif /* FAIRMQEXAMPLE4SINK_H_ */ diff --git a/examples/MQ/4-copypush/runExample4Sampler.cxx b/examples/MQ/4-copypush/runExample4Sampler.cxx index 339b499b..39d7e156 100644 --- a/examples/MQ/4-copypush/runExample4Sampler.cxx +++ b/examples/MQ/4-copypush/runExample4Sampler.cxx @@ -5,45 +5,17 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * runExample4Sampler.cxx - * - * @since 2013-04-23 - * @author D. Klein, A. Rybalchenko - */ -#include - -#include "FairMQLogger.h" -#include "FairMQProgOptions.h" +#include "runFairMQDevice.h" #include "FairMQExample4Sampler.h" -int main(int argc, char** argv) +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& /*options*/) { - try - { - FairMQProgOptions config; - config.ParseAll(argc, argv); - - FairMQExample4Sampler sampler; - sampler.CatchSignals(); - sampler.SetConfig(config); - - sampler.ChangeState("INIT_DEVICE"); - sampler.WaitForEndOfState("INIT_DEVICE"); - - sampler.ChangeState("INIT_TASK"); - sampler.WaitForEndOfState("INIT_TASK"); - - sampler.ChangeState("RUN"); - sampler.InteractiveStateLoop(); - } - catch (std::exception& e) - { - LOG(ERROR) << "Unhandled Exception reached the top of main: " - << e.what() << ", application will now exit"; - return 1; - } - - return 0; +} + +FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +{ + return new FairMQExample4Sampler(); } diff --git a/examples/MQ/4-copypush/runExample4Sink.cxx b/examples/MQ/4-copypush/runExample4Sink.cxx index 0b048d7f..28e3aee6 100644 --- a/examples/MQ/4-copypush/runExample4Sink.cxx +++ b/examples/MQ/4-copypush/runExample4Sink.cxx @@ -5,45 +5,17 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * runExample4Sink.cxx - * - * @since 2013-04-23 - * @author D. Klein, A. Rybalchenko - */ -#include - -#include "FairMQLogger.h" -#include "FairMQProgOptions.h" +#include "runFairMQDevice.h" #include "FairMQExample4Sink.h" -int main(int argc, char** argv) +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& /*options*/) { - try - { - FairMQProgOptions config; - config.ParseAll(argc, argv); - - FairMQExample4Sink sink; - sink.CatchSignals(); - sink.SetConfig(config); - - sink.ChangeState("INIT_DEVICE"); - sink.WaitForEndOfState("INIT_DEVICE"); - - sink.ChangeState("INIT_TASK"); - sink.WaitForEndOfState("INIT_TASK"); - - sink.ChangeState("RUN"); - sink.InteractiveStateLoop(); - } - catch (std::exception& e) - { - LOG(ERROR) << "Unhandled Exception reached the top of main: " - << e.what() << ", application will now exit"; - return 1; - } - - return 0; +} + +FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +{ + return new FairMQExample4Sink(); } diff --git a/examples/MQ/5-req-rep/FairMQExample5Client.cxx b/examples/MQ/5-req-rep/FairMQExample5Client.cxx index f51bfa6b..edc0585c 100644 --- a/examples/MQ/5-req-rep/FairMQExample5Client.cxx +++ b/examples/MQ/5-req-rep/FairMQExample5Client.cxx @@ -12,11 +12,13 @@ * @author A. Rybalchenko */ -#include -#include +#include // unique_ptr +#include // this_thread::sleep_for +#include #include "FairMQExample5Client.h" #include "FairMQLogger.h" +#include "FairMQProgOptions.h" using namespace std; @@ -25,79 +27,41 @@ FairMQExample5Client::FairMQExample5Client() { } +void FairMQExample5Client::InitTask() +{ + fText = fConfig->GetValue("text"); +} + +bool FairMQExample5Client::ConditionalRun() +{ + this_thread::sleep_for(chrono::seconds(1)); + + string* text = new string(fText); + + // create message object with a pointer to the data buffer, + // its size, + // custom deletion function (called when transfer is done), + // and pointer to the object managing the data buffer + FairMQMessagePtr request(NewMessage(const_cast(text->c_str()), // data + text->length(), // size + [](void* /*data*/, void* object) { delete static_cast(object); }, // deletion callback + text)); // object that manages the data + FairMQMessagePtr reply(NewMessage()); + + LOG(INFO) << "Sending \"" << fText << "\" to server."; + + if (Send(request, "data") > 0) + { + if (Receive(reply, "data") >= 0) + { + LOG(INFO) << "Received reply from server: \"" << string(static_cast(reply->GetData()), reply->GetSize()) << "\""; + return true; + } + } + + return false; +} + FairMQExample5Client::~FairMQExample5Client() { } - -void FairMQExample5Client::CustomCleanup(void* /*data*/, void *hint) -{ - delete static_cast(hint); -} - -void FairMQExample5Client::Run() -{ - while (CheckCurrentState(RUNNING)) - { - boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); - - string* text = new string(fText); - - unique_ptr request(NewMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); - unique_ptr reply(NewMessage()); - - LOG(INFO) << "Sending \"" << fText << "\" to server."; - - if (Send(request, "data") > 0) - { - if (Receive(reply, "data") >= 0) - { - LOG(INFO) << "Received reply from server: \"" << string(static_cast(reply->GetData()), reply->GetSize()) << "\""; - } - } - } -} - - -void FairMQExample5Client::SetProperty(const int key, const string& value) -{ - switch (key) - { - case Text: - fText = value; - break; - default: - FairMQDevice::SetProperty(key, value); - break; - } -} - -string FairMQExample5Client::GetProperty(const int key, const string& default_ /*= ""*/) -{ - switch (key) - { - case Text: - return fText; - break; - default: - return FairMQDevice::GetProperty(key, default_); - } -} - -void FairMQExample5Client::SetProperty(const int key, const int value) -{ - switch (key) - { - default: - FairMQDevice::SetProperty(key, value); - break; - } -} - -int FairMQExample5Client::GetProperty(const int key, const int default_ /*= 0*/) -{ - switch (key) - { - default: - return FairMQDevice::GetProperty(key, default_); - } -} diff --git a/examples/MQ/5-req-rep/FairMQExample5Client.h b/examples/MQ/5-req-rep/FairMQExample5Client.h index 53964b52..8d28785e 100644 --- a/examples/MQ/5-req-rep/FairMQExample5Client.h +++ b/examples/MQ/5-req-rep/FairMQExample5Client.h @@ -22,25 +22,14 @@ class FairMQExample5Client : public FairMQDevice { public: - enum - { - Text = FairMQDevice::Last, - Last - }; FairMQExample5Client(); virtual ~FairMQExample5Client(); - static void CustomCleanup(void* data, void* hint); - - virtual void SetProperty(const int key, const std::string& value); - virtual std::string GetProperty(const int key, const std::string& default_ = ""); - virtual void SetProperty(const int key, const int value); - virtual int GetProperty(const int key, const int default_ = 0); - protected: std::string fText; - virtual void Run(); + virtual bool ConditionalRun(); + virtual void InitTask(); }; #endif /* FAIRMQEXAMPLECLIENT_H_ */ diff --git a/examples/MQ/5-req-rep/FairMQExample5Server.cxx b/examples/MQ/5-req-rep/FairMQExample5Server.cxx index 9a24abc5..09fb4059 100644 --- a/examples/MQ/5-req-rep/FairMQExample5Server.cxx +++ b/examples/MQ/5-req-rep/FairMQExample5Server.cxx @@ -12,9 +12,6 @@ * @author A. Rybalchenko */ -#include -#include - #include "FairMQExample5Server.h" #include "FairMQLogger.h" @@ -22,32 +19,28 @@ using namespace std; FairMQExample5Server::FairMQExample5Server() { + OnData("data", &FairMQExample5Server::HandleData); } -void FairMQExample5Server::CustomCleanup(void* /*data*/, void *hint) +bool FairMQExample5Server::HandleData(FairMQMessagePtr& request, int /*index*/) { - delete static_cast(hint); -} + LOG(INFO) << "Received request from client: \"" << string(static_cast(request->GetData()), request->GetSize()) << "\""; -void FairMQExample5Server::Run() -{ - while (CheckCurrentState(RUNNING)) + string* text = new string("Thank you for the \"" + string(static_cast(request->GetData()), request->GetSize()) + "\"!"); + + LOG(INFO) << "Sending reply to client."; + + FairMQMessagePtr reply(NewMessage(const_cast(text->c_str()), // data + text->length(), // size + [](void* /*data*/, void* object) { delete static_cast(object); }, // deletion callback + text)); // object that manages the data + + if (Send(reply, "data") > 0) { - unique_ptr request(NewMessage()); - - if (Receive(request, "data") >= 0) - { - LOG(INFO) << "Received request from client: \"" << string(static_cast(request->GetData()), request->GetSize()) << "\""; - - string* text = new string("Thank you for the \"" + string(static_cast(request->GetData()), request->GetSize()) + "\"!"); - - LOG(INFO) << "Sending reply to client."; - - unique_ptr reply(NewMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); - - Send(reply, "data"); - } + return true; } + + return false; } FairMQExample5Server::~FairMQExample5Server() diff --git a/examples/MQ/5-req-rep/FairMQExample5Server.h b/examples/MQ/5-req-rep/FairMQExample5Server.h index 8f9a607b..2291f6ea 100644 --- a/examples/MQ/5-req-rep/FairMQExample5Server.h +++ b/examples/MQ/5-req-rep/FairMQExample5Server.h @@ -23,10 +23,8 @@ class FairMQExample5Server : public FairMQDevice FairMQExample5Server(); virtual ~FairMQExample5Server(); - static void CustomCleanup(void *data, void* hint); - protected: - virtual void Run(); + bool HandleData(FairMQMessagePtr&, int); }; #endif /* FAIRMQEXAMPLE5SERVER_H_ */ diff --git a/examples/MQ/5-req-rep/runExample5Client.cxx b/examples/MQ/5-req-rep/runExample5Client.cxx index c9a9a129..f2c8f7a9 100644 --- a/examples/MQ/5-req-rep/runExample5Client.cxx +++ b/examples/MQ/5-req-rep/runExample5Client.cxx @@ -5,55 +5,19 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * runExampleClient.cxx - * - * @since 2013-04-23 - * @author D. Klein, A. Rybalchenko - */ -#include "boost/program_options.hpp" - -#include "FairMQLogger.h" -#include "FairMQProgOptions.h" +#include "runFairMQDevice.h" #include "FairMQExample5Client.h" -using namespace boost::program_options; +namespace bpo = boost::program_options; -int main(int argc, char** argv) +void addCustomOptions(bpo::options_description& options) { - try - { - std::string text; - - options_description clientOptions("Client options"); - clientOptions.add_options() - ("text", value(&text)->default_value("Hello"), "Text to send out"); - - FairMQProgOptions config; - config.AddToCmdLineOptions(clientOptions); - config.ParseAll(argc, argv); - - FairMQExample5Client client; - client.CatchSignals(); - client.SetConfig(config); - client.SetProperty(FairMQExample5Client::Text, text); - - client.ChangeState("INIT_DEVICE"); - client.WaitForEndOfState("INIT_DEVICE"); - - client.ChangeState("INIT_TASK"); - client.WaitForEndOfState("INIT_TASK"); - - client.ChangeState("RUN"); - client.InteractiveStateLoop(); - } - catch (std::exception& e) - { - LOG(ERROR) << "Unhandled Exception reached the top of main: " - << e.what() << ", application will now exit"; - return 1; - } - - return 0; + options.add_options() + ("text", bpo::value()->default_value("Hello"), "Text to send out"); +} + +FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +{ + return new FairMQExample5Client(); } diff --git a/examples/MQ/5-req-rep/runExample5Server.cxx b/examples/MQ/5-req-rep/runExample5Server.cxx index 2b1fbf43..db348f70 100644 --- a/examples/MQ/5-req-rep/runExample5Server.cxx +++ b/examples/MQ/5-req-rep/runExample5Server.cxx @@ -5,45 +5,17 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * runExampleServer.cxx - * - * @since 2013-04-23 - * @author D. Klein, A. Rybalchenko - */ -#include - -#include "FairMQLogger.h" -#include "FairMQProgOptions.h" +#include "runFairMQDevice.h" #include "FairMQExample5Server.h" -int main(int argc, char** argv) +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& /*options*/) { - try - { - FairMQProgOptions config; - config.ParseAll(argc, argv); - - FairMQExample5Server server; - server.CatchSignals(); - server.SetConfig(config); - - server.ChangeState("INIT_DEVICE"); - server.WaitForEndOfState("INIT_DEVICE"); - - server.ChangeState("INIT_TASK"); - server.WaitForEndOfState("INIT_TASK"); - - server.ChangeState("RUN"); - server.InteractiveStateLoop(); - } - catch (std::exception& e) - { - LOG(ERROR) << "Unhandled Exception reached the top of main: " - << e.what() << ", application will now exit"; - return 1; - } - - return 0; +} + +FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +{ + return new FairMQExample5Server(); } diff --git a/examples/MQ/6-multiple-channels/FairMQExample6Broadcaster.cxx b/examples/MQ/6-multiple-channels/FairMQExample6Broadcaster.cxx index e3697b25..4177188b 100644 --- a/examples/MQ/6-multiple-channels/FairMQExample6Broadcaster.cxx +++ b/examples/MQ/6-multiple-channels/FairMQExample6Broadcaster.cxx @@ -12,10 +12,8 @@ * @author A. Rybalchenko */ -#include // unique_ptr -#include - -#include +#include // this_thread::sleep_for +#include #include "FairMQExample6Broadcaster.h" #include "FairMQLogger.h" @@ -26,22 +24,19 @@ FairMQExample6Broadcaster::FairMQExample6Broadcaster() { } -void FairMQExample6Broadcaster::CustomCleanup(void* /*data*/, void *object) +bool FairMQExample6Broadcaster::ConditionalRun() { - delete static_cast(object); -} + this_thread::sleep_for(chrono::seconds(1)); -void FairMQExample6Broadcaster::Run() -{ - while (CheckCurrentState(RUNNING)) - { - boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); + // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place). + // Should only be used for small data because of the cost of an additional copy + FairMQMessagePtr msg(NewSimpleMessage("OK")); - string* text = new string("OK"); - unique_ptr msg(NewMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); - LOG(INFO) << "Sending OK"; - Send(msg, "broadcast"); - } + LOG(INFO) << "Sending OK"; + + Send(msg, "broadcast"); + + return true; } FairMQExample6Broadcaster::~FairMQExample6Broadcaster() diff --git a/examples/MQ/6-multiple-channels/FairMQExample6Broadcaster.h b/examples/MQ/6-multiple-channels/FairMQExample6Broadcaster.h index cbd5198d..87550b56 100644 --- a/examples/MQ/6-multiple-channels/FairMQExample6Broadcaster.h +++ b/examples/MQ/6-multiple-channels/FairMQExample6Broadcaster.h @@ -23,10 +23,8 @@ class FairMQExample6Broadcaster : public FairMQDevice FairMQExample6Broadcaster(); virtual ~FairMQExample6Broadcaster(); - static void CustomCleanup(void* data, void* hint); - protected: - virtual void Run(); + virtual bool ConditionalRun(); }; #endif /* FAIRMQEXAMPLE6BROADCASTER_H_ */ diff --git a/examples/MQ/6-multiple-channels/FairMQExample6Sampler.cxx b/examples/MQ/6-multiple-channels/FairMQExample6Sampler.cxx index 3e36bb7a..9f98aa5b 100644 --- a/examples/MQ/6-multiple-channels/FairMQExample6Sampler.cxx +++ b/examples/MQ/6-multiple-channels/FairMQExample6Sampler.cxx @@ -13,12 +13,13 @@ */ #include // unique_ptr - -#include +#include // this_thread::sleep_for +#include #include "FairMQExample6Sampler.h" #include "FairMQPoller.h" #include "FairMQLogger.h" +#include "FairMQProgOptions.h" using namespace std; @@ -27,42 +28,39 @@ FairMQExample6Sampler::FairMQExample6Sampler() { } -void FairMQExample6Sampler::CustomCleanup(void* /*data*/, void *object) +void FairMQExample6Sampler::InitTask() { - delete static_cast(object); + fText = fConfig->GetValue("text"); } void FairMQExample6Sampler::Run() { - std::unique_ptr poller(fTransportFactory->CreatePoller(fChannels, { "data", "broadcast" })); + unique_ptr poller(fTransportFactory->CreatePoller(fChannels, { "data", "broadcast" })); while (CheckCurrentState(RUNNING)) { - poller->Poll(-1); + poller->Poll(100); if (poller->CheckInput("broadcast", 0)) { - unique_ptr msg(NewMessage()); + FairMQMessagePtr msg(NewMessage()); if (Receive(msg, "broadcast") > 0) { - LOG(INFO) << "Received broadcast: \"" - << string(static_cast(msg->GetData()), msg->GetSize()) - << "\""; + LOG(INFO) << "Received broadcast: \"" << string(static_cast(msg->GetData()), msg->GetSize()) << "\""; } } if (poller->CheckOutput("data", 0)) { - boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); + this_thread::sleep_for(chrono::seconds(1)); - string* text = new string(fText); + FairMQMessagePtr msg(NewSimpleMessage(fText)); - unique_ptr msg(NewMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); - - LOG(INFO) << "Sending \"" << fText << "\""; - - Send(msg, "data"); + if (Send(msg, "data") > 0) + { + LOG(INFO) << "Sent \"" << fText << "\""; + } } } } @@ -70,47 +68,3 @@ void FairMQExample6Sampler::Run() FairMQExample6Sampler::~FairMQExample6Sampler() { } - -void FairMQExample6Sampler::SetProperty(const int key, const string& value) -{ - switch (key) - { - case Text: - fText = value; - break; - default: - FairMQDevice::SetProperty(key, value); - break; - } -} - -string FairMQExample6Sampler::GetProperty(const int key, const string& default_ /*= ""*/) -{ - switch (key) - { - case Text: - return fText; - break; - default: - return FairMQDevice::GetProperty(key, default_); - } -} - -void FairMQExample6Sampler::SetProperty(const int key, const int value) -{ - switch (key) - { - default: - FairMQDevice::SetProperty(key, value); - break; - } -} - -int FairMQExample6Sampler::GetProperty(const int key, const int default_ /*= 0*/) -{ - switch (key) - { - default: - return FairMQDevice::GetProperty(key, default_); - } -} diff --git a/examples/MQ/6-multiple-channels/FairMQExample6Sampler.h b/examples/MQ/6-multiple-channels/FairMQExample6Sampler.h index bbdfd9ba..102d272e 100644 --- a/examples/MQ/6-multiple-channels/FairMQExample6Sampler.h +++ b/examples/MQ/6-multiple-channels/FairMQExample6Sampler.h @@ -22,25 +22,14 @@ class FairMQExample6Sampler : public FairMQDevice { public: - enum - { - Text = FairMQDevice::Last, - Last - }; FairMQExample6Sampler(); virtual ~FairMQExample6Sampler(); - static void CustomCleanup(void* data, void* hint); - - virtual void SetProperty(const int key, const std::string& value); - virtual std::string GetProperty(const int key, const std::string& default_ = ""); - virtual void SetProperty(const int key, const int value); - virtual int GetProperty(const int key, const int default_ = 0); - protected: std::string fText; virtual void Run(); + virtual void InitTask(); }; #endif /* FAIRMQEXAMPLE6SAMPLER_H_ */ diff --git a/examples/MQ/6-multiple-channels/FairMQExample6Sink.cxx b/examples/MQ/6-multiple-channels/FairMQExample6Sink.cxx index de757a06..a7454e86 100644 --- a/examples/MQ/6-multiple-channels/FairMQExample6Sink.cxx +++ b/examples/MQ/6-multiple-channels/FairMQExample6Sink.cxx @@ -12,6 +12,8 @@ * @author A. Rybalchenko */ +#include // unique_ptr + #include "FairMQExample6Sink.h" #include "FairMQPoller.h" #include "FairMQLogger.h" @@ -28,11 +30,11 @@ void FairMQExample6Sink::Run() while (CheckCurrentState(RUNNING)) { - poller->Poll(-1); + poller->Poll(100); if (poller->CheckInput("broadcast", 0)) { - unique_ptr msg(NewMessage()); + FairMQMessagePtr msg(NewMessage()); if (Receive(msg, "broadcast") > 0) { @@ -42,7 +44,7 @@ void FairMQExample6Sink::Run() if (poller->CheckInput("data", 0)) { - unique_ptr msg(NewMessage()); + FairMQMessagePtr msg(NewMessage()); if (Receive(msg, "data") > 0) { diff --git a/examples/MQ/6-multiple-channels/runExample6Broadcaster.cxx b/examples/MQ/6-multiple-channels/runExample6Broadcaster.cxx index 08029b79..4d72eb2a 100644 --- a/examples/MQ/6-multiple-channels/runExample6Broadcaster.cxx +++ b/examples/MQ/6-multiple-channels/runExample6Broadcaster.cxx @@ -5,43 +5,17 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * runExample6Broadcaster.cxx - * - * @since 2013-04-23 - * @author A. Rybalchenko - */ -#include "FairMQLogger.h" -#include "FairMQProgOptions.h" +#include "runFairMQDevice.h" #include "FairMQExample6Broadcaster.h" -int main(int argc, char** argv) +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& /*options*/) { - try - { - FairMQProgOptions config; - config.ParseAll(argc, argv); - - FairMQExample6Broadcaster broadcaster; - broadcaster.CatchSignals(); - broadcaster.SetConfig(config); - - broadcaster.ChangeState("INIT_DEVICE"); - broadcaster.WaitForEndOfState("INIT_DEVICE"); - - broadcaster.ChangeState("INIT_TASK"); - broadcaster.WaitForEndOfState("INIT_TASK"); - - broadcaster.ChangeState("RUN"); - broadcaster.InteractiveStateLoop(); - } - catch (std::exception& e) - { - LOG(ERROR) << "Unhandled Exception reached the top of main: " - << e.what() << ", application will now exit"; - return 1; - } - - return 0; +} + +FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +{ + return new FairMQExample6Broadcaster(); } diff --git a/examples/MQ/6-multiple-channels/runExample6Sampler.cxx b/examples/MQ/6-multiple-channels/runExample6Sampler.cxx index 41472fee..f96ddf11 100644 --- a/examples/MQ/6-multiple-channels/runExample6Sampler.cxx +++ b/examples/MQ/6-multiple-channels/runExample6Sampler.cxx @@ -5,55 +5,19 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * runExample6Sampler.cxx - * - * @since 2013-04-23 - * @author D. Klein, A. Rybalchenko - */ -#include "boost/program_options.hpp" - -#include "FairMQLogger.h" -#include "FairMQProgOptions.h" +#include "runFairMQDevice.h" #include "FairMQExample6Sampler.h" -using namespace boost::program_options; +namespace bpo = boost::program_options; -int main(int argc, char** argv) +void addCustomOptions(bpo::options_description& options) { - try - { - std::string text; - - options_description samplerOptions("Sampler options"); - samplerOptions.add_options() - ("text", value(&text)->default_value("Hello"), "Text to send out"); - - FairMQProgOptions config; - config.AddToCmdLineOptions(samplerOptions); - config.ParseAll(argc, argv); - - FairMQExample6Sampler sampler; - sampler.CatchSignals(); - sampler.SetConfig(config); - sampler.SetProperty(FairMQExample6Sampler::Text, text); - - sampler.ChangeState("INIT_DEVICE"); - sampler.WaitForEndOfState("INIT_DEVICE"); - - sampler.ChangeState("INIT_TASK"); - sampler.WaitForEndOfState("INIT_TASK"); - - sampler.ChangeState("RUN"); - sampler.InteractiveStateLoop(); - } - catch (std::exception& e) - { - LOG(ERROR) << "Unhandled Exception reached the top of main: " - << e.what() << ", application will now exit"; - return 1; - } - - return 0; + options.add_options() + ("text", bpo::value()->default_value("Hello"), "Text to send out"); +} + +FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +{ + return new FairMQExample6Sampler(); } diff --git a/examples/MQ/6-multiple-channels/runExample6Sink.cxx b/examples/MQ/6-multiple-channels/runExample6Sink.cxx index 3da67091..6afe3e1d 100644 --- a/examples/MQ/6-multiple-channels/runExample6Sink.cxx +++ b/examples/MQ/6-multiple-channels/runExample6Sink.cxx @@ -5,43 +5,17 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * runExample6Sink.cxx - * - * @since 2013-04-23 - * @author D. Klein, A. Rybalchenko - */ -#include "FairMQLogger.h" -#include "FairMQProgOptions.h" +#include "runFairMQDevice.h" #include "FairMQExample6Sink.h" -int main(int argc, char** argv) +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& /*options*/) { - try - { - FairMQProgOptions config; - config.ParseAll(argc, argv); - - FairMQExample6Sink sink; - sink.CatchSignals(); - sink.SetConfig(config); - - sink.ChangeState("INIT_DEVICE"); - sink.WaitForEndOfState("INIT_DEVICE"); - - sink.ChangeState("INIT_TASK"); - sink.WaitForEndOfState("INIT_TASK"); - - sink.ChangeState("RUN"); - sink.InteractiveStateLoop(); - } - catch (std::exception& e) - { - LOG(ERROR) << "Unhandled Exception reached the top of main: " - << e.what() << ", application will now exit"; - return 1; - } - - return 0; +} + +FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +{ + return new FairMQExample6Sink(); } diff --git a/examples/MQ/8-multipart/FairMQEx8Header.h b/examples/MQ/8-multipart/FairMQEx8Header.h new file mode 100644 index 00000000..a4ca74d0 --- /dev/null +++ b/examples/MQ/8-multipart/FairMQEx8Header.h @@ -0,0 +1,17 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIRMQEX8HEADER_H_ +#define FAIRMQEX8HEADER_H_ + +struct Ex8Header +{ + int32_t stopFlag; +}; + +#endif /* FAIRMQEX8HEADER_H_ */ \ No newline at end of file diff --git a/examples/MQ/8-multipart/FairMQExample8Sampler.cxx b/examples/MQ/8-multipart/FairMQExample8Sampler.cxx index 4eb305e1..2677fd2d 100644 --- a/examples/MQ/8-multipart/FairMQExample8Sampler.cxx +++ b/examples/MQ/8-multipart/FairMQExample8Sampler.cxx @@ -12,58 +12,48 @@ * @author A. Rybalchenko */ -#include -#include +#include // this_thread::sleep_for +#include #include "FairMQExample8Sampler.h" +#include "FairMQEx8Header.h" #include "FairMQLogger.h" using namespace std; -struct Ex8Header { - int32_t stopFlag; -}; - FairMQExample8Sampler::FairMQExample8Sampler() + : fCounter(0) { } -void FairMQExample8Sampler::Run() +bool FairMQExample8Sampler::ConditionalRun() { - int counter = 0; + // Wait a second to keep the output readable. + this_thread::sleep_for(chrono::seconds(1)); - // Check if we are still in the RUNNING state. - while (CheckCurrentState(RUNNING)) + Ex8Header header; + // Set stopFlag to 1 for the first 4 messages, and to 0 for the 5th. + fCounter < 5 ? header.stopFlag = 0 : header.stopFlag = 1; + LOG(INFO) << "Sending header with stopFlag: " << header.stopFlag; + + FairMQParts parts; + + // NewSimpleMessage creates a copy of the data and takes care of its destruction (after the transfer takes place). + // Should only be used for small data because of the cost of an additional copy + parts.AddPart(NewSimpleMessage(header)); + parts.AddPart(NewMessage(1000)); + + LOG(INFO) << "Sending body of size: " << parts.At(1)->GetSize(); + + Send(parts, "data-out"); + + // Go out of the sending loop if the stopFlag was sent. + if (fCounter++ == 5) { - Ex8Header* header = new Ex8Header; - // Set stopFlag to 1 for the first 4 messages, and to 0 for the 5th. - counter < 5 ? header->stopFlag = 0 : header->stopFlag = 1; - LOG(INFO) << "Sending header with stopFlag: " << header->stopFlag; - - FairMQParts parts; - - parts.AddPart(NewMessage(header, // data pointer - sizeof(Ex8Header), // data size - [](void* data, void* /*hint*/) { delete static_cast(data); } // callback to deallocate after the transfer - )); - parts.AddPart(NewMessage(1000)); - - LOG(INFO) << "Sending body of size: " << parts.At(1)->GetSize(); - - Send(parts, "data-out"); - - // Go out of the sending loop if the stopFlag was sent. - if (counter == 5) - { - break; - } - - counter++; - // Wait a second to keep the output readable. - boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); + return false; } - LOG(INFO) << "Going out of RUNNING state."; + return true; } FairMQExample8Sampler::~FairMQExample8Sampler() diff --git a/examples/MQ/8-multipart/FairMQExample8Sampler.h b/examples/MQ/8-multipart/FairMQExample8Sampler.h index 479a057e..9c55c0cc 100644 --- a/examples/MQ/8-multipart/FairMQExample8Sampler.h +++ b/examples/MQ/8-multipart/FairMQExample8Sampler.h @@ -15,8 +15,6 @@ #ifndef FAIRMQEXAMPLE8SAMPLER_H_ #define FAIRMQEXAMPLE8SAMPLER_H_ -#include - #include "FairMQDevice.h" class FairMQExample8Sampler : public FairMQDevice @@ -26,7 +24,10 @@ class FairMQExample8Sampler : public FairMQDevice virtual ~FairMQExample8Sampler(); protected: - virtual void Run(); + virtual bool ConditionalRun(); + + private: + int fCounter; }; #endif /* FAIRMQEXAMPLE8SAMPLER_H_ */ diff --git a/examples/MQ/8-multipart/FairMQExample8Sink.cxx b/examples/MQ/8-multipart/FairMQExample8Sink.cxx index 05216d27..44dfa6e5 100644 --- a/examples/MQ/8-multipart/FairMQExample8Sink.cxx +++ b/examples/MQ/8-multipart/FairMQExample8Sink.cxx @@ -12,41 +12,30 @@ * @author A. Rybalchenko */ -#include -#include - #include "FairMQExample8Sink.h" +#include "FairMQEx8Header.h" #include "FairMQLogger.h" using namespace std; -struct Ex8Header { - int32_t stopFlag; -}; - FairMQExample8Sink::FairMQExample8Sink() { + OnData("data-in", &FairMQExample8Sink::HandleData); } -void FairMQExample8Sink::Run() +bool FairMQExample8Sink::HandleData(FairMQParts& parts, int /*index*/) { - while (CheckCurrentState(RUNNING)) + Ex8Header header; + header.stopFlag = (static_cast(parts.At(0)->GetData()))->stopFlag; + LOG(INFO) << "Received header with stopFlag: " << header.stopFlag; + LOG(INFO) << "Received body of size: " << parts.At(1)->GetSize(); + if (header.stopFlag == 1) { - FairMQParts parts; - - if (Receive(parts, "data-in") >= 0) - { - Ex8Header header; - header.stopFlag = (static_cast(parts.At(0)->GetData()))->stopFlag; - LOG(INFO) << "Received header with stopFlag: " << header.stopFlag; - LOG(INFO) << "Received body of size: " << parts.At(1)->GetSize(); - if (header.stopFlag == 1) - { - LOG(INFO) << "Flag is 0, exiting Run()"; - break; - } - } + LOG(INFO) << "stopFlag is 0, going IDLE"; + return false; } + + return true; } FairMQExample8Sink::~FairMQExample8Sink() diff --git a/examples/MQ/8-multipart/FairMQExample8Sink.h b/examples/MQ/8-multipart/FairMQExample8Sink.h index e71851b2..050e042b 100644 --- a/examples/MQ/8-multipart/FairMQExample8Sink.h +++ b/examples/MQ/8-multipart/FairMQExample8Sink.h @@ -24,7 +24,7 @@ class FairMQExample8Sink : public FairMQDevice virtual ~FairMQExample8Sink(); protected: - virtual void Run(); + bool HandleData(FairMQParts&, int); }; #endif /* FAIRMQEXAMPLE8SINK_H_ */ diff --git a/examples/MQ/8-multipart/runExample8Sampler.cxx b/examples/MQ/8-multipart/runExample8Sampler.cxx index 0b75a46e..e24bcb5e 100644 --- a/examples/MQ/8-multipart/runExample8Sampler.cxx +++ b/examples/MQ/8-multipart/runExample8Sampler.cxx @@ -5,43 +5,17 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * runExample8Sampler.cxx - * - * @since 2013-04-23 - * @author D. Klein, A. Rybalchenko - */ -#include "FairMQLogger.h" -#include "FairMQProgOptions.h" +#include "runFairMQDevice.h" #include "FairMQExample8Sampler.h" -int main(int argc, char** argv) +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& /*options*/) { - try - { - FairMQProgOptions config; - config.ParseAll(argc, argv); - - FairMQExample8Sampler sampler; - sampler.CatchSignals(); - sampler.SetConfig(config); - - sampler.ChangeState("INIT_DEVICE"); - sampler.WaitForEndOfState("INIT_DEVICE"); - - sampler.ChangeState("INIT_TASK"); - sampler.WaitForEndOfState("INIT_TASK"); - - sampler.ChangeState("RUN"); - sampler.InteractiveStateLoop(); - } - catch (std::exception& e) - { - LOG(ERROR) << "Unhandled Exception reached the top of main: " - << e.what() << ", application will now exit"; - return 1; - } - - return 0; +} + +FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +{ + return new FairMQExample8Sampler(); } diff --git a/examples/MQ/8-multipart/runExample8Sink.cxx b/examples/MQ/8-multipart/runExample8Sink.cxx index 77831f59..bf81cfed 100644 --- a/examples/MQ/8-multipart/runExample8Sink.cxx +++ b/examples/MQ/8-multipart/runExample8Sink.cxx @@ -5,43 +5,17 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * runExample8Sink.cxx - * - * @since 2013-04-23 - * @author D. Klein, A. Rybalchenko - */ -#include "FairMQLogger.h" -#include "FairMQProgOptions.h" +#include "runFairMQDevice.h" #include "FairMQExample8Sink.h" -int main(int argc, char** argv) +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description& /*options*/) { - try - { - FairMQProgOptions config; - config.ParseAll(argc, argv); - - FairMQExample8Sink sink; - sink.CatchSignals(); - sink.SetConfig(config); - - sink.ChangeState("INIT_DEVICE"); - sink.WaitForEndOfState("INIT_DEVICE"); - - sink.ChangeState("INIT_TASK"); - sink.WaitForEndOfState("INIT_TASK"); - - sink.ChangeState("RUN"); - sink.InteractiveStateLoop(); - } - catch (std::exception& e) - { - LOG(ERROR) << "Unhandled Exception reached the top of main: " - << e.what() << ", application will now exit"; - return 1; - } - - return 0; +} + +FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) +{ + return new FairMQExample8Sink(); }