mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 00:31:14 +00:00
Organize sockets as a map of vectors of FairMQChannels. Update FairMQStateMachine by removing SETTINGINPUT, SETTINGOUTPUT, BIND and CONNECT states and by adding INITIALIZING_TASK, RESETTING_TASK and RESETTING_DEVICE states. Run states functions in their own thread.
60 lines
1.5 KiB
Smarty
60 lines
1.5 KiB
Smarty
/*
|
|
* File: GenericFileSink.tpl
|
|
* Author: winckler
|
|
*
|
|
* Created on October 7, 2014, 7:21 PM
|
|
*/
|
|
|
|
template <typename InputPolicy, typename OutputPolicy>
|
|
GenericFileSink<InputPolicy, OutputPolicy>::GenericFileSink()
|
|
: InputPolicy()
|
|
, OutputPolicy()
|
|
{
|
|
}
|
|
|
|
template <typename InputPolicy, typename OutputPolicy>
|
|
GenericFileSink<InputPolicy, OutputPolicy>::~GenericFileSink()
|
|
{
|
|
}
|
|
|
|
template <typename InputPolicy, typename OutputPolicy>
|
|
void GenericFileSink<InputPolicy, OutputPolicy>::SetTransport(FairMQTransportFactory* transport)
|
|
{
|
|
FairMQDevice::SetTransport(transport);
|
|
// InputPolicy::SetTransport(transport);
|
|
}
|
|
|
|
|
|
template <typename InputPolicy, typename OutputPolicy>
|
|
void GenericFileSink<InputPolicy, OutputPolicy>::InitTask()
|
|
{
|
|
InitOutputFile();
|
|
// InputPolicy::Init();
|
|
// OutputPolicy::Init();
|
|
}
|
|
|
|
template <typename InputPolicy, typename OutputPolicy>
|
|
void GenericFileSink<InputPolicy, OutputPolicy>::InitOutputFile()
|
|
{
|
|
OutputPolicy::InitOutFile();
|
|
}
|
|
|
|
template <typename InputPolicy, typename OutputPolicy>
|
|
void GenericFileSink<InputPolicy, OutputPolicy>::Run()
|
|
{
|
|
int receivedMsg = 0;
|
|
|
|
while (GetCurrentState() == RUNNING)
|
|
{
|
|
FairMQMessage* msg = fTransportFactory->CreateMessage();
|
|
if (fChannels["data-in"].at(0).Receive(msg) > 0)
|
|
{
|
|
OutputPolicy::AddToFile(InputPolicy::DeSerializeMsg(msg));
|
|
receivedMsg++;
|
|
}
|
|
delete msg;
|
|
}
|
|
|
|
MQLOG(INFO) << "Received " << receivedMsg << " messages!";
|
|
}
|