Add FairMQ Example 6 - Working with multiple channels

This commit is contained in:
Alexey Rybalchenko 2015-10-07 15:50:10 +02:00
parent 0b11ad9274
commit 023d88d0ef
20 changed files with 776 additions and 21 deletions

View File

@ -17,6 +17,7 @@ If(DDS_FOUND)
EndIf(DDS_FOUND) EndIf(DDS_FOUND)
add_subdirectory(examples/4-copypush) add_subdirectory(examples/4-copypush)
add_subdirectory(examples/5-req-rep) add_subdirectory(examples/5-req-rep)
add_subdirectory(examples/6-multiple-channels)
add_subdirectory(test) add_subdirectory(test)

View File

@ -39,6 +39,8 @@ void FairMQMerger::Run()
dataInChannels.at(i) = &(fChannels.at("data-in").at(i)); dataInChannels.at(i) = &(fChannels.at("data-in").at(i));
} }
int numInputs = fChannels.at("data-in").size();
while (CheckCurrentState(RUNNING)) while (CheckCurrentState(RUNNING))
{ {
std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage()); std::unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
@ -46,7 +48,7 @@ void FairMQMerger::Run()
poller->Poll(100); poller->Poll(100);
// Loop over the data input channels. // Loop over the data input channels.
for (int i = 0; i < fChannels.at("data-in").size(); ++i) for (int i = 0; i < numInputs; ++i)
{ {
// Check if the channel has data ready to be received. // Check if the channel has data ready to be received.
if (poller->CheckInput(i)) if (poller->CheckInput(i))

View File

@ -14,8 +14,6 @@
#include <iostream> #include <iostream>
#include "boost/program_options.hpp"
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQParser.h" #include "FairMQParser.h"
#include "FairMQProgOptions.h" #include "FairMQProgOptions.h"
@ -27,8 +25,6 @@
#include "FairMQTransportFactoryZMQ.h" #include "FairMQTransportFactoryZMQ.h"
#endif #endif
using namespace boost::program_options;
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
FairMQExample1Sink sink; FairMQExample1Sink sink;

View File

@ -14,8 +14,6 @@
#include <iostream> #include <iostream>
#include "boost/program_options.hpp"
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQParser.h" #include "FairMQParser.h"
#include "FairMQProgOptions.h" #include "FairMQProgOptions.h"
@ -27,8 +25,6 @@
#include "FairMQTransportFactoryZMQ.h" #include "FairMQTransportFactoryZMQ.h"
#endif #endif
using namespace boost::program_options;
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
FairMQExample2Processor processor; FairMQExample2Processor processor;

View File

@ -14,8 +14,6 @@
#include <iostream> #include <iostream>
#include "boost/program_options.hpp"
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQParser.h" #include "FairMQParser.h"
#include "FairMQProgOptions.h" #include "FairMQProgOptions.h"
@ -27,8 +25,6 @@
#include "FairMQTransportFactoryZMQ.h" #include "FairMQTransportFactoryZMQ.h"
#endif #endif
using namespace boost::program_options;
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
FairMQExample2Sink sink; FairMQExample2Sink sink;

View File

@ -14,8 +14,6 @@
#include <iostream> #include <iostream>
#include "boost/program_options.hpp"
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQParser.h" #include "FairMQParser.h"
#include "FairMQProgOptions.h" #include "FairMQProgOptions.h"
@ -27,8 +25,6 @@
#include "FairMQTransportFactoryZMQ.h" #include "FairMQTransportFactoryZMQ.h"
#endif #endif
using namespace boost::program_options;
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
FairMQExample4Sampler sampler; FairMQExample4Sampler sampler;

View File

@ -14,8 +14,6 @@
#include <iostream> #include <iostream>
#include "boost/program_options.hpp"
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQParser.h" #include "FairMQParser.h"
#include "FairMQProgOptions.h" #include "FairMQProgOptions.h"
@ -27,8 +25,6 @@
#include "FairMQTransportFactoryZMQ.h" #include "FairMQTransportFactoryZMQ.h"
#endif #endif
using namespace boost::program_options;
int main(int argc, char** argv) int main(int argc, char** argv)
{ {
FairMQExample4Sink sink; FairMQExample4Sink sink;

View File

@ -0,0 +1,82 @@
################################################################################
# Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH #
# #
# This software is distributed under the terms of the #
# GNU Lesser General Public Licence version 3 (LGPL) version 3, #
# copied verbatim in the file "LICENSE" #
################################################################################
configure_file(${CMAKE_SOURCE_DIR}/fairmq/examples/6-multiple-channels/ex6-multiple-channels.json ${CMAKE_BINARY_DIR}/bin/config/ex6-multiple-channels.json)
Set(INCLUDE_DIRECTORIES
${CMAKE_SOURCE_DIR}/fairmq
${CMAKE_SOURCE_DIR}/fairmq/devices
${CMAKE_SOURCE_DIR}/fairmq/tools
${CMAKE_SOURCE_DIR}/fairmq/options
${CMAKE_SOURCE_DIR}/fairmq/examples/6-multiple-channels
${CMAKE_CURRENT_BINARY_DIR}
)
Set(SYSTEM_INCLUDE_DIRECTORIES
${Boost_INCLUDE_DIR}
)
If(NANOMSG_FOUND)
Set(INCLUDE_DIRECTORIES
${INCLUDE_DIRECTORIES}
${CMAKE_SOURCE_DIR}/fairmq/nanomsg
)
Else(NANOMSG_FOUND)
Set(INCLUDE_DIRECTORIES
${INCLUDE_DIRECTORIES}
${CMAKE_SOURCE_DIR}/fairmq/zeromq
)
EndIf(NANOMSG_FOUND)
Include_Directories(${INCLUDE_DIRECTORIES})
Include_Directories(SYSTEM ${SYSTEM_INCLUDE_DIRECTORIES})
Set(LINK_DIRECTORIES
${Boost_LIBRARY_DIRS}
)
Link_Directories(${LINK_DIRECTORIES})
Set(SRCS
"FairMQExample6Sampler.cxx"
"FairMQExample6Sink.cxx"
"FairMQExample6Broadcaster.cxx"
)
Set(DEPENDENCIES
${DEPENDENCIES}
FairMQ
)
Set(LIBRARY_NAME FairMQExample6)
GENERATE_LIBRARY()
Set(Exe_Names
ex6-sampler
ex6-sink
ex6-broadcaster
)
Set(Exe_Source
runExample6Sampler.cxx
runExample6Sink.cxx
runExample6Broadcaster.cxx
)
list(LENGTH Exe_Names _length)
math(EXPR _length ${_length}-1)
ForEach(_file RANGE 0 ${_length})
list(GET Exe_Names ${_file} _name)
list(GET Exe_Source ${_file} _src)
set(EXE_NAME ${_name})
set(SRCS ${_src})
set(DEPENDENCIES FairMQExample6)
GENERATE_EXECUTABLE()
EndForEach(_file RANGE 0 ${_length})

View File

@ -0,0 +1,49 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample6Broadcaster.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <memory> // unique_ptr
#include <string>
#include <boost/thread.hpp>
#include "FairMQExample6Broadcaster.h"
#include "FairMQLogger.h"
using namespace std;
FairMQExample6Broadcaster::FairMQExample6Broadcaster()
{
}
void FairMQExample6Broadcaster::CustomCleanup(void *data, void *object)
{
delete (string*)object;
}
void FairMQExample6Broadcaster::Run()
{
while (CheckCurrentState(RUNNING))
{
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
string* text = new string("OK");
unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text));
LOG(INFO) << "Sending \"" << "OK" << "\"";
fChannels.at("broadcast-out").at(0).Send(msg);
}
}
FairMQExample6Broadcaster::~FairMQExample6Broadcaster()
{
}

View File

@ -0,0 +1,32 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample6Broadcaster.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLE6BROADCASTER_H_
#define FAIRMQEXAMPLE6BROADCASTER_H_
#include "FairMQDevice.h"
class FairMQExample6Broadcaster : public FairMQDevice
{
public:
FairMQExample6Broadcaster();
virtual ~FairMQExample6Broadcaster();
static void CustomCleanup(void* data, void* hint);
protected:
virtual void Run();
};
#endif /* FAIRMQEXAMPLE6BROADCASTER_H_ */

View File

@ -0,0 +1,116 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample6Sampler.cpp
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include <memory> // unique_ptr
#include <boost/thread.hpp>
#include "FairMQExample6Sampler.h"
#include "FairMQPoller.h"
#include "FairMQLogger.h"
using namespace std;
FairMQExample6Sampler::FairMQExample6Sampler()
: fText()
{
}
void FairMQExample6Sampler::CustomCleanup(void *data, void *object)
{
delete (string*)object;
}
void FairMQExample6Sampler::Run()
{
std::unique_ptr<FairMQPoller> poller(fTransportFactory->CreatePoller(fChannels, { "data-out", "broadcast-in" }));
while (CheckCurrentState(RUNNING))
{
poller->Poll(-1);
if (poller->CheckInput("broadcast-in", 0))
{
unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
if (fChannels.at("broadcast-in").at(0).Receive(msg) > 0)
{
LOG(INFO) << "Received broadcast: \""
<< string(static_cast<char*>(msg->GetData()), msg->GetSize())
<< "\"";
}
} // if (poller->CheckInput("broadcast-in", 0))
if (poller->CheckOutput("data-out", 0))
{
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
string* text = new string(fText);
unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage(const_cast<char*>(text->c_str()), text->length(), CustomCleanup, text));
LOG(INFO) << "Sending \"" << fText << "\"";
fChannels.at("data-out").at(0).Send(msg);
} // if (poller->CheckOutput("data-out", 0))
} // while (CheckCurrentState(RUNNING))
}
FairMQExample6Sampler::~FairMQExample6Sampler()
{
}
void FairMQExample6Sampler::SetProperty(const int key, const string& value)
{
switch (key)
{
case Text:
fText = value;
break;
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
string FairMQExample6Sampler::GetProperty(const int key, const string& default_ /*= ""*/)
{
switch (key)
{
case Text:
return fText;
break;
default:
return FairMQDevice::GetProperty(key, default_);
}
}
void FairMQExample6Sampler::SetProperty(const int key, const int value)
{
switch (key)
{
default:
FairMQDevice::SetProperty(key, value);
break;
}
}
int FairMQExample6Sampler::GetProperty(const int key, const int default_ /*= 0*/)
{
switch (key)
{
default:
return FairMQDevice::GetProperty(key, default_);
}
}

View File

@ -0,0 +1,46 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample6Sampler.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLE6SAMPLER_H_
#define FAIRMQEXAMPLE6SAMPLER_H_
#include <string>
#include "FairMQDevice.h"
class FairMQExample6Sampler : public FairMQDevice
{
public:
enum
{
Text = FairMQDevice::Last,
Last
};
FairMQExample6Sampler();
virtual ~FairMQExample6Sampler();
static void CustomCleanup(void* data, void* hint);
virtual void SetProperty(const int key, const std::string& value);
virtual std::string GetProperty(const int key, const std::string& default_ = "");
virtual void SetProperty(const int key, const int value);
virtual int GetProperty(const int key, const int default_ = 0);
protected:
std::string fText;
virtual void Run();
};
#endif /* FAIRMQEXAMPLE6SAMPLER_H_ */

View File

@ -0,0 +1,61 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample6Sink.cxx
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#include "FairMQExample6Sink.h"
#include "FairMQPoller.h"
#include "FairMQLogger.h"
using namespace std;
FairMQExample6Sink::FairMQExample6Sink()
{
}
void FairMQExample6Sink::Run()
{
std::unique_ptr<FairMQPoller> poller(fTransportFactory->CreatePoller(fChannels, { "data-in", "broadcast-in" }));
while (CheckCurrentState(RUNNING))
{
poller->Poll(-1);
if (poller->CheckInput("broadcast-in", 0))
{
unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
if (fChannels.at("broadcast-in").at(0).Receive(msg) > 0)
{
LOG(INFO) << "Received broadcast: \""
<< string(static_cast<char*>(msg->GetData()), msg->GetSize())
<< "\"";
}
}
if (poller->CheckInput("data-in", 0))
{
unique_ptr<FairMQMessage> msg(fTransportFactory->CreateMessage());
if (fChannels.at("data-in").at(0).Receive(msg) > 0)
{
LOG(INFO) << "Received message: \""
<< string(static_cast<char*>(msg->GetData()), msg->GetSize())
<< "\"";
}
}
}
}
FairMQExample6Sink::~FairMQExample6Sink()
{
}

View File

@ -0,0 +1,30 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* FairMQExample6Sink.h
*
* @since 2014-10-10
* @author A. Rybalchenko
*/
#ifndef FAIRMQEXAMPLE6SINK_H_
#define FAIRMQEXAMPLE6SINK_H_
#include "FairMQDevice.h"
class FairMQExample6Sink : public FairMQDevice
{
public:
FairMQExample6Sink();
virtual ~FairMQExample6Sink();
protected:
virtual void Run();
};
#endif /* FAIRMQEXAMPLE6SINK_H_ */

View File

@ -0,0 +1,8 @@
Example 6: Multiple Channels
===============
This example demonstrates how to work with multiple channels and multiplex between them.
A topology of three devices - **Sampler**, **Sink** and **Broadcaster**. The Sampler sends data to the Sink via the PUSH-PULL pattern. The Broadcaster device sends a message to both Sampler and Sink containing a string "OK" every second. The Broadcaster sends the message via PUB pattern. Both Sampler and Sink, besides doing their PUSH-PULL job, listen via SUB to the Broadcaster.
The multiplexing between their data channels and the broadcast channels happens with `FairMQPoller`.

View File

@ -0,0 +1,85 @@
{
"fairMQOptions":
{
"device":
{
"id": "sampler1",
"channel":
{
"name": "data-out",
"socket":
{
"type": "push",
"method": "bind",
"address": "tcp://*:5555",
"sndBufSize": "1000",
"rcvBufSize": "1000",
"rateLogging": "0"
}
},
"channel":
{
"name": "broadcast-in",
"socket":
{
"type": "sub",
"method": "connect",
"address": "tcp://localhost:5005",
"sndBufSize": "1000",
"rcvBufSize": "1000",
"rateLogging": "0"
}
}
},
"device":
{
"id": "sink1",
"channel":
{
"name": "data-in",
"socket":
{
"type": "pull",
"method": "connect",
"address": "tcp://localhost:5555",
"sndBufSize": "1000",
"rcvBufSize": "1000",
"rateLogging": "0"
}
},
"channel":
{
"name": "broadcast-in",
"socket":
{
"type": "sub",
"method": "connect",
"address": "tcp://localhost:5005",
"sndBufSize": "1000",
"rcvBufSize": "1000",
"rateLogging": "0"
}
}
},
"device":
{
"id": "broadcaster1",
"channel":
{
"name": "broadcast-out",
"socket":
{
"type": "pub",
"method": "bind",
"address": "tcp://*:5005",
"sndBufSize": "1000",
"rcvBufSize": "1000",
"rateLogging": "0"
}
}
}
}
}

View File

@ -0,0 +1,79 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runExample6Broadcaster.cxx
*
* @since 2013-04-23
* @author A. Rybalchenko
*/
#include <iostream>
#include "FairMQLogger.h"
#include "FairMQParser.h"
#include "FairMQProgOptions.h"
#include "FairMQExample6Broadcaster.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
int main(int argc, char** argv)
{
FairMQExample6Broadcaster broadcaster;
broadcaster.CatchSignals();
FairMQProgOptions config;
try
{
if (config.ParseAll(argc, argv))
{
return 0;
}
std::string filename = config.GetValue<std::string>("config-json-file");
std::string id = config.GetValue<std::string>("id");
config.UserParser<FairMQParser::JSON>(filename, id);
broadcaster.fChannels = config.GetFairMQMap();
LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
broadcaster.SetTransport(transportFactory);
broadcaster.SetProperty(FairMQExample6Broadcaster::Id, id);
broadcaster.ChangeState("INIT_DEVICE");
broadcaster.WaitForEndOfState("INIT_DEVICE");
broadcaster.ChangeState("INIT_TASK");
broadcaster.WaitForEndOfState("INIT_TASK");
broadcaster.ChangeState("RUN");
broadcaster.InteractiveStateLoop();
}
catch (std::exception& e)
{
LOG(ERROR) << e.what();
LOG(INFO) << "Command line options are the following: ";
config.PrintHelp();
return 1;
}
return 0;
}

View File

@ -0,0 +1,92 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runExample6Sampler.cxx
*
* @since 2013-04-23
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
#include "boost/program_options.hpp"
#include "FairMQLogger.h"
#include "FairMQParser.h"
#include "FairMQProgOptions.h"
#include "FairMQExample6Sampler.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
using namespace boost::program_options;
int main(int argc, char** argv)
{
FairMQExample6Sampler sampler;
sampler.CatchSignals();
FairMQProgOptions config;
try
{
std::string text;
options_description samplerOptions("Sampler options");
samplerOptions.add_options()
("text", value<std::string>(&text)->default_value("Hello"), "Text to send out");
config.AddToCmdLineOptions(samplerOptions);
if (config.ParseAll(argc, argv))
{
return 0;
}
std::string filename = config.GetValue<std::string>("config-json-file");
std::string id = config.GetValue<std::string>("id");
config.UserParser<FairMQParser::JSON>(filename, id);
sampler.fChannels = config.GetFairMQMap();
LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
sampler.SetTransport(transportFactory);
sampler.SetProperty(FairMQExample6Sampler::Id, id);
sampler.SetProperty(FairMQExample6Sampler::Text, text);
sampler.ChangeState("INIT_DEVICE");
sampler.WaitForEndOfState("INIT_DEVICE");
sampler.ChangeState("INIT_TASK");
sampler.WaitForEndOfState("INIT_TASK");
sampler.ChangeState("RUN");
sampler.InteractiveStateLoop();
}
catch (std::exception& e)
{
LOG(ERROR) << e.what();
LOG(INFO) << "Command line options are the following: ";
config.PrintHelp();
return 1;
}
return 0;
}

View File

@ -0,0 +1,83 @@
/********************************************************************************
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
* *
* This software is distributed under the terms of the *
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
* copied verbatim in the file "LICENSE" *
********************************************************************************/
/**
* runExample6Sink.cxx
*
* @since 2013-04-23
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
#include "boost/program_options.hpp"
#include "FairMQLogger.h"
#include "FairMQParser.h"
#include "FairMQProgOptions.h"
#include "FairMQExample6Sink.h"
#ifdef NANOMSG
#include "FairMQTransportFactoryNN.h"
#else
#include "FairMQTransportFactoryZMQ.h"
#endif
using namespace boost::program_options;
int main(int argc, char** argv)
{
FairMQExample6Sink sink;
sink.CatchSignals();
FairMQProgOptions config;
try
{
if (config.ParseAll(argc, argv))
{
return 0;
}
std::string filename = config.GetValue<std::string>("config-json-file");
std::string id = config.GetValue<std::string>("id");
config.UserParser<FairMQParser::JSON>(filename, id);
sink.fChannels = config.GetFairMQMap();
LOG(INFO) << "PID: " << getpid();
#ifdef NANOMSG
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
#else
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
#endif
sink.SetTransport(transportFactory);
sink.SetProperty(FairMQExample6Sink::Id, id);
sink.ChangeState("INIT_DEVICE");
sink.WaitForEndOfState("INIT_DEVICE");
sink.ChangeState("INIT_TASK");
sink.WaitForEndOfState("INIT_TASK");
sink.ChangeState("RUN");
sink.InteractiveStateLoop();
}
catch (std::exception& e)
{
LOG(ERROR) << e.what();
LOG(INFO) << "Command line options are the following: ";
config.PrintHelp();
return 1;
}
return 0;
}

View File

@ -7,18 +7,27 @@ Example 1: Sampler -> Sink
-------------------------- --------------------------
A simple topology of two devices - **Sampler** and **Sink**. **Sampler** sends data to **Sink** with the **PUSH-PULL** pattern. A simple topology of two devices - **Sampler** and **Sink**. **Sampler** sends data to **Sink** with the **PUSH-PULL** pattern.
Example 2: Sampler -> Processor -> Sink Example 2: Sampler -> Processor -> Sink
--------------------------------------- ---------------------------------------
A simple topology of three devices - **Sampler**, **Processor** and **Sink**. **Sampler** sends data to one or more **Processor**s, who modify the data and send it to one **Sink**. Transport with the **PUSH-PULL** pattern. A simple topology of three devices - **Sampler**, **Processor** and **Sink**. **Sampler** sends data to one or more **Processor**s, who modify the data and send it to one **Sink**. Transport with the **PUSH-PULL** pattern.
Example 3: DDS Example 3: DDS
-------------- --------------
This example demonstrates usage of the Dynamic Deployment System ([DDS](http://dds.gsi.de/)) to dynamically deploy and configure a topology of devices. The topology is similar to those of Example 2, but now it can be easily distributed on different computing nodes without the need for manual reconfiguration of the devices. This example demonstrates usage of the Dynamic Deployment System ([DDS](http://dds.gsi.de/)) to dynamically deploy and configure a topology of devices. The topology is similar to those of Example 2, but now it can be easily distributed on different computing nodes without the need for manual reconfiguration of the devices.
Example 4: Copy & Push Example 4: Copy & Push
---------------------- ----------------------
A topology consisting of one **Sampler** and two **Sink**s. The **Sampler** uses the `Copy` method to send the same data to both sinks with the **PUSH_PULL** pattern. In countrary to the **PUB-PATTERN** pattern, this insures that all receivers are connected and no data is lost, but requires additional sockets. A topology consisting of one **Sampler** and two **Sink**s. The **Sampler** uses the `Copy` method to send the same data to both sinks with the **PUSH_PULL** pattern. In countrary to the **PUB-PATTERN** pattern, this insures that all receivers are connected and no data is lost, but requires additional sockets.
Example 5: Request & Reply Example 5: Request & Reply
-------------------------- --------------------------
This topology contains two devices that communicate with each other via the **REQUEST-REPLY** pettern. Bidirectional communication via a single socket. This topology contains two devices that communicate with each other via the **REQUEST-REPLY** pettern. Bidirectional communication via a single socket.
Example 6: Multiple Channels
----------------------------
This example demonstrates how to work with multiple channels and multiplex between them.