Expose BIND and CONNECT states for use with dynamic configuration

introduce FairMQ interface version
This commit is contained in:
Alexey Rybalchenko 2015-01-28 14:24:14 +01:00
parent 6d65c4313a
commit 8a82afe184
22 changed files with 255 additions and 54 deletions

View File

@ -128,12 +128,13 @@ Set(FairMQHDRFiles
devices/GenericProcessor.h
devices/GenericFileSink.h
devices/GenericFileSink.tpl
tools/FairMQTools.h
)
install(FILES ${FairMQHDRFiles} DESTINATION include)
set(DEPENDENCIES
${DEPENDENCIES}
boost_thread boost_timer boost_system boost_program_options
boost_thread boost_timer boost_system boost_program_options boost_random
)
set(LIBRARY_NAME FairMQ)

View File

@ -13,6 +13,8 @@
*/
#include <boost/thread.hpp>
#include <boost/random/mersenne_twister.hpp> // for choosing random port in range
#include <boost/random/uniform_int_distribution.hpp> // for choosing random port in range
#include "FairMQSocket.h"
#include "FairMQDevice.h"
@ -23,6 +25,8 @@ FairMQDevice::FairMQDevice()
, fNumIoThreads(1)
, fNumInputs(0)
, fNumOutputs(0)
, fPortRangeMin(22000)
, fPortRangeMax(32000)
, fInputAddress()
, fInputMethod()
, fInputSocketType()
@ -79,22 +83,6 @@ void FairMQDevice::InitInput()
socket->SetOption("rcv-hwm", &fInputRcvBufSize.at(i), sizeof(fInputRcvBufSize.at(i)));
fPayloadInputs->push_back(socket);
try
{
if (fInputMethod.at(i) == "bind")
{
fPayloadInputs->at(i)->Bind(fInputAddress.at(i));
}
else
{
fPayloadInputs->at(i)->Connect(fInputAddress.at(i));
}
}
catch (out_of_range& e)
{
LOG(ERROR) << e.what();
}
}
}
@ -110,21 +98,89 @@ void FairMQDevice::InitOutput()
socket->SetOption("rcv-hwm", &fOutputRcvBufSize.at(i), sizeof(fOutputRcvBufSize.at(i)));
fPayloadOutputs->push_back(socket);
}
}
try
void FairMQDevice::Bind()
{
LOG(INFO) << ">>>>>>> binding <<<<<<<";
int maxAttempts = 1000;
int numAttempts = 0;
boost::random::mt19937 gen(getpid());
boost::random::uniform_int_distribution<> randomPort(fPortRangeMin, fPortRangeMax);
for (int i = 0; i < fNumOutputs; ++i)
{
if (fOutputMethod.at(i) == "bind")
{
if (fOutputMethod.at(i) == "bind")
if (!fPayloadOutputs->at(i)->Bind(fOutputAddress.at(i)))
{
fPayloadOutputs->at(i)->Bind(fOutputAddress.at(i));
}
else
{
fPayloadOutputs->at(i)->Connect(fOutputAddress.at(i));
do {
LOG(WARN) << "could not bind at " << fOutputAddress.at(i) << ", trying another port in range";
++numAttempts;
size_t pos = fOutputAddress.at(i).rfind(":");
stringstream ss;
ss << (int)randomPort(gen);
string portString = ss.str();
fOutputAddress.at(i) = fOutputAddress.at(i).substr(0, pos + 1) + portString;
if (numAttempts > maxAttempts)
{
LOG(ERROR) << "could not bind output " << i << " to any port in the given range";
break;
}
} while (!fPayloadOutputs->at(i)->Bind(fOutputAddress.at(i)));
}
}
catch (out_of_range& e)
}
numAttempts = 0;
for (int i = 0; i < fNumInputs; ++i)
{
if (fInputMethod.at(i) == "bind")
{
LOG(ERROR) << e.what();
if (!fPayloadInputs->at(i)->Bind(fInputAddress.at(i)))
{
do {
LOG(WARN) << "could not bind at " << fInputAddress.at(i) << ", trying another port in range";
++numAttempts;
size_t pos = fInputAddress.at(i).rfind(":");
stringstream ss;
ss << (int)randomPort(gen);
string portString = ss.str();
fInputAddress.at(i) = fInputAddress.at(i).substr(0, pos + 1) + portString;
if (numAttempts > maxAttempts)
{
LOG(ERROR) << "could not bind output " << i << " to any port in the given range";
break;
}
} while (!fPayloadInputs->at(i)->Bind(fInputAddress.at(i)));
}
}
}
}
void FairMQDevice::Connect()
{
LOG(INFO) << ">>>>>>> connecting <<<<<<<";
for (int i = 0; i < fNumOutputs; ++i)
{
if (fOutputMethod.at(i) == "connect")
{
fPayloadOutputs->at(i)->Connect(fOutputAddress.at(i));
}
}
for (int i = 0; i < fNumInputs; ++i)
{
if (fInputMethod.at(i) == "connect")
{
fPayloadInputs->at(i)->Connect(fInputAddress.at(i));
}
}
}
@ -189,6 +245,12 @@ void FairMQDevice::SetProperty(const int key, const int value, const int slot /*
case NumOutputs:
fNumOutputs = value;
break;
case PortRangeMin:
fPortRangeMin = value;
break;
case PortRangeMax:
fPortRangeMax = value;
break;
case LogIntervalInMs:
fLogIntervalInMs = value;
break;
@ -253,6 +315,14 @@ int FairMQDevice::GetProperty(const int key, const int default_ /*= 0*/, const i
{
case NumIoThreads:
return fNumIoThreads;
case NumInputs:
return fNumInputs;
case NumOutputs:
return fNumOutputs;
case PortRangeMin:
return fPortRangeMin;
case PortRangeMax:
return fPortRangeMax;
case LogIntervalInMs:
return fLogIntervalInMs;
case InputSndBufSize:
@ -397,10 +467,6 @@ void FairMQDevice::LogSocketRates()
LOG(INFO) << ">>>>>>> stopping FairMQDevice::LogSocketRates() <<<<<<<";
}
void FairMQDevice::ListenToCommands()
{
}
void FairMQDevice::Shutdown()
{
LOG(INFO) << ">>>>>>> closing inputs <<<<<<<";

View File

@ -35,6 +35,8 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
NumIoThreads,
NumInputs,
NumOutputs,
PortRangeMin,
PortRangeMax,
InputAddress,
InputMethod,
InputSocketType,
@ -54,7 +56,6 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
FairMQDevice();
virtual void LogSocketRates();
virtual void ListenToCommands();
virtual void SetProperty(const int key, const string& value, const int slot = 0);
virtual string GetProperty(const int key, const string& default_ = "", const int slot = 0);
@ -67,11 +68,15 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
protected:
string fId;
int fNumIoThreads;
int fNumInputs;
int fNumOutputs;
int fPortRangeMin;
int fPortRangeMax;
vector<string> fInputAddress;
vector<string> fInputMethod;
vector<string> fInputSocketType;
@ -99,6 +104,8 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
virtual void Shutdown();
virtual void InitOutput();
virtual void InitInput();
virtual void Bind();
virtual void Connect();
virtual void Terminate();

View File

@ -35,7 +35,7 @@ class FairMQSocket
virtual string GetId() = 0;
virtual void Bind(const string& address) = 0;
virtual bool Bind(const string& address) = 0;
virtual void Connect(const string& address) = 0;
virtual int Send(FairMQMessage* msg, const string& flag="") = 0;

View File

@ -28,7 +28,12 @@ FairMQStateMachine::~FairMQStateMachine()
stop();
}
void FairMQStateMachine::ChangeState(int event)
int FairMQStateMachine::GetInterfaceVersion()
{
return FAIRMQ_INTERFACE_VERSION;
}
bool FairMQStateMachine::ChangeState(int event)
{
try
{
@ -36,25 +41,35 @@ void FairMQStateMachine::ChangeState(int event)
{
case INIT:
process_event(FairMQFSM::INIT());
return;
return true;
case SETOUTPUT:
process_event(FairMQFSM::SETOUTPUT());
return;
return true;
case SETINPUT:
process_event(FairMQFSM::SETINPUT());
return;
return true;
case BIND:
process_event(FairMQFSM::BIND());
return true;
case CONNECT:
process_event(FairMQFSM::CONNECT());
return true;
case RUN:
process_event(FairMQFSM::RUN());
return;
return true;
case PAUSE:
process_event(FairMQFSM::PAUSE());
return;
return true;
case STOP:
process_event(FairMQFSM::STOP());
return;
return true;
case END:
process_event(FairMQFSM::END());
return;
return true;
default:
LOG(ERROR) << "Requested unsupported state: " << event;
LOG(ERROR) << "Supported are: INIT, SETOUTPUT, SETINPUT, BIND, CONNECT, RUN, PAUSE, STOP, END";
return false;
}
}
catch (boost::bad_function_call& e)
@ -62,3 +77,49 @@ void FairMQStateMachine::ChangeState(int event)
LOG(ERROR) << e.what();
}
}
bool FairMQStateMachine::ChangeState(std::string event)
{
if (event == "INIT")
{
return ChangeState(INIT);
}
else if (event == "SETOUTPUT")
{
return ChangeState(SETOUTPUT);
}
else if (event == "SETINPUT")
{
return ChangeState(SETINPUT);
}
else if (event == "BIND")
{
return ChangeState(BIND);
}
else if (event == "CONNECT")
{
return ChangeState(CONNECT);
}
else if (event == "RUN")
{
return ChangeState(RUN);
}
else if (event == "PAUSE")
{
return ChangeState(PAUSE);
}
else if (event == "STOP")
{
return ChangeState(STOP);
}
else if (event == "END")
{
return ChangeState(END);
}
else
{
LOG(ERROR) << "Requested unsupported state: " << event;
LOG(ERROR) << "Supported are: INIT, SETOUTPUT, SETINPUT, BIND, CONNECT, RUN, PAUSE, STOP, END";
return false;
}
}

