From c42b6ca4aed1dfa81cb31dc69f4671e8f64a07a8 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Thu, 3 Mar 2016 11:23:47 +0100 Subject: [PATCH] Include device ID in the zeromq socket identity. For request sockets in ZeroMQ the socket identity must be unique, otherwise multiple clients will be rejected. Update the tests to test this use case. --- fairmq/FairMQChannel.cxx | 2 +- fairmq/FairMQDevice.cxx | 4 ++-- fairmq/FairMQTransportFactory.h | 2 +- fairmq/nanomsg/FairMQSocketNN.cxx | 4 ++-- fairmq/nanomsg/FairMQSocketNN.h | 2 +- fairmq/nanomsg/FairMQTransportFactoryNN.cxx | 4 ++-- fairmq/nanomsg/FairMQTransportFactoryNN.h | 2 +- fairmq/test/req-rep/FairMQTestRep.cxx | 14 ++++++++++++-- fairmq/test/req-rep/FairMQTestReq.cxx | 2 +- fairmq/test/req-rep/runTestRep.cxx | 2 +- fairmq/test/req-rep/runTestReq.cxx | 4 ++-- fairmq/test/test-fairmq-req-rep.sh.in | 9 ++++++--- fairmq/zeromq/FairMQSocketZMQ.cxx | 6 ++---- fairmq/zeromq/FairMQSocketZMQ.h | 2 +- fairmq/zeromq/FairMQTransportFactoryZMQ.cxx | 4 ++-- fairmq/zeromq/FairMQTransportFactoryZMQ.h | 2 +- 16 files changed, 38 insertions(+), 27 deletions(-) diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index 2b30c62b..eedfd5d1 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -409,7 +409,7 @@ bool FairMQChannel::InitCommandInterface(FairMQTransportFactory* factory, int nu { fTransportFactory = factory; - fCmdSocket = fTransportFactory->CreateSocket("sub", "device-commands", numIoThreads); + fCmdSocket = fTransportFactory->CreateSocket("sub", "device-commands", numIoThreads, "internal"); if (fCmdSocket) { fCmdSocket->Connect("inproc://commands"); diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 37a0beda..923fe8fe 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -98,7 +98,7 @@ void FairMQDevice::InitWrapper() if (!fCmdSocket) { - fCmdSocket = fTransportFactory->CreateSocket("pub", "device-commands", fNumIoThreads); + fCmdSocket = fTransportFactory->CreateSocket("pub", "device-commands", fNumIoThreads, fId); fCmdSocket->Bind("inproc://commands"); } @@ -191,7 +191,7 @@ bool FairMQDevice::InitChannel(FairMQChannel& ch) { LOG(DEBUG) << "Initializing channel " << ch.fChannelName << " (" << ch.fType << ")"; // initialize the socket - ch.fSocket = fTransportFactory->CreateSocket(ch.fType, ch.fChannelName, fNumIoThreads); + ch.fSocket = fTransportFactory->CreateSocket(ch.fType, ch.fChannelName, fNumIoThreads, fId); // set high water marks ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize)); ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize)); diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 6951d889..5be87c6c 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -34,7 +34,7 @@ class FairMQTransportFactory virtual FairMQMessage* CreateMessage(const size_t size) = 0; virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn = NULL, void* hint = NULL) = 0; - virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads) = 0; + virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = "") = 0; virtual FairMQPoller* CreatePoller(const std::vector& channels) = 0; virtual FairMQPoller* CreatePoller(const std::unordered_map>& channelsMap, const std::initializer_list channelList) = 0; diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index eab99585..28204d1f 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -20,7 +20,7 @@ using namespace std; -FairMQSocketNN::FairMQSocketNN(const string& type, const std::string& name, const int numIoThreads) +FairMQSocketNN::FairMQSocketNN(const string& type, const std::string& name, const int numIoThreads, const std::string& id /*= ""*/) : FairMQSocket(0, 0, NN_DONTWAIT) , fSocket(-1) , fId() @@ -29,7 +29,7 @@ FairMQSocketNN::FairMQSocketNN(const string& type, const std::string& name, cons , fMessagesTx(0) , fMessagesRx(0) { - fId = name + "." + type; + fId = id + "." + name + "." + type; if (numIoThreads > 1) { diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h index 09ae2763..71f07086 100644 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ b/fairmq/nanomsg/FairMQSocketNN.h @@ -26,7 +26,7 @@ class FairMQSocketNN : public FairMQSocket { public: - FairMQSocketNN(const std::string& type, const std::string& name, const int numIoThreads); // numIoThreads is not used in nanomsg. + FairMQSocketNN(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = ""); // numIoThreads is not used in nanomsg. FairMQSocketNN(const FairMQSocketNN&) = delete; FairMQSocketNN operator=(const FairMQSocketNN&) = delete; diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index 6d09d414..4f4e5fc8 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -36,9 +36,9 @@ FairMQMessage* FairMQTransportFactoryNN::CreateMessage(void* data, const size_t return new FairMQMessageNN(data, size, ffn, hint); } -FairMQSocket* FairMQTransportFactoryNN::CreateSocket(const string& type, const std::string& name, const int numIoThreads) +FairMQSocket* FairMQTransportFactoryNN::CreateSocket(const string& type, const std::string& name, const int numIoThreads, const std::string& id /*= ""*/) { - return new FairMQSocketNN(type, name, numIoThreads); + return new FairMQSocketNN(type, name, numIoThreads, id); } FairMQPoller* FairMQTransportFactoryNN::CreatePoller(const vector& channels) diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h index 76638267..89efb8d0 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.h +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.h @@ -31,7 +31,7 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory virtual FairMQMessage* CreateMessage(const size_t size); virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn = NULL, void* hint = NULL); - virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads); + virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = ""); virtual FairMQPoller* CreatePoller(const std::vector& channels); virtual FairMQPoller* CreatePoller(const std::unordered_map>& channelsMap, const std::initializer_list channelList); diff --git a/fairmq/test/req-rep/FairMQTestRep.cxx b/fairmq/test/req-rep/FairMQTestRep.cxx index 188eb470..16f4a198 100644 --- a/fairmq/test/req-rep/FairMQTestRep.cxx +++ b/fairmq/test/req-rep/FairMQTestRep.cxx @@ -23,12 +23,22 @@ FairMQTestRep::FairMQTestRep() void FairMQTestRep::Run() { - std::unique_ptr request(NewMessage()); - if (Receive(request, "data") >= 0) + std::unique_ptr request1(NewMessage()); + if (Receive(request1, "data") >= 0) { + LOG(INFO) << "Received request 1"; std::unique_ptr reply(NewMessage()); Send(reply, "data"); } + std::unique_ptr request2(NewMessage()); + if (Receive(request2, "data") >= 0) + { + LOG(INFO) << "Received request 2"; + std::unique_ptr reply(NewMessage()); + Send(reply, "data"); + } + + LOG(INFO) << "REQ-REP test successfull"; } FairMQTestRep::~FairMQTestRep() diff --git a/fairmq/test/req-rep/FairMQTestReq.cxx b/fairmq/test/req-rep/FairMQTestReq.cxx index a68abe02..1ed82cbc 100644 --- a/fairmq/test/req-rep/FairMQTestReq.cxx +++ b/fairmq/test/req-rep/FairMQTestReq.cxx @@ -29,7 +29,7 @@ void FairMQTestReq::Run() std::unique_ptr reply(NewMessage()); if (Receive(reply, "data") >= 0) { - LOG(INFO) << "REQ-REP test successfull"; + LOG(INFO) << "received reply"; } } diff --git a/fairmq/test/req-rep/runTestRep.cxx b/fairmq/test/req-rep/runTestRep.cxx index 3684859e..ce0abb54 100644 --- a/fairmq/test/req-rep/runTestRep.cxx +++ b/fairmq/test/req-rep/runTestRep.cxx @@ -25,7 +25,7 @@ int main(int argc, char** argv) testRep.SetProperty(FairMQTestRep::Id, "testRep"); - FairMQChannel repChannel("rep", "connect", "tcp://127.0.0.1:5558"); + FairMQChannel repChannel("rep", "bind", "tcp://127.0.0.1:5558"); testRep.fChannels["data"].push_back(repChannel); testRep.ChangeState("INIT_DEVICE"); diff --git a/fairmq/test/req-rep/runTestReq.cxx b/fairmq/test/req-rep/runTestReq.cxx index 8277b630..0715c172 100644 --- a/fairmq/test/req-rep/runTestReq.cxx +++ b/fairmq/test/req-rep/runTestReq.cxx @@ -23,9 +23,9 @@ int main(int argc, char** argv) testReq.CatchSignals(); testReq.SetTransport("zeromq"); - testReq.SetProperty(FairMQTestReq::Id, "testReq"); + testReq.SetProperty(FairMQTestReq::Id, "testReq" + std::to_string(getpid())); - FairMQChannel reqChannel("req", "bind", "tcp://127.0.0.1:5558"); + FairMQChannel reqChannel("req", "connect", "tcp://127.0.0.1:5558"); testReq.fChannels["data"].push_back(reqChannel); testReq.ChangeState("INIT_DEVICE"); diff --git a/fairmq/test/test-fairmq-req-rep.sh.in b/fairmq/test/test-fairmq-req-rep.sh.in index b523a5f6..22e3df1e 100755 --- a/fairmq/test/test-fairmq-req-rep.sh.in +++ b/fairmq/test/test-fairmq-req-rep.sh.in @@ -1,9 +1,12 @@ #!/bin/bash -trap 'kill -TERM $REQ_PID; kill -TERM $REP_PID; wait $REQ_PID; wait $REP_PID;' TERM +trap 'kill -TERM $REQ1_PID; kill -TERM $REQ2_PID; kill -TERM $REP_PID; wait $REQ1_PID; wait $REQ2_PID; wait $REP_PID;' TERM @CMAKE_BINARY_DIR@/bin/test-fairmq-req & -REQ_PID=$! +REQ1_PID=$! +@CMAKE_BINARY_DIR@/bin/test-fairmq-req & +REQ2_PID=$! @CMAKE_BINARY_DIR@/bin/test-fairmq-rep & REP_PID=$! -wait $REQ_PID +wait $REQ1_PID +wait $REQ2_PID wait $REP_PID diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index 02fa9157..7f134621 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -24,7 +24,7 @@ using namespace std; // Context to hold the ZeroMQ sockets boost::shared_ptr FairMQSocketZMQ::fContext = boost::shared_ptr(new FairMQContextZMQ(1)); -FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const int numIoThreads) +FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const int numIoThreads, const std::string& id /*= ""*/) : FairMQSocket(ZMQ_SNDMORE, ZMQ_RCVMORE, ZMQ_DONTWAIT) , fSocket(NULL) , fId() @@ -33,7 +33,7 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const i , fMessagesTx(0) , fMessagesRx(0) { - fId = name + "." + type; + fId = id + "." + name + "." + type; if (zmq_ctx_set(fContext->GetContext(), ZMQ_IO_THREADS, numIoThreads) != 0) { @@ -68,8 +68,6 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const i LOG(ERROR) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno); } } - - // LOG(INFO) << "created socket " << fId; } string FairMQSocketZMQ::GetId() diff --git a/fairmq/zeromq/FairMQSocketZMQ.h b/fairmq/zeromq/FairMQSocketZMQ.h index f7ccbb58..2bbdc399 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.h +++ b/fairmq/zeromq/FairMQSocketZMQ.h @@ -23,7 +23,7 @@ class FairMQSocketZMQ : public FairMQSocket { public: - FairMQSocketZMQ(const std::string& type, const std::string& name, const int numIoThreads); + FairMQSocketZMQ(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = ""); FairMQSocketZMQ(const FairMQSocketZMQ&) = delete; FairMQSocketZMQ operator=(const FairMQSocketZMQ&) = delete; diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx index 40326a51..c5ea3317 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx @@ -40,9 +40,9 @@ FairMQMessage* FairMQTransportFactoryZMQ::CreateMessage(void* data, const size_t return new FairMQMessageZMQ(data, size, ffn, hint); } -FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(const string& type, const std::string& name, const int numIoThreads) +FairMQSocket* FairMQTransportFactoryZMQ::CreateSocket(const string& type, const std::string& name, const int numIoThreads, const std::string& id /*= ""*/) { - return new FairMQSocketZMQ(type, name, numIoThreads); + return new FairMQSocketZMQ(type, name, numIoThreads, id); } FairMQPoller* FairMQTransportFactoryZMQ::CreatePoller(const vector& channels) diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index 5f932f84..8fb6572f 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -32,7 +32,7 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory virtual FairMQMessage* CreateMessage(const size_t size); virtual FairMQMessage* CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn = NULL, void* hint = NULL); - virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads); + virtual FairMQSocket* CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = ""); virtual FairMQPoller* CreatePoller(const std::vector& channels); virtual FairMQPoller* CreatePoller(const std::unordered_map>& channelsMap, const std::initializer_list channelList);