Refactor state machine.

This commit is contained in:
Alexey Rybalchenko 2017-09-07 11:42:43 +02:00 committed by Mohammad Al-Turany
parent 70e46a0b86
commit f6365d013e
9 changed files with 395 additions and 430 deletions

View File

@ -12,225 +12,9 @@
* @author D. Klein, A. Rybalchenko * @author D. Klein, A. Rybalchenko
*/ */
#include <chrono> // WaitForEndOfStateForMs()
#include "FairMQStateMachine.h" #include "FairMQStateMachine.h"
#include "FairMQLogger.h"
FairMQStateMachine::FairMQStateMachine() // void FairMQStateMachine::SubscribeToStateChange(const std::string& key, std::function<void(const State)> callback)
{ // {
start(); // fStateChangeSignalsMap.insert({key, fStateChangeSignal.connect(callback)});
} // }
FairMQStateMachine::~FairMQStateMachine()
{
stop();
}
int FairMQStateMachine::GetInterfaceVersion()
{
return FAIRMQ_INTERFACE_VERSION;
}
int FairMQStateMachine::GetEventNumber(const std::string& event)
{
if (event == "INIT_DEVICE") return INIT_DEVICE;
if (event == "INIT_TASK") return INIT_TASK;
if (event == "RUN") return RUN;
if (event == "PAUSE") return PAUSE;
if (event == "STOP") return STOP;
if (event == "RESET_DEVICE") return RESET_DEVICE;
if (event == "RESET_TASK") return RESET_TASK;
if (event == "END") return END;
if (event == "ERROR_FOUND") return ERROR_FOUND;
LOG(ERROR) << "Requested number for non-existent event... " << event << std::endl
<< "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_DEVICE, RESET_TASK, END, ERROR_FOUND";
return -1;
}
bool FairMQStateMachine::ChangeState(int event)
{
try
{
switch (event)
{
case INIT_DEVICE:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(FairMQFSM::INIT_DEVICE());
return true;
}
case internal_DEVICE_READY:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(FairMQFSM::internal_DEVICE_READY());
return true;
}
case INIT_TASK:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(FairMQFSM::INIT_TASK());
return true;
}
case internal_READY:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(FairMQFSM::internal_READY());
return true;
}
case RUN:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(FairMQFSM::RUN());
return true;
}
case PAUSE:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(FairMQFSM::PAUSE());
return true;
}
case STOP:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(FairMQFSM::STOP());
return true;
}
case RESET_DEVICE:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(FairMQFSM::RESET_DEVICE());
return true;
}
case RESET_TASK:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(FairMQFSM::RESET_TASK());
return true;
}
case internal_IDLE:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(FairMQFSM::internal_IDLE());
return true;
}
case END:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(FairMQFSM::END());
return true;
}
case ERROR_FOUND:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(FairMQFSM::ERROR_FOUND());
return true;
}
default:
{
LOG(ERROR) << "Requested state transition with an unsupported event: " << event << std::endl
<< "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_TASK, RESET_DEVICE, END, ERROR_FOUND";
return false;
}
}
}
catch (std::exception& e)
{
LOG(ERROR) << "Exception in FairMQStateMachine::ChangeState(): " << e.what();
exit(EXIT_FAILURE);
}
return false;
}
bool FairMQStateMachine::ChangeState(const std::string& event)
{
return ChangeState(GetEventNumber(event));
}
void FairMQStateMachine::WaitForEndOfState(int event)
{
try
{
switch (event)
{
case INIT_DEVICE:
case INIT_TASK:
case RUN:
case RESET_TASK:
case RESET_DEVICE:
{
std::unique_lock<std::mutex> lock(fWorkMutex);
while (fWorkActive || fWorkAvailable)
{
fWorkDoneCondition.wait_for(lock, std::chrono::seconds(1));
}
break;
}
default:
LOG(ERROR) << "Requested state is either synchronous or does not exist.";
break;
}
}
catch (std::exception& e)
{
LOG(ERROR) << "Exception in FairMQStateMachine::WaitForEndOfState(): " << e.what();
}
}
void FairMQStateMachine::WaitForEndOfState(const std::string& event)
{
return WaitForEndOfState(GetEventNumber(event));
}
bool FairMQStateMachine::WaitForEndOfStateForMs(int event, int durationInMs)
{
try
{
switch (event)
{
case INIT_DEVICE:
case INIT_TASK:
case RUN:
case RESET_TASK:
case RESET_DEVICE:
{
std::unique_lock<std::mutex> lock(fWorkMutex);
while (fWorkActive || fWorkAvailable)
{
fWorkDoneCondition.wait_for(lock, std::chrono::milliseconds(durationInMs));
if (fWorkActive)
{
return false;
}
}
return true;
break;
}
default:
LOG(ERROR) << "Requested state is either synchronous or does not exist.";
return false;
}
}
catch (std::exception& e)
{
LOG(ERROR) << "Exception in FairMQStateMachine::WaitForEndOfStateForMs(): " << e.what();
}
return false;
}
bool FairMQStateMachine::WaitForEndOfStateForMs(const std::string& event, int durationInMs)
{
return WaitForEndOfStateForMs(GetEventNumber(event), durationInMs);
}
void FairMQStateMachine::SubscribeToStateChange(const std::string& key, std::function<void(const State)> callback)
{
fStateChangeSignalsMap.insert({key, fStateChangeSignal.connect(callback)});
}
void FairMQStateMachine::UnsubscribeFromStateChange(const std::string& key)
{
fStateChangeSignalsMap.at(key).disconnect();
fStateChangeSignalsMap.erase(key);
}