View File

@ -15,6 +15,10 @@
#ifndef FAIRMQSTATEMACHINE_H_
#define FAIRMQSTATEMACHINE_H_
#define FAIRMQ_INTERFACE_VERSION 1
#include <string>
#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
@ -39,6 +43,8 @@ namespace FairMQFSM
struct INIT {};
struct SETOUTPUT {};
struct SETINPUT {};
struct BIND {};
struct CONNECT {};
struct PAUSE {};
struct RUN {};
struct STOP {};
@ -48,9 +54,9 @@ namespace FairMQFSM
struct FairMQFSM_ : public msm::front::state_machine_def<FairMQFSM_>
{
FairMQFSM_()
: fState()
, fRunningStateThread()
{}
: fState()
, fRunningStateThread()
{}
// Destructor
virtual ~FairMQFSM_() {};
@ -71,6 +77,8 @@ namespace FairMQFSM
struct INITIALIZING_FSM : public msm::front::state<> {};
struct SETTINGOUTPUT_FSM : public msm::front::state<> {};
struct SETTINGINPUT_FSM : public msm::front::state<> {};
struct BINDING_FSM : public msm::front::state<> {};
struct CONNECTING_FSM : public msm::front::state<> {};
struct WAITING_FSM : public msm::front::state<> {};
struct RUNNING_FSM : public msm::front::state<> {};
// Define initial state
@ -111,6 +119,24 @@ namespace FairMQFSM
fsm.InitInput();
}
};
struct BindFct
{
template <class EVT, class FSM, class SourceState, class TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = BINDING;
fsm.Bind();
}
};
struct ConnectFct
{
template <class EVT, class FSM, class SourceState, class TargetState>
void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&)
{
fsm.fState = CONNECTING;
fsm.Connect();
}
};
struct RunFct
{
template <class EVT, class FSM, class SourceState, class TargetState>
@ -147,6 +173,8 @@ namespace FairMQFSM
virtual void Shutdown() {}
virtual void InitOutput() {}
virtual void InitInput() {}
virtual void Bind() {}
virtual void Connect() {}
virtual void Terminate() {} // Termination method called during StopFct action.
// Transition table for FairMQFMS
struct transition_table : mpl::vector<
@ -156,8 +184,10 @@ namespace FairMQFSM
msmf::Row<IDLE_FSM, END, msmf::none, TestFct, msmf::none>, // this is an invalid transition...
msmf::Row<INITIALIZING_FSM, SETOUTPUT, SETTINGOUTPUT_FSM, SetOutputFct, msmf::none>,
msmf::Row<SETTINGOUTPUT_FSM, SETINPUT, SETTINGINPUT_FSM, SetInputFct, msmf::none>,
msmf::Row<SETTINGINPUT_FSM, PAUSE, WAITING_FSM, PauseFct, msmf::none>,
msmf::Row<SETTINGINPUT_FSM, RUN, RUNNING_FSM, RunFct, msmf::none>,
msmf::Row<SETTINGINPUT_FSM, BIND, BINDING_FSM, BindFct, msmf::none>,
msmf::Row<BINDING_FSM, CONNECT, CONNECTING_FSM, ConnectFct, msmf::none>,
msmf::Row<CONNECTING_FSM, PAUSE, WAITING_FSM, PauseFct, msmf::none>,
msmf::Row<CONNECTING_FSM, RUN, RUNNING_FSM, RunFct, msmf::none>,
msmf::Row<WAITING_FSM, RUN, RUNNING_FSM, RunFct, msmf::none>,
msmf::Row<WAITING_FSM, STOP, IDLE_FSM, StopFct, msmf::none>,
msmf::Row<RUNNING_FSM, PAUSE, WAITING_FSM, PauseFct, msmf::none>,
@ -179,6 +209,8 @@ namespace FairMQFSM
INITIALIZING,
SETTINGOUTPUT,
SETTINGINPUT,
BINDING,
CONNECTING,
WAITING,
RUNNING
};
@ -195,6 +227,8 @@ class FairMQStateMachine : public FairMQFSM::FairMQFSM
INIT,
SETOUTPUT,
SETINPUT,
BIND,
CONNECT,
PAUSE,
RUN,
STOP,
@ -202,7 +236,11 @@ class FairMQStateMachine : public FairMQFSM::FairMQFSM
};
FairMQStateMachine();
virtual ~FairMQStateMachine();
void ChangeState(int event);
int GetInterfaceVersion();
bool ChangeState(int event);
bool ChangeState(std::string event);
// condition variable to notify parent thread about end of running state.
boost::condition_variable fRunningCondition;

