Thread safe initialization of the channels

This commit is contained in:
Alexey Rybalchenko 2015-06-18 15:10:01 +02:00
parent 295d9bba57
commit d6a413534a
13 changed files with 171 additions and 50 deletions

View File

@ -130,14 +130,14 @@ endif(NANOMSG_FOUND)
# to copy src that are header-only files (e.g. c++ template) for FairRoot external installation # to copy src that are header-only files (e.g. c++ template) for FairRoot external installation
# manual install (globbing add not recommended) # manual install (globbing add not recommended)
Set(FairMQHDRFiles Set(FAIRMQHEADERS
devices/GenericSampler.h devices/GenericSampler.h
devices/GenericSampler.tpl devices/GenericSampler.tpl
devices/GenericProcessor.h devices/GenericProcessor.h
devices/GenericFileSink.h devices/GenericFileSink.h
tools/FairMQTools.h tools/FairMQTools.h
) )
install(FILES ${FairMQHDRFiles} DESTINATION include) install(FILES ${FAIRMQHEADERS} DESTINATION include)
set(DEPENDENCIES set(DEPENDENCIES
${DEPENDENCIES} ${DEPENDENCIES}

View File

@ -19,6 +19,8 @@
using namespace std; using namespace std;
boost::mutex FairMQChannel::channelMutex;
FairMQChannel::FairMQChannel() FairMQChannel::FairMQChannel()
: fType("unspecified") : fType("unspecified")
, fMethod("unspecified") , fMethod("unspecified")
@ -28,6 +30,7 @@ FairMQChannel::FairMQChannel()
, fRateLogging(1) , fRateLogging(1)
, fSocket() , fSocket()
, fChannelName("") , fChannelName("")
, fIsValid(false)
{ {
} }
@ -40,14 +43,97 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str
, fRateLogging(1) , fRateLogging(1)
, fSocket() , fSocket()
, fChannelName("") , fChannelName("")
, fIsValid(false)
{ {
} }
std::string FairMQChannel::GetType()
{
boost::unique_lock<boost::mutex> scoped_lock(channelMutex);
return fType;
}
std::string FairMQChannel::GetMethod()
{
boost::unique_lock<boost::mutex> scoped_lock(channelMutex);
return fMethod;
}
std::string FairMQChannel::GetAddress()
{
boost::unique_lock<boost::mutex> scoped_lock(channelMutex);
return fAddress;
}
int FairMQChannel::GetSndBufSize()
{
boost::unique_lock<boost::mutex> scoped_lock(channelMutex);
return fSndBufSize;
}
int FairMQChannel::GetRcvBufSize()
{
boost::unique_lock<boost::mutex> scoped_lock(channelMutex);
return fRcvBufSize;
}
int FairMQChannel::GetRateLogging()
{
boost::unique_lock<boost::mutex> scoped_lock(channelMutex);
return fRateLogging;
}
void FairMQChannel::UpdateType(const std::string& type)
{
boost::unique_lock<boost::mutex> scoped_lock(channelMutex);
fType = type;
}
void FairMQChannel::UpdateMethod(const std::string& method)
{
boost::unique_lock<boost::mutex> scoped_lock(channelMutex);
fMethod = method;
}
void FairMQChannel::UpdateAddress(const std::string& address)
{
boost::unique_lock<boost::mutex> scoped_lock(channelMutex);
fAddress = address;
}
void FairMQChannel::UpdateSndBufSize(const int sndBufSize)
{
boost::unique_lock<boost::mutex> scoped_lock(channelMutex);
fSndBufSize = sndBufSize;
}
void FairMQChannel::UpdateRcvBufSize(const int rcvBufSize)
{
boost::unique_lock<boost::mutex> scoped_lock(channelMutex);
fRcvBufSize = rcvBufSize;
}
void FairMQChannel::UpdateRateLogging(const int rateLogging)
{
boost::unique_lock<boost::mutex> scoped_lock(channelMutex);
fRateLogging = rateLogging;
}
bool FairMQChannel::IsValid()
{
boost::unique_lock<boost::mutex> scoped_lock(channelMutex);
return fIsValid;
}
bool FairMQChannel::ValidateChannel() bool FairMQChannel::ValidateChannel()
{ {
boost::unique_lock<boost::mutex> scoped_lock(channelMutex);
stringstream ss; stringstream ss;
ss << "Validating channel " << fChannelName << "... "; ss << "Validating channel " << fChannelName << "... ";
if (fIsValid)
{
ss << "ALREADY VALID";
LOG(DEBUG) << ss.str();
return true;
}
const string socketTypeNames[] = { "sub", "pub", "pull", "push", "req", "rep", "xsub", "xpub", "dealer", "router", "pair" }; const string socketTypeNames[] = { "sub", "pub", "pull", "push", "req", "rep", "xsub", "xpub", "dealer", "router", "pair" };
const set<string> socketTypes(socketTypeNames, socketTypeNames + sizeof(socketTypeNames) / sizeof(string)); const set<string> socketTypes(socketTypeNames, socketTypeNames + sizeof(socketTypeNames) / sizeof(string));
if (socketTypes.find(fType) == socketTypes.end()) if (socketTypes.find(fType) == socketTypes.end())
@ -92,11 +178,17 @@ bool FairMQChannel::ValidateChannel()
return false; return false;
} }
fIsValid = true;
ss << "VALID"; ss << "VALID";
LOG(DEBUG) << ss.str(); LOG(DEBUG) << ss.str();
return true; return true;
} }
void FairMQChannel::ResetChannel()
{
// TODO: implement resetting
}
int FairMQChannel::Send(FairMQMessage* msg, const string& flag) int FairMQChannel::Send(FairMQMessage* msg, const string& flag)
{ {
return fSocket->Send(msg, flag); return fSocket->Send(msg, flag);

View File

@ -17,6 +17,8 @@
#include <string> #include <string>
#include <boost/thread/mutex.hpp>
#include "FairMQSocket.h" #include "FairMQSocket.h"
class FairMQChannel class FairMQChannel
@ -28,14 +30,35 @@ class FairMQChannel
FairMQChannel(const std::string& type, const std::string& method, const std::string& address); FairMQChannel(const std::string& type, const std::string& method, const std::string& address);
virtual ~FairMQChannel(); virtual ~FairMQChannel();
std::string GetType();
std::string GetMethod();
std::string GetAddress();
int GetSndBufSize();
int GetRcvBufSize();
int GetRateLogging();
void UpdateType(const std::string& type);
void UpdateMethod(const std::string& method);
void UpdateAddress(const std::string& address);
void UpdateSndBufSize(const int sndBufSize);
void UpdateRcvBufSize(const int rcvBufSize);
void UpdateRateLogging(const int rateLogging);
bool IsValid();
bool ValidateChannel(); bool ValidateChannel();
void ResetChannel();
FairMQSocket* fSocket;
// Wrappers for the socket methods to simplify the usage of channels // Wrappers for the socket methods to simplify the usage of channels
int Send(FairMQMessage* msg, const std::string& flag = ""); int Send(FairMQMessage* msg, const std::string& flag = "");
int Send(FairMQMessage* msg, const int flags); int Send(FairMQMessage* msg, const int flags);
int Receive(FairMQMessage* msg, const std::string& flag = ""); int Receive(FairMQMessage* msg, const std::string& flag = "");
int Receive(FairMQMessage* msg, const int flags); int Receive(FairMQMessage* msg, const int flags);
private:
std::string fType; std::string fType;
std::string fMethod; std::string fMethod;
std::string fAddress; std::string fAddress;
@ -43,10 +66,14 @@ class FairMQChannel
int fRcvBufSize; int fRcvBufSize;
int fRateLogging; int fRateLogging;
FairMQSocket* fSocket;
private:
std::string fChannelName; std::string fChannelName;
bool fIsValid;
// use static mutex to make the class easily copyable
// implication: same mutex is used for all instances of the class
// this does not hurt much, because mutex is used only during initialization with very low contention
// possible TODO: improve this
static boost::mutex channelMutex;
}; };
#endif /* FAIRMQCHANNEL_H_ */ #endif /* FAIRMQCHANNEL_H_ */

