diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index bd1c4322..a9b2e084 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -50,10 +50,10 @@ set(SRCS if(NANOMSG_FOUND) set(SRCS ${SRCS} - "FairMQTransportFactoryNN.cxx" - "FairMQMessageNN.cxx" - "FairMQSocketNN.cxx" - "FairMQPollerNN.cxx" + "nanomsg/FairMQTransportFactoryNN.cxx" + "nanomsg/FairMQMessageNN.cxx" + "nanomsg/FairMQSocketNN.cxx" + "nanomsg/FairMQPollerNN.cxx" ) set(DEPENDENCIES ${NANOMSG_LIBRARY_SHARED} @@ -61,11 +61,11 @@ if(NANOMSG_FOUND) else(NANOMSG_FOUND) set(SRCS ${SRCS} - "FairMQTransportFactoryZMQ.cxx" - "FairMQMessageZMQ.cxx" - "FairMQSocketZMQ.cxx" - "FairMQPollerZMQ.cxx" - "FairMQContextZMQ.cxx" + "zeromq/FairMQTransportFactoryZMQ.cxx" + "zeromq/FairMQMessageZMQ.cxx" + "zeromq/FairMQSocketZMQ.cxx" + "zeromq/FairMQPollerZMQ.cxx" + "zeromq/FairMQContextZMQ.cxx" ) set(DEPENDENCIES ${ZMQ_LIBRARY_SHARED} @@ -83,7 +83,30 @@ set(LIBRARY_NAME FairMQ) GENERATE_LIBRARY() set(Exe_Names bsampler buffer splitter merger sink proxy n_one_merger one_n_splitter) -set(Exe_Source runBenchmarkSampler.cxx runBuffer.cxx runSplitter.cxx runMerger.cxx runSink.cxx runProxy.cxx runNToOneMerger.cxx runOneToNSplitter.cxx) + +if(NANOMSG_FOUND) + set(Exe_Source + nanomsg/runBenchmarkSampler.cxx + nanomsg/runBuffer.cxx + nanomsg/runSplitter.cxx + nanomsg/runMerger.cxx + nanomsg/runSink.cxx + nanomsg/runProxy.cxx + nanomsg/runNToOneMerger.cxx + nanomsg/runOneToNSplitter.cxx + ) +else(NANOMSG_FOUND) + set(Exe_Source + zeromq/runBenchmarkSampler.cxx + zeromq/runBuffer.cxx + zeromq/runSplitter.cxx + zeromq/runMerger.cxx + zeromq/runSink.cxx + zeromq/runProxy.cxx + zeromq/runNToOneMerger.cxx + zeromq/runOneToNSplitter.cxx + ) +endif(NANOMSG_FOUND) list(LENGTH Exe_Names _length) math(EXPR _length ${_length}-1) diff --git a/fairmq/FairMQMessageNN.cxx b/fairmq/nanomsg/FairMQMessageNN.cxx similarity index 100% rename from fairmq/FairMQMessageNN.cxx rename to fairmq/nanomsg/FairMQMessageNN.cxx diff --git a/fairmq/FairMQMessageNN.h b/fairmq/nanomsg/FairMQMessageNN.h similarity index 100% rename from fairmq/FairMQMessageNN.h rename to fairmq/nanomsg/FairMQMessageNN.h diff --git a/fairmq/FairMQPollerNN.cxx b/fairmq/nanomsg/FairMQPollerNN.cxx similarity index 100% rename from fairmq/FairMQPollerNN.cxx rename to fairmq/nanomsg/FairMQPollerNN.cxx diff --git a/fairmq/FairMQPollerNN.h b/fairmq/nanomsg/FairMQPollerNN.h similarity index 100% rename from fairmq/FairMQPollerNN.h rename to fairmq/nanomsg/FairMQPollerNN.h diff --git a/fairmq/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx similarity index 100% rename from fairmq/FairMQSocketNN.cxx rename to fairmq/nanomsg/FairMQSocketNN.cxx diff --git a/fairmq/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h similarity index 100% rename from fairmq/FairMQSocketNN.h rename to fairmq/nanomsg/FairMQSocketNN.h diff --git a/fairmq/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx similarity index 100% rename from fairmq/FairMQTransportFactoryNN.cxx rename to fairmq/nanomsg/FairMQTransportFactoryNN.cxx diff --git a/fairmq/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h similarity index 100% rename from fairmq/FairMQTransportFactoryNN.h rename to fairmq/nanomsg/FairMQTransportFactoryNN.h diff --git a/fairmq/nanomsg/runBenchmarkSampler.cxx b/fairmq/nanomsg/runBenchmarkSampler.cxx new file mode 100644 index 00000000..68615e2d --- /dev/null +++ b/fairmq/nanomsg/runBenchmarkSampler.cxx @@ -0,0 +1,115 @@ +/** + * runBenchmarkSampler.cxx + * + * @since 2013-04-23 + * @author D. Klein, A. Rybalchenko + */ + +#include +#include + +#include "FairMQLogger.h" +#include "FairMQBenchmarkSampler.h" +#include "FairMQTransportFactoryNN.h" + +using std::cout; +using std::cin; +using std::endl; +using std::stringstream; + + +FairMQBenchmarkSampler sampler; + +static void s_signal_handler (int signal) +{ + cout << endl << "Caught signal " << signal << endl; + + sampler.ChangeState(FairMQBenchmarkSampler::STOP); + sampler.ChangeState(FairMQBenchmarkSampler::END); + + cout << "Shutdown complete. Bye!" << endl; + exit(1); +} + +static void s_catch_signals (void) +{ + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); +} + +int main(int argc, char** argv) +{ + if ( argc != 9 ) { + cout << "Usage: bsampler ID eventSize eventRate numIoTreads\n" + << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" + << endl; + return 1; + } + + s_catch_signals(); + + stringstream logmsg; + logmsg << "PID: " << getpid(); + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); + sampler.SetTransport(transportFactory); + + int i = 1; + + sampler.SetProperty(FairMQBenchmarkSampler::Id, argv[i]); + ++i; + + int eventSize; + stringstream(argv[i]) >> eventSize; + sampler.SetProperty(FairMQBenchmarkSampler::EventSize, eventSize); + ++i; + + int eventRate; + stringstream(argv[i]) >> eventRate; + sampler.SetProperty(FairMQBenchmarkSampler::EventRate, eventRate); + ++i; + + int numIoThreads; + stringstream(argv[i]) >> numIoThreads; + sampler.SetProperty(FairMQBenchmarkSampler::NumIoThreads, numIoThreads); + ++i; + + sampler.SetProperty(FairMQBenchmarkSampler::NumInputs, 0); + sampler.SetProperty(FairMQBenchmarkSampler::NumOutputs, 1); + + + sampler.ChangeState(FairMQBenchmarkSampler::INIT); + + + sampler.SetProperty(FairMQBenchmarkSampler::OutputSocketType, argv[i], 0); + ++i; + int outputSndBufSize; + stringstream(argv[i]) >> outputSndBufSize; + sampler.SetProperty(FairMQBenchmarkSampler::OutputSndBufSize, outputSndBufSize, 0); + ++i; + sampler.SetProperty(FairMQBenchmarkSampler::OutputMethod, argv[i], 0); + ++i; + sampler.SetProperty(FairMQBenchmarkSampler::OutputAddress, argv[i], 0); + ++i; + + + sampler.ChangeState(FairMQBenchmarkSampler::SETOUTPUT); + sampler.ChangeState(FairMQBenchmarkSampler::SETINPUT); + sampler.ChangeState(FairMQBenchmarkSampler::RUN); + + + + char ch; + cin.get(ch); + + sampler.ChangeState(FairMQBenchmarkSampler::STOP); + sampler.ChangeState(FairMQBenchmarkSampler::END); + + return 0; +} + diff --git a/fairmq/nanomsg/runBuffer.cxx b/fairmq/nanomsg/runBuffer.cxx new file mode 100644 index 00000000..dc678256 --- /dev/null +++ b/fairmq/nanomsg/runBuffer.cxx @@ -0,0 +1,115 @@ +/** + * runBuffer.cxx + * + * @since 2012-10-26 + * @author: D. Klein, A. Rybalchenko + */ + +#include +#include + +#include "FairMQLogger.h" +#include "FairMQBuffer.h" +#include "FairMQTransportFactoryNN.h" + +using std::cout; +using std::cin; +using std::endl; +using std::stringstream; + + +FairMQBuffer buffer; + +static void s_signal_handler (int signal) +{ + cout << endl << "Caught signal " << signal << endl; + + buffer.ChangeState(FairMQBuffer::STOP); + buffer.ChangeState(FairMQBuffer::END); + + cout << "Shutdown complete. Bye!" << endl; + exit(1); +} + +static void s_catch_signals (void) +{ + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); +} + +int main(int argc, char** argv) +{ + if ( argc != 11 ) { + cout << "Usage: buffer \tID numIoTreads\n" + << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" + << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; + return 1; + } + + s_catch_signals(); + + stringstream logmsg; + logmsg << "PID: " << getpid(); + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); + buffer.SetTransport(transportFactory); + + int i = 1; + + buffer.SetProperty(FairMQBuffer::Id, argv[i]); + ++i; + + int numIoThreads; + stringstream(argv[i]) >> numIoThreads; + buffer.SetProperty(FairMQBuffer::NumIoThreads, numIoThreads); + ++i; + buffer.SetProperty(FairMQBuffer::NumInputs, 1); + buffer.SetProperty(FairMQBuffer::NumOutputs, 1); + + + buffer.ChangeState(FairMQBuffer::INIT); + + + buffer.SetProperty(FairMQBuffer::InputSocketType, argv[i], 0); + ++i; + int inputRcvBufSize; + stringstream(argv[i]) >> inputRcvBufSize; + buffer.SetProperty(FairMQBuffer::InputRcvBufSize, inputRcvBufSize, 0); + ++i; + buffer.SetProperty(FairMQBuffer::InputMethod, argv[i], 0); + ++i; + buffer.SetProperty(FairMQBuffer::InputAddress, argv[i], 0); + ++i; + + buffer.SetProperty(FairMQBuffer::OutputSocketType, argv[i], 0); + ++i; + int outputSndBufSize; + stringstream(argv[i]) >> outputSndBufSize; + buffer.SetProperty(FairMQBuffer::OutputSndBufSize, outputSndBufSize, 0); + ++i; + buffer.SetProperty(FairMQBuffer::OutputMethod, argv[i], 0); + ++i; + buffer.SetProperty(FairMQBuffer::OutputAddress, argv[i], 0); + ++i; + + + buffer.ChangeState(FairMQBuffer::SETOUTPUT); + buffer.ChangeState(FairMQBuffer::SETINPUT); + buffer.ChangeState(FairMQBuffer::RUN); + + + + char ch; + cin.get(ch); + + buffer.ChangeState(FairMQBuffer::STOP); + buffer.ChangeState(FairMQBuffer::END); + + return 0; +} + diff --git a/fairmq/nanomsg/runMerger.cxx b/fairmq/nanomsg/runMerger.cxx new file mode 100644 index 00000000..866f6240 --- /dev/null +++ b/fairmq/nanomsg/runMerger.cxx @@ -0,0 +1,126 @@ +/** + * runMerger.cxx + * + * @since 2012-12-06 + * @author D. Klein, A. Rybalchenko + */ + +#include +#include + +#include "FairMQLogger.h" +#include "FairMQMerger.h" +#include "FairMQTransportFactoryNN.h" + +using std::cout; +using std::cin; +using std::endl; +using std::stringstream; + + +FairMQMerger merger; + +static void s_signal_handler (int signal) +{ + cout << endl << "Caught signal " << signal << endl; + + merger.ChangeState(FairMQMerger::STOP); + merger.ChangeState(FairMQMerger::END); + + cout << "Shutdown complete. Bye!" << endl; + exit(1); +} + +static void s_catch_signals (void) +{ + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); +} + +int main(int argc, char** argv) +{ + if ( argc != 15 ) { + cout << "Usage: merger \tID numIoTreads\n" + << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" + << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" + << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; + return 1; + } + + s_catch_signals(); + + stringstream logmsg; + logmsg << "PID: " << getpid(); + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); + merger.SetTransport(transportFactory); + + int i = 1; + + merger.SetProperty(FairMQMerger::Id, argv[i]); + ++i; + + int numIoThreads; + stringstream(argv[i]) >> numIoThreads; + merger.SetProperty(FairMQMerger::NumIoThreads, numIoThreads); + ++i; + + merger.SetProperty(FairMQMerger::NumInputs, 2); + merger.SetProperty(FairMQMerger::NumOutputs, 1); + + + merger.ChangeState(FairMQMerger::INIT); + + + merger.SetProperty(FairMQMerger::InputSocketType, argv[i], 0); + ++i; + int inputRcvBufSize; + stringstream(argv[i]) >> inputRcvBufSize; + merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, 0); + ++i; + merger.SetProperty(FairMQMerger::InputMethod, argv[i], 0); + ++i; + merger.SetProperty(FairMQMerger::InputAddress, argv[i], 0); + ++i; + + merger.SetProperty(FairMQMerger::InputSocketType, argv[i], 1); + ++i; + stringstream(argv[i]) >> inputRcvBufSize; + merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, 1); + ++i; + merger.SetProperty(FairMQMerger::InputMethod, argv[i], 1); + ++i; + merger.SetProperty(FairMQMerger::InputAddress, argv[i], 1); + ++i; + + merger.SetProperty(FairMQMerger::OutputSocketType, argv[i], 0); + ++i; + int outputSndBufSize; + stringstream(argv[i]) >> outputSndBufSize; + merger.SetProperty(FairMQMerger::OutputSndBufSize, outputSndBufSize, 0); + ++i; + merger.SetProperty(FairMQMerger::OutputMethod, argv[i], 0); + ++i; + merger.SetProperty(FairMQMerger::OutputAddress, argv[i], 0); + ++i; + + + merger.ChangeState(FairMQMerger::SETOUTPUT); + merger.ChangeState(FairMQMerger::SETINPUT); + merger.ChangeState(FairMQMerger::RUN); + + + char ch; + cin.get(ch); + + merger.ChangeState(FairMQMerger::STOP); + merger.ChangeState(FairMQMerger::END); + + return 0; +} + diff --git a/fairmq/nanomsg/runNToOneMerger.cxx b/fairmq/nanomsg/runNToOneMerger.cxx new file mode 100644 index 00000000..9dbc7746 --- /dev/null +++ b/fairmq/nanomsg/runNToOneMerger.cxx @@ -0,0 +1,124 @@ +/** + * runNToOneMerger.cxx + * + * @since 2012-12-06 + * @author D. Klein, A. Rybalchenko + */ + +#include +#include + +#include "FairMQLogger.h" +#include "FairMQMerger.h" +#include "FairMQTransportFactoryNN.h" + +using std::cout; +using std::cin; +using std::endl; +using std::stringstream; + + +FairMQMerger merger; + +static void s_signal_handler (int signal) +{ + cout << endl << "Caught signal " << signal << endl; + + merger.ChangeState(FairMQMerger::STOP); + merger.ChangeState(FairMQMerger::END); + + cout << "Shutdown complete. Bye!" << endl; + exit(1); +} + +static void s_catch_signals (void) +{ + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); +} + +int main(int argc, char** argv) +{ + if ( argc < 16 || (argc-8)%4!=0 ) { + cout << "Usage: merger \tID numIoTreads numInputs\n" + << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" + << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" + << "\t\t...\n" + << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" + << argc << endl; + return 1; + } + + s_catch_signals(); + + stringstream logmsg; + logmsg << "PID: " << getpid(); + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); + merger.SetTransport(transportFactory); + + int i = 1; + + merger.SetProperty(FairMQMerger::Id, argv[i]); + ++i; + + int numIoThreads; + stringstream(argv[i]) >> numIoThreads; + merger.SetProperty(FairMQMerger::NumIoThreads, numIoThreads); + ++i; + + int numInputs; + stringstream(argv[i]) >> numInputs; + merger.SetProperty(FairMQMerger::NumInputs, numInputs); + ++i; + + merger.SetProperty(FairMQMerger::NumOutputs, 1); + + + merger.ChangeState(FairMQMerger::INIT); + + + for (int iInput = 0; iInput < numInputs; iInput++ ) { + merger.SetProperty(FairMQMerger::InputSocketType, argv[i], iInput); + ++i; + int inputRcvBufSize; + stringstream(argv[i]) >> inputRcvBufSize; + merger.SetProperty(FairMQMerger::InputRcvBufSize, inputRcvBufSize, iInput); + ++i; + merger.SetProperty(FairMQMerger::InputMethod, argv[i], iInput); + ++i; + merger.SetProperty(FairMQMerger::InputAddress, argv[i], iInput); + ++i; + } + + merger.SetProperty(FairMQMerger::OutputSocketType, argv[i], 0); + ++i; + int outputSndBufSize; + stringstream(argv[i]) >> outputSndBufSize; + merger.SetProperty(FairMQMerger::OutputSndBufSize, outputSndBufSize, 0); + ++i; + merger.SetProperty(FairMQMerger::OutputMethod, argv[i], 0); + ++i; + merger.SetProperty(FairMQMerger::OutputAddress, argv[i], 0); + ++i; + + + merger.ChangeState(FairMQMerger::SETOUTPUT); + merger.ChangeState(FairMQMerger::SETINPUT); + merger.ChangeState(FairMQMerger::RUN); + + + char ch; + cin.get(ch); + + merger.ChangeState(FairMQMerger::STOP); + merger.ChangeState(FairMQMerger::END); + + return 0; +} + diff --git a/fairmq/nanomsg/runOneToNSplitter.cxx b/fairmq/nanomsg/runOneToNSplitter.cxx new file mode 100644 index 00000000..9fab511e --- /dev/null +++ b/fairmq/nanomsg/runOneToNSplitter.cxx @@ -0,0 +1,122 @@ +/** + * runOneToNSplitter.cxx + * + * @since 2012-12-06 + * @author D. Klein, A. Rybalchenko + */ + +#include +#include + +#include "FairMQLogger.h" +#include "FairMQSplitter.h" +#include "FairMQTransportFactoryNN.h" + +using std::cout; +using std::cin; +using std::endl; +using std::stringstream; + + +FairMQSplitter splitter; + +static void s_signal_handler (int signal) +{ + cout << endl << "Caught signal " << signal << endl; + + splitter.ChangeState(FairMQSplitter::STOP); + splitter.ChangeState(FairMQSplitter::END); + + cout << "Shutdown complete. Bye!" << endl; + exit(1); +} + +static void s_catch_signals (void) +{ + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); +} + +int main(int argc, char** argv) +{ + if ( argc < 16 || (argc - 8) % 4 != 0 ) { // argc{ name, id, threads, nout, insock, inbuff, inmet, inadd, ... out} + cout << "Usage: splitter \tID numIoTreads numOutputs\n" + << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" + << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" + << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" + << "\t\t..." << argc << " arguments provided" << endl; + return 1; + } + + s_catch_signals(); + + stringstream logmsg; + logmsg << "PID: " << getpid(); + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); + splitter.SetTransport(transportFactory); + + int i = 1; + + splitter.SetProperty(FairMQSplitter::Id, argv[i]); + ++i; + + int numIoThreads; + stringstream(argv[i]) >> numIoThreads; + splitter.SetProperty(FairMQSplitter::NumIoThreads, numIoThreads); + ++i; + + splitter.SetProperty(FairMQSplitter::NumInputs, 1); + + int numOutputs; + stringstream(argv[i]) >> numOutputs; + splitter.SetProperty(FairMQSplitter::NumOutputs, numOutputs); + ++i; + + + splitter.ChangeState(FairMQSplitter::INIT); + + + splitter.SetProperty(FairMQSplitter::InputSocketType, argv[i], 0); + ++i; + int inputRcvBufSize; + stringstream(argv[i]) >> inputRcvBufSize; + splitter.SetProperty(FairMQSplitter::InputRcvBufSize, inputRcvBufSize, 0); + ++i; + splitter.SetProperty(FairMQSplitter::InputMethod, argv[i], 0); + ++i; + splitter.SetProperty(FairMQSplitter::InputAddress, argv[i], 0); + ++i; + + int outputSndBufSize; + for (int iOutput = 0; iOutput < numOutputs; iOutput++) { + splitter.SetProperty(FairMQSplitter::OutputSocketType, argv[i], iOutput); + ++i; + stringstream(argv[i]) >> outputSndBufSize; + splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, iOutput); + ++i; + splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], iOutput); + ++i; + splitter.SetProperty(FairMQSplitter::OutputAddress, argv[i], iOutput); + ++i; + } + + splitter.ChangeState(FairMQSplitter::SETOUTPUT); + splitter.ChangeState(FairMQSplitter::SETINPUT); + splitter.ChangeState(FairMQSplitter::RUN); + + + char ch; + cin.get(ch); + + splitter.ChangeState(FairMQSplitter::STOP); + splitter.ChangeState(FairMQSplitter::END); + + return 0; +} + diff --git a/fairmq/nanomsg/runProxy.cxx b/fairmq/nanomsg/runProxy.cxx new file mode 100644 index 00000000..f5e1f946 --- /dev/null +++ b/fairmq/nanomsg/runProxy.cxx @@ -0,0 +1,115 @@ +/** + * runProxy.cxx + * + * @since 2013-10-07 + * @author A. Rybalchenko + */ + +#include +#include + +#include "FairMQLogger.h" +#include "FairMQProxy.h" +#include "FairMQTransportFactoryNN.h" + +using std::cout; +using std::cin; +using std::endl; +using std::stringstream; + + +FairMQProxy proxy; + +static void s_signal_handler (int signal) +{ + cout << endl << "Caught signal " << signal << endl; + + proxy.ChangeState(FairMQProxy::STOP); + proxy.ChangeState(FairMQProxy::END); + + cout << "Shutdown complete. Bye!" << endl; + exit(1); +} + +static void s_catch_signals (void) +{ + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); +} + +int main(int argc, char** argv) +{ + if ( argc != 11 ) { + cout << "Usage: proxy \tID numIoTreads\n" + << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" + << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; + return 1; + } + + s_catch_signals(); + + stringstream logmsg; + logmsg << "PID: " << getpid(); + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); + proxy.SetTransport(transportFactory); + + int i = 1; + + proxy.SetProperty(FairMQProxy::Id, argv[i]); + ++i; + + int numIoThreads; + stringstream(argv[i]) >> numIoThreads; + proxy.SetProperty(FairMQProxy::NumIoThreads, numIoThreads); + ++i; + + proxy.SetProperty(FairMQProxy::NumInputs, 1); + proxy.SetProperty(FairMQProxy::NumOutputs, 1); + + + proxy.ChangeState(FairMQProxy::INIT); + + + proxy.SetProperty(FairMQProxy::InputSocketType, argv[i], 0); + ++i; + int inputRcvBufSize; + stringstream(argv[i]) >> inputRcvBufSize; + proxy.SetProperty(FairMQProxy::InputRcvBufSize, inputRcvBufSize, 0); + ++i; + proxy.SetProperty(FairMQProxy::InputMethod, argv[i], 0); + ++i; + proxy.SetProperty(FairMQProxy::InputAddress, argv[i], 0); + ++i; + + proxy.SetProperty(FairMQProxy::OutputSocketType, argv[i], 0); + ++i; + int outputSndBufSize; + stringstream(argv[i]) >> outputSndBufSize; + proxy.SetProperty(FairMQProxy::OutputSndBufSize, outputSndBufSize, 0); + ++i; + proxy.SetProperty(FairMQProxy::OutputMethod, argv[i], 0); + ++i; + proxy.SetProperty(FairMQProxy::OutputAddress, argv[i], 0); + ++i; + + + proxy.ChangeState(FairMQProxy::SETOUTPUT); + proxy.ChangeState(FairMQProxy::SETINPUT); + proxy.ChangeState(FairMQProxy::RUN); + + + char ch; + cin.get(ch); + + proxy.ChangeState(FairMQProxy::STOP); + proxy.ChangeState(FairMQProxy::END); + + return 0; +} + diff --git a/fairmq/nanomsg/runSink.cxx b/fairmq/nanomsg/runSink.cxx new file mode 100644 index 00000000..cbb3c965 --- /dev/null +++ b/fairmq/nanomsg/runSink.cxx @@ -0,0 +1,104 @@ +/** + * runSink.cxx + * + * @since 2013-01-21 + * @author D. Klein, A. Rybalchenko + */ + +#include +#include + +#include "FairMQLogger.h" +#include "FairMQSink.h" +#include "FairMQTransportFactoryNN.h" + +using std::cout; +using std::cin; +using std::endl; +using std::stringstream; + + +FairMQSink sink; + +static void s_signal_handler (int signal) +{ + cout << endl << "Caught signal " << signal << endl; + + sink.ChangeState(FairMQSink::STOP); + sink.ChangeState(FairMQSink::END); + + cout << "Shutdown complete. Bye!" << endl; + exit(1); +} + +static void s_catch_signals (void) +{ + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); +} + +int main(int argc, char** argv) +{ + if ( argc != 7 ) { + cout << "Usage: sink \tID numIoTreads\n" + << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" + << endl; + return 1; + } + + s_catch_signals(); + + stringstream logmsg; + logmsg << "PID: " << getpid(); + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); + sink.SetTransport(transportFactory); + + int i = 1; + + sink.SetProperty(FairMQSink::Id, argv[i]); + ++i; + + int numIoThreads; + stringstream(argv[i]) >> numIoThreads; + sink.SetProperty(FairMQSink::NumIoThreads, numIoThreads); + ++i; + + sink.SetProperty(FairMQSink::NumInputs, 1); + sink.SetProperty(FairMQSink::NumOutputs, 0); + + + sink.ChangeState(FairMQSink::INIT); + + + sink.SetProperty(FairMQSink::InputSocketType, argv[i], 0); + ++i; + int inputRcvBufSize; + stringstream(argv[i]) >> inputRcvBufSize; + sink.SetProperty(FairMQSink::InputRcvBufSize, inputRcvBufSize, 0); + ++i; + sink.SetProperty(FairMQSink::InputMethod, argv[i], 0); + ++i; + sink.SetProperty(FairMQSink::InputAddress, argv[i], 0); + ++i; + + + sink.ChangeState(FairMQSink::SETOUTPUT); + sink.ChangeState(FairMQSink::SETINPUT); + sink.ChangeState(FairMQSink::RUN); + + + char ch; + cin.get(ch); + + sink.ChangeState(FairMQSink::STOP); + sink.ChangeState(FairMQSink::END); + + return 0; +} + diff --git a/fairmq/nanomsg/runSplitter.cxx b/fairmq/nanomsg/runSplitter.cxx new file mode 100644 index 00000000..5f46265a --- /dev/null +++ b/fairmq/nanomsg/runSplitter.cxx @@ -0,0 +1,126 @@ +/** + * runSplitter.cxx + * + * @since 2012-12-06 + * @author D. Klein, A. Rybalchenko + */ + +#include +#include + +#include "FairMQLogger.h" +#include "FairMQSplitter.h" +#include "FairMQTransportFactoryNN.h" + +using std::cout; +using std::cin; +using std::endl; +using std::stringstream; + + +FairMQSplitter splitter; + +static void s_signal_handler (int signal) +{ + cout << endl << "Caught signal " << signal << endl; + + splitter.ChangeState(FairMQSplitter::STOP); + splitter.ChangeState(FairMQSplitter::END); + + cout << "Shutdown complete. Bye!" << endl; + exit(1); +} + +static void s_catch_signals (void) +{ + struct sigaction action; + action.sa_handler = s_signal_handler; + action.sa_flags = 0; + sigemptyset(&action.sa_mask); + sigaction(SIGINT, &action, NULL); + sigaction(SIGTERM, &action, NULL); +} + +int main(int argc, char** argv) +{ + if ( argc != 15 ) { + cout << "Usage: splitter \tID numIoTreads\n" + << "\t\tinputSocketType inputRcvBufSize inputMethod inputAddress\n" + << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" + << "\t\toutputSocketType outputSndBufSize outputMethod outputAddress\n" << endl; + return 1; + } + + s_catch_signals(); + + stringstream logmsg; + logmsg << "PID: " << getpid(); + FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); + + FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); + splitter.SetTransport(transportFactory); + + int i = 1; + + splitter.SetProperty(FairMQSplitter::Id, argv[i]); + ++i; + + int numIoThreads; + stringstream(argv[i]) >> numIoThreads; + splitter.SetProperty(FairMQSplitter::NumIoThreads, numIoThreads); + ++i; + + splitter.SetProperty(FairMQSplitter::NumInputs, 1); + splitter.SetProperty(FairMQSplitter::NumOutputs, 2); + + + splitter.ChangeState(FairMQSplitter::INIT); + + + splitter.SetProperty(FairMQSplitter::InputSocketType, argv[i], 0); + ++i; + int inputRcvBufSize; + stringstream(argv[i]) >> inputRcvBufSize; + splitter.SetProperty(FairMQSplitter::InputRcvBufSize, inputRcvBufSize, 0); + ++i; + splitter.SetProperty(FairMQSplitter::InputMethod, argv[i], 0); + ++i; + splitter.SetProperty(FairMQSplitter::InputAddress, argv[i], 0); + ++i; + + splitter.SetProperty(FairMQSplitter::OutputSocketType, argv[i], 0); + ++i; + int outputSndBufSize; + stringstream(argv[i]) >> outputSndBufSize; + splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, 0); + ++i; + splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], 0); + ++i; + splitter.SetProperty(FairMQSplitter::OutputAddress, argv[i], 0); + ++i; + + splitter.SetProperty(FairMQSplitter::OutputSocketType, argv[i], 1); + ++i; + stringstream(argv[i]) >> outputSndBufSize; + splitter.SetProperty(FairMQSplitter::OutputSndBufSize, outputSndBufSize, 1); + ++i; + splitter.SetProperty(FairMQSplitter::OutputMethod, argv[i], 1); + ++i; + splitter.SetProperty(FairMQSplitter::OutputAddress, argv[i], 1); + ++i; + + + splitter.ChangeState(FairMQSplitter::SETOUTPUT); + splitter.ChangeState(FairMQSplitter::SETINPUT); + splitter.ChangeState(FairMQSplitter::RUN); + + + char ch; + cin.get(ch); + + splitter.ChangeState(FairMQSplitter::STOP); + splitter.ChangeState(FairMQSplitter::END); + + return 0; +} + diff --git a/fairmq/FairMQContextZMQ.cxx b/fairmq/zeromq/FairMQContextZMQ.cxx similarity index 100% rename from fairmq/FairMQContextZMQ.cxx rename to fairmq/zeromq/FairMQContextZMQ.cxx diff --git a/fairmq/FairMQContextZMQ.h b/fairmq/zeromq/FairMQContextZMQ.h similarity index 100% rename from fairmq/FairMQContextZMQ.h rename to fairmq/zeromq/FairMQContextZMQ.h diff --git a/fairmq/FairMQMessageZMQ.cxx b/fairmq/zeromq/FairMQMessageZMQ.cxx similarity index 100% rename from fairmq/FairMQMessageZMQ.cxx rename to fairmq/zeromq/FairMQMessageZMQ.cxx diff --git a/fairmq/FairMQMessageZMQ.h b/fairmq/zeromq/FairMQMessageZMQ.h similarity index 100% rename from fairmq/FairMQMessageZMQ.h rename to fairmq/zeromq/FairMQMessageZMQ.h diff --git a/fairmq/FairMQPollerZMQ.cxx b/fairmq/zeromq/FairMQPollerZMQ.cxx similarity index 100% rename from fairmq/FairMQPollerZMQ.cxx rename to fairmq/zeromq/FairMQPollerZMQ.cxx diff --git a/fairmq/FairMQPollerZMQ.h b/fairmq/zeromq/FairMQPollerZMQ.h similarity index 100% rename from fairmq/FairMQPollerZMQ.h rename to fairmq/zeromq/FairMQPollerZMQ.h diff --git a/fairmq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx similarity index 100% rename from fairmq/FairMQSocketZMQ.cxx rename to fairmq/zeromq/FairMQSocketZMQ.cxx diff --git a/fairmq/FairMQSocketZMQ.h b/fairmq/zeromq/FairMQSocketZMQ.h similarity index 100% rename from fairmq/FairMQSocketZMQ.h rename to fairmq/zeromq/FairMQSocketZMQ.h diff --git a/fairmq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx similarity index 100% rename from fairmq/FairMQTransportFactoryZMQ.cxx rename to fairmq/zeromq/FairMQTransportFactoryZMQ.cxx diff --git a/fairmq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h similarity index 100% rename from fairmq/FairMQTransportFactoryZMQ.h rename to fairmq/zeromq/FairMQTransportFactoryZMQ.h diff --git a/fairmq/runBenchmarkSampler.cxx b/fairmq/zeromq/runBenchmarkSampler.cxx similarity index 94% rename from fairmq/runBenchmarkSampler.cxx rename to fairmq/zeromq/runBenchmarkSampler.cxx index a5ce4988..b7b073ef 100644 --- a/fairmq/runBenchmarkSampler.cxx +++ b/fairmq/zeromq/runBenchmarkSampler.cxx @@ -1,7 +1,7 @@ /** * runBenchmarkSampler.cxx * - * @since Apr 23, 2013-04-23 + * @since 2013-04-23 * @author D. Klein, A. Rybalchenko */ @@ -11,7 +11,6 @@ #include "FairMQLogger.h" #include "FairMQBenchmarkSampler.h" #include "FairMQTransportFactoryZMQ.h" -// #include "FairMQTransportFactoryNN.h" using std::cout; using std::cin; @@ -58,7 +57,6 @@ int main(int argc, char** argv) FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); - // FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); sampler.SetTransport(transportFactory); int i = 1; diff --git a/fairmq/runBuffer.cxx b/fairmq/zeromq/runBuffer.cxx similarity index 95% rename from fairmq/runBuffer.cxx rename to fairmq/zeromq/runBuffer.cxx index e13bdcb6..01123074 100644 --- a/fairmq/runBuffer.cxx +++ b/fairmq/zeromq/runBuffer.cxx @@ -11,7 +11,6 @@ #include "FairMQLogger.h" #include "FairMQBuffer.h" #include "FairMQTransportFactoryZMQ.h" -// #include "FairMQTransportFactoryNN.h" using std::cout; using std::cin; @@ -58,7 +57,6 @@ int main(int argc, char** argv) FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); - // FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); buffer.SetTransport(transportFactory); int i = 1; diff --git a/fairmq/runMerger.cxx b/fairmq/zeromq/runMerger.cxx similarity index 96% rename from fairmq/runMerger.cxx rename to fairmq/zeromq/runMerger.cxx index 43e6b246..00049cc3 100644 --- a/fairmq/runMerger.cxx +++ b/fairmq/zeromq/runMerger.cxx @@ -11,7 +11,6 @@ #include "FairMQLogger.h" #include "FairMQMerger.h" #include "FairMQTransportFactoryZMQ.h" -// #include "FairMQTransportFactoryNN.h" using std::cout; using std::cin; @@ -59,7 +58,6 @@ int main(int argc, char** argv) FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); - // FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); merger.SetTransport(transportFactory); int i = 1; diff --git a/fairmq/runNToOneMerger.cxx b/fairmq/zeromq/runNToOneMerger.cxx similarity index 96% rename from fairmq/runNToOneMerger.cxx rename to fairmq/zeromq/runNToOneMerger.cxx index abc84158..c7071fe8 100644 --- a/fairmq/runNToOneMerger.cxx +++ b/fairmq/zeromq/runNToOneMerger.cxx @@ -11,7 +11,6 @@ #include "FairMQLogger.h" #include "FairMQMerger.h" #include "FairMQTransportFactoryZMQ.h" -// #include "FairMQTransportFactoryNN.h" using std::cout; using std::cin; @@ -61,7 +60,6 @@ int main(int argc, char** argv) FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); - // FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); merger.SetTransport(transportFactory); int i = 1; diff --git a/fairmq/runOneToNSplitter.cxx b/fairmq/zeromq/runOneToNSplitter.cxx similarity index 96% rename from fairmq/runOneToNSplitter.cxx rename to fairmq/zeromq/runOneToNSplitter.cxx index a2694fe2..ad2314fe 100644 --- a/fairmq/runOneToNSplitter.cxx +++ b/fairmq/zeromq/runOneToNSplitter.cxx @@ -11,7 +11,6 @@ #include "FairMQLogger.h" #include "FairMQSplitter.h" #include "FairMQTransportFactoryZMQ.h" -// #include "FairMQTransportFactoryNN.h" using std::cout; using std::cin; @@ -60,7 +59,6 @@ int main(int argc, char** argv) FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); - // FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); splitter.SetTransport(transportFactory); int i = 1; diff --git a/fairmq/runProxy.cxx b/fairmq/zeromq/runProxy.cxx similarity index 95% rename from fairmq/runProxy.cxx rename to fairmq/zeromq/runProxy.cxx index 06984c93..8ddc8623 100644 --- a/fairmq/runProxy.cxx +++ b/fairmq/zeromq/runProxy.cxx @@ -11,7 +11,6 @@ #include "FairMQLogger.h" #include "FairMQProxy.h" #include "FairMQTransportFactoryZMQ.h" -// #include "FairMQTransportFactoryNN.h" using std::cout; using std::cin; @@ -58,7 +57,6 @@ int main(int argc, char** argv) FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); - // FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); proxy.SetTransport(transportFactory); int i = 1; diff --git a/fairmq/runSink.cxx b/fairmq/zeromq/runSink.cxx similarity index 94% rename from fairmq/runSink.cxx rename to fairmq/zeromq/runSink.cxx index 664cf1c3..cbb45716 100644 --- a/fairmq/runSink.cxx +++ b/fairmq/zeromq/runSink.cxx @@ -11,7 +11,6 @@ #include "FairMQLogger.h" #include "FairMQSink.h" #include "FairMQTransportFactoryZMQ.h" -// #include "FairMQTransportFactoryNN.h" using std::cout; using std::cin; @@ -58,7 +57,6 @@ int main(int argc, char** argv) FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); - // FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); sink.SetTransport(transportFactory); int i = 1; diff --git a/fairmq/runSplitter.cxx b/fairmq/zeromq/runSplitter.cxx similarity index 96% rename from fairmq/runSplitter.cxx rename to fairmq/zeromq/runSplitter.cxx index f1080242..303633b2 100644 --- a/fairmq/runSplitter.cxx +++ b/fairmq/zeromq/runSplitter.cxx @@ -11,7 +11,6 @@ #include "FairMQLogger.h" #include "FairMQSplitter.h" #include "FairMQTransportFactoryZMQ.h" -// #include "FairMQTransportFactoryNN.h" using std::cout; using std::cin; @@ -59,7 +58,6 @@ int main(int argc, char** argv) FairMQLogger::GetInstance()->Log(FairMQLogger::INFO, logmsg.str()); FairMQTransportFactory* transportFactory = new FairMQTransportFactoryZMQ(); - // FairMQTransportFactory* transportFactory = new FairMQTransportFactoryNN(); splitter.SetTransport(transportFactory); int i = 1;