mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 17:41:45 +00:00
Fix throw after quit signal case
This commit is contained in:
committed by
Dennis Klein
parent
e39316c866
commit
cb199e7283
@@ -223,7 +223,7 @@ void FairMQDevice::InitWrapper()
|
|||||||
AttachChannels(uninitializedConnectingChannels);
|
AttachChannels(uninitializedConnectingChannels);
|
||||||
}
|
}
|
||||||
|
|
||||||
CallAndHandleError(std::bind(&FairMQDevice::Init, this));
|
Init();
|
||||||
|
|
||||||
ChangeState(internal_DEVICE_READY);
|
ChangeState(internal_DEVICE_READY);
|
||||||
}
|
}
|
||||||
@@ -429,7 +429,7 @@ void FairMQDevice::InitTaskWrapper()
|
|||||||
{
|
{
|
||||||
CallStateChangeCallbacks(INITIALIZING_TASK);
|
CallStateChangeCallbacks(INITIALIZING_TASK);
|
||||||
|
|
||||||
CallAndHandleError(std::bind(&FairMQDevice::InitTask, this));
|
InitTask();
|
||||||
|
|
||||||
ChangeState(internal_READY);
|
ChangeState(internal_READY);
|
||||||
}
|
}
|
||||||
@@ -504,46 +504,43 @@ void FairMQDevice::RunWrapper()
|
|||||||
t.second->Resume();
|
t.second->Resume();
|
||||||
}
|
}
|
||||||
|
|
||||||
CallAndHandleError([this]
|
try
|
||||||
{
|
{
|
||||||
try
|
PreRun();
|
||||||
|
|
||||||
|
// process either data callbacks or ConditionalRun/Run
|
||||||
|
if (fDataCallbacks)
|
||||||
{
|
{
|
||||||
PreRun();
|
// if only one input channel, do lightweight handling without additional polling.
|
||||||
|
if (fInputChannelKeys.size() == 1 && fChannels.at(fInputChannelKeys.at(0)).size() == 1)
|
||||||
// process either data callbacks or ConditionalRun/Run
|
|
||||||
if (fDataCallbacks)
|
|
||||||
{
|
{
|
||||||
// if only one input channel, do lightweight handling without additional polling.
|
HandleSingleChannelInput();
|
||||||
if (fInputChannelKeys.size() == 1 && fChannels.at(fInputChannelKeys.at(0)).size() == 1)
|
|
||||||
{
|
|
||||||
HandleSingleChannelInput();
|
|
||||||
}
|
|
||||||
else // otherwise do full handling with polling
|
|
||||||
{
|
|
||||||
HandleMultipleChannelInput();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else
|
else // otherwise do full handling with polling
|
||||||
{
|
{
|
||||||
fair::mq::tools::RateLimiter rateLimiter(fRate);
|
HandleMultipleChannelInput();
|
||||||
|
|
||||||
while (CheckCurrentState(RUNNING) && ConditionalRun())
|
|
||||||
{
|
|
||||||
if (fRate > 0.001)
|
|
||||||
{
|
|
||||||
rateLimiter.maybe_sleep();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Run();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (const out_of_range& oor)
|
else
|
||||||
{
|
{
|
||||||
LOG(error) << "out of range: " << oor.what();
|
fair::mq::tools::RateLimiter rateLimiter(fRate);
|
||||||
LOG(error) << "incorrect/incomplete channel configuration?";
|
|
||||||
|
while (CheckCurrentState(RUNNING) && ConditionalRun())
|
||||||
|
{
|
||||||
|
if (fRate > 0.001)
|
||||||
|
{
|
||||||
|
rateLimiter.maybe_sleep();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Run();
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
|
catch (const out_of_range& oor)
|
||||||
|
{
|
||||||
|
LOG(error) << "out of range: " << oor.what();
|
||||||
|
LOG(error) << "incorrect/incomplete channel configuration?";
|
||||||
|
}
|
||||||
|
|
||||||
// if Run() exited and the state is still RUNNING, transition to READY.
|
// if Run() exited and the state is still RUNNING, transition to READY.
|
||||||
if (CheckCurrentState(RUNNING))
|
if (CheckCurrentState(RUNNING))
|
||||||
@@ -551,7 +548,7 @@ void FairMQDevice::RunWrapper()
|
|||||||
ChangeState(internal_READY);
|
ChangeState(internal_READY);
|
||||||
}
|
}
|
||||||
|
|
||||||
CallAndHandleError(std::bind(&FairMQDevice::PostRun, this));
|
PostRun();
|
||||||
|
|
||||||
rateLogger.get();
|
rateLogger.get();
|
||||||
}
|
}
|
||||||
@@ -774,7 +771,7 @@ void FairMQDevice::PauseWrapper()
|
|||||||
{
|
{
|
||||||
CallStateChangeCallbacks(PAUSED);
|
CallStateChangeCallbacks(PAUSED);
|
||||||
|
|
||||||
CallAndHandleError(std::bind(&FairMQDevice::Pause, this));
|
Pause();
|
||||||
}
|
}
|
||||||
|
|
||||||
void FairMQDevice::Pause()
|
void FairMQDevice::Pause()
|
||||||
@@ -940,7 +937,7 @@ void FairMQDevice::ResetTaskWrapper()
|
|||||||
{
|
{
|
||||||
CallStateChangeCallbacks(RESETTING_TASK);
|
CallStateChangeCallbacks(RESETTING_TASK);
|
||||||
|
|
||||||
CallAndHandleError(std::bind(&FairMQDevice::ResetTask, this));
|
ResetTask();
|
||||||
|
|
||||||
ChangeState(internal_DEVICE_READY);
|
ChangeState(internal_DEVICE_READY);
|
||||||
}
|
}
|
||||||
@@ -953,7 +950,7 @@ void FairMQDevice::ResetWrapper()
|
|||||||
{
|
{
|
||||||
CallStateChangeCallbacks(RESETTING_DEVICE);
|
CallStateChangeCallbacks(RESETTING_DEVICE);
|
||||||
|
|
||||||
CallAndHandleError(std::bind(&FairMQDevice::Reset, this));
|
Reset();
|
||||||
|
|
||||||
ChangeState(internal_IDLE);
|
ChangeState(internal_IDLE);
|
||||||
}
|
}
|
||||||
@@ -977,17 +974,6 @@ const FairMQChannel& FairMQDevice::GetChannel(const string& channelName, const i
|
|||||||
return fChannels.at(channelName).at(index);
|
return fChannels.at(channelName).at(index);
|
||||||
}
|
}
|
||||||
|
|
||||||
void FairMQDevice::CallAndHandleError(std::function<void()> callable)
|
|
||||||
try
|
|
||||||
{
|
|
||||||
callable();
|
|
||||||
}
|
|
||||||
catch(...)
|
|
||||||
{
|
|
||||||
ChangeState(ERROR_FOUND);
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
|
|
||||||
void FairMQDevice::Exit()
|
void FairMQDevice::Exit()
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
@@ -650,10 +650,21 @@ bool FairMQStateMachine::CheckCurrentState(const string& state) const
|
|||||||
}
|
}
|
||||||
|
|
||||||
void FairMQStateMachine::ProcessWork()
|
void FairMQStateMachine::ProcessWork()
|
||||||
|
try
|
||||||
{
|
{
|
||||||
static_pointer_cast<FairMQFSM>(fFsm)->ProcessWork();
|
static_pointer_cast<FairMQFSM>(fFsm)->ProcessWork();
|
||||||
|
} catch(...) {
|
||||||
|
{
|
||||||
|
lock_guard<mutex> lock(static_pointer_cast<FairMQFSM>(fFsm)->fWorkMutex);
|
||||||
|
static_pointer_cast<FairMQFSM>(fFsm)->fWorkActive = false;
|
||||||
|
static_pointer_cast<FairMQFSM>(fFsm)->fWorkAvailable = false;
|
||||||
|
static_pointer_cast<FairMQFSM>(fFsm)->fWorkDoneCondition.notify_one();
|
||||||
|
}
|
||||||
|
ChangeState(ERROR_FOUND);
|
||||||
|
throw;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int FairMQStateMachine::GetEventNumber(const string& event)
|
int FairMQStateMachine::GetEventNumber(const string& event)
|
||||||
{
|
{
|
||||||
return eventNumbers.at(event);
|
return eventNumbers.at(event);
|
||||||
|
@@ -88,7 +88,6 @@ void FairMQBenchmarkSampler::Run()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (fMsgRate > 0)
|
if (fMsgRate > 0)
|
||||||
{
|
{
|
||||||
rateLimiter.maybe_sleep();
|
rateLimiter.maybe_sleep();
|
||||||
|
@@ -339,7 +339,7 @@ auto Control::RunShutdownSequence() -> void
|
|||||||
{
|
{
|
||||||
auto nextState = GetCurrentDeviceState();
|
auto nextState = GetCurrentDeviceState();
|
||||||
EmptyEventQueue();
|
EmptyEventQueue();
|
||||||
while (nextState != DeviceState::Exiting)
|
while (nextState != DeviceState::Exiting && nextState != DeviceState::Error)
|
||||||
{
|
{
|
||||||
switch (nextState)
|
switch (nextState)
|
||||||
{
|
{
|
||||||
@@ -359,7 +359,7 @@ auto Control::RunShutdownSequence() -> void
|
|||||||
ChangeDeviceState(DeviceStateTransition::Resume);
|
ChangeDeviceState(DeviceStateTransition::Resume);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
// ignore other states
|
LOG(debug) << "Controller ignoring event: " << nextState;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -45,7 +45,10 @@ void RunExceptionIn(const std::string& state)
|
|||||||
exit(result.exit_code);
|
exit(result.exit_code);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(Exceptions, InInit) { EXPECT_EXIT(RunExceptionIn("Init"), ::testing::ExitedWithCode(1), ""); }
|
TEST(Exceptions, InInit)
|
||||||
|
{
|
||||||
|
EXPECT_EXIT(RunExceptionIn("Init"), ::testing::ExitedWithCode(1), "");
|
||||||
|
}
|
||||||
TEST(Exceptions, InInitTask)
|
TEST(Exceptions, InInitTask)
|
||||||
{
|
{
|
||||||
EXPECT_EXIT(RunExceptionIn("InitTask"), ::testing::ExitedWithCode(1), "");
|
EXPECT_EXIT(RunExceptionIn("InitTask"), ::testing::ExitedWithCode(1), "");
|
||||||
@@ -54,7 +57,10 @@ TEST(Exceptions, InPreRun)
|
|||||||
{
|
{
|
||||||
EXPECT_EXIT(RunExceptionIn("PreRun"), ::testing::ExitedWithCode(1), "");
|
EXPECT_EXIT(RunExceptionIn("PreRun"), ::testing::ExitedWithCode(1), "");
|
||||||
}
|
}
|
||||||
TEST(Exceptions, InRun) { EXPECT_EXIT(RunExceptionIn("Run"), ::testing::ExitedWithCode(1), ""); }
|
TEST(Exceptions, InRun)
|
||||||
|
{
|
||||||
|
EXPECT_EXIT(RunExceptionIn("Run"), ::testing::ExitedWithCode(1), "");
|
||||||
|
}
|
||||||
TEST(Exceptions, InPostRun)
|
TEST(Exceptions, InPostRun)
|
||||||
{
|
{
|
||||||
EXPECT_EXIT(RunExceptionIn("PostRun"), ::testing::ExitedWithCode(1), "");
|
EXPECT_EXIT(RunExceptionIn("PostRun"), ::testing::ExitedWithCode(1), "");
|
||||||
|
Reference in New Issue
Block a user