Condition check before ending the main thread.

Make sure the main thread waits for the child thread with the Run() method.

* Add this at the end of your Run() method for each device:
```
boost::lock_guard<boost::mutex> lock(fRunningMutex);
fRunningFinished = true;
fRunningCondition.notify_one();
```
* Then you must replace the `char ch; cin.get(ch);` in your main() function with:
```
boost::unique_lock<boost::mutex> lock(processor.fRunningMutex);
while (!processor.fRunningFinished)
{
    processor.fRunningCondition.wait(lock);
}
```
This commit is contained in:
Alexey Rybalchenko 2014-08-26 15:35:50 +02:00
parent 7d7e1a1084
commit bd79420f93
18 changed files with 100 additions and 21 deletions

View File

@ -16,6 +16,7 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
FairMQStateMachine::FairMQStateMachine() FairMQStateMachine::FairMQStateMachine()
: fRunningFinished(false)
{ {
start(); start();
} }

View File

@ -16,13 +16,16 @@
#define FAIRMQSTATEMACHINE_H_ #define FAIRMQSTATEMACHINE_H_
#include <boost/thread.hpp> #include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/msm/back/state_machine.hpp> #include <boost/msm/back/state_machine.hpp>
#include <boost/msm/front/state_machine_def.hpp> #include <boost/msm/front/state_machine_def.hpp>
#include <boost/msm/front/functor_row.hpp> #include <boost/msm/front/functor_row.hpp>
#include <boost/msm/front/euml/common.hpp> #include <boost/msm/front/euml/common.hpp>
#include <boost/msm/front/euml/operator.hpp> #include <boost/msm/front/euml/operator.hpp>
#include <boost/function.hpp>
#include "FairMQLogger.h" #include "FairMQLogger.h"
@ -192,6 +195,11 @@ class FairMQStateMachine : public FairMQFSM::FairMQFSM
FairMQStateMachine(); FairMQStateMachine();
virtual ~FairMQStateMachine(); virtual ~FairMQStateMachine();
void ChangeState(int event); void ChangeState(int event);
// condition variable to notify parent thread about end of running state.
boost::condition_variable fRunningCondition;
boost::mutex fRunningMutex;
bool fRunningFinished;
}; };
#endif /* FAIRMQSTATEMACHINE_H_ */ #endif /* FAIRMQSTATEMACHINE_H_ */

View File

@ -76,6 +76,11 @@ void FairMQBenchmarkSampler::Run()
} }
FairMQDevice::Shutdown(); FairMQDevice::Shutdown();
// notify parent thread about end of processing.
boost::lock_guard<boost::mutex> lock(fRunningMutex);
fRunningFinished = true;
fRunningCondition.notify_one();
} }
void FairMQBenchmarkSampler::ResetEventCounter() void FairMQBenchmarkSampler::ResetEventCounter()

View File

@ -54,6 +54,11 @@ void FairMQBuffer::Run()
} }
FairMQDevice::Shutdown(); FairMQDevice::Shutdown();
// notify parent thread about end of processing.
boost::lock_guard<boost::mutex> lock(fRunningMutex);
fRunningFinished = true;
fRunningCondition.notify_one();
} }
FairMQBuffer::~FairMQBuffer() FairMQBuffer::~FairMQBuffer()

View File

@ -69,4 +69,9 @@ void FairMQMerger::Run()
} }
FairMQDevice::Shutdown(); FairMQDevice::Shutdown();
// notify parent thread about end of processing.
boost::lock_guard<boost::mutex> lock(fRunningMutex);
fRunningFinished = true;
fRunningCondition.notify_one();
} }

View File

@ -56,4 +56,9 @@ void FairMQProxy::Run()
} }
FairMQDevice::Shutdown(); FairMQDevice::Shutdown();
// notify parent thread about end of processing.
boost::lock_guard<boost::mutex> lock(fRunningMutex);
fRunningFinished = true;
fRunningCondition.notify_one();
} }

View File

@ -47,6 +47,11 @@ void FairMQSink::Run()
} }
FairMQDevice::Shutdown(); FairMQDevice::Shutdown();
// notify parent thread about end of processing.
boost::lock_guard<boost::mutex> lock(fRunningMutex);
fRunningFinished = true;
fRunningCondition.notify_one();
} }
FairMQSink::~FairMQSink() FairMQSink::~FairMQSink()

