add support for I/O threads to the interface (only used with ZeroMQ).

This commit is contained in:
Alexey Rybalchenko 2014-05-27 19:01:49 +02:00
parent 24d26e802a
commit e80e6d4269
10 changed files with 24 additions and 18 deletions

View File

@ -25,9 +25,6 @@ void FairMQDevice::Init()
LOG(INFO) << ">>>>>>> Init <<<<<<<";
LOG(INFO) << "numIoThreads: " << fNumIoThreads;
// fPayloadContext = new FairMQContextZMQ(fNumIoThreads);
// TODO: nafiga?
fInputAddress = new vector<string>(fNumInputs);
fInputMethod = new vector<string>();
fInputSocketType = new vector<string>();
@ -60,7 +57,7 @@ void FairMQDevice::InitInput()
LOG(INFO) << ">>>>>>> InitInput <<<<<<<";
for (int i = 0; i < fNumInputs; ++i) {
FairMQSocket* socket = fTransportFactory->CreateSocket(fInputSocketType->at(i), i);
FairMQSocket* socket = fTransportFactory->CreateSocket(fInputSocketType->at(i), i, fNumIoThreads);
socket->SetOption("snd-hwm", &fInputSndBufSize->at(i), sizeof(fInputSndBufSize->at(i)));
socket->SetOption("rcv-hwm", &fInputRcvBufSize->at(i), sizeof(fInputRcvBufSize->at(i)));
@ -83,7 +80,7 @@ void FairMQDevice::InitOutput()
LOG(INFO) << ">>>>>>> InitOutput <<<<<<<";
for (int i = 0; i < fNumOutputs; ++i) {
FairMQSocket* socket = fTransportFactory->CreateSocket(fOutputSocketType->at(i), i);
FairMQSocket* socket = fTransportFactory->CreateSocket(fOutputSocketType->at(i), i, fNumIoThreads);
socket->SetOption("snd-hwm", &fOutputSndBufSize->at(i), sizeof(fOutputSndBufSize->at(i)));
socket->SetOption("rcv-hwm", &fOutputRcvBufSize->at(i), sizeof(fOutputRcvBufSize->at(i)));

View File

@ -23,7 +23,7 @@ class FairMQTransportFactory
virtual FairMQMessage* CreateMessage() = 0;
virtual FairMQMessage* CreateMessage(size_t size) = 0;
virtual FairMQMessage* CreateMessage(void* data, size_t size) = 0;
virtual FairMQSocket* CreateSocket(const string& type, int num) = 0;
virtual FairMQSocket* CreateSocket(const string& type, int num, int numIoThreads) = 0;
virtual FairMQPoller* CreatePoller(const vector<FairMQSocket*>& inputs) = 0;
virtual ~FairMQTransportFactory() {};

View File

@ -11,7 +11,7 @@
#include "FairMQMessageNN.h"
#include "FairMQLogger.h"
FairMQSocketNN::FairMQSocketNN(const string& type, int num) :
FairMQSocketNN::FairMQSocketNN(const string& type, int num, int numIoThreads) :
fBytesTx(0),
fBytesRx(0),
fMessagesTx(0),
@ -21,6 +21,10 @@ FairMQSocketNN::FairMQSocketNN(const string& type, int num) :
id << type << "." << num;
fId = id.str();
if ( numIoThreads > 1 ) {
LOG(INFO) << "number of I/O threads is not used in nanomsg";
}
fSocket = nn_socket (AF_SP, GetConstant(type));
if (type == "sub") {
nn_setsockopt(fSocket, NN_SUB, NN_SUB_SUBSCRIBE, NULL, 0);

View File

@ -18,7 +18,7 @@
class FairMQSocketNN : public FairMQSocket
{
public:
FairMQSocketNN(const string& type, int num);
FairMQSocketNN(const string& type, int num, int numIoThreads); // numIoThreads is not used in nanomsg.
virtual string GetId();

View File

@ -27,9 +27,9 @@ FairMQMessage* FairMQTransportFactoryNN::CreateMessage(void* data, size_t size)
return new FairMQMessageNN(data, size);
}
FairMQSocket* FairMQTransportFactoryNN::CreateSocket(const string& type, int num)
FairMQSocket* FairMQTransportFactoryNN::CreateSocket(const string& type, int num, int numIoThreads)
{
return new FairMQSocketNN(type, num);
return new FairMQSocketNN(type, num, numIoThreads);
}
FairMQPoller* FairMQTransportFactoryNN::CreatePoller(const vector<FairMQSocket*>& inputs)

View File

@ -23,7 +23,7 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory
virtual FairMQMessage* CreateMessage();
virtual FairMQMessage* CreateMessage(size_t size);
virtual FairMQMessage* CreateMessage(void* data, size_t size);
virtual FairMQSocket* CreateSocket(const string& type, int num);
virtual FairMQSocket* CreateSocket(const string& type, int num, int numIoThreads);
virtual FairMQPoller* CreatePoller(const vector<FairMQSocket*>& inputs);

View File

@ -10,9 +10,9 @@
#include "FairMQSocketZMQ.h"
#include "FairMQLogger.h"
boost::shared_ptr<FairMQContextZMQ> FairMQSocketZMQ::fContext = boost::shared_ptr<FairMQContextZMQ>(new FairMQContextZMQ(1)); // TODO: numIoThreads!
boost::shared_ptr<FairMQContextZMQ> FairMQSocketZMQ::fContext = boost::shared_ptr<FairMQContextZMQ>(new FairMQContextZMQ(1));
FairMQSocketZMQ::FairMQSocketZMQ(const string& type, int num) :
FairMQSocketZMQ::FairMQSocketZMQ(const string& type, int num, int numIoThreads) :
fBytesTx(0),
fBytesRx(0),
fMessagesTx(0),
@ -22,9 +22,14 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, int num) :
id << type << "." << num;
fId = id.str();
int rc = zmq_ctx_set (fContext->GetContext(), ZMQ_IO_THREADS, numIoThreads);
if (rc != 0){
LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno);
}
fSocket = zmq_socket(fContext->GetContext(), GetConstant(type));
int rc = zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.length());
rc = zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.length());
if (rc != 0) {
LOG(ERROR) << "failed setting socket option, reason: " << zmq_strerror(errno);
}

View File

@ -19,7 +19,7 @@
class FairMQSocketZMQ : public FairMQSocket
{
public:
FairMQSocketZMQ(const string& type, int num);
FairMQSocketZMQ(const string& type, int num, int numIoThreads);
virtual string GetId();

View File

@ -31,9 +31,9 @@ FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(void* data, size_t size)
return new FairMQMessageZMQ(data, size);
}
FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(const string& type, int num)
FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(const string& type, int num, int numIoThreads)
{
return new FairMQSocketZMQ(type, num);
return new FairMQSocketZMQ(type, num, numIoThreads);
}
FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(const vector<FairMQSocket*>& inputs)

View File

@ -24,7 +24,7 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory
virtual FairMQMessage* CreateMessage();
virtual FairMQMessage* CreateMessage(size_t size);
virtual FairMQMessage* CreateMessage(void* data, size_t size);
virtual FairMQSocket* CreateSocket(const string& type, int num);
virtual FairMQSocket* CreateSocket(const string& type, int num, int numIoThreads);
virtual FairMQPoller* CreatePoller(const vector<FairMQSocket*>& inputs);