From c57bbf58fa454d8015086b838850308a7492dea9 Mon Sep 17 00:00:00 2001 From: winckler Date: Tue, 28 Jun 2016 15:45:33 +0200 Subject: [PATCH] - Remove GenericSampler, GenericProcessor, and CRTP policy base classes - Rename GenericFileSink to BaseMQFileSink, and move it to base/MQ/hosts directory - Rename and clean files in the serialization examples --- fairmq/devices/BaseDeserializationPolicy.h | 54 ------- fairmq/devices/BaseProcessorTaskPolicy.h | 65 -------- fairmq/devices/BaseSerializationPolicy.h | 65 -------- fairmq/devices/BaseSinkPolicy.h | 37 ----- fairmq/devices/BaseSourcePolicy.h | 89 ----------- fairmq/devices/GenericFileSink.h | 91 ----------- fairmq/devices/GenericMerger.h | 75 --------- fairmq/devices/GenericProcessor.h | 98 ------------ fairmq/devices/GenericSampler.h | 112 ------------- fairmq/devices/GenericSampler.tpl | 177 --------------------- 10 files changed, 863 deletions(-) delete mode 100644 fairmq/devices/BaseDeserializationPolicy.h delete mode 100644 fairmq/devices/BaseProcessorTaskPolicy.h delete mode 100644 fairmq/devices/BaseSerializationPolicy.h delete mode 100644 fairmq/devices/BaseSinkPolicy.h delete mode 100644 fairmq/devices/BaseSourcePolicy.h delete mode 100644 fairmq/devices/GenericFileSink.h delete mode 100644 fairmq/devices/GenericMerger.h delete mode 100644 fairmq/devices/GenericProcessor.h delete mode 100644 fairmq/devices/GenericSampler.h delete mode 100644 fairmq/devices/GenericSampler.tpl diff --git a/fairmq/devices/BaseDeserializationPolicy.h b/fairmq/devices/BaseDeserializationPolicy.h deleted file mode 100644 index 74efc26c..00000000 --- a/fairmq/devices/BaseDeserializationPolicy.h +++ /dev/null @@ -1,54 +0,0 @@ -/* - * File: BaseDeserializationPolicy.h - * Author: winckler - * - * Created on October 14, 2015, 1:01 PM - */ - -#ifndef BASEDESERIALIZATIONPOLICY_H -#define BASEDESERIALIZATIONPOLICY_H - -#include "FairMQMessage.h" - -// c++11 code -#include - -// CRTP base class -template -class BaseDeserializationPolicy -{ - public: - BaseDeserializationPolicy() {} - - virtual ~BaseDeserializationPolicy() {} - - template - auto DeserializeMsg(FairMQMessage* msg)-> decltype(static_cast(this)->DeserializeMsg(msg)) - { - static_assert(std::is_same{}, "BaseDeserializationPolicy::DeserializeMsg hack broken"); - return static_cast(this)->DeserializeMsg(msg); - } -}; - - -/* -// c++14 code -// CRTP base class -template -class BaseDeserializationPolicy -{ -public: - BaseDeserializationPolicy() - {} - - virtual ~BaseDeserializationPolicy() - {} - - auto DeSerializeMsg(FairMQMessage* msg) - { - return static_cast(this)->DeSerializeMsg(msg); - } - -};*/ - -#endif /* BASEDESERIALIZATIONPOLICY_H */ diff --git a/fairmq/devices/BaseProcessorTaskPolicy.h b/fairmq/devices/BaseProcessorTaskPolicy.h deleted file mode 100644 index 150fef38..00000000 --- a/fairmq/devices/BaseProcessorTaskPolicy.h +++ /dev/null @@ -1,65 +0,0 @@ -/* - * File: BaseProcessorTaskPolicy.h - * Author: winckler - * - * Created on October 14, 2015, 1:01 PM - */ - -#ifndef BASEPROCESSORTASKPOLICY_H -#define BASEPROCESSORTASKPOLICY_H - - -#include -// CRTP base class -template -class BaseProcessorTaskPolicy -{ - public: - BaseProcessorTaskPolicy() {} - - virtual ~BaseProcessorTaskPolicy() {} - - template - auto GetOutputData() -> decltype(static_cast(this)->GetOutputData()) - { - static_assert(std::is_same{}, "BaseProcessorTaskPolicy::GetOutputData hack broken"); - return static_cast(this)->GetOutputData(); - } - - template - auto ExecuteTask(CONTAINER_TYPE container) -> decltype( static_cast(this)->ExecuteTask(container)) - { - static_assert(std::is_same{}, "BaseProcessorTaskPolicy::ExecuteTask hack broken"); - return static_cast(this)->ExecuteTask(container); - } -}; - -/* - -// c++14 code only -// CRTP base class -template -class BaseProcessorTaskPolicy -{ -public: - BaseProcessorTaskPolicy() - {} - - virtual ~BaseProcessorTaskPolicy() - {} - - auto GetOutputData() - { - return static_cast(this)->GetOutputData(); - } - - template - auto ExecuteTask(CONTAINER_TYPE container) - { - return static_cast(this)->ExecuteTask(container); - } - -}; -*/ - -#endif /* BASEPROCESSORTASKPOLICY_H */ diff --git a/fairmq/devices/BaseSerializationPolicy.h b/fairmq/devices/BaseSerializationPolicy.h deleted file mode 100644 index a6a10a70..00000000 --- a/fairmq/devices/BaseSerializationPolicy.h +++ /dev/null @@ -1,65 +0,0 @@ -/* - * File: BaseSerializationPolicy.h - * Author: winckler - * - * Created on October 14, 2015, 1:01 PM - */ - -#ifndef BASESERIALIZATIONPOLICY_H -#define BASESERIALIZATIONPOLICY_H - -#include "FairMQMessage.h" - -#include -// CRTP base class -template -class BaseSerializationPolicy -{ - public: - BaseSerializationPolicy() {} - - virtual ~BaseSerializationPolicy() {} - - template - auto SerializeMsg(CONTAINER_TYPE container) -> decltype(static_cast(this)->SerializeMsg(container) ) - { - static_assert(std::is_same{}, "BaseSerializationPolicy::SerializeMsg hack broken"); - return static_cast(this)->SerializeMsg(container); - } - - template - auto SetMessage(FairMQMessage* msg)-> decltype(static_cast(this)->SetMessage(msg) ) - { - static_assert(std::is_same{}, "BaseSerializationPolicy::SetMessage hack broken"); - return static_cast(this)->SetMessage(msg); - } -}; - -/* -// CRTP base class -// c++14 code -template -class BaseSerializationPolicy -{ -public: - BaseSerializationPolicy() - {} - - virtual ~BaseSerializationPolicy() - {} - - template - auto SerializeMsg(CONTAINER_TYPE container) - { - return static_cast(this)->SerializeMsg(container); - } - - auto SetMessage(FairMQMessage* msg) - { - return static_cast(this)->SetMessage(msg); - } - -}; -*/ - -#endif /* BASESERIALIZATIONPOLICY_H */ diff --git a/fairmq/devices/BaseSinkPolicy.h b/fairmq/devices/BaseSinkPolicy.h deleted file mode 100644 index 99e65885..00000000 --- a/fairmq/devices/BaseSinkPolicy.h +++ /dev/null @@ -1,37 +0,0 @@ -/* - * File: BaseSinkPolicy.h - * Author: winckler - * - * Created on October 14, 2015, 1:01 PM - */ - -#ifndef BASESINKPOLICY_H -#define BASESINKPOLICY_H - -#include - -// CRTP base class -template -class BaseSinkPolicy -{ - public: - BaseSinkPolicy() {} - - virtual ~BaseSinkPolicy() {} - - template - auto AddToFile(CONTAINER_TYPE container) -> decltype(static_cast(this)->AddToFile(container) ) - { - static_assert(std::is_same{}, "BaseSinkPolicy::AddToFile hack broken"); - return static_cast(this)->AddToFile(container); - } - - template - auto InitOutputFile() -> decltype(static_cast(this)->InitOutputFile() ) - { - static_assert(std::is_same{}, "BaseSinkPolicy::InitOutputFile hack broken"); - return static_cast(this)->InitOutputFile(); - } -}; - -#endif /* BASESINKPOLICY_H */ diff --git a/fairmq/devices/BaseSourcePolicy.h b/fairmq/devices/BaseSourcePolicy.h deleted file mode 100644 index f79c8f61..00000000 --- a/fairmq/devices/BaseSourcePolicy.h +++ /dev/null @@ -1,89 +0,0 @@ -/* - * File: BaseSourcePolicy.h - * Author: winckler - * - * Created on October 14, 2015, 1:01 PM - */ - -#ifndef BASESOURCEPOLICY_H -#define BASESOURCEPOLICY_H - -#include - -// c++11 code -// CRTP base class -template -class BaseSourcePolicy -{ - public: - BaseSourcePolicy() {} - - virtual ~BaseSourcePolicy() {} - - template - auto InitSource()-> decltype(static_cast(this)->InitSource() ) - { - static_assert(std::is_same{}, "BaseSourcePolicy::InitSource hack broken"); - return static_cast(this)->InitSource(); - } - - template - int64_t GetNumberOfEvent()//-> decltype(static_cast(this)->GetNumberOfEvent() ) - { - static_assert(std::is_same{}, "BaseSourcePolicy::GetNumberOfEvent hack broken"); - return static_cast(this)->GetNumberOfEvent(); - } - - template - auto SetIndex(int64_t eventIdx)-> decltype(static_cast(this)->SetIndex(eventIdx) ) - { - static_assert(std::is_same{}, "BaseSourcePolicy::SetIndex hack broken"); - return static_cast(this)->SetIndex(eventIdx); - } - - template - //auto GetOutData()-> decltype(static_cast(this)->GetOutData() ) - decltype(std::declval()->GetOutData() ) GetOutData() - { - static_assert(std::is_same{}, "BaseSourcePolicy::GetOutData hack broken"); - return static_cast(this)->GetOutData(); - } -}; - -/* -// c++14 code -// CRTP base class -template -class BaseSourcePolicy -{ -public: - BaseSourcePolicy() - {} - - virtual ~BaseSourcePolicy() - {} - - auto InitSource() - { - return static_cast(this)->InitSource(); - } - - int64_t GetNumberOfEvent() - { - return static_cast(this)->GetNumberOfEvent(); - } - - auto SetIndex(int64_t eventIdx) - { - return static_cast(this)->SetIndex(int64_t eventIdx); - } - - auto GetOutData() - { - return static_cast(this)->GetOutData(); - } - -}; -*/ - -#endif /* BASESOURCEPOLICY_H */ diff --git a/fairmq/devices/GenericFileSink.h b/fairmq/devices/GenericFileSink.h deleted file mode 100644 index a87ad007..00000000 --- a/fairmq/devices/GenericFileSink.h +++ /dev/null @@ -1,91 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -/* - * File: GenericFileSink.h - * Author: winckler - * - * Created on October 7, 2014, 6:06 PM - */ - -#ifndef GENERICFILESINK_H -#define GENERICFILESINK_H - -#include "FairMQDevice.h" -#include -#include -#include "FairMQLogger.h" - -/********************************************************************* - * -------------- NOTES ----------------------- - * All policies must have a default constructor - * Function to define in (parent) policy classes : - * - * -------- INPUT POLICY -------- - * deserialization_type::InitContainer(...) - * CONTAINER_TYPE deserialization_type::DeserializeMsg(FairMQMessage* msg) - * - * - * -------- OUTPUT POLICY -------- - * sink_type::AddToFile(CONTAINER_TYPE); - * sink_type::InitOutputFile() - **********************************************************************/ - -#include "FairMQDevice.h" - -template -class GenericFileSink : public FairMQDevice, public T, public U -{ - public: - typedef T input_policy; - typedef U sink_type; - GenericFileSink() - : FairMQDevice(), input_policy(), sink_type() - {} - - virtual ~GenericFileSink() - {} - - - template - void InitInputData(Args&&... args) - { - input_policy::Create(std::forward(args)...); - } - - protected: - - - using input_policy::fInput; - - - virtual void InitTask() - { - sink_type::InitOutputFile(); - } - - typedef typename input_policy::deserialization_type deserializer_type; - virtual void Run() - { - int receivedMsg = 0; - while (CheckCurrentState(RUNNING)) - { - std::unique_ptr msg(NewMessage()); - if (Receive(msg,"data-in") > 0) - { - Deserialize(*msg,fInput); - U::Serialize(fInput);// add fInput to file - receivedMsg++; - } - } - - LOG(INFO) << "Received " << receivedMsg << " messages!"; - } -}; - -#endif /* GENERICFILESINK_H */ diff --git a/fairmq/devices/GenericMerger.h b/fairmq/devices/GenericMerger.h deleted file mode 100644 index ba06d631..00000000 --- a/fairmq/devices/GenericMerger.h +++ /dev/null @@ -1,75 +0,0 @@ -/* - * File: GenericMerger.h - * Author: winckler - * - * Created on April 9, 2015, 1:37 PM - */ - -#ifndef GENERICMERGER_H -#define GENERICMERGER_H - - -#include -#include - -#include "FairMQDevice.h" -#include "FairMQLogger.h" -#include "FairMQPoller.h" - - -template -class GenericMerger : public FairMQDevice, public MergerPolicy, public InputPolicy, public OutputPolicy -{ - public: - GenericMerger() - : fBlockingTime(100) - {} - - virtual ~GenericMerger() - {} - - template - void SetTransport(Args... args) - { - FairMQDevice::SetTransport(std::forward(args)...); - } - - protected: - int fBlockingTime; - - virtual void Run() - { - std::unique_ptr poller(fTransportFactory->CreatePoller(fChannels["data-in"])); - - int received = 0; - - while (GetCurrentState() == RUNNING) - { - std::unique_ptr msg(fTransportFactory->CreateMessage()); - // MergerPolicy:: - poller->Poll(fBlockingTime); - - for (int i = 0; i < fChannels.at("data-in").size(); i++) - { - if (poller->CheckInput(i)) - { - received = fChannels.at("data-in").at(i).Receive(msg) - if (received > 0) - { - MergerPolicy::Merge(InputPolicy::DeserializeMsg(msg)); - } - } - - OutputPolicy::SetMessage(msg); - - if (received > 0 && MergerPolicy::ReadyToSend()) - { - fChannels["data-out"].at(0).Send(OutputPolicy::SerializeMsg(MergerPolicy::GetOutputData())); - received = 0; - } - } - } - } -}; - -#endif /* GENERICMERGER_H */ diff --git a/fairmq/devices/GenericProcessor.h b/fairmq/devices/GenericProcessor.h deleted file mode 100644 index 29313e60..00000000 --- a/fairmq/devices/GenericProcessor.h +++ /dev/null @@ -1,98 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/* - * File: GenericProcessor.h - * Author: winckler - * - * Created on December 1, 2014, 10:22 PM - */ - -#ifndef GENERICPROCESSOR_H -#define GENERICPROCESSOR_H - -#include "FairMQDevice.h" - -template < typename T/*=deserialization type*/, - typename U/*=serialization type*/, - typename V/*=task type*///, - //typename W/*=input creator type*/, - //typename X/*=output creator type*/ - > -class GenericProcessor : public FairMQDevice, public T, public U, - public V//, - //public W, - //public X -{ - public: - typedef T input_policy; - typedef U output_policy; - typedef V task_type; - - //typedef W input_creator_type; - //typedef X output_creator_type; - - - - GenericProcessor() - : FairMQDevice(), T(), U() - , task_type() - //, input_creator_type() - //, output_creator_type() - {} - - virtual ~GenericProcessor() - {} - - template - void InitInputData(Args&&... args) - { - input_policy::Create(std::forward(args)...); - } - - template - void InitOutputData(Args&&... args) - { - output_policy::Create(std::forward(args)...); - } - - protected: - using input_policy::fInput; - using output_policy::fOutput; - - virtual void InitTask() - { - } - - virtual void Run() - { - int receivedMsgs = 0; - int sentMsgs = 0; - - while (CheckCurrentState(RUNNING)) - { - std::unique_ptr msg(NewMessage()); - if (Receive(fInput, "data-in") > 0) - { - Deserialize(*msg,fInput); - receivedMsgs++; - task_type::Exec(fInput,fOutput); - - Serialize(*msg,fOutput); - Send(fOutput, "data-out"); - sentMsgs++; - } - } - LOG(INFO) << "Received " << receivedMsgs << " and sent " << sentMsgs << " messages!"; - } -}; - - - - -#endif /* GENERICPROCESSOR_H */ - diff --git a/fairmq/devices/GenericSampler.h b/fairmq/devices/GenericSampler.h deleted file mode 100644 index 8068ce56..00000000 --- a/fairmq/devices/GenericSampler.h +++ /dev/null @@ -1,112 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/* - * File: GenericSampler.h - * Author: winckler - * - * Created on November 24, 2014, 3:30 PM - */ - -#ifndef GENERICSAMPLER_H -#define GENERICSAMPLER_H - -#include -#include -#include -#include - -#include -#include -#include - -#include "FairMQDevice.h" -#include "FairMQLogger.h" -#include "FairMQTools.h" - -/* GENERIC SAMPLER (data source) MQ-DEVICE */ -/********************************************************************* */ - -template -using enable_if_match = typename std::enable_if::value,int>::type; - -struct DefaultSamplerRun {}; - -template < typename T, - typename U, - typename R=DefaultSamplerRun - > -class base_GenericSampler : public FairMQDevice, public T, public U -{ - public: - typedef T input_policy;// sampler source - typedef U output_policy;// deserialization - typedef R run_type; - - base_GenericSampler() : FairMQDevice(), fOutChanName("data-out"), T(), U() - {} - - virtual ~base_GenericSampler() - {} - - template - void SetFileProperties(Args&&... args) - { - input_policy::SetFileProperties(std::forward(args)...); - } - - protected: - - - using input_policy::fInput; - virtual void Init() - { - input_policy::InitSource(); - - } - - template = 0> - inline void Run_impl() - { - int64_t sentMsgs(0); - int64_t numEvents = input_policy::GetNumberOfEvent(); - LOG(INFO) << "Number of events to process: " << numEvents; - boost::timer::auto_cpu_timer timer; - for (int64_t idx(0); idx < numEvents; idx++) - { - std::unique_ptr msg(NewMessage()); - T::Deserialize(idx); - Serialize(msg,fInput); - Send(msg, fOutChanName); - sentMsgs++; - if (!CheckCurrentState(RUNNING)) - break; - - } - - boost::timer::cpu_times const elapsed_time(timer.elapsed()); - LOG(INFO) << "Sent everything in:\n" << boost::timer::format(elapsed_time, 2); - LOG(INFO) << "Sent " << sentMsgs << " messages!"; - } - - virtual void Run() - { - Run_impl(); - } - - private: - std::string fOutChanName; - -}; - - - void SendHeader(int /*socketIdx*/) {} - - - - -#endif /* GENERICSAMPLER_H */ diff --git a/fairmq/devices/GenericSampler.tpl b/fairmq/devices/GenericSampler.tpl deleted file mode 100644 index 97f15fa1..00000000 --- a/fairmq/devices/GenericSampler.tpl +++ /dev/null @@ -1,177 +0,0 @@ -/* - * File: GenericSampler.tpl - * Author: winckler - * - * Created on November 24, 2014, 3:59 PM - */ - -template -base_GenericSampler::base_GenericSampler() - : fOutChanName("data-out") - , fNumEvents(0) - , fCurrentIdx(0) - , fEventRate(1) - , fEventCounter(0) - , fContinuous(false) - , fTaskList() -{ -} - -template -base_GenericSampler::~base_GenericSampler() -{ -} - -template -void base_GenericSampler::InitTask() -{ - BindingSendPart(); - BindingGetSocketNumber(); - BindingGetCurrentIndex(); - - source_type::InitSource(); - fNumEvents = source_type::GetNumberOfEvent(); -} - -template -void base_GenericSampler::Run() -{ - // boost::thread resetEventCounter(boost::bind(&GenericSampler::ResetEventCounter, this)); - - int sentMsgs = 0; - - boost::timer::auto_cpu_timer timer; - - LOG(INFO) << "Number of events to process: " << fNumEvents; - - do - { - for (fCurrentIdx = 0; fCurrentIdx < fNumEvents; fCurrentIdx++) - { - for (auto& p : fChannels.at(fOutChanName)) - { - std::unique_ptr msg(fTransportFactory->CreateMessage()); - serialization_type::SetMessage(msg.get()); - source_type::SetIndex(fCurrentIdx); - ExecuteTasks(); - p.Send(serialization_type::SerializeMsg(source_type::GetOutData())); - sentMsgs++; - - // Optional event rate limiting - // --fEventCounter; - // while (fEventCounter == 0) { - // boost::this_thread::sleep(boost::posix_time::milliseconds(1)); - // } - - if (!CheckCurrentState(RUNNING)) - { - break; - } - } - - if (!CheckCurrentState(RUNNING)) - { - break; - } - } - } - while (CheckCurrentState(RUNNING) && fContinuous); - - boost::timer::cpu_times const elapsed_time(timer.elapsed()); - LOG(INFO) << "Sent everything in:\n" << boost::timer::format(elapsed_time, 2); - LOG(INFO) << "Sent " << sentMsgs << " messages!"; -} - - -template -int base_GenericSampler::GetSocketNumber() const -{ - return fChannels.at(fOutChanName).size(); -} - -template -int base_GenericSampler::GetCurrentIndex() const -{ - return fCurrentIdx; -} - -template -void base_GenericSampler::SetContinuous(bool flag) -{ - fContinuous = flag; -} - -template -void base_GenericSampler::ResetEventCounter() -{ - while (true) - { - try - { - fEventCounter = fEventRate / 100; - boost::this_thread::sleep(boost::posix_time::milliseconds(10)); - } - catch (boost::thread_interrupted &) - { - LOG(DEBUG) << "resetEventCounter interrupted"; - break; - } - } - LOG(DEBUG) << ">>>>>>> stopping resetEventCounter <<<<<<<"; -} - -template -void base_GenericSampler::SetProperty(const int key, const int value) -{ - switch (key) - { - case EventRate: - fEventRate = value; - break; - default: - FairMQDevice::SetProperty(key, value); - break; - } -} - -template -int base_GenericSampler::GetProperty(const int key, const int default_/*= 0*/) -{ - switch (key) - { - case EventRate: - return fEventRate; - default: - return FairMQDevice::GetProperty(key, default_); - } -} - -template -void base_GenericSampler::SetProperty(const int key, const std::string& value) -{ - switch (key) - { - case OutChannelName: - fOutChanName = value; - break; - default: - FairMQDevice::SetProperty(key, value); - break; - } -} - -template -std::string base_GenericSampler::GetProperty(const int key, const std::string& default_) -{ - switch (key) - { - case OutChannelName: - return fOutChanName; - default: - return FairMQDevice::GetProperty(key, default_); - } -} - -template -using GenericSampler = base_GenericSampler>; -typedef std::map> SamplerTasksMap;