View File

@ -63,4 +63,9 @@ void FairMQSplitter::Run()
} }
FairMQDevice::Shutdown(); FairMQDevice::Shutdown();
// notify parent thread about end of processing.
boost::lock_guard<boost::mutex> lock(fRunningMutex);
fRunningFinished = true;
fRunningCondition.notify_one();
} }

View File

@ -113,8 +113,12 @@ int main(int argc, char** argv)
sampler.ChangeState(FairMQBenchmarkSampler::SETINPUT); sampler.ChangeState(FairMQBenchmarkSampler::SETINPUT);
sampler.ChangeState(FairMQBenchmarkSampler::RUN); sampler.ChangeState(FairMQBenchmarkSampler::RUN);
char ch; // wait until the running thread has finished processing.
cin.get(ch); boost::unique_lock<boost::mutex> lock(sampler.fRunningMutex);
while (!sampler.fRunningFinished)
{
sampler.fRunningCondition.wait(lock);
}
sampler.ChangeState(FairMQBenchmarkSampler::STOP); sampler.ChangeState(FairMQBenchmarkSampler::STOP);
sampler.ChangeState(FairMQBenchmarkSampler::END); sampler.ChangeState(FairMQBenchmarkSampler::END);

View File

@ -113,8 +113,12 @@ int main(int argc, char** argv)
sampler.ChangeState(FairMQBinSampler::SETINPUT); sampler.ChangeState(FairMQBinSampler::SETINPUT);
sampler.ChangeState(FairMQBinSampler::RUN); sampler.ChangeState(FairMQBinSampler::RUN);
char ch; // wait until the running thread has finished processing.
cin.get(ch); boost::unique_lock<boost::mutex> lock(sampler.fRunningMutex);
while (!sampler.fRunningFinished)
{
sampler.fRunningCondition.wait(lock);
}
sampler.ChangeState(FairMQBinSampler::STOP); sampler.ChangeState(FairMQBinSampler::STOP);
sampler.ChangeState(FairMQBinSampler::END); sampler.ChangeState(FairMQBinSampler::END);

View File

@ -103,8 +103,12 @@ int main(int argc, char** argv)
sink.ChangeState(FairMQBinSink::SETINPUT); sink.ChangeState(FairMQBinSink::SETINPUT);
sink.ChangeState(FairMQBinSink::RUN); sink.ChangeState(FairMQBinSink::RUN);
char ch; // wait until the running thread has finished processing.
cin.get(ch); boost::unique_lock<boost::mutex> lock(sink.fRunningMutex);
while (!sink.fRunningFinished)
{
sink.fRunningCondition.wait(lock);
}
sink.ChangeState(FairMQBinSink::STOP); sink.ChangeState(FairMQBinSink::STOP);
sink.ChangeState(FairMQBinSink::END); sink.ChangeState(FairMQBinSink::END);

View File

@ -114,8 +114,12 @@ int main(int argc, char** argv)
buffer.ChangeState(FairMQBuffer::SETINPUT); buffer.ChangeState(FairMQBuffer::SETINPUT);
buffer.ChangeState(FairMQBuffer::RUN); buffer.ChangeState(FairMQBuffer::RUN);
char ch; // wait until the running thread has finished processing.
cin.get(ch); boost::unique_lock<boost::mutex> lock(buffer.fRunningMutex);
while (!buffer.fRunningFinished)
{
buffer.fRunningCondition.wait(lock);
}
buffer.ChangeState(FairMQBuffer::STOP); buffer.ChangeState(FairMQBuffer::STOP);
buffer.ChangeState(FairMQBuffer::END); buffer.ChangeState(FairMQBuffer::END);

View File

@ -124,8 +124,12 @@ int main(int argc, char** argv)
merger.ChangeState(FairMQMerger::SETINPUT); merger.ChangeState(FairMQMerger::SETINPUT);
merger.ChangeState(FairMQMerger::RUN); merger.ChangeState(FairMQMerger::RUN);
char ch; // wait until the running thread has finished processing.
cin.get(ch); boost::unique_lock<boost::mutex> lock(merger.fRunningMutex);
while (!merger.fRunningFinished)
{
merger.fRunningCondition.wait(lock);
}
merger.ChangeState(FairMQMerger::STOP); merger.ChangeState(FairMQMerger::STOP);
merger.ChangeState(FairMQMerger::END); merger.ChangeState(FairMQMerger::END);

