Implement nanomsg linger in our transport

This commit is contained in:
Alexey Rybalchenko 2018-10-12 17:10:14 +02:00 committed by Dennis Klein
parent cfb727181f
commit 44acd4997d
16 changed files with 116 additions and 14 deletions

View File

@ -14,12 +14,12 @@ SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size $msgSize" SAMPLER+=" --msg-size $msgSize"
# SAMPLER+=" --rate 10" # SAMPLER+=" --rate 10"
SAMPLER+=" --transport shmem" SAMPLER+=" --transport shmem"
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777" SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992"
xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER & xterm -geometry 80x23+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
SINK="fairmq-ex-region-sink" SINK="fairmq-ex-region-sink"
SINK+=" --id sink1" SINK+=" --id sink1"
SINK+=" --severity debug" SINK+=" --severity debug"
SINK+=" --transport shmem" SINK+=" --transport shmem"
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777" SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992"
xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$SINK & xterm -geometry 80x23+500+0 -hold -e @EX_BIN_DIR@/$SINK &

View File

@ -33,6 +33,7 @@ FairMQChannel::FairMQChannel()
, fRcvBufSize(1000) , fRcvBufSize(1000)
, fSndKernelSize(0) , fSndKernelSize(0)
, fRcvKernelSize(0) , fRcvKernelSize(0)
, fLinger(500)
, fRateLogging(1) , fRateLogging(1)
, fName("") , fName("")
, fIsValid(false) , fIsValid(false)
@ -53,6 +54,7 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str
, fRcvBufSize(1000) , fRcvBufSize(1000)
, fSndKernelSize(0) , fSndKernelSize(0)
, fRcvKernelSize(0) , fRcvKernelSize(0)
, fLinger(500)
, fRateLogging(1) , fRateLogging(1)
, fName("") , fName("")
, fIsValid(false) , fIsValid(false)
@ -73,6 +75,7 @@ FairMQChannel::FairMQChannel(const string& name, const string& type, std::shared
, fRcvBufSize(1000) , fRcvBufSize(1000)
, fSndKernelSize(0) , fSndKernelSize(0)
, fRcvKernelSize(0) , fRcvKernelSize(0)
, fLinger(500)
, fRateLogging(1) , fRateLogging(1)
, fName(name) , fName(name)
, fIsValid(false) , fIsValid(false)
@ -93,6 +96,7 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan)
, fRcvBufSize(chan.fRcvBufSize) , fRcvBufSize(chan.fRcvBufSize)
, fSndKernelSize(chan.fSndKernelSize) , fSndKernelSize(chan.fSndKernelSize)
, fRcvKernelSize(chan.fRcvKernelSize) , fRcvKernelSize(chan.fRcvKernelSize)
, fLinger(chan.fLinger)
, fRateLogging(chan.fRateLogging) , fRateLogging(chan.fRateLogging)
, fName(chan.fName) , fName(chan.fName)
, fIsValid(false) , fIsValid(false)
@ -113,6 +117,7 @@ FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan)
fRcvBufSize = chan.fRcvBufSize; fRcvBufSize = chan.fRcvBufSize;
fSndKernelSize = chan.fSndKernelSize; fSndKernelSize = chan.fSndKernelSize;
fRcvKernelSize = chan.fRcvKernelSize; fRcvKernelSize = chan.fRcvKernelSize;
fLinger = chan.fLinger;
fRateLogging = chan.fRateLogging; fRateLogging = chan.fRateLogging;
fName = chan.fName; fName = chan.fName;
fIsValid = false; fIsValid = false;
@ -262,6 +267,20 @@ int FairMQChannel::GetRcvKernelSize() const
} }
} }
int FairMQChannel::GetLinger() const
{
try
{
unique_lock<mutex> lock(fChannelMutex);
return fLinger;
}
catch (exception& e)
{
LOG(error) << "Exception caught in FairMQChannel::GetLinger: " << e.what();
exit(EXIT_FAILURE);
}
}
int FairMQChannel::GetRateLogging() const int FairMQChannel::GetRateLogging() const
{ {
try try
@ -404,6 +423,22 @@ void FairMQChannel::UpdateRcvKernelSize(const int rcvKernelSize)
} }
} }
void FairMQChannel::UpdateLinger(const int duration)
{
try
{
unique_lock<mutex> lock(fChannelMutex);
fIsValid = false;
fLinger = duration;
fModified = true;
}
catch (exception& e)
{
LOG(error) << "Exception caught in FairMQChannel::UpdateLinger: " << e.what();
exit(EXIT_FAILURE);
}
}
void FairMQChannel::UpdateRateLogging(const int rateLogging) void FairMQChannel::UpdateRateLogging(const int rateLogging)
{ {
try try

View File

@ -111,6 +111,10 @@ class FairMQChannel
/// @return Returns socket kernel transmit receive buffer size (in bytes) /// @return Returns socket kernel transmit receive buffer size (in bytes)
int GetRcvKernelSize() const; int GetRcvKernelSize() const;
/// Get linger duration (in milliseconds)
/// @return Returns linger duration (in milliseconds)
int GetLinger() const;
/// Get socket rate logging interval (in seconds) /// Get socket rate logging interval (in seconds)
/// @return Returns socket rate logging interval (in seconds) /// @return Returns socket rate logging interval (in seconds)
int GetRateLogging() const; int GetRateLogging() const;
@ -147,6 +151,10 @@ class FairMQChannel
/// @param rcvKernelSize Socket receive buffer size (in bytes) /// @param rcvKernelSize Socket receive buffer size (in bytes)
void UpdateRcvKernelSize(const int rcvKernelSize); void UpdateRcvKernelSize(const int rcvKernelSize);
/// Set linger duration (in milliseconds)
/// @param duration linger duration (in milliseconds)
void UpdateLinger(const int duration);
/// Set socket rate logging interval (in seconds) /// Set socket rate logging interval (in seconds)
/// @param rateLogging Socket rate logging interval (in seconds) /// @param rateLogging Socket rate logging interval (in seconds)
void UpdateRateLogging(const int rateLogging); void UpdateRateLogging(const int rateLogging);
@ -307,6 +315,7 @@ class FairMQChannel
int fRcvBufSize; int fRcvBufSize;
int fSndKernelSize; int fSndKernelSize;
int fRcvKernelSize; int fRcvKernelSize;
int fLinger;
int fRateLogging; int fRateLogging;
std::string fName; std::string fName;

View File

@ -295,6 +295,9 @@ bool FairMQDevice::AttachChannel(FairMQChannel& ch)
} }
} }
// set linger duration (how long socket should wait for outstanding transfers before shutdown)
ch.fSocket->SetOption("linger", &(ch.fLinger), sizeof(ch.fLinger));
// set high water marks // set high water marks
ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize)); ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize));
ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize)); ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize));
@ -365,7 +368,7 @@ bool FairMQDevice::AttachChannel(FairMQChannel& ch)
} }
endpoint += address; endpoint += address;
LOG(debug) << "Attached channel " << ch.fName << " to " << endpoint << (bind ? " (bind) " : " (connect) "); LOG(debug) << "Attached channel " << ch.fName << " to " << endpoint << (bind ? " (bind) " : " (connect) ") << "(" << ch.fType << ")";
// after the book keeping is done, exit in case of errors // after the book keeping is done, exit in case of errors
if (!success) if (!success)
@ -914,8 +917,7 @@ void FairMQDevice::LogSocketRates()
} }
t0 = t1; t0 = t1;
this_thread::sleep_for(chrono::milliseconds(1000)); WaitFor(chrono::milliseconds(1000));
// WaitFor(chrono::milliseconds(1000)); TODO: enable this when nanomsg linger is fixed
} }
} }
} }
@ -950,13 +952,11 @@ void FairMQDevice::ResetWrapper()
{ {
CallStateChangeCallbacks(RESETTING_DEVICE); CallStateChangeCallbacks(RESETTING_DEVICE);
Reset(); for (auto& t : fTransports)
{
ChangeState(internal_IDLE); t.second->Reset();
} }
void FairMQDevice::Reset()
{
// iterate over the channels map // iterate over the channels map
for (auto& mi : fChannels) for (auto& mi : fChannels)
{ {
@ -967,6 +967,14 @@ void FairMQDevice::Reset()
vi.fSocket.reset(); // destroy FairMQSocket vi.fSocket.reset(); // destroy FairMQSocket
} }
} }
Reset();
ChangeState(internal_IDLE);
}
void FairMQDevice::Reset()
{
} }
const FairMQChannel& FairMQDevice::GetChannel(const string& channelName, const int index) const const FairMQChannel& FairMQDevice::GetChannel(const string& channelName, const int index) const

