Update JSON files & readme, use FairMQDevicePtr, cleanup.

This commit is contained in:
Alexey Rybalchenko 2016-10-28 15:42:15 +02:00
parent f18c6e50e2
commit 626ebdd298
21 changed files with 41 additions and 62 deletions

View File

@ -17,7 +17,7 @@ void addCustomOptions(bpo::options_description& options)
("text", bpo::value<std::string>()->default_value("Hello"), "Text to send out"); ("text", bpo::value<std::string>()->default_value("Hello"), "Text to send out");
} }
FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{ {
return new FairMQExample1Sampler(); return new FairMQExample1Sampler();
} }

View File

@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/)
{ {
} }
FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{ {
return new FairMQExample1Sink(); return new FairMQExample1Sink();
} }

View File

@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/)
{ {
} }
FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{ {
return new FairMQExample2Processor(); return new FairMQExample2Processor();
} }

View File

@ -17,7 +17,7 @@ void addCustomOptions(bpo::options_description& options)
("text", bpo::value<std::string>()->default_value("Hello"), "Text to send out"); ("text", bpo::value<std::string>()->default_value("Hello"), "Text to send out");
} }
FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{ {
return new FairMQExample2Sampler(); return new FairMQExample2Sampler();
} }

View File

@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/)
{ {
} }
FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{ {
return new FairMQExample2Sink(); return new FairMQExample2Sink();
} }

View File

@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/)
{ {
} }
FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) FairMQDevicePtr getDevice(const FairMQProgOptions& config)
{ {
return new FairMQExample3Processor(); return new FairMQExample3Processor();
} }

View File

@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/)
{ {
} }
FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{ {
return new FairMQExample3Sampler(); return new FairMQExample3Sampler();
} }

View File

@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/)
{ {
} }
FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{ {
return new FairMQExample3Sink(); return new FairMQExample3Sink();
} }

View File

@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/)
{ {
} }
FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{ {
return new FairMQExample4Sampler(); return new FairMQExample4Sampler();
} }

View File

@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/)
{ {
} }
FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{ {
return new FairMQExample4Sink(); return new FairMQExample4Sink();
} }

View File

@ -17,7 +17,7 @@ void addCustomOptions(bpo::options_description& options)
("text", bpo::value<std::string>()->default_value("Hello"), "Text to send out"); ("text", bpo::value<std::string>()->default_value("Hello"), "Text to send out");
} }
FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{ {
return new FairMQExample5Client(); return new FairMQExample5Client();
} }

View File

@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/)
{ {
} }
FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{ {
return new FairMQExample5Server(); return new FairMQExample5Server();
} }

View File

@ -22,36 +22,22 @@ using namespace std;
FairMQExample6Sink::FairMQExample6Sink() FairMQExample6Sink::FairMQExample6Sink()
{ {
OnData("broadcast", &FairMQExample6Sink::HandleBroadcast);
OnData("data", &FairMQExample6Sink::HandleData);
} }
void FairMQExample6Sink::Run() bool FairMQExample6Sink::HandleBroadcast(FairMQMessagePtr& msg, int /*index*/)
{
std::unique_ptr<FairMQPoller> poller(fTransportFactory->CreatePoller(fChannels, { "data", "broadcast" }));
while (CheckCurrentState(RUNNING))
{
poller->Poll(100);
if (poller->CheckInput("broadcast", 0))
{
FairMQMessagePtr msg(NewMessage());
if (Receive(msg, "broadcast") > 0)
{ {
LOG(INFO) << "Received broadcast: \"" << string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\""; LOG(INFO) << "Received broadcast: \"" << string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
}
return true;
} }
if (poller->CheckInput("data", 0)) bool FairMQExample6Sink::HandleData(FairMQMessagePtr& msg, int /*index*/)
{
FairMQMessagePtr msg(NewMessage());
if (Receive(msg, "data") > 0)
{ {
LOG(INFO) << "Received message: \"" << string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\""; LOG(INFO) << "Received message: \"" << string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
}
} return true;
}
} }
FairMQExample6Sink::~FairMQExample6Sink() FairMQExample6Sink::~FairMQExample6Sink()

