diff --git a/fairmq/FairMQStateMachine.cxx b/fairmq/FairMQStateMachine.cxx index c18b788d..44547645 100644 --- a/fairmq/FairMQStateMachine.cxx +++ b/fairmq/FairMQStateMachine.cxx @@ -16,6 +16,7 @@ #include "FairMQLogger.h" FairMQStateMachine::FairMQStateMachine() + : fRunningFinished(false) { start(); } diff --git a/fairmq/FairMQStateMachine.h b/fairmq/FairMQStateMachine.h index 9a4c6408..561ba5a4 100644 --- a/fairmq/FairMQStateMachine.h +++ b/fairmq/FairMQStateMachine.h @@ -16,13 +16,16 @@ #define FAIRMQSTATEMACHINE_H_ #include +#include +#include #include +#include + #include #include #include #include #include -#include #include "FairMQLogger.h" @@ -192,6 +195,11 @@ class FairMQStateMachine : public FairMQFSM::FairMQFSM FairMQStateMachine(); virtual ~FairMQStateMachine(); void ChangeState(int event); + + // condition variable to notify parent thread about end of running state. + boost::condition_variable fRunningCondition; + boost::mutex fRunningMutex; + bool fRunningFinished; }; #endif /* FAIRMQSTATEMACHINE_H_ */ diff --git a/fairmq/devices/FairMQBenchmarkSampler.cxx b/fairmq/devices/FairMQBenchmarkSampler.cxx index 3d907cb4..a0b77b11 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.cxx +++ b/fairmq/devices/FairMQBenchmarkSampler.cxx @@ -76,6 +76,11 @@ void FairMQBenchmarkSampler::Run() } FairMQDevice::Shutdown(); + + // notify parent thread about end of processing. + boost::lock_guard lock(fRunningMutex); + fRunningFinished = true; + fRunningCondition.notify_one(); } void FairMQBenchmarkSampler::ResetEventCounter() diff --git a/fairmq/devices/FairMQBuffer.cxx b/fairmq/devices/FairMQBuffer.cxx index 10b4ce63..276970ed 100644 --- a/fairmq/devices/FairMQBuffer.cxx +++ b/fairmq/devices/FairMQBuffer.cxx @@ -54,6 +54,11 @@ void FairMQBuffer::Run() } FairMQDevice::Shutdown(); + + // notify parent thread about end of processing. + boost::lock_guard lock(fRunningMutex); + fRunningFinished = true; + fRunningCondition.notify_one(); } FairMQBuffer::~FairMQBuffer() diff --git a/fairmq/devices/FairMQMerger.cxx b/fairmq/devices/FairMQMerger.cxx index 1d7adcac..cfd29a8b 100644 --- a/fairmq/devices/FairMQMerger.cxx +++ b/fairmq/devices/FairMQMerger.cxx @@ -69,4 +69,9 @@ void FairMQMerger::Run() } FairMQDevice::Shutdown(); + + // notify parent thread about end of processing. + boost::lock_guard lock(fRunningMutex); + fRunningFinished = true; + fRunningCondition.notify_one(); } diff --git a/fairmq/devices/FairMQProxy.cxx b/fairmq/devices/FairMQProxy.cxx index 17bf7575..1f961af4 100644 --- a/fairmq/devices/FairMQProxy.cxx +++ b/fairmq/devices/FairMQProxy.cxx @@ -56,4 +56,9 @@ void FairMQProxy::Run() } FairMQDevice::Shutdown(); + + // notify parent thread about end of processing. + boost::lock_guard lock(fRunningMutex); + fRunningFinished = true; + fRunningCondition.notify_one(); } diff --git a/fairmq/devices/FairMQSink.cxx b/fairmq/devices/FairMQSink.cxx index ae7154b2..ae6bf9e6 100644 --- a/fairmq/devices/FairMQSink.cxx +++ b/fairmq/devices/FairMQSink.cxx @@ -47,6 +47,11 @@ void FairMQSink::Run() } FairMQDevice::Shutdown(); + + // notify parent thread about end of processing. + boost::lock_guard lock(fRunningMutex); + fRunningFinished = true; + fRunningCondition.notify_one(); } FairMQSink::~FairMQSink() diff --git a/fairmq/devices/FairMQSplitter.cxx b/fairmq/devices/FairMQSplitter.cxx index 8c41daf5..93b41e20 100644 --- a/fairmq/devices/FairMQSplitter.cxx +++ b/fairmq/devices/FairMQSplitter.cxx @@ -63,4 +63,9 @@ void FairMQSplitter::Run() } FairMQDevice::Shutdown(); + + // notify parent thread about end of processing. + boost::lock_guard lock(fRunningMutex); + fRunningFinished = true; + fRunningCondition.notify_one(); } diff --git a/fairmq/run/runBenchmarkSampler.cxx b/fairmq/run/runBenchmarkSampler.cxx index bbbc93d2..5c52a718 100644 --- a/fairmq/run/runBenchmarkSampler.cxx +++ b/fairmq/run/runBenchmarkSampler.cxx @@ -113,8 +113,12 @@ int main(int argc, char** argv) sampler.ChangeState(FairMQBenchmarkSampler::SETINPUT); sampler.ChangeState(FairMQBenchmarkSampler::RUN); - char ch; - cin.get(ch); + // wait until the running thread has finished processing. + boost::unique_lock lock(sampler.fRunningMutex); + while (!sampler.fRunningFinished) + { + sampler.fRunningCondition.wait(lock); + } sampler.ChangeState(FairMQBenchmarkSampler::STOP); sampler.ChangeState(FairMQBenchmarkSampler::END); diff --git a/fairmq/run/runBinSampler.cxx b/fairmq/run/runBinSampler.cxx index 44ec1654..38c51606 100644 --- a/fairmq/run/runBinSampler.cxx +++ b/fairmq/run/runBinSampler.cxx @@ -113,8 +113,12 @@ int main(int argc, char** argv) sampler.ChangeState(FairMQBinSampler::SETINPUT); sampler.ChangeState(FairMQBinSampler::RUN); - char ch; - cin.get(ch); + // wait until the running thread has finished processing. + boost::unique_lock lock(sampler.fRunningMutex); + while (!sampler.fRunningFinished) + { + sampler.fRunningCondition.wait(lock); + } sampler.ChangeState(FairMQBinSampler::STOP); sampler.ChangeState(FairMQBinSampler::END); diff --git a/fairmq/run/runBinSink.cxx b/fairmq/run/runBinSink.cxx index 85456644..7b0c0fcb 100644 --- a/fairmq/run/runBinSink.cxx +++ b/fairmq/run/runBinSink.cxx @@ -103,8 +103,12 @@ int main(int argc, char** argv) sink.ChangeState(FairMQBinSink::SETINPUT); sink.ChangeState(FairMQBinSink::RUN); - char ch; - cin.get(ch); + // wait until the running thread has finished processing. + boost::unique_lock lock(sink.fRunningMutex); + while (!sink.fRunningFinished) + { + sink.fRunningCondition.wait(lock); + } sink.ChangeState(FairMQBinSink::STOP); sink.ChangeState(FairMQBinSink::END); diff --git a/fairmq/run/runBuffer.cxx b/fairmq/run/runBuffer.cxx index efd5e263..85eb17c0 100644 --- a/fairmq/run/runBuffer.cxx +++ b/fairmq/run/runBuffer.cxx @@ -114,8 +114,12 @@ int main(int argc, char** argv) buffer.ChangeState(FairMQBuffer::SETINPUT); buffer.ChangeState(FairMQBuffer::RUN); - char ch; - cin.get(ch); + // wait until the running thread has finished processing. + boost::unique_lock lock(buffer.fRunningMutex); + while (!buffer.fRunningFinished) + { + buffer.fRunningCondition.wait(lock); + } buffer.ChangeState(FairMQBuffer::STOP); buffer.ChangeState(FairMQBuffer::END); diff --git a/fairmq/run/runMerger.cxx b/fairmq/run/runMerger.cxx index 8ea34283..3c04da92 100644 --- a/fairmq/run/runMerger.cxx +++ b/fairmq/run/runMerger.cxx @@ -124,8 +124,12 @@ int main(int argc, char** argv) merger.ChangeState(FairMQMerger::SETINPUT); merger.ChangeState(FairMQMerger::RUN); - char ch; - cin.get(ch); + // wait until the running thread has finished processing. + boost::unique_lock lock(merger.fRunningMutex); + while (!merger.fRunningFinished) + { + merger.fRunningCondition.wait(lock); + } merger.ChangeState(FairMQMerger::STOP); merger.ChangeState(FairMQMerger::END); diff --git a/fairmq/run/runProtoSampler.cxx b/fairmq/run/runProtoSampler.cxx index 14c942f8..bbe5cb8d 100644 --- a/fairmq/run/runProtoSampler.cxx +++ b/fairmq/run/runProtoSampler.cxx @@ -113,8 +113,12 @@ int main(int argc, char** argv) sampler.ChangeState(FairMQProtoSampler::SETINPUT); sampler.ChangeState(FairMQProtoSampler::RUN); - char ch; - cin.get(ch); + // wait until the running thread has finished processing. + boost::unique_lock lock(sampler.fRunningMutex); + while (!sampler.fRunningFinished) + { + sampler.fRunningCondition.wait(lock); + } sampler.ChangeState(FairMQProtoSampler::STOP); sampler.ChangeState(FairMQProtoSampler::END); diff --git a/fairmq/run/runProtoSink.cxx b/fairmq/run/runProtoSink.cxx index 554bc383..dc694146 100644 --- a/fairmq/run/runProtoSink.cxx +++ b/fairmq/run/runProtoSink.cxx @@ -103,8 +103,12 @@ int main(int argc, char** argv) sink.ChangeState(FairMQProtoSink::SETINPUT); sink.ChangeState(FairMQProtoSink::RUN); - char ch; - cin.get(ch); + // wait until the running thread has finished processing. + boost::unique_lock lock(sink.fRunningMutex); + while (!sink.fRunningFinished) + { + sink.fRunningCondition.wait(lock); + } sink.ChangeState(FairMQProtoSink::STOP); sink.ChangeState(FairMQProtoSink::END); diff --git a/fairmq/run/runProxy.cxx b/fairmq/run/runProxy.cxx index ab67e331..e3df0125 100644 --- a/fairmq/run/runProxy.cxx +++ b/fairmq/run/runProxy.cxx @@ -115,8 +115,12 @@ int main(int argc, char** argv) proxy.ChangeState(FairMQProxy::SETINPUT); proxy.ChangeState(FairMQProxy::RUN); - char ch; - cin.get(ch); + // wait until the running thread has finished processing. + boost::unique_lock lock(proxy.fRunningMutex); + while (!proxy.fRunningFinished) + { + proxy.fRunningCondition.wait(lock); + } proxy.ChangeState(FairMQProxy::STOP); proxy.ChangeState(FairMQProxy::END); diff --git a/fairmq/run/runSink.cxx b/fairmq/run/runSink.cxx index fc463287..02c35ce9 100644 --- a/fairmq/run/runSink.cxx +++ b/fairmq/run/runSink.cxx @@ -103,8 +103,12 @@ int main(int argc, char** argv) sink.ChangeState(FairMQSink::SETINPUT); sink.ChangeState(FairMQSink::RUN); - char ch; - cin.get(ch); + // wait until the running thread has finished processing. + boost::unique_lock lock(sink.fRunningMutex); + while (!sink.fRunningFinished) + { + sink.fRunningCondition.wait(lock); + } sink.ChangeState(FairMQSink::STOP); sink.ChangeState(FairMQSink::END); diff --git a/fairmq/run/runSplitter.cxx b/fairmq/run/runSplitter.cxx index e3f7ee7a..8babee06 100644 --- a/fairmq/run/runSplitter.cxx +++ b/fairmq/run/runSplitter.cxx @@ -124,8 +124,12 @@ int main(int argc, char** argv) splitter.ChangeState(FairMQSplitter::SETINPUT); splitter.ChangeState(FairMQSplitter::RUN); - char ch; - cin.get(ch); + // wait until the running thread has finished processing. + boost::unique_lock lock(splitter.fRunningMutex); + while (!splitter.fRunningFinished) + { + splitter.fRunningCondition.wait(lock); + } splitter.ChangeState(FairMQSplitter::STOP); splitter.ChangeState(FairMQSplitter::END);