FairMQChannel: API to access socket metrics; make fSocket private.

This commit is contained in:
Alexey Rybalchenko 2017-03-07 10:52:27 +01:00 committed by Mohammad Al-Turany
parent 2a72d58766
commit 2293c5e417
5 changed files with 48 additions and 14 deletions

View File

@ -117,6 +117,12 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan)
return *this; return *this;
} }
FairMQSocket const & FairMQChannel::GetSocket() const
{
assert(fSocket);
return *fSocket;
}
string FairMQChannel::GetChannelName() const string FairMQChannel::GetChannelName() const
{ {
return fName; return fName;
@ -758,6 +764,27 @@ void FairMQChannel::Tokenize(vector<string>& output, const string& input, const
boost::algorithm::split(output, input, boost::algorithm::is_any_of(delimiters)); boost::algorithm::split(output, input, boost::algorithm::is_any_of(delimiters));
} }
unsigned long FairMQChannel::GetBytesTx() const
{
return fSocket->GetBytesTx();
}
unsigned long FairMQChannel::GetBytesRx() const
{
return fSocket->GetBytesRx();
}
unsigned long FairMQChannel::GetMessagesTx() const
{
return fSocket->GetMessagesTx();
}
unsigned long FairMQChannel::GetMessagesRx() const
{
return fSocket->GetMessagesRx();
}
FairMQTransportFactory* FairMQChannel::Transport() FairMQTransportFactory* FairMQChannel::Transport()
{ {
return fTransportFactory.get(); return fTransportFactory.get();

View File

@ -53,6 +53,8 @@ class FairMQChannel
/// Default destructor /// Default destructor
virtual ~FairMQChannel(); virtual ~FairMQChannel();
FairMQSocket const & GetSocket() const;
/// Get channel name /// Get channel name
/// @return Returns full channel name (e.g. "data[0]") /// @return Returns full channel name (e.g. "data[0]")
std::string GetChannelName() const; std::string GetChannelName() const;
@ -144,8 +146,6 @@ class FairMQChannel
/// Resets the channel (requires validation to be used again). /// Resets the channel (requires validation to be used again).
void ResetChannel(); void ResetChannel();
std::unique_ptr<FairMQSocket> fSocket;
int Send(std::unique_ptr<FairMQMessage>& msg) const; int Send(std::unique_ptr<FairMQMessage>& msg) const;
int Receive(std::unique_ptr<FairMQMessage>& msg) const; int Receive(std::unique_ptr<FairMQMessage>& msg) const;
@ -221,9 +221,16 @@ class FairMQChannel
// TODO: this might go to some base utility library // TODO: this might go to some base utility library
static void Tokenize(std::vector<std::string>& output, const std::string& input, const std::string delimiters = ","); static void Tokenize(std::vector<std::string>& output, const std::string& input, const std::string delimiters = ",");
unsigned long GetBytesTx() const;
unsigned long GetBytesRx() const;
unsigned long GetMessagesTx() const;
unsigned long GetMessagesRx() const;
FairMQTransportFactory* Transport(); FairMQTransportFactory* Transport();
private: private:
std::unique_ptr<FairMQSocket> fSocket;
std::string fType; std::string fType;
std::string fMethod; std::string fMethod;
std::string fAddress; std::string fAddress;

View File

@ -33,11 +33,11 @@ FairMQPollerNN::FairMQPollerNN(const vector<FairMQChannel>& channels)
for (int i = 0; i < fNumItems; ++i) for (int i = 0; i < fNumItems; ++i)
{ {
items[i].fd = channels.at(i).fSocket->GetSocket(1); items[i].fd = channels.at(i).GetSocket().GetSocket(1);
int type = 0; int type = 0;
size_t sz = sizeof(type); size_t sz = sizeof(type);
nn_getsockopt(channels.at(i).fSocket->GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz); nn_getsockopt(channels.at(i).GetSocket().GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz);
if (type == NN_REQ || type == NN_REP || type == NN_PAIR) if (type == NN_REQ || type == NN_REP || type == NN_PAIR)
{ {
@ -84,11 +84,11 @@ FairMQPollerNN::FairMQPollerNN(const unordered_map<string, vector<FairMQChannel>
for (unsigned int i = 0; i < channelsMap.at(channel).size(); ++i) for (unsigned int i = 0; i < channelsMap.at(channel).size(); ++i)
{ {
index = fOffsetMap[channel] + i; index = fOffsetMap[channel] + i;
items[index].fd = channelsMap.at(channel).at(i).fSocket->GetSocket(1); items[index].fd = channelsMap.at(channel).at(i).GetSocket().GetSocket(1);
int type = 0; int type = 0;
size_t sz = sizeof(type); size_t sz = sizeof(type);
nn_getsockopt(channelsMap.at(channel).at(i).fSocket->GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz); nn_getsockopt(channelsMap.at(channel).at(i).GetSocket().GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz);
if (type == NN_REQ || type == NN_REP || type == NN_PAIR) if (type == NN_REQ || type == NN_REP || type == NN_PAIR)
{ {

View File

@ -29,13 +29,13 @@ FairMQPollerSHM::FairMQPollerSHM(const vector<FairMQChannel>& channels)
for (int i = 0; i < fNumItems; ++i) for (int i = 0; i < fNumItems; ++i)
{ {
items[i].socket = channels.at(i).fSocket->GetSocket(); items[i].socket = channels.at(i).GetSocket().GetSocket();
items[i].fd = 0; items[i].fd = 0;
items[i].revents = 0; items[i].revents = 0;
int type = 0; int type = 0;
size_t size = sizeof(type); size_t size = sizeof(type);
zmq_getsockopt (channels.at(i).fSocket->GetSocket(), ZMQ_TYPE, &type, &size); zmq_getsockopt(channels.at(i).GetSocket().GetSocket(), ZMQ_TYPE, &type, &size);
if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER)
{ {
@ -83,13 +83,13 @@ FairMQPollerSHM::FairMQPollerSHM(const unordered_map<string, vector<FairMQChanne
{ {
index = fOffsetMap[channel] + i; index = fOffsetMap[channel] + i;
items[index].socket = channelsMap.at(channel).at(i).fSocket->GetSocket(); items[index].socket = channelsMap.at(channel).at(i).GetSocket().GetSocket();
items[index].fd = 0; items[index].fd = 0;
items[index].revents = 0; items[index].revents = 0;
int type = 0; int type = 0;
size_t size = sizeof(type); size_t size = sizeof(type);
zmq_getsockopt (channelsMap.at(channel).at(i).fSocket->GetSocket(), ZMQ_TYPE, &type, &size); zmq_getsockopt(channelsMap.at(channel).at(i).GetSocket().GetSocket(), ZMQ_TYPE, &type, &size);
if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER)
{ {

View File

@ -29,13 +29,13 @@ FairMQPollerZMQ::FairMQPollerZMQ(const vector<FairMQChannel>& channels)
for (int i = 0; i < fNumItems; ++i) for (int i = 0; i < fNumItems; ++i)
{ {
items[i].socket = channels.at(i).fSocket->GetSocket(); items[i].socket = channels.at(i).GetSocket().GetSocket();
items[i].fd = 0; items[i].fd = 0;
items[i].revents = 0; items[i].revents = 0;
int type = 0; int type = 0;
size_t size = sizeof(type); size_t size = sizeof(type);
zmq_getsockopt (channels.at(i).fSocket->GetSocket(), ZMQ_TYPE, &type, &size); zmq_getsockopt(channels.at(i).GetSocket().GetSocket(), ZMQ_TYPE, &type, &size);
if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER)
{ {
@ -83,13 +83,13 @@ FairMQPollerZMQ::FairMQPollerZMQ(const unordered_map<string, vector<FairMQChanne
{ {
index = fOffsetMap[channel] + i; index = fOffsetMap[channel] + i;
items[index].socket = channelsMap.at(channel).at(i).fSocket->GetSocket(); items[index].socket = channelsMap.at(channel).at(i).GetSocket().GetSocket();
items[index].fd = 0; items[index].fd = 0;
items[index].revents = 0; items[index].revents = 0;
int type = 0; int type = 0;
size_t size = sizeof(type); size_t size = sizeof(type);
zmq_getsockopt (channelsMap.at(channel).at(i).fSocket->GetSocket(), ZMQ_TYPE, &type, &size); zmq_getsockopt(channelsMap.at(channel).at(i).GetSocket().GetSocket(), ZMQ_TYPE, &type, &size);
if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER)
{ {