View File

@ -113,8 +113,12 @@ int main(int argc, char** argv)
sampler.ChangeState(FairMQProtoSampler::SETINPUT); sampler.ChangeState(FairMQProtoSampler::SETINPUT);
sampler.ChangeState(FairMQProtoSampler::RUN); sampler.ChangeState(FairMQProtoSampler::RUN);
char ch; // wait until the running thread has finished processing.
cin.get(ch); boost::unique_lock<boost::mutex> lock(sampler.fRunningMutex);
while (!sampler.fRunningFinished)
{
sampler.fRunningCondition.wait(lock);
}
sampler.ChangeState(FairMQProtoSampler::STOP); sampler.ChangeState(FairMQProtoSampler::STOP);
sampler.ChangeState(FairMQProtoSampler::END); sampler.ChangeState(FairMQProtoSampler::END);

View File

@ -103,8 +103,12 @@ int main(int argc, char** argv)
sink.ChangeState(FairMQProtoSink::SETINPUT); sink.ChangeState(FairMQProtoSink::SETINPUT);
sink.ChangeState(FairMQProtoSink::RUN); sink.ChangeState(FairMQProtoSink::RUN);
char ch; // wait until the running thread has finished processing.
cin.get(ch); boost::unique_lock<boost::mutex> lock(sink.fRunningMutex);
while (!sink.fRunningFinished)
{
sink.fRunningCondition.wait(lock);
}
sink.ChangeState(FairMQProtoSink::STOP); sink.ChangeState(FairMQProtoSink::STOP);
sink.ChangeState(FairMQProtoSink::END); sink.ChangeState(FairMQProtoSink::END);

View File

@ -115,8 +115,12 @@ int main(int argc, char** argv)
proxy.ChangeState(FairMQProxy::SETINPUT); proxy.ChangeState(FairMQProxy::SETINPUT);
proxy.ChangeState(FairMQProxy::RUN); proxy.ChangeState(FairMQProxy::RUN);
char ch; // wait until the running thread has finished processing.
cin.get(ch); boost::unique_lock<boost::mutex> lock(proxy.fRunningMutex);
while (!proxy.fRunningFinished)
{
proxy.fRunningCondition.wait(lock);
}
proxy.ChangeState(FairMQProxy::STOP); proxy.ChangeState(FairMQProxy::STOP);
proxy.ChangeState(FairMQProxy::END); proxy.ChangeState(FairMQProxy::END);

View File

@ -103,8 +103,12 @@ int main(int argc, char** argv)
sink.ChangeState(FairMQSink::SETINPUT); sink.ChangeState(FairMQSink::SETINPUT);
sink.ChangeState(FairMQSink::RUN); sink.ChangeState(FairMQSink::RUN);
char ch; // wait until the running thread has finished processing.
cin.get(ch); boost::unique_lock<boost::mutex> lock(sink.fRunningMutex);
while (!sink.fRunningFinished)
{
sink.fRunningCondition.wait(lock);
}
sink.ChangeState(FairMQSink::STOP); sink.ChangeState(FairMQSink::STOP);
sink.ChangeState(FairMQSink::END); sink.ChangeState(FairMQSink::END);

View File

@ -124,8 +124,12 @@ int main(int argc, char** argv)
splitter.ChangeState(FairMQSplitter::SETINPUT); splitter.ChangeState(FairMQSplitter::SETINPUT);
splitter.ChangeState(FairMQSplitter::RUN); splitter.ChangeState(FairMQSplitter::RUN);
char ch; // wait until the running thread has finished processing.
cin.get(ch); boost::unique_lock<boost::mutex> lock(splitter.fRunningMutex);
while (!splitter.fRunningFinished)
{
splitter.fRunningCondition.wait(lock);
}
splitter.ChangeState(FairMQSplitter::STOP); splitter.ChangeState(FairMQSplitter::STOP);
splitter.ChangeState(FairMQSplitter::END); splitter.ChangeState(FairMQSplitter::END);