View File

@ -22,6 +22,7 @@
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include <thread> #include <thread>
#include <chrono>
#include <functional> #include <functional>
#include <unordered_map> #include <unordered_map>
@ -40,12 +41,15 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
namespace msm = boost::msm;
namespace mpl = boost::mpl;
namespace msmf = boost::msm::front; namespace msmf = boost::msm::front;
namespace FairMQFSM namespace fair
{ {
namespace mq
{
namespace fsm
{
// defining events for the boost MSM state machine // defining events for the boost MSM state machine
struct INIT_DEVICE { std::string name() const { return "INIT_DEVICE"; } }; struct INIT_DEVICE { std::string name() const { return "INIT_DEVICE"; } };
struct internal_DEVICE_READY { std::string name() const { return "internal_DEVICE_READY"; } }; struct internal_DEVICE_READY { std::string name() const { return "internal_DEVICE_READY"; } };
@ -71,10 +75,10 @@ _Pragma("GCC diagnostic ignored \"-Weffc++\"")
#endif #endif
// defining the boost MSM state machine // defining the boost MSM state machine
struct FairMQFSM_ : public msmf::state_machine_def<FairMQFSM_> struct FairMQFSM : public msmf::state_machine_def<FairMQFSM>
{ {
public: public:
FairMQFSM_() FairMQFSM()
: fWorkerThread() : fWorkerThread()
, fWork() , fWork()
, fWorkAvailableCondition() , fWorkAvailableCondition()
@ -87,247 +91,210 @@ struct FairMQFSM_ : public msmf::state_machine_def<FairMQFSM_>
, fChangeStateMutex() , fChangeStateMutex()
, fStateChangeSignal() , fStateChangeSignal()
, fStateChangeSignalsMap() , fStateChangeSignalsMap()
{} {}
// Destructor virtual ~FairMQFSM()
virtual ~FairMQFSM_() {}; {}
template <class Event, class FSM> template<typename Event, typename FSM>
void on_entry(Event const&, FSM& fsm) void on_entry(Event const&, FSM& fsm)
{ {
LOG(STATE) << "Starting FairMQ state machine"; LOG(STATE) << "Starting FairMQ state machine";
fState = IDLE; fState = IDLE;
fsm.CallStateChangeCallbacks(IDLE);
// start a worker thread to execute user states in. // start a worker thread to execute user states in.
fsm.fWorkerThread = std::thread(&FairMQFSM_::Worker, &fsm); fsm.fWorkerThread = std::thread(&FairMQFSM::Worker, &fsm);
} }
template <class Event, class FSM> template<typename Event, typename FSM>
void on_exit(Event const&, FSM& /*fsm*/) void on_exit(Event const&, FSM& /*fsm*/)
{ {
LOG(STATE) << "Exiting FairMQ state machine"; LOG(STATE) << "Exiting FairMQ state machine";
} }
// The list of FSM states // list of FSM states
struct OK_FSM : public msmf::state<> {}; struct OK_FSM : public msmf::state<> {};
struct ERROR_FSM : public msmf::terminate_state<> {}; struct ERROR_FSM : public msmf::terminate_state<> {};
struct IDLE_FSM : public msmf::state<> {}; struct IDLE_FSM : public msmf::state<> {};
struct INITIALIZING_DEVICE_FSM : public msmf::state<> {}; struct INITIALIZING_DEVICE_FSM : public msmf::state<> {};
struct DEVICE_READY_FSM : public msmf::state<> {}; struct DEVICE_READY_FSM : public msmf::state<> {};
struct INITIALIZING_TASK_FSM : public msmf::state<> {}; struct INITIALIZING_TASK_FSM : public msmf::state<> {};
struct READY_FSM : public msmf::state<> {}; struct READY_FSM : public msmf::state<> {};
struct RUNNING_FSM : public msmf::state<> {}; struct RUNNING_FSM : public msmf::state<> {};
struct PAUSED_FSM : public msmf::state<> {}; struct PAUSED_FSM : public msmf::state<> {};
struct RESETTING_TASK_FSM : public msmf::state<> {}; struct RESETTING_TASK_FSM : public msmf::state<> {};
struct RESETTING_DEVICE_FSM : public msmf::state<> {}; struct RESETTING_DEVICE_FSM : public msmf::state<> {};
struct EXITING_FSM : public msmf::state<> {}; struct EXITING_FSM : public msmf::state<> {};
// Define initial states // initial states
typedef mpl::vector<IDLE_FSM, OK_FSM> initial_state; using initial_state = boost::mpl::vector<IDLE_FSM, OK_FSM>;
// Actions // actions
struct IdleFct struct IdleFct
{ {
template <class EVT, class FSM, class SourceState, class TargetState> template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{ {
LOG(STATE) << "Entering IDLE state"; LOG(STATE) << "Entering IDLE state";
fsm.fState = IDLE; fsm.fState = IDLE;
fsm.CallStateChangeCallbacks(IDLE);
} }
}; };
struct InitDeviceFct struct InitDeviceFct
{ {
template <class EVT, class FSM, class SourceState, class TargetState> template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{ {
LOG(STATE) << "Entering INITIALIZING DEVICE state"; LOG(STATE) << "Entering INITIALIZING DEVICE state";
fsm.fState = INITIALIZING_DEVICE; fsm.fState = INITIALIZING_DEVICE;
std::unique_lock<std::mutex> lock(fsm.fWorkMutex); fsm.WaitForWorkCompletion();
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true; fsm.fWorkAvailable = true;
fsm.fWork = std::bind(&FairMQFSM_::InitWrapper, &fsm); fsm.fWork = std::bind(&FairMQFSM::InitWrapper, &fsm);
fsm.fWorkAvailableCondition.notify_one(); fsm.fWorkAvailableCondition.notify_one();
} }
}; };
struct DeviceReadyFct struct DeviceReadyFct
{ {
template <class EVT, class FSM, class SourceState, class TargetState> template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{ {
LOG(STATE) << "Entering DEVICE READY state"; LOG(STATE) << "Entering DEVICE READY state";
fsm.fState = DEVICE_READY; fsm.fState = DEVICE_READY;
fsm.CallStateChangeCallbacks(DEVICE_READY);
} }
}; };
struct InitTaskFct struct InitTaskFct
{ {
template <class EVT, class FSM, class SourceState, class TargetState> template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{ {
LOG(STATE) << "Entering INITIALIZING TASK state"; LOG(STATE) << "Entering INITIALIZING TASK state";
fsm.fState = INITIALIZING_TASK; fsm.fState = INITIALIZING_TASK;
std::unique_lock<std::mutex> lock(fsm.fWorkMutex); fsm.WaitForWorkCompletion();
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true; fsm.fWorkAvailable = true;
fsm.fWork = std::bind(&FairMQFSM_::InitTaskWrapper, &fsm); fsm.fWork = std::bind(&FairMQFSM::InitTaskWrapper, &fsm);
fsm.fWorkAvailableCondition.notify_one(); fsm.fWorkAvailableCondition.notify_one();
} }
}; };
struct ReadyFct struct ReadyFct
{ {
template <class EVT, class FSM, class SourceState, class TargetState> template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{ {
LOG(STATE) << "Entering READY state"; LOG(STATE) << "Entering READY state";
fsm.fState = READY; fsm.fState = READY;
fsm.CallStateChangeCallbacks(READY);
} }
}; };
struct RunFct struct RunFct
{ {
template <class EVT, class FSM, class SourceState, class TargetState> template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{ {
LOG(STATE) << "Entering RUNNING state"; LOG(STATE) << "Entering RUNNING state";
fsm.fState = RUNNING; fsm.fState = RUNNING;
std::unique_lock<std::mutex> lock(fsm.fWorkMutex); fsm.WaitForWorkCompletion();
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true; fsm.fWorkAvailable = true;
fsm.fWork = std::bind(&FairMQFSM_::RunWrapper, &fsm); fsm.fWork = std::bind(&FairMQFSM::RunWrapper, &fsm);
fsm.fWorkAvailableCondition.notify_one(); fsm.fWorkAvailableCondition.notify_one();
} }
}; };
struct PauseFct struct PauseFct
{ {
template <class EVT, class FSM, class SourceState, class TargetState> template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{ {
LOG(STATE) << "Entering PAUSED state"; LOG(STATE) << "Entering PAUSED state";
fsm.fState = PAUSED; fsm.fState = PAUSED;
fsm.Unblock(); fsm.Unblock();
std::unique_lock<std::mutex> lock(fsm.fWorkMutex); fsm.WaitForWorkCompletion();
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true; fsm.fWorkAvailable = true;
fsm.fWork = std::bind(&FairMQFSM_::PauseWrapper, &fsm); fsm.fWork = std::bind(&FairMQFSM::PauseWrapper, &fsm);
fsm.fWorkAvailableCondition.notify_one(); fsm.fWorkAvailableCondition.notify_one();
} }
}; };
struct ResumeFct struct ResumeFct
{ {
template <class EVT, class FSM, class SourceState, class TargetState> template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{ {
LOG(STATE) << "Entering RUNNING state"; LOG(STATE) << "Entering RUNNING state";
fsm.fState = RUNNING; fsm.fState = RUNNING;
std::unique_lock<std::mutex> lock(fsm.fWorkMutex); fsm.WaitForWorkCompletion();
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true; fsm.fWorkAvailable = true;
fsm.fWork = std::bind(&FairMQFSM_::RunWrapper, &fsm); fsm.fWork = std::bind(&FairMQFSM::RunWrapper, &fsm);
fsm.fWorkAvailableCondition.notify_one(); fsm.fWorkAvailableCondition.notify_one();
} }
}; };
struct StopFct struct StopFct
{ {
template <class EVT, class FSM, class SourceState, class TargetState> template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{ {
LOG(STATE) << "Entering READY state"; LOG(STATE) << "Entering READY state";
fsm.fState = READY; fsm.fState = READY;
fsm.CallStateChangeCallbacks(READY);
fsm.Unblock(); fsm.Unblock();
std::unique_lock<std::mutex> lock(fsm.fWorkMutex); fsm.WaitForWorkCompletion();
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
} }
}; };
struct InternalStopFct struct InternalStopFct
{ {
template <class EVT, class FSM, class SourceState, class TargetState> template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{ {
LOG(STATE) << "RUNNING state finished without an external event, entering READY state"; LOG(STATE) << "RUNNING state finished without an external event, entering READY state";
fsm.fState = READY; fsm.fState = READY;
fsm.CallStateChangeCallbacks(READY);
fsm.Unblock(); fsm.Unblock();
} }
}; };
struct ResetTaskFct struct ResetTaskFct
{ {
template <class EVT, class FSM, class SourceState, class TargetState> template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{ {
LOG(STATE) << "Entering RESETTING TASK state"; LOG(STATE) << "Entering RESETTING TASK state";
fsm.fState = RESETTING_TASK; fsm.fState = RESETTING_TASK;
std::unique_lock<std::mutex> lock(fsm.fWorkMutex); fsm.WaitForWorkCompletion();
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true; fsm.fWorkAvailable = true;
fsm.fWork = std::bind(&FairMQFSM_::ResetTaskWrapper, &fsm); fsm.fWork = std::bind(&FairMQFSM::ResetTaskWrapper, &fsm);
fsm.fWorkAvailableCondition.notify_one(); fsm.fWorkAvailableCondition.notify_one();
} }
}; };
struct ResetDeviceFct struct ResetDeviceFct
{ {
template <class EVT, class FSM, class SourceState, class TargetState> template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{ {
LOG(STATE) << "Entering RESETTING DEVICE state"; LOG(STATE) << "Entering RESETTING DEVICE state";
fsm.fState = RESETTING_DEVICE; fsm.fState = RESETTING_DEVICE;
std::unique_lock<std::mutex> lock(fsm.fWorkMutex); fsm.WaitForWorkCompletion();
while (fsm.fWorkActive)
{
fsm.fWorkDoneCondition.wait(lock);
}
fsm.fWorkAvailable = true; fsm.fWorkAvailable = true;
fsm.fWork = std::bind(&FairMQFSM_::ResetWrapper, &fsm); fsm.fWork = std::bind(&FairMQFSM::ResetWrapper, &fsm);
fsm.fWorkAvailableCondition.notify_one(); fsm.fWorkAvailableCondition.notify_one();
} }
}; };
struct ExitingFct struct ExitingFct
{ {
template <class EVT, class FSM, class SourceState, class TargetState> template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{ {
LOG(STATE) << "Entering EXITING state"; LOG(STATE) << "Entering EXITING state";
@ -353,7 +320,7 @@ struct FairMQFSM_ : public msmf::state_machine_def<FairMQFSM_>
struct ErrorFoundFct struct ErrorFoundFct
{ {
template <class EVT, class FSM, class SourceState, class TargetState> template<typename EVT, typename FSM, typename SourceState, typename TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{ {
LOG(STATE) << "Entering ERROR state"; LOG(STATE) << "Entering ERROR state";
@ -362,55 +329,9 @@ struct FairMQFSM_ : public msmf::state_machine_def<FairMQFSM_>
} }
}; };
// actions to be overwritten by derived classes
virtual void InitWrapper() {}
virtual void Init() {}
virtual void InitTaskWrapper() {}
virtual void InitTask() {}
virtual void RunWrapper() {}
virtual void Run() {}
virtual void PauseWrapper() {}
virtual void Pause() {}
virtual void ResetWrapper() {}
virtual void Reset() {}
virtual void ResetTaskWrapper() {}
virtual void ResetTask() {}
virtual void Exit() {}
virtual void Unblock() {} // Method to send commands.
void Worker()
{
while (true)
{
{
std::unique_lock<std::mutex> lock(fWorkMutex);
// Wait for work to be done.
while (!fWorkAvailable && !fWorkerTerminated)
{
fWorkAvailableCondition.wait(lock);
}
if (fWorkerTerminated)
{
break;
}
fWorkActive = true;
}
fWork();
std::lock_guard<std::mutex> lock(fWorkMutex);
fWorkActive = false;
fWorkAvailable = false;
fWorkDoneCondition.notify_one();
}
}
// Transition table for FairMQFSM // Transition table for FairMQFSM
struct transition_table : mpl::vector< struct transition_table : boost::mpl::vector<
// Start Event Next Action Guard // Start Event Next Action Guard
// +------------------------+----------------------+------------------------+----------------+---------+
msmf::Row<IDLE_FSM, INIT_DEVICE, INITIALIZING_DEVICE_FSM, InitDeviceFct, msmf::none>, msmf::Row<IDLE_FSM, INIT_DEVICE, INITIALIZING_DEVICE_FSM, InitDeviceFct, msmf::none>,
msmf::Row<IDLE_FSM, END, EXITING_FSM, ExitingFct, msmf::none>, msmf::Row<IDLE_FSM, END, EXITING_FSM, ExitingFct, msmf::none>,
msmf::Row<INITIALIZING_DEVICE_FSM, internal_DEVICE_READY, DEVICE_READY_FSM, DeviceReadyFct, msmf::none>, msmf::Row<INITIALIZING_DEVICE_FSM, internal_DEVICE_READY, DEVICE_READY_FSM, DeviceReadyFct, msmf::none>,
@ -426,18 +347,18 @@ struct FairMQFSM_ : public msmf::state_machine_def<FairMQFSM_>
msmf::Row<RESETTING_TASK_FSM, internal_DEVICE_READY, DEVICE_READY_FSM, DeviceReadyFct, msmf::none>, msmf::Row<RESETTING_TASK_FSM, internal_DEVICE_READY, DEVICE_READY_FSM, DeviceReadyFct, msmf::none>,
msmf::Row<RESETTING_DEVICE_FSM, internal_IDLE, IDLE_FSM, IdleFct, msmf::none>, msmf::Row<RESETTING_DEVICE_FSM, internal_IDLE, IDLE_FSM, IdleFct, msmf::none>,
msmf::Row<OK_FSM, ERROR_FOUND, ERROR_FSM, ErrorFoundFct, msmf::none>> msmf::Row<OK_FSM, ERROR_FOUND, ERROR_FSM, ErrorFoundFct, msmf::none>>
{}; {};
// Replaces the default no-transition response. // replaces the default no-transition response.
template <class FSM, class Event> template<typename FSM, typename Event>
void no_transition(Event const& e, FSM&, int state) void no_transition(Event const& e, FSM&, int state)
{ {
typedef typename msm::back::recursive_get_transition_table<FSM>::type recursive_stt; using recursive_stt = typename boost::msm::back::recursive_get_transition_table<FSM>::type;
typedef typename msm::back::generate_state_set<recursive_stt>::type all_states; using all_states = typename boost::msm::back::generate_state_set<recursive_stt>::type;
std::string stateName; std::string stateName;
mpl::for_each<all_states, msm::wrap<mpl::placeholders::_1>>(msm::back::get_state_name<recursive_stt>(stateName, state)); boost::mpl::for_each<all_states, boost::msm::wrap<boost::mpl::placeholders::_1>>(boost::msm::back::get_state_name<recursive_stt>(stateName, state));
stateName = stateName.substr(24); stateName = stateName.substr(24);
std::size_t pos = stateName.find("_FSME"); std::size_t pos = stateName.find("_FSME");
@ -506,21 +427,36 @@ struct FairMQFSM_ : public msmf::state_machine_def<FairMQFSM_>
} }
} }
std::string GetCurrentStateName() const { return GetStateName(fState); } std::string GetCurrentStateName() const
int GetCurrentState() const { return fState; }
bool CheckCurrentState(int state) const { return state == fState; }
bool CheckCurrentState(std::string state) const { return state == GetCurrentStateName(); }
void CallStateChangeCallbacks(const State state) const
{ {
if (!fStateChangeSignal.empty()) return GetStateName(fState);
{ }
fStateChangeSignal(state); int GetCurrentState() const
} {
return fState;
}
bool CheckCurrentState(int state) const
{
return state == fState;
}
bool CheckCurrentState(std::string state) const
{
return state == GetCurrentStateName();
} }
// this is to run certain functions in a separate thread // actions to be overwritten by derived classes
std::thread fWorkerThread; virtual void InitWrapper() {}
virtual void InitTaskWrapper() {}
virtual void RunWrapper() {}
virtual void PauseWrapper() {}
virtual void ResetWrapper() {}
virtual void ResetTaskWrapper() {}
virtual void Exit() {}
virtual void Unblock() {}
protected:
std::atomic<State> fState;
std::mutex fChangeStateMutex;
// function to execute user states in a worker thread // function to execute user states in a worker thread
std::function<void(void)> fWork; std::function<void(void)> fWork;
@ -531,12 +467,61 @@ struct FairMQFSM_ : public msmf::state_machine_def<FairMQFSM_>
bool fWorkActive; bool fWorkActive;
bool fWorkAvailable; bool fWorkAvailable;
protected:
std::atomic<State> fState;
std::mutex fChangeStateMutex;
boost::signals2::signal<void(const State)> fStateChangeSignal; boost::signals2::signal<void(const State)> fStateChangeSignal;
std::unordered_map<std::string, boost::signals2::connection> fStateChangeSignalsMap; std::unordered_map<std::string, boost::signals2::connection> fStateChangeSignalsMap;
void CallStateChangeCallbacks(const State state) const
{
if (!fStateChangeSignal.empty())
{
fStateChangeSignal(state);
}
}
private:
void Worker()
{
while (true)
{
{
std::unique_lock<std::mutex> lock(fWorkMutex);
// Wait for work to be done.
while (!fWorkAvailable && !fWorkerTerminated)
{
fWorkAvailableCondition.wait(lock);
}
if (fWorkerTerminated)
{
break;
}
fWorkActive = true;
}
fWork();
{
std::lock_guard<std::mutex> lock(fWorkMutex);
fWorkActive = false;
fWorkAvailable = false;
fWorkDoneCondition.notify_one();
}
CallStateChangeCallbacks(fState);
}
}
void WaitForWorkCompletion()
{
std::unique_lock<std::mutex> lock(fWorkMutex);
while (fWorkActive)
{
fWorkDoneCondition.wait(lock);
}
}
// run state handlers in a separate thread
std::thread fWorkerThread;
}; };
// reactivate the warning for non-virtual destructor // reactivate the warning for non-virtual destructor
@ -546,11 +531,11 @@ _Pragma("clang diagnostic pop")
_Pragma("GCC diagnostic pop") _Pragma("GCC diagnostic pop")
#endif #endif
typedef msm::back::state_machine<FairMQFSM_> FairMQFSM; } // namespace fsm
} // namespace FairMQFSM } // namespace mq
} // namespace fair
class FairMQStateMachine : public boost::msm::back::state_machine<fair::mq::fsm::FairMQFSM>
class FairMQStateMachine : public FairMQFSM::FairMQFSM
{ {
public: public:
enum Event enum Event
@ -569,24 +554,218 @@ class FairMQStateMachine : public FairMQFSM::FairMQFSM
ERROR_FOUND ERROR_FOUND
}; };
FairMQStateMachine(); FairMQStateMachine()
virtual ~FairMQStateMachine(); {
start();
}
virtual ~FairMQStateMachine()
{
stop();
}
int GetInterfaceVersion(); int GetInterfaceVersion()
{
return FAIRMQ_INTERFACE_VERSION;
}
int GetEventNumber(const std::string& event); bool ChangeState(int event)
{
try
{
switch (event)
{
case INIT_DEVICE:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::INIT_DEVICE());
return true;
}
case internal_DEVICE_READY:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::internal_DEVICE_READY());
return true;
}
case INIT_TASK:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::INIT_TASK());
return true;
}
case internal_READY:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::internal_READY());
return true;
}
case RUN:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::RUN());
return true;
}
case PAUSE:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::PAUSE());
return true;
}
case STOP:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::STOP());
return true;
}
case RESET_DEVICE:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::RESET_DEVICE());
return true;
}
case RESET_TASK:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::RESET_TASK());
return true;
}
case internal_IDLE:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::internal_IDLE());
return true;
}
case END:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::END());
return true;
}
case ERROR_FOUND:
{
std::lock_guard<std::mutex> lock(fChangeStateMutex);
process_event(fair::mq::fsm::ERROR_FOUND());
return true;
}
default:
{
LOG(ERROR) << "Requested state transition with an unsupported event: " << event << std::endl
<< "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_TASK, RESET_DEVICE, END, ERROR_FOUND";
return false;
}
}
}
catch (std::exception& e)
{
LOG(ERROR) << "Exception in FairMQStateMachine::ChangeState(): " << e.what();
exit(EXIT_FAILURE);
}
return false;
}
bool ChangeState(const std::string& event)
{
return ChangeState(GetEventNumber(event));
}
bool ChangeState(int event); void WaitForEndOfState(int event)
bool ChangeState(const std::string& event); {
try
{
switch (event)
{
case INIT_DEVICE:
case INIT_TASK:
case RUN:
case RESET_TASK:
case RESET_DEVICE:
{
std::unique_lock<std::mutex> lock(fWorkMutex);
while (fWorkActive || fWorkAvailable)
{
fWorkDoneCondition.wait_for(lock, std::chrono::seconds(1));
}
void WaitForEndOfState(int state); break;
void WaitForEndOfState(const std::string& state); }
default:
LOG(ERROR) << "Requested state is either synchronous or does not exist.";
break;
}
}
catch (std::exception& e)
{
LOG(ERROR) << "Exception in FairMQStateMachine::WaitForEndOfState(): " << e.what();
}
}
void WaitForEndOfState(const std::string& event)
{
return WaitForEndOfState(GetEventNumber(event));
}
bool WaitForEndOfStateForMs(int state, int durationInMs); bool WaitForEndOfStateForMs(int event, int durationInMs)
bool WaitForEndOfStateForMs(const std::string& state, int durationInMs); {
try
{
switch (event)
{
case INIT_DEVICE:
case INIT_TASK:
case RUN:
case RESET_TASK:
case RESET_DEVICE:
{
std::unique_lock<std::mutex> lock(fWorkMutex);
while (fWorkActive || fWorkAvailable)
{
fWorkDoneCondition.wait_for(lock, std::chrono::milliseconds(durationInMs));
if (fWorkActive)
{
return false;
}
}
return true;
}
default:
LOG(ERROR) << "Requested state is either synchronous or does not exist.";
return false;
}
}
catch (std::exception& e)
{
LOG(ERROR) << "Exception in FairMQStateMachine::WaitForEndOfStateForMs(): " << e.what();
}
return false;
}
bool WaitForEndOfStateForMs(const std::string& event, int durationInMs)
{
return WaitForEndOfStateForMs(GetEventNumber(event), durationInMs);
}
void SubscribeToStateChange(const std::string& key, std::function<void(const State)> callback); void SubscribeToStateChange(const std::string& key, std::function<void(const State)> callback)
void UnsubscribeFromStateChange(const std::string& key); {
fStateChangeSignalsMap.insert({key, fStateChangeSignal.connect(callback)});
}
void UnsubscribeFromStateChange(const std::string& key)
{
fStateChangeSignalsMap.at(key).disconnect();
fStateChangeSignalsMap.erase(key);
}
private:
int GetEventNumber(const std::string& event)
{
if (event == "INIT_DEVICE") return INIT_DEVICE;
if (event == "INIT_TASK") return INIT_TASK;
if (event == "RUN") return RUN;
if (event == "PAUSE") return PAUSE;
if (event == "STOP") return STOP;
if (event == "RESET_DEVICE") return RESET_DEVICE;
if (event == "RESET_TASK") return RESET_TASK;
if (event == "END") return END;
if (event == "ERROR_FOUND") return ERROR_FOUND;
LOG(ERROR) << "Requested number for non-existent event... " << event << std::endl
<< "Supported are: INIT_DEVICE, INIT_TASK, RUN, PAUSE, STOP, RESET_DEVICE, RESET_TASK, END, ERROR_FOUND";
return -1;
}
}; };
#endif /* FAIRMQSTATEMACHINE_H_ */ #endif /* FAIRMQSTATEMACHINE_H_ */

