Use FairMQDevice::CatchSignals for Tutorial7

This commit is contained in:
Alexey Rybalchenko 2015-09-08 16:16:05 +02:00 committed by Mohammad Al-Turany
parent dc6fb4698c
commit 8b71e4d20b
11 changed files with 48 additions and 57 deletions

View File

@ -12,8 +12,6 @@
* @author D. Klein, A. Rybalchenko * @author D. Klein, A. Rybalchenko
*/ */
#include <cstdlib> // quick_exit()
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQConfigurable.h" #include "FairMQConfigurable.h"
@ -26,7 +24,7 @@ FairMQConfigurable::FairMQConfigurable()
void FairMQConfigurable::SetProperty(const int key, const string& value) void FairMQConfigurable::SetProperty(const int key, const string& value)
{ {
LOG(ERROR) << "Reached end of the property list. SetProperty(" << key << ", " << value << ") has no effect."; LOG(ERROR) << "Reached end of the property list. SetProperty(" << key << ", " << value << ") has no effect.";
quick_exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
string FairMQConfigurable::GetProperty(const int key, const string& default_ /*= ""*/) string FairMQConfigurable::GetProperty(const int key, const string& default_ /*= ""*/)
@ -38,7 +36,7 @@ string FairMQConfigurable::GetProperty(const int key, const string& default_ /*=
void FairMQConfigurable::SetProperty(const int key, const int value) void FairMQConfigurable::SetProperty(const int key, const int value)
{ {
LOG(ERROR) << "Reached end of the property list. SetProperty(" << key << ", " << value << ") has no effect."; LOG(ERROR) << "Reached end of the property list. SetProperty(" << key << ", " << value << ") has no effect.";
quick_exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
int FairMQConfigurable::GetProperty(const int key, const int default_ /*= 0*/) int FairMQConfigurable::GetProperty(const int key, const int default_ /*= 0*/)

View File

@ -15,7 +15,7 @@
#include <list> #include <list>
#include <algorithm> // std::sort() #include <algorithm> // std::sort()
#include <csignal> // catching system signals #include <csignal> // catching system signals
#include <cstdlib> // quick_exit() #include <cstdlib>
#include <termios.h> // for the InteractiveStateLoop #include <termios.h> // for the InteractiveStateLoop
@ -57,15 +57,9 @@ void FairMQDevice::CatchSignals()
{ {
if (!fCatchingSignals) if (!fCatchingSignals)
{ {
// setup signal catching
sigHandler = bind1st(mem_fun(&FairMQDevice::SignalHandler), this); sigHandler = bind1st(mem_fun(&FairMQDevice::SignalHandler), this);
struct sigaction action; std::signal(SIGINT, CallSignalHandler);
action.sa_handler = CallSignalHandler; std::signal(SIGTERM, CallSignalHandler);
action.sa_flags = 0;
sigemptyset(&action.sa_mask);
sigaction(SIGINT, &action, NULL);
sigaction(SIGTERM, &action, NULL);
fCatchingSignals = true; fCatchingSignals = true;
} }
} }
@ -74,17 +68,19 @@ void FairMQDevice::SignalHandler(int signal)
{ {
LOG(INFO) << "Caught signal " << signal; LOG(INFO) << "Caught signal " << signal;
// fState = EXITING; fState = EXITING;
// Unblock(); Unblock();
// fStateThread.interrupt(); fStateThread.interrupt();
// fStateThread.join(); fStateThread.join();
// fTerminateStateThread = boost::thread(boost::bind(&FairMQDevice::Terminate, this)); fTerminateStateThread = boost::thread(boost::bind(&FairMQDevice::Terminate, this));
// Shutdown(); Shutdown();
// fTerminateStateThread.join(); fTerminateStateThread.join();
MQLOG(INFO) << "Exiting."; MQLOG(INFO) << "Exiting.";
quick_exit(EXIT_FAILURE); stop();
std::abort();
// exit(EXIT_FAILURE);
} }
void FairMQDevice::InitWrapper() void FairMQDevice::InitWrapper()
@ -148,7 +144,7 @@ void FairMQDevice::InitWrapper()
{ {
LOG(ERROR) << "could not initialize all channels after " << maxAttempts << " attempts"; LOG(ERROR) << "could not initialize all channels after " << maxAttempts << " attempts";
// TODO: goto ERROR state; // TODO: goto ERROR state;
quick_exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
if (numAttempts != 0) if (numAttempts != 0)
@ -334,7 +330,7 @@ void FairMQDevice::Run()
void FairMQDevice::Pause() void FairMQDevice::Pause()
{ {
while (CheckCurrentState(PAUSED)) while (true)
{ {
try try
{ {
@ -481,7 +477,7 @@ void FairMQDevice::LogSocketRates()
t0 = get_timestamp(); t0 = get_timestamp();
while (CheckCurrentState(RUNNING)) while (true)
{ {
try try
{ {

View File

@ -62,12 +62,12 @@ struct FairMQFSM_ : public msm::front::state_machine_def<FairMQFSM_>
{ {
public: public:
FairMQFSM_() FairMQFSM_()
: fState() : fStateThread()
, fStateThread()
, fTerminateStateThread() , fTerminateStateThread()
, fStateFinished(false) , fStateFinished(false)
, fStateCondition() , fStateCondition()
, fStateMutex() , fStateMutex()
, fState()
{} {}
// Destructor // Destructor
@ -87,7 +87,7 @@ struct FairMQFSM_ : public msm::front::state_machine_def<FairMQFSM_>
} }
// The list of FSM states // The list of FSM states
struct NORMAL_FSM : public msm::front::state<> {}; struct OK_FSM : public msm::front::state<> {};
struct ERROR_FSM : public msm::front::terminate_state<> {}; struct ERROR_FSM : public msm::front::terminate_state<> {};
struct IDLE_FSM : public msm::front::state<> {}; struct IDLE_FSM : public msm::front::state<> {};
@ -102,7 +102,7 @@ struct FairMQFSM_ : public msm::front::state_machine_def<FairMQFSM_>
struct EXITING_FSM : public msm::front::state<> {}; struct EXITING_FSM : public msm::front::state<> {};
// Define initial states // Define initial states
typedef mpl::vector<IDLE_FSM, NORMAL_FSM> initial_state; typedef mpl::vector<IDLE_FSM, OK_FSM> initial_state;
// Actions // Actions
struct IdleFct struct IdleFct
@ -314,7 +314,6 @@ struct FairMQFSM_ : public msm::front::state_machine_def<FairMQFSM_>
LOG(STATE) << "ERROR!"; LOG(STATE) << "ERROR!";
fsm.fState = ERROR; fsm.fState = ERROR;
// quick_exit(EXIT_FAILURE);
} }
}; };
@ -353,7 +352,7 @@ struct FairMQFSM_ : public msm::front::state_machine_def<FairMQFSM_>
msmf::Row<RESETTING_DEVICE_FSM, internal_IDLE, IDLE_FSM, IdleFct, msmf::none>, msmf::Row<RESETTING_DEVICE_FSM, internal_IDLE, IDLE_FSM, IdleFct, msmf::none>,
msmf::Row<RUNNING_FSM, END, EXITING_FSM, ExitingRunFct, msmf::none>, msmf::Row<RUNNING_FSM, END, EXITING_FSM, ExitingRunFct, msmf::none>,
msmf::Row<IDLE_FSM, END, EXITING_FSM, ExitingFct, msmf::none>, msmf::Row<IDLE_FSM, END, EXITING_FSM, ExitingFct, msmf::none>,
msmf::Row<NORMAL_FSM, ERROR_FOUND, ERROR_FSM, ErrorFoundFct, msmf::none>> msmf::Row<OK_FSM, ERROR_FOUND, ERROR_FSM, ErrorFoundFct, msmf::none>>
{}; {};
// Replaces the default no-transition response. // Replaces the default no-transition response.
@ -366,7 +365,7 @@ struct FairMQFSM_ : public msm::front::state_machine_def<FairMQFSM_>
// backward compatibility to FairMQStateMachine // backward compatibility to FairMQStateMachine
enum State enum State
{ {
NORMAL, OK,
ERROR, ERROR,
IDLE, IDLE,
INITIALIZING_DEVICE, INITIALIZING_DEVICE,
@ -384,8 +383,8 @@ struct FairMQFSM_ : public msm::front::state_machine_def<FairMQFSM_>
{ {
switch(state) switch(state)
{ {
case NORMAL: case OK:
return "NORMAL"; return "OK";
case ERROR: case ERROR:
return "ERROR"; return "ERROR";
case IDLE: case IDLE:

View File

@ -83,6 +83,11 @@ void base_GenericSampler<T,U,K,L>::Run()
{ {
fCurrentIdx--; fCurrentIdx--;
} }
if (!CheckCurrentState(RUNNING))
{
break;
}
} }
} }
while (CheckCurrentState(RUNNING) && fContinuous); while (CheckCurrentState(RUNNING) && fContinuous);
@ -128,7 +133,7 @@ void base_GenericSampler<T,U,K,L>::SetContinuous(bool flag)
template <typename T, typename U, typename K, typename L> template <typename T, typename U, typename K, typename L>
void base_GenericSampler<T,U,K,L>::ResetEventCounter() void base_GenericSampler<T,U,K,L>::ResetEventCounter()
{ {
while (CheckCurrentState(RUNNING)) while (true)
{ {
try try
{ {

View File

@ -12,8 +12,8 @@
* @author A. Rybalchenko * @author A. Rybalchenko
*/ */
#ifndef FAIRMQEXAMPLESAMPLER_H_ #ifndef FAIRMQEXAMPLE4SAMPLER_H_
#define FAIRMQEXAMPLESAMPLER_H_ #define FAIRMQEXAMPLE4SAMPLER_H_
#include <string> #include <string>

View File

@ -13,7 +13,7 @@
*/ */
#ifndef FAIRMQEXAMPLE4SINK_H_ #ifndef FAIRMQEXAMPLE4SINK_H_
#define FAIRMQEXAMPLE1SINK_H_ #define FAIRMQEXAMPLE4SINK_H_
#include "FairMQDevice.h" #include "FairMQDevice.h"

View File

@ -12,8 +12,6 @@
* @author A. Rybalchenko * @author A. Rybalchenko
*/ */
#include <cstdlib> // quick_exit()
#include <nanomsg/nn.h> #include <nanomsg/nn.h>
#include <nanomsg/pipeline.h> #include <nanomsg/pipeline.h>
#include <nanomsg/pubsub.h> #include <nanomsg/pubsub.h>
@ -74,7 +72,7 @@ FairMQPollerNN::FairMQPollerNN(map<string, vector<FairMQChannel>>& channelsMap,
{ {
LOG(ERROR) << "At least one of the provided channel keys for poller initialization is invalid"; LOG(ERROR) << "At least one of the provided channel keys for poller initialization is invalid";
LOG(ERROR) << "Out of Range error: " << oor.what() << '\n'; LOG(ERROR) << "Out of Range error: " << oor.what() << '\n';
quick_exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} }
@ -109,7 +107,7 @@ FairMQPollerNN::FairMQPollerNN(FairMQSocket& cmdSocket, FairMQSocket& dataSocket
else else
{ {
LOG(ERROR) << "invalid poller configuration, exiting."; LOG(ERROR) << "invalid poller configuration, exiting.";
quick_exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} }
@ -163,7 +161,7 @@ bool FairMQPollerNN::CheckInput(const string channelKey, const int index)
{ {
LOG(ERROR) << "Invalid channel key: \"" << channelKey << "\""; LOG(ERROR) << "Invalid channel key: \"" << channelKey << "\"";
LOG(ERROR) << "Out of Range error: " << oor.what() << '\n'; LOG(ERROR) << "Out of Range error: " << oor.what() << '\n';
quick_exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} }
@ -182,7 +180,7 @@ bool FairMQPollerNN::CheckOutput(const string channelKey, const int index)
{ {
LOG(ERROR) << "Invalid channel key: \"" << channelKey << "\""; LOG(ERROR) << "Invalid channel key: \"" << channelKey << "\"";
LOG(ERROR) << "Out of Range error: " << oor.what() << '\n'; LOG(ERROR) << "Out of Range error: " << oor.what() << '\n';
quick_exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} }

View File

@ -12,7 +12,6 @@
* @author A. Rybalchenko * @author A. Rybalchenko
*/ */
#include <cstdlib> // quick_exit()
#include <sstream> #include <sstream>
#include "FairMQSocketNN.h" #include "FairMQSocketNN.h"
@ -46,7 +45,7 @@ FairMQSocketNN::FairMQSocketNN(const string& type, const std::string& name, int
if (fSocket == -1) if (fSocket == -1)
{ {
LOG(ERROR) << "failed creating socket " << fId << ", reason: " << nn_strerror(errno); LOG(ERROR) << "failed creating socket " << fId << ", reason: " << nn_strerror(errno);
quick_exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} }
else else
@ -55,7 +54,7 @@ FairMQSocketNN::FairMQSocketNN(const string& type, const std::string& name, int
if (fSocket == -1) if (fSocket == -1)
{ {
LOG(ERROR) << "failed creating socket " << fId << ", reason: " << nn_strerror(errno); LOG(ERROR) << "failed creating socket " << fId << ", reason: " << nn_strerror(errno);
quick_exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
if (type == "sub") if (type == "sub")
{ {

View File

@ -12,7 +12,6 @@
* @author D. Klein, A. Rybalchenko * @author D. Klein, A. Rybalchenko
*/ */
#include <cstdlib> // quick_exit()
#include <sstream> #include <sstream>
#include "FairMQLogger.h" #include "FairMQLogger.h"
@ -25,7 +24,7 @@ FairMQContextZMQ::FairMQContextZMQ(int numIoThreads)
if (fContext == NULL) if (fContext == NULL)
{ {
LOG(ERROR) << "failed creating context, reason: " << zmq_strerror(errno); LOG(ERROR) << "failed creating context, reason: " << zmq_strerror(errno);
quick_exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0) if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0)

View File

@ -12,8 +12,6 @@
* @author A. Rybalchenko * @author A. Rybalchenko
*/ */
#include <cstdlib> // quick_exit()
#include <zmq.h> #include <zmq.h>
#include "FairMQPollerZMQ.h" #include "FairMQPollerZMQ.h"
@ -74,7 +72,7 @@ FairMQPollerZMQ::FairMQPollerZMQ(map<string, vector<FairMQChannel>>& channelsMap
{ {
LOG(ERROR) << "At least one of the provided channel keys for poller initialization is invalid"; LOG(ERROR) << "At least one of the provided channel keys for poller initialization is invalid";
LOG(ERROR) << "Out of Range error: " << oor.what() << '\n'; LOG(ERROR) << "Out of Range error: " << oor.what() << '\n';
quick_exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} }
@ -113,7 +111,7 @@ FairMQPollerZMQ::FairMQPollerZMQ(FairMQSocket& cmdSocket, FairMQSocket& dataSock
else else
{ {
LOG(ERROR) << "invalid poller configuration, exiting."; LOG(ERROR) << "invalid poller configuration, exiting.";
quick_exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} }
@ -167,7 +165,7 @@ bool FairMQPollerZMQ::CheckInput(const string channelKey, const int index)
{ {
LOG(ERROR) << "Invalid channel key: \"" << channelKey << "\""; LOG(ERROR) << "Invalid channel key: \"" << channelKey << "\"";
LOG(ERROR) << "Out of Range error: " << oor.what() << '\n'; LOG(ERROR) << "Out of Range error: " << oor.what() << '\n';
quick_exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} }
@ -186,7 +184,7 @@ bool FairMQPollerZMQ::CheckOutput(const string channelKey, const int index)
{ {
LOG(ERROR) << "Invalid channel key: \"" << channelKey << "\""; LOG(ERROR) << "Invalid channel key: \"" << channelKey << "\"";
LOG(ERROR) << "Out of Range error: " << oor.what() << '\n'; LOG(ERROR) << "Out of Range error: " << oor.what() << '\n';
quick_exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} }

View File

@ -12,7 +12,6 @@
* @author D. Klein, A. Rybalchenko * @author D. Klein, A. Rybalchenko
*/ */
#include <cstdlib> // quick_exit()
#include <sstream> #include <sstream>
#include "FairMQSocketZMQ.h" #include "FairMQSocketZMQ.h"
@ -44,7 +43,7 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, int num
if (fSocket == NULL) if (fSocket == NULL)
{ {
LOG(ERROR) << "failed creating socket " << fId << ", reason: " << zmq_strerror(errno); LOG(ERROR) << "failed creating socket " << fId << ", reason: " << zmq_strerror(errno);
quick_exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
if (zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.length()) != 0) if (zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.length()) != 0)
@ -100,7 +99,7 @@ void FairMQSocketZMQ::Connect(const string& address)
{ {
LOG(ERROR) << "failed connecting socket " << fId << ", reason: " << zmq_strerror(errno); LOG(ERROR) << "failed connecting socket " << fId << ", reason: " << zmq_strerror(errno);
// error here means incorrect configuration. exit if it happens. // error here means incorrect configuration. exit if it happens.
quick_exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} }