mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 00:31:14 +00:00
Delete special member functions where they are not used. (as part of applying suggestions from cppcoreguidelines-special-member-functions) These classes don't need to be copyable/movable: # copy/move not used: zmq:: TransportFactory, Socket, Message, UnmanagedRegion, Poller, Context shm:: TransportFactory, Socket, Message, UnmanagedRegion, Poller ofi:: TransportFactory, Socket, Message, Context shm:: ZMsg, Region, Monitor, TerminalConfig, Manager plugins:: Config, Control, TerminalConfig fairmq::StateQueue, StateMachine, ProgOptions, PluginServices, PluginManager, Plugin, Device, StateSubscription TestData, BadDevice, TestDevice (test suite heplers) # used via ptr interface: fairmq::UnmanagedRegion, TransportFactory, Socket, Poller, Message These classes need to be movable/copyable: MyClass (test suite helper), fairmq::Channel, fairmq::Parts
202 lines
7.1 KiB
C++
202 lines
7.1 KiB
C++
/********************************************************************************
|
|
* Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
|
* *
|
|
* This software is distributed under the terms of the *
|
|
* GNU Lesser General Public Licence (LGPL) version 3, *
|
|
* copied verbatim in the file "LICENSE" *
|
|
********************************************************************************/
|
|
|
|
#ifndef FAIR_MQ_SHMEM_POLLER_H_
|
|
#define FAIR_MQ_SHMEM_POLLER_H_
|
|
|
|
#include <fairlogger/Logger.h>
|
|
#include <fairmq/Channel.h>
|
|
#include <fairmq/Poller.h>
|
|
#include <fairmq/shmem/Socket.h>
|
|
#include <fairmq/tools/Strings.h>
|
|
#include <unordered_map>
|
|
#include <vector>
|
|
#include <zmq.h>
|
|
|
|
namespace fair::mq::shmem
|
|
{
|
|
|
|
class Poller final : public fair::mq::Poller
|
|
{
|
|
public:
|
|
Poller(const std::vector<Channel>& channels)
|
|
: fItems()
|
|
, fNumItems(0)
|
|
{
|
|
fNumItems = channels.size();
|
|
fItems = new zmq_pollitem_t[fNumItems];
|
|
|
|
for (int i = 0; i < fNumItems; ++i) {
|
|
fItems[i].socket = static_cast<const Socket*>(&(channels.at(i).GetSocket()))->GetSocket();
|
|
fItems[i].fd = 0;
|
|
fItems[i].revents = 0;
|
|
|
|
int type = 0;
|
|
size_t size = sizeof(type);
|
|
zmq_getsockopt(static_cast<const Socket*>(&(channels.at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
|
|
|
|
SetItemEvents(fItems[i], type);
|
|
}
|
|
}
|
|
|
|
Poller(const std::vector<Channel*>& channels)
|
|
: fItems()
|
|
, fNumItems(0)
|
|
{
|
|
fNumItems = channels.size();
|
|
fItems = new zmq_pollitem_t[fNumItems];
|
|
|
|
for (int i = 0; i < fNumItems; ++i) {
|
|
fItems[i].socket = static_cast<const Socket*>(&(channels.at(i)->GetSocket()))->GetSocket();
|
|
fItems[i].fd = 0;
|
|
fItems[i].revents = 0;
|
|
|
|
int type = 0;
|
|
size_t size = sizeof(type);
|
|
zmq_getsockopt(static_cast<const Socket*>(&(channels.at(i)->GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
|
|
|
|
SetItemEvents(fItems[i], type);
|
|
}
|
|
}
|
|
|
|
Poller(const std::unordered_map<std::string, std::vector<Channel>>& channelsMap, const std::vector<std::string>& channelList)
|
|
: fItems()
|
|
, fNumItems(0)
|
|
{
|
|
try {
|
|
int offset = 0;
|
|
// calculate offsets and the total size of the poll item set
|
|
for (std::string channel : channelList) {
|
|
fOffsetMap[channel] = offset;
|
|
offset += channelsMap.at(channel).size();
|
|
fNumItems += channelsMap.at(channel).size();
|
|
}
|
|
|
|
fItems = new zmq_pollitem_t[fNumItems];
|
|
|
|
int index = 0;
|
|
for (std::string channel : channelList) {
|
|
for (unsigned int i = 0; i < channelsMap.at(channel).size(); ++i) {
|
|
index = fOffsetMap[channel] + i;
|
|
|
|
fItems[index].socket = static_cast<const Socket*>(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket();
|
|
fItems[index].fd = 0;
|
|
fItems[index].revents = 0;
|
|
|
|
int type = 0;
|
|
size_t size = sizeof(type);
|
|
zmq_getsockopt(static_cast<const Socket*>(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size);
|
|
|
|
SetItemEvents(fItems[index], type);
|
|
}
|
|
}
|
|
} catch (const std::out_of_range& oor) {
|
|
LOG(error) << "At least one of the provided channel keys for poller initialization is invalid." << " Out of range error: " << oor.what();
|
|
throw fair::mq::PollerError(fair::mq::tools::ToString("At least one of the provided channel keys for poller initialization is invalid. ", "Out of range error: ", oor.what()));
|
|
}
|
|
}
|
|
|
|
Poller(const Poller&) = delete;
|
|
Poller(Poller&&) = delete;
|
|
Poller& operator=(const Poller&) = delete;
|
|
Poller& operator=(Poller&&) = delete;
|
|
|
|
void SetItemEvents(zmq_pollitem_t& item, int type)
|
|
{
|
|
if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) {
|
|
item.events = ZMQ_POLLIN | ZMQ_POLLOUT;
|
|
} else if (type == ZMQ_PUSH || type == ZMQ_PUB || type == ZMQ_XPUB) {
|
|
item.events = ZMQ_POLLOUT;
|
|
} else if (type == ZMQ_PULL || type == ZMQ_SUB || type == ZMQ_XSUB) {
|
|
item.events = ZMQ_POLLIN;
|
|
} else {
|
|
LOG(error) << "invalid poller configuration, exiting.";
|
|
throw fair::mq::PollerError("Invalid poller configuration, exiting.");
|
|
}
|
|
}
|
|
|
|
void Poll(int timeout) override
|
|
{
|
|
while (true) {
|
|
if (zmq_poll(fItems, fNumItems, timeout) < 0) {
|
|
if (errno == ETERM) {
|
|
LOG(debug) << "polling exited, reason: " << zmq_strerror(errno);
|
|
return;
|
|
} else if (errno == EINTR) {
|
|
LOG(debug) << "polling interrupted by system call";
|
|
continue;
|
|
} else {
|
|
LOG(error) << "polling failed, reason: " << zmq_strerror(errno);
|
|
throw fair::mq::PollerError(fair::mq::tools::ToString("Polling failed, reason: ", zmq_strerror(errno)));
|
|
}
|
|
}
|
|
break;
|
|
}
|
|
}
|
|
|
|
bool CheckInput(int index) override
|
|
{
|
|
if (fItems[index].revents & ZMQ_POLLIN) {
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
bool CheckOutput(int index) override
|
|
{
|
|
if (fItems[index].revents & ZMQ_POLLOUT) {
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
bool CheckInput(const std::string& channelKey, int index) override
|
|
{
|
|
try {
|
|
if (fItems[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLIN) {
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
} catch (const std::out_of_range& oor) {
|
|
LOG(error) << "invalid channel key: '" << channelKey << "'";
|
|
LOG(error) << "out of range error: " << oor.what();
|
|
throw fair::mq::PollerError(fair::mq::tools::ToString("Invalid channel key '", channelKey, "'. Out of range error: ", oor.what()));
|
|
}
|
|
}
|
|
|
|
bool CheckOutput(const std::string& channelKey, int index) override
|
|
{
|
|
try {
|
|
if (fItems[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLOUT) {
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
} catch (const std::out_of_range& oor) {
|
|
LOG(error) << "invalid channel key: '" << channelKey << "'";
|
|
LOG(error) << "out of range error: " << oor.what();
|
|
throw fair::mq::PollerError(fair::mq::tools::ToString("Invalid channel key '", channelKey, "'. Out of range error: ", oor.what()));
|
|
}
|
|
}
|
|
|
|
~Poller() override { delete[] fItems; }
|
|
|
|
private:
|
|
zmq_pollitem_t* fItems;
|
|
int fNumItems;
|
|
|
|
std::unordered_map<std::string, int> fOffsetMap;
|
|
};
|
|
|
|
} // namespace fair::mq::shmem
|
|
|
|
#endif /* FAIR_MQ_SHMEM_POLLER_H_ */
|