- Remove GenericSampler, GenericProcessor, and CRTP policy base classes

- Rename GenericFileSink to BaseMQFileSink, and move it to base/MQ/hosts directory
- Rename and clean files in the serialization examples
This commit is contained in:
winckler 2016-06-28 15:45:33 +02:00
parent 75aad5676d
commit c57bbf58fa
10 changed files with 0 additions and 863 deletions

View File

@ -1,54 +0,0 @@
/*
* File: BaseDeserializationPolicy.h
* Author: winckler
*
* Created on October 14, 2015, 1:01 PM
*/
#ifndef BASEDESERIALIZATIONPOLICY_H
#define BASEDESERIALIZATIONPOLICY_H
#include "FairMQMessage.h"
// c++11 code
#include <type_traits>
// CRTP base class
template <typename TDerived >
class BaseDeserializationPolicy
{
public:
BaseDeserializationPolicy() {}
virtual ~BaseDeserializationPolicy() {}
template<typename C = TDerived>
auto DeserializeMsg(FairMQMessage* msg)-> decltype(static_cast<C*>(this)->DeserializeMsg(msg))
{
static_assert(std::is_same<C, TDerived>{}, "BaseDeserializationPolicy::DeserializeMsg hack broken");
return static_cast<TDerived*>(this)->DeserializeMsg(msg);
}
};
/*
// c++14 code
// CRTP base class
template <typename TDerived >
class BaseDeserializationPolicy
{
public:
BaseDeserializationPolicy()
{}
virtual ~BaseDeserializationPolicy()
{}
auto DeSerializeMsg(FairMQMessage* msg)
{
return static_cast<TDerived*>(this)->DeSerializeMsg(msg);
}
};*/
#endif /* BASEDESERIALIZATIONPOLICY_H */

View File

@ -1,65 +0,0 @@
/*
* File: BaseProcessorTaskPolicy.h
* Author: winckler
*
* Created on October 14, 2015, 1:01 PM
*/
#ifndef BASEPROCESSORTASKPOLICY_H
#define BASEPROCESSORTASKPOLICY_H
#include <type_traits>
// CRTP base class
template <typename TDerived >
class BaseProcessorTaskPolicy
{
public:
BaseProcessorTaskPolicy() {}
virtual ~BaseProcessorTaskPolicy() {}
template<typename C = TDerived>
auto GetOutputData() -> decltype(static_cast<C*>(this)->GetOutputData())
{
static_assert(std::is_same<C, TDerived>{}, "BaseProcessorTaskPolicy::GetOutputData hack broken");
return static_cast<TDerived*>(this)->GetOutputData();
}
template<typename CONTAINER_TYPE, typename C = TDerived>
auto ExecuteTask(CONTAINER_TYPE container) -> decltype( static_cast<C*>(this)->ExecuteTask(container))
{
static_assert(std::is_same<C, TDerived>{}, "BaseProcessorTaskPolicy::ExecuteTask hack broken");
return static_cast<TDerived*>(this)->ExecuteTask(container);
}
};
/*
// c++14 code only
// CRTP base class
template <typename TDerived >
class BaseProcessorTaskPolicy
{
public:
BaseProcessorTaskPolicy()
{}
virtual ~BaseProcessorTaskPolicy()
{}
auto GetOutputData()
{
return static_cast<TDerived*>(this)->GetOutputData();
}
template<typename CONTAINER_TYPE>
auto ExecuteTask(CONTAINER_TYPE container)
{
return static_cast<TDerived*>(this)->ExecuteTask(container);
}
};
*/
#endif /* BASEPROCESSORTASKPOLICY_H */

View File

