mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 17:41:45 +00:00
add 1) generic MQ-devices (Sampler, Processor, and FileSink) in fairmq, 2) policy classes in base/MQ and 3) a Tutorial7 in example
This commit is contained in:
committed by
Florian Uhlig
parent
eb33bc8280
commit
3e424354e7
82
fairmq/devices/GenericFileSink.tpl
Normal file
82
fairmq/devices/GenericFileSink.tpl
Normal file
@@ -0,0 +1,82 @@
|
||||
/*
|
||||
* File: GenericFileSink.tpl
|
||||
* Author: winckler
|
||||
*
|
||||
* Created on October 7, 2014, 7:21 PM
|
||||
*/
|
||||
|
||||
template <typename InputPolicy, typename OutputPolicy>
|
||||
GenericFileSink<InputPolicy, OutputPolicy>::GenericFileSink() :
|
||||
InputPolicy(),
|
||||
OutputPolicy()
|
||||
{
|
||||
}
|
||||
|
||||
template <typename InputPolicy, typename OutputPolicy>
|
||||
GenericFileSink<InputPolicy, OutputPolicy>::~GenericFileSink()
|
||||
{
|
||||
}
|
||||
|
||||
template <typename InputPolicy, typename OutputPolicy>
|
||||
void GenericFileSink<InputPolicy, OutputPolicy>::SetTransport(FairMQTransportFactory* transport)
|
||||
{
|
||||
FairMQDevice::SetTransport(transport);
|
||||
//InputPolicy::SetTransport(transport);
|
||||
}
|
||||
|
||||
|
||||
template <typename InputPolicy, typename OutputPolicy>
|
||||
void GenericFileSink<InputPolicy, OutputPolicy>::Init()
|
||||
{
|
||||
FairMQDevice::Init();
|
||||
InitOutputFile();
|
||||
//InputPolicy::Init();
|
||||
//OutputPolicy::Init();
|
||||
}
|
||||
|
||||
template <typename InputPolicy, typename OutputPolicy>
|
||||
void GenericFileSink<InputPolicy, OutputPolicy>::InitOutputFile()
|
||||
{
|
||||
OutputPolicy::InitOutFile();
|
||||
}
|
||||
|
||||
template <typename InputPolicy, typename OutputPolicy>
|
||||
void GenericFileSink<InputPolicy, OutputPolicy>::Run()
|
||||
{
|
||||
MQLOG(INFO) << ">>>>>>> Run <<<<<<<";
|
||||
|
||||
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
|
||||
|
||||
int received = 0;
|
||||
int receivedMsg = 0;
|
||||
|
||||
while (fState == RUNNING)
|
||||
{
|
||||
FairMQMessage* msg = fTransportFactory->CreateMessage();
|
||||
received = fPayloadInputs->at(0)->Receive(msg);
|
||||
if(received>0)
|
||||
{
|
||||
AddToFile(message(msg));
|
||||
receivedMsg++;
|
||||
}
|
||||
delete msg;
|
||||
}
|
||||
|
||||
MQLOG(INFO) << "Received " << receivedMsg << " messages!";
|
||||
try
|
||||
{
|
||||
rateLogger.interrupt();
|
||||
rateLogger.join();
|
||||
}
|
||||
catch(boost::thread_resource_error& e)
|
||||
{
|
||||
MQLOG(ERROR) << e.what();
|
||||
}
|
||||
|
||||
FairMQDevice::Shutdown();
|
||||
|
||||
// notify parent thread about end of processing.
|
||||
boost::lock_guard<boost::mutex> lock(fRunningMutex);
|
||||
fRunningFinished = true;
|
||||
fRunningCondition.notify_one();
|
||||
}
|
Reference in New Issue
Block a user