move zeromq and nanomsg implementation files to separate directories

This commit is contained in:
Alexey Rybalchenko 2014-01-28 13:20:53 +01:00
parent dde65d3aeb
commit c7b80f3ff9
35 changed files with 981 additions and 27 deletions

View File

@ -50,10 +50,10 @@ set(SRCS
if(NANOMSG_FOUND) if(NANOMSG_FOUND)
set(SRCS set(SRCS
${SRCS} ${SRCS}
"FairMQTransportFactoryNN.cxx" "nanomsg/FairMQTransportFactoryNN.cxx"
"FairMQMessageNN.cxx" "nanomsg/FairMQMessageNN.cxx"
"FairMQSocketNN.cxx" "nanomsg/FairMQSocketNN.cxx"
"FairMQPollerNN.cxx" "nanomsg/FairMQPollerNN.cxx"
) )
set(DEPENDENCIES set(DEPENDENCIES
${NANOMSG_LIBRARY_SHARED} ${NANOMSG_LIBRARY_SHARED}
@ -61,11 +61,11 @@ if(NANOMSG_FOUND)
else(NANOMSG_FOUND) else(NANOMSG_FOUND)
set(SRCS set(SRCS
${SRCS} ${SRCS}
"FairMQTransportFactoryZMQ.cxx" "zeromq/FairMQTransportFactoryZMQ.cxx"
"FairMQMessageZMQ.cxx" "zeromq/FairMQMessageZMQ.cxx"
"FairMQSocketZMQ.cxx" "zeromq/FairMQSocketZMQ.cxx"
"FairMQPollerZMQ.cxx" "zeromq/FairMQPollerZMQ.cxx"
"FairMQContextZMQ.cxx" "zeromq/FairMQContextZMQ.cxx"
) )
set(DEPENDENCIES set(DEPENDENCIES
${ZMQ_LIBRARY_SHARED} ${ZMQ_LIBRARY_SHARED}
@ -83,7 +83,30 @@ set(LIBRARY_NAME FairMQ)
GENERATE_LIBRARY() GENERATE_LIBRARY()
set(Exe_Names bsampler buffer splitter merger sink proxy n_one_merger one_n_splitter) set(Exe_Names bsampler buffer splitter merger sink proxy n_one_merger one_n_splitter)
set(Exe_Source runBenchmarkSampler.cxx runBuffer.cxx runSplitter.cxx runMerger.cxx runSink.cxx runProxy.cxx runNToOneMerger.cxx runOneToNSplitter.cxx)
if(NANOMSG_FOUND)
set(Exe_Source
nanomsg/runBenchmarkSampler.cxx
nanomsg/runBuffer.cxx
nanomsg/runSplitter.cxx
nanomsg/runMerger.cxx
nanomsg/runSink.cxx
nanomsg/runProxy.cxx
nanomsg/runNToOneMerger.cxx
nanomsg/runOneToNSplitter.cxx
)
else(NANOMSG_FOUND)
set(Exe_Source
zeromq/runBenchmarkSampler.cxx
zeromq/runBuffer.cxx
zeromq/runSplitter.cxx
zeromq/runMerger.cxx
zeromq/runSink.cxx
zeromq/runProxy.cxx
zeromq/runNToOneMerger.cxx
zeromq/runOneToNSplitter.cxx
)
endif(NANOMSG_FOUND)
list(LENGTH Exe_Names _length) list(LENGTH Exe_Names _length)
math(EXPR _length ${_length}-1) math(EXPR _length ${_length}-1)

View File

@ -0,0 +1,115 @@
/**
* runBenchmarkSampler.cxx
*
* @since 2013-04-23
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
#include <csignal>
#include "FairMQLogger.h"
#include "FairMQBenchmarkSampler.h"
#include "FairMQTransportFactoryNN.h"
using std::cout;
using std::cin;
using std::endl;
using std::stringstream;
FairMQBenchmarkSampler sampler;
static void s_signal_handler (int signal)
{
cout << endl << "Caught signal " << signal << endl;
sampler.ChangeState(FairMQBenchmarkSampler::STOP);
sampler.ChangeState(FairMQBenchmarkSampler::END);
cout << "Shutdown complete. Bye!" << endl;
exit(1);
}
static void s_catch_signals (void)
{
struct sigaction action;
action.sa_handler = s_signal_handler;
action.sa_flags = 0;
sigemptyset(&action.sa_mask);
sigaction(SIGINT, &action, NULL);
sigaction(SIGTERM, &action, NULL);
}
int main(int argc, char** argv)
{
if ( argc != 9 ) {
cout << "Usage: bsampler ID eventSize eventRate numIoTreads\n"
<< "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n"
<< endl;
return 1;
}
s_catch_signals();
stringstream logmsg;
logmsg << "PID: " << getpid();
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
sampler.SetTransport(transportFactory);
int i = 1;
sampler.SetProperty(FairMQBenchmarkSampler::Id, argv[i]);
++i;
int eventSize;
stringstream(argv[i]) >> eventSize;
sampler.SetProperty(FairMQBenchmarkSampler::EventSize, eventSize);
++i;
int eventRate;
stringstream(argv[i]) >> eventRate;
sampler.SetProperty(FairMQBenchmarkSampler::EventRate, eventRate);
++i;
int numIoThreads;
stringstream(argv[i]) >> numIoThreads;
sampler.SetProperty(FairMQBenchmarkSampler::NumIoThreads, numIoThreads);
++i;
sampler.SetProperty(FairMQBenchmarkSampler::NumInputs, 0);
sampler.SetProperty(FairMQBenchmarkSampler::NumOutputs, 1);
sampler.ChangeState(FairMQBenchmarkSampler::INIT);
sampler.SetProperty(FairMQBenchmarkSampler::OutputSocketType, argv[i], 0);
++i;
int outputSndBufSize;
stringstream(argv[i]) >> outputSndBufSize;
sampler.SetProperty(FairMQBenchmarkSampler::OutputSndBufSize, outputSndBufSize, 0);
++i;
sampler.SetProperty(FairMQBenchmarkSampler::OutputMethod, argv[i], 0);
++i;
sampler.SetProperty(FairMQBenchmarkSampler::OutputAddress, argv[i], 0);
++i;
sampler.ChangeState(FairMQBenchmarkSampler::SETOUTPUT);
sampler.ChangeState(FairMQBenchmarkSampler::SETINPUT);
sampler.ChangeState(FairMQBenchmarkSampler::RUN);
char ch;
cin.get(ch);
sampler.ChangeState(FairMQBenchmarkSampler::STOP);
sampler.ChangeState(FairMQBenchmarkSampler::END);
return 0;
}

View File

@ -0,0 +1,115 @@
/**
* runBuffer.cxx
*
* @since 2012-10-26
* @author: D. Klein, A. Rybalchenko
*/
#include <iostream>
#include <csignal>
#include "FairMQLogger.h"
#include "FairMQBuffer.h"
#include "FairMQTransportFactoryNN.h"
using std::cout;
using std::cin;
using std::endl;
using std::stringstream;
FairMQBuffer buffer;
static void s_signal_handler (int signal)
{
cout << endl << "Caught signal " << signal << endl;
buffer.ChangeState(FairMQBuffer::STOP);
buffer.ChangeState(FairMQBuffer::END);
cout << "Shutdown complete. Bye!" << endl;
exit(1);
}
static void s_catch_signals (void)
{
struct sigaction action;
action.sa_handler = s_signal_handler;
action.sa_flags = 0;
sigemptyset(&action.sa_mask);
sigaction(SIGINT, &action, NULL);
sigaction(SIGTERM, &action, NULL);
}
int main(int argc, char** argv)
{
if ( argc != 11 ) {
cout << "Usage: buffer \tID numIoTreads\n"
<< "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n"
<< "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl;
return 1;
}
s_catch_signals();
stringstream logmsg;
logmsg << "PID: " << getpid();
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
buffer.SetTransport(transportFactory);
int i = 1;
buffer.SetProperty(FairMQBuffer::Id, argv[i]);
++i;
int numIoThreads;
stringstream(argv[i]) >> numIoThreads;
buffer.SetProperty(FairMQBuffer::NumIoThreads, numIoThreads);
++i;
buffer.SetProperty(FairMQBuffer::NumInputs, 1);
buffer.SetProperty(FairMQBuffer::NumOutputs, 1);
buffer.ChangeState(FairMQBuffer::INIT);
buffer.SetProperty(FairMQBuffer::InputSocketType, argv[i], 0);
++i;
int inputRcvBufSize;
stringstream(argv[i]) >> inputRcvBufSize;
buffer.SetProperty(FairMQBuffer::InputRcvBufSize, inputRcvBufSize, 0);
++i;
buffer.SetProperty(FairMQBuffer::InputMethod, argv[i], 0);
++i;
buffer.SetProperty(FairMQBuffer::InputAddress, argv[i], 0);
++i;
buffer.SetProperty(FairMQBuffer::OutputSocketType, argv[i], 0);
++i;
int outputSndBufSize;
stringstream(argv[i]) >> outputSndBufSize;
buffer.SetProperty(FairMQBuffer::OutputSndBufSize, outputSndBufSize, 0);
++i;
buffer.SetProperty(FairMQBuffer::OutputMethod, argv[i], 0);
++i;
buffer.SetProperty(FairMQBuffer::OutputAddress, argv[i], 0);
++i;
buffer.ChangeState(FairMQBuffer::SETOUTPUT);
buffer.ChangeState(FairMQBuffer::SETINPUT);
buffer.ChangeState(FairMQBuffer::RUN);
char ch;
cin.get(ch);
buffer.ChangeState(FairMQBuffer::STOP);
buffer.ChangeState(FairMQBuffer::END);
return 0;
}

View File

@ -0,0 +1,126 @@
/**
* runMerger.cxx
*
* @since 2012-12-06
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
#include <csignal>
#include "FairMQLogger.h"
#include "FairMQMerger.h"
#include "FairMQTransportFactoryNN.h"
using std::cout;
using std::cin;
using std::endl;
using std::stringstream;
FairMQMerger merger;
static void s_signal_handler (int signal)
{
cout << endl << "Caught signal " << signal << endl;
merger.ChangeState(FairMQMerger::STOP);
merger.ChangeState(FairMQMerger::END);
cout << "Shutdown complete. Bye!" << endl;
exit(1);
}
static void s_catch_signals (void)
{
struct sigaction action;
action.sa_handler = s_signal_handler;
action.sa_flags = 0;
sigemptyset(&action.sa_mask);
sigaction(SIGINT, &action, NULL);
sigaction(SIGTERM, &action, NULL);
}
int main(int argc, char** argv)
{
if ( argc != 15 ) {
cout << "Usage: merger \tID numIoTreads\n"
<< "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n"
<< "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n"
<< "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl;
return 1;
}
s_catch_signals();
stringstream logmsg;
logmsg << "PID: " << getpid();
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
merger.SetTransport(transportFactory);
int i = 1;
merger.SetProperty(FairMQMerger::Id, argv[i]);
++i;
int numIoThreads;
stringstream(argv[i]) >> numIoThreads;
merger.SetProperty(FairMQMerger::NumIoThreads, numIoThreads);
++i;
merger.SetProperty(FairMQMerger::NumInputs, 2);
merger.SetProperty(FairMQMerger::NumOutputs, 1);
merger.ChangeState(FairMQMerger::INIT);
merger.SetProperty(FairMQMerger::InputSocketType, argv[i], 0);
++i;
int inputRcvBufSize;
stringstream(argv[i]) >> inputRcvBufSize;
merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, 0);
++i;
merger.SetProperty(FairMQMerger::InputMethod, argv[i], 0);
++i;
merger.SetProperty(FairMQMerger::InputAddress, argv[i], 0);
++i;
merger.SetProperty(FairMQMerger::InputSocketType, argv[i], 1);
++i;
stringstream(argv[i]) >> inputRcvBufSize;
merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, 1);
++i;
merger.SetProperty(FairMQMerger::InputMethod, argv[i], 1);
++i;
merger.SetProperty(FairMQMerger::InputAddress, argv[i], 1);
++i;
merger.SetProperty(FairMQMerger::OutputSocketType, argv[i], 0);
++i;
int outputSndBufSize;
stringstream(argv[i]) >> outputSndBufSize;
merger.SetProperty(FairMQMerger::OutputSndBufSize, outputSndBufSize, 0);
++i;
merger.SetProperty(FairMQMerger::OutputMethod, argv[i], 0);
++i;
merger.SetProperty(FairMQMerger::OutputAddress, argv[i], 0);
++i;
merger.ChangeState(FairMQMerger::SETOUTPUT);
merger.ChangeState(FairMQMerger::SETINPUT);
merger.ChangeState(FairMQMerger::RUN);
char ch;
cin.get(ch);
merger.ChangeState(FairMQMerger::STOP);
merger.ChangeState(FairMQMerger::END);
return 0;
}

View File

@ -0,0 +1,124 @@
/**
* runNToOneMerger.cxx
*
* @since 2012-12-06
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
#include <csignal>
#include "FairMQLogger.h"
#include "FairMQMerger.h"
#include "FairMQTransportFactoryNN.h"
using std::cout;
using std::cin;
using std::endl;
using std::stringstream;
FairMQMerger merger;
static void s_signal_handler (int signal)
{
cout << endl << "Caught signal " << signal << endl;
merger.ChangeState(FairMQMerger::STOP);
merger.ChangeState(FairMQMerger::END);
cout << "Shutdown complete. Bye!" << endl;
exit(1);
}
static void s_catch_signals (void)
{
struct sigaction action;
action.sa_handler = s_signal_handler;
action.sa_flags = 0;
sigemptyset(&action.sa_mask);
sigaction(SIGINT, &action, NULL);
sigaction(SIGTERM, &action, NULL);
}
int main(int argc, char** argv)
{
if ( argc < 16 || (argc-8)%4!=0 ) {
cout << "Usage: merger \tID numIoTreads numInputs\n"
<< "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n"
<< "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n"
<< "\t\t...\n"
<< "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n"
<< argc << endl;
return 1;
}
s_catch_signals();
stringstream logmsg;
logmsg << "PID: " << getpid();
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
merger.SetTransport(transportFactory);
int i = 1;
merger.SetProperty(FairMQMerger::Id, argv[i]);
++i;
int numIoThreads;
stringstream(argv[i]) >> numIoThreads;
merger.SetProperty(FairMQMerger::NumIoThreads, numIoThreads);
++i;
int numInputs;
stringstream(argv[i]) >> numInputs;
merger.SetProperty(FairMQMerger::NumInputs, numInputs);
++i;
merger.SetProperty(FairMQMerger::NumOutputs, 1);
merger.ChangeState(FairMQMerger::INIT);
for (int iInput = 0; iInput < numInputs; iInput++ ) {
merger.SetProperty(FairMQMerger::InputSocketType, argv[i], iInput);
++i;
int inputRcvBufSize;
stringstream(argv[i]) >> inputRcvBufSize;
merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, iInput);
++i;
merger.SetProperty(FairMQMerger::InputMethod, argv[i], iInput);
++i;
merger.SetProperty(FairMQMerger::InputAddress, argv[i], iInput);
++i;
}
merger.SetProperty(FairMQMerger::OutputSocketType, argv[i], 0);
++i;
int outputSndBufSize;
stringstream(argv[i]) >> outputSndBufSize;
merger.SetProperty(FairMQMerger::OutputSndBufSize, outputSndBufSize, 0);
++i;
merger.SetProperty(FairMQMerger::OutputMethod, argv[i], 0);
++i;
merger.SetProperty(FairMQMerger::OutputAddress, argv[i], 0);
++i;
merger.ChangeState(FairMQMerger::SETOUTPUT);
merger.ChangeState(FairMQMerger::SETINPUT);
merger.ChangeState(FairMQMerger::RUN);
char ch;
cin.get(ch);
merger.ChangeState(FairMQMerger::STOP);
merger.ChangeState(FairMQMerger::END);
return 0;
}

View File

@ -0,0 +1,122 @@
/**
* runOneToNSplitter.cxx
*
* @since 2012-12-06
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
#include <csignal>
#include "FairMQLogger.h"
#include "FairMQSplitter.h"
#include "FairMQTransportFactoryNN.h"
using std::cout;
using std::cin;
using std::endl;
using std::stringstream;
FairMQSplitter splitter;
static void s_signal_handler (int signal)
{
cout << endl << "Caught signal " << signal << endl;
splitter.ChangeState(FairMQSplitter::STOP);
splitter.ChangeState(FairMQSplitter::END);
cout << "Shutdown complete. Bye!" << endl;
exit(1);
}
static void s_catch_signals (void)
{
struct sigaction action;
action.sa_handler = s_signal_handler;
action.sa_flags = 0;
sigemptyset(&action.sa_mask);
sigaction(SIGINT, &action, NULL);
sigaction(SIGTERM, &action, NULL);
}
int main(int argc, char** argv)
{
if ( argc < 16 || (argc - 8) % 4 != 0 ) { // argc{ name, id, threads, nout, insock, inbuff, inmet, inadd, ... out}
cout << "Usage: splitter \tID numIoTreads numOutputs\n"
<< "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n"
<< "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n"
<< "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n"
<< "\t\t..." << argc << " arguments provided" << endl;
return 1;
}
s_catch_signals();
stringstream logmsg;
logmsg << "PID: " << getpid();
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
splitter.SetTransport(transportFactory);
int i = 1;
splitter.SetProperty(FairMQSplitter::Id, argv[i]);
++i;
int numIoThreads;
stringstream(argv[i]) >> numIoThreads;
splitter.SetProperty(FairMQSplitter::NumIoThreads, numIoThreads);
++i;
splitter.SetProperty(FairMQSplitter::NumInputs, 1);
int numOutputs;
stringstream(argv[i]) >> numOutputs;
splitter.SetProperty(FairMQSplitter::NumOutputs, numOutputs);
++i;
splitter.ChangeState(FairMQSplitter::INIT);
splitter.SetProperty(FairMQSplitter::InputSocketType, argv[i], 0);
++i;
int inputRcvBufSize;
stringstream(argv[i]) >> inputRcvBufSize;
splitter.SetProperty(FairMQSplitter::InputRcvBufSize, inputRcvBufSize, 0);
++i;
splitter.SetProperty(FairMQSplitter::InputMethod, argv[i], 0);
++i;
splitter.SetProperty(FairMQSplitter::InputAddress, argv[i], 0);
++i;
int outputSndBufSize;
for (int iOutput = 0; iOutput < numOutputs; iOutput++) {
splitter.SetProperty(FairMQSplitter::OutputSocketType, argv[i], iOutput);
++i;
stringstream(argv[i]) >> outputSndBufSize;
splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, iOutput);
++i;
splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], iOutput);
++i;
splitter.SetProperty(FairMQSplitter::OutputAddress, argv[i], iOutput);
++i;
}
splitter.ChangeState(FairMQSplitter::SETOUTPUT);
splitter.ChangeState(FairMQSplitter::SETINPUT);
splitter.ChangeState(FairMQSplitter::RUN);
char ch;
cin.get(ch);
splitter.ChangeState(FairMQSplitter::STOP);
splitter.ChangeState(FairMQSplitter::END);
return 0;
}

115
fairmq/nanomsg/runProxy.cxx Normal file
View File

@ -0,0 +1,115 @@
/**
* runProxy.cxx
*
* @since 2013-10-07
* @author A. Rybalchenko
*/
#include <iostream>
#include <csignal>
#include "FairMQLogger.h"
#include "FairMQProxy.h"
#include "FairMQTransportFactoryNN.h"
using std::cout;
using std::cin;
using std::endl;
using std::stringstream;
FairMQProxy proxy;
static void s_signal_handler (int signal)
{
cout << endl << "Caught signal " << signal << endl;
proxy.ChangeState(FairMQProxy::STOP);
proxy.ChangeState(FairMQProxy::END);
cout << "Shutdown complete. Bye!" << endl;
exit(1);
}
static void s_catch_signals (void)
{
struct sigaction action;
action.sa_handler = s_signal_handler;
action.sa_flags = 0;
sigemptyset(&action.sa_mask);
sigaction(SIGINT, &action, NULL);
sigaction(SIGTERM, &action, NULL);
}
int main(int argc, char** argv)
{
if ( argc != 11 ) {
cout << "Usage: proxy \tID numIoTreads\n"
<< "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n"
<< "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl;
return 1;
}
s_catch_signals();
stringstream logmsg;
logmsg << "PID: " << getpid();
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
proxy.SetTransport(transportFactory);
int i = 1;
proxy.SetProperty(FairMQProxy::Id, argv[i]);
++i;
int numIoThreads;
stringstream(argv[i]) >> numIoThreads;
proxy.SetProperty(FairMQProxy::NumIoThreads, numIoThreads);
++i;
proxy.SetProperty(FairMQProxy::NumInputs, 1);
proxy.SetProperty(FairMQProxy::NumOutputs, 1);
proxy.ChangeState(FairMQProxy::INIT);
proxy.SetProperty(FairMQProxy::InputSocketType, argv[i], 0);
++i;
int inputRcvBufSize;
stringstream(argv[i]) >> inputRcvBufSize;
proxy.SetProperty(FairMQProxy::InputRcvBufSize, inputRcvBufSize, 0);
++i;
proxy.SetProperty(FairMQProxy::InputMethod, argv[i], 0);
++i;
proxy.SetProperty(FairMQProxy::InputAddress, argv[i], 0);
++i;
proxy.SetProperty(FairMQProxy::OutputSocketType, argv[i], 0);
++i;
int outputSndBufSize;
stringstream(argv[i]) >> outputSndBufSize;
proxy.SetProperty(FairMQProxy::OutputSndBufSize, outputSndBufSize, 0);
++i;
proxy.SetProperty(FairMQProxy::OutputMethod, argv[i], 0);
++i;
proxy.SetProperty(FairMQProxy::OutputAddress, argv[i], 0);
++i;
proxy.ChangeState(FairMQProxy::SETOUTPUT);
proxy.ChangeState(FairMQProxy::SETINPUT);
proxy.ChangeState(FairMQProxy::RUN);
char ch;
cin.get(ch);
proxy.ChangeState(FairMQProxy::STOP);
proxy.ChangeState(FairMQProxy::END);
return 0;
}

104
fairmq/nanomsg/runSink.cxx Normal file
View File

@ -0,0 +1,104 @@
/**
* runSink.cxx
*
* @since 2013-01-21
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
#include <csignal>
#include "FairMQLogger.h"
#include "FairMQSink.h"
#include "FairMQTransportFactoryNN.h"
using std::cout;
using std::cin;
using std::endl;
using std::stringstream;
FairMQSink sink;
static void s_signal_handler (int signal)
{
cout << endl << "Caught signal " << signal << endl;
sink.ChangeState(FairMQSink::STOP);
sink.ChangeState(FairMQSink::END);
cout << "Shutdown complete. Bye!" << endl;
exit(1);
}
static void s_catch_signals (void)
{
struct sigaction action;
action.sa_handler = s_signal_handler;
action.sa_flags = 0;
sigemptyset(&action.sa_mask);
sigaction(SIGINT, &action, NULL);
sigaction(SIGTERM, &action, NULL);
}
int main(int argc, char** argv)
{
if ( argc != 7 ) {
cout << "Usage: sink \tID numIoTreads\n"
<< "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n"
<< endl;
return 1;
}
s_catch_signals();
stringstream logmsg;
logmsg << "PID: " << getpid();
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
sink.SetTransport(transportFactory);
int i = 1;
sink.SetProperty(FairMQSink::Id, argv[i]);
++i;
int numIoThreads;
stringstream(argv[i]) >> numIoThreads;
sink.SetProperty(FairMQSink::NumIoThreads, numIoThreads);
++i;
sink.SetProperty(FairMQSink::NumInputs, 1);
sink.SetProperty(FairMQSink::NumOutputs, 0);
sink.ChangeState(FairMQSink::INIT);
sink.SetProperty(FairMQSink::InputSocketType, argv[i], 0);
++i;
int inputRcvBufSize;
stringstream(argv[i]) >> inputRcvBufSize;
sink.SetProperty(FairMQSink::InputRcvBufSize, inputRcvBufSize, 0);
++i;
sink.SetProperty(FairMQSink::InputMethod, argv[i], 0);
++i;
sink.SetProperty(FairMQSink::InputAddress, argv[i], 0);
++i;
sink.ChangeState(FairMQSink::SETOUTPUT);
sink.ChangeState(FairMQSink::SETINPUT);
sink.ChangeState(FairMQSink::RUN);
char ch;
cin.get(ch);
sink.ChangeState(FairMQSink::STOP);
sink.ChangeState(FairMQSink::END);
return 0;
}

View File

@ -0,0 +1,126 @@
/**
* runSplitter.cxx
*
* @since 2012-12-06
* @author D. Klein, A. Rybalchenko
*/
#include <iostream>
#include <csignal>
#include "FairMQLogger.h"
#include "FairMQSplitter.h"
#include "FairMQTransportFactoryNN.h"
using std::cout;
using std::cin;
using std::endl;
using std::stringstream;
FairMQSplitter splitter;
static void s_signal_handler (int signal)
{
cout << endl << "Caught signal " << signal << endl;
splitter.ChangeState(FairMQSplitter::STOP);
splitter.ChangeState(FairMQSplitter::END);
cout << "Shutdown complete. Bye!" << endl;
exit(1);
}
static void s_catch_signals (void)
{
struct sigaction action;
action.sa_handler = s_signal_handler;
action.sa_flags = 0;
sigemptyset(&action.sa_mask);
sigaction(SIGINT, &action, NULL);
sigaction(SIGTERM, &action, NULL);
}
int main(int argc, char** argv)
{
if ( argc != 15 ) {
cout << "Usage: splitter \tID numIoTreads\n"
<< "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n"
<< "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n"
<< "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl;
return 1;
}
s_catch_signals();
stringstream logmsg;
logmsg << "PID: " << getpid();
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
splitter.SetTransport(transportFactory);
int i = 1;
splitter.SetProperty(FairMQSplitter::Id, argv[i]);
++i;
int numIoThreads;
stringstream(argv[i]) >> numIoThreads;
splitter.SetProperty(FairMQSplitter::NumIoThreads, numIoThreads);
++i;
splitter.SetProperty(FairMQSplitter::NumInputs, 1);
splitter.SetProperty(FairMQSplitter::NumOutputs, 2);
splitter.ChangeState(FairMQSplitter::INIT);
splitter.SetProperty(FairMQSplitter::InputSocketType, argv[i], 0);
++i;
int inputRcvBufSize;
stringstream(argv[i]) >> inputRcvBufSize;
splitter.SetProperty(FairMQSplitter::InputRcvBufSize, inputRcvBufSize, 0);
++i;
splitter.SetProperty(FairMQSplitter::InputMethod, argv[i], 0);
++i;
splitter.SetProperty(FairMQSplitter::InputAddress, argv[i], 0);
++i;
splitter.SetProperty(FairMQSplitter::OutputSocketType, argv[i], 0);
++i;
int outputSndBufSize;
stringstream(argv[i]) >> outputSndBufSize;
splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, 0);
++i;
splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], 0);
++i;
splitter.SetProperty(FairMQSplitter::OutputAddress, argv[i], 0);
++i;
splitter.SetProperty(FairMQSplitter::OutputSocketType, argv[i], 1);
++i;
stringstream(argv[i]) >> outputSndBufSize;
splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, 1);
++i;
splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], 1);
++i;
splitter.SetProperty(FairMQSplitter::OutputAddress, argv[i], 1);
++i;
splitter.ChangeState(FairMQSplitter::SETOUTPUT);
splitter.ChangeState(FairMQSplitter::SETINPUT);
splitter.ChangeState(FairMQSplitter::RUN);
char ch;
cin.get(ch);
splitter.ChangeState(FairMQSplitter::STOP);
splitter.ChangeState(FairMQSplitter::END);
return 0;
}