@ -1,65 +0,0 @@
/*
* File: BaseSerializationPolicy.h
* Author: winckler
*
* Created on October 14, 2015, 1:01 PM
*/
#ifndef BASESERIALIZATIONPOLICY_H
#define BASESERIALIZATIONPOLICY_H
#include "FairMQMessage.h"
#include <type_traits>
// CRTP base class
template <typename TDerived >
class BaseSerializationPolicy
{
public:
BaseSerializationPolicy() {}
virtual ~BaseSerializationPolicy() {}
template<typename CONTAINER_TYPE, typename C = TDerived>
auto SerializeMsg(CONTAINER_TYPE container) -> decltype(static_cast<C*>(this)->SerializeMsg(container) )
{
static_assert(std::is_same<C, TDerived>{}, "BaseSerializationPolicy::SerializeMsg hack broken");
return static_cast<TDerived*>(this)->SerializeMsg(container);
}
template<typename C = TDerived>
auto SetMessage(FairMQMessage* msg)-> decltype(static_cast<C*>(this)->SetMessage(msg) )
{
static_assert(std::is_same<C, TDerived>{}, "BaseSerializationPolicy::SetMessage hack broken");
return static_cast<TDerived*>(this)->SetMessage(msg);
}
};
/*
// CRTP base class
// c++14 code
template <typename TDerived >
class BaseSerializationPolicy
{
public:
BaseSerializationPolicy()
{}
virtual ~BaseSerializationPolicy()
{}
template<typename CONTAINER_TYPE>
auto SerializeMsg(CONTAINER_TYPE container)
{
return static_cast<TDerived*>(this)->SerializeMsg(container);
}
auto SetMessage(FairMQMessage* msg)
{
return static_cast<TDerived*>(this)->SetMessage(msg);
}
};
*/
#endif /* BASESERIALIZATIONPOLICY_H */

View File

@ -1,37 +0,0 @@
/*
* File: BaseSinkPolicy.h
* Author: winckler
*
* Created on October 14, 2015, 1:01 PM
*/
#ifndef BASESINKPOLICY_H
#define BASESINKPOLICY_H
#include <type_traits>
// CRTP base class
template <typename TDerived >
class BaseSinkPolicy
{
public:
BaseSinkPolicy() {}
virtual ~BaseSinkPolicy() {}
template<typename CONTAINER_TYPE, typename C = TDerived>
auto AddToFile(CONTAINER_TYPE container) -> decltype(static_cast<C*>(this)->AddToFile(container) )
{
static_assert(std::is_same<C, TDerived>{}, "BaseSinkPolicy::AddToFile hack broken");
return static_cast<TDerived*>(this)->AddToFile(container);
}
template<typename C = TDerived>
auto InitOutputFile() -> decltype(static_cast<C*>(this)->InitOutputFile() )
{
static_assert(std::is_same<C, TDerived>{}, "BaseSinkPolicy::InitOutputFile hack broken");
return static_cast<TDerived*>(this)->InitOutputFile();
}
};
#endif /* BASESINKPOLICY_H */

View File

@ -1,89 +0,0 @@
/*
* File: BaseSourcePolicy.h
* Author: winckler
*
* Created on October 14, 2015, 1:01 PM
*/
#ifndef BASESOURCEPOLICY_H
#define BASESOURCEPOLICY_H
#include <type_traits>
// c++11 code
// CRTP base class
template <typename TDerived >
class BaseSourcePolicy
{
public:
BaseSourcePolicy() {}
virtual ~BaseSourcePolicy() {}
template<typename C = TDerived>
auto InitSource()-> decltype(static_cast<C*>(this)->InitSource() )
{
static_assert(std::is_same<C, TDerived>{}, "BaseSourcePolicy::InitSource hack broken");
return static_cast<TDerived*>(this)->InitSource();
}
template<typename C = TDerived>
int64_t GetNumberOfEvent()//-> decltype(static_cast<C*>(this)->GetNumberOfEvent() )
{
static_assert(std::is_same<C, TDerived>{}, "BaseSourcePolicy::GetNumberOfEvent hack broken");
return static_cast<TDerived*>(this)->GetNumberOfEvent();
}
template<typename C = TDerived>
auto SetIndex(int64_t eventIdx)-> decltype(static_cast<C*>(this)->SetIndex(eventIdx) )
{
static_assert(std::is_same<C, TDerived>{}, "BaseSourcePolicy::SetIndex hack broken");
return static_cast<TDerived*>(this)->SetIndex(eventIdx);
}
template<typename C = TDerived>
//auto GetOutData()-> decltype(static_cast<C*>(this)->GetOutData() )
decltype(std::declval<C*>()->GetOutData() ) GetOutData()
{
static_assert(std::is_same<C, TDerived>{}, "BaseSourcePolicy::GetOutData hack broken");
return static_cast<TDerived*>(this)->GetOutData();
}
};
/*
// c++14 code
// CRTP base class
template <typename TDerived >
class BaseSourcePolicy
{
public:
BaseSourcePolicy()
{}
virtual ~BaseSourcePolicy()
{}
auto InitSource()
{
return static_cast<TDerived*>(this)->InitSource();
}
int64_t GetNumberOfEvent()
{
return static_cast<TDerived*>(this)->GetNumberOfEvent();
}
auto SetIndex(int64_t eventIdx)
{
return static_cast<TDerived*>(this)->SetIndex(int64_t eventIdx);
}
auto GetOutData()
{
return static_cast<TDerived*>(this)->GetOutData();
}
};
*/
#endif /* BASESOURCEPOLICY_H */