View File

@ -185,7 +185,7 @@ void FairMQDevice::InitTask()
{ {
} }
bool SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs) bool FairMQDevice::SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs)
{ {
return lhs.fAddress < rhs.fAddress; return lhs.fAddress < rhs.fAddress;
} }

View File

@ -54,6 +54,8 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
virtual void SetTransport(FairMQTransportFactory* factory); virtual void SetTransport(FairMQTransportFactory* factory);
static bool SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs);
virtual ~FairMQDevice(); virtual ~FairMQDevice();
std::map< std::string,std::vector<FairMQChannel> > fChannels; std::map< std::string,std::vector<FairMQChannel> > fChannels;

View File

@ -116,9 +116,9 @@ int main(int argc, char** argv)
client.SetProperty(FairMQExampleClient::NumIoThreads, 1); client.SetProperty(FairMQExampleClient::NumIoThreads, 1);
FairMQChannel requestChannel("req", "connect", "tcp://localhost:5005"); FairMQChannel requestChannel("req", "connect", "tcp://localhost:5005");
requestChannel.fSndBufSize = 10000; requestChannel.UpdateSndBufSize(10000);
requestChannel.fRcvBufSize = 10000; requestChannel.UpdateRcvBufSize(10000);
requestChannel.fRateLogging = 1; requestChannel.UpdateRateLogging(1);
client.fChannels["data"].push_back(requestChannel); client.fChannels["data"].push_back(requestChannel);