View File

@ -1,7 +1,7 @@
/** /**
* runBenchmarkSampler.cxx * runBenchmarkSampler.cxx
* *
* @since Apr 23, 2013-04-23 * @since 2013-04-23
* @author D. Klein, A. Rybalchenko * @author D. Klein, A. Rybalchenko
*/ */
@ -11,7 +11,6 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQBenchmarkSampler.h" #include "FairMQBenchmarkSampler.h"
#include "FairMQTransportFactoryZMQ.h" #include "FairMQTransportFactoryZMQ.h"
// #include "FairMQTransportFactoryNN.h"
using std::cout; using std::cout;
using std::cin; using std::cin;
@ -58,7 +57,6 @@ int main(int argc, char** argv)
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
// FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
sampler.SetTransport(transportFactory); sampler.SetTransport(transportFactory);
int i = 1; int i = 1;

View File

@ -11,7 +11,6 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQBuffer.h" #include "FairMQBuffer.h"
#include "FairMQTransportFactoryZMQ.h" #include "FairMQTransportFactoryZMQ.h"
// #include "FairMQTransportFactoryNN.h"
using std::cout; using std::cout;
using std::cin; using std::cin;
@ -58,7 +57,6 @@ int main(int argc, char** argv)
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
// FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
buffer.SetTransport(transportFactory); buffer.SetTransport(transportFactory);
int i = 1; int i = 1;

