mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 16:46:47 +00:00
New files from Ralf Kliemt
git-svn-id: https://subversion.gsi.de/fairroot/fairbase/trunk@22606 0381ead4-6506-0410-b988-94b70fbc4730
This commit is contained in:
parent
5121fe3ae5
commit
9c4d64f3b1
|
@ -43,7 +43,7 @@ Set(DEPENDENCIES
|
||||||
GENERATE_LIBRARY()
|
GENERATE_LIBRARY()
|
||||||
|
|
||||||
Set(Exe_Names bsampler buffer splitter merger sink proxy)
|
Set(Exe_Names bsampler buffer splitter merger sink proxy)
|
||||||
Set(Exe_Source runBenchmarkSampler.cxx runBuffer.cxx runSplitter.cxx runMerger.cxx runSink.cxx runProxy.cxx)
|
Set(Exe_Source runBenchmarkSampler.cxx runBuffer.cxx runSplitter.cxx runMerger.cxx runSink.cxx runProxy.cxx runNToOneMerger.cxx runOneToNSplitter.cxx)
|
||||||
|
|
||||||
List(LENGTH Exe_Names _length)
|
List(LENGTH Exe_Names _length)
|
||||||
Math(EXPR _length ${_length}-1)
|
Math(EXPR _length ${_length}-1)
|
||||||
|
|
|
@ -25,11 +25,11 @@ void FairMQStandaloneMerger::Run()
|
||||||
|
|
||||||
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
|
boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this));
|
||||||
|
|
||||||
// Initialize poll set
|
zmq_pollitem_t items[fNumInputs];
|
||||||
zmq_pollitem_t items[] = {
|
for (Int_t iInput = 0; iInput < fNumInputs; iInput++) {
|
||||||
{ *(fPayloadInputs->at(0)->GetSocket()), 0, ZMQ_POLLIN, 0 },
|
zmq_pollitem_t tempitem( {*(fPayloadInputs->at(iInput)->GetSocket()), 0, ZMQ_POLLIN, 0});
|
||||||
{ *(fPayloadInputs->at(1)->GetSocket()), 0, ZMQ_POLLIN, 0 }
|
items[iInput] = tempitem;
|
||||||
};
|
}
|
||||||
|
|
||||||
Bool_t received = false;
|
Bool_t received = false;
|
||||||
|
|
||||||
|
@ -38,24 +38,15 @@ void FairMQStandaloneMerger::Run()
|
||||||
|
|
||||||
zmq_poll(items, fNumInputs, 100);
|
zmq_poll(items, fNumInputs, 100);
|
||||||
|
|
||||||
if (items[0].revents & ZMQ_POLLIN) {
|
for(Int_t iItem = 0; iItem < fNumInputs; iItem++) {
|
||||||
received = fPayloadInputs->at(0)->Receive(&msg);
|
if (items[iItem].revents & ZMQ_POLLIN) {
|
||||||
|
received = fPayloadInputs->at(iItem)->Receive(&msg);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (received) {
|
if (received) {
|
||||||
fPayloadOutputs->at(0)->Send(&msg);
|
fPayloadOutputs->at(0)->Send(&msg);
|
||||||
received = false;
|
received = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (items[1].revents & ZMQ_POLLIN) {
|
|
||||||
received = fPayloadInputs->at(1)->Receive(&msg);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (received) {
|
|
||||||
fPayloadOutputs->at(0)->Send(&msg);
|
|
||||||
received = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
rateLogger.interrupt();
|
rateLogger.interrupt();
|
||||||
|
|
124
fairmq/runNToOneMerger.cxx
Normal file
124
fairmq/runNToOneMerger.cxx
Normal file
|
@ -0,0 +1,124 @@
|
||||||
|
/*
|
||||||
|
* runMerger.cxx
|
||||||
|
*
|
||||||
|
* Created on: Dec 6, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <iostream>
|
||||||
|
#include <csignal>
|
||||||
|
|
||||||
|
#include "FairMQLogger.h"
|
||||||
|
#include "FairMQStandaloneMerger.h"
|
||||||
|
|
||||||
|
|
||||||
|
FairMQStandaloneMerger merger;
|
||||||
|
|
||||||
|
static void s_signal_handler (int signal)
|
||||||
|
{
|
||||||
|
std::cout << std::endl << "Caught signal " << signal << std::endl;
|
||||||
|
|
||||||
|
merger.ChangeState(FairMQStandaloneMerger::STOP);
|
||||||
|
merger.ChangeState(FairMQStandaloneMerger::END);
|
||||||
|
|
||||||
|
std::cout << "Shutdown complete. Bye!" << std::endl;
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void s_catch_signals (void)
|
||||||
|
{
|
||||||
|
struct sigaction action;
|
||||||
|
action.sa_handler = s_signal_handler;
|
||||||
|
action.sa_flags = 0;
|
||||||
|
sigemptyset(&action.sa_mask);
|
||||||
|
sigaction(SIGINT, &action, NULL);
|
||||||
|
sigaction(SIGTERM, &action, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char** argv)
|
||||||
|
{
|
||||||
|
if ( argc < 16 || (argc-8)%4!=0 ) {
|
||||||
|
std::cout << "Usage: merger \tID numIoTreads numInputs\n"
|
||||||
|
<< "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n"
|
||||||
|
<< "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n"
|
||||||
|
<< "\t\t...\n"
|
||||||
|
<< "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n"
|
||||||
|
<< argc << std::endl;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
s_catch_signals();
|
||||||
|
|
||||||
|
std::stringstream logmsg;
|
||||||
|
logmsg << "PID: " << getpid();
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
||||||
|
|
||||||
|
int i = 1;
|
||||||
|
|
||||||
|
merger.SetProperty(FairMQStandaloneMerger::Id, argv[i]);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int numIoThreads;
|
||||||
|
std::stringstream(argv[i]) >> numIoThreads;
|
||||||
|
merger.SetProperty(FairMQStandaloneMerger::NumIoThreads, numIoThreads);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int numInputs;
|
||||||
|
std::stringstream(argv[i]) >> numInputs;
|
||||||
|
merger.SetProperty(FairMQStandaloneMerger::NumInputs, numInputs);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
merger.SetProperty(FairMQStandaloneMerger::NumOutputs, 1);
|
||||||
|
|
||||||
|
|
||||||
|
merger.ChangeState(FairMQStandaloneMerger::INIT);
|
||||||
|
|
||||||
|
|
||||||
|
int inputSocketType;
|
||||||
|
for (int iInput = 0; iInput < numInputs; iInput++ ) {
|
||||||
|
inputSocketType = ZMQ_SUB;
|
||||||
|
if (strcmp(argv[i], "pull") == 0) {
|
||||||
|
inputSocketType = ZMQ_PULL;
|
||||||
|
}
|
||||||
|
merger.SetProperty(FairMQStandaloneMerger::InputSocketType, inputSocketType, iInput);
|
||||||
|
++i;
|
||||||
|
int inputRcvBufSize;
|
||||||
|
std::stringstream(argv[i]) >> inputRcvBufSize;
|
||||||
|
merger.SetProperty(FairMQStandaloneMerger::InputRcvBufSize, inputRcvBufSize, iInput);
|
||||||
|
++i;
|
||||||
|
merger.SetProperty(FairMQStandaloneMerger::InputMethod, argv[i], iInput);
|
||||||
|
++i;
|
||||||
|
merger.SetProperty(FairMQStandaloneMerger::InputAddress, argv[i], iInput);
|
||||||
|
++i;
|
||||||
|
}
|
||||||
|
|
||||||
|
int outputSocketType = ZMQ_PUB;
|
||||||
|
if (strcmp(argv[i], "push") == 0) {
|
||||||
|
outputSocketType = ZMQ_PUSH;
|
||||||
|
}
|
||||||
|
merger.SetProperty(FairMQStandaloneMerger::OutputSocketType, outputSocketType, 0);
|
||||||
|
++i;
|
||||||
|
int outputSndBufSize;
|
||||||
|
std::stringstream(argv[i]) >> outputSndBufSize;
|
||||||
|
merger.SetProperty(FairMQStandaloneMerger::OutputSndBufSize, outputSndBufSize, 0);
|
||||||
|
++i;
|
||||||
|
merger.SetProperty(FairMQStandaloneMerger::OutputMethod, argv[i], 0);
|
||||||
|
++i;
|
||||||
|
merger.SetProperty(FairMQStandaloneMerger::OutputAddress, argv[i], 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
|
||||||
|
merger.ChangeState(FairMQStandaloneMerger::SETOUTPUT);
|
||||||
|
merger.ChangeState(FairMQStandaloneMerger::SETINPUT);
|
||||||
|
merger.ChangeState(FairMQStandaloneMerger::RUN);
|
||||||
|
|
||||||
|
|
||||||
|
char ch;
|
||||||
|
std::cin.get(ch);
|
||||||
|
|
||||||
|
merger.ChangeState(FairMQStandaloneMerger::STOP);
|
||||||
|
merger.ChangeState(FairMQStandaloneMerger::END);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
122
fairmq/runOneToNSplitter.cxx
Normal file
122
fairmq/runOneToNSplitter.cxx
Normal file
|
@ -0,0 +1,122 @@
|
||||||
|
/*
|
||||||
|
* runSplitter.cxx
|
||||||
|
*
|
||||||
|
* Created on: Dec 6, 2012
|
||||||
|
* Author: dklein
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <iostream>
|
||||||
|
#include <csignal>
|
||||||
|
|
||||||
|
#include "FairMQLogger.h"
|
||||||
|
#include "FairMQBalancedStandaloneSplitter.h"
|
||||||
|
|
||||||
|
|
||||||
|
FairMQBalancedStandaloneSplitter splitter;
|
||||||
|
|
||||||
|
static void s_signal_handler (int signal)
|
||||||
|
{
|
||||||
|
std::cout << std::endl << "Caught signal " << signal << std::endl;
|
||||||
|
|
||||||
|
splitter.ChangeState(FairMQBalancedStandaloneSplitter::STOP);
|
||||||
|
splitter.ChangeState(FairMQBalancedStandaloneSplitter::END);
|
||||||
|
|
||||||
|
std::cout << "Shutdown complete. Bye!" << std::endl;
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void s_catch_signals (void)
|
||||||
|
{
|
||||||
|
struct sigaction action;
|
||||||
|
action.sa_handler = s_signal_handler;
|
||||||
|
action.sa_flags = 0;
|
||||||
|
sigemptyset(&action.sa_mask);
|
||||||
|
sigaction(SIGINT, &action, NULL);
|
||||||
|
sigaction(SIGTERM, &action, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char** argv)
|
||||||
|
{
|
||||||
|
if ( argc < 16 || (argc-8)%4!=0 ) { //argc{name,id,threads,nout,insock,inbuff,inmet,inadd, ... out}
|
||||||
|
std::cout << "Usage: splitter \tID numIoTreads numOutputs\n"
|
||||||
|
<< "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n"
|
||||||
|
<< "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n"
|
||||||
|
<< "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n"
|
||||||
|
<< "\t\t..." << argc << " arguments provided" << std::endl;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
s_catch_signals();
|
||||||
|
|
||||||
|
std::stringstream logmsg;
|
||||||
|
logmsg << "PID: " << getpid();
|
||||||
|
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
|
||||||
|
|
||||||
|
int i = 1;
|
||||||
|
|
||||||
|
splitter.SetProperty(FairMQBalancedStandaloneSplitter::Id, argv[i]);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int numIoThreads;
|
||||||
|
std::stringstream(argv[i]) >> numIoThreads;
|
||||||
|
splitter.SetProperty(FairMQBalancedStandaloneSplitter::NumIoThreads, numIoThreads);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
splitter.SetProperty(FairMQBalancedStandaloneSplitter::NumInputs, 1);
|
||||||
|
|
||||||
|
int numOutputs;
|
||||||
|
std::stringstream(argv[i]) >> numOutputs;
|
||||||
|
splitter.SetProperty(FairMQBalancedStandaloneSplitter::NumOutputs, numOutputs);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
|
||||||
|
splitter.ChangeState(FairMQBalancedStandaloneSplitter::INIT);
|
||||||
|
|
||||||
|
|
||||||
|
int inputSocketType = ZMQ_SUB;
|
||||||
|
if (strcmp(argv[i], "pull") == 0) {
|
||||||
|
inputSocketType = ZMQ_PULL;
|
||||||
|
}
|
||||||
|
splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputSocketType, inputSocketType, 0);
|
||||||
|
++i;
|
||||||
|
int inputRcvBufSize;
|
||||||
|
std::stringstream(argv[i]) >> inputRcvBufSize;
|
||||||
|
splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputRcvBufSize, inputRcvBufSize, 0);
|
||||||
|
++i;
|
||||||
|
splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputMethod, argv[i], 0);
|
||||||
|
++i;
|
||||||
|
splitter.SetProperty(FairMQBalancedStandaloneSplitter::InputAddress, argv[i], 0);
|
||||||
|
++i;
|
||||||
|
|
||||||
|
int outputSocketType;
|
||||||
|
int outputSndBufSize;
|
||||||
|
for (int iOutput = 0; iOutput < numOutputs; iOutput++) {
|
||||||
|
outputSocketType = ZMQ_PUB;
|
||||||
|
if (strcmp(argv[i], "push") == 0) {
|
||||||
|
outputSocketType = ZMQ_PUSH;
|
||||||
|
}
|
||||||
|
splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputSocketType, outputSocketType, iOutput);
|
||||||
|
++i;
|
||||||
|
std::stringstream(argv[i]) >> outputSndBufSize;
|
||||||
|
splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputSndBufSize, outputSndBufSize, iOutput);
|
||||||
|
++i;
|
||||||
|
splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputMethod, argv[i], iOutput);
|
||||||
|
++i;
|
||||||
|
splitter.SetProperty(FairMQBalancedStandaloneSplitter::OutputAddress, argv[i], iOutput);
|
||||||
|
++i;
|
||||||
|
}
|
||||||
|
|
||||||
|
splitter.ChangeState(FairMQBalancedStandaloneSplitter::SETOUTPUT);
|
||||||
|
splitter.ChangeState(FairMQBalancedStandaloneSplitter::SETINPUT);
|
||||||
|
splitter.ChangeState(FairMQBalancedStandaloneSplitter::RUN);
|
||||||
|
|
||||||
|
|
||||||
|
char ch;
|
||||||
|
std::cin.get(ch);
|
||||||
|
|
||||||
|
splitter.ChangeState(FairMQBalancedStandaloneSplitter::STOP);
|
||||||
|
splitter.ChangeState(FairMQBalancedStandaloneSplitter::END);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user