Update FairMQStateMachine & introduce FairMQChannels

Organize sockets as a map of vectors of FairMQChannels.

Update FairMQStateMachine by removing SETTINGINPUT, SETTINGOUTPUT,
BIND and CONNECT states and by adding INITIALIZING_TASK, RESETTING_TASK
and RESETTING_DEVICE states. Run states functions in their own thread.
This commit is contained in:
Alexey Rybalchenko
2015-04-29 13:25:42 +02:00
parent a2ebbbe450
commit 7fda980710
54 changed files with 1674 additions and 1573 deletions

View File

@@ -27,15 +27,13 @@ FairMQContextZMQ::FairMQContextZMQ(int numIoThreads)
exit(EXIT_FAILURE);
}
int rc = zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads);
if (rc != 0)
if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0)
{
LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno);
}
// Set the maximum number of allowed sockets on the context.
rc = zmq_ctx_set(fContext, ZMQ_MAX_SOCKETS, 10000);
if (rc != 0)
if (zmq_ctx_set(fContext, ZMQ_MAX_SOCKETS, 10000) != 0)
{
LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno);
}
@@ -58,8 +56,7 @@ void FairMQContextZMQ::Close()
return;
}
int rc = zmq_ctx_destroy(fContext);
if (rc != 0)
if (zmq_ctx_destroy(fContext) != 0)
{
if (errno == EINTR) {
LOG(ERROR) << " failed closing context, reason: " << zmq_strerror(errno);

View File

@@ -23,8 +23,7 @@ using namespace std;
FairMQMessageZMQ::FairMQMessageZMQ()
: fMessage()
{
int rc = zmq_msg_init(&fMessage);
if (rc != 0)
if (zmq_msg_init(&fMessage) != 0)
{
LOG(ERROR) << "failed initializing message, reason: " << zmq_strerror(errno);
}
@@ -33,8 +32,7 @@ FairMQMessageZMQ::FairMQMessageZMQ()
FairMQMessageZMQ::FairMQMessageZMQ(size_t size)
: fMessage()
{
int rc = zmq_msg_init_size(&fMessage, size);
if (rc != 0)
if (zmq_msg_init_size(&fMessage, size) != 0)
{
LOG(ERROR) << "failed initializing message with size, reason: " << zmq_strerror(errno);
}
@@ -43,8 +41,7 @@ FairMQMessageZMQ::FairMQMessageZMQ(size_t size)
FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size, fairmq_free_fn *ffn, void* hint)
: fMessage()
{
int rc = zmq_msg_init_data(&fMessage, data, size, ffn ? ffn : &CleanUp, hint);
if (rc != 0)
if (zmq_msg_init_data(&fMessage, data, size, ffn ? ffn : &CleanUp, hint) != 0)
{
LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno);
}
@@ -53,8 +50,7 @@ FairMQMessageZMQ::FairMQMessageZMQ(void* data, size_t size, fairmq_free_fn *ffn,
void FairMQMessageZMQ::Rebuild()
{
CloseMessage();
int rc = zmq_msg_init(&fMessage);
if (rc != 0)
if (zmq_msg_init(&fMessage) != 0)
{
LOG(ERROR) << "failed initializing message, reason: " << zmq_strerror(errno);
}
@@ -63,8 +59,7 @@ void FairMQMessageZMQ::Rebuild()
void FairMQMessageZMQ::Rebuild(size_t size)
{
CloseMessage();
int rc = zmq_msg_init_size(&fMessage, size);
if (rc != 0)
if (zmq_msg_init_size(&fMessage, size) != 0)
{
LOG(ERROR) << "failed initializing message with size, reason: " << zmq_strerror(errno);
}
@@ -73,8 +68,7 @@ void FairMQMessageZMQ::Rebuild(size_t size)
void FairMQMessageZMQ::Rebuild(void* data, size_t size, fairmq_free_fn *ffn, void* hint)
{
CloseMessage();
int rc = zmq_msg_init_data(&fMessage, data, size, ffn ? ffn : &CleanUp, hint);
if (rc != 0)
if (zmq_msg_init_data(&fMessage, data, size, ffn ? ffn : &CleanUp, hint) != 0)
{
LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno);
}
@@ -103,8 +97,7 @@ void FairMQMessageZMQ::SetMessage(void* data, size_t size)
void FairMQMessageZMQ::Copy(FairMQMessage* msg)
{
// Shares the message buffer between msg and this fMessage.
int rc = zmq_msg_copy(&fMessage, (zmq_msg_t*)msg->GetMessage());
if (rc != 0)
if (zmq_msg_copy(&fMessage, (zmq_msg_t*)msg->GetMessage()) != 0)
{
LOG(ERROR) << "failed copying message, reason: " << zmq_strerror(errno);
}
@@ -119,8 +112,7 @@ void FairMQMessageZMQ::Copy(FairMQMessage* msg)
inline void FairMQMessageZMQ::CloseMessage()
{
int rc = zmq_msg_close(&fMessage);
if (rc != 0)
if (zmq_msg_close(&fMessage) != 0)
{
LOG(ERROR) << "failed closing message, reason: " << zmq_strerror(errno);
}
@@ -133,8 +125,7 @@ void FairMQMessageZMQ::CleanUp(void* data, void* hint)
FairMQMessageZMQ::~FairMQMessageZMQ()
{
int rc = zmq_msg_close(&fMessage);
if (rc != 0)
if (zmq_msg_close(&fMessage) != 0)
{
LOG(ERROR) << "failed closing message with data, reason: " << zmq_strerror(errno);
}

View File

@@ -19,16 +19,16 @@
using namespace std;
FairMQPollerZMQ::FairMQPollerZMQ(const vector<FairMQSocket*>& inputs)
FairMQPollerZMQ::FairMQPollerZMQ(const vector<FairMQChannel>& channels)
: items()
, fNumItems()
{
fNumItems = inputs.size();
fNumItems = channels.size();
items = new zmq_pollitem_t[fNumItems];
for (int i = 0; i < fNumItems; i++)
for (int i = 0; i < fNumItems; ++i)
{
items[i].socket = inputs.at(i)->GetSocket();
items[i].socket = channels.at(i).fSocket->GetSocket();
items[i].fd = 0;
items[i].events = ZMQ_POLLIN;
items[i].revents = 0;
@@ -37,8 +37,7 @@ FairMQPollerZMQ::FairMQPollerZMQ(const vector<FairMQSocket*>& inputs)
void FairMQPollerZMQ::Poll(int timeout)
{
int rc = zmq_poll(items, fNumItems, timeout);
if (rc < 0)
if (zmq_poll(items, fNumItems, timeout) < 0)
{
LOG(ERROR) << "polling failed, reason: " << zmq_strerror(errno);
}
@@ -47,7 +46,9 @@ void FairMQPollerZMQ::Poll(int timeout)
bool FairMQPollerZMQ::CheckInput(int index)
{
if (items[index].revents & ZMQ_POLLIN)
{
return true;
}
return false;
}
@@ -55,7 +56,9 @@ bool FairMQPollerZMQ::CheckInput(int index)
bool FairMQPollerZMQ::CheckOutput(int index)
{
if (items[index].revents & ZMQ_POLLOUT)
{
return true;
}
return false;
}
@@ -63,5 +66,7 @@ bool FairMQPollerZMQ::CheckOutput(int index)
FairMQPollerZMQ::~FairMQPollerZMQ()
{
if (items != NULL)
{
delete[] items;
}
}

View File

@@ -18,12 +18,12 @@
#include <vector>
#include "FairMQPoller.h"
#include "FairMQSocket.h"
#include "FairMQChannel.h"
class FairMQPollerZMQ : public FairMQPoller
{
public:
FairMQPollerZMQ(const std::vector<FairMQSocket*>& inputs);
FairMQPollerZMQ(const std::vector<FairMQChannel>& channels);
virtual void Poll(int timeout);
virtual bool CheckInput(int index);

View File

@@ -19,9 +19,10 @@
using namespace std;
// Context to hold the ZeroMQ sockets
boost::shared_ptr<FairMQContextZMQ> FairMQSocketZMQ::fContext = boost::shared_ptr<FairMQContextZMQ>(new FairMQContextZMQ(1));
FairMQSocketZMQ::FairMQSocketZMQ(const string& type, int num, int numIoThreads)
FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, int numIoThreads)
: FairMQSocket(ZMQ_SNDMORE, ZMQ_RCVMORE, ZMQ_DONTWAIT)
, fSocket(NULL)
, fId()
@@ -30,12 +31,9 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, int num, int numIoThreads)
, fMessagesTx(0)
, fMessagesRx(0)
{
stringstream id;
id << type << "." << num;
fId = id.str();
fId = name + "." + type;
int rc = zmq_ctx_set(fContext->GetContext(), ZMQ_IO_THREADS, numIoThreads);
if (rc != 0)
if (zmq_ctx_set(fContext->GetContext(), ZMQ_IO_THREADS, numIoThreads) != 0)
{
LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno);
}
@@ -44,12 +42,11 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, int num, int numIoThreads)
if (fSocket == NULL)
{
LOG(ERROR) << "failed creating socket #" << fId << ", reason: " << zmq_strerror(errno);
LOG(ERROR) << "failed creating socket " << fId << ", reason: " << zmq_strerror(errno);
exit(EXIT_FAILURE);
}
rc = zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.length());
if (rc != 0)
if (zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.length()) != 0)
{
LOG(ERROR) << "failed setting ZMQ_IDENTITY socket option, reason: " << zmq_strerror(errno);
}
@@ -57,22 +54,20 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, int num, int numIoThreads)
// Tell socket to try and send/receive outstanding messages for <linger> milliseconds before terminating.
// Default value for ZeroMQ is -1, which is to wait forever.
int linger = 500;
rc = zmq_setsockopt(fSocket, ZMQ_LINGER, &linger, sizeof(linger));
if (rc != 0)
if (zmq_setsockopt(fSocket, ZMQ_LINGER, &linger, sizeof(linger)) != 0)
{
LOG(ERROR) << "failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno);
}
if (type == "sub")
{
rc = zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, NULL, 0);
if (rc != 0)
if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, NULL, 0) != 0)
{
LOG(ERROR) << "failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno);
}
}
LOG(INFO) << "created socket #" << fId;
// LOG(INFO) << "created socket " << fId;
}
string FairMQSocketZMQ::GetId()
@@ -82,16 +77,15 @@ string FairMQSocketZMQ::GetId()
bool FairMQSocketZMQ::Bind(const string& address)
{
LOG(INFO) << "bind socket #" << fId << " on " << address;
// LOG(INFO) << "bind socket " << fId << " on " << address;
int rc = zmq_bind(fSocket, address.c_str());
if (rc != 0)
if (zmq_bind(fSocket, address.c_str()) != 0)
{
if (errno == EADDRINUSE) {
// do not print error in this case, this is handled by FairMQDevice in case no connection could be established after trying a number of random ports from a range.
return false;
}
LOG(ERROR) << "failed binding socket #" << fId << ", reason: " << zmq_strerror(errno);
LOG(ERROR) << "failed binding socket " << fId << ", reason: " << zmq_strerror(errno);
return false;
}
return true;
@@ -99,12 +93,11 @@ bool FairMQSocketZMQ::Bind(const string& address)
void FairMQSocketZMQ::Connect(const string& address)
{
LOG(INFO) << "connect socket #" << fId << " on " << address;
// LOG(INFO) << "connect socket " << fId << " on " << address;
int rc = zmq_connect(fSocket, address.c_str());
if (rc != 0)
if (zmq_connect(fSocket, address.c_str()) != 0)
{
LOG(ERROR) << "failed connecting socket #" << fId << ", reason: " << zmq_strerror(errno);
LOG(ERROR) << "failed connecting socket " << fId << ", reason: " << zmq_strerror(errno);
}
}
@@ -123,10 +116,10 @@ int FairMQSocketZMQ::Send(FairMQMessage* msg, const string& flag)
}
if (zmq_errno() == ETERM)
{
LOG(INFO) << "terminating socket #" << fId;
LOG(INFO) << "terminating socket " << fId;
return -1;
}
LOG(ERROR) << "failed sending on socket #" << fId << ", reason: " << zmq_strerror(errno);
LOG(ERROR) << "failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
return nbytes;
}
@@ -145,10 +138,10 @@ int FairMQSocketZMQ::Send(FairMQMessage* msg, const int flags)
}
if (zmq_errno() == ETERM)
{
LOG(INFO) << "terminating socket #" << fId;
LOG(INFO) << "terminating socket " << fId;
return -1;
}
LOG(ERROR) << "failed sending on socket #" << fId << ", reason: " << zmq_strerror(errno);
LOG(ERROR) << "failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
return nbytes;
}
@@ -167,10 +160,10 @@ int FairMQSocketZMQ::Receive(FairMQMessage* msg, const string& flag)
}
if (zmq_errno() == ETERM)
{
LOG(INFO) << "terminating socket #" << fId;
LOG(INFO) << "terminating socket " << fId;
return -1;
}
LOG(ERROR) << "failed receiving on socket #" << fId << ", reason: " << zmq_strerror(errno);
LOG(ERROR) << "failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno);
return nbytes;
}
@@ -189,24 +182,25 @@ int FairMQSocketZMQ::Receive(FairMQMessage* msg, const int flags)
}
if (zmq_errno() == ETERM)
{
LOG(INFO) << "terminating socket #" << fId;
LOG(INFO) << "terminating socket " << fId;
return -1;
}
LOG(ERROR) << "failed receiving on socket #" << fId << ", reason: " << zmq_strerror(errno);
LOG(ERROR) << "failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno);
return nbytes;
}
void FairMQSocketZMQ::Close()
{
// LOG(DEBUG) << "Closing socket " << fId;
if (fSocket == NULL)
{
return;
}
int rc = zmq_close(fSocket);
if (rc != 0)
if (zmq_close(fSocket) != 0)
{
LOG(ERROR) << "failed closing socket, reason: " << zmq_strerror(errno);
LOG(ERROR) << "failed closing socket " << fId << ", reason: " << zmq_strerror(errno);
}
fSocket = NULL;
@@ -214,8 +208,7 @@ void FairMQSocketZMQ::Close()
void FairMQSocketZMQ::Terminate()
{
int rc = zmq_ctx_destroy(fContext->GetContext());
if (rc != 0)
if (zmq_ctx_destroy(fContext->GetContext()) != 0)
{
LOG(ERROR) << "failed terminating context, reason: " << zmq_strerror(errno);
}
@@ -234,8 +227,7 @@ int FairMQSocketZMQ::GetSocket(int nothing)
void FairMQSocketZMQ::SetOption(const string& option, const void* value, size_t valueSize)
{
int rc = zmq_setsockopt(fSocket, GetConstant(option), value, valueSize);
if (rc < 0)
if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0)
{
LOG(ERROR) << "failed setting socket option, reason: " << zmq_strerror(errno);
}
@@ -243,8 +235,8 @@ void FairMQSocketZMQ::SetOption(const string& option, const void* value, size_t
void FairMQSocketZMQ::GetOption(const string& option, void* value, size_t* valueSize)
{
int rc = zmq_getsockopt(fSocket, GetConstant(option), value, valueSize);
if (rc < 0) {
if (zmq_getsockopt(fSocket, GetConstant(option), value, valueSize) < 0)
{
LOG(ERROR) << "failed getting socket option, reason: " << zmq_strerror(errno);
}
}

View File

@@ -25,17 +25,17 @@
class FairMQSocketZMQ : public FairMQSocket
{
public:
FairMQSocketZMQ(const std::string& type, int num, int numIoThreads);
FairMQSocketZMQ(const std::string& type, const std::string& name, int numIoThreads);
virtual std::string GetId();
virtual bool Bind(const std::string& address);
virtual void Connect(const std::string& address);
virtual int Send(FairMQMessage* msg, const std::string& flag="");
virtual int Send(FairMQMessage* msg, const int flags);
virtual int Receive(FairMQMessage* msg, const std::string& flag="");
virtual int Receive(FairMQMessage* msg, const int flags);
virtual int Send(FairMQMessage* msg, const std::string& flag = "");
virtual int Send(FairMQMessage* msg, const int flags = 0);
virtual int Receive(FairMQMessage* msg, const std::string& flag = "");
virtual int Receive(FairMQMessage* msg, const int flags = 0);
virtual void* GetSocket();
virtual int GetSocket(int nothing);

View File

@@ -22,7 +22,7 @@ FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ()
{
int major, minor, patch;
zmq_version(&major, &minor, &patch);
LOG(INFO) << "Using ZeroMQ library, version: " << major << "." << minor << "." << patch;
LOG(DEBUG) << "Using ZeroMQ library, version: " << major << "." << minor << "." << patch;
}
FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage()
@@ -40,12 +40,12 @@ FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(void* data, size_t size,
return new FairMQMessageZMQ(data, size, ffn, hint);
}
FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(const string& type, int num, int numIoThreads)
FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(const string& type, const std::string& name, int numIoThreads)
{
return new FairMQSocketZMQ(type, num, numIoThreads);
return new FairMQSocketZMQ(type, name, numIoThreads);
}
FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(const vector<FairMQSocket*>& inputs)
FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(const vector<FairMQChannel>& channels)
{
return new FairMQPollerZMQ(inputs);
return new FairMQPollerZMQ(channels);
}

View File

@@ -31,8 +31,8 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory
virtual FairMQMessage* CreateMessage();
virtual FairMQMessage* CreateMessage(size_t size);
virtual FairMQMessage* CreateMessage(void* data, size_t size, fairmq_free_fn *ffn = NULL, void* hint = NULL);
virtual FairMQSocket* CreateSocket(const std::string& type, int num, int numIoThreads);
virtual FairMQPoller* CreatePoller(const std::vector<FairMQSocket*>& inputs);
virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, int numIoThreads);
virtual FairMQPoller* CreatePoller(const std::vector<FairMQChannel>& channels);
virtual ~FairMQTransportFactoryZMQ() {};
};