diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 1b359aa2..ff027244 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -130,14 +130,14 @@ endif(NANOMSG_FOUND) # to copy src that are header-only files (e.g. c++ template) for FairRoot external installation # manual install (globbing add not recommended) -Set(FairMQHDRFiles +Set(FAIRMQHEADERS devices/GenericSampler.h devices/GenericSampler.tpl devices/GenericProcessor.h devices/GenericFileSink.h tools/FairMQTools.h ) -install(FILES ${FairMQHDRFiles} DESTINATION include) +install(FILES ${FAIRMQHEADERS} DESTINATION include) set(DEPENDENCIES ${DEPENDENCIES} diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index 7344931b..6e56cb94 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -19,6 +19,8 @@ using namespace std; +boost::mutex FairMQChannel::channelMutex; + FairMQChannel::FairMQChannel() : fType("unspecified") , fMethod("unspecified") @@ -28,6 +30,7 @@ FairMQChannel::FairMQChannel() , fRateLogging(1) , fSocket() , fChannelName("") + , fIsValid(false) { } @@ -40,14 +43,97 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str , fRateLogging(1) , fSocket() , fChannelName("") + , fIsValid(false) { } +std::string FairMQChannel::GetType() +{ + boost::unique_lock scoped_lock(channelMutex); + return fType; +} + +std::string FairMQChannel::GetMethod() +{ + boost::unique_lock scoped_lock(channelMutex); + return fMethod; +} + +std::string FairMQChannel::GetAddress() +{ + boost::unique_lock scoped_lock(channelMutex); + return fAddress; +} + +int FairMQChannel::GetSndBufSize() +{ + boost::unique_lock scoped_lock(channelMutex); + return fSndBufSize; +} + +int FairMQChannel::GetRcvBufSize() +{ + boost::unique_lock scoped_lock(channelMutex); + return fRcvBufSize; +} + +int FairMQChannel::GetRateLogging() +{ + boost::unique_lock scoped_lock(channelMutex); + return fRateLogging; +} + +void FairMQChannel::UpdateType(const std::string& type) +{ + boost::unique_lock scoped_lock(channelMutex); + fType = type; +} +void FairMQChannel::UpdateMethod(const std::string& method) +{ + boost::unique_lock scoped_lock(channelMutex); + fMethod = method; +} +void FairMQChannel::UpdateAddress(const std::string& address) +{ + boost::unique_lock scoped_lock(channelMutex); + fAddress = address; +} +void FairMQChannel::UpdateSndBufSize(const int sndBufSize) +{ + boost::unique_lock scoped_lock(channelMutex); + fSndBufSize = sndBufSize; +} +void FairMQChannel::UpdateRcvBufSize(const int rcvBufSize) +{ + boost::unique_lock scoped_lock(channelMutex); + fRcvBufSize = rcvBufSize; +} +void FairMQChannel::UpdateRateLogging(const int rateLogging) +{ + boost::unique_lock scoped_lock(channelMutex); + fRateLogging = rateLogging; +} + +bool FairMQChannel::IsValid() +{ + boost::unique_lock scoped_lock(channelMutex); + return fIsValid; +} + bool FairMQChannel::ValidateChannel() { + boost::unique_lock scoped_lock(channelMutex); + stringstream ss; ss << "Validating channel " << fChannelName << "... "; + if (fIsValid) + { + ss << "ALREADY VALID"; + LOG(DEBUG) << ss.str(); + return true; + } + const string socketTypeNames[] = { "sub", "pub", "pull", "push", "req", "rep", "xsub", "xpub", "dealer", "router", "pair" }; const set socketTypes(socketTypeNames, socketTypeNames + sizeof(socketTypeNames) / sizeof(string)); if (socketTypes.find(fType) == socketTypes.end()) @@ -92,11 +178,17 @@ bool FairMQChannel::ValidateChannel() return false; } + fIsValid = true; ss << "VALID"; LOG(DEBUG) << ss.str(); return true; } +void FairMQChannel::ResetChannel() +{ + // TODO: implement resetting +} + int FairMQChannel::Send(FairMQMessage* msg, const string& flag) { return fSocket->Send(msg, flag); diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index c1e788da..e63cbee4 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -17,6 +17,8 @@ #include +#include + #include "FairMQSocket.h" class FairMQChannel @@ -28,14 +30,35 @@ class FairMQChannel FairMQChannel(const std::string& type, const std::string& method, const std::string& address); virtual ~FairMQChannel(); + std::string GetType(); + std::string GetMethod(); + std::string GetAddress(); + int GetSndBufSize(); + int GetRcvBufSize(); + int GetRateLogging(); + + void UpdateType(const std::string& type); + void UpdateMethod(const std::string& method); + void UpdateAddress(const std::string& address); + void UpdateSndBufSize(const int sndBufSize); + void UpdateRcvBufSize(const int rcvBufSize); + void UpdateRateLogging(const int rateLogging); + + bool IsValid(); + bool ValidateChannel(); + void ResetChannel(); + + FairMQSocket* fSocket; + // Wrappers for the socket methods to simplify the usage of channels int Send(FairMQMessage* msg, const std::string& flag = ""); int Send(FairMQMessage* msg, const int flags); int Receive(FairMQMessage* msg, const std::string& flag = ""); int Receive(FairMQMessage* msg, const int flags); + private: std::string fType; std::string fMethod; std::string fAddress; @@ -43,10 +66,14 @@ class FairMQChannel int fRcvBufSize; int fRateLogging; - FairMQSocket* fSocket; - - private: std::string fChannelName; + bool fIsValid; + + // use static mutex to make the class easily copyable + // implication: same mutex is used for all instances of the class + // this does not hurt much, because mutex is used only during initialization with very low contention + // possible TODO: improve this + static boost::mutex channelMutex; }; #endif /* FAIRMQCHANNEL_H_ */ \ No newline at end of file diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 885cef6c..f67e019b 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -185,7 +185,7 @@ void FairMQDevice::InitTask() { } -bool SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs) +bool FairMQDevice::SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs) { return lhs.fAddress < rhs.fAddress; } diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 5e82544b..18751a60 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -54,6 +54,8 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable virtual void SetTransport(FairMQTransportFactory* factory); + static bool SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs); + virtual ~FairMQDevice(); std::map< std::string,std::vector > fChannels; diff --git a/fairmq/examples/req-rep/runExampleClient.cxx b/fairmq/examples/req-rep/runExampleClient.cxx index 5eda0879..de29cf43 100644 --- a/fairmq/examples/req-rep/runExampleClient.cxx +++ b/fairmq/examples/req-rep/runExampleClient.cxx @@ -116,9 +116,9 @@ int main(int argc, char** argv) client.SetProperty(FairMQExampleClient::NumIoThreads, 1); FairMQChannel requestChannel("req", "connect", "tcp://localhost:5005"); - requestChannel.fSndBufSize = 10000; - requestChannel.fRcvBufSize = 10000; - requestChannel.fRateLogging = 1; + requestChannel.UpdateSndBufSize(10000); + requestChannel.UpdateRcvBufSize(10000); + requestChannel.UpdateRateLogging(1); client.fChannels["data"].push_back(requestChannel); diff --git a/fairmq/examples/req-rep/runExampleServer.cxx b/fairmq/examples/req-rep/runExampleServer.cxx index 6801e007..d4f868ab 100644 --- a/fairmq/examples/req-rep/runExampleServer.cxx +++ b/fairmq/examples/req-rep/runExampleServer.cxx @@ -66,9 +66,9 @@ int main(int argc, char** argv) server.SetProperty(FairMQExampleServer::NumIoThreads, 1); FairMQChannel replyChannel("rep", "bind", "tcp://*:5005"); - replyChannel.fSndBufSize = 10000; - replyChannel.fRcvBufSize = 10000; - replyChannel.fRateLogging = 1; + replyChannel.UpdateSndBufSize(10000); + replyChannel.UpdateRcvBufSize(10000); + replyChannel.UpdateRateLogging(1); server.fChannels["data"].push_back(replyChannel); diff --git a/fairmq/options/FairMQParser.cxx b/fairmq/options/FairMQParser.cxx index 80e232c6..329d8a5f 100644 --- a/fairmq/options/FairMQParser.cxx +++ b/fairmq/options/FairMQParser.cxx @@ -104,7 +104,7 @@ FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const std::string& channelKey = q.second.get(".name"); } - if (formatFlag=="json") + if (formatFlag == "json") { channelKey = q.second.get("name"); } @@ -135,19 +135,19 @@ FairMQMap ptreeToMQMap(const boost::property_tree::ptree& pt, const std::string& socket << "\t \t [node = " << r.first << "] socket index = " << socketCounter; MQLOG(DEBUG) << socket.str(); - MQLOG(DEBUG) << "\t \t \t type = " << r.second.get("type", channel.fType); - MQLOG(DEBUG) << "\t \t \t method = " << r.second.get("method", channel.fMethod); - MQLOG(DEBUG) << "\t \t \t address = " << r.second.get("address", channel.fAddress); - MQLOG(DEBUG) << "\t \t \t sndBufSize = " << r.second.get("sndBufSize", channel.fSndBufSize); - MQLOG(DEBUG) << "\t \t \t rcvBufSize = " << r.second.get("rcvBufSize", channel.fRcvBufSize); - MQLOG(DEBUG) << "\t \t \t rateLogging = " << r.second.get("rateLogging", channel.fRateLogging); + MQLOG(DEBUG) << "\t \t \t type = " << r.second.get("type", channel.GetType()); + MQLOG(DEBUG) << "\t \t \t method = " << r.second.get("method", channel.GetMethod()); + MQLOG(DEBUG) << "\t \t \t address = " << r.second.get("address", channel.GetAddress()); + MQLOG(DEBUG) << "\t \t \t sndBufSize = " << r.second.get("sndBufSize", channel.GetSndBufSize()); + MQLOG(DEBUG) << "\t \t \t rcvBufSize = " << r.second.get("rcvBufSize", channel.GetRcvBufSize()); + MQLOG(DEBUG) << "\t \t \t rateLogging = " << r.second.get("rateLogging", channel.GetRateLogging()); - channel.fType = r.second.get("type", channel.fType); - channel.fMethod = r.second.get("method", channel.fMethod); - channel.fAddress = r.second.get("address", channel.fAddress); - channel.fSndBufSize = r.second.get("sndBufSize", channel.fSndBufSize); // int - channel.fRcvBufSize = r.second.get("rcvBufSize", channel.fRcvBufSize); // int - channel.fRateLogging = r.second.get("rateLogging", channel.fRateLogging); // int + channel.UpdateType(r.second.get("type", channel.GetType())); + channel.UpdateMethod(r.second.get("method", channel.GetMethod())); + channel.UpdateAddress(r.second.get("address", channel.GetAddress())); + channel.UpdateSndBufSize(r.second.get("sndBufSize", channel.GetSndBufSize())); // int + channel.UpdateRcvBufSize(r.second.get("rcvBufSize", channel.GetRcvBufSize())); // int + channel.UpdateRateLogging(r.second.get("rateLogging", channel.GetRateLogging())); // int channelList.push_back(channel); }// end socket loop diff --git a/fairmq/options/FairMQProgOptions.cxx b/fairmq/options/FairMQProgOptions.cxx index 3926a2e6..11b3ac25 100644 --- a/fairmq/options/FairMQProgOptions.cxx +++ b/fairmq/options/FairMQProgOptions.cxx @@ -52,7 +52,7 @@ int FairMQProgOptions::ParseAll(const int argc, char** argv, bool AllowUnregiste } fVisible_options.add_options() - ("device-id", po::value< std::string >()->required(), "Device ID (required value)"); + ("device-id", po::value< std::string >()->required(), "Device ID (required value)"); // parse command line if (ParseCmdLine(argc,argv,fCmdline_options,fvarmap,AllowUnregistered)) diff --git a/fairmq/run/runBuffer.cxx b/fairmq/run/runBuffer.cxx index 45994373..bb828a8f 100644 --- a/fairmq/run/runBuffer.cxx +++ b/fairmq/run/runBuffer.cxx @@ -160,16 +160,16 @@ int main(int argc, char** argv) buffer.SetTransport(transportFactory); FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress); - inputChannel.fSndBufSize = options.inputBufSize; - inputChannel.fRcvBufSize = options.inputBufSize; - inputChannel.fRateLogging = 1; + inputChannel.UpdateSndBufSize(options.inputBufSize); + inputChannel.UpdateRcvBufSize(options.inputBufSize); + inputChannel.UpdateRateLogging(1); buffer.fChannels["data-in"].push_back(inputChannel); FairMQChannel outputChannel(options.outputSocketType, options.outputMethod, options.outputAddress); - outputChannel.fSndBufSize = options.outputBufSize; - outputChannel.fRcvBufSize = options.outputBufSize; - outputChannel.fRateLogging = 1; + outputChannel.UpdateSndBufSize(options.outputBufSize); + outputChannel.UpdateRcvBufSize(options.outputBufSize); + outputChannel.UpdateRateLogging(1); buffer.fChannels["data-out"].push_back(outputChannel); diff --git a/fairmq/run/runMerger.cxx b/fairmq/run/runMerger.cxx index 5178a513..ffa2cc7e 100644 --- a/fairmq/run/runMerger.cxx +++ b/fairmq/run/runMerger.cxx @@ -167,17 +167,17 @@ int main(int argc, char** argv) for (int i = 0; i < options.inputAddress.size(); ++i) { FairMQChannel inputChannel(options.inputSocketType.at(i), options.inputMethod.at(i), options.inputAddress.at(i)); - inputChannel.fSndBufSize = options.inputBufSize.at(i); - inputChannel.fRcvBufSize = options.inputBufSize.at(i); - inputChannel.fRateLogging = 1; + inputChannel.UpdateSndBufSize(options.inputBufSize.at(i)); + inputChannel.UpdateRcvBufSize(options.inputBufSize.at(i)); + inputChannel.UpdateRateLogging(1); merger.fChannels["data-in"].push_back(inputChannel); } FairMQChannel outputChannel(options.outputSocketType, options.outputMethod, options.outputAddress); - outputChannel.fSndBufSize = options.outputBufSize; - outputChannel.fRcvBufSize = options.outputBufSize; - outputChannel.fRateLogging = 1; + outputChannel.UpdateSndBufSize(options.outputBufSize); + outputChannel.UpdateRcvBufSize(options.outputBufSize); + outputChannel.UpdateRateLogging(1); merger.fChannels["data-out"].push_back(outputChannel); diff --git a/fairmq/run/runProxy.cxx b/fairmq/run/runProxy.cxx index 25864045..16a73e92 100644 --- a/fairmq/run/runProxy.cxx +++ b/fairmq/run/runProxy.cxx @@ -160,16 +160,16 @@ int main(int argc, char** argv) proxy.SetTransport(transportFactory); FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress); - inputChannel.fSndBufSize = options.inputBufSize; - inputChannel.fRcvBufSize = options.inputBufSize; - inputChannel.fRateLogging = 1; + inputChannel.UpdateSndBufSize(options.inputBufSize); + inputChannel.UpdateRcvBufSize(options.inputBufSize); + inputChannel.UpdateRateLogging(1); proxy.fChannels["data-in"].push_back(inputChannel); FairMQChannel outputChannel(options.outputSocketType, options.outputMethod, options.outputAddress); - outputChannel.fSndBufSize = options.outputBufSize; - outputChannel.fRcvBufSize = options.outputBufSize; - outputChannel.fRateLogging = 1; + outputChannel.UpdateSndBufSize(options.outputBufSize); + outputChannel.UpdateRcvBufSize(options.outputBufSize); + outputChannel.UpdateRateLogging(1); proxy.fChannels["data-out"].push_back(outputChannel); diff --git a/fairmq/run/runSplitter.cxx b/fairmq/run/runSplitter.cxx index bbea6d01..57395bb8 100644 --- a/fairmq/run/runSplitter.cxx +++ b/fairmq/run/runSplitter.cxx @@ -166,18 +166,18 @@ int main(int argc, char** argv) splitter.SetTransport(transportFactory); FairMQChannel inputChannel(options.inputSocketType, options.inputMethod, options.inputAddress); - inputChannel.fSndBufSize = options.inputBufSize; - inputChannel.fRcvBufSize = options.inputBufSize; - inputChannel.fRateLogging = 1; + inputChannel.UpdateSndBufSize(options.inputBufSize); + inputChannel.UpdateRcvBufSize(options.inputBufSize); + inputChannel.UpdateRateLogging(1); splitter.fChannels["data-in"].push_back(inputChannel); for (int i = 0; i < options.outputAddress.size(); ++i) { FairMQChannel outputChannel(options.outputSocketType.at(i), options.outputMethod.at(i), options.outputAddress.at(i)); - outputChannel.fSndBufSize = options.outputBufSize.at(i); - outputChannel.fRcvBufSize = options.outputBufSize.at(i); - outputChannel.fRateLogging = 1; + outputChannel.UpdateSndBufSize(options.outputBufSize.at(i)); + outputChannel.UpdateRcvBufSize(options.outputBufSize.at(i)); + outputChannel.UpdateRateLogging(1); splitter.fChannels["data-out"].push_back(outputChannel); }