diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 4e412285..7f902017 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -128,12 +128,13 @@ Set(FairMQHDRFiles devices/GenericProcessor.h devices/GenericFileSink.h devices/GenericFileSink.tpl + tools/FairMQTools.h ) install(FILES ${FairMQHDRFiles} DESTINATION include) set(DEPENDENCIES ${DEPENDENCIES} - boost_thread boost_timer boost_system boost_program_options + boost_thread boost_timer boost_system boost_program_options boost_random ) set(LIBRARY_NAME FairMQ) diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 935b846b..183491d8 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -13,6 +13,8 @@ */ #include +#include // for choosing random port in range +#include // for choosing random port in range #include "FairMQSocket.h" #include "FairMQDevice.h" @@ -23,6 +25,8 @@ FairMQDevice::FairMQDevice() , fNumIoThreads(1) , fNumInputs(0) , fNumOutputs(0) + , fPortRangeMin(22000) + , fPortRangeMax(32000) , fInputAddress() , fInputMethod() , fInputSocketType() @@ -79,22 +83,6 @@ void FairMQDevice::InitInput() socket->SetOption("rcv-hwm", &fInputRcvBufSize.at(i), sizeof(fInputRcvBufSize.at(i))); fPayloadInputs->push_back(socket); - - try - { - if (fInputMethod.at(i) == "bind") - { - fPayloadInputs->at(i)->Bind(fInputAddress.at(i)); - } - else - { - fPayloadInputs->at(i)->Connect(fInputAddress.at(i)); - } - } - catch (out_of_range& e) - { - LOG(ERROR) << e.what(); - } } } @@ -110,21 +98,89 @@ void FairMQDevice::InitOutput() socket->SetOption("rcv-hwm", &fOutputRcvBufSize.at(i), sizeof(fOutputRcvBufSize.at(i))); fPayloadOutputs->push_back(socket); + } +} - try +void FairMQDevice::Bind() +{ + LOG(INFO) << ">>>>>>> binding <<<<<<<"; + + int maxAttempts = 1000; + int numAttempts = 0; + + boost::random::mt19937 gen(getpid()); + boost::random::uniform_int_distribution<> randomPort(fPortRangeMin, fPortRangeMax); + + for (int i = 0; i < fNumOutputs; ++i) + { + if (fOutputMethod.at(i) == "bind") { - if (fOutputMethod.at(i) == "bind") + if (!fPayloadOutputs->at(i)->Bind(fOutputAddress.at(i))) { - fPayloadOutputs->at(i)->Bind(fOutputAddress.at(i)); - } - else - { - fPayloadOutputs->at(i)->Connect(fOutputAddress.at(i)); + do { + LOG(WARN) << "could not bind at " << fOutputAddress.at(i) << ", trying another port in range"; + ++numAttempts; + + size_t pos = fOutputAddress.at(i).rfind(":"); + stringstream ss; + ss << (int)randomPort(gen); + string portString = ss.str(); + fOutputAddress.at(i) = fOutputAddress.at(i).substr(0, pos + 1) + portString; + if (numAttempts > maxAttempts) + { + LOG(ERROR) << "could not bind output " << i << " to any port in the given range"; + break; + } + } while (!fPayloadOutputs->at(i)->Bind(fOutputAddress.at(i))); } } - catch (out_of_range& e) + } + + numAttempts = 0; + + for (int i = 0; i < fNumInputs; ++i) + { + if (fInputMethod.at(i) == "bind") { - LOG(ERROR) << e.what(); + if (!fPayloadInputs->at(i)->Bind(fInputAddress.at(i))) + { + do { + LOG(WARN) << "could not bind at " << fInputAddress.at(i) << ", trying another port in range"; + ++numAttempts; + + size_t pos = fInputAddress.at(i).rfind(":"); + stringstream ss; + ss << (int)randomPort(gen); + string portString = ss.str(); + fInputAddress.at(i) = fInputAddress.at(i).substr(0, pos + 1) + portString; + if (numAttempts > maxAttempts) + { + LOG(ERROR) << "could not bind output " << i << " to any port in the given range"; + break; + } + } while (!fPayloadInputs->at(i)->Bind(fInputAddress.at(i))); + } + } + } +} + +void FairMQDevice::Connect() +{ + LOG(INFO) << ">>>>>>> connecting <<<<<<<"; + + for (int i = 0; i < fNumOutputs; ++i) + { + if (fOutputMethod.at(i) == "connect") + { + fPayloadOutputs->at(i)->Connect(fOutputAddress.at(i)); + } + } + + for (int i = 0; i < fNumInputs; ++i) + { + if (fInputMethod.at(i) == "connect") + { + fPayloadInputs->at(i)->Connect(fInputAddress.at(i)); } } } @@ -189,6 +245,12 @@ void FairMQDevice::SetProperty(const int key, const int value, const int slot /* case NumOutputs: fNumOutputs = value; break; + case PortRangeMin: + fPortRangeMin = value; + break; + case PortRangeMax: + fPortRangeMax = value; + break; case LogIntervalInMs: fLogIntervalInMs = value; break; @@ -253,6 +315,14 @@ int FairMQDevice::GetProperty(const int key, const int default_ /*= 0*/, const i { case NumIoThreads: return fNumIoThreads; + case NumInputs: + return fNumInputs; + case NumOutputs: + return fNumOutputs; + case PortRangeMin: + return fPortRangeMin; + case PortRangeMax: + return fPortRangeMax; case LogIntervalInMs: return fLogIntervalInMs; case InputSndBufSize: @@ -397,10 +467,6 @@ void FairMQDevice::LogSocketRates() LOG(INFO) << ">>>>>>> stopping FairMQDevice::LogSocketRates() <<<<<<<"; } -void FairMQDevice::ListenToCommands() -{ -} - void FairMQDevice::Shutdown() { LOG(INFO) << ">>>>>>> closing inputs <<<<<<<"; diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 9402cd82..bdd9868c 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -35,6 +35,8 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable NumIoThreads, NumInputs, NumOutputs, + PortRangeMin, + PortRangeMax, InputAddress, InputMethod, InputSocketType, @@ -54,7 +56,6 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable FairMQDevice(); virtual void LogSocketRates(); - virtual void ListenToCommands(); virtual void SetProperty(const int key, const string& value, const int slot = 0); virtual string GetProperty(const int key, const string& default_ = "", const int slot = 0); @@ -67,11 +68,15 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable protected: string fId; + int fNumIoThreads; int fNumInputs; int fNumOutputs; + int fPortRangeMin; + int fPortRangeMax; + vector fInputAddress; vector fInputMethod; vector fInputSocketType; @@ -99,6 +104,8 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable virtual void Shutdown(); virtual void InitOutput(); virtual void InitInput(); + virtual void Bind(); + virtual void Connect(); virtual void Terminate(); diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index a2965cbd..cc324923 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -35,7 +35,7 @@ class FairMQSocket virtual string GetId() = 0; - virtual void Bind(const string& address) = 0; + virtual bool Bind(const string& address) = 0; virtual void Connect(const string& address) = 0; virtual int Send(FairMQMessage* msg, const string& flag="") = 0; diff --git a/fairmq/FairMQStateMachine.cxx b/fairmq/FairMQStateMachine.cxx index 6addff9b..39e466a4 100644 --- a/fairmq/FairMQStateMachine.cxx +++ b/fairmq/FairMQStateMachine.cxx @@ -1,8 +1,8 @@ /******************************************************************************** * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ /** @@ -28,7 +28,12 @@ FairMQStateMachine::~FairMQStateMachine() stop(); } -void FairMQStateMachine::ChangeState(int event) +int FairMQStateMachine::GetInterfaceVersion() +{ + return FAIRMQ_INTERFACE_VERSION; +} + +bool FairMQStateMachine::ChangeState(int event) { try { @@ -36,25 +41,35 @@ void FairMQStateMachine::ChangeState(int event) { case INIT: process_event(FairMQFSM::INIT()); - return; + return true; case SETOUTPUT: process_event(FairMQFSM::SETOUTPUT()); - return; + return true; case SETINPUT: process_event(FairMQFSM::SETINPUT()); - return; + return true; + case BIND: + process_event(FairMQFSM::BIND()); + return true; + case CONNECT: + process_event(FairMQFSM::CONNECT()); + return true; case RUN: process_event(FairMQFSM::RUN()); - return; + return true; case PAUSE: process_event(FairMQFSM::PAUSE()); - return; + return true; case STOP: process_event(FairMQFSM::STOP()); - return; + return true; case END: process_event(FairMQFSM::END()); - return; + return true; + default: + LOG(ERROR) << "Requested unsupported state: " << event; + LOG(ERROR) << "Supported are: INIT, SETOUTPUT, SETINPUT, BIND, CONNECT, RUN, PAUSE, STOP, END"; + return false; } } catch (boost::bad_function_call& e) @@ -62,3 +77,49 @@ void FairMQStateMachine::ChangeState(int event) LOG(ERROR) << e.what(); } } + +bool FairMQStateMachine::ChangeState(std::string event) +{ + if (event == "INIT") + { + return ChangeState(INIT); + } + else if (event == "SETOUTPUT") + { + return ChangeState(SETOUTPUT); + } + else if (event == "SETINPUT") + { + return ChangeState(SETINPUT); + } + else if (event == "BIND") + { + return ChangeState(BIND); + } + else if (event == "CONNECT") + { + return ChangeState(CONNECT); + } + else if (event == "RUN") + { + return ChangeState(RUN); + } + else if (event == "PAUSE") + { + return ChangeState(PAUSE); + } + else if (event == "STOP") + { + return ChangeState(STOP); + } + else if (event == "END") + { + return ChangeState(END); + } + else + { + LOG(ERROR) << "Requested unsupported state: " << event; + LOG(ERROR) << "Supported are: INIT, SETOUTPUT, SETINPUT, BIND, CONNECT, RUN, PAUSE, STOP, END"; + return false; + } +} diff --git a/fairmq/FairMQStateMachine.h b/fairmq/FairMQStateMachine.h index 9d7fdb38..2f17da2d 100644 --- a/fairmq/FairMQStateMachine.h +++ b/fairmq/FairMQStateMachine.h @@ -1,8 +1,8 @@ /******************************************************************************** * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ /** @@ -15,6 +15,10 @@ #ifndef FAIRMQSTATEMACHINE_H_ #define FAIRMQSTATEMACHINE_H_ +#define FAIRMQ_INTERFACE_VERSION 1 + +#include + #include #include #include @@ -39,6 +43,8 @@ namespace FairMQFSM struct INIT {}; struct SETOUTPUT {}; struct SETINPUT {}; + struct BIND {}; + struct CONNECT {}; struct PAUSE {}; struct RUN {}; struct STOP {}; @@ -48,9 +54,9 @@ namespace FairMQFSM struct FairMQFSM_ : public msm::front::state_machine_def { FairMQFSM_() - : fState() - , fRunningStateThread() - {} + : fState() + , fRunningStateThread() + {} // Destructor virtual ~FairMQFSM_() {}; @@ -71,6 +77,8 @@ namespace FairMQFSM struct INITIALIZING_FSM : public msm::front::state<> {}; struct SETTINGOUTPUT_FSM : public msm::front::state<> {}; struct SETTINGINPUT_FSM : public msm::front::state<> {}; + struct BINDING_FSM : public msm::front::state<> {}; + struct CONNECTING_FSM : public msm::front::state<> {}; struct WAITING_FSM : public msm::front::state<> {}; struct RUNNING_FSM : public msm::front::state<> {}; // Define initial state @@ -111,6 +119,24 @@ namespace FairMQFSM fsm.InitInput(); } }; + struct BindFct + { + template + void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + { + fsm.fState = BINDING; + fsm.Bind(); + } + }; + struct ConnectFct + { + template + void operator()(EVT const&, FSM& fsm, SourceState&, TargetState&) + { + fsm.fState = CONNECTING; + fsm.Connect(); + } + }; struct RunFct { template @@ -147,6 +173,8 @@ namespace FairMQFSM virtual void Shutdown() {} virtual void InitOutput() {} virtual void InitInput() {} + virtual void Bind() {} + virtual void Connect() {} virtual void Terminate() {} // Termination method called during StopFct action. // Transition table for FairMQFMS struct transition_table : mpl::vector< @@ -156,8 +184,10 @@ namespace FairMQFSM msmf::Row, // this is an invalid transition... msmf::Row, msmf::Row, - msmf::Row, - msmf::Row, + msmf::Row, + msmf::Row, + msmf::Row, + msmf::Row, msmf::Row, msmf::Row, msmf::Row, @@ -179,6 +209,8 @@ namespace FairMQFSM INITIALIZING, SETTINGOUTPUT, SETTINGINPUT, + BINDING, + CONNECTING, WAITING, RUNNING }; @@ -195,6 +227,8 @@ class FairMQStateMachine : public FairMQFSM::FairMQFSM INIT, SETOUTPUT, SETINPUT, + BIND, + CONNECT, PAUSE, RUN, STOP, @@ -202,7 +236,11 @@ class FairMQStateMachine : public FairMQFSM::FairMQFSM }; FairMQStateMachine(); virtual ~FairMQStateMachine(); - void ChangeState(int event); + + int GetInterfaceVersion(); + + bool ChangeState(int event); + bool ChangeState(std::string event); // condition variable to notify parent thread about end of running state. boost::condition_variable fRunningCondition; diff --git a/fairmq/examples/req-rep/runExampleClient.cxx b/fairmq/examples/req-rep/runExampleClient.cxx index 49ff0d2f..bd110453 100644 --- a/fairmq/examples/req-rep/runExampleClient.cxx +++ b/fairmq/examples/req-rep/runExampleClient.cxx @@ -130,6 +130,8 @@ int main(int argc, char** argv) client.ChangeState(FairMQExampleClient::SETOUTPUT); client.ChangeState(FairMQExampleClient::SETINPUT); + client.ChangeState(FairMQExampleClient::BIND); + client.ChangeState(FairMQExampleClient::CONNECT); client.ChangeState(FairMQExampleClient::RUN); // wait until the running thread has finished processing. diff --git a/fairmq/examples/req-rep/runExampleServer.cxx b/fairmq/examples/req-rep/runExampleServer.cxx index 4e0d1ffd..f1324100 100644 --- a/fairmq/examples/req-rep/runExampleServer.cxx +++ b/fairmq/examples/req-rep/runExampleServer.cxx @@ -78,6 +78,8 @@ int main(int argc, char** argv) server.ChangeState(FairMQExampleServer::SETOUTPUT); server.ChangeState(FairMQExampleServer::SETINPUT); + server.ChangeState(FairMQExampleServer::BIND); + server.ChangeState(FairMQExampleServer::CONNECT); LOG(INFO) << "Listening for requests!"; diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index 83ce6035..8a209e1c 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -61,7 +61,7 @@ string FairMQSocketNN::GetId() return fId; } -void FairMQSocketNN::Bind(const string& address) +bool FairMQSocketNN::Bind(const string& address) { LOG(INFO) << "bind socket #" << fId << " on " << address; @@ -69,7 +69,9 @@ void FairMQSocketNN::Bind(const string& address) if (eid < 0) { LOG(ERROR) << "failed binding socket #" << fId << ", reason: " << nn_strerror(errno); + return false; } + return true; } void FairMQSocketNN::Connect(const string& address) diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h index abb564c2..8d8bee95 100644 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ b/fairmq/nanomsg/FairMQSocketNN.h @@ -30,7 +30,7 @@ class FairMQSocketNN : public FairMQSocket virtual string GetId(); - virtual void Bind(const string& address); + virtual bool Bind(const string& address); virtual void Connect(const string& address); virtual int Send(FairMQMessage* msg, const string& flag=""); diff --git a/fairmq/run/runBenchmarkSampler.cxx b/fairmq/run/runBenchmarkSampler.cxx index c273b067..a1c24d0e 100644 --- a/fairmq/run/runBenchmarkSampler.cxx +++ b/fairmq/run/runBenchmarkSampler.cxx @@ -168,6 +168,8 @@ int main(int argc, char** argv) sampler.ChangeState(FairMQBenchmarkSampler::SETOUTPUT); sampler.ChangeState(FairMQBenchmarkSampler::SETINPUT); + sampler.ChangeState(FairMQBenchmarkSampler::BIND); + sampler.ChangeState(FairMQBenchmarkSampler::CONNECT); sampler.ChangeState(FairMQBenchmarkSampler::RUN); // wait until the running thread has finished processing. diff --git a/fairmq/run/runBinSampler.cxx b/fairmq/run/runBinSampler.cxx index a6f6fc85..fe0ceaab 100644 --- a/fairmq/run/runBinSampler.cxx +++ b/fairmq/run/runBinSampler.cxx @@ -168,6 +168,8 @@ int main(int argc, char** argv) sampler.ChangeState(FairMQBinSampler::SETOUTPUT); sampler.ChangeState(FairMQBinSampler::SETINPUT); + sampler.ChangeState(FairMQBinSampler::BIND); + sampler.ChangeState(FairMQBinSampler::CONNECT); sampler.ChangeState(FairMQBinSampler::RUN); // wait until the running thread has finished processing. diff --git a/fairmq/run/runBinSink.cxx b/fairmq/run/runBinSink.cxx index 3e898cc0..9b3fbe21 100644 --- a/fairmq/run/runBinSink.cxx +++ b/fairmq/run/runBinSink.cxx @@ -154,6 +154,8 @@ int main(int argc, char** argv) sink.ChangeState(FairMQBinSink::SETOUTPUT); sink.ChangeState(FairMQBinSink::SETINPUT); + sink.ChangeState(FairMQBinSink::BIND); + sink.ChangeState(FairMQBinSink::CONNECT); sink.ChangeState(FairMQBinSink::RUN); // wait until the running thread has finished processing. diff --git a/fairmq/run/runBuffer.cxx b/fairmq/run/runBuffer.cxx index 229a47ea..cb60cfb3 100644 --- a/fairmq/run/runBuffer.cxx +++ b/fairmq/run/runBuffer.cxx @@ -180,6 +180,8 @@ int main(int argc, char** argv) buffer.ChangeState(FairMQBuffer::SETOUTPUT); buffer.ChangeState(FairMQBuffer::SETINPUT); + buffer.ChangeState(FairMQBuffer::BIND); + buffer.ChangeState(FairMQBuffer::CONNECT); buffer.ChangeState(FairMQBuffer::RUN); // wait until the running thread has finished processing. diff --git a/fairmq/run/runMerger.cxx b/fairmq/run/runMerger.cxx index 75982b98..a45cc42c 100644 --- a/fairmq/run/runMerger.cxx +++ b/fairmq/run/runMerger.cxx @@ -188,6 +188,8 @@ int main(int argc, char** argv) merger.ChangeState(FairMQMerger::SETOUTPUT); merger.ChangeState(FairMQMerger::SETINPUT); + merger.ChangeState(FairMQMerger::BIND); + merger.ChangeState(FairMQMerger::CONNECT); merger.ChangeState(FairMQMerger::RUN); // wait until the running thread has finished processing. diff --git a/fairmq/run/runProtoSampler.cxx b/fairmq/run/runProtoSampler.cxx index 0c689016..91d314cd 100644 --- a/fairmq/run/runProtoSampler.cxx +++ b/fairmq/run/runProtoSampler.cxx @@ -168,6 +168,8 @@ int main(int argc, char** argv) sampler.ChangeState(FairMQProtoSampler::SETOUTPUT); sampler.ChangeState(FairMQProtoSampler::SETINPUT); + sampler.ChangeState(FairMQProtoSampler::BIND); + sampler.ChangeState(FairMQProtoSampler::CONNECT); sampler.ChangeState(FairMQProtoSampler::RUN); // wait until the running thread has finished processing. diff --git a/fairmq/run/runProtoSink.cxx b/fairmq/run/runProtoSink.cxx index 43aff5e8..9d3ab62f 100644 --- a/fairmq/run/runProtoSink.cxx +++ b/fairmq/run/runProtoSink.cxx @@ -154,6 +154,8 @@ int main(int argc, char** argv) sink.ChangeState(FairMQProtoSink::SETOUTPUT); sink.ChangeState(FairMQProtoSink::SETINPUT); + sink.ChangeState(FairMQProtoSink::BIND); + sink.ChangeState(FairMQProtoSink::CONNECT); sink.ChangeState(FairMQProtoSink::RUN); // wait until the running thread has finished processing. diff --git a/fairmq/run/runProxy.cxx b/fairmq/run/runProxy.cxx index 319408e0..7720fc79 100644 --- a/fairmq/run/runProxy.cxx +++ b/fairmq/run/runProxy.cxx @@ -180,6 +180,8 @@ int main(int argc, char** argv) proxy.ChangeState(FairMQProxy::SETOUTPUT); proxy.ChangeState(FairMQProxy::SETINPUT); + proxy.ChangeState(FairMQProxy::BIND); + proxy.ChangeState(FairMQProxy::CONNECT); proxy.ChangeState(FairMQProxy::RUN); // wait until the running thread has finished processing. diff --git a/fairmq/run/runSink.cxx b/fairmq/run/runSink.cxx index b38faa8c..86f83def 100644 --- a/fairmq/run/runSink.cxx +++ b/fairmq/run/runSink.cxx @@ -154,6 +154,8 @@ int main(int argc, char** argv) sink.ChangeState(FairMQSink::SETOUTPUT); sink.ChangeState(FairMQSink::SETINPUT); + sink.ChangeState(FairMQSink::BIND); + sink.ChangeState(FairMQSink::CONNECT); sink.ChangeState(FairMQSink::RUN); // wait until the running thread has finished processing. diff --git a/fairmq/run/runSplitter.cxx b/fairmq/run/runSplitter.cxx index 074739bc..748da93d 100644 --- a/fairmq/run/runSplitter.cxx +++ b/fairmq/run/runSplitter.cxx @@ -188,6 +188,8 @@ int main(int argc, char** argv) splitter.ChangeState(FairMQSplitter::SETOUTPUT); splitter.ChangeState(FairMQSplitter::SETINPUT); + splitter.ChangeState(FairMQSplitter::BIND); + splitter.ChangeState(FairMQSplitter::CONNECT); splitter.ChangeState(FairMQSplitter::RUN); // wait until the running thread has finished processing. diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index ad950bd4..43598c72 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -72,7 +72,7 @@ string FairMQSocketZMQ::GetId() return fId; } -void FairMQSocketZMQ::Bind(const string& address) +bool FairMQSocketZMQ::Bind(const string& address) { LOG(INFO) << "bind socket #" << fId << " on " << address; @@ -80,7 +80,9 @@ void FairMQSocketZMQ::Bind(const string& address) if (rc != 0) { LOG(ERROR) << "failed binding socket #" << fId << ", reason: " << zmq_strerror(errno); + return false; } + return true; } void FairMQSocketZMQ::Connect(const string& address) diff --git a/fairmq/zeromq/FairMQSocketZMQ.h b/fairmq/zeromq/FairMQSocketZMQ.h index af2947e5..55c21946 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.h +++ b/fairmq/zeromq/FairMQSocketZMQ.h @@ -29,7 +29,7 @@ class FairMQSocketZMQ : public FairMQSocket virtual string GetId(); - virtual void Bind(const string& address); + virtual bool Bind(const string& address); virtual void Connect(const string& address); virtual int Send(FairMQMessage* msg, const string& flag="");