View File

@ -13,15 +13,17 @@
#include <nanomsg/FairMQTransportFactoryNN.h> #include <nanomsg/FairMQTransportFactoryNN.h>
#endif /* NANOMSG_FOUND */ #endif /* NANOMSG_FOUND */
#include <FairMQLogger.h> #include <FairMQLogger.h>
#include <memory>
#include <boost/uuid/uuid.hpp> #include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp> #include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp> #include <boost/uuid/uuid_io.hpp>
#include <memory>
#include <string> #include <string>
#include <sstream> #include <sstream>
FairMQTransportFactory::FairMQTransportFactory(const std::string& id) FairMQTransportFactory::FairMQTransportFactory(const std::string& id)
: fkId(id) : fkId(id)
{ {
} }
@ -29,26 +31,26 @@ auto FairMQTransportFactory::CreateTransportFactory(const std::string& type, con
{ {
using namespace std; using namespace std;
auto final_id = id; auto finalId = id;
// Generate uuid if empty // Generate uuid if empty
if (final_id == "") if (finalId == "")
{ {
final_id = boost::uuids::to_string(boost::uuids::random_generator()()); finalId = boost::uuids::to_string(boost::uuids::random_generator()());
} }
if (type == "zeromq") if (type == "zeromq")
{ {
return std::make_shared<FairMQTransportFactoryZMQ>(final_id, config); return std::make_shared<FairMQTransportFactoryZMQ>(finalId, config);
} }
else if (type == "shmem") else if (type == "shmem")
{ {
return std::make_shared<FairMQTransportFactorySHM>(final_id, config); return std::make_shared<FairMQTransportFactorySHM>(finalId, config);
} }
#ifdef NANOMSG_FOUND #ifdef NANOMSG_FOUND
else if (type == "nanomsg") else if (type == "nanomsg")
{ {
return std::make_shared<FairMQTransportFactoryNN>(final_id, config); return std::make_shared<FairMQTransportFactoryNN>(finalId, config);
} }
#endif /* NANOMSG_FOUND */ #endif /* NANOMSG_FOUND */
else else

View File

@ -27,7 +27,8 @@ FairMQBenchmarkSampler::FairMQBenchmarkSampler()
, fMsgSize(10000) , fMsgSize(10000)
, fMsgCounter(0) , fMsgCounter(0)
, fMsgRate(1) , fMsgRate(1)
, fNumMsgs(0) , fNumIterations(0)
, fMaxIterations(0)
, fOutChannelName() , fOutChannelName()
, fResetMsgCounter() , fResetMsgCounter()
{ {
@ -42,7 +43,7 @@ void FairMQBenchmarkSampler::InitTask()
fSameMessage = fConfig->GetValue<bool>("same-msg"); fSameMessage = fConfig->GetValue<bool>("same-msg");
fMsgSize = fConfig->GetValue<int>("msg-size"); fMsgSize = fConfig->GetValue<int>("msg-size");
fMsgRate = fConfig->GetValue<int>("msg-rate"); fMsgRate = fConfig->GetValue<int>("msg-rate");
fNumMsgs = fConfig->GetValue<uint64_t>("num-msgs"); fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations");
fOutChannelName = fConfig->GetValue<string>("out-channel"); fOutChannelName = fConfig->GetValue<string>("out-channel");
} }
@ -53,14 +54,12 @@ void FairMQBenchmarkSampler::PreRun()
void FairMQBenchmarkSampler::Run() void FairMQBenchmarkSampler::Run()
{ {
uint64_t numSentMsgs = 0;
// store the channel reference to avoid traversing the map on every loop iteration // store the channel reference to avoid traversing the map on every loop iteration
FairMQChannel& dataOutChannel = fChannels.at(fOutChannelName).at(0); FairMQChannel& dataOutChannel = fChannels.at(fOutChannelName).at(0);
FairMQMessagePtr baseMsg(dataOutChannel.Transport()->CreateMessage(fMsgSize)); FairMQMessagePtr baseMsg(dataOutChannel.Transport()->CreateMessage(fMsgSize));
LOG(INFO) << "Starting the benchmark with message size of " << fMsgSize << " and number of messages " << fNumMsgs << "."; LOG(INFO) << "Starting the benchmark with message size of " << fMsgSize << " and " << fMaxIterations << " iterations.";
auto tStart = chrono::high_resolution_clock::now(); auto tStart = chrono::high_resolution_clock::now();
while (CheckCurrentState(RUNNING)) while (CheckCurrentState(RUNNING))
@ -72,14 +71,14 @@ void FairMQBenchmarkSampler::Run()
if (dataOutChannel.Send(msg) >= 0) if (dataOutChannel.Send(msg) >= 0)
{ {
if (fNumMsgs > 0) if (fMaxIterations > 0)
{ {
if (numSentMsgs >= fNumMsgs) if (fNumIterations >= fMaxIterations)
{ {
break; break;
} }
} }
++numSentMsgs; ++fNumIterations;
} }
} }
else else
@ -88,14 +87,14 @@ void FairMQBenchmarkSampler::Run()
if (dataOutChannel.Send(msg) >= 0) if (dataOutChannel.Send(msg) >= 0)
{ {
if (fNumMsgs > 0) if (fMaxIterations > 0)
{ {
if (numSentMsgs >= fNumMsgs) if (fNumIterations >= fMaxIterations)
{ {
break; break;
} }
} }
++numSentMsgs; ++fNumIterations;
} }
} }
@ -109,7 +108,7 @@ void FairMQBenchmarkSampler::Run()
auto tEnd = chrono::high_resolution_clock::now(); auto tEnd = chrono::high_resolution_clock::now();
LOG(INFO) << "Leaving RUNNING state. Sent " << numSentMsgs << " messages in " << chrono::duration<double, milli>(tEnd - tStart).count() << "ms."; LOG(INFO) << "Leaving RUNNING state. Done " << fNumIterations << " iterations in " << chrono::duration<double, milli>(tEnd - tStart).count() << "ms.";
} }

