only reset sockets if needed

This commit is contained in:
Alexey Rybalchenko 2017-04-13 14:08:00 +02:00 committed by Mohammad Al-Turany
parent c51575f88c
commit 541538e3de
3 changed files with 76 additions and 39 deletions

View File

@ -45,6 +45,8 @@ FairMQChannel::FairMQChannel()
, fNoBlockFlag(0) , fNoBlockFlag(0)
, fSndMoreFlag(0) , fSndMoreFlag(0)
, fMultipart(false) , fMultipart(false)
, fModified(true)
, fReset(false)
{ {
} }
@ -68,6 +70,8 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str
, fNoBlockFlag(0) , fNoBlockFlag(0)
, fSndMoreFlag(0) , fSndMoreFlag(0)
, fMultipart(false) , fMultipart(false)
, fModified(true)
, fReset(false)
{ {
} }
@ -91,6 +95,8 @@ FairMQChannel::FairMQChannel(const FairMQChannel& chan)
, fNoBlockFlag(chan.fNoBlockFlag) , fNoBlockFlag(chan.fNoBlockFlag)
, fSndMoreFlag(chan.fSndMoreFlag) , fSndMoreFlag(chan.fSndMoreFlag)
, fMultipart(chan.fMultipart) , fMultipart(chan.fMultipart)
, fModified(chan.fModified)
, fReset(false)
{} {}
FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan) FairMQChannel& FairMQChannel::operator=(const FairMQChannel& chan)
@ -267,6 +273,7 @@ void FairMQChannel::UpdateType(const string& type)
unique_lock<mutex> lock(fChannelMutex); unique_lock<mutex> lock(fChannelMutex);
fIsValid = false; fIsValid = false;
fType = type; fType = type;
fModified = true;
} }
catch (exception& e) catch (exception& e)
{ {
@ -282,6 +289,7 @@ void FairMQChannel::UpdateMethod(const string& method)
unique_lock<mutex> lock(fChannelMutex); unique_lock<mutex> lock(fChannelMutex);
fIsValid = false; fIsValid = false;
fMethod = method; fMethod = method;
fModified = true;
} }
catch (exception& e) catch (exception& e)
{ {
@ -297,6 +305,7 @@ void FairMQChannel::UpdateAddress(const string& address)
unique_lock<mutex> lock(fChannelMutex); unique_lock<mutex> lock(fChannelMutex);
fIsValid = false; fIsValid = false;
fAddress = address; fAddress = address;
fModified = true;
} }
catch (exception& e) catch (exception& e)
{ {
@ -312,6 +321,7 @@ void FairMQChannel::UpdateTransport(const string& transport)
unique_lock<mutex> lock(fChannelMutex); unique_lock<mutex> lock(fChannelMutex);
fIsValid = false; fIsValid = false;
fTransport = transport; fTransport = transport;
fModified = true;
} }
catch (exception& e) catch (exception& e)
{ {
@ -327,6 +337,7 @@ void FairMQChannel::UpdateSndBufSize(const int sndBufSize)
unique_lock<mutex> lock(fChannelMutex); unique_lock<mutex> lock(fChannelMutex);
fIsValid = false; fIsValid = false;
fSndBufSize = sndBufSize; fSndBufSize = sndBufSize;
fModified = true;
} }
catch (exception& e) catch (exception& e)
{ {
@ -342,6 +353,7 @@ void FairMQChannel::UpdateRcvBufSize(const int rcvBufSize)
unique_lock<mutex> lock(fChannelMutex); unique_lock<mutex> lock(fChannelMutex);
fIsValid = false; fIsValid = false;
fRcvBufSize = rcvBufSize; fRcvBufSize = rcvBufSize;
fModified = true;
} }
catch (exception& e) catch (exception& e)
{ {
@ -357,6 +369,7 @@ void FairMQChannel::UpdateSndKernelSize(const int sndKernelSize)
unique_lock<mutex> lock(fChannelMutex); unique_lock<mutex> lock(fChannelMutex);
fIsValid = false; fIsValid = false;
fSndKernelSize = sndKernelSize; fSndKernelSize = sndKernelSize;
fModified = true;
} }
catch (exception& e) catch (exception& e)
{ {
@ -372,6 +385,7 @@ void FairMQChannel::UpdateRcvKernelSize(const int rcvKernelSize)
unique_lock<mutex> lock(fChannelMutex); unique_lock<mutex> lock(fChannelMutex);
fIsValid = false; fIsValid = false;
fRcvKernelSize = rcvKernelSize; fRcvKernelSize = rcvKernelSize;
fModified = true;
} }
catch (exception& e) catch (exception& e)
{ {
@ -387,6 +401,7 @@ void FairMQChannel::UpdateRateLogging(const int rateLogging)
unique_lock<mutex> lock(fChannelMutex); unique_lock<mutex> lock(fChannelMutex);
fIsValid = false; fIsValid = false;
fRateLogging = rateLogging; fRateLogging = rateLogging;
fModified = true;
} }
catch (exception& e) catch (exception& e)
{ {

View File

@ -244,7 +244,6 @@ class FairMQChannel
std::string fName; std::string fName;
std::atomic<bool> fIsValid; std::atomic<bool> fIsValid;
FairMQPollerPtr fPoller; FairMQPollerPtr fPoller;
FairMQSocketPtr fChannelCmdSocket; FairMQSocketPtr fChannelCmdSocket;
@ -270,6 +269,8 @@ class FairMQChannel
static std::atomic<bool> fInterrupted; static std::atomic<bool> fInterrupted;
bool fMultipart; bool fMultipart;
bool fModified;
bool fReset;
}; };
#endif /* FAIRMQCHANNEL_H_ */ #endif /* FAIRMQCHANNEL_H_ */