View File

@ -130,6 +130,8 @@ int main(int argc, char** argv)
client.ChangeState(FairMQExampleClient::SETOUTPUT);
client.ChangeState(FairMQExampleClient::SETINPUT);
client.ChangeState(FairMQExampleClient::BIND);
client.ChangeState(FairMQExampleClient::CONNECT);
client.ChangeState(FairMQExampleClient::RUN);
// wait until the running thread has finished processing.

View File

@ -78,6 +78,8 @@ int main(int argc, char** argv)
server.ChangeState(FairMQExampleServer::SETOUTPUT);
server.ChangeState(FairMQExampleServer::SETINPUT);
server.ChangeState(FairMQExampleServer::BIND);
server.ChangeState(FairMQExampleServer::CONNECT);
LOG(INFO) << "Listening for requests!";

View File

@ -61,7 +61,7 @@ string FairMQSocketNN::GetId()
return fId;
}
void FairMQSocketNN::Bind(const string& address)
bool FairMQSocketNN::Bind(const string& address)
{
LOG(INFO) << "bind socket #" << fId << " on " << address;
@ -69,7 +69,9 @@ void FairMQSocketNN::Bind(const string& address)
if (eid < 0)
{
LOG(ERROR) << "failed binding socket #" << fId << ", reason: " << nn_strerror(errno);
return false;
}
return true;
}
void FairMQSocketNN::Connect(const string& address)

