Simplify Tutorial3 structure.

Use same executable for bin/boost/protobuf/root data format,
configured now via `--data-format <binary/boost/protobuf/tmessage>`
command line parameter.
This commit is contained in:
Alexey Rybalchenko 2015-09-09 16:57:04 +02:00 committed by Mohammad Al-Turany
parent 8b71e4d20b
commit 5136c88d3a
3 changed files with 10 additions and 69 deletions

View File

@ -551,7 +551,10 @@ bool FairMQChannel::ExpectsAnotherPart() const
inline bool FairMQChannel::HandleUnblock() const inline bool FairMQChannel::HandleUnblock() const
{ {
FairMQMessage* cmd = fTransportFactory->CreateMessage(); FairMQMessage* cmd = fTransportFactory->CreateMessage();
fCmdSocket->Receive(cmd, 0); if (fCmdSocket->Receive(cmd, 0) >= 0)
{
LOG(DEBUG) << "unblocked";
}
delete cmd; delete cmd;
return true; return true;
} }

View File

@ -79,8 +79,8 @@ void FairMQDevice::SignalHandler(int signal)
MQLOG(INFO) << "Exiting."; MQLOG(INFO) << "Exiting.";
stop(); stop();
std::abort(); // std::abort();
// exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
void FairMQDevice::InitWrapper() void FairMQDevice::InitWrapper()
@ -573,10 +573,10 @@ void FairMQDevice::InteractiveStateLoop()
LOG(INFO) << "[h] help"; LOG(INFO) << "[h] help";
PrintInteractiveStateLoopHelp(); PrintInteractiveStateLoopHelp();
break; break;
case 'x': // case 'x':
LOG(INFO) << "[x] ERROR"; // LOG(INFO) << "[x] ERROR";
ChangeState("ERROR_FOUND"); // ChangeState("ERROR_FOUND");
break; // break;
case 'q': case 'q':
LOG(INFO) << "[q] end"; LOG(INFO) << "[q] end";
ChangeState("END"); ChangeState("END");

View File

@ -1,62 +0,0 @@
/*
* File: GenericFileSink.tpl
* Author: winckler
*
* Created on October 7, 2014, 7:21 PM
*/
template <typename InputPolicy, typename OutputPolicy>
GenericFileSink<InputPolicy, OutputPolicy>::GenericFileSink()
: InputPolicy()
, OutputPolicy()
{
}
template <typename InputPolicy, typename OutputPolicy>
GenericFileSink<InputPolicy, OutputPolicy>::~GenericFileSink()
{
}
template <typename InputPolicy, typename OutputPolicy>
void GenericFileSink<InputPolicy, OutputPolicy>::SetTransport(FairMQTransportFactory* transport)
{
FairMQDevice::SetTransport(transport);
// InputPolicy::SetTransport(transport);
}
template <typename InputPolicy, typename OutputPolicy>
void GenericFileSink<InputPolicy, OutputPolicy>::InitTask()
{
InitOutputFile();
// InputPolicy::Init();
// OutputPolicy::Init();
}
template <typename InputPolicy, typename OutputPolicy>
void GenericFileSink<InputPolicy, OutputPolicy>::InitOutputFile()
{
OutputPolicy::InitOutFile();
}
template <typename InputPolicy, typename OutputPolicy>
void GenericFileSink<InputPolicy, OutputPolicy>::Run()
{
int receivedMsg = 0;
// store the channel reference to avoid traversing the map on every loop iteration
const FairMQChannel& inputChannel = fChannels["data-in"].at(0);
while (CheckCurrentState(RUNNING))
{
std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
if (inputChannel.Receive(msg) > 0)
{
OutputPolicy::AddToFile(InputPolicy::DeSerializeMsg(msg.get()));
receivedMsg++;
}
}
MQLOG(INFO) << "Received " << receivedMsg << " messages!";
}