View File

@ -129,6 +129,7 @@ void FairMQDevice::AttachChannels(list<FairMQChannel*>& chans)
if (AttachChannel(**itr)) if (AttachChannel(**itr))
{ {
(*itr)->InitCommandInterface(); (*itr)->InitCommandInterface();
(*itr)->fModified = false;
chans.erase(itr++); chans.erase(itr++);
} }
else else
@ -177,40 +178,53 @@ void FairMQDevice::InitWrapper()
{ {
for (auto vi = (mi->second).begin(); vi != (mi->second).end(); ++vi) for (auto vi = (mi->second).begin(); vi != (mi->second).end(); ++vi)
{ {
// set channel name: name + vector index if (vi->fModified)
stringstream ss;
ss << mi->first << "[" << vi - (mi->second).begin() << "]";
vi->fName = ss.str();
if (vi->fMethod == "bind")
{ {
// if binding address is not specified, try getting it from the configured network interface if (vi->fReset)
if (vi->fAddress == "unspecified" || vi->fAddress == "")
{ {
// if the configured network interface is default, get its name from the default route vi->fSocket->Close();
if (fNetworkInterface == "default") vi->fSocket = nullptr;
{
fNetworkInterface = FairMQ::tools::getDefaultRouteNetworkInterface(); vi->fPoller = nullptr;
}
vi->fAddress = "tcp://" + FairMQ::tools::getInterfaceIP(fNetworkInterface) + ":1"; vi->fChannelCmdSocket->Close();
vi->fChannelCmdSocket = nullptr;
}
// set channel name: name + vector index
stringstream ss;
ss << mi->first << "[" << vi - (mi->second).begin() << "]";
vi->fName = ss.str();
if (vi->fMethod == "bind")
{
// if binding address is not specified, try getting it from the configured network interface
if (vi->fAddress == "unspecified" || vi->fAddress == "")
{
// if the configured network interface is default, get its name from the default route
if (fNetworkInterface == "default")
{
fNetworkInterface = FairMQ::tools::getDefaultRouteNetworkInterface();
}
vi->fAddress = "tcp://" + FairMQ::tools::getInterfaceIP(fNetworkInterface) + ":1";
}
// fill the uninitialized list
uninitializedBindingChannels.push_back(&(*vi));
}
else if (vi->fMethod == "connect")
{
// fill the uninitialized list
uninitializedConnectingChannels.push_back(&(*vi));
}
else if (vi->fAddress.find_first_of("@+>") != string::npos)
{
// fill the uninitialized list
uninitializedConnectingChannels.push_back(&(*vi));
}
else
{
LOG(ERROR) << "Cannot update configuration. Socket method (bind/connect) not specified.";
exit(EXIT_FAILURE);
} }
// fill the uninitialized list
uninitializedBindingChannels.push_back(&(*vi));
}
else if (vi->fMethod == "connect")
{
// fill the uninitialized list
uninitializedConnectingChannels.push_back(&(*vi));
}
else if (vi->fAddress.find_first_of("@+>") != string::npos)
{
// fill the uninitialized list
uninitializedConnectingChannels.push_back(&(*vi));
}
else
{
LOG(ERROR) << "Cannot update configuration. Socket method (bind/connect) not specified.";
exit(EXIT_FAILURE);
} }
} }
} }
@ -1180,13 +1194,14 @@ void FairMQDevice::Reset()
// iterate over the channels vector // iterate over the channels vector
for (auto& vi : mi.second) for (auto& vi : mi.second)
{ {
vi.fSocket->Close(); vi.fReset = true;
vi.fSocket = nullptr; // vi.fSocket->Close();
// vi.fSocket = nullptr;
vi.fPoller = nullptr; // vi.fPoller = nullptr;
vi.fChannelCmdSocket->Close(); // vi.fChannelCmdSocket->Close();
vi.fChannelCmdSocket = nullptr; // vi.fChannelCmdSocket = nullptr;
} }
} }
} }
@ -1207,18 +1222,24 @@ void FairMQDevice::Exit()
LOG(DEBUG) << "Closing sockets..."; LOG(DEBUG) << "Closing sockets...";
// iterate over the channels // iterate over the channels
for (const auto& c : fChannels) for (auto& c : fChannels)
{ {
// iterate over the sub-channels // iterate over the sub-channels
for (const auto& sc : c.second) for (auto& sc : c.second)
{ {
if (sc.fSocket) if (sc.fSocket)
{ {
sc.fSocket->Close(); sc.fSocket->Close();
sc.fSocket = nullptr;
} }
if (sc.fChannelCmdSocket) if (sc.fChannelCmdSocket)
{ {
sc.fChannelCmdSocket->Close(); sc.fChannelCmdSocket->Close();
sc.fChannelCmdSocket = nullptr;
}
if (sc.fPoller)
{
sc.fPoller = nullptr;
} }
} }
} }