View File

@ -1,91 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/*
* File: GenericFileSink.h
* Author: winckler
*
* Created on October 7, 2014, 6:06 PM
*/
#ifndef GENERICFILESINK_H
#define GENERICFILESINK_H
#include "FairMQDevice.h"
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include "FairMQLogger.h"
/*********************************************************************
* -------------- NOTES -----------------------
* All policies must have a default constructor
* Function to define in (parent) policy classes :
*
* -------- INPUT POLICY --------
* deserialization_type::InitContainer(...)
* CONTAINER_TYPE deserialization_type::DeserializeMsg(FairMQMessage* msg)
*
*
* -------- OUTPUT POLICY --------
* sink_type::AddToFile(CONTAINER_TYPE);
* sink_type::InitOutputFile()
**********************************************************************/
#include "FairMQDevice.h"
template <typename T, typename U>
class GenericFileSink : public FairMQDevice, public T, public U
{
public:
typedef T input_policy;
typedef U sink_type;
GenericFileSink()
: FairMQDevice(), input_policy(), sink_type()
{}
virtual ~GenericFileSink()
{}
template<typename... Args>
void InitInputData(Args&&... args)
{
input_policy::Create(std::forward<Args>(args)...);
}
protected:
using input_policy::fInput;
virtual void InitTask()
{
sink_type::InitOutputFile();
}
typedef typename input_policy::deserialization_type deserializer_type;
virtual void Run()
{
int receivedMsg = 0;
while (CheckCurrentState(RUNNING))
{
std::unique_ptr<FairMQMessage> msg(NewMessage());
if (Receive(msg,"data-in") > 0)
{
Deserialize<deserializer_type>(*msg,fInput);
U::Serialize(fInput);// add fInput to file
receivedMsg++;
}
}
LOG(INFO) << "Received " << receivedMsg << " messages!";
}
};
#endif /* GENERICFILESINK_H */

View File

@ -1,75 +0,0 @@
/*
* File: GenericMerger.h
* Author: winckler
*
* Created on April 9, 2015, 1:37 PM
*/
#ifndef GENERICMERGER_H
#define GENERICMERGER_H
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include "FairMQDevice.h"
#include "FairMQLogger.h"
#include "FairMQPoller.h"
template <typename MergerPolicy, typename InputPolicy, typename OutputPolicy>
class GenericMerger : public FairMQDevice, public MergerPolicy, public InputPolicy, public OutputPolicy
{
public:
GenericMerger()
: fBlockingTime(100)
{}
virtual ~GenericMerger()
{}
template <typename... Args>
void SetTransport(Args... args)
{
FairMQDevice::SetTransport(std::forward<Args>(args)...);
}
protected:
int fBlockingTime;
virtual void Run()
{
std::unique_ptr<FairMQPoller> poller(fTransportFactory->CreatePoller(fChannels["data-in"]));
int received = 0;
while (GetCurrentState() == RUNNING)
{
std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
// MergerPolicy::
poller->Poll(fBlockingTime);
for (int i = 0; i < fChannels.at("data-in").size(); i++)
{
if (poller->CheckInput(i))
{
received = fChannels.at("data-in").at(i).Receive(msg)
if (received > 0)
{
MergerPolicy::Merge(InputPolicy::DeserializeMsg(msg));
}
}
OutputPolicy::SetMessage(msg);
if (received > 0 && MergerPolicy::ReadyToSend())
{
fChannels["data-out"].at(0).Send(OutputPolicy::SerializeMsg(MergerPolicy::GetOutputData()));
received = 0;
}
}
}
}
};
#endif /* GENERICMERGER_H */

View File

