Add NewPoller() wrapper.

This commit is contained in:
Alexey Rybalchenko 2017-05-19 09:41:13 +02:00 committed by Mohammad Al-Turany
parent 3be2f297f3
commit 6b221d950c
16 changed files with 261 additions and 187 deletions

View File

@ -1227,6 +1227,12 @@ bool FairMQDevice::Terminated()
return fTerminationRequested; return fTerminationRequested;
} }
const FairMQChannel& FairMQDevice::GetChannel(const std::string& channelName, const int index) const
{
return fChannels.at(channelName).at(index);
}
void FairMQDevice::Exit() void FairMQDevice::Exit()
{ {
// ask transports to terminate transfers // ask transports to terminate transfers

View File

@ -237,6 +237,49 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
return fChannels.at(channel).at(index).NewSimpleMessage(data); return fChannels.at(channel).at(index).NewSimpleMessage(data);
} }
template<typename ...Ts>
FairMQPollerPtr NewPoller(const Ts&... inputs)
{
std::vector<std::string> chans{inputs...};
// if more than one channel provided, check compatibility
if (chans.size() > 1)
{
FairMQ::Transport type = fChannels.at(chans.at(0)).at(0).Transport()->GetType();
for (int i = 1; i < chans.size(); ++i)
{
if (type != fChannels.at(chans.at(i)).at(0).Transport()->GetType())
{
LOG(ERROR) << "FairMQDevice::NewPoller() failed: different transports within same poller are not yet supported. Going to ERROR state.";
ChangeState(ERROR_FOUND);
}
}
}
return fChannels.at(chans.at(0)).at(0).Transport()->CreatePoller(fChannels, chans);
}
FairMQPollerPtr NewPoller(const std::vector<const FairMQChannel*>& channels)
{
// if more than one channel provided, check compatibility
if (channels.size() > 1)
{
FairMQ::Transport type = channels.at(0)->Transport()->GetType();
for (int i = 1; i < channels.size(); ++i)
{
if (type != channels.at(i)->Transport()->GetType())
{
LOG(ERROR) << "FairMQDevice::NewPoller() failed: different transports within same poller are not yet supported. Going to ERROR state.";
ChangeState(ERROR_FOUND);
}
}
}
return channels.at(0)->Transport()->CreatePoller(channels);
}
/// Waits for the first initialization run to finish /// Waits for the first initialization run to finish
void WaitForInitialValidation(); void WaitForInitialValidation();
@ -337,6 +380,8 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
bool Terminated(); bool Terminated();
const FairMQChannel& GetChannel(const std::string& channelName, const int index) const;
protected: protected:
std::shared_ptr<FairMQTransportFactory> fTransportFactory; ///< Transport factory std::shared_ptr<FairMQTransportFactory> fTransportFactory; ///< Transport factory
std::unordered_map<FairMQ::Transport, std::shared_ptr<FairMQTransportFactory>> fTransports; ///< Container for transports std::unordered_map<FairMQ::Transport, std::shared_ptr<FairMQTransportFactory>> fTransports; ///< Container for transports

View File

@ -56,9 +56,11 @@ class FairMQTransportFactory
/// Create a socket /// Create a socket
virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const = 0; virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const = 0;
/// Create a poller for all device channels /// Create a poller for a single channel (all subchannels)
virtual FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const = 0; virtual FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const = 0;
/// Create a poller for all device channels /// Create a poller for specific channels
virtual FairMQPollerPtr CreatePoller(const std::vector<const FairMQChannel*>& channels) const = 0;
/// Create a poller for specific channels (all subchannels)
virtual FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const = 0; virtual FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const = 0;
/// Create a poller for two sockets /// Create a poller for two sockets
virtual FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const = 0; virtual FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const = 0;

View File