View File

@ -40,7 +40,8 @@ class FairMQBenchmarkSampler : public FairMQDevice
int fMsgSize; int fMsgSize;
int fMsgCounter; int fMsgCounter;
int fMsgRate; int fMsgRate;
uint64_t fNumMsgs; uint64_t fNumIterations;
uint64_t fMaxIterations;
std::string fOutChannelName; std::string fOutChannelName;
std::thread fResetMsgCounter; std::thread fResetMsgCounter;

View File

@ -42,7 +42,7 @@ class FairMQSink : public FairMQDevice//, public OutputPolicy
virtual void InitTask() virtual void InitTask()
{ {
fMaxIterations = fConfig->GetValue<uint64_t>("num-iterations"); fMaxIterations = fConfig->GetValue<uint64_t>("max-iterations");
fInChannelName = fConfig->GetValue<std::string>("in-channel"); fInChannelName = fConfig->GetValue<std::string>("in-channel");
} }

View File

@ -17,7 +17,7 @@ void addCustomOptions(bpo::options_description& options)
("out-channel", bpo::value<std::string>()->default_value("data"), "Name of the output channel") ("out-channel", bpo::value<std::string>()->default_value("data"), "Name of the output channel")
("same-msg", bpo::value<bool>()->default_value(true), "Re-send the same message (default), or recreate for each iteration") ("same-msg", bpo::value<bool>()->default_value(true), "Re-send the same message (default), or recreate for each iteration")
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes") ("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
("num-msgs", bpo::value<uint64_t>()->default_value(0), "Number of messages to send") ("max-iterations", bpo::value<uint64_t>()->default_value(0), "Number of run iterations (0 - infinite)")
("msg-rate", bpo::value<int>()->default_value(0), "Msg rate limit in maximum number of messages per second"); ("msg-rate", bpo::value<int>()->default_value(0), "Msg rate limit in maximum number of messages per second");
} }