View File

@ -73,6 +73,7 @@ class FairMQTransportFactory
virtual void Interrupt() = 0; virtual void Interrupt() = 0;
virtual void Resume() = 0; virtual void Resume() = 0;
virtual void Reset() = 0;
virtual ~FairMQTransportFactory() {}; virtual ~FairMQTransportFactory() {};

View File

@ -39,6 +39,7 @@ FairMQSocketNN::FairMQSocketNN(const string& type, const string& name, const str
, fMessagesRx(0) , fMessagesRx(0)
, fSndTimeout(100) , fSndTimeout(100)
, fRcvTimeout(100) , fRcvTimeout(100)
, fLinger(500)
{ {
if (type == "router" || type == "dealer") if (type == "router" || type == "dealer")
{ {
@ -456,6 +457,13 @@ void FairMQSocketNN::SetOption(const string& option, const void* value, size_t v
return; return;
} }
if (option == "linger")
{
int val = *(static_cast<int*>(const_cast<void*>(value)));
fLinger = val;
return;
}
int rc = nn_setsockopt(fSocket, NN_SOL_SOCKET, GetConstant(option), value, valueSize); int rc = nn_setsockopt(fSocket, NN_SOL_SOCKET, GetConstant(option), value, valueSize);
if (rc < 0) if (rc < 0)
{ {

View File

@ -15,8 +15,12 @@
#include "FairMQSocket.h" #include "FairMQSocket.h"
#include "FairMQMessage.h" #include "FairMQMessage.h"
class FairMQTransportFactoryNN;
class FairMQSocketNN : public FairMQSocket class FairMQSocketNN : public FairMQSocket
{ {
friend class FairMQTransportFactoryNN;
public: public:
FairMQSocketNN(const std::string& type, const std::string& name, const std::string& id = ""); FairMQSocketNN(const std::string& type, const std::string& name, const std::string& id = "");
FairMQSocketNN(const FairMQSocketNN&) = delete; FairMQSocketNN(const FairMQSocketNN&) = delete;
@ -69,6 +73,7 @@ class FairMQSocketNN : public FairMQSocket
int fSndTimeout; int fSndTimeout;
int fRcvTimeout; int fRcvTimeout;
int fLinger;
int SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout); int SendImpl(FairMQMessagePtr& msg, const int flags, const int timeout);
int ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout); int ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout);

View File

@ -9,6 +9,9 @@
#include "FairMQTransportFactoryNN.h" #include "FairMQTransportFactoryNN.h"
#include <nanomsg/nn.h> #include <nanomsg/nn.h>
#include <algorithm>
#include <thread>
#include <chrono>
using namespace std; using namespace std;
@ -42,7 +45,9 @@ FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage(FairMQUnmanagedRegionPt
FairMQSocketPtr FairMQTransportFactoryNN::CreateSocket(const string& type, const string& name) const FairMQSocketPtr FairMQTransportFactoryNN::CreateSocket(const string& type, const string& name) const
{ {
return unique_ptr<FairMQSocket>(new FairMQSocketNN(type, name, GetId())); unique_ptr<FairMQSocket> socket(new FairMQSocketNN(type, name, GetId()));
fSockets.push_back(socket.get());
return socket;
} }
FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const vector<FairMQChannel>& channels) const FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const vector<FairMQChannel>& channels) const
@ -75,6 +80,17 @@ fair::mq::Transport FairMQTransportFactoryNN::GetType() const
return fTransportType; return fTransportType;
} }
void FairMQTransportFactoryNN::Reset()
{
auto result = max_element(fSockets.begin(), fSockets.end(), [](FairMQSocket* s1, FairMQSocket* s2) {
return static_cast<FairMQSocketNN*>(s1)->fLinger < static_cast<FairMQSocketNN*>(s2)->fLinger;
});
if (result != fSockets.end()) {
this_thread::sleep_for(chrono::milliseconds(static_cast<FairMQSocketNN*>(*result)->fLinger));
}
fSockets.clear();
}
FairMQTransportFactoryNN::~FairMQTransportFactoryNN() FairMQTransportFactoryNN::~FairMQTransportFactoryNN()
{ {
// nn_term(); // nn_term();

View File

@ -43,9 +43,11 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory
void Interrupt() override { FairMQSocketNN::Interrupt(); } void Interrupt() override { FairMQSocketNN::Interrupt(); }
void Resume() override { FairMQSocketNN::Resume(); } void Resume() override { FairMQSocketNN::Resume(); }
void Reset() override;
private: private:
static fair::mq::Transport fTransportType; static fair::mq::Transport fTransportType;
mutable std::vector<FairMQSocket*> fSockets;
}; };
#endif /* FAIRMQTRANSPORTFACTORYNN_H_ */ #endif /* FAIRMQTRANSPORTFACTORYNN_H_ */

View File

@ -51,6 +51,7 @@ class TransportFactory : public FairMQTransportFactory
void Interrupt() override {} void Interrupt() override {}
void Resume() override {} void Resume() override {}
void Reset() override {}
private: private:
mutable Context fContext; mutable Context fContext;

View File

@ -153,6 +153,7 @@ void ChannelParser(const boost::property_tree::ptree& tree, FairMQChannelMap& ch
commonChannel.UpdateRcvBufSize(q.second.get<int>("rcvBufSize", commonChannel.GetRcvBufSize())); commonChannel.UpdateRcvBufSize(q.second.get<int>("rcvBufSize", commonChannel.GetRcvBufSize()));
commonChannel.UpdateSndKernelSize(q.second.get<int>("sndKernelSize", commonChannel.GetSndKernelSize())); commonChannel.UpdateSndKernelSize(q.second.get<int>("sndKernelSize", commonChannel.GetSndKernelSize()));
commonChannel.UpdateRcvKernelSize(q.second.get<int>("rcvKernelSize", commonChannel.GetRcvKernelSize())); commonChannel.UpdateRcvKernelSize(q.second.get<int>("rcvKernelSize", commonChannel.GetRcvKernelSize()));
commonChannel.UpdateLinger(q.second.get<int>("linger", commonChannel.GetLinger()));
commonChannel.UpdateRateLogging(q.second.get<int>("rateLogging", commonChannel.GetRateLogging())); commonChannel.UpdateRateLogging(q.second.get<int>("rateLogging", commonChannel.GetRateLogging()));
// temporary FairMQChannel container // temporary FairMQChannel container
@ -172,6 +173,7 @@ void ChannelParser(const boost::property_tree::ptree& tree, FairMQChannelMap& ch
LOG(debug) << "\trcvBufSize = " << commonChannel.GetRcvBufSize(); LOG(debug) << "\trcvBufSize = " << commonChannel.GetRcvBufSize();
LOG(debug) << "\tsndKernelSize = " << commonChannel.GetSndKernelSize(); LOG(debug) << "\tsndKernelSize = " << commonChannel.GetSndKernelSize();
LOG(debug) << "\trcvKernelSize = " << commonChannel.GetRcvKernelSize(); LOG(debug) << "\trcvKernelSize = " << commonChannel.GetRcvKernelSize();
LOG(debug) << "\tlinger = " << commonChannel.GetLinger();
LOG(debug) << "\trateLogging = " << commonChannel.GetRateLogging(); LOG(debug) << "\trateLogging = " << commonChannel.GetRateLogging();
for (int i = 0; i < numSockets; ++i) for (int i = 0; i < numSockets; ++i)
@ -214,6 +216,7 @@ void SocketParser(const boost::property_tree::ptree& tree, vector<FairMQChannel>
channel.UpdateRcvBufSize(q.second.get<int>("rcvBufSize", channel.GetRcvBufSize())); channel.UpdateRcvBufSize(q.second.get<int>("rcvBufSize", channel.GetRcvBufSize()));
channel.UpdateSndKernelSize(q.second.get<int>("sndKernelSize", channel.GetSndKernelSize())); channel.UpdateSndKernelSize(q.second.get<int>("sndKernelSize", channel.GetSndKernelSize()));
channel.UpdateRcvKernelSize(q.second.get<int>("rcvKernelSize", channel.GetRcvKernelSize())); channel.UpdateRcvKernelSize(q.second.get<int>("rcvKernelSize", channel.GetRcvKernelSize()));
channel.UpdateLinger(q.second.get<int>("linger", channel.GetLinger()));
channel.UpdateRateLogging(q.second.get<int>("rateLogging", channel.GetRateLogging())); channel.UpdateRateLogging(q.second.get<int>("rateLogging", channel.GetRateLogging()));
LOG(debug) << "" << channelName << "[" << socketCounter << "]:"; LOG(debug) << "" << channelName << "[" << socketCounter << "]:";
@ -225,6 +228,7 @@ void SocketParser(const boost::property_tree::ptree& tree, vector<FairMQChannel>
LOG(debug) << "\trcvBufSize = " << channel.GetRcvBufSize(); LOG(debug) << "\trcvBufSize = " << channel.GetRcvBufSize();
LOG(debug) << "\tsndKernelSize = " << channel.GetSndKernelSize(); LOG(debug) << "\tsndKernelSize = " << channel.GetSndKernelSize();
LOG(debug) << "\trcvKernelSize = " << channel.GetRcvKernelSize(); LOG(debug) << "\trcvKernelSize = " << channel.GetRcvKernelSize();
LOG(debug) << "\tlinger = " << channel.GetLinger();
LOG(debug) << "\trateLogging = " << channel.GetRateLogging(); LOG(debug) << "\trateLogging = " << channel.GetRateLogging();
channelList.push_back(channel); channelList.push_back(channel);
@ -253,6 +257,7 @@ void SocketParser(const boost::property_tree::ptree& tree, vector<FairMQChannel>
LOG(debug) << "\trcvBufSize = " << channel.GetRcvBufSize(); LOG(debug) << "\trcvBufSize = " << channel.GetRcvBufSize();
LOG(debug) << "\tsndKernelSize = " << channel.GetSndKernelSize(); LOG(debug) << "\tsndKernelSize = " << channel.GetSndKernelSize();
LOG(debug) << "\trcvKernelSize = " << channel.GetRcvKernelSize(); LOG(debug) << "\trcvKernelSize = " << channel.GetRcvKernelSize();
LOG(debug) << "\tlinger = " << channel.GetLinger();
LOG(debug) << "\trateLogging = " << channel.GetRateLogging(); LOG(debug) << "\trateLogging = " << channel.GetRateLogging();
channelList.push_back(channel); channelList.push_back(channel);

View File

@ -268,6 +268,7 @@ void FairMQProgOptions::UpdateMQValues()
string rcvBufSizeKey = "chans." + p.first + "." + to_string(index) + ".rcvBufSize"; string rcvBufSizeKey = "chans." + p.first + "." + to_string(index) + ".rcvBufSize";
string sndKernelSizeKey = "chans." + p.first + "." + to_string(index) + ".sndKernelSize"; string sndKernelSizeKey = "chans." + p.first + "." + to_string(index) + ".sndKernelSize";
string rcvKernelSizeKey = "chans." + p.first + "." + to_string(index) + ".rcvKernelSize"; string rcvKernelSizeKey = "chans." + p.first + "." + to_string(index) + ".rcvKernelSize";
string lingerKey = "chans." + p.first + "." + to_string(index) + ".linger";
string rateLoggingKey = "chans." + p.first + "." + to_string(index) + ".rateLogging"; string rateLoggingKey = "chans." + p.first + "." + to_string(index) + ".rateLogging";
fChannelKeyMap[typeKey] = ChannelKey{p.first, index, "type"}; fChannelKeyMap[typeKey] = ChannelKey{p.first, index, "type"};
@ -278,6 +279,7 @@ void FairMQProgOptions::UpdateMQValues()
fChannelKeyMap[rcvBufSizeKey] = ChannelKey{p.first, index, "rcvBufSize"}; fChannelKeyMap[rcvBufSizeKey] = ChannelKey{p.first, index, "rcvBufSize"};
fChannelKeyMap[sndKernelSizeKey] = ChannelKey{p.first, index, "sndKernelSize"}; fChannelKeyMap[sndKernelSizeKey] = ChannelKey{p.first, index, "sndKernelSize"};
fChannelKeyMap[rcvKernelSizeKey] = ChannelKey{p.first, index, "rcvkernelSize"}; fChannelKeyMap[rcvKernelSizeKey] = ChannelKey{p.first, index, "rcvkernelSize"};
fChannelKeyMap[lingerKey] = ChannelKey{p.first, index, "linger"};
fChannelKeyMap[rateLoggingKey] = ChannelKey{p.first, index, "rateLogging"}; fChannelKeyMap[rateLoggingKey] = ChannelKey{p.first, index, "rateLogging"};
UpdateVarMap<string>(typeKey, channel.GetType()); UpdateVarMap<string>(typeKey, channel.GetType());
@ -288,6 +290,7 @@ void FairMQProgOptions::UpdateMQValues()
UpdateVarMap<int>(rcvBufSizeKey, channel.GetRcvBufSize()); UpdateVarMap<int>(rcvBufSizeKey, channel.GetRcvBufSize());
UpdateVarMap<int>(sndKernelSizeKey, channel.GetSndKernelSize()); UpdateVarMap<int>(sndKernelSizeKey, channel.GetSndKernelSize());
UpdateVarMap<int>(rcvKernelSizeKey, channel.GetRcvKernelSize()); UpdateVarMap<int>(rcvKernelSizeKey, channel.GetRcvKernelSize());
UpdateVarMap<int>(lingerKey, channel.GetLinger());
UpdateVarMap<int>(rateLoggingKey, channel.GetRateLogging()); UpdateVarMap<int>(rateLoggingKey, channel.GetRateLogging());
index++; index++;
@ -343,6 +346,12 @@ int FairMQProgOptions::UpdateChannelValue(const string& channelName, int index,
return 0; return 0;
} }
if (member == "linger")
{
fFairMQChannelMap.at(channelName).at(index).UpdateLinger(val);
return 0;
}
if (member == "rateLogging") if (member == "rateLogging")
{ {
fFairMQChannelMap.at(channelName).at(index).UpdateRateLogging(val); fFairMQChannelMap.at(channelName).at(index).UpdateRateLogging(val);

View File

@ -56,6 +56,7 @@ struct SUBOPT
RCVBUFSIZE, // size of the receive queue RCVBUFSIZE, // size of the receive queue
SNDKERNELSIZE, SNDKERNELSIZE,
RCVKERNELSIZE, RCVKERNELSIZE,
LINGER,
RATELOGGING, // logging rate RATELOGGING, // logging rate
NUMSOCKETS, NUMSOCKETS,
lastsocketkey lastsocketkey
@ -71,6 +72,7 @@ struct SUBOPT
/*[RCVBUFSIZE] = */ "rcvBufSize", /*[RCVBUFSIZE] = */ "rcvBufSize",
/*[SNDKERNELSIZE] = */ "sndKernelSize", /*[SNDKERNELSIZE] = */ "sndKernelSize",
/*[RCVKERNELSIZE] = */ "rcvKernelSize", /*[RCVKERNELSIZE] = */ "rcvKernelSize",
/*[LINGER] = */ "linger",
/*[RATELOGGING] = */ "rateLogging", /*[RATELOGGING] = */ "rateLogging",
/*[NUMSOCKETS] = */ "numSockets", /*[NUMSOCKETS] = */ "numSockets",
nullptr nullptr

View File

@ -51,6 +51,7 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory
void Interrupt() override { FairMQSocketSHM::Interrupt(); } void Interrupt() override { FairMQSocketSHM::Interrupt(); }
void Resume() override { FairMQSocketSHM::Resume(); } void Resume() override { FairMQSocketSHM::Resume(); }
void Reset() override {}
~FairMQTransportFactorySHM() override; ~FairMQTransportFactorySHM() override;

View File

@ -164,12 +164,11 @@ int FairMQSocketZMQ::SendImpl(FairMQMessagePtr& msg, const int flags, const int
int FairMQSocketZMQ::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout) int FairMQSocketZMQ::ReceiveImpl(FairMQMessagePtr& msg, const int flags, const int timeout)
{ {
int nbytes = -1;
int elapsed = 0; int elapsed = 0;
while (true) while (true)
{ {
nbytes = zmq_msg_recv(static_cast<FairMQMessageZMQ*>(msg.get())->GetMessage(), fSocket, flags); int nbytes = zmq_msg_recv(static_cast<FairMQMessageZMQ*>(msg.get())->GetMessage(), fSocket, flags);
if (nbytes >= 0) if (nbytes >= 0)
{ {
fBytesRx += nbytes; fBytesRx += nbytes;

View File

@ -52,6 +52,7 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory
void Interrupt() override { FairMQSocketZMQ::Interrupt(); } void Interrupt() override { FairMQSocketZMQ::Interrupt(); }
void Resume() override { FairMQSocketZMQ::Resume(); } void Resume() override { FairMQSocketZMQ::Resume(); }
void Reset() override {}
private: private:
static fair::mq::Transport fTransportType; static fair::mq::Transport fTransportType;