/* * File: GenericSampler.tpl * Author: winckler * * Created on November 24, 2014, 3:59 PM */ template base_GenericSampler::base_GenericSampler() : fOutChanName("data-out") , fNumEvents(0) , fCurrentIdx(0) , fEventRate(1) , fEventCounter(0) , fContinuous(false) { } template base_GenericSampler::~base_GenericSampler() { } template void base_GenericSampler::SetTransport(FairMQTransportFactory* factory) { FairMQDevice::SetTransport(factory); } template void base_GenericSampler::InitTask() { BindingSendPart(); BindingGetSocketNumber(); BindingGetCurrentIndex(); source_type::InitSource(); fNumEvents = source_type::GetNumberOfEvent(); } template void base_GenericSampler::Run() { // boost::thread resetEventCounter(boost::bind(&GenericSampler::ResetEventCounter, this)); int sentMsgs = 0; boost::timer::auto_cpu_timer timer; LOG(INFO) << "Number of events to process: " << fNumEvents; do { for (fCurrentIdx = 0; fCurrentIdx < fNumEvents; fCurrentIdx++) { for (auto& p : fChannels[fOutChanName]) { std::unique_ptr msg(fTransportFactory->CreateMessage()); serialization_type::SetMessage(msg.get()); source_type::SetIndex(fCurrentIdx); ExecuteTasks(); p.Send(serialization_type::SerializeMsg(source_type::GetOutData())); sentMsgs++; // Optional event rate limiting // --fEventCounter; // while (fEventCounter == 0) { // boost::this_thread::sleep(boost::posix_time::milliseconds(1)); // } if (!CheckCurrentState(RUNNING)) { break; } } if (!CheckCurrentState(RUNNING)) { break; } } } while (CheckCurrentState(RUNNING) && fContinuous); boost::timer::cpu_times const elapsed_time(timer.elapsed()); LOG(INFO) << "Sent everything in:\n" << boost::timer::format(elapsed_time, 2); LOG(INFO) << "Sent " << sentMsgs << " messages!"; } template void base_GenericSampler::SendHeader(int socketIdx) { std::unique_ptr msg(fTransportFactory->CreateMessage()); serialization_type::SetMessage(msg.get()); fChannels.at(fOutChanName).at(socketIdx).Send(serialization_type::SerializeMsg(source_type::GetOutData()), "snd-more"); } template int base_GenericSampler::GetSocketNumber() const { return fChannels.at(fOutChanName).size(); } template int base_GenericSampler::GetCurrentIndex() const { return fCurrentIdx; } template void base_GenericSampler::SetContinuous(bool flag) { fContinuous = flag; } template void base_GenericSampler::ResetEventCounter() { while (true) { try { fEventCounter = fEventRate / 100; boost::this_thread::sleep(boost::posix_time::milliseconds(10)); } catch (boost::thread_interrupted &) { LOG(DEBUG) << "resetEventCounter interrupted"; break; } } LOG(DEBUG) << ">>>>>>> stopping resetEventCounter <<<<<<<"; } template void base_GenericSampler::SetProperty(const int key, const int value) { switch (key) { case EventRate: fEventRate = value; break; default: FairMQDevice::SetProperty(key, value); break; } } template int base_GenericSampler::GetProperty(const int key, const int default_/*= 0*/) { switch (key) { case EventRate: return fEventRate; default: return FairMQDevice::GetProperty(key, default_); } } template void base_GenericSampler::SetProperty(const int key, const std::string& value) { switch (key) { case OutChannelName: fOutChanName = value; break; default: FairMQDevice::SetProperty(key, value); break; } } template std::string base_GenericSampler::GetProperty(const int key, const std::string& default_) { switch (key) { case OutChannelName: return fOutChanName; default: return FairMQDevice::GetProperty(key, default_); } } template using GenericSampler = base_GenericSampler>; typedef std::map> SamplerTasksMap;