- FairMQ options:

a) move the XML parser into the FairMQ/options/FairMQParser.h
b) add a routine in FairMQProgOption to check whether the necessary XML or JSON input files are there, and send an error message if not there

- Policy based devices:
a) rename GenericSampler to base_GenericSampler and use an alias template named GenericSampler
b) in base_GenericSampler, rename template parameter to simple variables <T,U,… > and use typedef for clarity
c) introduce an anonymous function container in the base_GenericSampler host class with a register task template member function and an Executetasks()
d) add two new template parameters in base_GenericSampler for the anonymous function container map. parameter is K for the key type (default=int) and L for the value type (default=std::function<void()>)

- Tutorial7:
a) use FairMQProgOption to configure devices in tutorial7
b) introduce several template functions helper in tutorial7 to reduce code redundancy
c) show examples in tutorial7 of task registration with callback and lambda expression for the sampler devices
d) separate the executable build of the tutorial7 data generator to remove the Roofit banner when executing the MQdevices
This commit is contained in:
winckler
2015-07-02 12:13:56 +02:00
committed by Mohammad Al-Turany
parent 6ed9cc3da1
commit d1bba61939
12 changed files with 440 additions and 209 deletions

View File

@@ -5,72 +5,82 @@
* Created on November 24, 2014, 3:59 PM
*/
template <typename SamplerPolicy, typename OutputPolicy>
GenericSampler<SamplerPolicy,OutputPolicy>::GenericSampler()
: fNumEvents(0)
template <typename T, typename U, typename K, typename L>
base_GenericSampler<T,U,K,L>::base_GenericSampler()
: fOutChanName("data-out")
, fNumEvents(0)
, fCurrentIdx(0)
, fEventRate(1)
, fEventCounter(0)
, fContinuous(false)
, fInputFile()
, fParFile()
, fBranch()
{
}
template <typename SamplerPolicy, typename OutputPolicy>
GenericSampler<SamplerPolicy,OutputPolicy>::~GenericSampler()
template <typename T, typename U, typename K, typename L>
base_GenericSampler<T,U,K,L>::~base_GenericSampler()
{
}
template <typename SamplerPolicy, typename OutputPolicy>
void GenericSampler<SamplerPolicy,OutputPolicy>::SetTransport(FairMQTransportFactory* factory)
template <typename T, typename U, typename K, typename L>
void base_GenericSampler<T,U,K,L>::SetTransport(FairMQTransportFactory* factory)
{
FairMQDevice::SetTransport(factory);
// OutputPolicy::SetTransport(factory);
}
template <typename SamplerPolicy, typename OutputPolicy>
void GenericSampler<SamplerPolicy,OutputPolicy>::InitTask()
template <typename T, typename U, typename K, typename L>
void base_GenericSampler<T,U,K,L>::InitTask()
{
SamplerPolicy::InitSampler();
fNumEvents = SamplerPolicy::GetNumberOfEvent();
BindingSendPart();
BindingGetSocketNumber();
BindingGetCurrentIndex();
source_type::InitSampler();
fNumEvents = source_type::GetNumberOfEvent();
}
template <typename SamplerPolicy, typename OutputPolicy>
void GenericSampler<SamplerPolicy,OutputPolicy>::Run()
template <typename T, typename U, typename K, typename L>
void base_GenericSampler<T,U,K,L>::Run()
{
// boost::thread resetEventCounter(boost::bind(&GenericSampler::ResetEventCounter, this));
int sentMsgs = 0;
boost::timer::auto_cpu_timer timer;
LOG(INFO) << "Number of events to process: " << fNumEvents;
do {
for (int64_t eventNr = 0; eventNr < fNumEvents; ++eventNr)
for (fCurrentIdx = 0; fCurrentIdx < fNumEvents; fCurrentIdx++)
{
//fSamplerTask->SetEventIndex(eventNr);
FairMQMessage* msg = fTransportFactory->CreateMessage();
OutputPolicy::SetMessage(msg);
fChannels["data-out"].at(0).Send(OutputPolicy::SerializeMsg(SamplerPolicy::GetDataBranch(eventNr)));
++sentMsgs;
if (msg)
for(auto& p : fChannels[fOutChanName])
{
msg->CloseMessage();
}
FairMQMessage* msg = fTransportFactory->CreateMessage();
serialization_type::SetMessage(msg);
source_type::SetIndex(fCurrentIdx);
ExecuteTasks();
p.Send(serialization_type::SerializeMsg(source_type::GetOutData()));
if (msg)
msg->CloseMessage();
sentMsgs++;
// Optional event rate limiting
// --fEventCounter;
// while (fEventCounter == 0) {
// boost::this_thread::sleep(boost::posix_time::milliseconds(1));
// }
if (GetCurrentState() != RUNNING)
{
break;
if(fChannels[fOutChanName].size()>1)
fCurrentIdx++;
// Optional event rate limiting
// --fEventCounter;
// while (fEventCounter == 0) {
// boost::this_thread::sleep(boost::posix_time::milliseconds(1));
// }
if (GetCurrentState() != RUNNING)
break;
}
// if more than one socket, remove the last incrementation
if(fChannels[fOutChanName].size()>1)
fCurrentIdx--;
}
}
while ( GetCurrentState() == RUNNING && fContinuous );
@@ -80,23 +90,43 @@ void GenericSampler<SamplerPolicy,OutputPolicy>::Run()
LOG(INFO) << "Sent " << sentMsgs << " messages!";
}
/*
template <typename SamplerPolicy, typename OutputPolicy>
void GenericSampler<SamplerPolicy,OutputPolicy>::SendPart()
{
fChannels["data-out"].at(0).Send(OutputPolicy::GetMessage(), "snd-more");
OutputPolicy::CloseMessage();
}
*/
template <typename SamplerPolicy, typename OutputPolicy>
void GenericSampler<SamplerPolicy,OutputPolicy>::SetContinuous(bool flag)
template <typename T, typename U, typename K, typename L>
void base_GenericSampler<T,U,K,L>::SendPart(int socketIdx)
{
fCurrentIdx++;
if(fCurrentIdx<fNumEvents)
{
FairMQMessage* msg = fTransportFactory->CreateMessage();
serialization_type::SetMessage(msg);
source_type::SetIndex(fCurrentIdx);
fChannels[fOutChanName].at(socketIdx).Send(serialization_type::SerializeMsg(source_type::GetOutData()), "snd-more");
if (msg)
msg->CloseMessage();
}
}
template <typename T, typename U, typename K, typename L>
int base_GenericSampler<T,U,K,L>::GetSocketNumber() const
{
return fChannels.at(fOutChanName).size();
}
template <typename T, typename U, typename K, typename L>
int base_GenericSampler<T,U,K,L>::GetCurrentIndex() const
{
return fCurrentIdx;
}
template <typename T, typename U, typename K, typename L>
void base_GenericSampler<T,U,K,L>::SetContinuous(bool flag)
{
fContinuous = flag;
}
template <typename SamplerPolicy, typename OutputPolicy>
void GenericSampler<SamplerPolicy,OutputPolicy>::ResetEventCounter()
template <typename T, typename U, typename K, typename L>
void base_GenericSampler<T,U,K,L>::ResetEventCounter()
{
while (GetCurrentState() == RUNNING)
{
@@ -114,44 +144,8 @@ void GenericSampler<SamplerPolicy,OutputPolicy>::ResetEventCounter()
LOG(DEBUG) << ">>>>>>> stopping resetEventCounter <<<<<<<";
}
template <typename SamplerPolicy, typename OutputPolicy>
void GenericSampler<SamplerPolicy,OutputPolicy>::SetProperty(const int key, const std::string& value)
{
switch (key)
{
case InputFile:
fInputFile = value;
break;
case ParFile:
fParFile = value;
break;
case Branch:
fBranch = value;
break;
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
template <typename SamplerPolicy, typename OutputPolicy>
std::string GenericSampler<SamplerPolicy,OutputPolicy>::GetProperty(const int key, const std::string& default_/*= ""*/)
{
switch (key)
{
case InputFile:
return fInputFile;
case ParFile:
return fParFile;
case Branch:
return fBranch;
default:
return FairMQDevice::GetProperty(key, default_);
}
}
template <typename SamplerPolicy, typename OutputPolicy>
void GenericSampler<SamplerPolicy,OutputPolicy>::SetProperty(const int key, const int value)
template <typename T, typename U, typename K, typename L>
void base_GenericSampler<T,U,K,L>::SetProperty(const int key, const int value)
{
switch (key)
{
@@ -164,8 +158,8 @@ void GenericSampler<SamplerPolicy,OutputPolicy>::SetProperty(const int key, cons
}
}
template <typename SamplerPolicy, typename OutputPolicy>
int GenericSampler<SamplerPolicy,OutputPolicy>::GetProperty(const int key, const int default_/*= 0*/)
template <typename T, typename U, typename K, typename L>
int base_GenericSampler<T,U,K,L>::GetProperty(const int key, const int default_/*= 0*/)
{
switch (key)
{
@@ -175,3 +169,35 @@ int GenericSampler<SamplerPolicy,OutputPolicy>::GetProperty(const int key, const
return FairMQDevice::GetProperty(key, default_);
}
}
template <typename T, typename U, typename K, typename L>
void base_GenericSampler<T,U,K,L>::SetProperty(const int key, const std::string& value)
{
switch (key)
{
case OutChannelName:
fOutChanName = value;
break;
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
template <typename T, typename U, typename K, typename L>
std::string base_GenericSampler<T,U,K,L>::GetProperty(const int key, const std::string& default_)
{
switch (key)
{
case OutChannelName:
return fOutChanName;
default:
return FairMQDevice::GetProperty(key, default_);
}
}
template<typename T, typename U>
using GenericSampler = base_GenericSampler<T,U,int,std::function<void()> >;
typedef std::map<int, std::function<void()> > SamplerTasksMap;