diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 24bf12f9..4c1a6ebe 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -1227,6 +1227,12 @@ bool FairMQDevice::Terminated() return fTerminationRequested; } +const FairMQChannel& FairMQDevice::GetChannel(const std::string& channelName, const int index) const +{ + return fChannels.at(channelName).at(index); +} + + void FairMQDevice::Exit() { // ask transports to terminate transfers diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 332d8496..d6354386 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -237,6 +237,49 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable return fChannels.at(channel).at(index).NewSimpleMessage(data); } + template + FairMQPollerPtr NewPoller(const Ts&... inputs) + { + std::vector chans{inputs...}; + + // if more than one channel provided, check compatibility + if (chans.size() > 1) + { + FairMQ::Transport type = fChannels.at(chans.at(0)).at(0).Transport()->GetType(); + + for (int i = 1; i < chans.size(); ++i) + { + if (type != fChannels.at(chans.at(i)).at(0).Transport()->GetType()) + { + LOG(ERROR) << "FairMQDevice::NewPoller() failed: different transports within same poller are not yet supported. Going to ERROR state."; + ChangeState(ERROR_FOUND); + } + } + } + + return fChannels.at(chans.at(0)).at(0).Transport()->CreatePoller(fChannels, chans); + } + + FairMQPollerPtr NewPoller(const std::vector& channels) + { + // if more than one channel provided, check compatibility + if (channels.size() > 1) + { + FairMQ::Transport type = channels.at(0)->Transport()->GetType(); + + for (int i = 1; i < channels.size(); ++i) + { + if (type != channels.at(i)->Transport()->GetType()) + { + LOG(ERROR) << "FairMQDevice::NewPoller() failed: different transports within same poller are not yet supported. Going to ERROR state."; + ChangeState(ERROR_FOUND); + } + } + } + + return channels.at(0)->Transport()->CreatePoller(channels); + } + /// Waits for the first initialization run to finish void WaitForInitialValidation(); @@ -337,6 +380,8 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable bool Terminated(); + const FairMQChannel& GetChannel(const std::string& channelName, const int index) const; + protected: std::shared_ptr fTransportFactory; ///< Transport factory std::unordered_map> fTransports; ///< Container for transports diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 29cf91ed..6489241d 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -56,9 +56,11 @@ class FairMQTransportFactory /// Create a socket virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const = 0; - /// Create a poller for all device channels + /// Create a poller for a single channel (all subchannels) virtual FairMQPollerPtr CreatePoller(const std::vector& channels) const = 0; - /// Create a poller for all device channels + /// Create a poller for specific channels + virtual FairMQPollerPtr CreatePoller(const std::vector& channels) const = 0; + /// Create a poller for specific channels (all subchannels) virtual FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const = 0; /// Create a poller for two sockets virtual FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const = 0; diff --git a/fairmq/devices/FairMQMerger.cxx b/fairmq/devices/FairMQMerger.cxx index 1b047b0d..fbae376c 100644 --- a/fairmq/devices/FairMQMerger.cxx +++ b/fairmq/devices/FairMQMerger.cxx @@ -41,7 +41,14 @@ void FairMQMerger::Run() { int numInputs = fChannels.at(fInChannelName).size(); - std::unique_ptr poller(fTransportFactory->CreatePoller(fChannels.at(fInChannelName))); + vector chans; + + for (auto& chan : fChannels.at(fInChannelName)) + { + chans.push_back(&chan); + } + + FairMQPollerPtr poller(NewPoller(chans)); if (fMultipart) { diff --git a/fairmq/nanomsg/FairMQPollerNN.cxx b/fairmq/nanomsg/FairMQPollerNN.cxx index 413d9b5a..c20a7986 100644 --- a/fairmq/nanomsg/FairMQPollerNN.cxx +++ b/fairmq/nanomsg/FairMQPollerNN.cxx @@ -24,43 +24,47 @@ using namespace std; FairMQPollerNN::FairMQPollerNN(const vector& channels) - : items() + : fItems() , fNumItems(0) , fOffsetMap() { fNumItems = channels.size(); - items = new nn_pollfd[fNumItems]; + fItems = new nn_pollfd[fNumItems]; for (int i = 0; i < fNumItems; ++i) { - items[i].fd = channels.at(i).GetSocket().GetSocket(1); + fItems[i].fd = channels.at(i).GetSocket().GetSocket(1); int type = 0; size_t sz = sizeof(type); nn_getsockopt(channels.at(i).GetSocket().GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz); - if (type == NN_REQ || type == NN_REP || type == NN_PAIR) - { - items[i].events = NN_POLLIN|NN_POLLOUT; - } - else if (type == NN_PUSH || type == NN_PUB) - { - items[i].events = NN_POLLOUT; - } - else if (type == NN_PULL || type == NN_SUB) - { - items[i].events = NN_POLLIN; - } - else - { - LOG(ERROR) << "nanomsg: invalid poller configuration, exiting."; - exit(EXIT_FAILURE); - } + SetItemEvents(fItems[i], type); + } +} + +FairMQPollerNN::FairMQPollerNN(const vector& channels) + : fItems() + , fNumItems(0) + , fOffsetMap() +{ + fNumItems = channels.size(); + fItems = new nn_pollfd[fNumItems]; + + for (int i = 0; i < fNumItems; ++i) + { + fItems[i].fd = channels.at(i)->GetSocket().GetSocket(1); + + int type = 0; + size_t sz = sizeof(type); + nn_getsockopt(channels.at(i)->GetSocket().GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz); + + SetItemEvents(fItems[i], type); } } FairMQPollerNN::FairMQPollerNN(const unordered_map>& channelsMap, const vector& channelList) - : items() + : fItems() , fNumItems(0) , fOffsetMap() { @@ -76,7 +80,7 @@ FairMQPollerNN::FairMQPollerNN(const unordered_map fNumItems += channelsMap.at(channel).size(); } - items = new nn_pollfd[fNumItems]; + fItems = new nn_pollfd[fNumItems]; int index = 0; for (string channel : channelList) @@ -84,29 +88,13 @@ FairMQPollerNN::FairMQPollerNN(const unordered_map for (unsigned int i = 0; i < channelsMap.at(channel).size(); ++i) { index = fOffsetMap[channel] + i; - items[index].fd = channelsMap.at(channel).at(i).GetSocket().GetSocket(1); + fItems[index].fd = channelsMap.at(channel).at(i).GetSocket().GetSocket(1); int type = 0; size_t sz = sizeof(type); nn_getsockopt(channelsMap.at(channel).at(i).GetSocket().GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz); - if (type == NN_REQ || type == NN_REP || type == NN_PAIR) - { - items[index].events = NN_POLLIN|NN_POLLOUT; - } - else if (type == NN_PUSH || type == NN_PUB) - { - items[index].events = NN_POLLOUT; - } - else if (type == NN_PULL || type == NN_SUB) - { - items[index].events = NN_POLLIN; - } - else - { - LOG(ERROR) << "nanomsg: invalid poller configuration, exiting."; - exit(EXIT_FAILURE); - } + SetItemEvents(fItems[index], type); } } } @@ -119,34 +107,39 @@ FairMQPollerNN::FairMQPollerNN(const unordered_map } FairMQPollerNN::FairMQPollerNN(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) - : items() + : fItems() , fNumItems(2) , fOffsetMap() { - items = new nn_pollfd[fNumItems]; + fItems = new nn_pollfd[fNumItems]; - items[0].fd = cmdSocket.GetSocket(1); - items[0].events = NN_POLLIN; - items[0].revents = 0; + fItems[0].fd = cmdSocket.GetSocket(1); + fItems[0].events = NN_POLLIN; + fItems[0].revents = 0; - items[1].fd = dataSocket.GetSocket(1); - items[1].revents = 0; + fItems[1].fd = dataSocket.GetSocket(1); + fItems[1].revents = 0; int type = 0; size_t sz = sizeof(type); nn_getsockopt(dataSocket.GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz); + SetItemEvents(fItems[1], type); +} + +void FairMQPollerNN::SetItemEvents(nn_pollfd& item, const int type) +{ if (type == NN_REQ || type == NN_REP || type == NN_PAIR) { - items[1].events = NN_POLLIN|NN_POLLOUT; + item.events = NN_POLLIN|NN_POLLOUT; } else if (type == NN_PUSH || type == NN_PUB) { - items[1].events = NN_POLLOUT; + item.events = NN_POLLOUT; } else if (type == NN_PULL || type == NN_SUB) { - items[1].events = NN_POLLIN; + item.events = NN_POLLIN; } else { @@ -157,7 +150,7 @@ FairMQPollerNN::FairMQPollerNN(const FairMQSocket& cmdSocket, const FairMQSocket void FairMQPollerNN::Poll(const int timeout) { - if (nn_poll(items, fNumItems, timeout) < 0) + if (nn_poll(fItems, fNumItems, timeout) < 0) { if (errno == ETERM) { @@ -173,7 +166,7 @@ void FairMQPollerNN::Poll(const int timeout) bool FairMQPollerNN::CheckInput(const int index) { - if (items[index].revents & (NN_POLLIN | NN_POLLOUT)) + if (fItems[index].revents & (NN_POLLIN | NN_POLLOUT)) { return true; } @@ -183,7 +176,7 @@ bool FairMQPollerNN::CheckInput(const int index) bool FairMQPollerNN::CheckOutput(const int index) { - if (items[index].revents & NN_POLLOUT) + if (fItems[index].revents & NN_POLLOUT) { return true; } @@ -195,7 +188,7 @@ bool FairMQPollerNN::CheckInput(const string channelKey, const int index) { try { - if (items[fOffsetMap.at(channelKey) + index].revents & (NN_POLLIN | NN_POLLOUT)) + if (fItems[fOffsetMap.at(channelKey) + index].revents & (NN_POLLIN | NN_POLLOUT)) { return true; } @@ -214,7 +207,7 @@ bool FairMQPollerNN::CheckOutput(const string channelKey, const int index) { try { - if (items[fOffsetMap.at(channelKey) + index].revents & NN_POLLOUT) + if (fItems[fOffsetMap.at(channelKey) + index].revents & NN_POLLOUT) { return true; } @@ -231,5 +224,5 @@ bool FairMQPollerNN::CheckOutput(const string channelKey, const int index) FairMQPollerNN::~FairMQPollerNN() { - delete[] items; + delete[] fItems; } diff --git a/fairmq/nanomsg/FairMQPollerNN.h b/fairmq/nanomsg/FairMQPollerNN.h index d68eece0..94f252df 100644 --- a/fairmq/nanomsg/FairMQPollerNN.h +++ b/fairmq/nanomsg/FairMQPollerNN.h @@ -33,10 +33,14 @@ class FairMQPollerNN : public FairMQPoller public: FairMQPollerNN(const std::vector& channels); + FairMQPollerNN(const std::vector& channels); FairMQPollerNN(const std::unordered_map>& channelsMap, const std::vector& channelList); + FairMQPollerNN(const FairMQPollerNN&) = delete; FairMQPollerNN operator=(const FairMQPollerNN&) = delete; + void SetItemEvents(nn_pollfd& item, const int type); + virtual void Poll(const int timeout); virtual bool CheckInput(const int index); virtual bool CheckOutput(const int index); @@ -48,7 +52,7 @@ class FairMQPollerNN : public FairMQPoller private: FairMQPollerNN(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket); - nn_pollfd* items; + nn_pollfd* fItems; int fNumItems; std::unordered_map fOffsetMap; diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index 7cbb8e4a..0472a303 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -51,6 +51,11 @@ FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const vector(new FairMQPollerNN(channels)); } +FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const std::vector& channels) const +{ + return unique_ptr(new FairMQPollerNN(channels)); +} + FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const unordered_map>& channelsMap, const vector& channelList) const { return unique_ptr(new FairMQPollerNN(channelsMap, channelList)); diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h index f7e44e99..c4179781 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.h +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.h @@ -32,6 +32,7 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override; FairMQPollerPtr CreatePoller(const std::vector& channels) const override; + FairMQPollerPtr CreatePoller(const std::vector& channels) const override; FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const override; diff --git a/fairmq/shmem/FairMQPollerSHM.cxx b/fairmq/shmem/FairMQPollerSHM.cxx index 376b173d..a931f02c 100644 --- a/fairmq/shmem/FairMQPollerSHM.cxx +++ b/fairmq/shmem/FairMQPollerSHM.cxx @@ -20,45 +20,51 @@ using namespace std; FairMQPollerSHM::FairMQPollerSHM(const vector& channels) - : items() + : fItems() , fNumItems(0) , fOffsetMap() { fNumItems = channels.size(); - items = new zmq_pollitem_t[fNumItems]; + fItems = new zmq_pollitem_t[fNumItems]; for (int i = 0; i < fNumItems; ++i) { - items[i].socket = channels.at(i).GetSocket().GetSocket(); - items[i].fd = 0; - items[i].revents = 0; + fItems[i].socket = channels.at(i).GetSocket().GetSocket(); + fItems[i].fd = 0; + fItems[i].revents = 0; int type = 0; size_t size = sizeof(type); zmq_getsockopt(channels.at(i).GetSocket().GetSocket(), ZMQ_TYPE, &type, &size); - if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) - { - items[i].events = ZMQ_POLLIN|ZMQ_POLLOUT; - } - else if (type == ZMQ_PUSH || type == ZMQ_PUB || type == ZMQ_XPUB) - { - items[i].events = ZMQ_POLLOUT; - } - else if (type == ZMQ_PULL || type == ZMQ_SUB || type == ZMQ_XSUB) - { - items[i].events = ZMQ_POLLIN; - } - else - { - LOG(ERROR) << "shmem: invalid poller configuration, exiting."; - exit(EXIT_FAILURE); - } + SetItemEvents(fItems[i], type); + } +} + +FairMQPollerSHM::FairMQPollerSHM(const vector& channels) + : fItems() + , fNumItems(0) + , fOffsetMap() +{ + fNumItems = channels.size(); + fItems = new zmq_pollitem_t[fNumItems]; + + for (int i = 0; i < fNumItems; ++i) + { + fItems[i].socket = channels.at(i)->GetSocket().GetSocket(); + fItems[i].fd = 0; + fItems[i].revents = 0; + + int type = 0; + size_t size = sizeof(type); + zmq_getsockopt(channels.at(i)->GetSocket().GetSocket(), ZMQ_TYPE, &type, &size); + + SetItemEvents(fItems[i], type); } } FairMQPollerSHM::FairMQPollerSHM(const unordered_map>& channelsMap, const vector& channelList) - : items() + : fItems() , fNumItems(0) , fOffsetMap() { @@ -74,7 +80,7 @@ FairMQPollerSHM::FairMQPollerSHM(const unordered_map& channels); + FairMQPollerSHM(const std::vector& channels); FairMQPollerSHM(const std::unordered_map>& channelsMap, const std::vector& channelList); + FairMQPollerSHM(const FairMQPollerSHM&) = delete; FairMQPollerSHM operator=(const FairMQPollerSHM&) = delete; + void SetItemEvents(zmq_pollitem_t& item, const int type); + virtual void Poll(const int timeout); virtual bool CheckInput(const int index); virtual bool CheckOutput(const int index); @@ -42,7 +46,7 @@ class FairMQPollerSHM : public FairMQPoller private: FairMQPollerSHM(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket); - zmq_pollitem_t* items; + zmq_pollitem_t* fItems; int fNumItems; std::unordered_map fOffsetMap; diff --git a/fairmq/shmem/FairMQTransportFactorySHM.cxx b/fairmq/shmem/FairMQTransportFactorySHM.cxx index c70909c9..e2a1db1c 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.cxx +++ b/fairmq/shmem/FairMQTransportFactorySHM.cxx @@ -157,6 +157,11 @@ FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const vector(new FairMQPollerSHM(channels)); } +FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const std::vector& channels) const +{ + return unique_ptr(new FairMQPollerSHM(channels)); +} + FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const unordered_map>& channelsMap, const vector& channelList) const { return unique_ptr(new FairMQPollerSHM(channelsMap, channelList)); diff --git a/fairmq/shmem/FairMQTransportFactorySHM.h b/fairmq/shmem/FairMQTransportFactorySHM.h index 0d18f532..6089a834 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.h +++ b/fairmq/shmem/FairMQTransportFactorySHM.h @@ -37,6 +37,7 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override; FairMQPollerPtr CreatePoller(const std::vector& channels) const override; + FairMQPollerPtr CreatePoller(const std::vector& channels) const override; FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const override; diff --git a/fairmq/zeromq/FairMQPollerZMQ.cxx b/fairmq/zeromq/FairMQPollerZMQ.cxx index 4cbafdf2..40330938 100644 --- a/fairmq/zeromq/FairMQPollerZMQ.cxx +++ b/fairmq/zeromq/FairMQPollerZMQ.cxx @@ -20,45 +20,52 @@ using namespace std; FairMQPollerZMQ::FairMQPollerZMQ(const vector& channels) - : items() + : fItems() , fNumItems(0) , fOffsetMap() { fNumItems = channels.size(); - items = new zmq_pollitem_t[fNumItems]; + fItems = new zmq_pollitem_t[fNumItems]; for (int i = 0; i < fNumItems; ++i) { - items[i].socket = channels.at(i).GetSocket().GetSocket(); - items[i].fd = 0; - items[i].revents = 0; + fItems[i].socket = channels.at(i).GetSocket().GetSocket(); + fItems[i].fd = 0; + fItems[i].revents = 0; int type = 0; size_t size = sizeof(type); zmq_getsockopt(channels.at(i).GetSocket().GetSocket(), ZMQ_TYPE, &type, &size); - if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) - { - items[i].events = ZMQ_POLLIN|ZMQ_POLLOUT; - } - else if (type == ZMQ_PUSH || type == ZMQ_PUB || type == ZMQ_XPUB) - { - items[i].events = ZMQ_POLLOUT; - } - else if (type == ZMQ_PULL || type == ZMQ_SUB || type == ZMQ_XSUB) - { - items[i].events = ZMQ_POLLIN; - } - else - { - LOG(ERROR) << "zeromq: invalid poller configuration, exiting."; - exit(EXIT_FAILURE); - } + SetItemEvents(fItems[i], type); + } +} + + +FairMQPollerZMQ::FairMQPollerZMQ(const std::vector& channels) + : fItems() + , fNumItems(0) + , fOffsetMap() +{ + fNumItems = channels.size(); + fItems = new zmq_pollitem_t[fNumItems]; + + for (int i = 0; i < fNumItems; ++i) + { + fItems[i].socket = channels.at(i)->GetSocket().GetSocket(); + fItems[i].fd = 0; + fItems[i].revents = 0; + + int type = 0; + size_t size = sizeof(type); + zmq_getsockopt(channels.at(i)->GetSocket().GetSocket(), ZMQ_TYPE, &type, &size); + + SetItemEvents(fItems[i], type); } } FairMQPollerZMQ::FairMQPollerZMQ(const unordered_map>& channelsMap, const vector& channelList) - : items() + : fItems() , fNumItems(0) , fOffsetMap() { @@ -74,7 +81,7 @@ FairMQPollerZMQ::FairMQPollerZMQ(const unordered_map& channels); + FairMQPollerZMQ(const std::vector& channels); FairMQPollerZMQ(const std::unordered_map>& channelsMap, const std::vector& channelList); + FairMQPollerZMQ(const FairMQPollerZMQ&) = delete; FairMQPollerZMQ operator=(const FairMQPollerZMQ&) = delete; + void SetItemEvents(zmq_pollitem_t& item, const int type); + virtual void Poll(const int timeout); virtual bool CheckInput(const int index); virtual bool CheckOutput(const int index); @@ -49,7 +53,7 @@ class FairMQPollerZMQ : public FairMQPoller private: FairMQPollerZMQ(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket); - zmq_pollitem_t* items; + zmq_pollitem_t* fItems; int fNumItems; std::unordered_map fOffsetMap; diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx index 3acbf9ad..c0b2176f 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx @@ -78,6 +78,11 @@ FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const vector(new FairMQPollerZMQ(channels)); } +FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const std::vector& channels) const +{ + return unique_ptr(new FairMQPollerZMQ(channels)); +} + FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const unordered_map>& channelsMap, const vector& channelList) const { return unique_ptr(new FairMQPollerZMQ(channelsMap, channelList)); diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index a6a84876..892ca491 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -38,6 +38,7 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override; FairMQPollerPtr CreatePoller(const std::vector& channels) const override; + FairMQPollerPtr CreatePoller(const std::vector& channels) const override; FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const override;