View File

@ -11,7 +11,6 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQMerger.h" #include "FairMQMerger.h"
#include "FairMQTransportFactoryZMQ.h" #include "FairMQTransportFactoryZMQ.h"
// #include "FairMQTransportFactoryNN.h"
using std::cout; using std::cout;
using std::cin; using std::cin;
@ -59,7 +58,6 @@ int main(int argc, char** argv)
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
// FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
merger.SetTransport(transportFactory); merger.SetTransport(transportFactory);
int i = 1; int i = 1;

View File

@ -11,7 +11,6 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQMerger.h" #include "FairMQMerger.h"
#include "FairMQTransportFactoryZMQ.h" #include "FairMQTransportFactoryZMQ.h"
// #include "FairMQTransportFactoryNN.h"
using std::cout; using std::cout;
using std::cin; using std::cin;
@ -61,7 +60,6 @@ int main(int argc, char** argv)
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
// FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
merger.SetTransport(transportFactory); merger.SetTransport(transportFactory);
int i = 1; int i = 1;

View File

@ -11,7 +11,6 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQSplitter.h" #include "FairMQSplitter.h"
#include "FairMQTransportFactoryZMQ.h" #include "FairMQTransportFactoryZMQ.h"
// #include "FairMQTransportFactoryNN.h"
using std::cout; using std::cout;
using std::cin; using std::cin;
@ -60,7 +59,6 @@ int main(int argc, char** argv)
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
// FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
splitter.SetTransport(transportFactory); splitter.SetTransport(transportFactory);
int i = 1; int i = 1;