View File

@ -66,9 +66,9 @@ int main(int argc, char** argv)
server.SetProperty(FairMQExampleServer::NumIoThreads, 1); server.SetProperty(FairMQExampleServer::NumIoThreads, 1);
FairMQChannel replyChannel("rep", "bind", "tcp://*:5005"); FairMQChannel replyChannel("rep", "bind", "tcp://*:5005");
replyChannel.fSndBufSize = 10000; replyChannel.UpdateSndBufSize(10000);
replyChannel.fRcvBufSize = 10000; replyChannel.UpdateRcvBufSize(10000);
replyChannel.fRateLogging = 1; replyChannel.UpdateRateLogging(1);
server.fChannels["data"].push_back(replyChannel); server.fChannels["data"].push_back(replyChannel);

View File

@ -135,19 +135,19 @@ FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const std::string&
socket << "\t \t [node = " << r.first socket << "\t \t [node = " << r.first
<< "] socket index = " << socketCounter; << "] socket index = " << socketCounter;
MQLOG(DEBUG) << socket.str(); MQLOG(DEBUG) << socket.str();
MQLOG(DEBUG) << "\t \t \t type = " << r.second.get<std::string>("type", channel.fType); MQLOG(DEBUG) << "\t \t \t type = " << r.second.get<std::string>("type", channel.GetType());
MQLOG(DEBUG) << "\t \t \t method = " << r.second.get<std::string>("method", channel.fMethod); MQLOG(DEBUG) << "\t \t \t method = " << r.second.get<std::string>("method", channel.GetMethod());
MQLOG(DEBUG) << "\t \t \t address = " << r.second.get<std::string>("address", channel.fAddress); MQLOG(DEBUG) << "\t \t \t address = " << r.second.get<std::string>("address", channel.GetAddress());
MQLOG(DEBUG) << "\t \t \t sndBufSize = " << r.second.get<int>("sndBufSize", channel.fSndBufSize); MQLOG(DEBUG) << "\t \t \t sndBufSize = " << r.second.get<int>("sndBufSize", channel.GetSndBufSize());
MQLOG(DEBUG) << "\t \t \t rcvBufSize = " << r.second.get<int>("rcvBufSize", channel.fRcvBufSize); MQLOG(DEBUG) << "\t \t \t rcvBufSize = " << r.second.get<int>("rcvBufSize", channel.GetRcvBufSize());
MQLOG(DEBUG) << "\t \t \t rateLogging = " << r.second.get<int>("rateLogging", channel.fRateLogging); MQLOG(DEBUG) << "\t \t \t rateLogging = " << r.second.get<int>("rateLogging", channel.GetRateLogging());
channel.fType = r.second.get<std::string>("type", channel.fType); channel.UpdateType(r.second.get<std::string>("type", channel.GetType()));
channel.fMethod = r.second.get<std::string>("method", channel.fMethod); channel.UpdateMethod(r.second.get<std::string>("method", channel.GetMethod()));
channel.fAddress = r.second.get<std::string>("address", channel.fAddress); channel.UpdateAddress(r.second.get<std::string>("address", channel.GetAddress()));
channel.fSndBufSize = r.second.get<int>("sndBufSize", channel.fSndBufSize); // int channel.UpdateSndBufSize(r.second.get<int>("sndBufSize", channel.GetSndBufSize())); // int
channel.fRcvBufSize = r.second.get<int>("rcvBufSize", channel.fRcvBufSize); // int channel.UpdateRcvBufSize(r.second.get<int>("rcvBufSize", channel.GetRcvBufSize())); // int
channel.fRateLogging = r.second.get<int>("rateLogging", channel.fRateLogging); // int channel.UpdateRateLogging(r.second.get<int>("rateLogging", channel.GetRateLogging())); // int
channelList.push_back(channel); channelList.push_back(channel);
}// end socket loop }// end socket loop