@ -41,7 +41,14 @@ void FairMQMerger::Run()
{ {
int numInputs = fChannels.at(fInChannelName).size(); int numInputs = fChannels.at(fInChannelName).size();
std::unique_ptr<FairMQPoller> poller(fTransportFactory->CreatePoller(fChannels.at(fInChannelName))); vector<const FairMQChannel*> chans;
for (auto& chan : fChannels.at(fInChannelName))
{
chans.push_back(&chan);
}
FairMQPollerPtr poller(NewPoller(chans));
if (fMultipart) if (fMultipart)
{ {

View File

@ -24,43 +24,47 @@
using namespace std; using namespace std;
FairMQPollerNN::FairMQPollerNN(const vector<FairMQChannel>& channels) FairMQPollerNN::FairMQPollerNN(const vector<FairMQChannel>& channels)
: items() : fItems()
, fNumItems(0) , fNumItems(0)
, fOffsetMap() , fOffsetMap()
{ {
fNumItems = channels.size(); fNumItems = channels.size();
items = new nn_pollfd[fNumItems]; fItems = new nn_pollfd[fNumItems];
for (int i = 0; i < fNumItems; ++i) for (int i = 0; i < fNumItems; ++i)
{ {
items[i].fd = channels.at(i).GetSocket().GetSocket(1); fItems[i].fd = channels.at(i).GetSocket().GetSocket(1);
int type = 0; int type = 0;
size_t sz = sizeof(type); size_t sz = sizeof(type);
nn_getsockopt(channels.at(i).GetSocket().GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz); nn_getsockopt(channels.at(i).GetSocket().GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz);
if (type == NN_REQ || type == NN_REP || type == NN_PAIR) SetItemEvents(fItems[i], type);
{ }
items[i].events = NN_POLLIN|NN_POLLOUT; }
}
else if (type == NN_PUSH || type == NN_PUB) FairMQPollerNN::FairMQPollerNN(const vector<const FairMQChannel*>& channels)
{ : fItems()
items[i].events = NN_POLLOUT; , fNumItems(0)
} , fOffsetMap()
else if (type == NN_PULL || type == NN_SUB) {
{ fNumItems = channels.size();
items[i].events = NN_POLLIN; fItems = new nn_pollfd[fNumItems];
}
else for (int i = 0; i < fNumItems; ++i)
{ {
LOG(ERROR) << "nanomsg: invalid poller configuration, exiting."; fItems[i].fd = channels.at(i)->GetSocket().GetSocket(1);
exit(EXIT_FAILURE);
} int type = 0;
size_t sz = sizeof(type);
nn_getsockopt(channels.at(i)->GetSocket().GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz);
SetItemEvents(fItems[i], type);
} }
} }
FairMQPollerNN::FairMQPollerNN(const unordered_map<string, vector<FairMQChannel>>& channelsMap, const vector<string>& channelList) FairMQPollerNN::FairMQPollerNN(const unordered_map<string, vector<FairMQChannel>>& channelsMap, const vector<string>& channelList)
: items() : fItems()
, fNumItems(0) , fNumItems(0)
, fOffsetMap() , fOffsetMap()
{ {
@ -76,7 +80,7 @@ FairMQPollerNN::FairMQPollerNN(const unordered_map<string, vector<FairMQChannel>
fNumItems += channelsMap.at(channel).size(); fNumItems += channelsMap.at(channel).size();
} }
items = new nn_pollfd[fNumItems]; fItems = new nn_pollfd[fNumItems];
int index = 0; int index = 0;
for (string channel : channelList) for (string channel : channelList)
@ -84,29 +88,13 @@ FairMQPollerNN::FairMQPollerNN(const unordered_map<string, vector<FairMQChannel>
for (unsigned int i = 0; i < channelsMap.at(channel).size(); ++i) for (unsigned int i = 0; i < channelsMap.at(channel).size(); ++i)
{ {
index = fOffsetMap[channel] + i; index = fOffsetMap[channel] + i;
items[index].fd = channelsMap.at(channel).at(i).GetSocket().GetSocket(1); fItems[index].fd = channelsMap.at(channel).at(i).GetSocket().GetSocket(1);
int type = 0; int type = 0;
size_t sz = sizeof(type); size_t sz = sizeof(type);
nn_getsockopt(channelsMap.at(channel).at(i).GetSocket().GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz); nn_getsockopt(channelsMap.at(channel).at(i).GetSocket().GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz);
if (type == NN_REQ || type == NN_REP || type == NN_PAIR) SetItemEvents(fItems[index], type);
{
items[index].events = NN_POLLIN|NN_POLLOUT;
}
else if (type == NN_PUSH || type == NN_PUB)
{
items[index].events = NN_POLLOUT;
}
else if (type == NN_PULL || type == NN_SUB)
{
items[index].events = NN_POLLIN;
}
else
{
LOG(ERROR) << "nanomsg: invalid poller configuration, exiting.";
exit(EXIT_FAILURE);
}
} }
} }
} }
@ -119,34 +107,39 @@ FairMQPollerNN::FairMQPollerNN(const unordered_map<string, vector<FairMQChannel>
} }
FairMQPollerNN::FairMQPollerNN(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) FairMQPollerNN::FairMQPollerNN(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket)
: items() : fItems()
, fNumItems(2) , fNumItems(2)
, fOffsetMap() , fOffsetMap()
{ {
items = new nn_pollfd[fNumItems]; fItems = new nn_pollfd[fNumItems];
items[0].fd = cmdSocket.GetSocket(1); fItems[0].fd = cmdSocket.GetSocket(1);
items[0].events = NN_POLLIN; fItems[0].events = NN_POLLIN;
items[0].revents = 0; fItems[0].revents = 0;
items[1].fd = dataSocket.GetSocket(1); fItems[1].fd = dataSocket.GetSocket(1);
items[1].revents = 0; fItems[1].revents = 0;
int type = 0; int type = 0;
size_t sz = sizeof(type); size_t sz = sizeof(type);
nn_getsockopt(dataSocket.GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz); nn_getsockopt(dataSocket.GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz);
SetItemEvents(fItems[1], type);
}
void FairMQPollerNN::SetItemEvents(nn_pollfd& item, const int type)
{
if (type == NN_REQ || type == NN_REP || type == NN_PAIR) if (type == NN_REQ || type == NN_REP || type == NN_PAIR)
{ {
items[1].events = NN_POLLIN|NN_POLLOUT; item.events = NN_POLLIN|NN_POLLOUT;
} }
else if (type == NN_PUSH || type == NN_PUB) else if (type == NN_PUSH || type == NN_PUB)
{ {
items[1].events = NN_POLLOUT; item.events = NN_POLLOUT;
} }
else if (type == NN_PULL || type == NN_SUB) else if (type == NN_PULL || type == NN_SUB)
{ {
items[1].events = NN_POLLIN; item.events = NN_POLLIN;
} }
else else
{ {
@ -157,7 +150,7 @@ FairMQPollerNN::FairMQPollerNN(const FairMQSocket& cmdSocket, const FairMQSocket
void FairMQPollerNN::Poll(const int timeout) void FairMQPollerNN::Poll(const int timeout)
{ {
if (nn_poll(items, fNumItems, timeout) < 0) if (nn_poll(fItems, fNumItems, timeout) < 0)
{ {
if (errno == ETERM) if (errno == ETERM)
{ {
@ -173,7 +166,7 @@ void FairMQPollerNN::Poll(const int timeout)
bool FairMQPollerNN::CheckInput(const int index) bool FairMQPollerNN::CheckInput(const int index)
{ {
if (items[index].revents & (NN_POLLIN | NN_POLLOUT)) if (fItems[index].revents & (NN_POLLIN | NN_POLLOUT))
{ {
return true; return true;
} }
@ -183,7 +176,7 @@ bool FairMQPollerNN::CheckInput(const int index)
bool FairMQPollerNN::CheckOutput(const int index) bool FairMQPollerNN::CheckOutput(const int index)
{ {
if (items[index].revents & NN_POLLOUT) if (fItems[index].revents & NN_POLLOUT)
{ {
return true; return true;
} }
@ -195,7 +188,7 @@ bool FairMQPollerNN::CheckInput(const string channelKey, const int index)
{ {
try try
{ {
if (items[fOffsetMap.at(channelKey) + index].revents & (NN_POLLIN | NN_POLLOUT)) if (fItems[fOffsetMap.at(channelKey) + index].revents & (NN_POLLIN | NN_POLLOUT))
{ {
return true; return true;
} }
@ -214,7 +207,7 @@ bool FairMQPollerNN::CheckOutput(const string channelKey, const int index)
{ {
try try
{ {
if (items[fOffsetMap.at(channelKey) + index].revents & NN_POLLOUT) if (fItems[fOffsetMap.at(channelKey) + index].revents & NN_POLLOUT)
{ {
return true; return true;
} }
@ -231,5 +224,5 @@ bool FairMQPollerNN::CheckOutput(const string channelKey, const int index)
FairMQPollerNN::~FairMQPollerNN() FairMQPollerNN::~FairMQPollerNN()
{ {
delete[] items; delete[] fItems;
} }

View File

@ -33,10 +33,14 @@ class FairMQPollerNN : public FairMQPoller
public: public:
FairMQPollerNN(const std::vector<FairMQChannel>& channels); FairMQPollerNN(const std::vector<FairMQChannel>& channels);
FairMQPollerNN(const std::vector<const FairMQChannel*>& channels);
FairMQPollerNN(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList); FairMQPollerNN(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList);
FairMQPollerNN(const FairMQPollerNN&) = delete; FairMQPollerNN(const FairMQPollerNN&) = delete;
FairMQPollerNN operator=(const FairMQPollerNN&) = delete; FairMQPollerNN operator=(const FairMQPollerNN&) = delete;
void SetItemEvents(nn_pollfd& item, const int type);
virtual void Poll(const int timeout); virtual void Poll(const int timeout);
virtual bool CheckInput(const int index); virtual bool CheckInput(const int index);
virtual bool CheckOutput(const int index); virtual bool CheckOutput(const int index);
@ -48,7 +52,7 @@ class FairMQPollerNN : public FairMQPoller
private: private:
FairMQPollerNN(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket); FairMQPollerNN(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket);
nn_pollfd* items; nn_pollfd* fItems;
int fNumItems; int fNumItems;
std::unordered_map<std::string, int> fOffsetMap; std::unordered_map<std::string, int> fOffsetMap;

View File

@ -51,6 +51,11 @@ FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const vector<FairMQChanne
return unique_ptr<FairMQPoller>(new FairMQPollerNN(channels)); return unique_ptr<FairMQPoller>(new FairMQPollerNN(channels));
} }
FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const std::vector<const FairMQChannel*>& channels) const
{
return unique_ptr<FairMQPoller>(new FairMQPollerNN(channels));
}
FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const unordered_map<string, vector<FairMQChannel>>& channelsMap, const vector<string>& channelList) const FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const unordered_map<string, vector<FairMQChannel>>& channelsMap, const vector<string>& channelList) const
{ {
return unique_ptr<FairMQPoller>(new FairMQPollerNN(channelsMap, channelList)); return unique_ptr<FairMQPoller>(new FairMQPollerNN(channelsMap, channelList));

View File

@ -32,6 +32,7 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory
FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override; FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override;
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override; FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override;
FairMQPollerPtr CreatePoller(const std::vector<const FairMQChannel*>& channels) const override;
FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override; FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const override; FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const override;

View File

@ -20,45 +20,51 @@
using namespace std; using namespace std;
FairMQPollerSHM::FairMQPollerSHM(const vector<FairMQChannel>& channels) FairMQPollerSHM::FairMQPollerSHM(const vector<FairMQChannel>& channels)
: items() : fItems()
, fNumItems(0) , fNumItems(0)
, fOffsetMap() , fOffsetMap()
{ {
fNumItems = channels.size(); fNumItems = channels.size();
items = new zmq_pollitem_t[fNumItems]; fItems = new zmq_pollitem_t[fNumItems];
for (int i = 0; i < fNumItems; ++i) for (int i = 0; i < fNumItems; ++i)
{ {
items[i].socket = channels.at(i).GetSocket().GetSocket(); fItems[i].socket = channels.at(i).GetSocket().GetSocket();
items[i].fd = 0; fItems[i].fd = 0;
items[i].revents = 0; fItems[i].revents = 0;
int type = 0; int type = 0;
size_t size = sizeof(type); size_t size = sizeof(type);
zmq_getsockopt(channels.at(i).GetSocket().GetSocket(), ZMQ_TYPE, &type, &size); zmq_getsockopt(channels.at(i).GetSocket().GetSocket(), ZMQ_TYPE, &type, &size);
if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) SetItemEvents(fItems[i], type);
{ }
items[i].events = ZMQ_POLLIN|ZMQ_POLLOUT; }
}
else if (type == ZMQ_PUSH || type == ZMQ_PUB || type == ZMQ_XPUB) FairMQPollerSHM::FairMQPollerSHM(const vector<const FairMQChannel*>& channels)
{ : fItems()
items[i].events = ZMQ_POLLOUT; , fNumItems(0)
} , fOffsetMap()
else if (type == ZMQ_PULL || type == ZMQ_SUB || type == ZMQ_XSUB) {
{ fNumItems = channels.size();
items[i].events = ZMQ_POLLIN; fItems = new zmq_pollitem_t[fNumItems];
}
else for (int i = 0; i < fNumItems; ++i)
{ {
LOG(ERROR) << "shmem: invalid poller configuration, exiting."; fItems[i].socket = channels.at(i)->GetSocket().GetSocket();
exit(EXIT_FAILURE); fItems[i].fd = 0;
} fItems[i].revents = 0;
int type = 0;
size_t size = sizeof(type);
zmq_getsockopt(channels.at(i)->GetSocket().GetSocket(), ZMQ_TYPE, &type, &size);
SetItemEvents(fItems[i], type);
} }
} }
FairMQPollerSHM::FairMQPollerSHM(const unordered_map<string, vector<FairMQChannel>>& channelsMap, const vector<string>& channelList) FairMQPollerSHM::FairMQPollerSHM(const unordered_map<string, vector<FairMQChannel>>& channelsMap, const vector<string>& channelList)
: items() : fItems()
, fNumItems(0) , fNumItems(0)
, fOffsetMap() , fOffsetMap()
{ {
@ -74,7 +80,7 @@ FairMQPollerSHM::FairMQPollerSHM(const unordered_map<string, vector<FairMQChanne
fNumItems += channelsMap.at(channel).size(); fNumItems += channelsMap.at(channel).size();
} }
items = new zmq_pollitem_t[fNumItems]; fItems = new zmq_pollitem_t[fNumItems];
int index = 0; int index = 0;
for (string channel : channelList) for (string channel : channelList)
@ -83,31 +89,15 @@ FairMQPollerSHM::FairMQPollerSHM(const unordered_map<string, vector<FairMQChanne
{ {
index = fOffsetMap[channel] + i; index = fOffsetMap[channel] + i;
items[index].socket = channelsMap.at(channel).at(i).GetSocket().GetSocket(); fItems[index].socket = channelsMap.at(channel).at(i).GetSocket().GetSocket();
items[index].fd = 0; fItems[index].fd = 0;
items[index].revents = 0; fItems[index].revents = 0;
int type = 0; int type = 0;
size_t size = sizeof(type); size_t size = sizeof(type);
zmq_getsockopt(channelsMap.at(channel).at(i).GetSocket().GetSocket(), ZMQ_TYPE, &type, &size); zmq_getsockopt(channelsMap.at(channel).at(i).GetSocket().GetSocket(), ZMQ_TYPE, &type, &size);
if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) SetItemEvents(fItems[index], type);
{
items[index].events = ZMQ_POLLIN|ZMQ_POLLOUT;
}
else if (type == ZMQ_PUSH || type == ZMQ_PUB || type == ZMQ_XPUB)
{
items[index].events = ZMQ_POLLOUT;
}
else if (type == ZMQ_PULL || type == ZMQ_SUB || type == ZMQ_XSUB)
{
items[index].events = ZMQ_POLLIN;
}
else
{
LOG(ERROR) << "shmem: invalid poller configuration, exiting.";
exit(EXIT_FAILURE);
}
} }
} }
} }
@ -120,47 +110,52 @@ FairMQPollerSHM::FairMQPollerSHM(const unordered_map<string, vector<FairMQChanne
} }
FairMQPollerSHM::FairMQPollerSHM(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) FairMQPollerSHM::FairMQPollerSHM(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket)
: items() : fItems()
, fNumItems(2) , fNumItems(2)
, fOffsetMap() , fOffsetMap()
{ {
items = new zmq_pollitem_t[fNumItems]; fItems = new zmq_pollitem_t[fNumItems];
items[0].socket = cmdSocket.GetSocket(); fItems[0].socket = cmdSocket.GetSocket();
items[0].fd = 0; fItems[0].fd = 0;
items[0].events = ZMQ_POLLIN; fItems[0].events = ZMQ_POLLIN;
items[0].revents = 0; fItems[0].revents = 0;
items[1].socket = dataSocket.GetSocket(); fItems[1].socket = dataSocket.GetSocket();
items[1].fd = 0; fItems[1].fd = 0;
items[1].revents = 0; fItems[1].revents = 0;
int type = 0; int type = 0;
size_t size = sizeof(type); size_t size = sizeof(type);
zmq_getsockopt(dataSocket.GetSocket(), ZMQ_TYPE, &type, &size); zmq_getsockopt(dataSocket.GetSocket(), ZMQ_TYPE, &type, &size);
SetItemEvents(fItems[1], type);
}
void FairMQPollerSHM::SetItemEvents(zmq_pollitem_t& item, const int type)
{
if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER)
{ {
items[1].events = ZMQ_POLLIN|ZMQ_POLLOUT; item.events = ZMQ_POLLIN|ZMQ_POLLOUT;
} }
else if (type == ZMQ_PUSH || type == ZMQ_PUB || type == ZMQ_XPUB) else if (type == ZMQ_PUSH || type == ZMQ_PUB || type == ZMQ_XPUB)
{ {
items[1].events = ZMQ_POLLOUT; item.events = ZMQ_POLLOUT;
} }
else if (type == ZMQ_PULL || type == ZMQ_SUB || type == ZMQ_XSUB) else if (type == ZMQ_PULL || type == ZMQ_SUB || type == ZMQ_XSUB)
{ {
items[1].events = ZMQ_POLLIN; item.events = ZMQ_POLLIN;
} }
else else
{ {
LOG(ERROR) << "shmem: invalid poller configuration, exiting."; LOG(ERROR) << "zeromq: invalid poller configuration, exiting.";
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} }
void FairMQPollerSHM::Poll(const int timeout) void FairMQPollerSHM::Poll(const int timeout)
{ {
if (zmq_poll(items, fNumItems, timeout) < 0) if (zmq_poll(fItems, fNumItems, timeout) < 0)
{ {
if (errno == ETERM) if (errno == ETERM)
{ {
@ -176,7 +171,7 @@ void FairMQPollerSHM::Poll(const int timeout)
bool FairMQPollerSHM::CheckInput(const int index) bool FairMQPollerSHM::CheckInput(const int index)
{ {
if (items[index].revents & ZMQ_POLLIN) if (fItems[index].revents & ZMQ_POLLIN)
{ {
return true; return true;
} }
@ -186,7 +181,7 @@ bool FairMQPollerSHM::CheckInput(const int index)
bool FairMQPollerSHM::CheckOutput(const int index) bool FairMQPollerSHM::CheckOutput(const int index)
{ {
if (items[index].revents & ZMQ_POLLOUT) if (fItems[index].revents & ZMQ_POLLOUT)
{ {
return true; return true;
} }
@ -198,7 +193,7 @@ bool FairMQPollerSHM::CheckInput(const string channelKey, const int index)
{ {
try try
{ {
if (items[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLIN) if (fItems[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLIN)
{ {
return true; return true;
} }
@ -217,7 +212,7 @@ bool FairMQPollerSHM::CheckOutput(const string channelKey, const int index)
{ {
try try
{ {
if (items[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLOUT) if (fItems[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLOUT)
{ {
return true; return true;
} }
@ -234,5 +229,5 @@ bool FairMQPollerSHM::CheckOutput(const string channelKey, const int index)
FairMQPollerSHM::~FairMQPollerSHM() FairMQPollerSHM::~FairMQPollerSHM()
{ {
delete[] items; delete[] fItems;
} }

View File

@ -27,10 +27,14 @@ class FairMQPollerSHM : public FairMQPoller
public: public:
FairMQPollerSHM(const std::vector<FairMQChannel>& channels); FairMQPollerSHM(const std::vector<FairMQChannel>& channels);
FairMQPollerSHM(const std::vector<const FairMQChannel*>& channels);
FairMQPollerSHM(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList); FairMQPollerSHM(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList);
FairMQPollerSHM(const FairMQPollerSHM&) = delete; FairMQPollerSHM(const FairMQPollerSHM&) = delete;
FairMQPollerSHM operator=(const FairMQPollerSHM&) = delete; FairMQPollerSHM operator=(const FairMQPollerSHM&) = delete;
void SetItemEvents(zmq_pollitem_t& item, const int type);
virtual void Poll(const int timeout); virtual void Poll(const int timeout);
virtual bool CheckInput(const int index); virtual bool CheckInput(const int index);
virtual bool CheckOutput(const int index); virtual bool CheckOutput(const int index);
@ -42,7 +46,7 @@ class FairMQPollerSHM : public FairMQPoller
private: private:
FairMQPollerSHM(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket); FairMQPollerSHM(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket);
zmq_pollitem_t* items; zmq_pollitem_t* fItems;
int fNumItems; int fNumItems;
std::unordered_map<std::string, int> fOffsetMap; std::unordered_map<std::string, int> fOffsetMap;

View File

@ -157,6 +157,11 @@ FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const vector<FairMQChann
return unique_ptr<FairMQPoller>(new FairMQPollerSHM(channels)); return unique_ptr<FairMQPoller>(new FairMQPollerSHM(channels));
} }
FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const std::vector<const FairMQChannel*>& channels) const
{
return unique_ptr<FairMQPoller>(new FairMQPollerSHM(channels));
}
FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const unordered_map<string, vector<FairMQChannel>>& channelsMap, const vector<string>& channelList) const FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const unordered_map<string, vector<FairMQChannel>>& channelsMap, const vector<string>& channelList) const
{ {
return unique_ptr<FairMQPoller>(new FairMQPollerSHM(channelsMap, channelList)); return unique_ptr<FairMQPoller>(new FairMQPollerSHM(channelsMap, channelList));

View File

@ -37,6 +37,7 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory
FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override; FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override;
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override; FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override;
FairMQPollerPtr CreatePoller(const std::vector<const FairMQChannel*>& channels) const override;
FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override; FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const override; FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const override;

View File

@ -20,45 +20,52 @@
using namespace std; using namespace std;
FairMQPollerZMQ::FairMQPollerZMQ(const vector<FairMQChannel>& channels) FairMQPollerZMQ::FairMQPollerZMQ(const vector<FairMQChannel>& channels)
: items() : fItems()
, fNumItems(0) , fNumItems(0)
, fOffsetMap() , fOffsetMap()
{ {
fNumItems = channels.size(); fNumItems = channels.size();
items = new zmq_pollitem_t[fNumItems]; fItems = new zmq_pollitem_t[fNumItems];
for (int i = 0; i < fNumItems; ++i) for (int i = 0; i < fNumItems; ++i)
{ {
items[i].socket = channels.at(i).GetSocket().GetSocket(); fItems[i].socket = channels.at(i).GetSocket().GetSocket();
items[i].fd = 0; fItems[i].fd = 0;
items[i].revents = 0; fItems[i].revents = 0;
int type = 0; int type = 0;
size_t size = sizeof(type); size_t size = sizeof(type);
zmq_getsockopt(channels.at(i).GetSocket().GetSocket(), ZMQ_TYPE, &type, &size); zmq_getsockopt(channels.at(i).GetSocket().GetSocket(), ZMQ_TYPE, &type, &size);
if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) SetItemEvents(fItems[i], type);
{ }
items[i].events = ZMQ_POLLIN|ZMQ_POLLOUT; }
}
else if (type == ZMQ_PUSH || type == ZMQ_PUB || type == ZMQ_XPUB)
{ FairMQPollerZMQ::FairMQPollerZMQ(const std::vector<const FairMQChannel*>& channels)
items[i].events = ZMQ_POLLOUT; : fItems()
} , fNumItems(0)
else if (type == ZMQ_PULL || type == ZMQ_SUB || type == ZMQ_XSUB) , fOffsetMap()
{ {
items[i].events = ZMQ_POLLIN; fNumItems = channels.size();
} fItems = new zmq_pollitem_t[fNumItems];
else
{ for (int i = 0; i < fNumItems; ++i)
LOG(ERROR) << "zeromq: invalid poller configuration, exiting."; {
exit(EXIT_FAILURE); fItems[i].socket = channels.at(i)->GetSocket().GetSocket();
} fItems[i].fd = 0;
fItems[i].revents = 0;
int type = 0;
size_t size = sizeof(type);
zmq_getsockopt(channels.at(i)->GetSocket().GetSocket(), ZMQ_TYPE, &type, &size);
SetItemEvents(fItems[i], type);
} }
} }
FairMQPollerZMQ::FairMQPollerZMQ(const unordered_map<string, vector<FairMQChannel>>& channelsMap, const vector<string>& channelList) FairMQPollerZMQ::FairMQPollerZMQ(const unordered_map<string, vector<FairMQChannel>>& channelsMap, const vector<string>& channelList)
: items() : fItems()
, fNumItems(0) , fNumItems(0)
, fOffsetMap() , fOffsetMap()
{ {
@ -74,7 +81,7 @@ FairMQPollerZMQ::FairMQPollerZMQ(const unordered_map<string, vector<FairMQChanne
fNumItems += channelsMap.at(channel).size(); fNumItems += channelsMap.at(channel).size();
} }
items = new zmq_pollitem_t[fNumItems]; fItems = new zmq_pollitem_t[fNumItems];
int index = 0; int index = 0;
for (string channel : channelList) for (string channel : channelList)
@ -83,31 +90,15 @@ FairMQPollerZMQ::FairMQPollerZMQ(const unordered_map<string, vector<FairMQChanne
{ {
index = fOffsetMap[channel] + i; index = fOffsetMap[channel] + i;
items[index].socket = channelsMap.at(channel).at(i).GetSocket().GetSocket(); fItems[index].socket = channelsMap.at(channel).at(i).GetSocket().GetSocket();
items[index].fd = 0; fItems[index].fd = 0;
items[index].revents = 0; fItems[index].revents = 0;
int type = 0; int type = 0;
size_t size = sizeof(type); size_t size = sizeof(type);
zmq_getsockopt(channelsMap.at(channel).at(i).GetSocket().GetSocket(), ZMQ_TYPE, &type, &size); zmq_getsockopt(channelsMap.at(channel).at(i).GetSocket().GetSocket(), ZMQ_TYPE, &type, &size);
if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) SetItemEvents(fItems[index], type);
{
items[index].events = ZMQ_POLLIN|ZMQ_POLLOUT;
}
else if (type == ZMQ_PUSH || type == ZMQ_PUB || type == ZMQ_XPUB)
{
items[index].events = ZMQ_POLLOUT;
}
else if (type == ZMQ_PULL || type == ZMQ_SUB || type == ZMQ_XSUB)
{
items[index].events = ZMQ_POLLIN;
}
else
{
LOG(ERROR) << "zeromq: invalid poller configuration, exiting.";
exit(EXIT_FAILURE);
}
} }
} }
} }
@ -120,36 +111,41 @@ FairMQPollerZMQ::FairMQPollerZMQ(const unordered_map<string, vector<FairMQChanne
} }
FairMQPollerZMQ::FairMQPollerZMQ(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) FairMQPollerZMQ::FairMQPollerZMQ(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket)
: items() : fItems()
, fNumItems(2) , fNumItems(2)
, fOffsetMap() , fOffsetMap()
{ {
items = new zmq_pollitem_t[fNumItems]; fItems = new zmq_pollitem_t[fNumItems];
items[0].socket = cmdSocket.GetSocket(); fItems[0].socket = cmdSocket.GetSocket();
items[0].fd = 0; fItems[0].fd = 0;
items[0].events = ZMQ_POLLIN; fItems[0].events = ZMQ_POLLIN;
items[0].revents = 0; fItems[0].revents = 0;
items[1].socket = dataSocket.GetSocket(); fItems[1].socket = dataSocket.GetSocket();
items[1].fd = 0; fItems[1].fd = 0;
items[1].revents = 0; fItems[1].revents = 0;
int type = 0; int type = 0;
size_t size = sizeof(type); size_t size = sizeof(type);
zmq_getsockopt(dataSocket.GetSocket(), ZMQ_TYPE, &type, &size); zmq_getsockopt(dataSocket.GetSocket(), ZMQ_TYPE, &type, &size);
SetItemEvents(fItems[1], type);
}
void FairMQPollerZMQ::SetItemEvents(zmq_pollitem_t& item, const int type)
{
if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER)
{ {
items[1].events = ZMQ_POLLIN|ZMQ_POLLOUT; item.events = ZMQ_POLLIN|ZMQ_POLLOUT;
} }
else if (type == ZMQ_PUSH || type == ZMQ_PUB || type == ZMQ_XPUB) else if (type == ZMQ_PUSH || type == ZMQ_PUB || type == ZMQ_XPUB)
{ {
items[1].events = ZMQ_POLLOUT; item.events = ZMQ_POLLOUT;
} }
else if (type == ZMQ_PULL || type == ZMQ_SUB || type == ZMQ_XSUB) else if (type == ZMQ_PULL || type == ZMQ_SUB || type == ZMQ_XSUB)
{ {
items[1].events = ZMQ_POLLIN; item.events = ZMQ_POLLIN;
} }
else else
{ {
@ -160,7 +156,7 @@ FairMQPollerZMQ::FairMQPollerZMQ(const FairMQSocket& cmdSocket, const FairMQSock
void FairMQPollerZMQ::Poll(const int timeout) void FairMQPollerZMQ::Poll(const int timeout)
{ {
if (zmq_poll(items, fNumItems, timeout) < 0) if (zmq_poll(fItems, fNumItems, timeout) < 0)
{ {
if (errno == ETERM) if (errno == ETERM)
{ {
@ -176,7 +172,7 @@ void FairMQPollerZMQ::Poll(const int timeout)
bool FairMQPollerZMQ::CheckInput(const int index) bool FairMQPollerZMQ::CheckInput(const int index)
{ {
if (items[index].revents & ZMQ_POLLIN) if (fItems[index].revents & ZMQ_POLLIN)
{ {
return true; return true;
} }
@ -186,7 +182,7 @@ bool FairMQPollerZMQ::CheckInput(const int index)
bool FairMQPollerZMQ::CheckOutput(const int index) bool FairMQPollerZMQ::CheckOutput(const int index)
{ {
if (items[index].revents & ZMQ_POLLOUT) if (fItems[index].revents & ZMQ_POLLOUT)
{ {
return true; return true;
} }
@ -198,7 +194,7 @@ bool FairMQPollerZMQ::CheckInput(const string channelKey, const int index)
{ {
try try
{ {
if (items[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLIN) if (fItems[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLIN)
{ {
return true; return true;
} }
@ -217,7 +213,7 @@ bool FairMQPollerZMQ::CheckOutput(const string channelKey, const int index)
{ {
try try
{ {
if (items[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLOUT) if (fItems[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLOUT)
{ {
return true; return true;
} }
@ -234,5 +230,5 @@ bool FairMQPollerZMQ::CheckOutput(const string channelKey, const int index)
FairMQPollerZMQ::~FairMQPollerZMQ() FairMQPollerZMQ::~FairMQPollerZMQ()
{ {
delete[] items; delete[] fItems;
} }

View File

@ -34,10 +34,14 @@ class FairMQPollerZMQ : public FairMQPoller
public: public:
FairMQPollerZMQ(const std::vector<FairMQChannel>& channels); FairMQPollerZMQ(const std::vector<FairMQChannel>& channels);
FairMQPollerZMQ(const std::vector<const FairMQChannel*>& channels);
FairMQPollerZMQ(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList); FairMQPollerZMQ(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList);
FairMQPollerZMQ(const FairMQPollerZMQ&) = delete; FairMQPollerZMQ(const FairMQPollerZMQ&) = delete;
FairMQPollerZMQ operator=(const FairMQPollerZMQ&) = delete; FairMQPollerZMQ operator=(const FairMQPollerZMQ&) = delete;
void SetItemEvents(zmq_pollitem_t& item, const int type);
virtual void Poll(const int timeout); virtual void Poll(const int timeout);
virtual bool CheckInput(const int index); virtual bool CheckInput(const int index);
virtual bool CheckOutput(const int index); virtual bool CheckOutput(const int index);
@ -49,7 +53,7 @@ class FairMQPollerZMQ : public FairMQPoller
private: private:
FairMQPollerZMQ(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket); FairMQPollerZMQ(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket);
zmq_pollitem_t* items; zmq_pollitem_t* fItems;
int fNumItems; int fNumItems;
std::unordered_map<std::string, int> fOffsetMap; std::unordered_map<std::string, int> fOffsetMap;

View File

@ -78,6 +78,11 @@ FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const vector<FairMQChann
return unique_ptr<FairMQPoller>(new FairMQPollerZMQ(channels)); return unique_ptr<FairMQPoller>(new FairMQPollerZMQ(channels));
} }
FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const std::vector<const FairMQChannel*>& channels) const
{
return unique_ptr<FairMQPoller>(new FairMQPollerZMQ(channels));
}
FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const unordered_map<string, vector<FairMQChannel>>& channelsMap, const vector<string>& channelList) const FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const unordered_map<string, vector<FairMQChannel>>& channelsMap, const vector<string>& channelList) const
{ {
return unique_ptr<FairMQPoller>(new FairMQPollerZMQ(channelsMap, channelList)); return unique_ptr<FairMQPoller>(new FairMQPollerZMQ(channelsMap, channelList));

View File

@ -38,6 +38,7 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory
FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override; FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override;
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override; FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override;
FairMQPollerPtr CreatePoller(const std::vector<const FairMQChannel*>& channels) const override;
FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override; FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const override; FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const override;