@ -1,98 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/*
* File: GenericProcessor.h
* Author: winckler
*
* Created on December 1, 2014, 10:22 PM
*/
#ifndef GENERICPROCESSOR_H
#define GENERICPROCESSOR_H
#include "FairMQDevice.h"
template < typename T/*=deserialization type*/,
typename U/*=serialization type*/,
typename V/*=task type*///,
//typename W/*=input creator type*/,
//typename X/*=output creator type*/
>
class GenericProcessor : public FairMQDevice, public T, public U,
public V//,
//public W,
//public X
{
public:
typedef T input_policy;
typedef U output_policy;
typedef V task_type;
//typedef W input_creator_type;
//typedef X output_creator_type;
GenericProcessor()
: FairMQDevice(), T(), U()
, task_type()
//, input_creator_type()
//, output_creator_type()
{}
virtual ~GenericProcessor()
{}
template<typename... Args>
void InitInputData(Args&&... args)
{
input_policy::Create(std::forward<Args>(args)...);
}
template<typename... Args>
void InitOutputData(Args&&... args)
{
output_policy::Create(std::forward<Args>(args)...);
}
protected:
using input_policy::fInput;
using output_policy::fOutput;
virtual void InitTask()
{
}
virtual void Run()
{
int receivedMsgs = 0;
int sentMsgs = 0;
while (CheckCurrentState(RUNNING))
{
std::unique_ptr<FairMQMessage> msg(NewMessage());
if (Receive(fInput, "data-in") > 0)
{
Deserialize<T>(*msg,fInput);
receivedMsgs++;
task_type::Exec(fInput,fOutput);
Serialize<U>(*msg,fOutput);
Send(fOutput, "data-out");
sentMsgs++;
}
}
LOG(INFO) << "Received " << receivedMsgs << " and sent " << sentMsgs << " messages!";
}
};
#endif /* GENERICPROCESSOR_H */

View File

@ -1,112 +0,0 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/*
* File: GenericSampler.h
* Author: winckler
*
* Created on November 24, 2014, 3:30 PM
*/
#ifndef GENERICSAMPLER_H
#define GENERICSAMPLER_H
#include <vector>
#include <iostream>
#include <functional>
#include <stdint.h>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/timer/timer.hpp>
#include "FairMQDevice.h"
#include "FairMQLogger.h"
#include "FairMQTools.h"
/* GENERIC SAMPLER (data source) MQ-DEVICE */
/********************************************************************* */
template<typename T, typename U>
using enable_if_match = typename std::enable_if<std::is_same<T,U>::value,int>::type;
struct DefaultSamplerRun {};
template < typename T,
typename U,
typename R=DefaultSamplerRun
>
class base_GenericSampler : public FairMQDevice, public T, public U
{
public:
typedef T input_policy;// sampler source
typedef U output_policy;// deserialization
typedef R run_type;
base_GenericSampler() : FairMQDevice(), fOutChanName("data-out"), T(), U()
{}
virtual ~base_GenericSampler()
{}
template <typename... Args>
void SetFileProperties(Args&&... args)
{
input_policy::SetFileProperties(std::forward<Args>(args)...);
}
protected:
using input_policy::fInput;
virtual void Init()
{
input_policy::InitSource();
}
template <typename RUN = run_type, enable_if_match<RUN,DefaultSamplerRun> = 0>
inline void Run_impl()
{
int64_t sentMsgs(0);
int64_t numEvents = input_policy::GetNumberOfEvent();
LOG(INFO) << "Number of events to process: " << numEvents;
boost::timer::auto_cpu_timer timer;
for (int64_t idx(0); idx < numEvents; idx++)
{
std::unique_ptr<FairMQMessage> msg(NewMessage());
T::Deserialize(idx);
Serialize<U>(msg,fInput);
Send(msg, fOutChanName);
sentMsgs++;
if (!CheckCurrentState(RUNNING))
break;
}
boost::timer::cpu_times const elapsed_time(timer.elapsed());
LOG(INFO) << "Sent everything in:\n" << boost::timer::format(elapsed_time, 2);
LOG(INFO) << "Sent " << sentMsgs << " messages!";
}
virtual void Run()
{
Run_impl();
}
private:
std::string fOutChanName;
};
void SendHeader(int /*socketIdx*/) {}
#endif /* GENERICSAMPLER_H */

View File

