diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index bb2216de..78291a89 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -15,7 +15,8 @@ if(PROTOBUF_FOUND) set(INCLUDE_DIRECTORIES ${INCLUDE_DIRECTORIES} ${PROTOBUF_INCLUDE_DIR} - ${CMAKE_SOURCE_DIR}/fairmq/prototest + # # following directory is only for protobuf tests and is not essential part of FairMQ + #${CMAKE_SOURCE_DIR}/fairmq/prototest ) endif(PROTOBUF_FOUND) @@ -59,14 +60,15 @@ set(SRCS ) if(PROTOBUF_FOUND) - set(SRCS - ${SRCS} - "prototest/payload.pb.cc" - "prototest/FairMQProtoSampler.cxx" - "prototest/FairMQBinSampler.cxx" - "prototest/FairMQBinSink.cxx" - "prototest/FairMQProtoSink.cxx" - ) + # following source files are only for protobuf tests and are not essential part of FairMQ + # set(SRCS + # ${SRCS} + # "prototest/payload.pb.cc" + # "prototest/FairMQProtoSampler.cxx" + # "prototest/FairMQBinSampler.cxx" + # "prototest/FairMQBinSink.cxx" + # "prototest/FairMQProtoSink.cxx" + # ) set(DEPENDENCIES ${DEPENDENCIES} ${PROTOBUF_LIBRARY} @@ -117,15 +119,16 @@ set(Exe_Names sink proxy) -if(PROTOBUF_FOUND) - set(Exe_Names - ${Exe_Names} - binsampler - protosampler - binsink - protosink - ) -endif(PROTOBUF_FOUND) +# following executables are only for protobuf tests and are not essential part of FairMQ +# if(PROTOBUF_FOUND) +# set(Exe_Names +# ${Exe_Names} +# binsampler +# protosampler +# binsink +# protosink +# ) +# endif(PROTOBUF_FOUND) set(Exe_Source run/runBenchmarkSampler.cxx @@ -136,15 +139,16 @@ set(Exe_Source run/runProxy.cxx ) -if(PROTOBUF_FOUND) - set(Exe_Source - ${Exe_Source} - run/runBinSampler.cxx - run/runProtoSampler.cxx - run/runBinSink.cxx - run/runProtoSink.cxx - ) -endif(PROTOBUF_FOUND) +# following source files are only for protobuf tests and are not essential part of FairMQ +# if(PROTOBUF_FOUND) +# set(Exe_Source +# ${Exe_Source} +# run/runBinSampler.cxx +# run/runProtoSampler.cxx +# run/runBinSink.cxx +# run/runProtoSink.cxx +# ) +# endif(PROTOBUF_FOUND) list(LENGTH Exe_Names _length) math(EXPR _length ${_length}-1) diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 6fff84b8..3d370f73 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -256,79 +256,61 @@ void FairMQDevice::LogSocketRates() timestamp_t t0; timestamp_t t1; - timestamp_t timeSinceLastLog_ms; + timestamp_t msSinceLastLog; - vector bytesInput(fNumInputs); - vector messagesInput(fNumInputs); - vector bytesOutput(fNumOutputs); - vector messagesOutput(fNumOutputs); + vector bytesIn(fNumInputs); + vector msgIn(fNumInputs); + vector bytesOut(fNumOutputs); + vector msgOut(fNumOutputs); - vector bytesInputNew(fNumInputs); - vector messagesInputNew(fNumInputs); - vector bytesOutputNew(fNumOutputs); - vector messagesOutputNew(fNumOutputs); + vector bytesInNew(fNumInputs); + vector msgInNew(fNumInputs); + vector bytesOutNew(fNumOutputs); + vector msgOutNew(fNumOutputs); - vector megabytesPerSecondInput(fNumInputs); - vector messagesPerSecondInput(fNumInputs); - vector megabytesPerSecondOutput(fNumOutputs); - vector messagesPerSecondOutput(fNumOutputs); - - // Temp stuff for process termination - // bool receivedSomething = false; - // bool sentSomething = false; - // int didNotReceiveFor = 0; - // int didNotSendFor = 0; - // End of temp stuff + vector mbPerSecIn(fNumInputs); + vector msgPerSecIn(fNumInputs); + vector mbPerSecOut(fNumOutputs); + vector msgPerSecOut(fNumOutputs); int i = 0; for (vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++) { - bytesInput.at(i) = (*itr)->GetBytesRx(); - messagesInput.at(i) = (*itr)->GetMessagesRx(); + bytesIn.at(i) = (*itr)->GetBytesRx(); + msgIn.at(i) = (*itr)->GetMessagesRx(); ++i; } i = 0; for (vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++) { - bytesOutput.at(i) = (*itr)->GetBytesTx(); - messagesOutput.at(i) = (*itr)->GetMessagesTx(); + bytesOut.at(i) = (*itr)->GetBytesTx(); + msgOut.at(i) = (*itr)->GetMessagesTx(); ++i; } t0 = get_timestamp(); - while (true) + while (fState == RUNNING) { try { t1 = get_timestamp(); - timeSinceLastLog_ms = (t1 - t0) / 1000.0L; + msSinceLastLog = (t1 - t0) / 1000.0L; i = 0; for (vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++) { - bytesInputNew.at(i) = (*itr)->GetBytesRx(); - megabytesPerSecondInput.at(i) = ((double)(bytesInputNew.at(i) - bytesInput.at(i)) / (1024. * 1024.)) / (double)timeSinceLastLog_ms * 1000.; - bytesInput.at(i) = bytesInputNew.at(i); - messagesInputNew.at(i) = (*itr)->GetMessagesRx(); - messagesPerSecondInput.at(i) = (double)(messagesInputNew.at(i) - messagesInput.at(i)) / (double)timeSinceLastLog_ms * 1000.; - messagesInput.at(i) = messagesInputNew.at(i); + bytesInNew.at(i) = (*itr)->GetBytesRx(); + mbPerSecIn.at(i) = ((double)(bytesInNew.at(i) - bytesIn.at(i)) / (1024. * 1024.)) / (double)msSinceLastLog * 1000.; + bytesIn.at(i) = bytesInNew.at(i); + msgInNew.at(i) = (*itr)->GetMessagesRx(); + msgPerSecIn.at(i) = (double)(msgInNew.at(i) - msgIn.at(i)) / (double)msSinceLastLog * 1000.; + msgIn.at(i) = msgInNew.at(i); - LOG(DEBUG) << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondInput.at(i) << " msg/s, " << megabytesPerSecondInput.at(i) << " MB/s"; - - // Temp stuff for process termination - // if ( !receivedSomething && messagesPerSecondInput.at(i) > 0 ) { - // receivedSomething = true; - // } - // if ( receivedSomething && messagesPerSecondInput.at(i) == 0 ) { - // cout << "Did not receive anything on socket " << i << " for " << didNotReceiveFor++ << " seconds." << endl; - // } else { - // didNotReceiveFor = 0; - // } - // End of temp stuff + LOG(DEBUG) << "#" << fId << "." << (*itr)->GetId() << ": " << msgPerSecIn.at(i) << " msg/s, " << mbPerSecIn.at(i) << " MB/s"; ++i; } @@ -337,52 +319,29 @@ void FairMQDevice::LogSocketRates() for (vector::iterator itr = fPayloadOutputs->begin(); itr != fPayloadOutputs->end(); itr++) { - bytesOutputNew.at(i) = (*itr)->GetBytesTx(); - megabytesPerSecondOutput.at(i) = ((double)(bytesOutputNew.at(i) - bytesOutput.at(i)) / (1024. * 1024.)) / (double)timeSinceLastLog_ms * 1000.; - bytesOutput.at(i) = bytesOutputNew.at(i); - messagesOutputNew.at(i) = (*itr)->GetMessagesTx(); - messagesPerSecondOutput.at(i) = (double)(messagesOutputNew.at(i) - messagesOutput.at(i)) / (double)timeSinceLastLog_ms * 1000.; - messagesOutput.at(i) = messagesOutputNew.at(i); + bytesOutNew.at(i) = (*itr)->GetBytesTx(); + mbPerSecOut.at(i) = ((double)(bytesOutNew.at(i) - bytesOut.at(i)) / (1024. * 1024.)) / (double)msSinceLastLog * 1000.; + bytesOut.at(i) = bytesOutNew.at(i); + msgOutNew.at(i) = (*itr)->GetMessagesTx(); + msgPerSecOut.at(i) = (double)(msgOutNew.at(i) - msgOut.at(i)) / (double)msSinceLastLog * 1000.; + msgOut.at(i) = msgOutNew.at(i); - LOG(DEBUG) << "#" << fId << "." << (*itr)->GetId() << ": " << messagesPerSecondOutput.at(i) << " msg/s, " << megabytesPerSecondOutput.at(i) - << " MB/s"; - - // Temp stuff for process termination - // if ( !sentSomething && messagesPerSecondOutput.at(i) > 0 ) { - // sentSomething = true; - // } - // if ( sentSomething && messagesPerSecondOutput.at(i) == 0 ) { - // cout << "Did not send anything on socket " << i << " for " << didNotSendFor++ << " seconds." << endl; - // } else { - // didNotSendFor = 0; - // } - // End of temp stuff + LOG(DEBUG) << "#" << fId << "." << (*itr)->GetId() << ": " << msgPerSecOut.at(i) << " msg/s, " << mbPerSecOut.at(i) << " MB/s"; ++i; } - // Temp stuff for process termination - // if (receivedSomething && didNotReceiveFor > 5) { - // cout << "stopping because nothing was received for 5 seconds." << endl; - // ChangeState(STOP); - // } - // if (sentSomething && didNotSendFor > 5) { - // cout << "stopping because nothing was sent for 5 seconds." << endl; - // ChangeState(STOP); - // } - // End of temp stuff - t0 = t1; boost::this_thread::sleep(boost::posix_time::milliseconds(fLogIntervalInMs)); } catch (boost::thread_interrupted&) { - cout << "rateLogger interrupted" << endl; + LOG(INFO) << "FairMQDevice::LogSocketRates() interrupted"; break; } } - LOG(INFO) << ">>>>>>> stopping rateLogger <<<<<<<"; + LOG(INFO) << ">>>>>>> stopping FairMQDevice::LogSocketRates() <<<<<<<"; } void FairMQDevice::ListenToCommands() @@ -404,6 +363,24 @@ void FairMQDevice::Shutdown() } } +void FairMQDevice::Terminate() +{ + // Termination signal has to be sent only once to any socket. + // Find available socket and send termination signal to it. + if (fPayloadInputs->size() > 0) + { + fPayloadInputs->at(0)->Terminate(); + } + else if (fPayloadOutputs->size() > 0) + { + fPayloadOutputs->at(0)->Terminate(); + } + else + { + LOG(ERROR) << "No sockets available to terminate."; + } +} + FairMQDevice::~FairMQDevice() { for (vector::iterator itr = fPayloadInputs->begin(); itr != fPayloadInputs->end(); itr++) diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 46019cdb..89a1f103 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -88,6 +88,8 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable vector* fPayloadInputs; vector* fPayloadOutputs; + FairMQSocket* fCommandSocket; + int fLogIntervalInMs; FairMQTransportFactory* fTransportFactory; @@ -98,6 +100,8 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable virtual void Shutdown(); virtual void InitOutput(); virtual void InitInput(); + + virtual void Terminate(); }; #endif /* FAIRMQDEVICE_H_ */ diff --git a/fairmq/FairMQLogger.cxx b/fairmq/FairMQLogger.cxx index bd33430f..21e80832 100644 --- a/fairmq/FairMQLogger.cxx +++ b/fairmq/FairMQLogger.cxx @@ -32,14 +32,6 @@ FairMQLogger::~FairMQLogger() std::ostringstream& FairMQLogger::Log(int type) { - timestamp_t tm = get_timestamp(); - timestamp_t ms = tm / 1000.0L; - timestamp_t s = ms / 1000.0L; - std::time_t t = s; - // std::size_t fractional_seconds = ms % 1000; - char mbstr[100]; - std::strftime(mbstr, 100, "%H:%M:%S", std::localtime(&t)); - string type_str; switch (type) { @@ -61,6 +53,14 @@ std::ostringstream& FairMQLogger::Log(int type) break; } + timestamp_t tm = get_timestamp(); + timestamp_t ms = tm / 1000.0L; + timestamp_t s = ms / 1000.0L; + std::time_t t = s; + // std::size_t fractional_seconds = ms % 1000; + char mbstr[100]; + std::strftime(mbstr, 100, "%H:%M:%S", std::localtime(&t)); + os << "[\033[01;36m" << mbstr << "\033[0m]" << "[" << type_str << "]" << " "; diff --git a/fairmq/FairMQPoller.h b/fairmq/FairMQPoller.h index f5924ea6..fa833db3 100644 --- a/fairmq/FairMQPoller.h +++ b/fairmq/FairMQPoller.h @@ -20,6 +20,7 @@ class FairMQPoller public: virtual void Poll(int timeout) = 0; virtual bool CheckInput(int index) = 0; + virtual bool CheckOutput(int index) = 0; virtual ~FairMQPoller() {}; }; diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index a6c16f70..79c15102 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -29,12 +29,13 @@ class FairMQSocket virtual void Bind(const string& address) = 0; virtual void Connect(const string& address) = 0; - virtual size_t Send(FairMQMessage* msg, const string& flag="") = 0; - virtual size_t Receive(FairMQMessage* msg, const string& flag="") = 0; + virtual int Send(FairMQMessage* msg, const string& flag="") = 0; + virtual int Receive(FairMQMessage* msg, const string& flag="") = 0; virtual void* GetSocket() = 0; virtual int GetSocket(int nothing) = 0; virtual void Close() = 0; + virtual void Terminate() = 0; virtual void SetOption(const string& option, const void* value, size_t valueSize) = 0; virtual void GetOption(const string& option, void* value, size_t* valueSize) = 0; diff --git a/fairmq/FairMQStateMachine.h b/fairmq/FairMQStateMachine.h index 868525d0..9a4c6408 100644 --- a/fairmq/FairMQStateMachine.h +++ b/fairmq/FairMQStateMachine.h @@ -33,27 +33,13 @@ namespace msmf = boost::msm::front; namespace FairMQFSM { // defining events for the boost MSM state machine - struct INIT - { - }; - struct SETOUTPUT - { - }; - struct SETINPUT - { - }; - struct PAUSE - { - }; - struct RUN - { - }; - struct STOP - { - }; - struct END - { - }; + struct INIT {}; + struct SETOUTPUT {}; + struct SETINPUT {}; + struct PAUSE {}; + struct RUN {}; + struct STOP {}; + struct END {}; // defining the boost MSM state machine struct FairMQFSM_ : public msm::front::state_machine_def @@ -70,24 +56,12 @@ namespace FairMQFSM LOG(STATE) << "Exiting FairMQ state machine"; } // The list of FSM states - struct IDLE_FSM : public msm::front::state<> - { - }; - struct INITIALIZING_FSM : public msm::front::state<> - { - }; - struct SETTINGOUTPUT_FSM : public msm::front::state<> - { - }; - struct SETTINGINPUT_FSM : public msm::front::state<> - { - }; - struct WAITING_FSM : public msm::front::state<> - { - }; - struct RUNNING_FSM : public msm::front::state<> - { - }; + struct IDLE_FSM : public msm::front::state<> {}; + struct INITIALIZING_FSM : public msm::front::state<> {}; + struct SETTINGOUTPUT_FSM : public msm::front::state<> {}; + struct SETTINGINPUT_FSM : public msm::front::state<> {}; + struct WAITING_FSM : public msm::front::state<> {}; + struct RUNNING_FSM : public msm::front::state<> {}; // Define initial state typedef IDLE_FSM initial_state; // Actions @@ -141,8 +115,8 @@ namespace FairMQFSM void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) { fsm.fState = IDLE; + fsm.Terminate(); fsm.running_state.join(); - fsm.Shutdown(); } }; struct PauseFct @@ -156,24 +130,13 @@ namespace FairMQFSM } }; // actions to be overwritten by derived classes - virtual void Init() - { - } - virtual void Run() - { - } - virtual void Pause() - { - } - virtual void Shutdown() - { - } - virtual void InitOutput() - { - } - virtual void InitInput() - { - } + virtual void Init() {} + virtual void Run() {} + virtual void Pause() {} + virtual void Shutdown() {} + virtual void InitOutput() {} + virtual void InitInput() {} + virtual void Terminate() {} // Termination method called during StopFct action. // Transition table for FairMQFMS struct transition_table : mpl::vector< // Start Event Next Action Guard diff --git a/fairmq/devices/FairMQBenchmarkSampler.cxx b/fairmq/devices/FairMQBenchmarkSampler.cxx index c018c72a..3d907cb4 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.cxx +++ b/fairmq/devices/FairMQBenchmarkSampler.cxx @@ -68,12 +68,14 @@ void FairMQBenchmarkSampler::Run() try { rateLogger.interrupt(); - resetEventCounter.interrupt(); rateLogger.join(); + resetEventCounter.interrupt(); resetEventCounter.join(); } catch(boost::thread_resource_error& e) { LOG(ERROR) << e.what(); } + + FairMQDevice::Shutdown(); } void FairMQBenchmarkSampler::ResetEventCounter() diff --git a/fairmq/devices/FairMQBuffer.cxx b/fairmq/devices/FairMQBuffer.cxx index 62fea1dd..10b4ce63 100644 --- a/fairmq/devices/FairMQBuffer.cxx +++ b/fairmq/devices/FairMQBuffer.cxx @@ -30,17 +30,17 @@ void FairMQBuffer::Run() boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); - bool received = false; + int received = 0; while (fState == RUNNING) { FairMQMessage* msg = fTransportFactory->CreateMessage(); received = fPayloadInputs->at(0)->Receive(msg); - if (received) + if (received > 0) { fPayloadOutputs->at(0)->Send(msg); - received = false; + received = 0; } delete msg; @@ -52,6 +52,8 @@ void FairMQBuffer::Run() } catch(boost::thread_resource_error& e) { LOG(ERROR) << e.what(); } + + FairMQDevice::Shutdown(); } FairMQBuffer::~FairMQBuffer() diff --git a/fairmq/devices/FairMQMerger.cxx b/fairmq/devices/FairMQMerger.cxx index 3f7fb04d..1d7adcac 100644 --- a/fairmq/devices/FairMQMerger.cxx +++ b/fairmq/devices/FairMQMerger.cxx @@ -35,7 +35,7 @@ void FairMQMerger::Run() FairMQPoller* poller = fTransportFactory->CreatePoller(*fPayloadInputs); - bool received = false; + int received = 0; while (fState == RUNNING) { @@ -49,10 +49,10 @@ void FairMQMerger::Run() { received = fPayloadInputs->at(i)->Receive(msg); } - if (received) + if (received > 0) { fPayloadOutputs->at(0)->Send(msg); - received = false; + received = 0; } } @@ -67,4 +67,6 @@ void FairMQMerger::Run() } catch(boost::thread_resource_error& e) { LOG(ERROR) << e.what(); } + + FairMQDevice::Shutdown(); } diff --git a/fairmq/devices/FairMQProxy.cxx b/fairmq/devices/FairMQProxy.cxx index 44770a3a..17bf7575 100644 --- a/fairmq/devices/FairMQProxy.cxx +++ b/fairmq/devices/FairMQProxy.cxx @@ -34,15 +34,15 @@ void FairMQProxy::Run() FairMQMessage* msg = fTransportFactory->CreateMessage(); - size_t bytes_received = 0; + int received = 0; while (fState == RUNNING) { - bytes_received = fPayloadInputs->at(0)->Receive(msg); - if (bytes_received) + received = fPayloadInputs->at(0)->Receive(msg); + if (received > 0) { fPayloadOutputs->at(0)->Send(msg); - bytes_received = 0; + received = 0; } } @@ -54,4 +54,6 @@ void FairMQProxy::Run() } catch(boost::thread_resource_error& e) { LOG(ERROR) << e.what(); } + + FairMQDevice::Shutdown(); } diff --git a/fairmq/devices/FairMQSink.cxx b/fairmq/devices/FairMQSink.cxx index b3f923aa..ae7154b2 100644 --- a/fairmq/devices/FairMQSink.cxx +++ b/fairmq/devices/FairMQSink.cxx @@ -28,13 +28,13 @@ void FairMQSink::Run() boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); - size_t bytes_received = 0; + int received = 0; while (fState == RUNNING) { FairMQMessage* msg = fTransportFactory->CreateMessage(); - bytes_received = fPayloadInputs->at(0)->Receive(msg); + received = fPayloadInputs->at(0)->Receive(msg); delete msg; } @@ -45,6 +45,8 @@ void FairMQSink::Run() } catch(boost::thread_resource_error& e) { LOG(ERROR) << e.what(); } + + FairMQDevice::Shutdown(); } FairMQSink::~FairMQSink() diff --git a/fairmq/devices/FairMQSplitter.cxx b/fairmq/devices/FairMQSplitter.cxx index f995e23a..8c41daf5 100644 --- a/fairmq/devices/FairMQSplitter.cxx +++ b/fairmq/devices/FairMQSplitter.cxx @@ -32,7 +32,7 @@ void FairMQSplitter::Run() boost::thread rateLogger(boost::bind(&FairMQDevice::LogSocketRates, this)); - bool received = false; + int received = 0; int direction = 0; while (fState == RUNNING) @@ -41,7 +41,7 @@ void FairMQSplitter::Run() received = fPayloadInputs->at(0)->Receive(msg); - if (received) + if (received > 0) { fPayloadOutputs->at(direction)->Send(msg); direction++; @@ -49,7 +49,7 @@ void FairMQSplitter::Run() { direction = 0; } - received = false; + received = 0; } delete msg; @@ -61,4 +61,6 @@ void FairMQSplitter::Run() } catch(boost::thread_resource_error& e) { LOG(ERROR) << e.what(); } + + FairMQDevice::Shutdown(); } diff --git a/fairmq/nanomsg/FairMQMessageNN.cxx b/fairmq/nanomsg/FairMQMessageNN.cxx index d535dffb..f804fc7a 100644 --- a/fairmq/nanomsg/FairMQMessageNN.cxx +++ b/fairmq/nanomsg/FairMQMessageNN.cxx @@ -13,6 +13,7 @@ */ #include +#include #include diff --git a/fairmq/nanomsg/FairMQPollerNN.cxx b/fairmq/nanomsg/FairMQPollerNN.cxx index 40b98475..4aa0c2b2 100644 --- a/fairmq/nanomsg/FairMQPollerNN.cxx +++ b/fairmq/nanomsg/FairMQPollerNN.cxx @@ -41,6 +41,14 @@ bool FairMQPollerNN::CheckInput(int index) return false; } +bool FairMQPollerNN::CheckOutput(int index) +{ + if (items[index].revents & NN_POLLOUT) + return true; + + return false; +} + FairMQPollerNN::~FairMQPollerNN() { if (items != NULL) diff --git a/fairmq/nanomsg/FairMQPollerNN.h b/fairmq/nanomsg/FairMQPollerNN.h index f9713e13..7cc9d2bc 100644 --- a/fairmq/nanomsg/FairMQPollerNN.h +++ b/fairmq/nanomsg/FairMQPollerNN.h @@ -29,6 +29,7 @@ class FairMQPollerNN : public FairMQPoller virtual void Poll(int timeout); virtual bool CheckInput(int index); + virtual bool CheckOutput(int index); virtual ~FairMQPollerNN(); diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index c4ab2330..7c444d45 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -69,7 +69,7 @@ void FairMQSocketNN::Connect(const string& address) } } -size_t FairMQSocketNN::Send(FairMQMessage* msg, const string& flag) +int FairMQSocketNN::Send(FairMQMessage* msg, const string& flag) { void* ptr = msg->GetMessage(); int rc = nn_send(fSocket, &ptr, NN_MSG, 0); @@ -87,7 +87,7 @@ size_t FairMQSocketNN::Send(FairMQMessage* msg, const string& flag) return rc; } -size_t FairMQSocketNN::Receive(FairMQMessage* msg, const string& flag) +int FairMQSocketNN::Receive(FairMQMessage* msg, const string& flag) { void* ptr = NULL; int rc = nn_recv(fSocket, &ptr, NN_MSG, 0); @@ -106,6 +106,16 @@ size_t FairMQSocketNN::Receive(FairMQMessage* msg, const string& flag) return rc; } +void FairMQSocketNN::Close() +{ + nn_close(fSocket); +} + +void FairMQSocketNN::Terminate() +{ + nn_term(); +} + void* FairMQSocketNN::GetSocket() { return NULL; // dummy method to comply with the interface. functionality not possible in zeromq. @@ -116,11 +126,6 @@ int FairMQSocketNN::GetSocket(int nothing) return fSocket; } -void FairMQSocketNN::Close() -{ - nn_close(fSocket); -} - void FairMQSocketNN::SetOption(const string& option, const void* value, size_t valueSize) { int rc = nn_setsockopt(fSocket, NN_SOL_SOCKET, GetConstant(option), value, valueSize); @@ -186,6 +191,8 @@ int FairMQSocketNN::GetConstant(const string& constant) LOG(ERROR) << "Multipart messages functionality currently not supported by nanomsg!"; return -1; } + if (constant == "linger") + return NN_LINGER; return -1; } diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h index 8ae536e2..4d8b6c38 100644 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ b/fairmq/nanomsg/FairMQSocketNN.h @@ -31,12 +31,13 @@ class FairMQSocketNN : public FairMQSocket virtual void Bind(const string& address); virtual void Connect(const string& address); - virtual size_t Send(FairMQMessage* msg, const string& flag=""); - virtual size_t Receive(FairMQMessage* msg, const string& flag=""); + virtual int Send(FairMQMessage* msg, const string& flag=""); + virtual int Receive(FairMQMessage* msg, const string& flag=""); virtual void* GetSocket(); virtual int GetSocket(int nothing); virtual void Close(); + virtual void Terminate(); virtual void SetOption(const string& option, const void* value, size_t valueSize); virtual void GetOption(const string& option, void* value, size_t* valueSize); diff --git a/fairmq/zeromq/FairMQContextZMQ.cxx b/fairmq/zeromq/FairMQContextZMQ.cxx index cf1fac36..f43203e4 100644 --- a/fairmq/zeromq/FairMQContextZMQ.cxx +++ b/fairmq/zeromq/FairMQContextZMQ.cxx @@ -51,8 +51,11 @@ void FairMQContextZMQ::Close() int rc = zmq_ctx_destroy(fContext); if (rc != 0) { - LOG(ERROR) << "failed closing context, reason: " << zmq_strerror(errno); + if (errno == EINTR) { + LOG(ERROR) << " failed closing context, reason: " << zmq_strerror(errno); + } else { + fContext = NULL; + return; + } } - - fContext = NULL; } diff --git a/fairmq/zeromq/FairMQPollerZMQ.cxx b/fairmq/zeromq/FairMQPollerZMQ.cxx index 77fcf73c..0292e67d 100644 --- a/fairmq/zeromq/FairMQPollerZMQ.cxx +++ b/fairmq/zeromq/FairMQPollerZMQ.cxx @@ -15,6 +15,7 @@ #include #include "FairMQPollerZMQ.h" +#include "FairMQLogger.h" FairMQPollerZMQ::FairMQPollerZMQ(const vector& inputs) { @@ -32,7 +33,11 @@ FairMQPollerZMQ::FairMQPollerZMQ(const vector& inputs) void FairMQPollerZMQ::Poll(int timeout) { - zmq_poll(items, fNumItems, timeout); + int rc = zmq_poll(items, fNumItems, timeout); + if (rc < 0) + { + LOG(ERROR) << "polling failed, reason: " << zmq_strerror(errno); + } } bool FairMQPollerZMQ::CheckInput(int index) @@ -43,6 +48,14 @@ bool FairMQPollerZMQ::CheckInput(int index) return false; } +bool FairMQPollerZMQ::CheckOutput(int index) +{ + if (items[index].revents & ZMQ_POLLOUT) + return true; + + return false; +} + FairMQPollerZMQ::~FairMQPollerZMQ() { if (items != NULL) diff --git a/fairmq/zeromq/FairMQPollerZMQ.h b/fairmq/zeromq/FairMQPollerZMQ.h index 0ca839d1..d76c42f3 100644 --- a/fairmq/zeromq/FairMQPollerZMQ.h +++ b/fairmq/zeromq/FairMQPollerZMQ.h @@ -29,6 +29,7 @@ class FairMQPollerZMQ : public FairMQPoller virtual void Poll(int timeout); virtual bool CheckInput(int index); + virtual bool CheckOutput(int index); virtual ~FairMQPollerZMQ(); diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index 43027f31..9e7aa7da 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -40,7 +40,16 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, int num, int numIoThreads) rc = zmq_setsockopt(fSocket, ZMQ_IDENTITY, &fId, fId.length()); if (rc != 0) { - LOG(ERROR) << "failed setting socket option, reason: " << zmq_strerror(errno); + LOG(ERROR) << "failed setting ZMQ_IDENTITY socket option, reason: " << zmq_strerror(errno); + } + + // Tell socket to try and send/receive outstanding messages for milliseconds before terminating. + // Default value for ZeroMQ is -1, which is to wait forever. + int linger = 500; + rc = zmq_setsockopt(fSocket, ZMQ_LINGER, &linger, sizeof(linger)); + if (rc != 0) + { + LOG(ERROR) << "failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno); } if (type == "sub") @@ -48,7 +57,7 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, int num, int numIoThreads) rc = zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, NULL, 0); if (rc != 0) { - LOG(ERROR) << "failed setting socket option, reason: " << zmq_strerror(errno); + LOG(ERROR) << "failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno); } } @@ -82,7 +91,7 @@ void FairMQSocketZMQ::Connect(const string& address) } } -size_t FairMQSocketZMQ::Send(FairMQMessage* msg, const string& flag) +int FairMQSocketZMQ::Send(FairMQMessage* msg, const string& flag) { int nbytes = zmq_msg_send(static_cast(msg->GetMessage()), fSocket, GetConstant(flag)); if (nbytes >= 0) @@ -93,13 +102,18 @@ size_t FairMQSocketZMQ::Send(FairMQMessage* msg, const string& flag) } if (zmq_errno() == EAGAIN) { - return false; + return 0; + } + if (zmq_errno() == ETERM) + { + LOG(INFO) << "terminating socket #" << fId; + return -1; } LOG(ERROR) << "failed sending on socket #" << fId << ", reason: " << zmq_strerror(errno); return nbytes; } -size_t FairMQSocketZMQ::Receive(FairMQMessage* msg, const string& flag) +int FairMQSocketZMQ::Receive(FairMQMessage* msg, const string& flag) { int nbytes = zmq_msg_recv(static_cast(msg->GetMessage()), fSocket, GetConstant(flag)); if (nbytes >= 0) @@ -110,7 +124,12 @@ size_t FairMQSocketZMQ::Receive(FairMQMessage* msg, const string& flag) } if (zmq_errno() == EAGAIN) { - return false; + return 0; + } + if (zmq_errno() == ETERM) + { + LOG(INFO) << "terminating socket #" << fId; + return -1; } LOG(ERROR) << "failed receiving on socket #" << fId << ", reason: " << zmq_strerror(errno); return nbytes; @@ -132,6 +151,15 @@ void FairMQSocketZMQ::Close() fSocket = NULL; } +void FairMQSocketZMQ::Terminate() +{ + int rc = zmq_ctx_destroy(fContext->GetContext()); + if (rc != 0) + { + LOG(ERROR) << "failed terminating context, reason: " << zmq_strerror(errno); + } +} + void* FairMQSocketZMQ::GetSocket() { return fSocket; @@ -204,6 +232,8 @@ int FairMQSocketZMQ::GetConstant(const string& constant) return ZMQ_SNDMORE; if (constant == "rcv-more") return ZMQ_RCVMORE; + if (constant == "linger") + return ZMQ_LINGER; return -1; } diff --git a/fairmq/zeromq/FairMQSocketZMQ.h b/fairmq/zeromq/FairMQSocketZMQ.h index e42687d1..77ae2ed6 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.h +++ b/fairmq/zeromq/FairMQSocketZMQ.h @@ -32,12 +32,13 @@ class FairMQSocketZMQ : public FairMQSocket virtual void Bind(const string& address); virtual void Connect(const string& address); - virtual size_t Send(FairMQMessage* msg, const string& flag=""); - virtual size_t Receive(FairMQMessage* msg, const string& flag=""); + virtual int Send(FairMQMessage* msg, const string& flag=""); + virtual int Receive(FairMQMessage* msg, const string& flag=""); virtual void* GetSocket(); virtual int GetSocket(int nothing); virtual void Close(); + virtual void Terminate(); virtual void SetOption(const string& option, const void* value, size_t valueSize); virtual void GetOption(const string& option, void* value, size_t* valueSize);