View File

@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& options)
{ {
options.add_options() options.add_options()
("in-channel", bpo::value<std::string>()->default_value("data"), "Name of the input channel") ("in-channel", bpo::value<std::string>()->default_value("data"), "Name of the input channel")
("num-msgs", bpo::value<uint64_t>()->default_value(0), "Number of messages to receive"); ("max-iterations", bpo::value<uint64_t>()->default_value(0), "Number of run iterations (0 - infinite)");
} }
FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/) FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)

View File

@ -1,6 +1,6 @@
#!/bin/bash #!/bin/bash
numMsgs="0" maxIterations="0"
msgSize="1000000" msgSize="1000000"
transport="zeromq" transport="zeromq"
sameMsg="true" sameMsg="true"
@ -14,7 +14,7 @@ if [[ $1 =~ ^[0-9]+$ ]]; then
fi fi
if [[ $2 =~ ^[0-9]+$ ]]; then if [[ $2 =~ ^[0-9]+$ ]]; then
numMsgs=$2 maxIterations=$2
fi fi
if [[ $3 =~ ^[a-z]+$ ]]; then if [[ $3 =~ ^[a-z]+$ ]]; then
@ -35,10 +35,10 @@ echo "Starting benchmark with following settings:"
echo "" echo ""
echo "message size: $msgSize bytes" echo "message size: $msgSize bytes"
if [ $numMsgs = 0 ]; then if [ $maxIterations = 0 ]; then
echo "number of messages: unlimited" echo "number of iterations: unlimited"
else else
echo "number of messages: $numMsgs" echo "number of iterations: $maxIterations"
fi fi
echo "transport: $transport" echo "transport: $transport"
@ -58,7 +58,7 @@ else
fi fi
echo "" echo ""
echo "Usage: startBenchmark [message size=1000000] [number of messages=0] [transport=zeromq/nanomsg/shmem] [resend same message=true] [affinity=false]" echo "Usage: startBenchmark [message size=1000000] [number of iterations=0] [transport=zeromq/nanomsg/shmem] [resend same message=true] [affinity=false]"
SAMPLER="bsampler" SAMPLER="bsampler"
SAMPLER+=" --id bsampler1" SAMPLER+=" --id bsampler1"
@ -68,7 +68,7 @@ SAMPLER+=" --transport $transport"
SAMPLER+=" --msg-size $msgSize" SAMPLER+=" --msg-size $msgSize"
SAMPLER+=" --same-msg $sameMsg" SAMPLER+=" --same-msg $sameMsg"
# SAMPLER+=" --msg-rate 1000" # SAMPLER+=" --msg-rate 1000"
SAMPLER+=" --num-msgs $numMsgs" SAMPLER+=" --max-iterations $maxIterations"
SAMPLER+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/benchmark.json" SAMPLER+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/benchmark.json"
xterm -geometry 90x23+0+0 -hold -e $affinitySamp @CMAKE_BINARY_DIR@/bin/$SAMPLER & xterm -geometry 90x23+0+0 -hold -e $affinitySamp @CMAKE_BINARY_DIR@/bin/$SAMPLER &
echo "" echo ""
@ -80,7 +80,7 @@ SINK+=" --id sink1"
#SINK+=" --io-threads 2" #SINK+=" --io-threads 2"
#SINK+=" --control static" #SINK+=" --control static"
SINK+=" --transport $transport" SINK+=" --transport $transport"
SINK+=" --num-msgs $numMsgs" SINK+=" --max-iterations $maxIterations"
SINK+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/benchmark.json" SINK+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/benchmark.json"
xterm -geometry 90x23+550+0 -hold -e $affinitySink @CMAKE_BINARY_DIR@/bin/$SINK & xterm -geometry 90x23+550+0 -hold -e $affinitySink @CMAKE_BINARY_DIR@/bin/$SINK &
echo "" echo ""