diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index b4bedd2b..1056f749 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -117,6 +117,12 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan) return *this; } +FairMQSocket const & FairMQChannel::GetSocket() const +{ + assert(fSocket); + return *fSocket; +} + string FairMQChannel::GetChannelName() const { return fName; @@ -758,6 +764,27 @@ void FairMQChannel::Tokenize(vector& output, const string& input, const boost::algorithm::split(output, input, boost::algorithm::is_any_of(delimiters)); } +unsigned long FairMQChannel::GetBytesTx() const +{ + return fSocket->GetBytesTx(); +} + +unsigned long FairMQChannel::GetBytesRx() const +{ + return fSocket->GetBytesRx(); +} + +unsigned long FairMQChannel::GetMessagesTx() const +{ + return fSocket->GetMessagesTx(); +} + +unsigned long FairMQChannel::GetMessagesRx() const +{ + return fSocket->GetMessagesRx(); +} + + FairMQTransportFactory* FairMQChannel::Transport() { return fTransportFactory.get(); diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index a6c1f02a..26b2c7ab 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -53,6 +53,8 @@ class FairMQChannel /// Default destructor virtual ~FairMQChannel(); + FairMQSocket const & GetSocket() const; + /// Get channel name /// @return Returns full channel name (e.g. "data[0]") std::string GetChannelName() const; @@ -144,8 +146,6 @@ class FairMQChannel /// Resets the channel (requires validation to be used again). void ResetChannel(); - std::unique_ptr fSocket; - int Send(std::unique_ptr& msg) const; int Receive(std::unique_ptr& msg) const; @@ -221,9 +221,16 @@ class FairMQChannel // TODO: this might go to some base utility library static void Tokenize(std::vector& output, const std::string& input, const std::string delimiters = ","); + unsigned long GetBytesTx() const; + unsigned long GetBytesRx() const; + unsigned long GetMessagesTx() const; + unsigned long GetMessagesRx() const; + FairMQTransportFactory* Transport(); private: + std::unique_ptr fSocket; + std::string fType; std::string fMethod; std::string fAddress; diff --git a/fairmq/nanomsg/FairMQPollerNN.cxx b/fairmq/nanomsg/FairMQPollerNN.cxx index 30383b93..413d9b5a 100644 --- a/fairmq/nanomsg/FairMQPollerNN.cxx +++ b/fairmq/nanomsg/FairMQPollerNN.cxx @@ -33,11 +33,11 @@ FairMQPollerNN::FairMQPollerNN(const vector& channels) for (int i = 0; i < fNumItems; ++i) { - items[i].fd = channels.at(i).fSocket->GetSocket(1); + items[i].fd = channels.at(i).GetSocket().GetSocket(1); int type = 0; size_t sz = sizeof(type); - nn_getsockopt(channels.at(i).fSocket->GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz); + nn_getsockopt(channels.at(i).GetSocket().GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz); if (type == NN_REQ || type == NN_REP || type == NN_PAIR) { @@ -84,11 +84,11 @@ FairMQPollerNN::FairMQPollerNN(const unordered_map for (unsigned int i = 0; i < channelsMap.at(channel).size(); ++i) { index = fOffsetMap[channel] + i; - items[index].fd = channelsMap.at(channel).at(i).fSocket->GetSocket(1); + items[index].fd = channelsMap.at(channel).at(i).GetSocket().GetSocket(1); int type = 0; size_t sz = sizeof(type); - nn_getsockopt(channelsMap.at(channel).at(i).fSocket->GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz); + nn_getsockopt(channelsMap.at(channel).at(i).GetSocket().GetSocket(1), NN_SOL_SOCKET, NN_PROTOCOL, &type, &sz); if (type == NN_REQ || type == NN_REP || type == NN_PAIR) { diff --git a/fairmq/shmem/FairMQPollerSHM.cxx b/fairmq/shmem/FairMQPollerSHM.cxx index 1a216077..376b173d 100644 --- a/fairmq/shmem/FairMQPollerSHM.cxx +++ b/fairmq/shmem/FairMQPollerSHM.cxx @@ -29,13 +29,13 @@ FairMQPollerSHM::FairMQPollerSHM(const vector& channels) for (int i = 0; i < fNumItems; ++i) { - items[i].socket = channels.at(i).fSocket->GetSocket(); + items[i].socket = channels.at(i).GetSocket().GetSocket(); items[i].fd = 0; items[i].revents = 0; int type = 0; size_t size = sizeof(type); - zmq_getsockopt (channels.at(i).fSocket->GetSocket(), ZMQ_TYPE, &type, &size); + zmq_getsockopt(channels.at(i).GetSocket().GetSocket(), ZMQ_TYPE, &type, &size); if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) { @@ -83,13 +83,13 @@ FairMQPollerSHM::FairMQPollerSHM(const unordered_mapGetSocket(); + items[index].socket = channelsMap.at(channel).at(i).GetSocket().GetSocket(); items[index].fd = 0; items[index].revents = 0; int type = 0; size_t size = sizeof(type); - zmq_getsockopt (channelsMap.at(channel).at(i).fSocket->GetSocket(), ZMQ_TYPE, &type, &size); + zmq_getsockopt(channelsMap.at(channel).at(i).GetSocket().GetSocket(), ZMQ_TYPE, &type, &size); if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) { diff --git a/fairmq/zeromq/FairMQPollerZMQ.cxx b/fairmq/zeromq/FairMQPollerZMQ.cxx index 4a50fdfc..4cbafdf2 100644 --- a/fairmq/zeromq/FairMQPollerZMQ.cxx +++ b/fairmq/zeromq/FairMQPollerZMQ.cxx @@ -29,13 +29,13 @@ FairMQPollerZMQ::FairMQPollerZMQ(const vector& channels) for (int i = 0; i < fNumItems; ++i) { - items[i].socket = channels.at(i).fSocket->GetSocket(); + items[i].socket = channels.at(i).GetSocket().GetSocket(); items[i].fd = 0; items[i].revents = 0; int type = 0; size_t size = sizeof(type); - zmq_getsockopt (channels.at(i).fSocket->GetSocket(), ZMQ_TYPE, &type, &size); + zmq_getsockopt(channels.at(i).GetSocket().GetSocket(), ZMQ_TYPE, &type, &size); if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) { @@ -83,13 +83,13 @@ FairMQPollerZMQ::FairMQPollerZMQ(const unordered_mapGetSocket(); + items[index].socket = channelsMap.at(channel).at(i).GetSocket().GetSocket(); items[index].fd = 0; items[index].revents = 0; int type = 0; size_t size = sizeof(type); - zmq_getsockopt (channelsMap.at(channel).at(i).fSocket->GetSocket(), ZMQ_TYPE, &type, &size); + zmq_getsockopt(channelsMap.at(channel).at(i).GetSocket().GetSocket(), ZMQ_TYPE, &type, &size); if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) {