View File

@ -30,7 +30,7 @@ class FairMQSocketNN : public FairMQSocket
virtual string GetId();
virtual void Bind(const string& address);
virtual bool Bind(const string& address);
virtual void Connect(const string& address);
virtual int Send(FairMQMessage* msg, const string& flag="");

View File

@ -168,6 +168,8 @@ int main(int argc, char** argv)
sampler.ChangeState(FairMQBenchmarkSampler::SETOUTPUT);
sampler.ChangeState(FairMQBenchmarkSampler::SETINPUT);
sampler.ChangeState(FairMQBenchmarkSampler::BIND);
sampler.ChangeState(FairMQBenchmarkSampler::CONNECT);
sampler.ChangeState(FairMQBenchmarkSampler::RUN);
// wait until the running thread has finished processing.

View File

@ -168,6 +168,8 @@ int main(int argc, char** argv)
sampler.ChangeState(FairMQBinSampler::SETOUTPUT);
sampler.ChangeState(FairMQBinSampler::SETINPUT);
sampler.ChangeState(FairMQBinSampler::BIND);
sampler.ChangeState(FairMQBinSampler::CONNECT);
sampler.ChangeState(FairMQBinSampler::RUN);
// wait until the running thread has finished processing.

View File

@ -154,6 +154,8 @@ int main(int argc, char** argv)
sink.ChangeState(FairMQBinSink::SETOUTPUT);
sink.ChangeState(FairMQBinSink::SETINPUT);
sink.ChangeState(FairMQBinSink::BIND);
sink.ChangeState(FairMQBinSink::CONNECT);
sink.ChangeState(FairMQBinSink::RUN);
// wait until the running thread has finished processing.