View File

@ -24,7 +24,8 @@ class FairMQExample6Sink : public FairMQDevice
virtual ~FairMQExample6Sink(); virtual ~FairMQExample6Sink();
protected: protected:
virtual void Run(); bool HandleBroadcast(FairMQMessagePtr&, int);
bool HandleData(FairMQMessagePtr&, int);
}; };
#endif /* FAIRMQEXAMPLE6SINK_H_ */ #endif /* FAIRMQEXAMPLE6SINK_H_ */

View File

@ -1,11 +1,11 @@
{ {
"fairMQOptions": "fairMQOptions":
{ {
"device": "devices":
{ [{
"id": "sampler1", "id": "sampler1",
"channel": "channels":
{ [{
"name": "data", "name": "data",
"socket": "socket":
{ {
@ -17,7 +17,6 @@
"rateLogging": "0" "rateLogging": "0"
} }
}, },
"channel":
{ {
"name": "broadcast", "name": "broadcast",
"socket": "socket":
@ -29,14 +28,12 @@
"rcvBufSize": "1000", "rcvBufSize": "1000",
"rateLogging": "0" "rateLogging": "0"
} }
} }]
}, },
"device":
{ {
"id": "sink1", "id": "sink1",
"channel": "channels":
{ [{
"name": "data", "name": "data",
"socket": "socket":
{ {
@ -48,7 +45,6 @@
"rateLogging": "0" "rateLogging": "0"
} }
}, },
"channel":
{ {
"name": "broadcast", "name": "broadcast",
"socket": "socket":
@ -60,10 +56,8 @@
"rcvBufSize": "1000", "rcvBufSize": "1000",
"rateLogging": "0" "rateLogging": "0"
} }
} }]
}, },
"device":
{ {
"id": "broadcaster1", "id": "broadcaster1",
"channel": "channel":
@ -79,7 +73,7 @@
"rateLogging": "0" "rateLogging": "0"
} }
} }
} }]
} }
} }

View File

@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/)
{ {
} }
FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{ {
return new FairMQExample6Broadcaster(); return new FairMQExample6Broadcaster();
} }

View File

@ -17,7 +17,7 @@ void addCustomOptions(bpo::options_description& options)
("text", bpo::value<std::string>()->default_value("Hello"), "Text to send out"); ("text", bpo::value<std::string>()->default_value("Hello"), "Text to send out");
} }
FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{ {
return new FairMQExample6Sampler(); return new FairMQExample6Sampler();
} }

View File

@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/)
{ {
} }
FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{ {
return new FairMQExample6Sink(); return new FairMQExample6Sink();
} }

View File

@ -1,8 +1,8 @@
{ {
"fairMQOptions": "fairMQOptions":
{ {
"device": "devices":
{ [{
"id": "sampler1", "id": "sampler1",
"channel": "channel":
{ {
@ -18,8 +18,6 @@
} }
} }
}, },
"device":
{ {
"id": "sink1", "id": "sink1",
"channel": "channel":
@ -35,6 +33,6 @@
"rateLogging": "0" "rateLogging": "0"
} }
} }
} }]
} }
} }

View File

@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/)
{ {
} }
FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{ {
return new FairMQExample8Sampler(); return new FairMQExample8Sampler();
} }

View File

@ -15,7 +15,7 @@ void addCustomOptions(bpo::options_description& /*options*/)
{ {
} }
FairMQDevice* getDevice(const FairMQProgOptions& /*config*/) FairMQDevicePtr getDevice(const FairMQProgOptions& /*config*/)
{ {
return new FairMQExample8Sink(); return new FairMQExample8Sink();
} }