View File

@ -11,7 +11,6 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQProxy.h" #include "FairMQProxy.h"
#include "FairMQTransportFactoryZMQ.h" #include "FairMQTransportFactoryZMQ.h"
// #include "FairMQTransportFactoryNN.h"
using std::cout; using std::cout;
using std::cin; using std::cin;
@ -58,7 +57,6 @@ int main(int argc, char** argv)
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
// FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
proxy.SetTransport(transportFactory); proxy.SetTransport(transportFactory);
int i = 1; int i = 1;

View File

@ -11,7 +11,6 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQSink.h" #include "FairMQSink.h"
#include "FairMQTransportFactoryZMQ.h" #include "FairMQTransportFactoryZMQ.h"
// #include "FairMQTransportFactoryNN.h"
using std::cout; using std::cout;
using std::cin; using std::cin;
@ -58,7 +57,6 @@ int main(int argc, char** argv)
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
// FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
sink.SetTransport(transportFactory); sink.SetTransport(transportFactory);
int i = 1; int i = 1;

View File

@ -11,7 +11,6 @@
#include "FairMQLogger.h" #include "FairMQLogger.h"
#include "FairMQSplitter.h" #include "FairMQSplitter.h"
#include "FairMQTransportFactoryZMQ.h" #include "FairMQTransportFactoryZMQ.h"
// #include "FairMQTransportFactoryNN.h"
using std::cout; using std::cout;
using std::cin; using std::cin;
@ -59,7 +58,6 @@ int main(int argc, char** argv)
FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str());
FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ();
// FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN();
splitter.SetTransport(transportFactory); splitter.SetTransport(transportFactory);
int i = 1; int i = 1;