View File

@ -180,6 +180,8 @@ int main(int argc, char** argv)
buffer.ChangeState(FairMQBuffer::SETOUTPUT);
buffer.ChangeState(FairMQBuffer::SETINPUT);
buffer.ChangeState(FairMQBuffer::BIND);
buffer.ChangeState(FairMQBuffer::CONNECT);
buffer.ChangeState(FairMQBuffer::RUN);
// wait until the running thread has finished processing.

View File

@ -188,6 +188,8 @@ int main(int argc, char** argv)
merger.ChangeState(FairMQMerger::SETOUTPUT);
merger.ChangeState(FairMQMerger::SETINPUT);
merger.ChangeState(FairMQMerger::BIND);
merger.ChangeState(FairMQMerger::CONNECT);
merger.ChangeState(FairMQMerger::RUN);
// wait until the running thread has finished processing.

View File

@ -168,6 +168,8 @@ int main(int argc, char** argv)
sampler.ChangeState(FairMQProtoSampler::SETOUTPUT);
sampler.ChangeState(FairMQProtoSampler::SETINPUT);
sampler.ChangeState(FairMQProtoSampler::BIND);
sampler.ChangeState(FairMQProtoSampler::CONNECT);
sampler.ChangeState(FairMQProtoSampler::RUN);
// wait until the running thread has finished processing.

View File

@ -154,6 +154,8 @@ int main(int argc, char** argv)
sink.ChangeState(FairMQProtoSink::SETOUTPUT);
sink.ChangeState(FairMQProtoSink::SETINPUT);
sink.ChangeState(FairMQProtoSink::BIND);
sink.ChangeState(FairMQProtoSink::CONNECT);
sink.ChangeState(FairMQProtoSink::RUN);
// wait until the running thread has finished processing.

View File

@ -180,6 +180,8 @@ int main(int argc, char** argv)
proxy.ChangeState(FairMQProxy::SETOUTPUT);
proxy.ChangeState(FairMQProxy::SETINPUT);
proxy.ChangeState(FairMQProxy::BIND);
proxy.ChangeState(FairMQProxy::CONNECT);
proxy.ChangeState(FairMQProxy::RUN);
// wait until the running thread has finished processing.

View File

@ -154,6 +154,8 @@ int main(int argc, char** argv)
sink.ChangeState(FairMQSink::SETOUTPUT);
sink.ChangeState(FairMQSink::SETINPUT);
sink.ChangeState(FairMQSink::BIND);
sink.ChangeState(FairMQSink::CONNECT);
sink.ChangeState(FairMQSink::RUN);
// wait until the running thread has finished processing.

View File

@ -188,6 +188,8 @@ int main(int argc, char** argv)
splitter.ChangeState(FairMQSplitter::SETOUTPUT);
splitter.ChangeState(FairMQSplitter::SETINPUT);
splitter.ChangeState(FairMQSplitter::BIND);
splitter.ChangeState(FairMQSplitter::CONNECT);
splitter.ChangeState(FairMQSplitter::RUN);
// wait until the running thread has finished processing.

View File

@ -72,7 +72,7 @@ string FairMQSocketZMQ::GetId()
return fId;
}
void FairMQSocketZMQ::Bind(const string& address)
bool FairMQSocketZMQ::Bind(const string& address)
{
LOG(INFO) << "bind socket #" << fId << " on " << address;
@ -80,7 +80,9 @@ void FairMQSocketZMQ::Bind(const string& address)
if (rc != 0)
{
LOG(ERROR) << "failed binding socket #" << fId << ", reason: " << zmq_strerror(errno);
return false;
}
return true;
}
void FairMQSocketZMQ::Connect(const string& address)

View File

@ -29,7 +29,7 @@ class FairMQSocketZMQ : public FairMQSocket
virtual string GetId();
virtual void Bind(const string& address);
virtual bool Bind(const string& address);
virtual void Connect(const string& address);
virtual int Send(FairMQMessage* msg, const string& flag="");