View File

@ -160,16 +160,16 @@ int main(int argc, char** argv)
buffer.SetTransport(transportFactory); buffer.SetTransport(transportFactory);
FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress); FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress);
inputChannel.fSndBufSize = options.inputBufSize; inputChannel.UpdateSndBufSize(options.inputBufSize);
inputChannel.fRcvBufSize = options.inputBufSize; inputChannel.UpdateRcvBufSize(options.inputBufSize);
inputChannel.fRateLogging = 1; inputChannel.UpdateRateLogging(1);
buffer.fChannels["data-in"].push_back(inputChannel); buffer.fChannels["data-in"].push_back(inputChannel);
FairMQChannel outputChannel(options.outputSocketType, options.outputMethod, options.outputAddress); FairMQChannel outputChannel(options.outputSocketType, options.outputMethod, options.outputAddress);
outputChannel.fSndBufSize = options.outputBufSize; outputChannel.UpdateSndBufSize(options.outputBufSize);
outputChannel.fRcvBufSize = options.outputBufSize; outputChannel.UpdateRcvBufSize(options.outputBufSize);
outputChannel.fRateLogging = 1; outputChannel.UpdateRateLogging(1);
buffer.fChannels["data-out"].push_back(outputChannel); buffer.fChannels["data-out"].push_back(outputChannel);

View File

