Add interruptable FairMQDevice::WaitFor(duration) method

This commit is contained in:
Alexey Rybalchenko 2018-09-12 11:37:36 +02:00 committed by Dennis Klein
parent 88f897536e
commit 4123ebc9d4
2 changed files with 28 additions and 0 deletions

View File

@ -72,6 +72,9 @@ FairMQDevice::FairMQDevice(FairMQProgOptions* config, const fair::mq::tools::Ver
, fVersion(version) , fVersion(version)
, fRate(0.) , fRate(0.)
, fRawCmdLineArgs() , fRawCmdLineArgs()
, fInterrupted(false)
, fInterruptedCV()
, fInterruptedMtx()
{ {
} }
@ -491,6 +494,10 @@ void FairMQDevice::RunWrapper()
thread rateLogger(&FairMQDevice::LogSocketRates, this); thread rateLogger(&FairMQDevice::LogSocketRates, this);
// notify transports to resume transfers // notify transports to resume transfers
{
lock_guard<mutex> guard(fInterruptedMtx);
fInterrupted = false;
}
for (auto& t : fTransports) for (auto& t : fTransports)
{ {
t.second->Resume(); t.second->Resume();
@ -909,6 +916,7 @@ void FairMQDevice::LogSocketRates()
t0 = t1; t0 = t1;
this_thread::sleep_for(chrono::milliseconds(1000)); this_thread::sleep_for(chrono::milliseconds(1000));
// WaitFor(chrono::milliseconds(1000)); TODO: enable this when nanomsg linger is fixed
} }
} }
} }
@ -919,6 +927,11 @@ void FairMQDevice::Unblock()
{ {
t.second->Interrupt(); t.second->Interrupt();
} }
{
lock_guard<mutex> guard(fInterruptedMtx);
fInterrupted = true;
}
fInterruptedCV.notify_all();
} }
void FairMQDevice::ResetTaskWrapper() void FairMQDevice::ResetTaskWrapper()

View File

@ -25,6 +25,7 @@
#include <memory> // unique_ptr #include <memory> // unique_ptr
#include <algorithm> // std::sort() #include <algorithm> // std::sort()
#include <string> #include <string>
#include <chrono>
#include <iostream> #include <iostream>
#include <unordered_map> #include <unordered_map>
#include <functional> #include <functional>
@ -433,6 +434,16 @@ class FairMQDevice : public FairMQStateMachine
void RunStateMachine() { ProcessWork(); }; void RunStateMachine() { ProcessWork(); };
/// Wait for the supplied amount of time or for interruption.
/// If interrupted, returns false, otherwise true.
/// @param duration wait duration
template<class Rep, class Period>
bool WaitFor(std::chrono::duration<Rep, Period> const& duration)
{
std::unique_lock<std::mutex> lock(fInterruptedMtx);
return !fInterruptedCV.wait_for(lock, duration, [&] { return fInterrupted.load(); }); // return true if no interruption happened
}
protected: protected:
std::shared_ptr<FairMQTransportFactory> fTransportFactory; ///< Default transport factory std::shared_ptr<FairMQTransportFactory> fTransportFactory; ///< Default transport factory
std::unordered_map<fair::mq::Transport, std::shared_ptr<FairMQTransportFactory>> fTransports; ///< Container for transports std::unordered_map<fair::mq::Transport, std::shared_ptr<FairMQTransportFactory>> fTransports; ///< Container for transports
@ -551,6 +562,10 @@ class FairMQDevice : public FairMQStateMachine
const fair::mq::tools::Version fVersion; const fair::mq::tools::Version fVersion;
float fRate; ///< Rate limiting for ConditionalRun float fRate; ///< Rate limiting for ConditionalRun
std::vector<std::string> fRawCmdLineArgs; std::vector<std::string> fRawCmdLineArgs;
std::atomic<bool> fInterrupted;
std::condition_variable fInterruptedCV;
std::mutex fInterruptedMtx;
}; };
#endif /* FAIRMQDEVICE_H_ */ #endif /* FAIRMQDEVICE_H_ */