From 81299ce005bb4fb7a9158ac1da2295adb4fa77ab Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Wed, 16 May 2018 18:06:57 +0200 Subject: [PATCH] Improve compilation speed --- fairmq/CMakeLists.txt | 3 + fairmq/FairMQStateMachine.cxx | 642 ++++++++++++++++++++++++-- fairmq/FairMQStateMachine.h | 589 ++--------------------- fairmq/options/FairMQParser.cxx | 1 + fairmq/options/FairMQParser.h | 2 +- fairmq/plugins/DDS/DDS.cxx | 1 + fairmq/tools/Network.cxx | 192 ++++++++ fairmq/tools/Network.h | 176 +------ fairmq/tools/Process.cxx | 72 +++ fairmq/tools/Process.h | 40 +- fairmq/tools/Unique.cxx | 44 ++ fairmq/tools/Unique.h | 20 +- fairmq/zeromq/FairMQMessageZMQ.cxx | 2 + fairmq/zeromq/FairMQSocketZMQ.cxx | 2 + test/device/TestReceiver.h | 1 + test/device/TestSender.h | 1 + test/helper/devices/TestPairLeft.cxx | 2 + test/helper/devices/TestPairRight.cxx | 1 + test/helper/devices/TestPollIn.cxx | 1 + test/helper/devices/TestPollOut.cxx | 1 + test/helper/devices/TestPull.cxx | 1 + test/helper/devices/TestPush.cxx | 1 + test/helper/devices/TestRep.cxx | 1 + test/helper/devices/TestReq.cxx | 1 + 24 files changed, 985 insertions(+), 812 deletions(-) create mode 100644 fairmq/tools/Network.cxx create mode 100644 fairmq/tools/Process.cxx create mode 100644 fairmq/tools/Unique.cxx diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 82f674f9..f45b919f 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -148,6 +148,9 @@ set(FAIRMQ_SOURCE_FILES shmem/Manager.cxx shmem/Monitor.cxx shmem/Region.cxx + tools/Network.cxx + tools/Process.cxx + tools/Unique.cxx zeromq/FairMQMessageZMQ.cxx zeromq/FairMQPollerZMQ.cxx zeromq/FairMQUnmanagedRegionZMQ.cxx diff --git a/fairmq/FairMQStateMachine.cxx b/fairmq/FairMQStateMachine.cxx index bd9199b7..d688656c 100644 --- a/fairmq/FairMQStateMachine.cxx +++ b/fairmq/FairMQStateMachine.cxx @@ -14,14 +14,531 @@ #include "FairMQStateMachine.h" -FairMQStateMachine::FairMQStateMachine() +// Increase maximum number of boost::msm states (default is 10) +// This #define has to be before any msm header includes +#define FUSION_MAX_VECTOR_SIZE 20 + +#include +#include +#include +#include +#include +#include + +#include // signal/slot for onStateChange callbacks + +#include +#include +#include +#include +#include + +using namespace std; + +namespace msmf = boost::msm::front; + +namespace fair { - start(); +namespace mq +{ +namespace fsm +{ + +// defining events for the boost MSM state machine +struct INIT_DEVICE_E { string name() const { return "INIT_DEVICE"; } }; +struct internal_DEVICE_READY_E { string name() const { return "internal_DEVICE_READY"; } }; +struct INIT_TASK_E { string name() const { return "INIT_TASK"; } }; +struct internal_READY_E { string name() const { return "internal_READY"; } }; +struct RUN_E { string name() const { return "RUN"; } }; +struct PAUSE_E { string name() const { return "PAUSE"; } }; +struct STOP_E { string name() const { return "STOP"; } }; +struct RESET_TASK_E { string name() const { return "RESET_TASK"; } }; +struct RESET_DEVICE_E { string name() const { return "RESET_DEVICE"; } }; +struct internal_IDLE_E { string name() const { return "internal_IDLE"; } }; +struct END_E { string name() const { return "END"; } }; +struct ERROR_FOUND_E { string name() const { return "ERROR_FOUND"; } }; + +// deactivate the warning for non-virtual destructor thrown in the boost library +#if defined(__clang__) +_Pragma("clang diagnostic push") +_Pragma("clang diagnostic ignored \"-Wnon-virtual-dtor\"") +#elif defined(__GNUC__) || defined(__GNUG__) +_Pragma("GCC diagnostic push") +_Pragma("GCC diagnostic ignored \"-Wnon-virtual-dtor\"") +#endif + +// defining the boost MSM state machine +struct Machine_ : public msmf::state_machine_def +{ + public: + Machine_() + : fState() + , fWork() + , fWorkAvailableCondition() + , fWorkDoneCondition() + , fWorkMutex() + , fWorkerTerminated(false) + , fWorkActive(false) + , fWorkAvailable(false) + , fStateChangeSignal() + , fStateChangeSignalsMap() + , fTerminationRequested(false) + , fWorkerThread() + {} + + virtual ~Machine_() + {} + + template + void on_entry(Event const&, FSM& fsm) + { + LOG(state) << "Starting FairMQ state machine"; + fState = FairMQStateMachine::IDLE; + fsm.CallStateChangeCallbacks(FairMQStateMachine::IDLE); + + // start a worker thread to execute user states in. + fsm.fWorkerThread = thread(&Machine_::Worker, &fsm); + } + + template + void on_exit(Event const&, FSM& /*fsm*/) + { + LOG(state) << "Exiting FairMQ state machine"; + } + + // list of FSM states + struct OK_FSM : public msmf::state<> {}; + struct ERROR_FSM : public msmf::terminate_state<> {}; + + struct IDLE_FSM : public msmf::state<> {}; + struct INITIALIZING_DEVICE_FSM : public msmf::state<> {}; + struct DEVICE_READY_FSM : public msmf::state<> {}; + struct INITIALIZING_TASK_FSM : public msmf::state<> {}; + struct READY_FSM : public msmf::state<> {}; + struct RUNNING_FSM : public msmf::state<> {}; + struct PAUSED_FSM : public msmf::state<> {}; + struct RESETTING_TASK_FSM : public msmf::state<> {}; + struct RESETTING_DEVICE_FSM : public msmf::state<> {}; + struct EXITING_FSM : public msmf::state<> {}; + + // initial states + using initial_state = boost::mpl::vector; + + // actions + struct IdleFct + { + template + void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + { + LOG(state) << "Entering IDLE state"; + fsm.fState = FairMQStateMachine::IDLE; + } + }; + + struct InitDeviceFct + { + template + void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + { + fsm.fState = FairMQStateMachine::INITIALIZING_DEVICE; + + unique_lock lock(fsm.fWorkMutex); + while (fsm.fWorkActive) + { + fsm.fWorkDoneCondition.wait(lock); + } + fsm.fWorkAvailable = true; + LOG(state) << "Entering INITIALIZING DEVICE state"; + fsm.fWork = fsm.fInitWrapperHandler; + fsm.fWorkAvailableCondition.notify_one(); + } + }; + + struct DeviceReadyFct + { + template + void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + { + LOG(state) << "Entering DEVICE READY state"; + fsm.fState = FairMQStateMachine::DEVICE_READY; + } + }; + + struct InitTaskFct + { + template + void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + { + fsm.fState = FairMQStateMachine::INITIALIZING_TASK; + + unique_lock lock(fsm.fWorkMutex); + while (fsm.fWorkActive) + { + fsm.fWorkDoneCondition.wait(lock); + } + fsm.fWorkAvailable = true; + LOG(state) << "Entering INITIALIZING TASK state"; + fsm.fWork = fsm.fInitTaskWrapperHandler; + fsm.fWorkAvailableCondition.notify_one(); + } + }; + + struct ReadyFct + { + template + void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + { + LOG(state) << "Entering READY state"; + fsm.fState = FairMQStateMachine::READY; + } + }; + + struct RunFct + { + template + void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + { + fsm.fState = FairMQStateMachine::RUNNING; + + unique_lock lock(fsm.fWorkMutex); + while (fsm.fWorkActive) + { + fsm.fWorkDoneCondition.wait(lock); + } + fsm.fWorkAvailable = true; + LOG(state) << "Entering RUNNING state"; + fsm.fWork = fsm.fRunWrapperHandler; + fsm.fWorkAvailableCondition.notify_one(); + } + }; + + struct PauseFct + { + template + void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + { + fsm.fState = FairMQStateMachine::PAUSED; + + fsm.fUnblockHandler(); + unique_lock lock(fsm.fWorkMutex); + while (fsm.fWorkActive) + { + fsm.fWorkDoneCondition.wait(lock); + } + fsm.fWorkAvailable = true; + LOG(state) << "Entering PAUSED state"; + fsm.fWork = fsm.fPauseWrapperHandler; + fsm.fWorkAvailableCondition.notify_one(); + } + }; + + struct ResumeFct + { + template + void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + { + fsm.fState = FairMQStateMachine::RUNNING; + + unique_lock lock(fsm.fWorkMutex); + while (fsm.fWorkActive) + { + fsm.fWorkDoneCondition.wait(lock); + } + fsm.fWorkAvailable = true; + LOG(state) << "Entering RUNNING state"; + fsm.fWork = fsm.fRunWrapperHandler; + fsm.fWorkAvailableCondition.notify_one(); + } + }; + + struct StopFct + { + template + void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + { + fsm.fState = FairMQStateMachine::READY; + + fsm.fUnblockHandler(); + unique_lock lock(fsm.fWorkMutex); + while (fsm.fWorkActive) + { + fsm.fWorkDoneCondition.wait(lock); + } + LOG(state) << "Entering READY state"; + } + }; + + struct InternalStopFct + { + template + void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + { + fsm.fState = FairMQStateMachine::READY; + fsm.fUnblockHandler(); + LOG(state) << "RUNNING state finished without an external event, entering READY state"; + } + }; + + struct ResetTaskFct + { + template + void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + { + fsm.fState = FairMQStateMachine::RESETTING_TASK; + + unique_lock lock(fsm.fWorkMutex); + while (fsm.fWorkActive) + { + fsm.fWorkDoneCondition.wait(lock); + } + fsm.fWorkAvailable = true; + LOG(state) << "Entering RESETTING TASK state"; + fsm.fWork = fsm.fResetTaskWrapperHandler; + fsm.fWorkAvailableCondition.notify_one(); + } + }; + + struct ResetDeviceFct + { + template + void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + { + fsm.fState = FairMQStateMachine::RESETTING_DEVICE; + + unique_lock lock(fsm.fWorkMutex); + while (fsm.fWorkActive) + { + fsm.fWorkDoneCondition.wait(lock); + } + fsm.fWorkAvailable = true; + LOG(state) << "Entering RESETTING DEVICE state"; + fsm.fWork = fsm.fResetWrapperHandler; + fsm.fWorkAvailableCondition.notify_one(); + } + }; + + struct ExitingFct + { + template + void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + { + LOG(state) << "Entering EXITING state"; + fsm.fState = FairMQStateMachine::EXITING; + fsm.fTerminationRequested = true; + fsm.CallStateChangeCallbacks(FairMQStateMachine::EXITING); + + // terminate worker thread + { + lock_guard lock(fsm.fWorkMutex); + fsm.fWorkerTerminated = true; + fsm.fWorkAvailableCondition.notify_one(); + } + + // join the worker thread (executing user states) + if (fsm.fWorkerThread.joinable()) + { + fsm.fWorkerThread.join(); + } + + fsm.fExitHandler(); + } + }; + + struct ErrorFoundFct + { + template + void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + { + LOG(state) << "Entering ERROR state"; + fsm.fState = FairMQStateMachine::Error; + fsm.CallStateChangeCallbacks(FairMQStateMachine::Error); + } + }; + + // Transition table for Machine_ + struct transition_table : boost::mpl::vector< + // Start Event Next Action Guard + msmf::Row, + msmf::Row, + msmf::Row, + msmf::Row, + msmf::Row, + msmf::Row, + msmf::Row, + msmf::Row, + msmf::Row, + msmf::Row, + msmf::Row, + msmf::Row, + msmf::Row, + msmf::Row, + msmf::Row> + {}; + + // replaces the default no-transition response. + template + void no_transition(Event const& e, FSM&, int state) + { + using recursive_stt = typename boost::msm::back::recursive_get_transition_table::type; + using all_states = typename boost::msm::back::generate_state_set::type; + + string stateName; + + boost::mpl::for_each>(boost::msm::back::get_state_name(stateName, state)); + + stateName = stateName.substr(24); + size_t pos = stateName.find("_FSME"); + stateName.erase(pos); + + if (stateName == "1RUNNING" || stateName == "6DEVICE_READY" || stateName == "0PAUSED" || stateName == "8RESETTING_TASK" || stateName == "0RESETTING_DEVICE") + { + stateName = stateName.substr(1); + } + + if (stateName != "OK") + { + LOG(state) << "No transition from state " << stateName << " on event " << e.name(); + } + + // LOG(state) << "no transition from state " << GetStateName(state) << " (" << state << ") on event " << e.name(); + } + + static string GetStateName(const int state) + { + switch(state) + { + case FairMQStateMachine::OK: + return "OK"; + case FairMQStateMachine::Error: + return "Error"; + case FairMQStateMachine::IDLE: + return "IDLE"; + case FairMQStateMachine::INITIALIZING_DEVICE: + return "INITIALIZING_DEVICE"; + case FairMQStateMachine::DEVICE_READY: + return "DEVICE_READY"; + case FairMQStateMachine::INITIALIZING_TASK: + return "INITIALIZING_TASK"; + case FairMQStateMachine::READY: + return "READY"; + case FairMQStateMachine::RUNNING: + return "RUNNING"; + case FairMQStateMachine::PAUSED: + return "PAUSED"; + case FairMQStateMachine::RESETTING_TASK: + return "RESETTING_TASK"; + case FairMQStateMachine::RESETTING_DEVICE: + return "RESETTING_DEVICE"; + case FairMQStateMachine::EXITING: + return "EXITING"; + default: + return "requested name for non-existent state..."; + } + } + + void CallStateChangeCallbacks(const FairMQStateMachine::State state) const + { + if (!fStateChangeSignal.empty()) + { + fStateChangeSignal(state); + } + } + + function fInitWrapperHandler; + function fInitTaskWrapperHandler; + function fRunWrapperHandler; + function fPauseWrapperHandler; + function fResetWrapperHandler; + function fResetTaskWrapperHandler; + function fExitHandler; + function fUnblockHandler; + + // function to execute user states in a worker thread + function fWork; + condition_variable fWorkAvailableCondition; + condition_variable fWorkDoneCondition; + mutex fWorkMutex; + bool fWorkerTerminated; + bool fWorkActive; + bool fWorkAvailable; + + boost::signals2::signal fStateChangeSignal; + unordered_map fStateChangeSignalsMap; + atomic fTerminationRequested; + + atomic fState; + + private: + void Worker() + { + while (true) + { + { + unique_lock lock(fWorkMutex); + // Wait for work to be done. + while (!fWorkAvailable && !fWorkerTerminated) + { + fWorkAvailableCondition.wait(lock); + } + + if (fWorkerTerminated) + { + break; + } + + fWorkActive = true; + } + + fWork(); + + { + lock_guard lock(fWorkMutex); + fWorkActive = false; + fWorkAvailable = false; + fWorkDoneCondition.notify_one(); + } + CallStateChangeCallbacks(fState); + } + } + + // run state handlers in a separate thread + thread fWorkerThread; +}; // Machine_ + +using FairMQFSM = boost::msm::back::state_machine; + +// reactivate the warning for non-virtual destructor +#if defined(__clang__) +_Pragma("clang diagnostic pop") +#elif defined(__GNUC__) || defined(__GNUG__) +_Pragma("GCC diagnostic pop") +#endif + +} // namespace fsm +} // namespace mq +} // namespace fair + +using namespace fair::mq::fsm; + +FairMQStateMachine::FairMQStateMachine() + : fFsm(new FairMQFSM) + , fChangeStateMutex() +{ + static_pointer_cast(fFsm)->fInitWrapperHandler = bind(&FairMQStateMachine::InitWrapper, this); + static_pointer_cast(fFsm)->fInitTaskWrapperHandler = bind(&FairMQStateMachine::InitTaskWrapper, this); + static_pointer_cast(fFsm)->fRunWrapperHandler = bind(&FairMQStateMachine::RunWrapper, this); + static_pointer_cast(fFsm)->fPauseWrapperHandler = bind(&FairMQStateMachine::PauseWrapper, this); + static_pointer_cast(fFsm)->fResetWrapperHandler = bind(&FairMQStateMachine::ResetWrapper, this); + static_pointer_cast(fFsm)->fResetTaskWrapperHandler = bind(&FairMQStateMachine::ResetTaskWrapper, this); + static_pointer_cast(fFsm)->fExitHandler = bind(&FairMQStateMachine::Exit, this); + static_pointer_cast(fFsm)->fUnblockHandler = bind(&FairMQStateMachine::Unblock, this); + + static_pointer_cast(fFsm)->start(); } FairMQStateMachine::~FairMQStateMachine() { - stop(); + static_pointer_cast(fFsm)->stop(); } int FairMQStateMachine::GetInterfaceVersion() const @@ -37,85 +554,85 @@ bool FairMQStateMachine::ChangeState(int event) { case INIT_DEVICE: { - std::lock_guard lock(fChangeStateMutex); - process_event(fair::mq::fsm::INIT_DEVICE()); + lock_guard lock(fChangeStateMutex); + static_pointer_cast(fFsm)->process_event(INIT_DEVICE_E()); return true; } case internal_DEVICE_READY: { - std::lock_guard lock(fChangeStateMutex); - process_event(fair::mq::fsm::internal_DEVICE_READY()); + lock_guard lock(fChangeStateMutex); + static_pointer_cast(fFsm)->process_event(internal_DEVICE_READY_E()); return true; } case INIT_TASK: { - std::lock_guard lock(fChangeStateMutex); - process_event(fair::mq::fsm::INIT_TASK()); + lock_guard lock(fChangeStateMutex); + static_pointer_cast(fFsm)->process_event(INIT_TASK_E()); return true; } case internal_READY: { - std::lock_guard lock(fChangeStateMutex); - process_event(fair::mq::fsm::internal_READY()); + lock_guard lock(fChangeStateMutex); + static_pointer_cast(fFsm)->process_event(internal_READY_E()); return true; } case RUN: { - std::lock_guard lock(fChangeStateMutex); - process_event(fair::mq::fsm::RUN()); + lock_guard lock(fChangeStateMutex); + static_pointer_cast(fFsm)->process_event(RUN_E()); return true; } case PAUSE: { - std::lock_guard lock(fChangeStateMutex); - process_event(fair::mq::fsm::PAUSE()); + lock_guard lock(fChangeStateMutex); + static_pointer_cast(fFsm)->process_event(PAUSE_E()); return true; } case STOP: { - std::lock_guard lock(fChangeStateMutex); - process_event(fair::mq::fsm::STOP()); + lock_guard lock(fChangeStateMutex); + static_pointer_cast(fFsm)->process_event(STOP_E()); return true; } case RESET_DEVICE: { - std::lock_guard lock(fChangeStateMutex); - process_event(fair::mq::fsm::RESET_DEVICE()); + lock_guard lock(fChangeStateMutex); + static_pointer_cast(fFsm)->process_event(RESET_DEVICE_E()); return true; } case RESET_TASK: { - std::lock_guard lock(fChangeStateMutex); - process_event(fair::mq::fsm::RESET_TASK()); + lock_guard lock(fChangeStateMutex); + static_pointer_cast(fFsm)->process_event(RESET_TASK_E()); return true; } case internal_IDLE: { - std::lock_guard lock(fChangeStateMutex); - process_event(fair::mq::fsm::internal_IDLE()); + lock_guard lock(fChangeStateMutex); + static_pointer_cast(fFsm)->process_event(internal_IDLE_E()); return true; } case END: { - std::lock_guard lock(fChangeStateMutex); - process_event(fair::mq::fsm::END()); + lock_guard lock(fChangeStateMutex); + static_pointer_cast(fFsm)->process_event(END_E()); return true; } case ERROR_FOUND: { - std::lock_guard lock(fChangeStateMutex); - process_event(fair::mq::fsm::ERROR_FOUND()); + lock_guard lock(fChangeStateMutex); + static_pointer_cast(fFsm)->process_event(ERROR_FOUND_E()); return true; } default: { - LOG(error) << "Requested state transition with an unsupported event: " << event << std::endl + LOG(error) << "Requested state transition with an unsupported event: " << event << endl << "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_TASK, RESET_DEVICE, END, ERROR_FOUND"; return false; } } } - catch (std::exception& e) + catch (exception& e) { LOG(error) << "Exception in FairMQStateMachine::ChangeState(): " << e.what(); exit(EXIT_FAILURE); @@ -123,7 +640,7 @@ bool FairMQStateMachine::ChangeState(int event) return false; } -bool FairMQStateMachine::ChangeState(const std::string& event) +bool FairMQStateMachine::ChangeState(const string& event) { return ChangeState(GetEventNumber(event)); } @@ -140,10 +657,10 @@ void FairMQStateMachine::WaitForEndOfState(int event) case RESET_TASK: case RESET_DEVICE: { - std::unique_lock lock(fWorkMutex); - while (fWorkActive || fWorkAvailable) + unique_lock lock(static_pointer_cast(fFsm)->fWorkMutex); + while (static_pointer_cast(fFsm)->fWorkActive || static_pointer_cast(fFsm)->fWorkAvailable) { - fWorkDoneCondition.wait_for(lock, std::chrono::seconds(1)); + static_pointer_cast(fFsm)->fWorkDoneCondition.wait_for(lock, chrono::seconds(1)); } break; @@ -153,13 +670,13 @@ void FairMQStateMachine::WaitForEndOfState(int event) break; } } - catch (std::exception& e) + catch (exception& e) { LOG(error) << "Exception in FairMQStateMachine::WaitForEndOfState(): " << e.what(); } } -void FairMQStateMachine::WaitForEndOfState(const std::string& event) +void FairMQStateMachine::WaitForEndOfState(const string& event) { return WaitForEndOfState(GetEventNumber(event)); } @@ -176,11 +693,11 @@ bool FairMQStateMachine::WaitForEndOfStateForMs(int event, int durationInMs) case RESET_TASK: case RESET_DEVICE: { - std::unique_lock lock(fWorkMutex); - while (fWorkActive || fWorkAvailable) + unique_lock lock(static_pointer_cast(fFsm)->fWorkMutex); + while (static_pointer_cast(fFsm)->fWorkActive || static_pointer_cast(fFsm)->fWorkAvailable) { - fWorkDoneCondition.wait_for(lock, std::chrono::milliseconds(durationInMs)); - if (fWorkActive) + static_pointer_cast(fFsm)->fWorkDoneCondition.wait_for(lock, chrono::milliseconds(durationInMs)); + if (static_pointer_cast(fFsm)->fWorkActive) { return false; } @@ -192,32 +709,59 @@ bool FairMQStateMachine::WaitForEndOfStateForMs(int event, int durationInMs) return false; } } - catch (std::exception& e) + catch (exception& e) { LOG(error) << "Exception in FairMQStateMachine::WaitForEndOfStateForMs(): " << e.what(); } return false; } -bool FairMQStateMachine::WaitForEndOfStateForMs(const std::string& event, int durationInMs) +bool FairMQStateMachine::WaitForEndOfStateForMs(const string& event, int durationInMs) { return WaitForEndOfStateForMs(GetEventNumber(event), durationInMs); } -void FairMQStateMachine::SubscribeToStateChange(const std::string& key, std::function callback) +void FairMQStateMachine::SubscribeToStateChange(const string& key, function callback) { - fStateChangeSignalsMap.insert({key, fStateChangeSignal.connect(callback)}); + static_pointer_cast(fFsm)->fStateChangeSignalsMap.insert({key, static_pointer_cast(fFsm)->fStateChangeSignal.connect(callback)}); } -void FairMQStateMachine::UnsubscribeFromStateChange(const std::string& key) +void FairMQStateMachine::UnsubscribeFromStateChange(const string& key) { - if (fStateChangeSignalsMap.count(key)) + if (static_pointer_cast(fFsm)->fStateChangeSignalsMap.count(key)) { - fStateChangeSignalsMap.at(key).disconnect(); - fStateChangeSignalsMap.erase(key); + static_pointer_cast(fFsm)->fStateChangeSignalsMap.at(key).disconnect(); + static_pointer_cast(fFsm)->fStateChangeSignalsMap.erase(key); } } -int FairMQStateMachine::GetEventNumber(const std::string& event) +void FairMQStateMachine::CallStateChangeCallbacks(const State state) const +{ + static_pointer_cast(fFsm)->CallStateChangeCallbacks(state); +} + +string FairMQStateMachine::GetCurrentStateName() const +{ + return static_pointer_cast(fFsm)->GetStateName(static_pointer_cast(fFsm)->fState); +} +int FairMQStateMachine::GetCurrentState() const +{ + return static_pointer_cast(fFsm)->fState; +} +bool FairMQStateMachine::CheckCurrentState(int state) const +{ + return state == static_pointer_cast(fFsm)->fState; +} +bool FairMQStateMachine::CheckCurrentState(string state) const +{ + return state == GetCurrentStateName(); +} + +bool FairMQStateMachine::Terminated() +{ + return static_pointer_cast(fFsm)->fTerminationRequested; +} + +int FairMQStateMachine::GetEventNumber(const string& event) { if (event == "INIT_DEVICE") return INIT_DEVICE; if (event == "INIT_TASK") return INIT_TASK; @@ -228,7 +772,7 @@ int FairMQStateMachine::GetEventNumber(const std::string& event) if (event == "RESET_TASK") return RESET_TASK; if (event == "END") return END; if (event == "ERROR_FOUND") return ERROR_FOUND; - LOG(error) << "Requested number for non-existent event... " << event << std::endl + LOG(error) << "Requested number for non-existent event... " << event << endl << "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_DEVICE, RESET_TASK, END, ERROR_FOUND"; return -1; } \ No newline at end of file diff --git a/fairmq/FairMQStateMachine.h b/fairmq/FairMQStateMachine.h index f85b393e..d7f90bfd 100644 --- a/fairmq/FairMQStateMachine.h +++ b/fairmq/FairMQStateMachine.h @@ -17,555 +17,14 @@ #define FAIRMQ_INTERFACE_VERSION 3 -#include -#include -#include -#include -#include -#include -#include -#include - -// Increase maximum number of boost::msm states (default is 10) -// This #define has to be before any msm header includes -#define FUSION_MAX_VECTOR_SIZE 20 - -#include -#include -#include -#include -#include -#include - -#include // signal/slot for onStateChange callbacks - #include "FairMQLogger.h" -namespace msmf = boost::msm::front; +#include +#include +#include +#include -namespace fair -{ -namespace mq -{ -namespace fsm -{ - -// defining events for the boost MSM state machine -struct INIT_DEVICE { std::string name() const { return "INIT_DEVICE"; } }; -struct internal_DEVICE_READY { std::string name() const { return "internal_DEVICE_READY"; } }; -struct INIT_TASK { std::string name() const { return "INIT_TASK"; } }; -struct internal_READY { std::string name() const { return "internal_READY"; } }; -struct RUN { std::string name() const { return "RUN"; } }; -struct PAUSE { std::string name() const { return "PAUSE"; } }; -struct STOP { std::string name() const { return "STOP"; } }; -struct RESET_TASK { std::string name() const { return "RESET_TASK"; } }; -struct RESET_DEVICE { std::string name() const { return "RESET_DEVICE"; } }; -struct internal_IDLE { std::string name() const { return "internal_IDLE"; } }; -struct END { std::string name() const { return "END"; } }; -struct ERROR_FOUND { std::string name() const { return "ERROR_FOUND"; } }; - -// deactivate the warning for non-virtual destructor thrown in the boost library -#if defined(__clang__) -_Pragma("clang diagnostic push") -_Pragma("clang diagnostic ignored \"-Wnon-virtual-dtor\"") -#elif defined(__GNUC__) || defined(__GNUG__) -_Pragma("GCC diagnostic push") -_Pragma("GCC diagnostic ignored \"-Wnon-virtual-dtor\"") -#endif - -// defining the boost MSM state machine -struct FairMQFSM : public msmf::state_machine_def -{ - public: - FairMQFSM() - : fState() - , fChangeStateMutex() - , fWork() - , fWorkAvailableCondition() - , fWorkDoneCondition() - , fWorkMutex() - , fWorkerTerminated(false) - , fWorkActive(false) - , fWorkAvailable(false) - , fStateChangeSignal() - , fStateChangeSignalsMap() - , fTerminationRequested(false) - , fWorkerThread() - {} - - virtual ~FairMQFSM() - {} - - template - void on_entry(Event const&, FSM& fsm) - { - LOG(state) << "Starting FairMQ state machine"; - fState = IDLE; - fsm.CallStateChangeCallbacks(IDLE); - - // start a worker thread to execute user states in. - fsm.fWorkerThread = std::thread(&FairMQFSM::Worker, &fsm); - } - - template - void on_exit(Event const&, FSM& /*fsm*/) - { - LOG(state) << "Exiting FairMQ state machine"; - } - - // list of FSM states - struct OK_FSM : public msmf::state<> {}; - struct ERROR_FSM : public msmf::terminate_state<> {}; - - struct IDLE_FSM : public msmf::state<> {}; - struct INITIALIZING_DEVICE_FSM : public msmf::state<> {}; - struct DEVICE_READY_FSM : public msmf::state<> {}; - struct INITIALIZING_TASK_FSM : public msmf::state<> {}; - struct READY_FSM : public msmf::state<> {}; - struct RUNNING_FSM : public msmf::state<> {}; - struct PAUSED_FSM : public msmf::state<> {}; - struct RESETTING_TASK_FSM : public msmf::state<> {}; - struct RESETTING_DEVICE_FSM : public msmf::state<> {}; - struct EXITING_FSM : public msmf::state<> {}; - - // initial states - using initial_state = boost::mpl::vector; - - // actions - struct IdleFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) - { - LOG(state) << "Entering IDLE state"; - fsm.fState = IDLE; - } - }; - - struct InitDeviceFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) - { - fsm.fState = INITIALIZING_DEVICE; - - std::unique_lock lock(fsm.fWorkMutex); - while (fsm.fWorkActive) - { - fsm.fWorkDoneCondition.wait(lock); - } - fsm.fWorkAvailable = true; - LOG(state) << "Entering INITIALIZING DEVICE state"; - fsm.fWork = std::bind(&FairMQFSM::InitWrapper, &fsm); - fsm.fWorkAvailableCondition.notify_one(); - } - }; - - struct DeviceReadyFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) - { - LOG(state) << "Entering DEVICE READY state"; - fsm.fState = DEVICE_READY; - } - }; - - struct InitTaskFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) - { - fsm.fState = INITIALIZING_TASK; - - std::unique_lock lock(fsm.fWorkMutex); - while (fsm.fWorkActive) - { - fsm.fWorkDoneCondition.wait(lock); - } - fsm.fWorkAvailable = true; - LOG(state) << "Entering INITIALIZING TASK state"; - fsm.fWork = std::bind(&FairMQFSM::InitTaskWrapper, &fsm); - fsm.fWorkAvailableCondition.notify_one(); - } - }; - - struct ReadyFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) - { - LOG(state) << "Entering READY state"; - fsm.fState = READY; - } - }; - - struct RunFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) - { - fsm.fState = RUNNING; - - std::unique_lock lock(fsm.fWorkMutex); - while (fsm.fWorkActive) - { - fsm.fWorkDoneCondition.wait(lock); - } - fsm.fWorkAvailable = true; - LOG(state) << "Entering RUNNING state"; - fsm.fWork = std::bind(&FairMQFSM::RunWrapper, &fsm); - fsm.fWorkAvailableCondition.notify_one(); - } - }; - - struct PauseFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) - { - fsm.fState = PAUSED; - - fsm.Unblock(); - std::unique_lock lock(fsm.fWorkMutex); - while (fsm.fWorkActive) - { - fsm.fWorkDoneCondition.wait(lock); - } - fsm.fWorkAvailable = true; - LOG(state) << "Entering PAUSED state"; - fsm.fWork = std::bind(&FairMQFSM::PauseWrapper, &fsm); - fsm.fWorkAvailableCondition.notify_one(); - } - }; - - struct ResumeFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) - { - fsm.fState = RUNNING; - - std::unique_lock lock(fsm.fWorkMutex); - while (fsm.fWorkActive) - { - fsm.fWorkDoneCondition.wait(lock); - } - fsm.fWorkAvailable = true; - LOG(state) << "Entering RUNNING state"; - fsm.fWork = std::bind(&FairMQFSM::RunWrapper, &fsm); - fsm.fWorkAvailableCondition.notify_one(); - } - }; - - struct StopFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) - { - fsm.fState = READY; - - fsm.Unblock(); - std::unique_lock lock(fsm.fWorkMutex); - while (fsm.fWorkActive) - { - fsm.fWorkDoneCondition.wait(lock); - } - LOG(state) << "Entering READY state"; - } - }; - - struct InternalStopFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) - { - fsm.fState = READY; - fsm.Unblock(); - LOG(state) << "RUNNING state finished without an external event, entering READY state"; - } - }; - - struct ResetTaskFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) - { - fsm.fState = RESETTING_TASK; - - std::unique_lock lock(fsm.fWorkMutex); - while (fsm.fWorkActive) - { - fsm.fWorkDoneCondition.wait(lock); - } - fsm.fWorkAvailable = true; - LOG(state) << "Entering RESETTING TASK state"; - fsm.fWork = std::bind(&FairMQFSM::ResetTaskWrapper, &fsm); - fsm.fWorkAvailableCondition.notify_one(); - } - }; - - struct ResetDeviceFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) - { - fsm.fState = RESETTING_DEVICE; - - std::unique_lock lock(fsm.fWorkMutex); - while (fsm.fWorkActive) - { - fsm.fWorkDoneCondition.wait(lock); - } - fsm.fWorkAvailable = true; - LOG(state) << "Entering RESETTING DEVICE state"; - fsm.fWork = std::bind(&FairMQFSM::ResetWrapper, &fsm); - fsm.fWorkAvailableCondition.notify_one(); - } - }; - - struct ExitingFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) - { - LOG(state) << "Entering EXITING state"; - fsm.fState = EXITING; - fsm.fTerminationRequested = true; - fsm.CallStateChangeCallbacks(EXITING); - - // terminate worker thread - { - std::lock_guard lock(fsm.fWorkMutex); - fsm.fWorkerTerminated = true; - fsm.fWorkAvailableCondition.notify_one(); - } - - // join the worker thread (executing user states) - if (fsm.fWorkerThread.joinable()) - { - fsm.fWorkerThread.join(); - } - - fsm.Exit(); - } - }; - - struct ErrorFoundFct - { - template - void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) - { - LOG(state) << "Entering ERROR state"; - fsm.fState = Error; - fsm.CallStateChangeCallbacks(Error); - } - }; - - // Transition table for FairMQFSM - struct transition_table : boost::mpl::vector< - // Start Event Next Action Guard - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row, - msmf::Row> - {}; - - // replaces the default no-transition response. - template - void no_transition(Event const& e, FSM&, int state) - { - using recursive_stt = typename boost::msm::back::recursive_get_transition_table::type; - using all_states = typename boost::msm::back::generate_state_set::type; - - std::string stateName; - - boost::mpl::for_each>(boost::msm::back::get_state_name(stateName, state)); - - stateName = stateName.substr(24); - std::size_t pos = stateName.find("_FSME"); - stateName.erase(pos); - - if (stateName == "1RUNNING" || stateName == "6DEVICE_READY" || stateName == "0PAUSED" || stateName == "8RESETTING_TASK" || stateName == "0RESETTING_DEVICE") - { - stateName = stateName.substr(1); - } - - if (stateName != "OK") - { - LOG(state) << "No transition from state " << stateName << " on event " << e.name(); - } - - // LOG(state) << "no transition from state " << GetStateName(state) << " (" << state << ") on event " << e.name(); - } - - // backward compatibility to FairMQStateMachine - enum State - { - OK, - Error, - IDLE, - INITIALIZING_DEVICE, - DEVICE_READY, - INITIALIZING_TASK, - READY, - RUNNING, - PAUSED, - RESETTING_TASK, - RESETTING_DEVICE, - EXITING - }; - - static std::string GetStateName(const int state) - { - switch(state) - { - case OK: - return "OK"; - case Error: - return "Error"; - case IDLE: - return "IDLE"; - case INITIALIZING_DEVICE: - return "INITIALIZING_DEVICE"; - case DEVICE_READY: - return "DEVICE_READY"; - case INITIALIZING_TASK: - return "INITIALIZING_TASK"; - case READY: - return "READY"; - case RUNNING: - return "RUNNING"; - case PAUSED: - return "PAUSED"; - case RESETTING_TASK: - return "RESETTING_TASK"; - case RESETTING_DEVICE: - return "RESETTING_DEVICE"; - case EXITING: - return "EXITING"; - default: - return "requested name for non-existent state..."; - } - } - - std::string GetCurrentStateName() const - { - return GetStateName(fState); - } - int GetCurrentState() const - { - return fState; - } - bool CheckCurrentState(int state) const - { - return state == fState; - } - bool CheckCurrentState(std::string state) const - { - return state == GetCurrentStateName(); - } - - // actions to be overwritten by derived classes - virtual void InitWrapper() {} - virtual void InitTaskWrapper() {} - virtual void RunWrapper() {} - virtual void PauseWrapper() {} - virtual void ResetWrapper() {} - virtual void ResetTaskWrapper() {} - virtual void Exit() {} - virtual void Unblock() {} - - bool Terminated() - { - return fTerminationRequested; - } - - protected: - std::atomic fState; - std::mutex fChangeStateMutex; - - // function to execute user states in a worker thread - std::function fWork; - std::condition_variable fWorkAvailableCondition; - std::condition_variable fWorkDoneCondition; - std::mutex fWorkMutex; - bool fWorkerTerminated; - bool fWorkActive; - bool fWorkAvailable; - - boost::signals2::signal fStateChangeSignal; - std::unordered_map fStateChangeSignalsMap; - std::atomic fTerminationRequested; - - void CallStateChangeCallbacks(const State state) const - { - if (!fStateChangeSignal.empty()) - { - fStateChangeSignal(state); - } - } - - private: - void Worker() - { - while (true) - { - { - std::unique_lock lock(fWorkMutex); - // Wait for work to be done. - while (!fWorkAvailable && !fWorkerTerminated) - { - fWorkAvailableCondition.wait(lock); - } - - if (fWorkerTerminated) - { - break; - } - - fWorkActive = true; - } - - fWork(); - - { - std::lock_guard lock(fWorkMutex); - fWorkActive = false; - fWorkAvailable = false; - fWorkDoneCondition.notify_one(); - } - CallStateChangeCallbacks(fState); - } - } - - // run state handlers in a separate thread - std::thread fWorkerThread; -}; - -// reactivate the warning for non-virtual destructor -#if defined(__clang__) -_Pragma("clang diagnostic pop") -#elif defined(__GNUC__) || defined(__GNUG__) -_Pragma("GCC diagnostic pop") -#endif - -} // namespace fsm -} // namespace mq -} // namespace fair - -class FairMQStateMachine : public boost::msm::back::state_machine +class FairMQStateMachine { public: enum Event @@ -584,6 +43,22 @@ class FairMQStateMachine : public boost::msm::back::state_machine callback); void UnsubscribeFromStateChange(const std::string& key); + void CallStateChangeCallbacks(const State state) const; + + std::string GetCurrentStateName() const; + int GetCurrentState() const; + bool CheckCurrentState(int state) const; + bool CheckCurrentState(std::string state) const; + bool Terminated(); + + // actions to be overwritten by derived classes + virtual void InitWrapper() {} + virtual void InitTaskWrapper() {} + virtual void RunWrapper() {} + virtual void PauseWrapper() {} + virtual void ResetWrapper() {} + virtual void ResetTaskWrapper() {} + virtual void Exit() {} + virtual void Unblock() {} + private: int GetEventNumber(const std::string& event); + + std::mutex fChangeStateMutex; + + std::shared_ptr fFsm; }; #endif /* FAIRMQSTATEMACHINE_H_ */ diff --git a/fairmq/options/FairMQParser.cxx b/fairmq/options/FairMQParser.cxx index cda5f329..44d8a867 100644 --- a/fairmq/options/FairMQParser.cxx +++ b/fairmq/options/FairMQParser.cxx @@ -15,6 +15,7 @@ #include "FairMQParser.h" #include "FairMQLogger.h" #include +#include using namespace std; diff --git a/fairmq/options/FairMQParser.h b/fairmq/options/FairMQParser.h index e910d5aa..d0eeb158 100644 --- a/fairmq/options/FairMQParser.h +++ b/fairmq/options/FairMQParser.h @@ -13,7 +13,7 @@ #include #include -#include +#include #include "FairMQChannel.h" diff --git a/fairmq/plugins/DDS/DDS.cxx b/fairmq/plugins/DDS/DDS.cxx index 29a5d32e..6f75621c 100644 --- a/fairmq/plugins/DDS/DDS.cxx +++ b/fairmq/plugins/DDS/DDS.cxx @@ -12,6 +12,7 @@ #include #include +#include #include // for the interactive mode #include // for the interactive mode diff --git a/fairmq/tools/Network.cxx b/fairmq/tools/Network.cxx new file mode 100644 index 00000000..e95680b9 --- /dev/null +++ b/fairmq/tools/Network.cxx @@ -0,0 +1,192 @@ +/******************************************************************************** + * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE // To get defns of NI_MAXSERV and NI_MAXHOST +#endif + +#include "FairMQLogger.h" + +#include +#include +#include +#include +#include + +#include // trim +#include + +#include +#include +#include +#include +#include +#include + +using namespace std; + +namespace fair +{ +namespace mq +{ +namespace tools +{ + +// returns a map with network interface names as keys and their IP addresses as values +int getHostIPs(map& addressMap) +{ + struct ifaddrs *ifaddr, *ifa; + int s; + char host[NI_MAXHOST]; + + if (getifaddrs(&ifaddr) == -1) + { + perror("getifaddrs"); + return -1; + } + + for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) + { + if (ifa->ifa_addr == NULL) + { + continue; + } + + if (ifa->ifa_addr->sa_family == AF_INET) + { + s = getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST); + if (s != 0) + { + cout << "getnameinfo() failed: " << gai_strerror(s) << endl; + return -1; + } + + addressMap.insert(pair(ifa->ifa_name, host)); + } + } + freeifaddrs(ifaddr); + + return 0; +} + +// get IP address of a given interface name +string getInterfaceIP(const string& interface) +{ + map IPs; + getHostIPs(IPs); + if (IPs.count(interface)) + { + return IPs[interface]; + } + else + { + LOG(error) << "Could not find provided network interface: \"" << interface << "\"!, exiting."; + return ""; + } +} + +// get name of the default route interface +string getDefaultRouteNetworkInterface() +{ + array buffer; + string interfaceName; + +#ifdef __APPLE__ // MacOS + unique_ptr file(popen("route -n get default | grep interface | cut -d \":\" -f 2", "r"), pclose); +#else // Linux + unique_ptr file(popen("ip route | grep default | cut -d \" \" -f 5 | head -n 1", "r"), pclose); +#endif + + if (!file) + { + LOG(error) << "Could not detect default route network interface name - popen() failed!"; + return ""; + } + + while (!feof(file.get())) + { + if (fgets(buffer.data(), 128, file.get()) != NULL) + { + interfaceName += buffer.data(); + } + } + + boost::algorithm::trim(interfaceName); + + if (interfaceName == "") + { + LOG(error) << "Could not detect default route network interface name"; + } + else + { + LOG(debug) << "Detected network interface name for the default route: " << interfaceName; + } + + return interfaceName; +} + +string getIpFromHostname(const string& hostname) +{ + try { + namespace bai = boost::asio::ip; + boost::asio::io_service ios; + bai::tcp::resolver resolver(ios); + bai::tcp::resolver::query query(hostname, ""); + bai::tcp::resolver::iterator end; + + auto it = find_if(static_cast>(resolver.resolve(query)), end, [](const bai::tcp::endpoint& ep) { + return ep.address().is_v4(); + }); + + if (it != end) { + stringstream ss; + ss << static_cast(*it).address(); + return ss.str(); + } + + LOG(warn) << "could not find ipv4 address for hostname '" << hostname << "'"; + + return ""; + } catch (exception& e) { + LOG(error) << "could not resolve hostname '" << hostname << "', reason: " << e.what(); + return ""; + } +} + +string getIpFromHostname(const string& hostname, boost::asio::io_service& ios) +{ + try { + namespace bai = boost::asio::ip; + bai::tcp::resolver resolver(ios); + bai::tcp::resolver::query query(hostname, ""); + bai::tcp::resolver::iterator end; + + auto it = find_if(static_cast>(resolver.resolve(query)), end, [](const bai::tcp::endpoint& ep) { + return ep.address().is_v4(); + }); + + if (it != end) { + stringstream ss; + ss << static_cast(*it).address(); + return ss.str(); + } + + LOG(warn) << "could not find ipv4 address for hostname '" << hostname << "'"; + + return ""; + } catch (exception& e) { + LOG(error) << "could not resolve hostname '" << hostname << "', reason: " << e.what(); + return ""; + } +} + +} /* namespace tools */ +} /* namespace mq */ +} /* namespace fair */ diff --git a/fairmq/tools/Network.h b/fairmq/tools/Network.h index 9757093d..d80680ac 100644 --- a/fairmq/tools/Network.h +++ b/fairmq/tools/Network.h @@ -9,27 +9,20 @@ #ifndef FAIR_MQ_TOOLS_NETWORK_H #define FAIR_MQ_TOOLS_NETWORK_H -#ifndef _GNU_SOURCE -#define _GNU_SOURCE // To get defns of NI_MAXSERV and NI_MAXHOST -#endif - -#include "FairMQLogger.h" - -#include -#include -#include -#include -#include - -#include // trim -#include - #include #include -#include -#include -#include -#include + +// forward declarations +namespace boost +{ +namespace asio +{ + +class io_context; +typedef class io_context io_service; + +} // namespace asio +} // namespace boost namespace fair { @@ -39,152 +32,17 @@ namespace tools { // returns a map with network interface names as keys and their IP addresses as values -inline int getHostIPs(std::map& addressMap) -{ - struct ifaddrs *ifaddr, *ifa; - int s; - char host[NI_MAXHOST]; - - if (getifaddrs(&ifaddr) == -1) - { - perror("getifaddrs"); - return -1; - } - - for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) - { - if (ifa->ifa_addr == NULL) - { - continue; - } - - if (ifa->ifa_addr->sa_family == AF_INET) - { - s = getnameinfo(ifa->ifa_addr, sizeof(struct sockaddr_in), host, NI_MAXHOST, NULL, 0, NI_NUMERICHOST); - if (s != 0) - { - std::cout << "getnameinfo() failed: " << gai_strerror(s) << std::endl; - return -1; - } - - addressMap.insert(std::pair(ifa->ifa_name, host)); - } - } - freeifaddrs(ifaddr); - - return 0; -} +int getHostIPs(std::map& addressMap); // get IP address of a given interface name -inline std::string getInterfaceIP(std::string interface) -{ - std::map IPs; - getHostIPs(IPs); - if (IPs.count(interface)) - { - return IPs[interface]; - } - else - { - LOG(error) << "Could not find provided network interface: \"" << interface << "\"!, exiting."; - return ""; - } -} +std::string getInterfaceIP(const std::string& interface); // get name of the default route interface -inline std::string getDefaultRouteNetworkInterface() -{ - std::array buffer; - std::string interfaceName; +std::string getDefaultRouteNetworkInterface(); -#ifdef __APPLE__ // MacOS - std::unique_ptr file(popen("route -n get default | grep interface | cut -d \":\" -f 2", "r"), pclose); -#else // Linux - std::unique_ptr file(popen("ip route | grep default | cut -d \" \" -f 5 | head -n 1", "r"), pclose); -#endif +std::string getIpFromHostname(const std::string& hostname); - if (!file) - { - LOG(error) << "Could not detect default route network interface name - popen() failed!"; - return ""; - } - - while (!feof(file.get())) - { - if (fgets(buffer.data(), 128, file.get()) != NULL) - { - interfaceName += buffer.data(); - } - } - - boost::algorithm::trim(interfaceName); - - if (interfaceName == "") - { - LOG(error) << "Could not detect default route network interface name"; - } - else - { - LOG(debug) << "Detected network interface name for the default route: " << interfaceName; - } - - return interfaceName; -} - -inline std::string getIpFromHostname(const std::string& hostname) -{ - try { - namespace bai = boost::asio::ip; - boost::asio::io_service ios; - bai::tcp::resolver resolver(ios); - bai::tcp::resolver::query query(hostname, ""); - bai::tcp::resolver::iterator end; - - auto it = std::find_if(static_cast>(resolver.resolve(query)), end, [](const bai::tcp::endpoint& ep) { - return ep.address().is_v4(); - }); - - if (it != end) { - std::stringstream ss; - ss << static_cast(*it).address(); - return ss.str(); - } - - LOG(warn) << "could not find ipv4 address for hostname '" << hostname << "'"; - - return ""; - } catch (std::exception& e) { - LOG(error) << "could not resolve hostname '" << hostname << "', reason: " << e.what(); - return ""; - } -} - -inline std::string getIpFromHostname(const std::string& hostname, boost::asio::io_service& ios) -{ - try { - namespace bai = boost::asio::ip; - bai::tcp::resolver resolver(ios); - bai::tcp::resolver::query query(hostname, ""); - bai::tcp::resolver::iterator end; - - auto it = std::find_if(static_cast>(resolver.resolve(query)), end, [](const bai::tcp::endpoint& ep) { - return ep.address().is_v4(); - }); - - if (it != end) { - std::stringstream ss; - ss << static_cast(*it).address(); - return ss.str(); - } - - LOG(warn) << "could not find ipv4 address for hostname '" << hostname << "'"; - - return ""; - } catch (std::exception& e) { - LOG(error) << "could not resolve hostname '" << hostname << "', reason: " << e.what(); - return ""; - } -} +std::string getIpFromHostname(const std::string& hostname, boost::asio::io_service& ios); } /* namespace tools */ } /* namespace mq */ diff --git a/fairmq/tools/Process.cxx b/fairmq/tools/Process.cxx new file mode 100644 index 00000000..4ae4a118 --- /dev/null +++ b/fairmq/tools/Process.cxx @@ -0,0 +1,72 @@ +/******************************************************************************** + * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include + +#include + +#include + +using namespace std; + +namespace fair +{ +namespace mq +{ +namespace tools +{ + +/** + * Execute given command in forked process and capture stdout output + * and exit code. + * + * @param[in] cmd Command to execute + * @param[in] log_prefix How to prefix each captured output line with + * @return Captured stdout output and exit code + */ +execute_result execute(string cmd, string prefix) +{ + execute_result result; + stringstream out; + + // print full line thread-safe + stringstream printCmd; + printCmd << prefix << cmd << "\n"; + cout << printCmd.str() << flush; + + out << prefix << cmd << endl; + + // Execute command and capture stdout, add prefix line by line + boost::process::ipstream stdout; + boost::process::child c(cmd, boost::process::std_out > stdout); + string line; + while (getline(stdout, line)) + { + // print full line thread-safe + stringstream printLine; + printLine << prefix << line << "\n"; + cout << printLine.str() << flush; + + out << prefix << line << "\n"; + } + + c.wait(); + + // Capture exit code + result.exit_code = c.exit_code(); + out << prefix << " Exit code: " << result.exit_code << endl; + + result.console_out = out.str(); + + // Return result + return result; +} + +} /* namespace tools */ +} /* namespace mq */ +} /* namespace fair */ diff --git a/fairmq/tools/Process.h b/fairmq/tools/Process.h index 10ba9a94..ddd867f4 100644 --- a/fairmq/tools/Process.h +++ b/fairmq/tools/Process.h @@ -9,8 +9,6 @@ #ifndef FAIR_MQ_TOOLS_PROCESS_H #define FAIR_MQ_TOOLS_PROCESS_H -#include - #include namespace fair @@ -37,43 +35,7 @@ struct execute_result * @param[in] log_prefix How to prefix each captured output line with * @return Captured stdout output and exit code */ -inline execute_result execute(std::string cmd, std::string prefix = "") -{ - execute_result result; - std::stringstream out; - - // print full line thread-safe - std::stringstream printCmd; - printCmd << prefix << cmd << "\n"; - std::cout << printCmd.str() << std::flush; - - out << prefix << cmd << std::endl; - - // Execute command and capture stdout, add prefix line by line - boost::process::ipstream stdout; - boost::process::child c(cmd, boost::process::std_out > stdout); - std::string line; - while (getline(stdout, line)) - { - // print full line thread-safe - std::stringstream printLine; - printLine << prefix << line << "\n"; - std::cout << printLine.str() << std::flush; - - out << prefix << line << "\n"; - } - - c.wait(); - - // Capture exit code - result.exit_code = c.exit_code(); - out << prefix << " Exit code: " << result.exit_code << std::endl; - - result.console_out = out.str(); - - // Return result - return result; -} +execute_result execute(std::string cmd, std::string prefix = ""); } /* namespace tools */ } /* namespace mq */ diff --git a/fairmq/tools/Unique.cxx b/fairmq/tools/Unique.cxx new file mode 100644 index 00000000..fb0e03ab --- /dev/null +++ b/fairmq/tools/Unique.cxx @@ -0,0 +1,44 @@ +/******************************************************************************** + * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include + +#include +#include +#include +#include + +using namespace std; + +namespace fair +{ +namespace mq +{ +namespace tools +{ + +// generates UUID string +string Uuid() +{ + boost::uuids::random_generator gen; + boost::uuids::uuid u = gen(); + return boost::uuids::to_string(u); +} + +// generates UUID and returns its hash +size_t UuidHash() +{ + boost::uuids::random_generator gen; + boost::hash uuid_hasher; + boost::uuids::uuid u = gen(); + return uuid_hasher(u); +} + +} /* namespace tools */ +} /* namespace mq */ +} /* namespace fair */ diff --git a/fairmq/tools/Unique.h b/fairmq/tools/Unique.h index 0462ca96..42e2ed21 100644 --- a/fairmq/tools/Unique.h +++ b/fairmq/tools/Unique.h @@ -9,11 +9,6 @@ #ifndef FAIR_MQ_TOOLS_UNIQUE_H #define FAIR_MQ_TOOLS_UNIQUE_H -#include -#include -#include -#include - #include namespace fair @@ -24,21 +19,10 @@ namespace tools { // generates UUID string -inline std::string Uuid() -{ - boost::uuids::random_generator gen; - boost::uuids::uuid u = gen(); - return boost::uuids::to_string(u); -} +std::string Uuid(); // generates UUID and returns its hash -inline std::size_t UuidHash() -{ - boost::uuids::random_generator gen; - boost::hash uuid_hasher; - boost::uuids::uuid u = gen(); - return uuid_hasher(u); -} +std::size_t UuidHash(); } /* namespace tools */ } /* namespace mq */ diff --git a/fairmq/zeromq/FairMQMessageZMQ.cxx b/fairmq/zeromq/FairMQMessageZMQ.cxx index f782a147..60de00aa 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.cxx +++ b/fairmq/zeromq/FairMQMessageZMQ.cxx @@ -18,6 +18,8 @@ #include #include "FairMQUnmanagedRegionZMQ.h" +#include + using namespace std; fair::mq::Transport FairMQMessageZMQ::fTransportType = fair::mq::Transport::ZMQ; diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index 5724ba07..1684d6f7 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -12,6 +12,8 @@ #include +#include + using namespace std; atomic FairMQSocketZMQ::fInterrupted(false); diff --git a/test/device/TestReceiver.h b/test/device/TestReceiver.h index 41b15de1..6db57692 100644 --- a/test/device/TestReceiver.h +++ b/test/device/TestReceiver.h @@ -13,6 +13,7 @@ #include #include +#include namespace fair { diff --git a/test/device/TestSender.h b/test/device/TestSender.h index cea43c23..4e4903e6 100644 --- a/test/device/TestSender.h +++ b/test/device/TestSender.h @@ -13,6 +13,7 @@ #include #include +#include namespace fair { diff --git a/test/helper/devices/TestPairLeft.cxx b/test/helper/devices/TestPairLeft.cxx index 132200ae..fb36f0e9 100644 --- a/test/helper/devices/TestPairLeft.cxx +++ b/test/helper/devices/TestPairLeft.cxx @@ -8,6 +8,8 @@ #include +#include + namespace fair { namespace mq diff --git a/test/helper/devices/TestPairRight.cxx b/test/helper/devices/TestPairRight.cxx index 43068cb9..a4779bd0 100644 --- a/test/helper/devices/TestPairRight.cxx +++ b/test/helper/devices/TestPairRight.cxx @@ -8,6 +8,7 @@ #include #include +#include namespace fair { diff --git a/test/helper/devices/TestPollIn.cxx b/test/helper/devices/TestPollIn.cxx index 9aa70caa..a3f6f9ea 100644 --- a/test/helper/devices/TestPollIn.cxx +++ b/test/helper/devices/TestPollIn.cxx @@ -9,6 +9,7 @@ #include #include #include +#include namespace fair { diff --git a/test/helper/devices/TestPollOut.cxx b/test/helper/devices/TestPollOut.cxx index 7a79a218..4c92cc69 100644 --- a/test/helper/devices/TestPollOut.cxx +++ b/test/helper/devices/TestPollOut.cxx @@ -7,6 +7,7 @@ ********************************************************************************/ #include +#include namespace fair { diff --git a/test/helper/devices/TestPull.cxx b/test/helper/devices/TestPull.cxx index 007f7de4..503b8c13 100644 --- a/test/helper/devices/TestPull.cxx +++ b/test/helper/devices/TestPull.cxx @@ -8,6 +8,7 @@ #include #include +#include namespace fair { diff --git a/test/helper/devices/TestPush.cxx b/test/helper/devices/TestPush.cxx index ef856432..e6e240ad 100644 --- a/test/helper/devices/TestPush.cxx +++ b/test/helper/devices/TestPush.cxx @@ -7,6 +7,7 @@ ********************************************************************************/ #include +#include namespace fair { diff --git a/test/helper/devices/TestRep.cxx b/test/helper/devices/TestRep.cxx index 5bb2879a..f162d9db 100644 --- a/test/helper/devices/TestRep.cxx +++ b/test/helper/devices/TestRep.cxx @@ -8,6 +8,7 @@ #include #include +#include namespace fair { diff --git a/test/helper/devices/TestReq.cxx b/test/helper/devices/TestReq.cxx index 396597c5..44149a44 100644 --- a/test/helper/devices/TestReq.cxx +++ b/test/helper/devices/TestReq.cxx @@ -8,6 +8,7 @@ #include #include +#include namespace fair {