From b166cedb630588d56f86372563bbf356f5f701fe Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 16 Nov 2016 16:27:21 +0100 Subject: [PATCH] Convert factory methods to return smart ptrs - Convert factory methods to return smart ptrs. - Refactor state machine to use same thread for user states. - Remove unused includes and dependencies, use std. --- fairmq/CMakeLists.txt | 4 - fairmq/FairMQChannel.cxx | 166 ++++++------- fairmq/FairMQChannel.h | 19 +- fairmq/FairMQDevice.cxx | 187 +++++--------- fairmq/FairMQDevice.h | 37 ++- fairmq/FairMQPoller.h | 5 +- fairmq/FairMQSocket.h | 3 + fairmq/FairMQStateMachine.cxx | 49 ++-- fairmq/FairMQStateMachine.h | 229 ++++++++++++------ fairmq/FairMQTransportFactory.h | 15 +- fairmq/devices/FairMQBenchmarkSampler.cxx | 43 +--- fairmq/devices/FairMQSink.cxx | 8 +- fairmq/nanomsg/FairMQTransportFactoryNN.cxx | 28 +-- fairmq/nanomsg/FairMQTransportFactoryNN.h | 14 +- fairmq/options/runConfigEx.cxx | 33 --- .../plugins/config/FairMQDDSConfigPlugin.cxx | 2 +- fairmq/run/runDDSCommandUI.cxx | 2 +- fairmq/zeromq/FairMQTransportFactoryZMQ.cxx | 28 +-- fairmq/zeromq/FairMQTransportFactoryZMQ.h | 14 +- 19 files changed, 406 insertions(+), 480 deletions(-) diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index d081fb3b..e10315cc 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -120,14 +120,10 @@ Set(DEPENDENCIES ${Boost_THREAD_LIBRARY} dl fairmq_logger - ${Boost_TIMER_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} - ${Boost_RANDOM_LIBRARY} - ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} - ${Boost_ATOMIC_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ) diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index f46e779f..7b67baff 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -14,16 +14,14 @@ #include -#include - #include "FairMQChannel.h" #include "FairMQLogger.h" using namespace std; -boost::mutex FairMQChannel::fChannelMutex; +mutex FairMQChannel::fChannelMutex; -std::atomic FairMQChannel::fInterrupted(false); +atomic FairMQChannel::fInterrupted(false); FairMQChannel::FairMQChannel() : fSocket(nullptr) @@ -121,12 +119,12 @@ string FairMQChannel::GetType() const { try { - boost::unique_lock scoped_lock(fChannelMutex); + unique_lock lock(fChannelMutex); return fType; } - catch (boost::exception& e) + catch (exception& e) { - LOG(ERROR) << "Exception caught in FairMQChannel::GetType: " << boost::diagnostic_information(e); + LOG(ERROR) << "Exception caught in FairMQChannel::GetType: " << e.what(); exit(EXIT_FAILURE); } } @@ -135,12 +133,12 @@ string FairMQChannel::GetMethod() const { try { - boost::unique_lock scoped_lock(fChannelMutex); + unique_lock lock(fChannelMutex); return fMethod; } - catch (boost::exception& e) + catch (exception& e) { - LOG(ERROR) << "Exception caught in FairMQChannel::GetMethod: " << boost::diagnostic_information(e); + LOG(ERROR) << "Exception caught in FairMQChannel::GetMethod: " << e.what(); exit(EXIT_FAILURE); } } @@ -149,12 +147,12 @@ string FairMQChannel::GetAddress() const { try { - boost::unique_lock scoped_lock(fChannelMutex); + unique_lock lock(fChannelMutex); return fAddress; } - catch (boost::exception& e) + catch (exception& e) { - LOG(ERROR) << "Exception caught in FairMQChannel::GetAddress: " << boost::diagnostic_information(e); + LOG(ERROR) << "Exception caught in FairMQChannel::GetAddress: " << e.what(); exit(EXIT_FAILURE); } } @@ -163,12 +161,12 @@ int FairMQChannel::GetSndBufSize() const { try { - boost::unique_lock scoped_lock(fChannelMutex); + unique_lock lock(fChannelMutex); return fSndBufSize; } - catch (boost::exception& e) + catch (exception& e) { - LOG(ERROR) << "Exception caught in FairMQChannel::GetSndBufSize: " << boost::diagnostic_information(e); + LOG(ERROR) << "Exception caught in FairMQChannel::GetSndBufSize: " << e.what(); exit(EXIT_FAILURE); } } @@ -177,12 +175,12 @@ int FairMQChannel::GetRcvBufSize() const { try { - boost::unique_lock scoped_lock(fChannelMutex); + unique_lock lock(fChannelMutex); return fRcvBufSize; } - catch (boost::exception& e) + catch (exception& e) { - LOG(ERROR) << "Exception caught in FairMQChannel::GetRcvBufSize: " << boost::diagnostic_information(e); + LOG(ERROR) << "Exception caught in FairMQChannel::GetRcvBufSize: " << e.what(); exit(EXIT_FAILURE); } } @@ -191,12 +189,12 @@ int FairMQChannel::GetSndKernelSize() const { try { - boost::unique_lock scoped_lock(fChannelMutex); + unique_lock lock(fChannelMutex); return fSndKernelSize; } - catch (boost::exception& e) + catch (exception& e) { - LOG(ERROR) << "Exception caught in FairMQChannel::GetSndKernelSize: " << boost::diagnostic_information(e); + LOG(ERROR) << "Exception caught in FairMQChannel::GetSndKernelSize: " << e.what(); exit(EXIT_FAILURE); } } @@ -205,12 +203,12 @@ int FairMQChannel::GetRcvKernelSize() const { try { - boost::unique_lock scoped_lock(fChannelMutex); + unique_lock lock(fChannelMutex); return fRcvKernelSize; } - catch (boost::exception& e) + catch (exception& e) { - LOG(ERROR) << "Exception caught in FairMQChannel::GetRcvKernelSize: " << boost::diagnostic_information(e); + LOG(ERROR) << "Exception caught in FairMQChannel::GetRcvKernelSize: " << e.what(); exit(EXIT_FAILURE); } } @@ -219,12 +217,12 @@ int FairMQChannel::GetRateLogging() const { try { - boost::unique_lock scoped_lock(fChannelMutex); + unique_lock lock(fChannelMutex); return fRateLogging; } - catch (boost::exception& e) + catch (exception& e) { - LOG(ERROR) << "Exception caught in FairMQChannel::GetRateLogging: " << boost::diagnostic_information(e); + LOG(ERROR) << "Exception caught in FairMQChannel::GetRateLogging: " << e.what(); exit(EXIT_FAILURE); } } @@ -233,13 +231,13 @@ void FairMQChannel::UpdateType(const string& type) { try { - boost::unique_lock scoped_lock(fChannelMutex); + unique_lock lock(fChannelMutex); fIsValid = false; fType = type; } - catch (boost::exception& e) + catch (exception& e) { - LOG(ERROR) << "Exception caught in FairMQChannel::UpdateType: " << boost::diagnostic_information(e); + LOG(ERROR) << "Exception caught in FairMQChannel::UpdateType: " << e.what(); exit(EXIT_FAILURE); } } @@ -248,13 +246,13 @@ void FairMQChannel::UpdateMethod(const string& method) { try { - boost::unique_lock scoped_lock(fChannelMutex); + unique_lock lock(fChannelMutex); fIsValid = false; fMethod = method; } - catch (boost::exception& e) + catch (exception& e) { - LOG(ERROR) << "Exception caught in FairMQChannel::UpdateMethod: " << boost::diagnostic_information(e); + LOG(ERROR) << "Exception caught in FairMQChannel::UpdateMethod: " << e.what(); exit(EXIT_FAILURE); } } @@ -263,13 +261,13 @@ void FairMQChannel::UpdateAddress(const string& address) { try { - boost::unique_lock scoped_lock(fChannelMutex); + unique_lock lock(fChannelMutex); fIsValid = false; fAddress = address; } - catch (boost::exception& e) + catch (exception& e) { - LOG(ERROR) << "Exception caught in FairMQChannel::UpdateAddress: " << boost::diagnostic_information(e); + LOG(ERROR) << "Exception caught in FairMQChannel::UpdateAddress: " << e.what(); exit(EXIT_FAILURE); } } @@ -278,13 +276,13 @@ void FairMQChannel::UpdateSndBufSize(const int sndBufSize) { try { - boost::unique_lock scoped_lock(fChannelMutex); + unique_lock lock(fChannelMutex); fIsValid = false; fSndBufSize = sndBufSize; } - catch (boost::exception& e) + catch (exception& e) { - LOG(ERROR) << "Exception caught in FairMQChannel::UpdateSndBufSize: " << boost::diagnostic_information(e); + LOG(ERROR) << "Exception caught in FairMQChannel::UpdateSndBufSize: " << e.what(); exit(EXIT_FAILURE); } } @@ -293,13 +291,13 @@ void FairMQChannel::UpdateRcvBufSize(const int rcvBufSize) { try { - boost::unique_lock scoped_lock(fChannelMutex); + unique_lock lock(fChannelMutex); fIsValid = false; fRcvBufSize = rcvBufSize; } - catch (boost::exception& e) + catch (exception& e) { - LOG(ERROR) << "Exception caught in FairMQChannel::UpdateRcvBufSize: " << boost::diagnostic_information(e); + LOG(ERROR) << "Exception caught in FairMQChannel::UpdateRcvBufSize: " << e.what(); exit(EXIT_FAILURE); } } @@ -308,13 +306,13 @@ void FairMQChannel::UpdateSndKernelSize(const int sndKernelSize) { try { - boost::unique_lock scoped_lock(fChannelMutex); + unique_lock lock(fChannelMutex); fIsValid = false; fSndKernelSize = sndKernelSize; } - catch (boost::exception& e) + catch (exception& e) { - LOG(ERROR) << "Exception caught in FairMQChannel::UpdateSndKernelSize: " << boost::diagnostic_information(e); + LOG(ERROR) << "Exception caught in FairMQChannel::UpdateSndKernelSize: " << e.what(); exit(EXIT_FAILURE); } } @@ -323,13 +321,13 @@ void FairMQChannel::UpdateRcvKernelSize(const int rcvKernelSize) { try { - boost::unique_lock scoped_lock(fChannelMutex); + unique_lock lock(fChannelMutex); fIsValid = false; fRcvKernelSize = rcvKernelSize; } - catch (boost::exception& e) + catch (exception& e) { - LOG(ERROR) << "Exception caught in FairMQChannel::UpdateRcvKernelSize: " << boost::diagnostic_information(e); + LOG(ERROR) << "Exception caught in FairMQChannel::UpdateRcvKernelSize: " << e.what(); exit(EXIT_FAILURE); } } @@ -338,13 +336,13 @@ void FairMQChannel::UpdateRateLogging(const int rateLogging) { try { - boost::unique_lock scoped_lock(fChannelMutex); + unique_lock lock(fChannelMutex); fIsValid = false; fRateLogging = rateLogging; } - catch (boost::exception& e) + catch (exception& e) { - LOG(ERROR) << "Exception caught in FairMQChannel::UpdateRateLogging: " << boost::diagnostic_information(e); + LOG(ERROR) << "Exception caught in FairMQChannel::UpdateRateLogging: " << e.what(); exit(EXIT_FAILURE); } } @@ -353,12 +351,12 @@ bool FairMQChannel::IsValid() const { try { - boost::unique_lock scoped_lock(fChannelMutex); + unique_lock lock(fChannelMutex); return fIsValid; } - catch (boost::exception& e) + catch (exception& e) { - LOG(ERROR) << "Exception caught in FairMQChannel::IsValid: " << boost::diagnostic_information(e); + LOG(ERROR) << "Exception caught in FairMQChannel::IsValid: " << e.what(); exit(EXIT_FAILURE); } } @@ -367,7 +365,7 @@ bool FairMQChannel::ValidateChannel() { try { - boost::unique_lock scoped_lock(fChannelMutex); + unique_lock lock(fChannelMutex); stringstream ss; ss << "Validating channel \"" << fChannelName << "\"... "; @@ -402,14 +400,17 @@ bool FairMQChannel::ValidateChannel() { //TODO: maybe cache fEndpoints as a class member? not really needed as tokenizing is //fast, and only happens during (re-)configure - std::vector fEndpoints; + vector fEndpoints; Tokenize(fEndpoints, fAddress); for (const auto endpoint : fEndpoints) { - std::string address; - if (endpoint[0]=='@'||endpoint[0]=='+'||endpoint[0]=='>') { - address = endpoint.substr(1); - } else { + string address; + if (endpoint[0] == '@' || endpoint[0] == '+' || endpoint[0] == '>') + { + address = endpoint.substr(1); + } + else + { // we don't have a method modifier, check if the default method is set const string socketMethodNames[] = { "bind", "connect" }; const set socketMethods(socketMethodNames, socketMethodNames + sizeof(socketMethodNames) / sizeof(string)); @@ -508,14 +509,14 @@ bool FairMQChannel::ValidateChannel() LOG(DEBUG) << ss.str(); return true; } - catch (boost::exception& e) + catch (exception& e) { - LOG(ERROR) << "Exception caught in FairMQChannel::ValidateChannel: " << boost::diagnostic_information(e); + LOG(ERROR) << "Exception caught in FairMQChannel::ValidateChannel: " << e.what(); exit(EXIT_FAILURE); } } -bool FairMQChannel::InitCommandInterface(FairMQTransportFactory* factory, int numIoThreads) +bool FairMQChannel::InitCommandInterface(shared_ptr factory, int numIoThreads) { fTransportFactory = factory; @@ -539,7 +540,7 @@ bool FairMQChannel::InitCommandInterface(FairMQTransportFactory* factory, int nu void FairMQChannel::ResetChannel() { - boost::unique_lock scoped_lock(fChannelMutex); + unique_lock lock(fChannelMutex); fIsValid = false; // TODO: implement channel resetting } @@ -586,7 +587,7 @@ int FairMQChannel::Receive(const unique_ptr& msg, int rcvTimeoutI return -2; } -int64_t FairMQChannel::Send(const std::vector>& msgVec, int sndTimeoutInMs) const +int64_t FairMQChannel::Send(const vector>& msgVec, int sndTimeoutInMs) const { fPoller->Poll(sndTimeoutInMs); @@ -607,7 +608,7 @@ int64_t FairMQChannel::Send(const std::vector>& m return -2; } -int64_t FairMQChannel::Receive(std::vector>& msgVec, int rcvTimeoutInMs) const +int64_t FairMQChannel::Receive(vector>& msgVec, int rcvTimeoutInMs) const { fPoller->Poll(rcvTimeoutInMs); @@ -769,34 +770,27 @@ bool FairMQChannel::ExpectsAnotherPart() const inline bool FairMQChannel::HandleUnblock() const { - FairMQMessage* cmd = fTransportFactory->CreateMessage(); - if (fCmdSocket->Receive(cmd, 0) >= 0) + FairMQMessagePtr cmd(fTransportFactory->CreateMessage()); + if (fCmdSocket->Receive(cmd.get(), 0) >= 0) { // LOG(DEBUG) << "unblocked"; } - delete cmd; return true; } FairMQChannel::~FairMQChannel() { - delete fCmdSocket; - delete fPoller; } -void FairMQChannel::Tokenize(std::vector& output, - const std::string& input, - const std::string delimiters) +void FairMQChannel::Tokenize(vector& output, const string& input, const string delimiters) { - using namespace std; - size_t start = 0; - size_t end = input.find_first_of(delimiters); - while (end != string::npos) - { - output.push_back(input.substr(start, end-start)); - start = ++end; - end = input.find_first_of(delimiters, start); - } - output.push_back(input.substr(start)); + size_t start = 0; + size_t end = input.find_first_of(delimiters); + while (end != string::npos) + { + output.push_back(input.substr(start, end-start)); + start = ++end; + end = input.find_first_of(delimiters, start); + } + output.push_back(input.substr(start)); } - diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 433a17ff..78fd7f45 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -18,8 +18,7 @@ #include #include // unique_ptr #include - -#include +#include #include "FairMQTransportFactory.h" #include "FairMQSocket.h" @@ -134,7 +133,7 @@ class FairMQChannel /// Resets the channel (requires validation to be used again). void ResetChannel(); - FairMQSocket* fSocket; + std::unique_ptr fSocket; /// Sends a message to the socket queue. /// @details Send method attempts to send a message by @@ -249,9 +248,7 @@ class FairMQChannel int Receive(FairMQMessage* msg, const int flags, int rcvTimeoutInMs = -1) const; // TODO: this might go to some base utility library - static void Tokenize(std::vector& output, - const std::string& input, - const std::string delimiters = ","); + static void Tokenize(std::vector& output, const std::string& input, const std::string delimiters = ","); private: std::string fType; @@ -266,15 +263,15 @@ class FairMQChannel std::string fChannelName; std::atomic fIsValid; - FairMQPoller* fPoller; - FairMQSocket* fCmdSocket; + FairMQPollerPtr fPoller; + FairMQSocketPtr fCmdSocket; - FairMQTransportFactory* fTransportFactory; + std::shared_ptr fTransportFactory; int fNoBlockFlag; int fSndMoreFlag; - bool InitCommandInterface(FairMQTransportFactory* factory, int numIoThreads); + bool InitCommandInterface(std::shared_ptr factory, int numIoThreads); bool HandleUnblock() const; @@ -282,7 +279,7 @@ class FairMQChannel // implication: same mutex is used for all instances of the class // this does not hurt much, because mutex is used only during initialization with very low contention // possible TODO: improve this - static boost::mutex fChannelMutex; + static std::mutex fChannelMutex; static std::atomic fInterrupted; }; diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 27dc7bc0..3db77626 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -19,12 +19,13 @@ #include #include #include +#include +#include +#include #include // for the InteractiveStateLoop #include -#include - #include "FairMQSocket.h" #include "FairMQDevice.h" #include "FairMQLogger.h" @@ -38,8 +39,8 @@ using namespace std; -// boost::function and a wrapper to catch the signals -boost::function sigHandler; +// std::function and a wrapper to catch the signals +std::function sigHandler; static void CallSignalHandler(int signal) { sigHandler(signal); @@ -249,18 +250,13 @@ void FairMQDevice::InitWrapper() if (numAttempts != 0) { - boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } } Init(); ChangeState(internal_DEVICE_READY); - - // notify parent thread about end of processing. - boost::lock_guard lock(fStateMutex); - fStateFinished = true; - fStateCondition.notify_one(); } void FairMQDevice::WaitForInitialValidation() @@ -399,11 +395,6 @@ void FairMQDevice::InitTaskWrapper() InitTask(); ChangeState(internal_READY); - - // notify parent thread about end of processing. - boost::lock_guard lock(fStateMutex); - fStateFinished = true; - fStateCondition.notify_one(); } void FairMQDevice::InitTask() @@ -475,7 +466,7 @@ void FairMQDevice::RunWrapper() { LOG(INFO) << "DEVICE: Running..."; - boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); + std::thread rateLogger(&FairMQDevice::LogSocketRates, this); FairMQChannel::fInterrupted = false; @@ -497,7 +488,7 @@ void FairMQDevice::RunWrapper() inputChannelKeys.push_back(i.first); } - unique_ptr poller(fTransportFactory->CreatePoller(fChannels, inputChannelKeys)); + FairMQPollerPtr poller(fTransportFactory->CreatePoller(fChannels, inputChannelKeys)); while (CheckCurrentState(RUNNING) && !exitingRunningCallback) { @@ -580,26 +571,20 @@ void FairMQDevice::RunWrapper() ChangeState(ERROR); } - try - { - rateLogger.interrupt(); - rateLogger.join(); - } - catch (const boost::thread_resource_error& e) - { - LOG(ERROR) << e.what(); - exit(EXIT_FAILURE); - } - if (CheckCurrentState(RUNNING)) { ChangeState(internal_READY); } - // notify parent thread about end of processing. - boost::lock_guard lock(fStateMutex); - fStateFinished = true; - fStateCondition.notify_one(); + try + { + rateLogger.join(); + } + catch (exception& e) + { + LOG(ERROR) << "Exception cought during Run(): " << e.what(); + exit(EXIT_FAILURE); + } } void FairMQDevice::Run() @@ -621,19 +606,12 @@ void FairMQDevice::PostRun() void FairMQDevice::Pause() { - while (true) + while (CheckCurrentState(PAUSED)) { - try - { - boost::this_thread::sleep(boost::posix_time::milliseconds(500)); - LOG(DEBUG) << "paused..."; - } - catch (const boost::thread_interrupted&) - { - LOG(INFO) << "FairMQDevice::Pause() interrupted"; - break; - } + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + LOG(DEBUG) << "paused..."; } + LOG(DEBUG) << "Unpausing"; } // Method for setting properties represented as a string. @@ -748,19 +726,19 @@ int FairMQDevice::GetProperty(const int key, const int default_ /*= 0*/) void FairMQDevice::SetTransport(FairMQTransportFactory* factory) { - fTransportFactory = factory; + fTransportFactory = unique_ptr(factory); } void FairMQDevice::SetTransport(const string& transport) { if (transport == "zeromq") { - fTransportFactory = new FairMQTransportFactoryZMQ(); + fTransportFactory = unique_ptr(new FairMQTransportFactoryZMQ()); } #ifdef NANOMSG_FOUND else if (transport == "nanomsg") { - fTransportFactory = new FairMQTransportFactoryNN(); + fTransportFactory = unique_ptr(new FairMQTransportFactoryNN()); } #endif else @@ -809,7 +787,7 @@ void FairMQDevice::LogSocketRates() { if (vi->fRateLogging > 0) { - filteredSockets.push_back(vi->fSocket); + filteredSockets.push_back(vi->fSocket.get()); logIntervals.push_back(vi->fRateLogging); intervalCounters.push_back(0); stringstream ss; @@ -847,56 +825,48 @@ void FairMQDevice::LogSocketRates() t0 = get_timestamp(); - while (true) + while (CheckCurrentState(RUNNING)) { - try + t1 = get_timestamp(); + + msSinceLastLog = (t1 - t0) / 1000.0L; + + i = 0; + + for (const auto& vi : filteredSockets) { - t1 = get_timestamp(); + intervalCounters.at(i)++; - msSinceLastLog = (t1 - t0) / 1000.0L; - - i = 0; - - for (const auto& vi : filteredSockets) + if (intervalCounters.at(i) == logIntervals.at(i)) { - intervalCounters.at(i)++; + intervalCounters.at(i) = 0; - if (intervalCounters.at(i) == logIntervals.at(i)) - { - intervalCounters.at(i) = 0; + bytesInNew.at(i) = vi->GetBytesRx(); + mbPerSecIn.at(i) = (static_cast(bytesInNew.at(i) - bytesIn.at(i)) / (1024. * 1024.)) / static_cast(msSinceLastLog) * 1000.; + bytesIn.at(i) = bytesInNew.at(i); - bytesInNew.at(i) = vi->GetBytesRx(); - mbPerSecIn.at(i) = (static_cast(bytesInNew.at(i) - bytesIn.at(i)) / (1024. * 1024.)) / static_cast(msSinceLastLog) * 1000.; - bytesIn.at(i) = bytesInNew.at(i); + msgInNew.at(i) = vi->GetMessagesRx(); + msgPerSecIn.at(i) = static_cast(msgInNew.at(i) - msgIn.at(i)) / static_cast(msSinceLastLog) * 1000.; + msgIn.at(i) = msgInNew.at(i); - msgInNew.at(i) = vi->GetMessagesRx(); - msgPerSecIn.at(i) = static_cast(msgInNew.at(i) - msgIn.at(i)) / static_cast(msSinceLastLog) * 1000.; - msgIn.at(i) = msgInNew.at(i); + bytesOutNew.at(i) = vi->GetBytesTx(); + mbPerSecOut.at(i) = (static_cast(bytesOutNew.at(i) - bytesOut.at(i)) / (1024. * 1024.)) / static_cast(msSinceLastLog) * 1000.; + bytesOut.at(i) = bytesOutNew.at(i); - bytesOutNew.at(i) = vi->GetBytesTx(); - mbPerSecOut.at(i) = (static_cast(bytesOutNew.at(i) - bytesOut.at(i)) / (1024. * 1024.)) / static_cast(msSinceLastLog) * 1000.; - bytesOut.at(i) = bytesOutNew.at(i); + msgOutNew.at(i) = vi->GetMessagesTx(); + msgPerSecOut.at(i) = static_cast(msgOutNew.at(i) - msgOut.at(i)) / static_cast(msSinceLastLog) * 1000.; + msgOut.at(i) = msgOutNew.at(i); - msgOutNew.at(i) = vi->GetMessagesTx(); - msgPerSecOut.at(i) = static_cast(msgOutNew.at(i) - msgOut.at(i)) / static_cast(msSinceLastLog) * 1000.; - msgOut.at(i) = msgOutNew.at(i); - - LOG(DEBUG) << filteredChannelNames.at(i) << ": " - << "in: " << msgPerSecIn.at(i) << " msg (" << mbPerSecIn.at(i) << " MB), " - << "out: " << msgPerSecOut.at(i) << " msg (" << mbPerSecOut.at(i) << " MB)"; - } - - ++i; + LOG(DEBUG) << filteredChannelNames.at(i) << ": " + << "in: " << msgPerSecIn.at(i) << " msg (" << mbPerSecIn.at(i) << " MB), " + << "out: " << msgPerSecOut.at(i) << " msg (" << mbPerSecOut.at(i) << " MB)"; } - t0 = t1; - boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); - } - catch (boost::thread_interrupted&) - { - // LOG(DEBUG) << "FairMQDevice::LogSocketRates() interrupted"; - break; + ++i; } + + t0 = t1; + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } // LOG(DEBUG) << "FairMQDevice::LogSocketRates() stopping"; @@ -979,7 +949,7 @@ void FairMQDevice::InteractiveStateLoop() ChangeState(END); - if (CheckCurrentState("EXITING")) + if (CheckCurrentState(EXITING)) { fInteractiveRunning = false; } @@ -1002,9 +972,8 @@ void FairMQDevice::InteractiveStateLoop() void FairMQDevice::Unblock() { FairMQChannel::fInterrupted = true; - FairMQMessage* cmd = fTransportFactory->CreateMessage(); - fCmdSocket->Send(cmd, 0); - delete cmd; + FairMQMessagePtr cmd(fTransportFactory->CreateMessage()); + fCmdSocket->Send(cmd.get(), 0); } void FairMQDevice::ResetTaskWrapper() @@ -1012,11 +981,6 @@ void FairMQDevice::ResetTaskWrapper() ResetTask(); ChangeState(internal_DEVICE_READY); - - // notify parent thread about end of processing. - boost::lock_guard lock(fStateMutex); - fStateFinished = true; - fStateCondition.notify_one(); } void FairMQDevice::ResetTask() @@ -1028,11 +992,6 @@ void FairMQDevice::ResetWrapper() Reset(); ChangeState(internal_IDLE); - - // notify parent thread about end of processing. - boost::lock_guard lock(fStateMutex); - fStateFinished = true; - fStateCondition.notify_one(); } void FairMQDevice::Reset() @@ -1044,14 +1003,11 @@ void FairMQDevice::Reset() for (auto& vi : mi.second) { vi.fSocket->Close(); - delete vi.fSocket; vi.fSocket = nullptr; - delete vi.fPoller; vi.fPoller = nullptr; vi.fCmdSocket->Close(); - delete vi.fCmdSocket; vi.fCmdSocket = nullptr; } } @@ -1102,32 +1058,5 @@ void FairMQDevice::Shutdown() FairMQDevice::~FairMQDevice() { - // iterate over the channels map - for (auto& mi : fChannels) - { - // iterate over the channels vector - for (auto& vi : mi.second) - { - if (vi.fSocket) - { - delete vi.fSocket; - vi.fSocket = nullptr; - } - if (vi.fPoller) - { - delete vi.fPoller; - vi.fPoller = nullptr; - } - } - } - - if (fCmdSocket) - { - delete fCmdSocket; - fCmdSocket = nullptr; - } - - delete fTransportFactory; - LOG(DEBUG) << "Device destroyed"; } diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 00c31cde..a242ab10 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -36,7 +36,7 @@ typedef std::unordered_map> FairMQChannelMap; -typedef std::function&, int)> InputMsgCallback; +typedef std::function InputMsgCallback; typedef std::function InputMultipartCallback; class FairMQProgOptions; @@ -106,7 +106,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// @param i channel index /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. /// In case of errors, returns -1. - inline int Send(const std::unique_ptr& msg, const std::string& chan, const int i = 0, int sndTimeoutInMs = -1) const + inline int Send(const FairMQMessagePtr& msg, const std::string& chan, const int i = 0, int sndTimeoutInMs = -1) const { return fChannels.at(chan).at(i).Send(msg, sndTimeoutInMs); } @@ -117,7 +117,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// @param i channel index /// @return Number of bytes that have been queued. -2 If queueing was not possible or timed out. /// In case of errors, returns -1. - inline int SendAsync(const std::unique_ptr& msg, const std::string& chan, const int i = 0) const + inline int SendAsync(const FairMQMessagePtr& msg, const std::string& chan, const int i = 0) const { return fChannels.at(chan).at(i).SendAsync(msg); } @@ -150,7 +150,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// @param i channel index /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. /// In case of errors, returns -1. - inline int Receive(const std::unique_ptr& msg, const std::string& chan, const int i = 0, int rcvTimeoutInMs = -1) const + inline int Receive(const FairMQMessagePtr& msg, const std::string& chan, const int i = 0, int rcvTimeoutInMs = -1) const { return fChannels.at(chan).at(i).Receive(msg, rcvTimeoutInMs); } @@ -161,7 +161,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// @param i channel index /// @return Number of bytes that have been received. -2 If reading from the queue was not possible or timed out. /// In case of errors, returns -1. - inline int ReceiveAsync(const std::unique_ptr& msg, const std::string& chan, const int i = 0) const + inline int ReceiveAsync(const FairMQMessagePtr& msg, const std::string& chan, const int i = 0) const { return fChannels.at(chan).at(i).ReceiveAsync(msg); } @@ -188,16 +188,9 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable return fChannels.at(chan).at(i).ReceiveAsync(parts.fParts); } - /// @brief Create FairMQPoller - /// @return pointer to FairMQPoller - inline FairMQPoller* NewPoller(const std::initializer_list channelList) const - { - return fTransportFactory->CreatePoller(fChannels, channelList); - } - /// @brief Create empty FairMQMessage /// @return pointer to FairMQMessage - inline FairMQMessage* NewMessage() const + inline FairMQMessagePtr NewMessage() const { return fTransportFactory->CreateMessage(); } @@ -205,7 +198,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// @brief Create new FairMQMessage of specified size /// @param size message size /// @return pointer to FairMQMessage - inline FairMQMessage* NewMessage(int size) const + inline FairMQMessagePtr NewMessage(int size) const { return fTransportFactory->CreateMessage(size); } @@ -216,26 +209,26 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// @param ffn optional callback, called when the message is transfered (and can be deleted) /// @param hint optional helper pointer that can be used in the callback /// @return pointer to FairMQMessage - inline FairMQMessage* NewMessage(void* data, int size, fairmq_free_fn* ffn, void* hint = NULL) const + inline FairMQMessagePtr NewMessage(void* data, int size, fairmq_free_fn* ffn, void* hint = NULL) const { return fTransportFactory->CreateMessage(data, size, ffn, hint); } template - inline FairMQMessage* NewSimpleMessage(const T& data) const + inline FairMQMessagePtr NewSimpleMessage(const T& data) const { T* dataCopy = new T(data); return fTransportFactory->CreateMessage(dataCopy, sizeof(T), FairMQSimpleMsgCleanup, dataCopy); } template - inline FairMQMessage* NewSimpleMessage(const char(&data)[N]) const + inline FairMQMessagePtr NewSimpleMessage(const char(&data)[N]) const { std::string* msgStr = new std::string(data); return fTransportFactory->CreateMessage(const_cast(msgStr->c_str()), msgStr->length(), FairMQSimpleMsgCleanup, msgStr); } - inline FairMQMessage* NewSimpleMessage(const std::string& str) const + inline FairMQMessagePtr NewSimpleMessage(const std::string& str) const { std::string* msgStr = new std::string(str); return fTransportFactory->CreateMessage(const_cast(msgStr->c_str()), msgStr->length(), FairMQSimpleMsgCleanup, msgStr); @@ -298,10 +291,10 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable FairMQProgOptions* fConfig; ///< Program options configuration template - void OnData(const std::string& channelName, bool (T::* memberFunction)(std::unique_ptr& msg, int index)) + void OnData(const std::string& channelName, bool (T::* memberFunction)(FairMQMessagePtr& msg, int index)) { fDataCallbacks = true; - fMsgInputs.insert(std::make_pair(channelName, [this, memberFunction](std::unique_ptr& msg, int index) + fMsgInputs.insert(std::make_pair(channelName, [this, memberFunction](FairMQMessagePtr& msg, int index) { return (static_cast(this)->*memberFunction)(msg, index); })); @@ -336,9 +329,9 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable int fLogIntervalInMs; ///< Interval for logging the socket transfer rates - FairMQSocket* fCmdSocket; ///< Socket used for the internal unblocking mechanism + FairMQSocketPtr fCmdSocket; ///< Socket used for the internal unblocking mechanism - FairMQTransportFactory* fTransportFactory; ///< Transport factory + std::shared_ptr fTransportFactory; ///< Transport factory /// Additional user initialization (can be overloaded in child classes). Prefer to use InitTask(). virtual void Init(); diff --git a/fairmq/FairMQPoller.h b/fairmq/FairMQPoller.h index b3cb21fe..43d1378e 100644 --- a/fairmq/FairMQPoller.h +++ b/fairmq/FairMQPoller.h @@ -16,6 +16,7 @@ #define FAIRMQPOLLER_H_ #include +#include class FairMQPoller { @@ -29,4 +30,6 @@ class FairMQPoller virtual ~FairMQPoller() {}; }; -#endif /* FAIRMQPOLLER_H_ */ \ No newline at end of file +using FairMQPollerPtr = std::unique_ptr; + +#endif /* FAIRMQPOLLER_H_ */ diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index d0eb9301..0d2a54dc 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -17,6 +17,7 @@ #include #include +#include #include "FairMQMessage.h" @@ -68,4 +69,6 @@ class FairMQSocket virtual ~FairMQSocket() {}; }; +using FairMQSocketPtr = std::unique_ptr; + #endif /* FAIRMQSOCKET_H_ */ diff --git a/fairmq/FairMQStateMachine.cxx b/fairmq/FairMQStateMachine.cxx index 4664b920..fb9dc0d8 100644 --- a/fairmq/FairMQStateMachine.cxx +++ b/fairmq/FairMQStateMachine.cxx @@ -12,8 +12,7 @@ * @author D. Klein, A. Rybalchenko */ -#include -#include // for WaitForEndOfStateForMs() +#include // WaitForEndOfStateForMs() #include "FairMQStateMachine.h" #include "FairMQLogger.h" @@ -97,13 +96,9 @@ bool FairMQStateMachine::ChangeState(int event) return false; } } - catch (boost::thread_interrupted& e) + catch (std::exception& e) { - LOG(ERROR) << boost::diagnostic_information(e); - } - catch (boost::exception& e) - { - LOG(ERROR) << boost::diagnostic_information(e); + LOG(ERROR) << "Exception in FairMQStateMachine::ChangeState(): " << e.what(); exit(EXIT_FAILURE); } return false; @@ -121,35 +116,28 @@ void FairMQStateMachine::WaitForEndOfState(int event) switch (event) { case INIT_DEVICE: - case INIT_TASK: case RUN: case RESET_TASK: case RESET_DEVICE: { - try + std::unique_lock lock(fWorkMutex); + while (fWorkActive || fWorkAvailable) { - boost::unique_lock lock(fStateMutex); - while (!fStateFinished) - { - fStateCondition.wait(lock); - } - fStateThread.join(); - } - catch (boost::exception& e) - { - LOG(ERROR) << boost::diagnostic_information(e); - exit(EXIT_FAILURE); + fWorkDoneCondition.wait(lock); } + break; } + case INIT_TASK: + break; // InitTask is synchronous, until ROOT workaround is no longer needed. default: LOG(ERROR) << "Requested state is either synchronous or does not exist."; break; } } - catch (boost::thread_interrupted& e) + catch (std::exception& e) { - LOG(ERROR) << boost::diagnostic_information(e); + LOG(ERROR) << "Exception in FairMQStateMachine::WaitForEndOfState(): " << e.what(); } } @@ -165,16 +153,15 @@ bool FairMQStateMachine::WaitForEndOfStateForMs(int event, int durationInMs) switch (event) { case INIT_DEVICE: - case INIT_TASK: case RUN: case RESET_TASK: case RESET_DEVICE: { - boost::unique_lock lock(fStateMutex); - while (!fStateFinished) + std::unique_lock lock(fWorkMutex); + while (fWorkActive || fWorkAvailable) { - fStateCondition.wait_until(lock, boost::chrono::system_clock::now() + boost::chrono::milliseconds(durationInMs)); - if (!fStateFinished) + fWorkDoneCondition.wait_for(lock, std::chrono::milliseconds(durationInMs)); + if (fWorkActive) { return false; } @@ -182,14 +169,16 @@ bool FairMQStateMachine::WaitForEndOfStateForMs(int event, int durationInMs) return true; break; } + case INIT_TASK: + break; // InitTask is synchronous, until ROOT workaround is no longer needed. default: LOG(ERROR) << "Requested state is either synchronous or does not exist."; return false; } } - catch (boost::thread_interrupted& e) + catch (std::exception& e) { - LOG(ERROR) << boost::diagnostic_information(e); + LOG(ERROR) << "Exception in FairMQStateMachine::WaitForEndOfStateForMs(): " << e.what(); } return false; } diff --git a/fairmq/FairMQStateMachine.h b/fairmq/FairMQStateMachine.h index 2c640fd2..85e4ed99 100644 --- a/fairmq/FairMQStateMachine.h +++ b/fairmq/FairMQStateMachine.h @@ -19,13 +19,10 @@ #include #include - -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include // Increase maximum number of boost::msm states (default is 10) // This #define has to be before any msm header includes @@ -71,15 +68,19 @@ _Pragma("GCC diagnostic ignored \"-Weffc++\"") #endif // defining the boost MSM state machine -struct FairMQFSM_ : public msm::front::state_machine_def +struct FairMQFSM_ : public msmf::state_machine_def { public: FairMQFSM_() - : fStateThread() + : fWorkerThread() , fTerminateStateThread() - , fStateFinished(false) - , fStateCondition() - , fStateMutex() + , fWork() + , fWorkAvailableCondition() + , fWorkDoneCondition() + , fWorkMutex() + , fWorkerTerminated(false) + , fWorkActive(false) + , fWorkAvailable(false) , fState() {} @@ -87,32 +88,38 @@ struct FairMQFSM_ : public msm::front::state_machine_def virtual ~FairMQFSM_() {}; template - void on_entry(Event const&, FSM&) + void on_entry(Event const&, FSM& fsm) { LOG(STATE) << "Starting FairMQ state machine"; fState = IDLE; + + // start a worker thread to execute user states in. + fsm.fWorkerThread = std::thread(&FairMQFSM_::Worker, &fsm); } template - void on_exit(Event const&, FSM&) + void on_exit(Event const&, FSM& fsm) { + // join the worker thread (executing user states) + fsm.fWorkerThread.join(); + LOG(STATE) << "Exiting FairMQ state machine"; } // The list of FSM states - struct OK_FSM : public msm::front::state<> {}; - struct ERROR_FSM : public msm::front::terminate_state<> {}; + struct OK_FSM : public msmf::state<> {}; + struct ERROR_FSM : public msmf::terminate_state<> {}; - struct IDLE_FSM : public msm::front::state<> {}; - struct INITIALIZING_DEVICE_FSM : public msm::front::state<> {}; - struct DEVICE_READY_FSM : public msm::front::state<> {}; - struct INITIALIZING_TASK_FSM : public msm::front::state<> {}; - struct READY_FSM : public msm::front::state<> {}; - struct RUNNING_FSM : public msm::front::state<> {}; - struct PAUSED_FSM : public msm::front::state<> {}; - struct RESETTING_TASK_FSM : public msm::front::state<> {}; - struct RESETTING_DEVICE_FSM : public msm::front::state<> {}; - struct EXITING_FSM : public msm::front::state<> {}; + struct IDLE_FSM : public msmf::state<> {}; + struct INITIALIZING_DEVICE_FSM : public msmf::state<> {}; + struct DEVICE_READY_FSM : public msmf::state<> {}; + struct INITIALIZING_TASK_FSM : public msmf::state<> {}; + struct READY_FSM : public msmf::state<> {}; + struct RUNNING_FSM : public msmf::state<> {}; + struct PAUSED_FSM : public msmf::state<> {}; + struct RESETTING_TASK_FSM : public msmf::state<> {}; + struct RESETTING_DEVICE_FSM : public msmf::state<> {}; + struct EXITING_FSM : public msmf::state<> {}; // Define initial states typedef mpl::vector initial_state; @@ -124,7 +131,6 @@ struct FairMQFSM_ : public msm::front::state_machine_def void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering IDLE state"; - fsm.fState = IDLE; } }; @@ -135,12 +141,16 @@ struct FairMQFSM_ : public msm::front::state_machine_def void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering INITIALIZING DEVICE state"; - - fsm.fStateFinished = false; - fsm.fState = INITIALIZING_DEVICE; - fsm.fStateThread = boost::thread(boost::bind(&FairMQFSM_::InitWrapper, &fsm)); + std::unique_lock lock(fsm.fWorkMutex); + while (fsm.fWorkActive) + { + fsm.fWorkDoneCondition.wait(lock); + } + fsm.fWorkAvailable = true; + fsm.fWork = std::bind(&FairMQFSM_::InitWrapper, &fsm); + fsm.fWorkAvailableCondition.notify_one(); } }; @@ -150,7 +160,6 @@ struct FairMQFSM_ : public msm::front::state_machine_def void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering DEVICE READY state"; - fsm.fState = DEVICE_READY; } }; @@ -161,13 +170,10 @@ struct FairMQFSM_ : public msm::front::state_machine_def void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering INITIALIZING TASK state"; - - fsm.fStateFinished = false; - fsm.fState = INITIALIZING_TASK; fsm.InitTaskWrapper(); - // fsm.fInitializingTaskThread = boost::thread(boost::bind(&FairMQFSM_::InitTaskWrapper, &fsm)); + // fsm.fInitializingTaskThread = std::thread(&FairMQFSM_::InitTaskWrapper, &fsm); } }; @@ -177,7 +183,6 @@ struct FairMQFSM_ : public msm::front::state_machine_def void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering READY state"; - fsm.fState = READY; } }; @@ -188,12 +193,16 @@ struct FairMQFSM_ : public msm::front::state_machine_def void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering RUNNING state"; - - fsm.fStateFinished = false; - fsm.fState = RUNNING; - fsm.fStateThread = boost::thread(boost::bind(&FairMQFSM_::RunWrapper, &fsm)); + std::unique_lock lock(fsm.fWorkMutex); + while (fsm.fWorkActive) + { + fsm.fWorkDoneCondition.wait(lock); + } + fsm.fWorkAvailable = true; + fsm.fWork = std::bind(&FairMQFSM_::RunWrapper, &fsm); + fsm.fWorkAvailableCondition.notify_one(); } }; @@ -202,15 +211,19 @@ struct FairMQFSM_ : public msm::front::state_machine_def template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { + LOG(STATE) << "Entering PAUSED state"; fsm.fState = PAUSED; - fsm.fStateFinished = false; fsm.Unblock(); - fsm.fStateThread.join(); - LOG(STATE) << "Entering PAUSED state"; - - fsm.fStateThread = boost::thread(boost::bind(&FairMQFSM_::Pause, &fsm)); + std::unique_lock lock(fsm.fWorkMutex); + while (fsm.fWorkActive) + { + fsm.fWorkDoneCondition.wait(lock); + } + fsm.fWorkAvailable = true; + fsm.fWork = std::bind(&FairMQFSM_::Pause, &fsm); + fsm.fWorkAvailableCondition.notify_one(); } }; @@ -219,15 +232,17 @@ struct FairMQFSM_ : public msm::front::state_machine_def template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { + LOG(STATE) << "Entering RUNNING state"; fsm.fState = RUNNING; - fsm.fStateThread.interrupt(); - fsm.fStateThread.join(); - fsm.fStateFinished = false; - - LOG(STATE) << "Entering RUNNING state"; - - fsm.fStateThread = boost::thread(boost::bind(&FairMQFSM_::RunWrapper, &fsm)); + std::unique_lock lock(fsm.fWorkMutex); + while (fsm.fWorkActive) + { + fsm.fWorkDoneCondition.wait(lock); + } + fsm.fWorkAvailable = true; + fsm.fWork = std::bind(&FairMQFSM_::RunWrapper, &fsm); + fsm.fWorkAvailableCondition.notify_one(); } }; @@ -237,11 +252,14 @@ struct FairMQFSM_ : public msm::front::state_machine_def void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering READY state"; - fsm.fState = READY; fsm.Unblock(); - fsm.fStateThread.join(); + std::unique_lock lock(fsm.fWorkMutex); + while (fsm.fWorkActive) + { + fsm.fWorkDoneCondition.wait(lock); + } } }; @@ -250,8 +268,7 @@ struct FairMQFSM_ : public msm::front::state_machine_def template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { - LOG(STATE) << "RUNNING state finished without an external event"; - LOG(STATE) << "Entering READY state"; + LOG(STATE) << "RUNNING state finished without an external event, entering READY state"; fsm.fState = READY; } }; @@ -262,12 +279,16 @@ struct FairMQFSM_ : public msm::front::state_machine_def void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering RESETTING TASK state"; - - fsm.fStateFinished = false; - fsm.fState = RESETTING_TASK; - fsm.fStateThread = boost::thread(boost::bind(&FairMQFSM_::ResetTaskWrapper, &fsm)); + std::unique_lock lock(fsm.fWorkMutex); + while (fsm.fWorkActive) + { + fsm.fWorkDoneCondition.wait(lock); + } + fsm.fWorkAvailable = true; + fsm.fWork = std::bind(&FairMQFSM_::ResetTaskWrapper, &fsm); + fsm.fWorkAvailableCondition.notify_one(); } }; @@ -277,12 +298,16 @@ struct FairMQFSM_ : public msm::front::state_machine_def void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering RESETTING DEVICE state"; - - fsm.fStateFinished = false; - fsm.fState = RESETTING_DEVICE; - fsm.ResetWrapper(); + std::unique_lock lock(fsm.fWorkMutex); + while (fsm.fWorkActive) + { + fsm.fWorkDoneCondition.wait(lock); + } + fsm.fWorkAvailable = true; + fsm.fWork = std::bind(&FairMQFSM_::ResetWrapper, &fsm); + fsm.fWorkAvailableCondition.notify_one(); } }; @@ -292,10 +317,14 @@ struct FairMQFSM_ : public msm::front::state_machine_def void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering EXITING state"; - fsm.fState = EXITING; - fsm.fTerminateStateThread = boost::thread(boost::bind(&FairMQFSM_::Terminate, &fsm)); + // terminate worker thread + std::lock_guard lock(fsm.fWorkMutex); + fsm.fWorkerTerminated = true; + fsm.fWorkAvailableCondition.notify_one(); + + fsm.fTerminateStateThread = std::thread(&FairMQFSM_::Terminate, &fsm); fsm.Shutdown(); fsm.fTerminateStateThread.join(); } @@ -310,10 +339,14 @@ struct FairMQFSM_ : public msm::front::state_machine_def fsm.fState = EXITING; - fsm.Unblock(); - fsm.fStateThread.join(); + fsm.Unblock(); // Unblock potential blocking transfer calls - fsm.fTerminateStateThread = boost::thread(boost::bind(&FairMQFSM_::Terminate, &fsm)); + // terminate worker thread + std::lock_guard lock(fsm.fWorkMutex); + fsm.fWorkerTerminated = true; + fsm.fWorkAvailableCondition.notify_one(); + + fsm.fTerminateStateThread = std::thread(&FairMQFSM_::Terminate, &fsm); fsm.Shutdown(); fsm.fTerminateStateThread.join(); } @@ -346,6 +379,35 @@ struct FairMQFSM_ : public msm::front::state_machine_def virtual void Terminate() {} // Termination method called during StopFct action. virtual void Unblock() {} // Method to send commands. + void Worker() + { + while (true) + { + { + std::unique_lock lock(fWorkMutex); + // Wait for work to be done. + while (!fWorkAvailable && !fWorkerTerminated) + { + fWorkAvailableCondition.wait(lock); + } + + if (fWorkerTerminated) + { + break; + } + + fWorkActive = true; + } + + fWork(); + + std::lock_guard lock(fWorkMutex); + fWorkActive = false; + fWorkAvailable = false; + fWorkDoneCondition.notify_one(); + } + } + // Transition table for FairMQFSM struct transition_table : mpl::vector< // Start Event Next Action Guard @@ -372,15 +434,18 @@ struct FairMQFSM_ : public msm::front::state_machine_def template void no_transition(Event const& e, FSM&, int state) { - typedef typename boost::msm::back::recursive_get_transition_table::type recursive_stt; - typedef typename boost::msm::back::generate_state_set::type all_states; + typedef typename msm::back::recursive_get_transition_table::type recursive_stt; + typedef typename msm::back::generate_state_set::type all_states; + std::string stateName; - boost::mpl::for_each >(boost::msm::back::get_state_name(stateName, state)); + + mpl::for_each>(msm::back::get_state_name(stateName, state)); + stateName = stateName.substr(24); std::size_t pos = stateName.find("_FSME"); stateName.erase(pos); - if (stateName == "1RUNNING" || stateName == "6DEVICE_READY" || stateName == "0PAUSED" || stateName == "8RESETTING_TASK") + if (stateName == "1RUNNING" || stateName == "6DEVICE_READY" || stateName == "0PAUSED" || stateName == "8RESETTING_TASK" || stateName == "0RESETTING_DEVICE") { stateName = stateName.substr(1); } @@ -478,13 +543,17 @@ struct FairMQFSM_ : public msm::front::state_machine_def } // this is to run certain functions in a separate thread - boost::thread fStateThread; - boost::thread fTerminateStateThread; + std::thread fWorkerThread; + std::thread fTerminateStateThread; - // condition variable to notify parent thread about end of state. - std::atomic fStateFinished; - boost::condition_variable fStateCondition; - boost::mutex fStateMutex; + // function to execute user states in a worker thread + std::function fWork; + std::condition_variable fWorkAvailableCondition; + std::condition_variable fWorkDoneCondition; + std::mutex fWorkMutex; + bool fWorkerTerminated; + bool fWorkActive; + bool fWorkAvailable; protected: std::atomic fState; diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 82e94053..48f57ae0 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -16,6 +16,7 @@ #define FAIRMQTRANSPORTFACTORY_H_ #include +#include #include #include @@ -30,15 +31,15 @@ class FairMQChannel; class FairMQTransportFactory { public: - virtual FairMQMessage* CreateMessage() const = 0; - virtual FairMQMessage* CreateMessage(const size_t size) const = 0; - virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL) const = 0; + virtual FairMQMessagePtr CreateMessage() const = 0; + virtual FairMQMessagePtr CreateMessage(const size_t size) const = 0; + virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL) const = 0; - virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = "") const = 0; + virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = "") const = 0; - virtual FairMQPoller* CreatePoller(const std::vector& channels) const = 0; - virtual FairMQPoller* CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const = 0; - virtual FairMQPoller* CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const = 0; + virtual FairMQPollerPtr CreatePoller(const std::vector& channels) const = 0; + virtual FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const = 0; + virtual FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const = 0; virtual ~FairMQTransportFactory() {}; }; diff --git a/fairmq/devices/FairMQBenchmarkSampler.cxx b/fairmq/devices/FairMQBenchmarkSampler.cxx index c29dbc1b..8bb11cc3 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.cxx +++ b/fairmq/devices/FairMQBenchmarkSampler.cxx @@ -13,10 +13,8 @@ */ #include - -#include -#include -#include +#include +#include #include "FairMQBenchmarkSampler.h" #include "FairMQLogger.h" @@ -47,7 +45,7 @@ void FairMQBenchmarkSampler::InitTask() void FairMQBenchmarkSampler::Run() { - // boost::thread resetMsgCounter(boost::bind(&FairMQBenchmarkSampler::ResetMsgCounter, this)); + // std::thread resetMsgCounter(&FairMQBenchmarkSampler::ResetMsgCounter, this); int numSentMsgs = 0; @@ -57,7 +55,7 @@ void FairMQBenchmarkSampler::Run() const FairMQChannel& dataOutChannel = fChannels.at(fOutChannelName).at(0); LOG(INFO) << "Starting the benchmark with message size of " << fMsgSize << " and number of messages " << fNumMsgs << "."; - boost::timer::auto_cpu_timer timer; + auto tStart = chrono::high_resolution_clock::now(); while (CheckCurrentState(RUNNING)) { @@ -78,38 +76,23 @@ void FairMQBenchmarkSampler::Run() // --fMsgCounter; // while (fMsgCounter == 0) { - // boost::this_thread::sleep(boost::posix_time::milliseconds(1)); + // this_thread::sleep_for(chrono::milliseconds(1)); // } } - LOG(INFO) << "Sent " << numSentMsgs << " messages, leaving RUNNING state."; - LOG(INFO) << "Sending time: "; + auto tEnd = chrono::high_resolution_clock::now(); - // try - // { - // resetMsgCounter.interrupt(); - // resetMsgCounter.join(); - // } - // catch(boost::thread_resource_error& e) - // { - // LOG(ERROR) << e.what(); - // exit(EXIT_FAILURE); - // } + LOG(INFO) << "Sent " << numSentMsgs << " messages, leaving RUNNING state."; + LOG(INFO) << "Sending time: " << chrono::duration(tEnd - tStart).count() << " ms"; + + // resetMsgCounter.join(); } void FairMQBenchmarkSampler::ResetMsgCounter() { - while (true) + while (CheckCurrentState(RUNNING)) { - try - { - fMsgCounter = fMsgRate / 100; - boost::this_thread::sleep(boost::posix_time::milliseconds(10)); - } - catch (boost::thread_interrupted&) - { - LOG(DEBUG) << "Event rate limiter thread interrupted"; - break; - } + fMsgCounter = fMsgRate / 100; + this_thread::sleep_for(chrono::milliseconds(10)); } } diff --git a/fairmq/devices/FairMQSink.cxx b/fairmq/devices/FairMQSink.cxx index 5a8ed3c0..b85dd015 100644 --- a/fairmq/devices/FairMQSink.cxx +++ b/fairmq/devices/FairMQSink.cxx @@ -12,7 +12,7 @@ * @author D. Klein, A. Rybalchenko */ -#include +#include #include "FairMQSink.h" #include "FairMQLogger.h" @@ -39,7 +39,7 @@ void FairMQSink::Run() const FairMQChannel& dataInChannel = fChannels.at(fInChannelName).at(0); LOG(INFO) << "Starting the benchmark and expecting to receive " << fNumMsgs << " messages."; - boost::timer::auto_cpu_timer timer; + auto tStart = chrono::high_resolution_clock::now(); while (CheckCurrentState(RUNNING)) { @@ -58,8 +58,10 @@ void FairMQSink::Run() } } + auto tEnd = chrono::high_resolution_clock::now(); + LOG(INFO) << "Received " << numReceivedMsgs << " messages, leaving RUNNING state."; - LOG(INFO) << "Receiving time: "; + LOG(INFO) << "Receiving time: " << chrono::duration(tEnd - tStart).count() << " ms"; } FairMQSink::~FairMQSink() diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index bbe86a08..ce3ca56f 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -21,37 +21,37 @@ FairMQTransportFactoryNN::FairMQTransportFactoryNN() LOG(INFO) << "Using nanomsg library"; } -FairMQMessage* FairMQTransportFactoryNN::CreateMessage() const +FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage() const { - return new FairMQMessageNN(); + return unique_ptr(new FairMQMessageNN()); } -FairMQMessage* FairMQTransportFactoryNN::CreateMessage(const size_t size) const +FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage(const size_t size) const { - return new FairMQMessageNN(size); + return unique_ptr(new FairMQMessageNN(size)); } -FairMQMessage* FairMQTransportFactoryNN::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) const +FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) const { - return new FairMQMessageNN(data, size, ffn, hint); + return unique_ptr(new FairMQMessageNN(data, size, ffn, hint)); } -FairMQSocket* FairMQTransportFactoryNN::CreateSocket(const string& type, const std::string& name, const int numIoThreads, const std::string& id /*= ""*/) const +FairMQSocketPtr FairMQTransportFactoryNN::CreateSocket(const string& type, const string& name, const int numIoThreads, const string& id /*= ""*/) const { - return new FairMQSocketNN(type, name, numIoThreads, id); + return unique_ptr(new FairMQSocketNN(type, name, numIoThreads, id)); } -FairMQPoller* FairMQTransportFactoryNN::CreatePoller(const vector& channels) const +FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const vector& channels) const { - return new FairMQPollerNN(channels); + return unique_ptr(new FairMQPollerNN(channels)); } -FairMQPoller* FairMQTransportFactoryNN::CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const +FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const unordered_map>& channelsMap, const vector& channelList) const { - return new FairMQPollerNN(channelsMap, channelList); + return unique_ptr(new FairMQPollerNN(channelsMap, channelList)); } -FairMQPoller* FairMQTransportFactoryNN::CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const +FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const { - return new FairMQPollerNN(cmdSocket, dataSocket); + return unique_ptr(new FairMQPollerNN(cmdSocket, dataSocket)); } diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h index 3e7235d9..f1dcd5ca 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.h +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.h @@ -27,15 +27,15 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory public: FairMQTransportFactoryNN(); - virtual FairMQMessage* CreateMessage() const; - virtual FairMQMessage* CreateMessage(const size_t size) const; - virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL) const; + virtual FairMQMessagePtr CreateMessage() const; + virtual FairMQMessagePtr CreateMessage(const size_t size) const; + virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL) const; - virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = "") const; + virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = "") const; - virtual FairMQPoller* CreatePoller(const std::vector& channels) const; - virtual FairMQPoller* CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const; - virtual FairMQPoller* CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const; + virtual FairMQPollerPtr CreatePoller(const std::vector& channels) const; + virtual FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const; + virtual FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const; virtual ~FairMQTransportFactoryNN() {}; }; diff --git a/fairmq/options/runConfigEx.cxx b/fairmq/options/runConfigEx.cxx index 4bbbeb7a..37b30a99 100644 --- a/fairmq/options/runConfigEx.cxx +++ b/fairmq/options/runConfigEx.cxx @@ -59,33 +59,6 @@ void MyCallBack(MyDevice& d, double val) d.Print(); } -void PrintMQParam(const FairMQMap& channels, const FairMQProgOptions& config) -{ - for (const auto& p : channels) - { - int index = 0; - for (const auto& channel : p.second) - { - string typeKey = p.first + "." + to_string(index) + ".type"; - string methodKey = p.first + "." + to_string(index) + ".method"; - string addressKey = p.first + "." + to_string(index) + ".address"; - string propertyKey = p.first + "." + to_string(index) + ".property"; - string sndBufSizeKey = p.first + "." + to_string(index) + ".sndBufSize"; - string rcvBufSizeKey = p.first + "." + to_string(index) + ".rcvBufSize"; - string rateLoggingKey = p.first + "." + to_string(index) + ".rateLogging"; - - LOG(INFO) << "Channel name = " << p.first; - LOG(INFO) << "key = " << typeKey <<"\t value = " << config.GetValue(typeKey); - LOG(INFO) << "key = " << methodKey <<"\t value = " << config.GetValue(methodKey); - LOG(INFO) << "key = " << addressKey <<"\t value = " << config.GetValue(addressKey); - LOG(INFO) << "key = " << propertyKey <<"\t value = " << config.GetValue(propertyKey); - LOG(INFO) << "key = " << sndBufSizeKey << "\t value = " << config.GetValue(sndBufSizeKey); - LOG(INFO) << "key = " << rcvBufSizeKey <<"\t value = " << config.GetValue(rcvBufSizeKey); - LOG(INFO) << "key = " << rateLoggingKey <<"\t value = " << config.GetValue(rateLoggingKey); - } - } -} - int main(int argc, char** argv) { try @@ -103,9 +76,6 @@ int main(int argc, char** argv) // // get FairMQMap // auto map1 = config.GetFairMQMap(); - // // form keys from map1 and print the value stored in variable map - // PrintMQParam(map1, config); - // // update value in variable map, and propagate the update to the FairMQMap // config.UpdateValue("data.0.address","tcp://localhost:1234"); @@ -118,9 +88,6 @@ int main(int argc, char** argv) // // update the FairMQMap and propagate the change in variable map // config.UpdateChannelMap(map2); - // // print values stored in variable map - // PrintMQParam(map2, config); - MyDevice device; device.CatchSignals(); device.SetConfig(config); diff --git a/fairmq/plugins/config/FairMQDDSConfigPlugin.cxx b/fairmq/plugins/config/FairMQDDSConfigPlugin.cxx index 3913c4a2..9a94b961 100644 --- a/fairmq/plugins/config/FairMQDDSConfigPlugin.cxx +++ b/fairmq/plugins/config/FairMQDDSConfigPlugin.cxx @@ -125,7 +125,7 @@ class FairMQConfigPluginDDS } } - void Run(FairMQDevice& device) + void Run(FairMQDevice& /*device*/) { // start DDS intercom service fService.start(); diff --git a/fairmq/run/runDDSCommandUI.cxx b/fairmq/run/runDDSCommandUI.cxx index fe563960..1781f2d9 100644 --- a/fairmq/run/runDDSCommandUI.cxx +++ b/fairmq/run/runDDSCommandUI.cxx @@ -31,7 +31,7 @@ int main(int argc, char* argv[]) }); // subscribe to receive messages from DDS - ddsCustomCmd.subscribe([](const string& msg, const string& condition, uint64_t senderId) + ddsCustomCmd.subscribe([](const string& msg, const string& /*condition*/, uint64_t /*senderId*/) { cout << "Received: \"" << msg << "\"" << endl; }); diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx index dfb34da8..223ca28d 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx @@ -25,37 +25,37 @@ FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ() LOG(DEBUG) << "Using ZeroMQ library, version: " << major << "." << minor << "." << patch; } -FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage() const +FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage() const { - return new FairMQMessageZMQ(); + return unique_ptr(new FairMQMessageZMQ()); } -FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(const size_t size) const +FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(const size_t size) const { - return new FairMQMessageZMQ(size); + return unique_ptr(new FairMQMessageZMQ(size)); } -FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) const +FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) const { - return new FairMQMessageZMQ(data, size, ffn, hint); + return unique_ptr(new FairMQMessageZMQ(data, size, ffn, hint)); } -FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(const string& type, const std::string& name, const int numIoThreads, const std::string& id /*= ""*/) const +FairMQSocketPtr FairMQTransportFactoryZMQ::CreateSocket(const string& type, const string& name, const int numIoThreads, const string& id /*= ""*/) const { - return new FairMQSocketZMQ(type, name, numIoThreads, id); + return unique_ptr(new FairMQSocketZMQ(type, name, numIoThreads, id)); } -FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(const vector& channels) const +FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const vector& channels) const { - return new FairMQPollerZMQ(channels); + return unique_ptr(new FairMQPollerZMQ(channels)); } -FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const +FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const unordered_map>& channelsMap, const vector& channelList) const { - return new FairMQPollerZMQ(channelsMap, channelList); + return unique_ptr(new FairMQPollerZMQ(channelsMap, channelList)); } -FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const +FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const { - return new FairMQPollerZMQ(cmdSocket, dataSocket); + return unique_ptr(new FairMQPollerZMQ(cmdSocket, dataSocket)); } diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index 374eb5eb..98d03d1e 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -28,15 +28,15 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory public: FairMQTransportFactoryZMQ(); - virtual FairMQMessage* CreateMessage() const; - virtual FairMQMessage* CreateMessage(const size_t size) const; - virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL) const; + virtual FairMQMessagePtr CreateMessage() const; + virtual FairMQMessagePtr CreateMessage(const size_t size) const; + virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL) const; - virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = "") const; + virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = "") const; - virtual FairMQPoller* CreatePoller(const std::vector& channels) const; - virtual FairMQPoller* CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const; - virtual FairMQPoller* CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const; + virtual FairMQPollerPtr CreatePoller(const std::vector& channels) const; + virtual FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const; + virtual FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const; virtual ~FairMQTransportFactoryZMQ() {}; };