diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index ff315af7..8b8da60d 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -29,23 +29,24 @@ Set(INCLUDE_DIRECTORIES ${CMAKE_CURRENT_BINARY_DIR} ) -if(DDS_PATH) +if(DDS_FOUND) + add_definitions(-DENABLE_DDS) Set(INCLUDE_DIRECTORIES ${INCLUDE_DIRECTORIES} ${CMAKE_SOURCE_DIR}/fairmq/examples/3-dds ) -endif(DDS_PATH) +endif(DDS_FOUND) Set(SYSTEM_INCLUDE_DIRECTORIES ${Boost_INCLUDE_DIR} ) -If(DDS_PATH) +If(DDS_FOUND) Set(SYSTEM_INCLUDE_DIRECTORIES ${SYSTEM_INCLUDE_DIRECTORIES} - ${DDS_PATH}/include + ${DDS_INCLUDE_DIR} ) -EndIf(DDS_PATH) +EndIf(DDS_FOUND) If(PROTOBUF_FOUND) Set(INCLUDE_DIRECTORIES @@ -86,12 +87,12 @@ Set(LINK_DIRECTORIES ${Boost_LIBRARY_DIRS} ) -if(DDS_PATH) +if(DDS_FOUND) set(LINK_DIRECTORIES ${LINK_DIRECTORIES} - ${DDS_PATH}/lib + ${DDS_LIBRARY_DIR} ) -endif(DDS_PATH) +endif(DDS_FOUND) Link_Directories(${LINK_DIRECTORIES}) @@ -131,7 +132,7 @@ set(SRCS "examples/5-req-rep/FairMQExample5Server.cxx" ) -if(DDS_PATH) +if(DDS_FOUND) set(SRCS ${SRCS} "examples/3-dds/FairMQExample3Sampler.cxx" @@ -142,7 +143,7 @@ if(DDS_PATH) ${DEPENDENCIES} dds-key-value-lib ) -endif(DDS_PATH) +endif(DDS_FOUND) if(PROTOBUF_FOUND) # following source files are only for protobuf tests and are not essential part of FairMQ @@ -242,14 +243,14 @@ set(Exe_Names ex5-server ) -if(DDS_PATH) +if(DDS_FOUND) set(Exe_Names ${Exe_Names} ex3-sampler-dds ex3-processor-dds ex3-sink-dds ) -endif(DDS_PATH) +endif(DDS_FOUND) # following executables are only for protobuf tests and are not essential part of FairMQ # if(PROTOBUF_FOUND) @@ -280,14 +281,14 @@ set(Exe_Source examples/5-req-rep/runExample5Server.cxx ) -if(DDS_PATH) +if(DDS_FOUND) set(Exe_Source ${Exe_Source} examples/3-dds/runExample3Sampler.cxx examples/3-dds/runExample3Processor.cxx examples/3-dds/runExample3Sink.cxx ) -endif(DDS_PATH) +endif(DDS_FOUND) # following source files are only for protobuf tests and are not essential part of FairMQ # if(PROTOBUF_FOUND) diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index bfd4fac1..ed3ac343 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -21,7 +21,7 @@ using namespace std; -boost::mutex FairMQChannel::channelMutex; +boost::mutex FairMQChannel::fChannelMutex; FairMQChannel::FairMQChannel() : fType("unspecified") @@ -63,7 +63,7 @@ std::string FairMQChannel::GetType() const { try { - boost::unique_lock scoped_lock(channelMutex); + boost::unique_lock scoped_lock(fChannelMutex); return fType; } catch (boost::exception& e) @@ -76,7 +76,7 @@ std::string FairMQChannel::GetMethod() const { try { - boost::unique_lock scoped_lock(channelMutex); + boost::unique_lock scoped_lock(fChannelMutex); return fMethod; } catch (boost::exception& e) @@ -89,7 +89,7 @@ std::string FairMQChannel::GetAddress() const { try { - boost::unique_lock scoped_lock(channelMutex); + boost::unique_lock scoped_lock(fChannelMutex); return fAddress; } catch (boost::exception& e) @@ -102,7 +102,7 @@ int FairMQChannel::GetSndBufSize() const { try { - boost::unique_lock scoped_lock(channelMutex); + boost::unique_lock scoped_lock(fChannelMutex); return fSndBufSize; } catch (boost::exception& e) @@ -115,7 +115,7 @@ int FairMQChannel::GetRcvBufSize() const { try { - boost::unique_lock scoped_lock(channelMutex); + boost::unique_lock scoped_lock(fChannelMutex); return fRcvBufSize; } catch (boost::exception& e) @@ -128,7 +128,7 @@ int FairMQChannel::GetRateLogging() const { try { - boost::unique_lock scoped_lock(channelMutex); + boost::unique_lock scoped_lock(fChannelMutex); return fRateLogging; } catch (boost::exception& e) @@ -141,7 +141,7 @@ void FairMQChannel::UpdateType(const std::string& type) { try { - boost::unique_lock scoped_lock(channelMutex); + boost::unique_lock scoped_lock(fChannelMutex); fIsValid = false; fType = type; } @@ -155,7 +155,7 @@ void FairMQChannel::UpdateMethod(const std::string& method) { try { - boost::unique_lock scoped_lock(channelMutex); + boost::unique_lock scoped_lock(fChannelMutex); fIsValid = false; fMethod = method; } @@ -169,7 +169,7 @@ void FairMQChannel::UpdateAddress(const std::string& address) { try { - boost::unique_lock scoped_lock(channelMutex); + boost::unique_lock scoped_lock(fChannelMutex); fIsValid = false; fAddress = address; } @@ -183,7 +183,7 @@ void FairMQChannel::UpdateSndBufSize(const int sndBufSize) { try { - boost::unique_lock scoped_lock(channelMutex); + boost::unique_lock scoped_lock(fChannelMutex); fIsValid = false; fSndBufSize = sndBufSize; } @@ -197,7 +197,7 @@ void FairMQChannel::UpdateRcvBufSize(const int rcvBufSize) { try { - boost::unique_lock scoped_lock(channelMutex); + boost::unique_lock scoped_lock(fChannelMutex); fIsValid = false; fRcvBufSize = rcvBufSize; } @@ -211,7 +211,7 @@ void FairMQChannel::UpdateRateLogging(const int rateLogging) { try { - boost::unique_lock scoped_lock(channelMutex); + boost::unique_lock scoped_lock(fChannelMutex); fIsValid = false; fRateLogging = rateLogging; } @@ -225,7 +225,7 @@ bool FairMQChannel::IsValid() const { try { - boost::unique_lock scoped_lock(channelMutex); + boost::unique_lock scoped_lock(fChannelMutex); return fIsValid; } catch (boost::exception& e) @@ -238,7 +238,7 @@ bool FairMQChannel::ValidateChannel() { try { - boost::unique_lock scoped_lock(channelMutex); + boost::unique_lock scoped_lock(fChannelMutex); stringstream ss; ss << "Validating channel \"" << fChannelName << "\"... "; @@ -358,7 +358,7 @@ bool FairMQChannel::InitCommandInterface(FairMQTransportFactory* factory) fNoBlockFlag = fCmdSocket->NOBLOCK; fSndMoreFlag = fCmdSocket->SNDMORE; - fPoller = fTransportFactory->CreatePoller(*fSocket, *fCmdSocket); + fPoller = fTransportFactory->CreatePoller(*fCmdSocket, *fSocket); return true; } @@ -380,7 +380,7 @@ int FairMQChannel::Send(const unique_ptr& msg) const if (fPoller->CheckInput(0)) { - HandleCommand(); + HandleUnblock(); return -1; } @@ -408,7 +408,7 @@ int FairMQChannel::Receive(const unique_ptr& msg) const if (fPoller->CheckInput(0)) { - HandleCommand(); + HandleUnblock(); return -1; } @@ -433,7 +433,7 @@ int FairMQChannel::Send(FairMQMessage* msg, const string& flag) const if (fPoller->CheckInput(0)) { - HandleCommand(); + HandleUnblock(); return -1; } @@ -458,7 +458,7 @@ int FairMQChannel::Send(FairMQMessage* msg, const int flags) const if (fPoller->CheckInput(0)) { - HandleCommand(); + HandleUnblock(); return -1; } @@ -483,7 +483,7 @@ int FairMQChannel::Receive(FairMQMessage* msg, const string& flag) const if (fPoller->CheckInput(0)) { - HandleCommand(); + HandleUnblock(); return -1; } @@ -508,7 +508,7 @@ int FairMQChannel::Receive(FairMQMessage* msg, const int flags) const if (fPoller->CheckInput(0)) { - HandleCommand(); + HandleUnblock(); return -1; } @@ -548,7 +548,7 @@ bool FairMQChannel::ExpectsAnotherPart() const } } -inline bool FairMQChannel::HandleCommand() const +inline bool FairMQChannel::HandleUnblock() const { FairMQMessage* cmd = fTransportFactory->CreateMessage(); fCmdSocket->Receive(cmd, 0); diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 0669addd..cc1ac47d 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -96,13 +96,13 @@ class FairMQChannel int fNoBlockFlag; int fSndMoreFlag; - bool HandleCommand() const; + bool HandleUnblock() const; // use static mutex to make the class easily copyable // implication: same mutex is used for all instances of the class // this does not hurt much, because mutex is used only during initialization with very low contention // possible TODO: improve this - static boost::mutex channelMutex; + static boost::mutex fChannelMutex; }; #endif /* FAIRMQCHANNEL_H_ */ diff --git a/fairmq/FairMQConfigurable.cxx b/fairmq/FairMQConfigurable.cxx index 9178142e..49b0730b 100644 --- a/fairmq/FairMQConfigurable.cxx +++ b/fairmq/FairMQConfigurable.cxx @@ -12,6 +12,8 @@ * @author D. Klein, A. Rybalchenko */ +#include // quick_exit() + #include "FairMQLogger.h" #include "FairMQConfigurable.h" @@ -24,7 +26,7 @@ FairMQConfigurable::FairMQConfigurable() void FairMQConfigurable::SetProperty(const int key, const string& value) { LOG(ERROR) << "Reached end of the property list. SetProperty(" << key << ", " << value << ") has no effect."; - exit(EXIT_FAILURE); + quick_exit(EXIT_FAILURE); } string FairMQConfigurable::GetProperty(const int key, const string& default_ /*= ""*/) @@ -36,7 +38,7 @@ string FairMQConfigurable::GetProperty(const int key, const string& default_ /*= void FairMQConfigurable::SetProperty(const int key, const int value) { LOG(ERROR) << "Reached end of the property list. SetProperty(" << key << ", " << value << ") has no effect."; - exit(EXIT_FAILURE); + quick_exit(EXIT_FAILURE); } int FairMQConfigurable::GetProperty(const int key, const int default_ /*= 0*/) diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 08ed5e1d..058d02a6 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -13,8 +13,9 @@ */ #include -#include // for std::sort() -#include // for catching system signals +#include // std::sort() +#include // catching system signals +#include // quick_exit() #include // for the InteractiveStateLoop @@ -57,7 +58,7 @@ void FairMQDevice::CatchSignals() if (!fCatchingSignals) { // setup signal catching - sigHandler = std::bind1st(std::mem_fun(&FairMQDevice::SignalHandler), this); + sigHandler = bind1st(mem_fun(&FairMQDevice::SignalHandler), this); struct sigaction action; action.sa_handler = CallSignalHandler; action.sa_flags = 0; @@ -73,15 +74,17 @@ void FairMQDevice::SignalHandler(int signal) { LOG(INFO) << "Caught signal " << signal; - ChangeState(STOP); + // fState = EXITING; + // Unblock(); + // fStateThread.interrupt(); + // fStateThread.join(); - ChangeState(RESET_TASK); - WaitForEndOfState(RESET_TASK); + // fTerminateStateThread = boost::thread(boost::bind(&FairMQDevice::Terminate, this)); + // Shutdown(); + // fTerminateStateThread.join(); - ChangeState(RESET_DEVICE); - WaitForEndOfState(RESET_DEVICE); - - ChangeState(END); + MQLOG(INFO) << "Exiting."; + quick_exit(EXIT_FAILURE); } void FairMQDevice::InitWrapper() @@ -145,7 +148,7 @@ void FairMQDevice::InitWrapper() { LOG(ERROR) << "could not initialize all channels after " << maxAttempts << " attempts"; // TODO: goto ERROR state; - exit(EXIT_FAILURE); + quick_exit(EXIT_FAILURE); } if (numAttempts != 0) @@ -418,11 +421,6 @@ int FairMQDevice::GetProperty(const int key, const int default_ /*= 0*/) } } -void FairMQDevice::SetTransport(unique_ptr& factory) -{ - fTransportFactory = factory.get(); -} - void FairMQDevice::SetTransport(FairMQTransportFactory* factory) { fTransportFactory = factory; @@ -579,6 +577,10 @@ void FairMQDevice::InteractiveStateLoop() LOG(INFO) << "[h] help"; PrintInteractiveStateLoopHelp(); break; + case 'x': + LOG(INFO) << "[x] ERROR"; + ChangeState("ERROR_FOUND"); + break; case 'q': LOG(INFO) << "[q] end"; ChangeState("END"); @@ -605,11 +607,11 @@ inline void FairMQDevice::PrintInteractiveStateLoopHelp() LOG(INFO) << "[h] help, [p] pause, [r] run, [s] stop, [t] reset task, [d] reset device, [q] end, [j] init task, [i] init device"; } -void FairMQDevice::SendCommand(const string& command) +void FairMQDevice::Unblock() { - FairMQMessage* cmd = fTransportFactory->CreateMessage(command.size()); - memcpy(cmd->GetData(), command.c_str(), command.size()); + FairMQMessage* cmd = fTransportFactory->CreateMessage(); fCmdSocket->Send(cmd, 0); + delete cmd; } void FairMQDevice::ResetTaskWrapper() diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index bdac4c88..9384ef63 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -61,7 +61,6 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable virtual int GetProperty(const int key, const int default_ = 0); virtual void SetTransport(FairMQTransportFactory* factory); - virtual void SetTransport(std::unique_ptr& factory); static bool SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs); @@ -105,7 +104,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable void Shutdown(); void Terminate(); - void SendCommand(const std::string& command); + void Unblock(); bool InitChannel(FairMQChannel&); diff --git a/fairmq/FairMQStateMachine.cxx b/fairmq/FairMQStateMachine.cxx index 972e5a07..4787e7e5 100644 --- a/fairmq/FairMQStateMachine.cxx +++ b/fairmq/FairMQStateMachine.cxx @@ -12,7 +12,6 @@ * @author D. Klein, A. Rybalchenko */ - #include #include // for WaitForEndOfStateForMs() @@ -44,8 +43,9 @@ int FairMQStateMachine::GetEventNumber(std::string event) if (event == "RESET_DEVICE") return RESET_DEVICE; if (event == "RESET_TASK") return RESET_TASK; if (event == "END") return END; + if (event == "ERROR_FOUND") return ERROR_FOUND; LOG(ERROR) << "Requested number for non-existent event... " << event << std::endl - << "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_DEVICE, RESET_TASK, END"; + << "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_DEVICE, RESET_TASK, END, ERROR_FOUND"; return -1; } @@ -88,9 +88,12 @@ bool FairMQStateMachine::ChangeState(int event) case END: process_event(FairMQFSM::END()); return true; + case ERROR_FOUND: + process_event(FairMQFSM::ERROR_FOUND()); + return true; default: - LOG(ERROR) << "Requested unsupported state: " << event << std::endl - << "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_TASK, RESET_DEVICE, END"; + LOG(ERROR) << "Requested state transition with an unsupported event: " << event << std::endl + << "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_TASK, RESET_DEVICE, END, ERROR_FOUND"; return false; } } diff --git a/fairmq/FairMQStateMachine.h b/fairmq/FairMQStateMachine.h index 56c83525..b6e1c69e 100644 --- a/fairmq/FairMQStateMachine.h +++ b/fairmq/FairMQStateMachine.h @@ -18,13 +18,19 @@ #define FAIRMQ_INTERFACE_VERSION 2 #include +#include #include #include #include +#include #include #include +// Increase maximum number of boost::msm states (default is 10) +// This #define has to be before any msm header includes +#define FUSION_MAX_VECTOR_SIZE 20 + #include #include #include @@ -37,7 +43,6 @@ namespace msmf = boost::msm::front; namespace FairMQFSM { - // defining events for the boost MSM state machine struct INIT_DEVICE { std::string name() const { return "INIT_DEVICE"; } }; struct internal_DEVICE_READY { std::string name() const { return "internal_DEVICE_READY"; } }; @@ -50,6 +55,7 @@ struct RESET_TASK { std::string name() const { return "RESET_TASK"; } struct RESET_DEVICE { std::string name() const { return "RESET_DEVICE"; } }; struct internal_IDLE { std::string name() const { return "internal_IDLE"; } }; struct END { std::string name() const { return "END"; } }; +struct ERROR_FOUND { std::string name() const { return "ERROR_FOUND"; } }; // defining the boost MSM state machine struct FairMQFSM_ : public msm::front::state_machine_def @@ -81,6 +87,9 @@ struct FairMQFSM_ : public msm::front::state_machine_def } // The list of FSM states + struct NORMAL_FSM : public msm::front::state<> {}; + struct ERROR_FSM : public msm::front::terminate_state<> {}; + struct IDLE_FSM : public msm::front::state<> {}; struct INITIALIZING_DEVICE_FSM : public msm::front::state<> {}; struct DEVICE_READY_FSM : public msm::front::state<> {}; @@ -92,8 +101,8 @@ struct FairMQFSM_ : public msm::front::state_machine_def struct RESETTING_DEVICE_FSM : public msm::front::state<> {}; struct EXITING_FSM : public msm::front::state<> {}; - // Define initial state - typedef IDLE_FSM initial_state; + // Define initial states + typedef mpl::vector initial_state; // Actions struct IdleFct @@ -101,8 +110,9 @@ struct FairMQFSM_ : public msm::front::state_machine_def template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { - fsm.fState = IDLE; LOG(STATE) << "Entering IDLE state"; + + fsm.fState = IDLE; } }; @@ -111,9 +121,12 @@ struct FairMQFSM_ : public msm::front::state_machine_def template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { - fsm.fStateFinished = false; LOG(STATE) << "Entering INITIALIZING DEVICE state"; + + fsm.fStateFinished = false; + fsm.fState = INITIALIZING_DEVICE; + fsm.fStateThread = boost::thread(boost::bind(&FairMQFSM_::InitWrapper, &fsm)); } }; @@ -124,6 +137,7 @@ struct FairMQFSM_ : public msm::front::state_machine_def void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering DEVICE READY state"; + fsm.fState = DEVICE_READY; } }; @@ -133,9 +147,12 @@ struct FairMQFSM_ : public msm::front::state_machine_def template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { - fsm.fStateFinished = false; LOG(STATE) << "Entering INITIALIZING TASK state"; + + fsm.fStateFinished = false; + fsm.fState = INITIALIZING_TASK; + fsm.InitTaskWrapper(); // fsm.fInitializingTaskThread = boost::thread(boost::bind(&FairMQFSM_::InitTaskWrapper, &fsm)); } @@ -147,6 +164,7 @@ struct FairMQFSM_ : public msm::front::state_machine_def void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering READY state"; + fsm.fState = READY; } }; @@ -156,9 +174,12 @@ struct FairMQFSM_ : public msm::front::state_machine_def template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { - fsm.fStateFinished = false; LOG(STATE) << "Entering RUNNING state"; + + fsm.fStateFinished = false; + fsm.fState = RUNNING; + fsm.fStateThread = boost::thread(boost::bind(&FairMQFSM_::RunWrapper, &fsm)); } }; @@ -168,11 +189,14 @@ struct FairMQFSM_ : public msm::front::state_machine_def template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { - fsm.fStateFinished = false; fsm.fState = PAUSED; - fsm.SendCommand("pause"); + + fsm.fStateFinished = false; + fsm.Unblock(); fsm.fStateThread.join(); + LOG(STATE) << "Entering PAUSED state"; + fsm.fStateThread = boost::thread(boost::bind(&FairMQFSM_::Pause, &fsm)); } }; @@ -183,10 +207,13 @@ struct FairMQFSM_ : public msm::front::state_machine_def void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { fsm.fState = RUNNING; + fsm.fStateThread.interrupt(); fsm.fStateThread.join(); fsm.fStateFinished = false; + LOG(STATE) << "Entering RUNNING state"; + fsm.fStateThread = boost::thread(boost::bind(&FairMQFSM_::RunWrapper, &fsm)); } }; @@ -197,8 +224,10 @@ struct FairMQFSM_ : public msm::front::state_machine_def void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Received STOP event"; - fsm.fState = IDLE; - fsm.SendCommand("stop"); + + fsm.fState = READY; + + fsm.Unblock(); fsm.fStateThread.join(); } }; @@ -208,8 +237,9 @@ struct FairMQFSM_ : public msm::front::state_machine_def template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { - LOG(STATE) << "RUNNING state finished without an external event"; - fsm.fState = IDLE; + LOG(STATE) << "RUNNING state finished without an external event, going to IDLE"; + + fsm.fState = READY; } }; @@ -218,9 +248,12 @@ struct FairMQFSM_ : public msm::front::state_machine_def template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { - fsm.fStateFinished = false; LOG(STATE) << "Entering RESETTING TASK state"; + + fsm.fStateFinished = false; + fsm.fState = RESETTING_TASK; + fsm.fStateThread = boost::thread(boost::bind(&FairMQFSM_::ResetTaskWrapper, &fsm)); } }; @@ -230,9 +263,12 @@ struct FairMQFSM_ : public msm::front::state_machine_def template void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { - fsm.fStateFinished = false; LOG(STATE) << "Entering RESETTING DEVICE state"; + + fsm.fStateFinished = false; + fsm.fState = RESETTING_DEVICE; + fsm.ResetWrapper(); } }; @@ -243,6 +279,7 @@ struct FairMQFSM_ : public msm::front::state_machine_def void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Received END event"; + fsm.fState = EXITING; fsm.fTerminateStateThread = boost::thread(boost::bind(&FairMQFSM_::Terminate, &fsm)); @@ -257,9 +294,10 @@ struct FairMQFSM_ : public msm::front::state_machine_def void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { LOG(STATE) << "Entering EXITING state"; + fsm.fState = EXITING; - fsm.SendCommand("stop"); - fsm.fStateThread.interrupt(); + + fsm.Unblock(); fsm.fStateThread.join(); fsm.fTerminateStateThread = boost::thread(boost::bind(&FairMQFSM_::Terminate, &fsm)); @@ -268,6 +306,18 @@ struct FairMQFSM_ : public msm::front::state_machine_def } }; + struct ErrorFoundFct + { + template + void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + { + LOG(STATE) << "ERROR!"; + + fsm.fState = ERROR; + // quick_exit(EXIT_FAILURE); + } + }; + // actions to be overwritten by derived classes virtual void InitWrapper() {} virtual void Init() {} @@ -282,12 +332,12 @@ struct FairMQFSM_ : public msm::front::state_machine_def virtual void ResetTask() {} virtual void Shutdown() {} virtual void Terminate() {} // Termination method called during StopFct action. - virtual void SendCommand(const std::string& command) {} // Method to send commands. + virtual void Unblock() {} // Method to send commands. // Transition table for FairMQFMS struct transition_table : mpl::vector< - // Start Event Next Action Guard - // +-------------------------+----------------------+------------------------+---------------+---------+ + // Start Event Next Action Guard + // +------------------------+----------------------+------------------------+----------------+---------+ msmf::Row, msmf::Row, msmf::Row, @@ -301,8 +351,9 @@ struct FairMQFSM_ : public msm::front::state_machine_def msmf::Row, msmf::Row, msmf::Row, - msmf::Row, // temporary - msmf::Row > + msmf::Row, + msmf::Row, + msmf::Row> {}; // Replaces the default no-transition response. @@ -312,13 +363,11 @@ struct FairMQFSM_ : public msm::front::state_machine_def LOG(STATE) << "no transition from state " << GetStateName(state) << " on event " << e.name(); } - // this is to run certain functions in a separate thread - boost::thread fStateThread; - boost::thread fTerminateStateThread; - // backward compatibility to FairMQStateMachine enum State { + NORMAL, + ERROR, IDLE, INITIALIZING_DEVICE, DEVICE_READY, @@ -335,6 +384,10 @@ struct FairMQFSM_ : public msm::front::state_machine_def { switch(state) { + case NORMAL: + return "NORMAL"; + case ERROR: + return "ERROR"; case IDLE: return "IDLE"; case INITIALIZING_DEVICE: @@ -394,18 +447,23 @@ struct FairMQFSM_ : public msm::front::state_machine_def } } + // this is to run certain functions in a separate thread + boost::thread fStateThread; + boost::thread fTerminateStateThread; + // condition variable to notify parent thread about end of state. bool fStateFinished; boost::condition_variable fStateCondition; boost::mutex fStateMutex; - private: - State fState; + protected: + std::atomic fState; }; typedef msm::back::state_machine FairMQFSM; } // namespace FairMQFSM + class FairMQStateMachine : public FairMQFSM::FairMQFSM { public: @@ -421,8 +479,10 @@ class FairMQStateMachine : public FairMQFSM::FairMQFSM RESET_TASK, RESET_DEVICE, internal_IDLE, - END + END, + ERROR_FOUND }; + FairMQStateMachine(); virtual ~FairMQStateMachine(); diff --git a/fairmq/devices/FairMQBenchmarkSampler.cxx b/fairmq/devices/FairMQBenchmarkSampler.cxx index 68ee9cb8..ca127183 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.cxx +++ b/fairmq/devices/FairMQBenchmarkSampler.cxx @@ -69,7 +69,7 @@ void FairMQBenchmarkSampler::Run() void FairMQBenchmarkSampler::ResetEventCounter() { - while (CheckCurrentState(RUNNING)) + while (true) { try { diff --git a/fairmq/examples/1-sampler-sink/FairMQExample1Sampler.cxx b/fairmq/examples/1-sampler-sink/FairMQExample1Sampler.cxx index 828178d4..72cab457 100644 --- a/fairmq/examples/1-sampler-sink/FairMQExample1Sampler.cxx +++ b/fairmq/examples/1-sampler-sink/FairMQExample1Sampler.cxx @@ -12,12 +12,16 @@ * @author A. Rybalchenko */ +#include // unique_ptr + #include #include #include "FairMQExample1Sampler.h" #include "FairMQLogger.h" +using namespace std; + FairMQExample1Sampler::FairMQExample1Sampler() : fText() { @@ -25,18 +29,18 @@ FairMQExample1Sampler::FairMQExample1Sampler() void FairMQExample1Sampler::CustomCleanup(void *data, void *object) { - delete (std::string*)object; + delete (string*)object; } void FairMQExample1Sampler::Run() { - while (GetCurrentState() == RUNNING) + while (CheckCurrentState(RUNNING)) { boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); - std::string* text = new std::string(fText); + string* text = new string(fText); - FairMQMessage* msg = fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text); + unique_ptr msg(fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); LOG(INFO) << "Sending \"" << fText << "\""; @@ -48,7 +52,7 @@ FairMQExample1Sampler::~FairMQExample1Sampler() { } -void FairMQExample1Sampler::SetProperty(const int key, const std::string& value) +void FairMQExample1Sampler::SetProperty(const int key, const string& value) { switch (key) { @@ -61,7 +65,7 @@ void FairMQExample1Sampler::SetProperty(const int key, const std::string& value) } } -std::string FairMQExample1Sampler::GetProperty(const int key, const std::string& default_ /*= ""*/) +string FairMQExample1Sampler::GetProperty(const int key, const string& default_ /*= ""*/) { switch (key) { diff --git a/fairmq/examples/1-sampler-sink/FairMQExample1Sink.cxx b/fairmq/examples/1-sampler-sink/FairMQExample1Sink.cxx index 94aced6c..e4547950 100644 --- a/fairmq/examples/1-sampler-sink/FairMQExample1Sink.cxx +++ b/fairmq/examples/1-sampler-sink/FairMQExample1Sink.cxx @@ -28,15 +28,13 @@ void FairMQExample1Sink::Run() { while (CheckCurrentState(RUNNING)) { - FairMQMessage* msg = fTransportFactory->CreateMessage(); + unique_ptr msg(fTransportFactory->CreateMessage()); if (fChannels.at("data-in").at(0).Receive(msg) > 0) { LOG(INFO) << "Received message: \"" << string(static_cast(msg->GetData()), msg->GetSize()) << "\""; - - delete msg; } } } diff --git a/fairmq/examples/2-sampler-processor-sink/FairMQExample2Processor.cxx b/fairmq/examples/2-sampler-processor-sink/FairMQExample2Processor.cxx index f8b1535a..d35f68e2 100644 --- a/fairmq/examples/2-sampler-processor-sink/FairMQExample2Processor.cxx +++ b/fairmq/examples/2-sampler-processor-sink/FairMQExample2Processor.cxx @@ -18,6 +18,8 @@ #include "FairMQExample2Processor.h" #include "FairMQLogger.h" +using namespace std; + FairMQExample2Processor::FairMQExample2Processor() : fText() { @@ -25,25 +27,33 @@ FairMQExample2Processor::FairMQExample2Processor() void FairMQExample2Processor::CustomCleanup(void *data, void *object) { - delete (std::string*)object; + delete (string*)object; } void FairMQExample2Processor::Run() { + // Check if we are still in the RUNNING state while (CheckCurrentState(RUNNING)) { - FairMQMessage* input = fTransportFactory->CreateMessage(); - fChannels.at("data-in").at(0).Receive(input); + // Create empty message to hold the input + unique_ptr input(fTransportFactory->CreateMessage()); - LOG(INFO) << "Received data, processing..."; + // Receive the message (blocks until received or interrupted (e.g. by state change)). + // Returns size of the received message or -1 if interrupted. + if (fChannels.at("data-in").at(0).Receive(input) > 0) + { + LOG(INFO) << "Received data, processing..."; - std::string* text = new std::string(static_cast(input->GetData()), input->GetSize()); - *text += " (modified by " + fId + ")"; + // Modify the received string + string* text = new string(static_cast(input->GetData()), input->GetSize()); + *text += " (modified by " + fId + ")"; - delete input; + // Create output message + unique_ptr msg(fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); - FairMQMessage* msg = fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text); - fChannels.at("data-out").at(0).Send(msg); + // Send out the output message + fChannels.at("data-out").at(0).Send(msg); + } } } diff --git a/fairmq/examples/2-sampler-processor-sink/FairMQExample2Sampler.cxx b/fairmq/examples/2-sampler-processor-sink/FairMQExample2Sampler.cxx index 77af084d..40f90be6 100644 --- a/fairmq/examples/2-sampler-processor-sink/FairMQExample2Sampler.cxx +++ b/fairmq/examples/2-sampler-processor-sink/FairMQExample2Sampler.cxx @@ -18,6 +18,8 @@ #include "FairMQExample2Sampler.h" #include "FairMQLogger.h" +using namespace std; + FairMQExample2Sampler::FairMQExample2Sampler() : fText() { @@ -25,18 +27,19 @@ FairMQExample2Sampler::FairMQExample2Sampler() void FairMQExample2Sampler::CustomCleanup(void *data, void *object) { - delete (std::string*)object; + delete (string*)object; } void FairMQExample2Sampler::Run() { + // Check if we are still in the RUNNING state while (CheckCurrentState(RUNNING)) { boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); - std::string* text = new std::string(fText); + string* text = new string(fText); - FairMQMessage* msg = fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text); + unique_ptr msg(fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); LOG(INFO) << "Sending \"" << fText << "\""; @@ -48,7 +51,7 @@ FairMQExample2Sampler::~FairMQExample2Sampler() { } -void FairMQExample2Sampler::SetProperty(const int key, const std::string& value) +void FairMQExample2Sampler::SetProperty(const int key, const string& value) { switch (key) { @@ -61,7 +64,7 @@ void FairMQExample2Sampler::SetProperty(const int key, const std::string& value) } } -std::string FairMQExample2Sampler::GetProperty(const int key, const std::string& default_ /*= ""*/) +string FairMQExample2Sampler::GetProperty(const int key, const string& default_ /*= ""*/) { switch (key) { diff --git a/fairmq/examples/2-sampler-processor-sink/FairMQExample2Sink.cxx b/fairmq/examples/2-sampler-processor-sink/FairMQExample2Sink.cxx index 9728aeb8..ec4728d4 100644 --- a/fairmq/examples/2-sampler-processor-sink/FairMQExample2Sink.cxx +++ b/fairmq/examples/2-sampler-processor-sink/FairMQExample2Sink.cxx @@ -28,15 +28,14 @@ void FairMQExample2Sink::Run() { while (CheckCurrentState(RUNNING)) { - FairMQMessage* msg = fTransportFactory->CreateMessage(); + unique_ptr msg(fTransportFactory->CreateMessage()); - fChannels.at("data-in").at(0).Receive(msg); - - LOG(INFO) << "Received message: \"" - << string(static_cast(msg->GetData()), msg->GetSize()) - << "\""; - - delete msg; + if (fChannels.at("data-in").at(0).Receive(msg) > 0) + { + LOG(INFO) << "Received message: \"" + << string(static_cast(msg->GetData()), msg->GetSize()) + << "\""; + } } } diff --git a/fairmq/examples/3-dds/FairMQExample3Processor.cxx b/fairmq/examples/3-dds/FairMQExample3Processor.cxx index 3a6e9964..6fb6ed25 100644 --- a/fairmq/examples/3-dds/FairMQExample3Processor.cxx +++ b/fairmq/examples/3-dds/FairMQExample3Processor.cxx @@ -18,6 +18,8 @@ #include "FairMQExample3Processor.h" #include "FairMQLogger.h" +using namespace std; + FairMQExample3Processor::FairMQExample3Processor() : fTaskIndex(0) { @@ -25,29 +27,37 @@ FairMQExample3Processor::FairMQExample3Processor() void FairMQExample3Processor::CustomCleanup(void *data, void *object) { - delete (std::string*)object; + delete (string*)object; } void FairMQExample3Processor::Run() { + // Check if we are still in the RUNNING state while (CheckCurrentState(RUNNING)) { - FairMQMessage* input = fTransportFactory->CreateMessage(); - fChannels.at("data-in").at(0).Receive(input); + // Create empty message to hold the input + unique_ptr input(fTransportFactory->CreateMessage()); - LOG(INFO) << "Received data, processing..."; + // Receive the message (blocks until received or interrupted (e.g. by state change)). + // Returns size of the received message or -1 if interrupted. + if (fChannels.at("data-in").at(0).Receive(input) > 0) + { + LOG(INFO) << "Received data, processing..."; - std::string* text = new std::string(static_cast(input->GetData()), input->GetSize()); - *text += " (modified by " + fId + std::to_string(fTaskIndex) + ")"; + // Modify the received string + string* text = new string(static_cast(input->GetData()), input->GetSize()); + *text += " (modified by " + fId + to_string(fTaskIndex) + ")"; - delete input; + // Create output message + unique_ptr msg(fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); - FairMQMessage* msg = fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text); - fChannels.at("data-out").at(0).Send(msg); + // Send out the output message + fChannels.at("data-out").at(0).Send(msg); + } } } -void FairMQExample3Processor::SetProperty(const int key, const std::string& value) +void FairMQExample3Processor::SetProperty(const int key, const string& value) { switch (key) { @@ -57,7 +67,7 @@ void FairMQExample3Processor::SetProperty(const int key, const std::string& valu } } -std::string FairMQExample3Processor::GetProperty(const int key, const std::string& default_ /*= ""*/) +string FairMQExample3Processor::GetProperty(const int key, const string& default_ /*= ""*/) { switch (key) { diff --git a/fairmq/examples/3-dds/FairMQExample3Sampler.cxx b/fairmq/examples/3-dds/FairMQExample3Sampler.cxx index a3c1e939..86b0ecc1 100644 --- a/fairmq/examples/3-dds/FairMQExample3Sampler.cxx +++ b/fairmq/examples/3-dds/FairMQExample3Sampler.cxx @@ -18,6 +18,8 @@ #include "FairMQExample3Sampler.h" #include "FairMQLogger.h" +using namespace std; + FairMQExample3Sampler::FairMQExample3Sampler() : fText() { @@ -25,18 +27,19 @@ FairMQExample3Sampler::FairMQExample3Sampler() void FairMQExample3Sampler::CustomCleanup(void *data, void *object) { - delete (std::string*)object; + delete (string*)object; } void FairMQExample3Sampler::Run() { + // Check if we are still in the RUNNING state while (CheckCurrentState(RUNNING)) { boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); - std::string* text = new std::string(fText); + string* text = new string(fText); - FairMQMessage* msg = fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text); + unique_ptr msg(fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); LOG(INFO) << "Sending \"" << fText << "\""; @@ -48,7 +51,7 @@ FairMQExample3Sampler::~FairMQExample3Sampler() { } -void FairMQExample3Sampler::SetProperty(const int key, const std::string& value) +void FairMQExample3Sampler::SetProperty(const int key, const string& value) { switch (key) { @@ -61,7 +64,7 @@ void FairMQExample3Sampler::SetProperty(const int key, const std::string& value) } } -std::string FairMQExample3Sampler::GetProperty(const int key, const std::string& default_ /*= ""*/) +string FairMQExample3Sampler::GetProperty(const int key, const string& default_ /*= ""*/) { switch (key) { diff --git a/fairmq/examples/3-dds/FairMQExample3Sink.cxx b/fairmq/examples/3-dds/FairMQExample3Sink.cxx index 8fb4ebcc..c2c66c7c 100644 --- a/fairmq/examples/3-dds/FairMQExample3Sink.cxx +++ b/fairmq/examples/3-dds/FairMQExample3Sink.cxx @@ -28,15 +28,14 @@ void FairMQExample3Sink::Run() { while (CheckCurrentState(RUNNING)) { - FairMQMessage* msg = fTransportFactory->CreateMessage(); + unique_ptr msg(fTransportFactory->CreateMessage()); - fChannels.at("data-in").at(0).Receive(msg); - - LOG(INFO) << "Received message: \"" - << string(static_cast(msg->GetData()), msg->GetSize()) - << "\""; - - delete msg; + if (fChannels.at("data-in").at(0).Receive(msg) > 0) + { + LOG(INFO) << "Received message: \"" + << string(static_cast(msg->GetData()), msg->GetSize()) + << "\""; + } } } diff --git a/fairmq/examples/3-dds/README.md b/fairmq/examples/3-dds/README.md index db84abd2..1a71027a 100644 --- a/fairmq/examples/3-dds/README.md +++ b/fairmq/examples/3-dds/README.md @@ -3,6 +3,119 @@ Example 3: DDS This example demonstrates usage of the Dynamic Deployment System ([DDS](http://dds.gsi.de/)) to dynamically deploy and configure a topology of devices. The topology is similar to those of Example 2, but now it can be easily distributed on different computing nodes without the need for manual reconfiguration of the devices. -The description below outlines the minimal steps needed to run the example. For more detailed DDS documentation please visit [DDS Website](http://dds.gsi.de/). +The description below outlines the minimal steps needed to run the example with DDS. For more details please refer to DDS documentation on [DDS Website](http://dds.gsi.de/). -The topology run by DDS is defined in `ex3-dds-topology.xml` and the hosts to run it on are configured in `ex3-dds-hosts.cfg`. The topology starts one Sampler, one Sink and a group of 10 Processors. \ No newline at end of file +##### 1. The devices that bind their sockets need to advertise their bound addresses to DDS by writing a property. + +In our example Sampler and Sink bind their sockets. The bound addresses are available after the initial validation. The following code takes the address value and gives it to DDS: + +```C++ +sampler.ChangeState("INIT_DEVICE"); +sampler.WaitForInitialValidation(); + +dds::key_value::CKeyValue ddsKeyValue; +ddsKeyValue.putValue("SamplerOutputAddress", sampler.fChannels.at("data-out").at(0).GetAddress()); + +sampler.WaitForEndOfState("INIT_DEVICE"); +``` + +Same approach for the Sink. + +##### 2. The devices that connect their sockets need to read the addresses from DDS. + +The Processors in our example need the addresses of Sampler and Sink. They receive these from DDS via properties (sent in the step above): + +```C++ +dds::key_value::CKeyValue ddsKeyValue; +// Sampler properties +dds::key_value::CKeyValue::valuesMap_t samplerValues; +{ + mutex keyMutex; + condition_variable keyCondition; + + LOG(INFO) << "Subscribing and waiting for sampler output address."; + ddsKeyValue.subscribe([&keyCondition](const string& /*_key*/, const string& /*_value*/) { keyCondition.notify_all(); }); + ddsKeyValue.getValues("SamplerOutputAddress", &samplerValues); + while (samplerValues.empty()) + { + unique_lock lock(keyMutex); + keyCondition.wait_until(lock, chrono::system_clock::now() + chrono::milliseconds(1000)); + ddsKeyValue.getValues("SamplerOutputAddress", &samplerValues); + } +} +// Sink properties +// ... same as above, but for sinkValues ... + +processor.fChannels.at("data-in").at(0).UpdateAddress(samplerValues.begin()->second); +processor.fChannels.at("data-out").at(0).UpdateAddress(sinkValues.begin()->second); +``` + +After this step each device will have the necessary connection information. + +##### 3. Write DDS hosts file that contains a list of worker nodes to run the topology on (When deploying using the SSH plug-in). + +We run this example on the local machine for simplicity. The file below defines one worker `wn0` with 12 DDS Agents (thus able to accept 12 tasks). The parameters for each worker node are: + - user-chosen worker ID (must be unique) + - a host name with or without a login, in a form: login@host.fqdn + - additional SSH params (can be empty) + - a remote working directory + - number of DDS Agents for this worker + +```bash +@bash_begin@ +echo "DBG: SSH ENV Script" +#source setup.sh +@bash_end@ + +wn0, username@localhost, , /tmp/, 12 +``` + +##### 4. Write DDS topology file that describes which tasks (processes) to run and their topology and configuration. + +Take a look at `ex3-dds-topology.xml`. It consists of a definition part (properties, tasks, collections and more) and execution part (main). In our example Sampler, Processor and Sink tasks are defines, containing their executables and exchanged properties. The `
` of the topology uses the defined tasks. Besides one Sampler and one Sink task, a group containing Processor task is defined. The group has a multiplicity of 10, meaninig 10 Processors will be executed. Each of the Processors will receive the properties with Sampler and Sink addresses. + +##### 5. Start DDS server. + +The DDS server is started with: + +```bash +dds-server start -s +``` + +##### 6. Submit DDS Agents (configured in the hosts file). + +Agents are submitted with: +```bash +dds-submit --rms ssh --ssh-rms-cfg ex3-dds-hosts.cfg +``` +The `--rms` option defines a destination resource management system. The `--ssh-rms-cfg` specifies an SSH plug-in resource definition file. + +##### 7. Set the topology file. + +Point DDS to the topology file: +```bash +dds-topology --set ex3-dds-topology.xml +``` + +##### 8. Activate the topology. + +```bash +dds-topology --activate +``` + +##### 9. Run + +After activation, agents will execute the defined tasks on the worker nodes. Output of the tasks will be stored in the directory that was specified in the hosts file. + +##### 10. Stop DDS server/topology. + +The execution of tasks can be stopped with: +```bash +dds-topology --stop +``` +Or by stopping the DDS server: +```bash +dds-server stop +``` + +For a more complete DDS documentation please refer to [DDS Website](http://dds.gsi.de/). diff --git a/fairmq/examples/3-dds/ex3-dds-hosts.cfg b/fairmq/examples/3-dds/ex3-dds-hosts.cfg index 95f53193..0518fc21 100644 --- a/fairmq/examples/3-dds/ex3-dds-hosts.cfg +++ b/fairmq/examples/3-dds/ex3-dds-hosts.cfg @@ -3,4 +3,4 @@ echo "DBG: SSH ENV Script" #source setup.sh @bash_end@ -worker, username@localhost, , /tmp/, 12 +wn0, username@localhost, , /tmp/, 12 diff --git a/fairmq/examples/3-dds/runExample3Sampler.cxx b/fairmq/examples/3-dds/runExample3Sampler.cxx index cc06b020..373ca2f2 100644 --- a/fairmq/examples/3-dds/runExample3Sampler.cxx +++ b/fairmq/examples/3-dds/runExample3Sampler.cxx @@ -83,13 +83,20 @@ int main(int argc, char** argv) FairMQ::tools::getHostIPs(IPs); stringstream ss; // Check if ib0 (infiniband) interface is available, otherwise try eth0 or wlan0. - if (IPs.count("ib0")) { - ss << "tcp://" << IPs["ib0"] << ":1"; - } else if (IPs.count("eth0")) { - ss << "tcp://" << IPs["eth0"] << ":1"; - } else if (IPs.count("wlan0")) { - ss << "tcp://" << IPs["wlan0"] << ":1"; - } else { + if (IPs.count("ib0")) + { + ss << "tcp://" << IPs["ib0"] << ":1"; + } + else if (IPs.count("eth0")) + { + ss << "tcp://" << IPs["eth0"] << ":1"; + } + else if (IPs.count("wlan0")) + { + ss << "tcp://" << IPs["wlan0"] << ":1"; + } + else + { LOG(INFO) << ss.str(); LOG(ERROR) << "Could not find ib0, eth0 or wlan0"; exit(EXIT_FAILURE); diff --git a/fairmq/examples/3-dds/runExample3Sink.cxx b/fairmq/examples/3-dds/runExample3Sink.cxx index 50cddde0..c2f26d69 100644 --- a/fairmq/examples/3-dds/runExample3Sink.cxx +++ b/fairmq/examples/3-dds/runExample3Sink.cxx @@ -74,13 +74,20 @@ int main(int argc, char** argv) FairMQ::tools::getHostIPs(IPs); stringstream ss; // Check if ib0 (infiniband) interface is available, otherwise try eth0 or wlan0. - if (IPs.count("ib0")) { - ss << "tcp://" << IPs["ib0"] << ":1"; - } else if (IPs.count("eth0")) { - ss << "tcp://" << IPs["eth0"] << ":1"; - } else if (IPs.count("wlan0")) { - ss << "tcp://" << IPs["wlan0"] << ":1"; - } else { + if (IPs.count("ib0")) + { + ss << "tcp://" << IPs["ib0"] << ":1"; + } + else if (IPs.count("eth0")) + { + ss << "tcp://" << IPs["eth0"] << ":1"; + } + else if (IPs.count("wlan0")) + { + ss << "tcp://" << IPs["wlan0"] << ":1"; + } + else + { LOG(INFO) << ss.str(); LOG(ERROR) << "Could not find ib0, eth0 or wlan0"; exit(EXIT_FAILURE); diff --git a/fairmq/examples/4-copypush/FairMQExample4Sampler.cxx b/fairmq/examples/4-copypush/FairMQExample4Sampler.cxx index ef003563..8bcad499 100644 --- a/fairmq/examples/4-copypush/FairMQExample4Sampler.cxx +++ b/fairmq/examples/4-copypush/FairMQExample4Sampler.cxx @@ -12,7 +12,7 @@ * @author A. Rybalchenko */ -#include +#include // unique_ptr #include #include @@ -32,9 +32,9 @@ void FairMQExample4Sampler::Run() { boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); - std::unique_ptr number(new uint64_t(counter)); + uint64_t* number = new uint64_t(counter); - std::unique_ptr msg(fTransportFactory->CreateMessage(number.release(), sizeof(uint64_t))); + std::unique_ptr msg(fTransportFactory->CreateMessage(number, sizeof(uint64_t))); LOG(INFO) << "Sending \"" << counter << "\""; diff --git a/fairmq/examples/5-req-rep/FairMQExample5Client.cxx b/fairmq/examples/5-req-rep/FairMQExample5Client.cxx index aee77c03..2ccf0316 100644 --- a/fairmq/examples/5-req-rep/FairMQExample5Client.cxx +++ b/fairmq/examples/5-req-rep/FairMQExample5Client.cxx @@ -42,19 +42,17 @@ void FairMQExample5Client::Run() string* text = new string(fText); - FairMQMessage* request = fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text); - FairMQMessage* reply = fTransportFactory->CreateMessage(); + unique_ptr request(fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); + unique_ptr reply(fTransportFactory->CreateMessage()); LOG(INFO) << "Sending \"" << fText << "\" to server."; if (fChannels.at("data").at(0).Send(request) > 0) { fChannels.at("data").at(0).Receive(reply); + LOG(INFO) << "Received reply from server: \"" << string(static_cast(reply->GetData()), reply->GetSize()) << "\""; } - LOG(INFO) << "Received reply from server: \"" << string(static_cast(reply->GetData()), reply->GetSize()) << "\""; - - delete reply; } } diff --git a/fairmq/examples/5-req-rep/FairMQExample5Server.cxx b/fairmq/examples/5-req-rep/FairMQExample5Server.cxx index df6d9017..9f47c75f 100644 --- a/fairmq/examples/5-req-rep/FairMQExample5Server.cxx +++ b/fairmq/examples/5-req-rep/FairMQExample5Server.cxx @@ -35,7 +35,7 @@ void FairMQExample5Server::Run() { boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); - FairMQMessage* request = fTransportFactory->CreateMessage(); + unique_ptr request(fTransportFactory->CreateMessage()); if (fChannels.at("data").at(0).Receive(request) > 0) { @@ -43,11 +43,9 @@ void FairMQExample5Server::Run() string* text = new string("Thank you for the \"" + string(static_cast(request->GetData()), request->GetSize()) + "\"!"); - delete request; - LOG(INFO) << "Sending reply to client."; - FairMQMessage* reply = fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text); + unique_ptr reply(fTransportFactory->CreateMessage(const_cast(text->c_str()), text->length(), CustomCleanup, text)); fChannels.at("data").at(0).Send(reply); } diff --git a/fairmq/nanomsg/FairMQPollerNN.cxx b/fairmq/nanomsg/FairMQPollerNN.cxx index 1c4b5a15..b5ac824c 100644 --- a/fairmq/nanomsg/FairMQPollerNN.cxx +++ b/fairmq/nanomsg/FairMQPollerNN.cxx @@ -12,6 +12,8 @@ * @author A. Rybalchenko */ +#include // quick_exit() + #include #include #include @@ -72,7 +74,7 @@ FairMQPollerNN::FairMQPollerNN(map< string,vector >& channelsMap, { LOG(ERROR) << "At least one of the provided channel keys for poller initialization is invalid"; LOG(ERROR) << "Out of Range error: " << oor.what() << '\n'; - exit(EXIT_FAILURE); + quick_exit(EXIT_FAILURE); } } @@ -107,7 +109,7 @@ FairMQPollerNN::FairMQPollerNN(FairMQSocket& dataSocket, FairMQSocket& cmdSocket else { LOG(ERROR) << "invalid poller configuration, exiting."; - exit(EXIT_FAILURE); + quick_exit(EXIT_FAILURE); } } @@ -161,7 +163,7 @@ bool FairMQPollerNN::CheckInput(const string channelKey, const int index) { LOG(ERROR) << "Invalid channel key: \"" << channelKey << "\""; LOG(ERROR) << "Out of Range error: " << oor.what() << '\n'; - exit(EXIT_FAILURE); + quick_exit(EXIT_FAILURE); } } @@ -180,7 +182,7 @@ bool FairMQPollerNN::CheckOutput(const string channelKey, const int index) { LOG(ERROR) << "Invalid channel key: \"" << channelKey << "\""; LOG(ERROR) << "Out of Range error: " << oor.what() << '\n'; - exit(EXIT_FAILURE); + quick_exit(EXIT_FAILURE); } } diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index 9e478a3e..fc5880cb 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -12,6 +12,7 @@ * @author A. Rybalchenko */ +#include // quick_exit() #include #include "FairMQSocketNN.h" @@ -45,7 +46,7 @@ FairMQSocketNN::FairMQSocketNN(const string& type, const std::string& name, int if (fSocket == -1) { LOG(ERROR) << "failed creating socket " << fId << ", reason: " << nn_strerror(errno); - exit(EXIT_FAILURE); + quick_exit(EXIT_FAILURE); } } else @@ -54,7 +55,7 @@ FairMQSocketNN::FairMQSocketNN(const string& type, const std::string& name, int if (fSocket == -1) { LOG(ERROR) << "failed creating socket " << fId << ", reason: " << nn_strerror(errno); - exit(EXIT_FAILURE); + quick_exit(EXIT_FAILURE); } if (type == "sub") { diff --git a/fairmq/options/FairMQParser.cxx b/fairmq/options/FairMQParser.cxx index 8531703d..c38a59b0 100644 --- a/fairmq/options/FairMQParser.cxx +++ b/fairmq/options/FairMQParser.cxx @@ -13,7 +13,7 @@ */ #include "FairMQParser.h" -#include "FairMQLogger.h" +#include "FairLogger.h" #include // WARNING : pragma commands to hide boost (1.54.0) warning @@ -34,19 +34,21 @@ +using namespace std; + namespace FairMQParser { // TODO : add key-value map parameter for replacing/updating values from keys // function that convert property tree (given the xml or json structure) to FairMQMap -FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const std::string& deviceId, const std::string& rootNode, const std::string& formatFlag) +FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const string& deviceId, const string& rootNode, const string& formatFlag) { // Create fair mq map FairMQMap channelMap; // variables to create key for the mq map. Note: maybe device name and id useless here - std::string deviceIdKey; - std::string channelKey; + string deviceIdKey; + string channelKey; // do a first loop just to print the device-id in xml/json input for(const auto& p : pt.get_child(rootNode)) @@ -59,13 +61,13 @@ FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const std::string& //get id attribute to choose the device if (formatFlag == "xml") { - deviceIdKey = p.second.get(".id"); + deviceIdKey = p.second.get(".id"); LOG(DEBUG) << "Found device id '" << deviceIdKey << "' in XML input"; } if (formatFlag == "json") { - deviceIdKey = p.second.get("id"); + deviceIdKey = p.second.get("id"); LOG(DEBUG) << "Found device id '"<< deviceIdKey << "' in JSON input"; } } @@ -82,12 +84,12 @@ FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const std::string& //get id attribute to choose the device if (formatFlag == "xml") { - deviceIdKey = p.second.get(".id"); + deviceIdKey = p.second.get(".id"); } if (formatFlag == "json") { - deviceIdKey = p.second.get("id"); + deviceIdKey = p.second.get("id"); } // if not correct device id, do not fill MQMap @@ -97,9 +99,8 @@ FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const std::string& } // print if DEBUG log level set - std::stringstream deviceStream; - deviceStream << "[node = " << p.first - << "] id = " << deviceIdKey; + stringstream deviceStream; + deviceStream << "[node = " << p.first << "] id = " << deviceIdKey; LOG(DEBUG) << deviceStream.str(); // for each channel in device @@ -113,22 +114,21 @@ FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const std::string& // get name attribute to form key if (formatFlag == "xml") { - channelKey = q.second.get(".name"); + channelKey = q.second.get(".name"); } if (formatFlag == "json") { - channelKey = q.second.get("name"); + channelKey = q.second.get("name"); } // print if DEBUG log level set - std::stringstream channelStream; - channelStream << "\t [node = " << q.first - << "] name = " << channelKey; + stringstream channelStream; + channelStream << "\t [node = " << q.first << "] name = " << channelKey; LOG(DEBUG) << channelStream.str(); // temporary FairMQChannel container - std::vector channelList; + vector channelList; int socketCounter = 0; // for each socket in channel @@ -143,20 +143,19 @@ FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const std::string& FairMQChannel channel; // print if DEBUG log level set - std::stringstream socket; - socket << "\t \t [node = " << r.first - << "] socket index = " << socketCounter; + stringstream socket; + socket << "\t \t [node = " << r.first << "] socket index = " << socketCounter; LOG(DEBUG) << socket.str(); - LOG(DEBUG) << "\t \t \t type = " << r.second.get("type", channel.GetType()); - LOG(DEBUG) << "\t \t \t method = " << r.second.get("method", channel.GetMethod()); - LOG(DEBUG) << "\t \t \t address = " << r.second.get("address", channel.GetAddress()); + LOG(DEBUG) << "\t \t \t type = " << r.second.get("type", channel.GetType()); + LOG(DEBUG) << "\t \t \t method = " << r.second.get("method", channel.GetMethod()); + LOG(DEBUG) << "\t \t \t address = " << r.second.get("address", channel.GetAddress()); LOG(DEBUG) << "\t \t \t sndBufSize = " << r.second.get("sndBufSize", channel.GetSndBufSize()); LOG(DEBUG) << "\t \t \t rcvBufSize = " << r.second.get("rcvBufSize", channel.GetRcvBufSize()); LOG(DEBUG) << "\t \t \t rateLogging = " << r.second.get("rateLogging", channel.GetRateLogging()); - channel.UpdateType(r.second.get("type", channel.GetType())); - channel.UpdateMethod(r.second.get("method", channel.GetMethod())); - channel.UpdateAddress(r.second.get("address", channel.GetAddress())); + channel.UpdateType(r.second.get("type", channel.GetType())); + channel.UpdateMethod(r.second.get("method", channel.GetMethod())); + channel.UpdateAddress(r.second.get("address", channel.GetAddress())); channel.UpdateSndBufSize(r.second.get("sndBufSize", channel.GetSndBufSize())); // int channel.UpdateRcvBufSize(r.second.get("rcvBufSize", channel.GetRcvBufSize())); // int channel.UpdateRateLogging(r.second.get("rateLogging", channel.GetRateLogging())); // int @@ -165,7 +164,7 @@ FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const std::string& }// end socket loop //fill mq map option - channelMap.insert(std::make_pair(channelKey,std::move(channelList))); + channelMap.insert(make_pair(channelKey, move(channelList))); } } @@ -182,37 +181,36 @@ FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const std::string& LOG(WARN) << "---- No channel-keys found for device-id " << deviceId; LOG(WARN) << "---- Check the "<< formatFlag << " inputs and/or command line inputs"; } + return channelMap; } -//////////////////////////////////////////////////////////////////////////// -FairMQMap JSON::UserParser(const std::string& filename, const std::string& deviceId, const std::string& rootNode) +FairMQMap JSON::UserParser(const string& filename, const string& deviceId, const string& rootNode) { boost::property_tree::ptree pt; boost::property_tree::read_json(filename, pt); return ptreeToMQMap(pt, deviceId, rootNode,"json"); } -FairMQMap JSON::UserParser(std::stringstream& input, const std::string& deviceId, const std::string& rootNode) +FairMQMap JSON::UserParser(stringstream& input, const string& deviceId, const string& rootNode) { boost::property_tree::ptree pt; boost::property_tree::read_json(input, pt); return ptreeToMQMap(pt, deviceId, rootNode,"json"); } -//////////////////////////////////////////////////////////////////////////// -FairMQMap XML::UserParser(const std::string& filename, const std::string& deviceId, const std::string& rootNode) +FairMQMap XML::UserParser(const string& filename, const string& deviceId, const string& rootNode) { boost::property_tree::ptree pt; boost::property_tree::read_xml(filename, pt); - return ptreeToMQMap(pt,deviceId,rootNode,"xml"); + return ptreeToMQMap(pt, deviceId, rootNode, "xml"); } -FairMQMap XML::UserParser(std::stringstream& input, const std::string& deviceId, const std::string& rootNode) +FairMQMap XML::UserParser(stringstream& input, const string& deviceId, const string& rootNode) { boost::property_tree::ptree pt; boost::property_tree::read_xml(input, pt); - return ptreeToMQMap(pt,deviceId,rootNode,"xml"); + return ptreeToMQMap(pt, deviceId, rootNode, "xml"); } } // end FairMQParser namespace \ No newline at end of file diff --git a/fairmq/options/FairMQParser.h b/fairmq/options/FairMQParser.h index 1da741f1..088ab62a 100644 --- a/fairmq/options/FairMQParser.h +++ b/fairmq/options/FairMQParser.h @@ -23,7 +23,7 @@ namespace FairMQParser { -typedef std::unordered_map< std::string,std::vector > FairMQMap; +typedef std::unordered_map> FairMQMap; FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const std::string& deviceId, const std::string& rootNode, const std::string& formatFlag = "json"); @@ -35,8 +35,8 @@ struct JSON struct XML { - FairMQMap UserParser(const std::string& filename, const std::string& deviceId, const std::string& root_node="fairMQOptions"); - FairMQMap UserParser(std::stringstream& input, const std::string& deviceId, const std::string& rootNode="fairMQOptions"); + FairMQMap UserParser(const std::string& filename, const std::string& deviceId, const std::string& rootNode = "fairMQOptions"); + FairMQMap UserParser(std::stringstream& input, const std::string& deviceId, const std::string& rootNode = "fairMQOptions"); }; } // FairMQParser namespace diff --git a/fairmq/options/FairMQProgOptions.cxx b/fairmq/options/FairMQProgOptions.cxx index 2deba455..ef735606 100644 --- a/fairmq/options/FairMQProgOptions.cxx +++ b/fairmq/options/FairMQProgOptions.cxx @@ -5,7 +5,6 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ - /* * File: FairMQProgOptions.cxx * Author: winckler @@ -16,26 +15,28 @@ #include "FairMQProgOptions.h" #include +using namespace std; + FairMQProgOptions::FairMQProgOptions() : FairProgOptions() , fMQParserOptions("MQ-Device parser options") , fMQOptionsInCmd("MQ-Device options") , fMQOptionsInCfg("MQ-Device options") , fMQtree() - , fFairMQmap() + , fFairMQMap() { } -FairMQProgOptions::~FairMQProgOptions() +FairMQProgOptions::~FairMQProgOptions() { } -int FairMQProgOptions::ParseAll(const int argc, char** argv, bool AllowUnregistered) -{ +int FairMQProgOptions::ParseAll(const int argc, char** argv, bool allowUnregistered) +{ // init description InitOptionDescription(); // parse command line options - if (ParseCmdLine(argc,argv,fCmdline_options,fvarmap,AllowUnregistered)) + if (ParseCmdLine(argc, argv, fCmdLineOptions, fVarMap, allowUnregistered)) { return 1; } @@ -46,20 +47,22 @@ int FairMQProgOptions::ParseAll(const int argc, char** argv, bool AllowUnregiste // check if file exist if (fs::exists(fConfigFile)) { - if (ParseCfgFile(fConfigFile.string(), fConfig_file_options, fvarmap, AllowUnregistered)) + if (ParseCfgFile(fConfigFile.string(), fConfigFileOptions, fVarMap, allowUnregistered)) + { return 1; + } } else { - LOG(ERROR)<<"config file '"<< fConfigFile <<"' not found"; + LOG(ERROR) << "config file '" << fConfigFile << "' not found"; return 1; } } - + // set log level before printing (default is 0 = DEBUG level) std::string verbose=GetValue("verbose"); //SET_LOG_LEVEL(DEBUG); - if(fSeverity_map.count(verbose)) + if (fSeverity_map.count(verbose)) { set_global_log_level(log_op::operation::GREATER_EQ_THAN,fSeverity_map.at(verbose)); } @@ -70,44 +73,44 @@ int FairMQProgOptions::ParseAll(const int argc, char** argv, bool AllowUnregiste } PrintOptions(); - + // check if one of required MQ config option is there auto parserOption_shptr = fMQParserOptions.options(); - bool option_exists=false; - std::vector MQParserKeys; - for(const auto& p : parserOption_shptr) + bool optionExists = false; + vector MQParserKeys; + for (const auto& p : parserOption_shptr) { - MQParserKeys.push_back( p->canonical_display_name() ); - if( fvarmap.count( p->canonical_display_name() ) ) + MQParserKeys.push_back(p->canonical_display_name()); + if (fVarMap.count(p->canonical_display_name())) { - option_exists=true; + optionExists = true; break; } } - - if(!option_exists) + + if (!optionExists) { - LOG(ERROR)<<"Required option to configure the MQ device is not there."; - LOG(ERROR)<<"Please provide the value of one of the following key:"; - for(const auto& p : MQParserKeys) + LOG(ERROR) << "Required option to configure the MQ device is not there."; + LOG(ERROR) << "Please provide the value of one of the following key:"; + for (const auto& p : MQParserKeys) { - LOG(ERROR)<(), "Device ID (required argument)") - ("io-threads", po::value()->default_value(1), "io threads number"); - + ("id", po::value(), "Device ID (required argument).") + ("io-threads", po::value()->default_value(1), "Number of I/O threads."); + fMQOptionsInCfg.add_options() - ("id", po::value< std::string >()->required(), "Device ID (required argument)") - ("io-threads", po::value()->default_value(1), "io threads number"); + ("id", po::value()->required(), "Device ID (required argument).") + ("io-threads", po::value()->default_value(1), "Number of I/O threads."); } else { fMQOptionsInCmd.add_options() - ("id", po::value< std::string >()->required(), "Device ID (required argument)") - ("io-threads", po::value()->default_value(1), "io threads number"); + ("id", po::value()->required(), "Device ID (required argument)") + ("io-threads", po::value()->default_value(1), "Number of I/O threads"); } - + fMQParserOptions.add_options() - ("config-xml-string", po::value< std::vector >()->multitoken(), "XML input as command line string.") - ("config-xml-file", po::value< std::string >(), "XML input as file.") - ("config-json-string", po::value< std::vector >()->multitoken(), "JSON input as command line string.") - ("config-json-file", po::value< std::string >(), "JSON input as file."); - - + ("config-xml-string", po::value>()->multitoken(), "XML input as command line string.") + ("config-xml-file", po::value(), "XML input as file.") + ("config-json-string", po::value>()->multitoken(), "JSON input as command line string.") + ("config-json-file", po::value(), "JSON input as file."); + AddToCmdLineOptions(fGenericDesc); AddToCmdLineOptions(fMQOptionsInCmd); AddToCmdLineOptions(fMQParserOptions); - + if (fUseConfigFile) { AddToCfgFileOptions(fMQOptionsInCfg,false); AddToCfgFileOptions(fMQParserOptions,false); } - -} \ No newline at end of file +} diff --git a/fairmq/options/FairMQProgOptions.h b/fairmq/options/FairMQProgOptions.h index 08210123..55141367 100644 --- a/fairmq/options/FairMQProgOptions.h +++ b/fairmq/options/FairMQProgOptions.h @@ -14,7 +14,7 @@ */ #ifndef FAIRMQPROGOPTIONS_H -#define FAIRMQPROGOPTIONS_H +#define FAIRMQPROGOPTIONS_H #include @@ -24,16 +24,17 @@ #include namespace pt = boost::property_tree; + class FairMQProgOptions : public FairProgOptions { -protected: - typedef std::unordered_map< std::string,std::vector > FairMQMap; + protected: + typedef std::unordered_map> FairMQMap; -public: + public: FairMQProgOptions(); virtual ~FairMQProgOptions(); - virtual int ParseAll(const int argc, char** argv, bool AllowUnregistered = false); + virtual int ParseAll(const int argc, char** argv, bool allowUnregistered = false); // external parser, store function template @@ -53,7 +54,7 @@ public: int Store(const po::variables_map& vm) { - fvarmap = vm; + fVarMap = vm; return 0; } @@ -65,26 +66,26 @@ public: int Store(const FairMQMap& channels) { - fFairMQmap = channels; + fFairMQMap = channels; return 0; } FairMQMap GetFairMQMap() { - return fFairMQmap; + return fFairMQMap; } -protected: + protected: po::options_description fMQParserOptions; po::options_description fMQOptionsInCfg; po::options_description fMQOptionsInCmd; pt::ptree fMQtree; - FairMQMap fFairMQmap; + FairMQMap fFairMQMap; virtual int NotifySwitchOption(); // for custom help & version printing void InitOptionDescription(); }; -#endif /* FAIRMQPROGOPTIONS_H */ +#endif /* FAIRMQPROGOPTIONS_H */ diff --git a/fairmq/options/FairProgOptions.cxx b/fairmq/options/FairProgOptions.cxx index 3db390c8..42963c60 100644 --- a/fairmq/options/FairProgOptions.cxx +++ b/fairmq/options/FairProgOptions.cxx @@ -5,7 +5,6 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ - /* * File: FairProgOptions.cxx * Author: winckler @@ -15,21 +14,20 @@ #include "FairProgOptions.h" - +using namespace std; /// ////////////////////////////////////////////////////////////////////////////////////////////////////// -/// Constructor / destructor +/// Constructor FairProgOptions::FairProgOptions() : - fGenericDesc("Generic options description"), - fConfigDesc("Configuration options description"), - fHiddenDesc("Hidden options description"), + fGenericDesc("Generic options description"), + fConfigDesc("Configuration options description"), + fHiddenDesc("Hidden options description"), fEnvironmentDesc("Environment Variables"), - fCmdline_options("Command line options"), - fConfig_file_options("Configuration file options"), - fVisible_options("Visible options"), + fCmdLineOptions("Command line options"), + fConfigFileOptions("Configuration file options"), + fVisibleOptions("Visible options"), fVerboseLvl("INFO"), fUseConfigFile(false), fConfigFile() { - // ////////////////////////////////// // define generic options fGenericDesc.add_options() ("help,h", "produce help") @@ -45,7 +43,7 @@ FairProgOptions::FairProgOptions() : " NOLOG" ) ; - + fSeverity_map["TRACE"] = fairmq::severity_level::TRACE; fSeverity_map["DEBUG"] = fairmq::severity_level::DEBUG; fSeverity_map["RESULTS"] = fairmq::severity_level::RESULTS; @@ -54,45 +52,48 @@ FairProgOptions::FairProgOptions() : fSeverity_map["ERROR"] = fairmq::severity_level::ERROR; fSeverity_map["STATE"] = fairmq::severity_level::STATE; fSeverity_map["NOLOG"] = fairmq::severity_level::NOLOG; - } -FairProgOptions::~FairProgOptions() +/// Destructor +FairProgOptions::~FairProgOptions() { } - /// ////////////////////////////////////////////////////////////////////////////////////////////////////// /// Add option descriptions - -int FairProgOptions::AddToCmdLineOptions(const po::options_description& optdesc, bool visible) +int FairProgOptions::AddToCmdLineOptions(const po::options_description& optDesc, bool visible) { - fCmdline_options.add(optdesc); - if(visible) - fVisible_options.add(optdesc); + fCmdLineOptions.add(optDesc); + if (visible) + { + fVisibleOptions.add(optDesc); + } return 0; } -int FairProgOptions::AddToCfgFileOptions(const po::options_description& optdesc, bool visible) +int FairProgOptions::AddToCfgFileOptions(const po::options_description& optDesc, bool visible) { //if UseConfigFile() not yet called, then enable it with required file name to be provided by command line - if(!fUseConfigFile) + if (!fUseConfigFile) + { UseConfigFile(); - - fConfig_file_options.add(optdesc); - if(visible) - fVisible_options.add(optdesc); + } + + fConfigFileOptions.add(optDesc); + if (visible) + { + fVisibleOptions.add(optDesc); + } return 0; } -int FairProgOptions::AddToEnvironmentOptions(const po::options_description& optdesc) +int FairProgOptions::AddToEnvironmentOptions(const po::options_description& optDesc) { - fEnvironmentDesc.add(optdesc); + fEnvironmentDesc.add(optDesc); return 0; } - -void FairProgOptions::UseConfigFile(const std::string& filename) +void FairProgOptions::UseConfigFile(const string& filename) { fUseConfigFile = true; if (filename.empty()) @@ -110,176 +111,168 @@ void FairProgOptions::UseConfigFile(const std::string& filename) /// ////////////////////////////////////////////////////////////////////////////////////////////////////// /// Parser -int FairProgOptions::ParseCmdLine(const int argc, char** argv, const po::options_description& desc, po::variables_map& varmap, bool AllowUnregistered) +int FairProgOptions::ParseCmdLine(const int argc, char** argv, const po::options_description& desc, po::variables_map& varmap, bool allowUnregistered) { - // ////////////////////////////////// // get options from cmd line and store in variable map // here we use command_line_parser instead of parse_command_line, to allow unregistered and positional options - if(AllowUnregistered) + if (allowUnregistered) { po::command_line_parser parser{argc, argv}; parser.options(desc).allow_unregistered(); po::parsed_options parsedOptions = parser.run(); - po::store(parsedOptions,varmap); + po::store(parsedOptions, varmap); } else + { po::store(po::parse_command_line(argc, argv, desc), varmap); + } - // ////////////////////////////////// // call the virtual NotifySwitchOption method to handle switch options like e.g. "--help" or "--version" // return 1 if switch options found in varmap - if(NotifySwitchOption()) + if (NotifySwitchOption()) + { return 1; + } po::notify(varmap); return 0; } - -int FairProgOptions::ParseCmdLine(const int argc, char** argv, const po::options_description& desc, bool AllowUnregistered) +int FairProgOptions::ParseCmdLine(const int argc, char** argv, const po::options_description& desc, bool allowUnregistered) { - return ParseCmdLine(argc,argv,desc,fvarmap,AllowUnregistered); + return ParseCmdLine(argc,argv,desc,fVarMap,allowUnregistered); } - - -int FairProgOptions::ParseCfgFile(std::ifstream& ifs, const po::options_description& desc, po::variables_map& varmap, bool AllowUnregistered) +int FairProgOptions::ParseCfgFile(ifstream& ifs, const po::options_description& desc, po::variables_map& varmap, bool allowUnregistered) { if (!ifs) { - std::cout << "can not open configuration file \n"; + cout << "can not open configuration file \n"; return -1; } else { - po:store(parse_config_file(ifs, desc, AllowUnregistered), varmap); + po:store(parse_config_file(ifs, desc, allowUnregistered), varmap); po::notify(varmap); } return 0; } -int FairProgOptions::ParseCfgFile(const std::string& filename, const po::options_description& desc, po::variables_map& varmap, bool AllowUnregistered) +int FairProgOptions::ParseCfgFile(const string& filename, const po::options_description& desc, po::variables_map& varmap, bool allowUnregistered) { - std::ifstream ifs(filename.c_str()); + ifstream ifs(filename.c_str()); if (!ifs) { - std::cout << "can not open configuration file: " << filename << "\n"; + cout << "can not open configuration file: " << filename << "\n"; return -1; } else { - po:store(parse_config_file(ifs, desc, AllowUnregistered), varmap); + po:store(parse_config_file(ifs, desc, allowUnregistered), varmap); po::notify(varmap); } return 0; } - -int FairProgOptions::ParseCfgFile(const std::string& filename, const po::options_description& desc, bool AllowUnregistered) +int FairProgOptions::ParseCfgFile(const string& filename, const po::options_description& desc, bool allowUnregistered) { - return ParseCfgFile(filename,desc,fvarmap,AllowUnregistered); + return ParseCfgFile(filename,desc,fVarMap,allowUnregistered); } -int FairProgOptions::ParseCfgFile(std::ifstream& ifs, const po::options_description& desc, bool AllowUnregistered) +int FairProgOptions::ParseCfgFile(ifstream& ifs, const po::options_description& desc, bool allowUnregistered) { - return ParseCfgFile(ifs,desc,fvarmap,AllowUnregistered); + return ParseCfgFile(ifs,desc,fVarMap,allowUnregistered); } - -int FairProgOptions::ParseEnvironment(const std::function & environment_mapper) +int FairProgOptions::ParseEnvironment(const function& environmentMapper) { - po::store(po::parse_environment(fEnvironmentDesc, environment_mapper), fvarmap); - po::notify(fvarmap); - + po::store(po::parse_environment(fEnvironmentDesc, environmentMapper), fVarMap); + po::notify(fVarMap); + return 0; } // Given a key, convert the variable value to string -std::string FairProgOptions::GetStringValue(const std::string& key) +string FairProgOptions::GetStringValue(const string& key) +{ + string valueStr; + try { - std::string val_str; - try + if (fVarMap.count(key)) { - if ( fvarmap.count(key) ) + auto& value = fVarMap[key].value(); + + // string albeit useless here + if (auto q = boost::any_cast(&value)) { - auto& value = fvarmap[key].value(); + valueStr = VariableValueToString(fVarMap[key]); + return valueStr; + } + // vector + if (auto q = boost::any_cast>(&value)) + { + valueStr = VariableValueToString>(fVarMap[key]); + return valueStr; + } - // string albeit useless here - if(auto q = boost::any_cast< std::string >(&value )) - { - val_str = variable_value_to_string< std::string >(fvarmap[key]); - return val_str; - } - - // vector - if(auto q = boost::any_cast< std::vector >(&value )) - { - val_str = variable_value_to_string< std::vector >(fvarmap[key]); - return val_str; - } - - // int - if(auto q = boost::any_cast< int >(&value )) - { - val_str = variable_value_to_string< int >(fvarmap[key]); - return val_str; - } - - // vector - if(auto q = boost::any_cast< std::vector >(&value )) - { - val_str = variable_value_to_string< std::vector >(fvarmap[key]); - return val_str; - } - - // float - if(auto q = boost::any_cast< float >(&value )) - { - val_str = variable_value_to_string< float >(fvarmap[key]); - return val_str; - } - - // vector float - if(auto q = boost::any_cast< std::vector >(&value )) - { - val_str = variable_value_to_string< std::vector >(fvarmap[key]); - return val_str; - } - - // double - if(auto q = boost::any_cast< double >(&value )) - { - val_str = variable_value_to_string< double >(fvarmap[key]); - return val_str; - } - - // vector double - if(auto q = boost::any_cast< std::vector >(&value )) - { - val_str = variable_value_to_string< std::vector >(fvarmap[key]); - return val_str; - } - - + // int + if (auto q = boost::any_cast(&value)) + { + valueStr = VariableValueToString(fVarMap[key]); + return valueStr; + } + + // vector + if (auto q = boost::any_cast>(&value)) + { + valueStr = VariableValueToString>(fVarMap[key]); + return valueStr; + } + + // float + if (auto q = boost::any_cast(&value)) + { + valueStr = VariableValueToString(fVarMap[key]); + return valueStr; + } + + // vector float + if (auto q = boost::any_cast>(&value)) + { + valueStr = VariableValueToString>(fVarMap[key]); + return valueStr; + } + + // double + if (auto q = boost::any_cast(&value)) + { + valueStr = VariableValueToString(fVarMap[key]); + return valueStr; + } + + // vector double + if (auto q = boost::any_cast>(&value)) + { + valueStr = VariableValueToString>(fVarMap[key]); + return valueStr; } } - catch(std::exception& e) - { - LOG(ERROR) << "Exception thrown for the key '" << key << "'"; - LOG(ERROR) << e.what(); - } - - return val_str; + } + catch(exception& e) + { + LOG(ERROR) << "Exception thrown for the key '" << key << "'"; + LOG(ERROR) << e.what(); } + return valueStr; +} /// ////////////////////////////////////////////////////////////////////////////////////////////////////// /// Print/notify options - int FairProgOptions::PrintHelp() const { - std::cout << fVisible_options << "\n"; + cout << fVisibleOptions << "\n"; return 0; } @@ -289,205 +282,210 @@ int FairProgOptions::PrintOptions() // Method to overload. // -> loop over variable map and print its content // -> In this example the following types are supported: - // std::string, int, float, double, boost::filesystem::path - // std::vector, std::vector, std::vector, std::vector - + // string, int, float, double, boost::filesystem::path + // vector, vector, vector, vector MapVarValInfo_t mapinfo; // get string length for formatting and convert varmap values into string - int maxlen_1st = 0; - int maxlen_2nd = 0; - int maxlen_TypeInfo = 0; - int maxlen_default =0; - int maxlen_empty = 0; - int total_len=0; - for (const auto& m : fvarmap) + int maxLength1st = 0; + int maxLength2nd = 0; + int maxLengthTypeInfo = 0; + int maxLengthDefault = 0; + int maxLengthEmpty = 0; + int totalLength = 0; + for (const auto& m : fVarMap) { - Max(maxlen_1st, m.first.length()); + Max(maxLength1st, m.first.length()); - VarValInfo_t valinfo=Get_variable_value_info(m.second); - mapinfo[m.first]=valinfo; - std::string val_str; - std::string typeInfo_str; - std::string default_str; - std::string empty_str; - std::tie(val_str,typeInfo_str,default_str,empty_str)=valinfo; - - Max(maxlen_2nd, val_str.length()); - Max(maxlen_TypeInfo, typeInfo_str.length()); - Max(maxlen_default, default_str.length()); - Max(maxlen_empty, empty_str.length()); + VarValInfo_t valinfo = GetVariableValueInfo(m.second); + mapinfo[m.first] = valinfo; + string valueStr; + string typeInfoStr; + string defaultStr; + string emptyStr; + tie(valueStr, typeInfoStr, defaultStr, emptyStr) = valinfo; + Max(maxLength2nd, valueStr.length()); + Max(maxLengthTypeInfo, typeInfoStr.length()); + Max(maxLengthDefault, defaultStr.length()); + Max(maxLengthEmpty, emptyStr.length()); } // TODO : limit the value length field in a better way - if(maxlen_2nd>100) - maxlen_2nd=100; - total_len=maxlen_1st+maxlen_2nd+maxlen_TypeInfo+maxlen_default+maxlen_empty; - - - //maxlen_2nd=200; - + if (maxLength2nd > 100) + { + maxLength2nd = 100; + } + totalLength = maxLength1st + maxLength2nd + maxLengthTypeInfo + maxLengthDefault + maxLengthEmpty; + + // maxLength2nd = 200; + // formatting and printing - LOG(INFO)< + auto& value = varValue.value(); + string defaultedValue; + string emptyValue; + + if (varValue.empty()) { - // tuple - auto& value = var_val.value(); - std::string defaulted_val; - std::string empty_val; - - if(var_val.empty()) - empty_val=" [empty]"; + emptyValue = " [empty]"; + } + else + { + if (varValue.defaulted()) + { + defaultedValue = " [default value]"; + } else - if(var_val.defaulted()) - defaulted_val=" [default value]"; - else - defaulted_val=" [provided value]"; - - empty_val+=" *"; - // string - if(auto q = boost::any_cast< std::string >(&value)) { - std::string val_str = *q; - return std::make_tuple(val_str,std::string(" [Type=string]"),defaulted_val,empty_val); + defaultedValue = " [provided value]"; } - - // vector - if(auto q = boost::any_cast< std::vector >(&value)) - { - std::string val_str = variable_value_to_string< std::vector >(var_val); - return std::make_tuple(val_str,std::string(" [Type=vector]"),defaulted_val,empty_val); - } - - // int - if(auto q = boost::any_cast< int >(&value)) - { - std::string val_str = variable_value_to_string< int >(var_val); - return std::make_tuple(val_str,std::string(" [Type=int]"),defaulted_val,empty_val); - } - - // vector - if(auto q = boost::any_cast< std::vector >(&value)) - { - std::string val_str = variable_value_to_string< std::vector >(var_val); - return std::make_tuple(val_str,std::string(" [Type=vector]"),defaulted_val,empty_val); - } - - // float - if(auto q = boost::any_cast< float >(&value)) - { - std::string val_str = variable_value_to_string< float >(var_val); - return std::make_tuple(val_str,std::string(" [Type=float]"),defaulted_val,empty_val); - } - - // vector - if(auto q = boost::any_cast< std::vector >(&value)) - { - std::string val_str = variable_value_to_string< std::vector >(var_val); - return std::make_tuple(val_str,std::string(" [Type=vector]"),defaulted_val,empty_val); - } - - // double - if(auto q = boost::any_cast< double >(&value)) - { - std::string val_str = variable_value_to_string< double >(var_val); - return std::make_tuple(val_str,std::string(" [Type=double]"),defaulted_val,empty_val); - } - - // vector - if(auto q = boost::any_cast< std::vector >(&value)) - { - std::string val_str = variable_value_to_string< std::vector >(var_val); - return std::make_tuple(val_str,std::string(" [Type=vector]"),defaulted_val,empty_val); - } - - // boost::filesystem::path - if(auto q = boost::any_cast(&value)) - { - std::string val_str = (*q).string(); - //std::string val_str = (*q).filename().generic_string(); - return std::make_tuple(val_str,std::string(" [Type=boost::filesystem::path]"),defaulted_val,empty_val); - } - - // if we get here, the type is not supported return unknown info - return std::make_tuple(std::string("Unknown value"), std::string(" [Type=Unknown]"),defaulted_val,empty_val); } + emptyValue += " *"; + // string + if (auto q = boost::any_cast(&value)) + { + string valueStr = *q; + return make_tuple(valueStr, string(" [Type=string]"), defaultedValue, emptyValue); + } + // vector + if (auto q = boost::any_cast>(&value)) + { + string valueStr = VariableValueToString>(varValue); + return make_tuple(valueStr, string(" [Type=vector]"), defaultedValue, emptyValue); + } + + // int + if (auto q = boost::any_cast(&value)) + { + string valueStr = VariableValueToString(varValue); + return make_tuple(valueStr, string(" [Type=int]"), defaultedValue, emptyValue); + } + + // vector + if (auto q = boost::any_cast>(&value)) + { + string valueStr = VariableValueToString>(varValue); + return make_tuple(valueStr, string(" [Type=vector]"), defaultedValue, emptyValue); + } + + // float + if (auto q = boost::any_cast(&value)) + { + string valueStr = VariableValueToString(varValue); + return make_tuple(valueStr, string(" [Type=float]"), defaultedValue, emptyValue); + } + + // vector + if (auto q = boost::any_cast>(&value)) + { + string valueStr = VariableValueToString>(varValue); + return make_tuple(valueStr, string(" [Type=vector]"), defaultedValue, emptyValue); + } + + // double + if (auto q = boost::any_cast(&value)) + { + string valueStr = VariableValueToString(varValue); + return make_tuple(valueStr, string(" [Type=double]"), defaultedValue, emptyValue); + } + + // vector + if (auto q = boost::any_cast>(&value)) + { + string valueStr = VariableValueToString>(varValue); + return make_tuple(valueStr, string(" [Type=vector]"), defaultedValue, emptyValue); + } + + // boost::filesystem::path + if (auto q = boost::any_cast(&value)) + { + string valueStr = (*q).string(); + //string valueStr = (*q).filename().generic_string(); + return make_tuple(valueStr, string(" [Type=boost::filesystem::path]"), defaultedValue, emptyValue); + } + + // if we get here, the type is not supported return unknown info + return make_tuple(string("Unknown value"), string(" [Type=Unknown]"), defaultedValue, emptyValue); +} diff --git a/fairmq/options/FairProgOptions.h b/fairmq/options/FairProgOptions.h index a68f79e7..bdff7cd3 100644 --- a/fairmq/options/FairProgOptions.h +++ b/fairmq/options/FairProgOptions.h @@ -5,7 +5,6 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ - /* * File: FairProgOptions.h * Author: winckler @@ -14,7 +13,7 @@ */ #ifndef FAIRPROGOPTIONS_H -#define FAIRPROGOPTIONS_H +#define FAIRPROGOPTIONS_H #include "FairMQLogger.h" #include @@ -39,15 +38,17 @@ * public : * MyOptions() : FairProgOptions() * { - * fCmdline_options.add(fGenericDesc); - * fVisible_options.add(fCmdline_options); + * fCmdlineOptions.add(fGenericDesc); + * fVisibleOptions.add(fCmdlineOptions); * } - * virtual ~MyOptions(){} - * virtual int ParseAll(const int argc, char** argv, bool AllowUnregistered = false) + * virtual ~MyOptions() {} + * virtual int ParseAll(const int argc, char** argv, bool allowUnregistered = false) * { - * if(ParseCmdLine(argc,argv,fCmdline_options,fvarmap,AllowUnregistered)) + * if(ParseCmdLine(argc, argv, fCmdlineOptions, fVarMap, allowUnregistered)) + * { * return 1; - * + * } + * * PrintOptions(); * return 0; * } @@ -66,17 +67,17 @@ namespace fs = boost::filesystem; class FairProgOptions { -public: + public: FairProgOptions(); virtual ~FairProgOptions(); // add options_description - int AddToCmdLineOptions(const po::options_description& optdesc, bool visible = true); - int AddToCfgFileOptions(const po::options_description& optdesc, bool visible = true); - int AddToEnvironmentOptions(const po::options_description& optdesc); - + int AddToCmdLineOptions(const po::options_description& optDesc, bool visible = true); + int AddToCfgFileOptions(const po::options_description& optDesc, bool visible = true); + int AddToEnvironmentOptions(const po::options_description& optDesc); + void UseConfigFile(const std::string& filename = ""); - + // get value corresponding to the key template T GetValue(const std::string& key) const @@ -84,9 +85,9 @@ public: T val = T(); try { - if (fvarmap.count(key)) + if (fVarMap.count(key)) { - val = fvarmap[key].as(); + val = fVarMap[key].as(); } } catch(std::exception& e) @@ -102,27 +103,27 @@ public: // convert value to string that corresponds to the key std::string GetStringValue(const std::string& key); - const po::variables_map& GetVarMap() const {return fvarmap;} + const po::variables_map& GetVarMap() const {return fVarMap;} // boost prog options parsers - int ParseCmdLine(const int argc, char** argv, const po::options_description& desc, po::variables_map& varmap, bool AllowUnregistered = false); - int ParseCmdLine(const int argc, char** argv, const po::options_description& desc, bool AllowUnregistered = false); + int ParseCmdLine(const int argc, char** argv, const po::options_description& desc, po::variables_map& varmap, bool allowUnregistered = false); + int ParseCmdLine(const int argc, char** argv, const po::options_description& desc, bool allowUnregistered = false); - int ParseCfgFile(const std::string& filename, const po::options_description& desc, po::variables_map& varmap, bool AllowUnregistered = false); - int ParseCfgFile(const std::string& filename, const po::options_description& desc, bool AllowUnregistered = false); - int ParseCfgFile(std::ifstream& ifs, const po::options_description& desc, po::variables_map& varmap, bool AllowUnregistered = false); - int ParseCfgFile(std::ifstream& ifs, const po::options_description& desc, bool AllowUnregistered = false); + int ParseCfgFile(const std::string& filename, const po::options_description& desc, po::variables_map& varmap, bool allowUnregistered = false); + int ParseCfgFile(const std::string& filename, const po::options_description& desc, bool allowUnregistered = false); + int ParseCfgFile(std::ifstream& ifs, const po::options_description& desc, po::variables_map& varmap, bool allowUnregistered = false); + int ParseCfgFile(std::ifstream& ifs, const po::options_description& desc, bool allowUnregistered = false); int ParseEnvironment(const std::function&); - virtual int ParseAll(const int argc, char** argv, bool AllowUnregistered = false) = 0; + virtual int ParseAll(const int argc, char** argv, bool allowUnregistered = false) = 0; virtual int PrintOptions(); int PrintHelp() const; -protected: + protected: // options container - po::variables_map fvarmap; + po::variables_map fVarMap; // basic description categories po::options_description fGenericDesc; @@ -131,14 +132,13 @@ protected: po::options_description fHiddenDesc; // Description of cmd line and simple configuration file (configuration file like txt, but not like xml json ini) - po::options_description fCmdline_options; - po::options_description fConfig_file_options; + po::options_description fCmdLineOptions; + po::options_description fConfigFileOptions; // Description which is printed in help command line - po::options_description fVisible_options; - // to handle logger severity - std::map fSeverity_map; + std::map fSeverityMap; + po::options_description fVisibleOptions; std::string fVerboseLvl; bool fUseConfigFile; @@ -149,7 +149,7 @@ protected: template void UpadateVarMap(const std::string& key, const T& val) { - replace(fvarmap, key, val); + replace(fVarMap, key, val); } template @@ -158,25 +158,24 @@ protected: vm[opt].value() = boost::any(val); } -private: - // ///////////////////////////////////////////// + private: // Methods below are helper functions used in the PrintOptions method typedef std::tuple VarValInfo_t; - typedef std::map MapVarValInfo_t; + typedef std::map MapVarValInfo_t; - VarValInfo_t Get_variable_value_info(const po::variable_value& var_val); + VarValInfo_t GetVariableValueInfo(const po::variable_value& varValue); template - std::string variable_value_to_string(const po::variable_value& var_val) + std::string VariableValueToString(const po::variable_value& varValue) { - auto& value = var_val.value(); + auto& value = varValue.value(); std::ostringstream ostr; if (auto q = boost::any_cast(&value)) { ostr << *q; } - std::string val_str = ostr.str(); - return val_str; + std::string valStr = ostr.str(); + return valStr; } static void Max(int &val, const int &comp) @@ -188,6 +187,4 @@ private: } }; - -#endif /* FAIRPROGOPTIONS_H */ - +#endif /* FAIRPROGOPTIONS_H */ diff --git a/fairmq/run/runBenchmarkSampler.cxx b/fairmq/run/runBenchmarkSampler.cxx index 55ab362a..8fb2df3d 100644 --- a/fairmq/run/runBenchmarkSampler.cxx +++ b/fairmq/run/runBenchmarkSampler.cxx @@ -17,6 +17,7 @@ #include "boost/program_options.hpp" #include "FairMQLogger.h" +#include "FairMQTools.h" #include "FairMQParser.h" #include "FairMQProgOptions.h" #include "FairMQBenchmarkSampler.h" @@ -72,7 +73,6 @@ int main(int argc, char** argv) sampler.SetTransport(new FairMQTransportFactoryZMQ()); #endif - sampler.SetProperty(FairMQBenchmarkSampler::Id, id); sampler.SetProperty(FairMQBenchmarkSampler::EventSize, eventSize); sampler.SetProperty(FairMQBenchmarkSampler::EventRate, eventRate); diff --git a/fairmq/tools/FairMQTools.h b/fairmq/tools/FairMQTools.h index 9e1413b0..248f06a1 100644 --- a/fairmq/tools/FairMQTools.h +++ b/fairmq/tools/FairMQTools.h @@ -2,7 +2,7 @@ #define FAIRMQTOOLS_H_ #ifndef _GNU_SOURCE -#define _GNU_SOURCE /* To get defns of NI_MAXSERV and NI_MAXHOST */ +#define _GNU_SOURCE // To get defns of NI_MAXSERV and NI_MAXHOST #endif #include @@ -23,6 +23,12 @@ namespace FairMQ namespace tools { +template +unique_ptr make_unique(Args&& ...args) +{ + return unique_ptr(new T(forward(args)...)); +} + int getHostIPs(map& addressMap) { struct ifaddrs *ifaddr, *ifa; diff --git a/fairmq/zeromq/FairMQContextZMQ.cxx b/fairmq/zeromq/FairMQContextZMQ.cxx index ed8fe795..dcb95abd 100644 --- a/fairmq/zeromq/FairMQContextZMQ.cxx +++ b/fairmq/zeromq/FairMQContextZMQ.cxx @@ -12,6 +12,7 @@ * @author D. Klein, A. Rybalchenko */ +#include // quick_exit() #include #include "FairMQLogger.h" @@ -24,7 +25,7 @@ FairMQContextZMQ::FairMQContextZMQ(int numIoThreads) if (fContext == NULL) { LOG(ERROR) << "failed creating context, reason: " << zmq_strerror(errno); - exit(EXIT_FAILURE); + quick_exit(EXIT_FAILURE); } if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0) diff --git a/fairmq/zeromq/FairMQPollerZMQ.cxx b/fairmq/zeromq/FairMQPollerZMQ.cxx index 5a2f9778..5cbbe99a 100644 --- a/fairmq/zeromq/FairMQPollerZMQ.cxx +++ b/fairmq/zeromq/FairMQPollerZMQ.cxx @@ -12,6 +12,8 @@ * @author A. Rybalchenko */ +#include // quick_exit() + #include #include "FairMQPollerZMQ.h" @@ -72,11 +74,11 @@ FairMQPollerZMQ::FairMQPollerZMQ(map< string,vector >& channelsMa { LOG(ERROR) << "At least one of the provided channel keys for poller initialization is invalid"; LOG(ERROR) << "Out of Range error: " << oor.what() << '\n'; - exit(EXIT_FAILURE); + quick_exit(EXIT_FAILURE); } } -FairMQPollerZMQ::FairMQPollerZMQ(FairMQSocket& dataSocket, FairMQSocket& cmdSocket) +FairMQPollerZMQ::FairMQPollerZMQ(FairMQSocket& cmdSocket, FairMQSocket& dataSocket) : items() , fNumItems(2) , fOffsetMap() @@ -111,7 +113,7 @@ FairMQPollerZMQ::FairMQPollerZMQ(FairMQSocket& dataSocket, FairMQSocket& cmdSock else { LOG(ERROR) << "invalid poller configuration, exiting."; - exit(EXIT_FAILURE); + quick_exit(EXIT_FAILURE); } } @@ -165,7 +167,7 @@ bool FairMQPollerZMQ::CheckInput(const string channelKey, const int index) { LOG(ERROR) << "Invalid channel key: \"" << channelKey << "\""; LOG(ERROR) << "Out of Range error: " << oor.what() << '\n'; - exit(EXIT_FAILURE); + quick_exit(EXIT_FAILURE); } } @@ -184,7 +186,7 @@ bool FairMQPollerZMQ::CheckOutput(const string channelKey, const int index) { LOG(ERROR) << "Invalid channel key: \"" << channelKey << "\""; LOG(ERROR) << "Out of Range error: " << oor.what() << '\n'; - exit(EXIT_FAILURE); + quick_exit(EXIT_FAILURE); } } diff --git a/fairmq/zeromq/FairMQPollerZMQ.h b/fairmq/zeromq/FairMQPollerZMQ.h index 585c9efe..58a68de2 100644 --- a/fairmq/zeromq/FairMQPollerZMQ.h +++ b/fairmq/zeromq/FairMQPollerZMQ.h @@ -43,7 +43,7 @@ class FairMQPollerZMQ : public FairMQPoller virtual ~FairMQPollerZMQ(); private: - FairMQPollerZMQ(FairMQSocket& dataSocket, FairMQSocket& cmdSocket); + FairMQPollerZMQ(FairMQSocket& cmdSocket, FairMQSocket& dataSocket); zmq_pollitem_t* items; int fNumItems; diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index 1ca8c4aa..e3566843 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -12,6 +12,7 @@ * @author D. Klein, A. Rybalchenko */ +#include // quick_exit() #include #include "FairMQSocketZMQ.h" @@ -43,7 +44,7 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, int num if (fSocket == NULL) { LOG(ERROR) << "failed creating socket " << fId << ", reason: " << zmq_strerror(errno); - exit(EXIT_FAILURE); + quick_exit(EXIT_FAILURE); } if (zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.length()) != 0) @@ -99,7 +100,7 @@ void FairMQSocketZMQ::Connect(const string& address) { LOG(ERROR) << "failed connecting socket " << fId << ", reason: " << zmq_strerror(errno); // error here means incorrect configuration. exit if it happens. - exit(EXIT_FAILURE); + quick_exit(EXIT_FAILURE); } }