@ -1,177 +0,0 @@
/*
* File: GenericSampler.tpl
* Author: winckler
*
* Created on November 24, 2014, 3:59 PM
*/
template <typename T, typename U, typename K, typename L>
base_GenericSampler<T,U,K,L>::base_GenericSampler()
: fOutChanName("data-out")
, fNumEvents(0)
, fCurrentIdx(0)
, fEventRate(1)
, fEventCounter(0)
, fContinuous(false)
, fTaskList()
{
}
template <typename T, typename U, typename K, typename L>
base_GenericSampler<T,U,K,L>::~base_GenericSampler()
{
}
template <typename T, typename U, typename K, typename L>
void base_GenericSampler<T,U,K,L>::InitTask()
{
BindingSendPart();
BindingGetSocketNumber();
BindingGetCurrentIndex();
source_type::InitSource();
fNumEvents = source_type::GetNumberOfEvent();
}
template <typename T, typename U, typename K, typename L>
void base_GenericSampler<T,U,K,L>::Run()
{
// boost::thread resetEventCounter(boost::bind(&GenericSampler::ResetEventCounter, this));
int sentMsgs = 0;
boost::timer::auto_cpu_timer timer;
LOG(INFO) << "Number of events to process: " << fNumEvents;
do
{
for (fCurrentIdx = 0; fCurrentIdx < fNumEvents; fCurrentIdx++)
{
for (auto& p : fChannels.at(fOutChanName))
{
std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
serialization_type::SetMessage(msg.get());
source_type::SetIndex(fCurrentIdx);
ExecuteTasks();
p.Send(serialization_type::SerializeMsg(source_type::GetOutData()));
sentMsgs++;
// Optional event rate limiting
// --fEventCounter;
// while (fEventCounter == 0) {
// boost::this_thread::sleep(boost::posix_time::milliseconds(1));
// }
if (!CheckCurrentState(RUNNING))
{
break;
}
}
if (!CheckCurrentState(RUNNING))
{
break;
}
}
}
while (CheckCurrentState(RUNNING) && fContinuous);
boost::timer::cpu_times const elapsed_time(timer.elapsed());
LOG(INFO) << "Sent everything in:\n" << boost::timer::format(elapsed_time, 2);
LOG(INFO) << "Sent " << sentMsgs << " messages!";
}
template <typename T, typename U, typename K, typename L>
int base_GenericSampler<T,U,K,L>::GetSocketNumber() const
{
return fChannels.at(fOutChanName).size();
}
template <typename T, typename U, typename K, typename L>
int base_GenericSampler<T,U,K,L>::GetCurrentIndex() const
{
return fCurrentIdx;
}
template <typename T, typename U, typename K, typename L>
void base_GenericSampler<T,U,K,L>::SetContinuous(bool flag)
{
fContinuous = flag;
}
template <typename T, typename U, typename K, typename L>
void base_GenericSampler<T,U,K,L>::ResetEventCounter()
{
while (true)
{
try
{
fEventCounter = fEventRate / 100;
boost::this_thread::sleep(boost::posix_time::milliseconds(10));
}
catch (boost::thread_interrupted &)
{
LOG(DEBUG) << "resetEventCounter interrupted";
break;
}
}
LOG(DEBUG) << ">>>>>>> stopping resetEventCounter <<<<<<<";
}
template <typename T, typename U, typename K, typename L>
void base_GenericSampler<T,U,K,L>::SetProperty(const int key, const int value)
{
switch (key)
{
case EventRate:
fEventRate = value;
break;
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
template <typename T, typename U, typename K, typename L>
int base_GenericSampler<T,U,K,L>::GetProperty(const int key, const int default_/*= 0*/)
{
switch (key)
{
case EventRate:
return fEventRate;
default:
return FairMQDevice::GetProperty(key, default_);
}
}
template <typename T, typename U, typename K, typename L>
void base_GenericSampler<T,U,K,L>::SetProperty(const int key, const std::string& value)
{
switch (key)
{
case OutChannelName:
fOutChanName = value;
break;
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
template <typename T, typename U, typename K, typename L>
std::string base_GenericSampler<T,U,K,L>::GetProperty(const int key, const std::string& default_)
{
switch (key)
{
case OutChannelName:
return fOutChanName;
default:
return FairMQDevice::GetProperty(key, default_);
}
}
template<typename T, typename U>
using GenericSampler = base_GenericSampler<T,U,int,std::function<void()>>;
typedef std::map<int, std::function<void()>> SamplerTasksMap;