@ -167,17 +167,17 @@ int main(int argc, char** argv)
for (int i = 0; i < options.inputAddress.size(); ++i) for (int i = 0; i < options.inputAddress.size(); ++i)
{ {
FairMQChannel inputChannel(options.inputSocketType.at(i), options.inputMethod.at(i), options.inputAddress.at(i)); FairMQChannel inputChannel(options.inputSocketType.at(i), options.inputMethod.at(i), options.inputAddress.at(i));
inputChannel.fSndBufSize = options.inputBufSize.at(i); inputChannel.UpdateSndBufSize(options.inputBufSize.at(i));
inputChannel.fRcvBufSize = options.inputBufSize.at(i); inputChannel.UpdateRcvBufSize(options.inputBufSize.at(i));
inputChannel.fRateLogging = 1; inputChannel.UpdateRateLogging(1);
merger.fChannels["data-in"].push_back(inputChannel); merger.fChannels["data-in"].push_back(inputChannel);
} }
FairMQChannel outputChannel(options.outputSocketType, options.outputMethod, options.outputAddress); FairMQChannel outputChannel(options.outputSocketType, options.outputMethod, options.outputAddress);
outputChannel.fSndBufSize = options.outputBufSize; outputChannel.UpdateSndBufSize(options.outputBufSize);
outputChannel.fRcvBufSize = options.outputBufSize; outputChannel.UpdateRcvBufSize(options.outputBufSize);
outputChannel.fRateLogging = 1; outputChannel.UpdateRateLogging(1);
merger.fChannels["data-out"].push_back(outputChannel); merger.fChannels["data-out"].push_back(outputChannel);

View File

@ -160,16 +160,16 @@ int main(int argc, char** argv)
proxy.SetTransport(transportFactory); proxy.SetTransport(transportFactory);
FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress); FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress);
inputChannel.fSndBufSize = options.inputBufSize; inputChannel.UpdateSndBufSize(options.inputBufSize);
inputChannel.fRcvBufSize = options.inputBufSize; inputChannel.UpdateRcvBufSize(options.inputBufSize);
inputChannel.fRateLogging = 1; inputChannel.UpdateRateLogging(1);
proxy.fChannels["data-in"].push_back(inputChannel); proxy.fChannels["data-in"].push_back(inputChannel);
FairMQChannel outputChannel(options.outputSocketType, options.outputMethod, options.outputAddress); FairMQChannel outputChannel(options.outputSocketType, options.outputMethod, options.outputAddress);
outputChannel.fSndBufSize = options.outputBufSize; outputChannel.UpdateSndBufSize(options.outputBufSize);
outputChannel.fRcvBufSize = options.outputBufSize; outputChannel.UpdateRcvBufSize(options.outputBufSize);
outputChannel.fRateLogging = 1; outputChannel.UpdateRateLogging(1);
proxy.fChannels["data-out"].push_back(outputChannel); proxy.fChannels["data-out"].push_back(outputChannel);

View File

@ -166,18 +166,18 @@ int main(int argc, char** argv)
splitter.SetTransport(transportFactory); splitter.SetTransport(transportFactory);
FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress); FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress);
inputChannel.fSndBufSize = options.inputBufSize; inputChannel.UpdateSndBufSize(options.inputBufSize);
inputChannel.fRcvBufSize = options.inputBufSize; inputChannel.UpdateRcvBufSize(options.inputBufSize);
inputChannel.fRateLogging = 1; inputChannel.UpdateRateLogging(1);
splitter.fChannels["data-in"].push_back(inputChannel); splitter.fChannels["data-in"].push_back(inputChannel);
for (int i = 0; i < options.outputAddress.size(); ++i) for (int i = 0; i < options.outputAddress.size(); ++i)
{ {
FairMQChannel outputChannel(options.outputSocketType.at(i), options.outputMethod.at(i), options.outputAddress.at(i)); FairMQChannel outputChannel(options.outputSocketType.at(i), options.outputMethod.at(i), options.outputAddress.at(i));
outputChannel.fSndBufSize = options.outputBufSize.at(i); outputChannel.UpdateSndBufSize(options.outputBufSize.at(i));
outputChannel.fRcvBufSize = options.outputBufSize.at(i); outputChannel.UpdateRcvBufSize(options.outputBufSize.at(i));
outputChannel.fRateLogging = 1; outputChannel.UpdateRateLogging(1);
splitter.fChannels["data-out"].push_back(outputChannel); splitter.fChannels["data-out"].push_back(outputChannel);
} }