From a332d9fc831ce8eccdf122d478da701ec7e56c70 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Fri, 3 Jun 2016 11:24:12 +0200 Subject: [PATCH] First version of the shared memory transport. Use via `--transport shmem` cmd option. No pub/sub. --- fairmq/CMakeLists.txt | 8 + fairmq/FairMQDevice.cxx | 142 +++-- fairmq/FairMQDevice.h | 32 +- fairmq/FairMQMessage.h | 6 +- fairmq/FairMQSocket.cxx | 94 ++-- fairmq/FairMQSocket.h | 3 + fairmq/FairMQStateMachine.h | 3 +- fairmq/FairMQTransportFactory.h | 2 +- fairmq/README.md | 27 +- fairmq/devices/FairMQBenchmarkSampler.cxx | 47 +- fairmq/devices/FairMQBenchmarkSampler.h | 3 +- fairmq/devices/FairMQSink.cxx | 9 +- fairmq/nanomsg/FairMQMessageNN.cxx | 41 +- fairmq/nanomsg/FairMQMessageNN.h | 10 +- fairmq/nanomsg/FairMQSocketNN.cxx | 67 +-- fairmq/nanomsg/FairMQSocketNN.h | 3 + fairmq/options/FairMQParser.cxx | 78 +-- fairmq/options/FairMQProgOptions.cxx | 16 +- fairmq/run/runBenchmarkSampler.cxx | 3 +- fairmq/run/runDDSCommandUI.cxx | 9 +- fairmq/run/startMQBenchmark.sh.in | 22 +- fairmq/shmem/FairMQContextSHM.cxx | 81 +++ fairmq/shmem/FairMQContextSHM.h | 27 + fairmq/shmem/FairMQMessageSHM.cxx | 292 ++++++++++ fairmq/shmem/FairMQMessageSHM.h | 63 +++ fairmq/shmem/FairMQPollerSHM.cxx | 240 +++++++++ fairmq/shmem/FairMQPollerSHM.h | 51 ++ fairmq/shmem/FairMQShmManager.h | 185 +++++++ fairmq/shmem/FairMQSocketSHM.cxx | 591 +++++++++++++++++++++ fairmq/shmem/FairMQSocketSHM.h | 76 +++ fairmq/shmem/FairMQTransportFactorySHM.cxx | 56 ++ fairmq/shmem/FairMQTransportFactorySHM.h | 37 ++ fairmq/shmem/README.md | 10 + fairmq/tools/runSimpleMQStateMachine.h | 5 + fairmq/zeromq/FairMQContextZMQ.cxx | 9 +- fairmq/zeromq/FairMQMessageZMQ.cxx | 21 +- fairmq/zeromq/FairMQMessageZMQ.h | 12 +- fairmq/zeromq/FairMQSocketZMQ.cxx | 46 +- fairmq/zeromq/FairMQSocketZMQ.h | 3 + 39 files changed, 2121 insertions(+), 309 deletions(-) create mode 100644 fairmq/shmem/FairMQContextSHM.cxx create mode 100644 fairmq/shmem/FairMQContextSHM.h create mode 100644 fairmq/shmem/FairMQMessageSHM.cxx create mode 100644 fairmq/shmem/FairMQMessageSHM.h create mode 100644 fairmq/shmem/FairMQPollerSHM.cxx create mode 100644 fairmq/shmem/FairMQPollerSHM.h create mode 100644 fairmq/shmem/FairMQShmManager.h create mode 100644 fairmq/shmem/FairMQSocketSHM.cxx create mode 100644 fairmq/shmem/FairMQSocketSHM.h create mode 100644 fairmq/shmem/FairMQTransportFactorySHM.cxx create mode 100644 fairmq/shmem/FairMQTransportFactorySHM.h create mode 100644 fairmq/shmem/README.md diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index e10315cc..2b1d3dc8 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -19,6 +19,7 @@ Set(INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/fairmq/options ${CMAKE_SOURCE_DIR}/fairmq/logger ${CMAKE_SOURCE_DIR}/fairmq/zeromq + ${CMAKE_SOURCE_DIR}/fairmq/shmem ${CMAKE_CURRENT_BINARY_DIR} ) @@ -67,6 +68,12 @@ Set(SRCS "zeromq/FairMQPollerZMQ.cxx" "zeromq/FairMQContextZMQ.cxx" + "shmem/FairMQTransportFactorySHM.cxx" + "shmem/FairMQMessageSHM.cxx" + "shmem/FairMQSocketSHM.cxx" + "shmem/FairMQPollerSHM.cxx" + "shmem/FairMQContextSHM.cxx" + "FairMQLogger.cxx" "FairMQConfigurable.cxx" "FairMQStateMachine.cxx" @@ -125,6 +132,7 @@ Set(DEPENDENCIES ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} + ${Boost_INTERPROCESS_LIBRARY} ) If(NANOMSG_FOUND) diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index d6a8bb3f..7617f58e 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -35,6 +35,7 @@ #include "FairMQProgOptions.h" #include "FairMQTransportFactoryZMQ.h" +#include "FairMQTransportFactorySHM.h" #ifdef NANOMSG_FOUND #include "FairMQTransportFactoryNN.h" #endif @@ -177,6 +178,9 @@ void FairMQDevice::InitWrapper() { fCmdSocket = fTransportFactory->CreateSocket("pub", "device-commands", fNumIoThreads, fId); fCmdSocket->Bind("inproc://commands"); + + FairMQMessagePtr msg(fTransportFactory->CreateMessage()); + msg->SetDeviceId(fId); } // Containers to store the uninitialized channels. @@ -301,66 +305,83 @@ bool FairMQDevice::ConnectChannel(FairMQChannel& ch) bool FairMQDevice::AttachChannel(FairMQChannel& ch) { - std::vector endpoints; - FairMQChannel::Tokenize(endpoints, ch.fAddress); - for (auto& endpoint : endpoints) - { - //(re-)init socket - if (!ch.fSocket) { - ch.fSocket = fTransportFactory->CreateSocket(ch.fType, ch.fChannelName, fNumIoThreads, fId); + std::vector endpoints; + FairMQChannel::Tokenize(endpoints, ch.fAddress); + for (auto& endpoint : endpoints) + { + //(re-)init socket + if (!ch.fSocket) + { + ch.fSocket = fTransportFactory->CreateSocket(ch.fType, ch.fChannelName, fNumIoThreads, fId); + } + + // set high water marks + ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize)); + ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize)); + + // set kernel transmit size + ch.fSocket->SetOption("snd-size", &(ch.fSndKernelSize), sizeof(ch.fSndKernelSize)); + ch.fSocket->SetOption("rcv-size", &(ch.fRcvKernelSize), sizeof(ch.fRcvKernelSize)); + + // attach + bool bind = (ch.fMethod == "bind"); + bool connectionModifier = false; + std::string address = endpoint; + + // check if the default fMethod is overridden by a modifier + if (endpoint[0] == '+' || endpoint[0] == '>') + { + connectionModifier = true; + bind = false; + address = endpoint.substr(1); + } + else if (endpoint[0] == '@') + { + connectionModifier = true; + bind = true; + address = endpoint.substr(1); + } + + bool rc = true; + // make the connection + if (bind) + { + rc = BindEndpoint(*ch.fSocket, address); + } + else + { + rc = ConnectEndpoint(*ch.fSocket, address); + } + + // bind might bind to an address different than requested, + // put the actual address back in the config + endpoint.clear(); + if (connectionModifier) + { + endpoint.push_back(bind?'@':'+'); + } + endpoint += address; + + LOG(DEBUG) << "Attached channel " << ch.fChannelName << " to " << endpoint << (bind ? " (bind) " : " (connect) "); + + // after the book keeping is done, exit in case of errors + if (!rc) + { + return rc; + } } - // set high water marks - ch.fSocket->SetOption("snd-hwm", &(ch.fSndBufSize), sizeof(ch.fSndBufSize)); - ch.fSocket->SetOption("rcv-hwm", &(ch.fRcvBufSize), sizeof(ch.fRcvBufSize)); + // put the (possibly) modified address back in the config + ch.UpdateAddress(boost::algorithm::join(endpoints, ",")); - // attach - bool bind = (ch.fMethod=="bind"); - bool connectionModifier = false; - std::string address = endpoint; - - // check if the default fMethod is overridden by a modifier - if (endpoint[0]=='+' || endpoint[0]=='>') { - connectionModifier = true; - bind = false; - address = endpoint.substr(1); - } else if (endpoint[0]=='@') { - connectionModifier = true; - bind = true; - address = endpoint.substr(1); - } - - bool rc = true; - // make the connection - if (bind) { - rc = BindEndpoint(*ch.fSocket, address); - } else { - rc = ConnectEndpoint(*ch.fSocket, address); - } - - // bind might bind to an address different than requested, - // put the actual address back in the config - endpoint.clear(); - if (connectionModifier) endpoint.push_back(bind?'@':'+'); - endpoint += address; - - LOG(DEBUG) << "Attached channel " << ch.fChannelName << " to " << endpoint - << (bind?" (bind) ":" (connect) "); - - // after the book keeping is done, exit in case of errors - if (!rc) return rc; - } - - // put the (possibly) modified address back in the config - ch.UpdateAddress(boost::algorithm::join(endpoints, ",")); - - return true; + return true; } bool FairMQDevice::ConnectEndpoint(FairMQSocket& socket, std::string& endpoint) { - socket.Connect(endpoint); - return true; + socket.Connect(endpoint); + + return true; } bool FairMQDevice::BindEndpoint(FairMQSocket& socket, std::string& endpoint) @@ -393,6 +414,7 @@ bool FairMQDevice::BindEndpoint(FairMQSocket& socket, std::string& endpoint) // TODO: thread safety? (this comes in as a reference and DOES get changed in this case). endpoint = endpoint.substr(0, pos + 1) + newPort.str(); } + return true; } @@ -475,6 +497,7 @@ void FairMQDevice::RunWrapper() std::thread rateLogger(&FairMQDevice::LogSocketRates, this); FairMQChannel::fInterrupted = false; + fCmdSocket->Resume(); try { @@ -732,19 +755,23 @@ int FairMQDevice::GetProperty(const int key, const int default_ /*= 0*/) void FairMQDevice::SetTransport(FairMQTransportFactory* factory) { - fTransportFactory = unique_ptr(factory); + fTransportFactory = shared_ptr(factory); } void FairMQDevice::SetTransport(const string& transport) { if (transport == "zeromq") { - fTransportFactory = unique_ptr(new FairMQTransportFactoryZMQ()); + fTransportFactory = make_shared(); + } + else if (transport == "shmem") + { + fTransportFactory = make_shared(); } #ifdef NANOMSG_FOUND else if (transport == "nanomsg") { - fTransportFactory = unique_ptr(new FairMQTransportFactoryNN()); + fTransportFactory = make_shared(); } #endif else @@ -875,7 +902,7 @@ void FairMQDevice::LogSocketRates() std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } - // LOG(DEBUG) << "FairMQDevice::LogSocketRates() stopping"; + LOG(DEBUG) << "FairMQDevice::LogSocketRates() stopping"; } void FairMQDevice::InteractiveStateLoop() @@ -978,6 +1005,7 @@ void FairMQDevice::InteractiveStateLoop() void FairMQDevice::Unblock() { FairMQChannel::fInterrupted = true; + fCmdSocket->Interrupt(); FairMQMessagePtr cmd(fTransportFactory->CreateMessage()); fCmdSocket->Send(cmd.get(), 0); } diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index a242ab10..3e1e1f4c 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -21,6 +21,8 @@ #include #include #include +#include // static_assert +#include // is_trivially_copyable #include #include @@ -41,12 +43,6 @@ typedef std::function InputMultipartCallback; class FairMQProgOptions; -template -void FairMQSimpleMsgCleanup(void* /*data*/, void* hint) -{ - delete static_cast(hint); -} - class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable { friend class FairMQChannel; @@ -203,20 +199,42 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable return fTransportFactory->CreateMessage(size); } + template + static void FairMQSimpleMsgCleanup(void* /*data*/, void* hint) + { + delete static_cast(hint); + } + + static void FairMQNoCleanup(void* /*data*/, void* /*hint*/) + { + } + /// @brief Create new FairMQMessage with user provided buffer and size /// @param data pointer to user provided buffer /// @param size size of the user provided buffer /// @param ffn optional callback, called when the message is transfered (and can be deleted) /// @param hint optional helper pointer that can be used in the callback /// @return pointer to FairMQMessage - inline FairMQMessagePtr NewMessage(void* data, int size, fairmq_free_fn* ffn, void* hint = NULL) const + inline FairMQMessagePtr NewMessage(void* data, int size, fairmq_free_fn* ffn, void* hint = nullptr) const { return fTransportFactory->CreateMessage(data, size, ffn, hint); } + template + inline FairMQMessagePtr NewStaticMessage(const T& data) const + { + return fTransportFactory->CreateMessage(data, sizeof(T), FairMQNoCleanup, nullptr); + } + + inline FairMQMessagePtr NewStaticMessage(const std::string& str) const + { + return fTransportFactory->CreateMessage(const_cast(str.c_str()), str.length(), FairMQNoCleanup, nullptr); + } + template inline FairMQMessagePtr NewSimpleMessage(const T& data) const { + static_assert(std::is_trivially_copyable::value, "The argument type for NewSimpleMessage has to be trivially copyable!"); T* dataCopy = new T(data); return fTransportFactory->CreateMessage(dataCopy, sizeof(T), FairMQSimpleMsgCleanup, dataCopy); } diff --git a/fairmq/FairMQMessage.h b/fairmq/FairMQMessage.h index d8dbf3a2..71d56616 100644 --- a/fairmq/FairMQMessage.h +++ b/fairmq/FairMQMessage.h @@ -25,15 +25,15 @@ class FairMQMessage public: virtual void Rebuild() = 0; virtual void Rebuild(const size_t size) = 0; - virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL) = 0; + virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) = 0; virtual void* GetMessage() = 0; virtual void* GetData() = 0; virtual size_t GetSize() = 0; virtual void SetMessage(void* data, size_t size) = 0; - virtual void CloseMessage() = 0; - virtual void Copy(FairMQMessage* msg) = 0; + virtual void SetDeviceId(const std::string& deviceId) = 0; + virtual void Copy(const std::unique_ptr& msg) = 0; virtual ~FairMQMessage() {}; diff --git a/fairmq/FairMQSocket.cxx b/fairmq/FairMQSocket.cxx index e57dc776..1578d31a 100644 --- a/fairmq/FairMQSocket.cxx +++ b/fairmq/FairMQSocket.cxx @@ -17,48 +17,64 @@ bool FairMQSocket::Attach(const std::string& config, bool serverish) { - if (config.empty()) - return false; - if (config.size()<2) - return false; - - const char* endpoints = config.c_str(); - - // We hold each individual endpoint here - char endpoint [256]; - while (*endpoints) { - const char *delimiter = strchr (endpoints, ','); - if (!delimiter) - delimiter = endpoints + strlen (endpoints); - if (delimiter - endpoints > 255) - return false; - memcpy (endpoint, endpoints, delimiter - endpoints); - endpoint [delimiter - endpoints] = 0; - - bool rc; - if (endpoint [0] == '@') { - rc = Bind(endpoint + 1); + if (config.empty()) + { + return false; } - else if (endpoint [0] == '>' || endpoint [0] == '-' || endpoint [0] == '+' ) { - Connect(endpoint + 1); - } - else if (serverish) { - rc = Bind(endpoint); - } - else { - Connect(endpoint); + if (config.size() < 2) + { + return false; } - if (!rc) { - return false; + const char* endpoints = config.c_str(); + + // We hold each individual endpoint here + char endpoint [256]; + while (*endpoints) + { + const char *delimiter = strchr(endpoints, ','); + if (!delimiter) + { + delimiter = endpoints + strlen(endpoints); + } + if (delimiter - endpoints > 255) + { + return false; + } + memcpy(endpoint, endpoints, delimiter - endpoints); + endpoint[delimiter - endpoints] = 0; + + bool rc = false; + + if (endpoint [0] == '@') + { + rc = Bind(endpoint + 1); + } + else if (endpoint [0] == '>' || endpoint [0] == '-' || endpoint [0] == '+' ) + { + Connect(endpoint + 1); + } + else if (serverish) + { + rc = Bind(endpoint); + } + else + { + Connect(endpoint); + } + + if (!rc) + { + return false; + } + + if (*delimiter == 0) + { + break; + } + + endpoints = delimiter + 1; } - if (*delimiter == 0) { - break; - } - - endpoints = delimiter + 1; - } - - return true; + return true; } diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index 0d2a54dc..c77241c2 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -53,6 +53,9 @@ class FairMQSocket virtual void Close() = 0; virtual void Terminate() = 0; + virtual void Interrupt() = 0; + virtual void Resume() = 0; + virtual void SetOption(const std::string& option, const void* value, size_t valueSize) = 0; virtual void GetOption(const std::string& option, void* value, size_t* valueSize) = 0; diff --git a/fairmq/FairMQStateMachine.h b/fairmq/FairMQStateMachine.h index 073f589c..a2a99a06 100644 --- a/fairmq/FairMQStateMachine.h +++ b/fairmq/FairMQStateMachine.h @@ -213,7 +213,6 @@ struct FairMQFSM_ : public msmf::state_machine_def fsm.fState = PAUSED; fsm.Unblock(); - std::unique_lock lock(fsm.fWorkMutex); while (fsm.fWorkActive) { @@ -539,8 +538,8 @@ struct FairMQFSM_ : public msmf::state_machine_def bool fWorkAvailable; protected: - std::mutex fChangeStateMutex; std::atomic fState; + std::mutex fChangeStateMutex; }; // reactivate the warning for non-virtual destructor diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 48f57ae0..c0957ab4 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -33,7 +33,7 @@ class FairMQTransportFactory public: virtual FairMQMessagePtr CreateMessage() const = 0; virtual FairMQMessagePtr CreateMessage(const size_t size) const = 0; - virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL) const = 0; + virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const = 0; virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = "") const = 0; diff --git a/fairmq/README.md b/fairmq/README.md index af3f3bc6..973d04c0 100644 --- a/fairmq/README.md +++ b/fairmq/README.md @@ -14,27 +14,40 @@ Example of a simple FairMQ topology: ![example of FairMQ topology](../docs/images/fairmq-example-topology.png?raw=true "Example of possible FairMQ topology") +Within a topology each device needs a unique id (given to it via required command line option `--id`). + Topology configuration is currently happening via setup scripts. This is very rudimentary and a much more flexible system is now in development. For now, example setup scripts can be found in directory `FairRoot/example/Tutorial3/` along with some additional documentation. ## Communication Patterns -FairMQ devices communicate via the communication patterns offered by ZeroMQ (or nanomsg): PUSH-PULL, PUB-SUB, REQ-REP, PAIR, [more info here](http://api.zeromq.org/4-0:zmq-socket). +FairMQ devices communicate via the communication patterns offered by ZeroMQ (or nanomsg): PUSH-PULL, PUB-SUB, REQ-REP, PAIR, [more info here](http://api.zeromq.org/4-0:zmq-socket). Each transport may provide further patterns. ## Messages -Devices transport data between each other in form of `FairMQMessage`s. These can be filled with arbitrary content and transport either raw data or serialized data as described above. Message can be initialized in three different ways: - - **with no parameters**: This is usefull for receiving a message, since neither size nor contents are yet known. - - **given message size**: Initialize message body with a size and fill the contents later, either with `memcpy` or by writing directly into message memory. - - **given message size and buffer**: initialize the message given an existing buffer. This is a zero-copy operation. +Devices transport data between each other in form of `FairMQMessage`s. These can be filled with arbitrary content. Message can be initialized in three different ways: +- **with no parameters**: Initializes an empty message (typically used for receiving). +- **given message size**: Initializes message body with a given size. Fill the created contents via buffer pointer. +- **given existing buffer and a size**: Initialize the message from an existing buffer. In case of ZeroMQ this is a zero-copy operation. -After sending the message, the queueing system takes over control over the message body and will free it with `free()` after it is no longer used. A callback can be given to the message object, to be called instead of the destruction with `free()`. +After sending the message, the transport takes over control over the message body and will free it with `free()` after it is no longer used. A callback can be given to the message object, to be called instead of the destruction with `free()` (for initialization via buffer+size). ## Transport Interface -The communication layer is available through an interface. Two interface implementations are currently available. Main implementation uses the [ZeroMQ](http://zeromq.org) library. Alternative implementation relies on the [nanomsg](http://nanomsg.org) library. Here is an overview to give an idea how interface is implemented: +The communication layer is available through an interface. Three interface implementations are currently available. Main implementation uses the [ZeroMQ](http://zeromq.org) library. Alternative implementation relies on the [nanomsg](http://nanomsg.org) library. Third transport implementation is using shared memory via boost::interprocess & ZeroMQ combination. + +Here is an overview to give an idea how interface is implemented: ![FairMQ transport interface](../docs/images/fairmq-transport-interface.png?raw=true "FairMQ transport interface") +Currently, the transports have been tested to work with these communication patterns: + +| | ZeroMQ | nanomsg | Shared Memory | +| ------------- |--------| ------- | ------------- | +| PAIR | yes | yes | yes | +| PUSH/PULL | yes | yes | yes | +| PUB/SUB | yes | yes | no | +| REQ/REP | yes | yes | yes | + ## State Machine Each FairMQ device has an internal state machine: diff --git a/fairmq/devices/FairMQBenchmarkSampler.cxx b/fairmq/devices/FairMQBenchmarkSampler.cxx index 8bb11cc3..71593780 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.cxx +++ b/fairmq/devices/FairMQBenchmarkSampler.cxx @@ -23,10 +23,11 @@ using namespace std; FairMQBenchmarkSampler::FairMQBenchmarkSampler() - : fMsgSize(10000) - , fNumMsgs(0) + : fSameMessage(true) + , fMsgSize(10000) , fMsgCounter(0) , fMsgRate(1) + , fNumMsgs(0) , fOutChannelName() { } @@ -37,9 +38,10 @@ FairMQBenchmarkSampler::~FairMQBenchmarkSampler() void FairMQBenchmarkSampler::InitTask() { + fSameMessage = fConfig->GetValue("same-msg"); fMsgSize = fConfig->GetValue("msg-size"); - fNumMsgs = fConfig->GetValue("num-msgs"); fMsgRate = fConfig->GetValue("msg-rate"); + fNumMsgs = fConfig->GetValue("num-msgs"); fOutChannelName = fConfig->GetValue("out-channel"); } @@ -47,7 +49,7 @@ void FairMQBenchmarkSampler::Run() { // std::thread resetMsgCounter(&FairMQBenchmarkSampler::ResetMsgCounter, this); - int numSentMsgs = 0; + uint64_t numSentMsgs = 0; FairMQMessagePtr baseMsg(fTransportFactory->CreateMessage(fMsgSize)); @@ -59,17 +61,37 @@ void FairMQBenchmarkSampler::Run() while (CheckCurrentState(RUNNING)) { - FairMQMessagePtr msg(fTransportFactory->CreateMessage()); - msg->Copy(baseMsg); - - if (dataOutChannel.Send(msg) >= 0) + if (fSameMessage) { - if (fNumMsgs > 0) + FairMQMessagePtr msg(fTransportFactory->CreateMessage()); + msg->Copy(baseMsg); + + if (dataOutChannel.Send(msg) >= 0) { - if (++numSentMsgs >= fNumMsgs) + if (fNumMsgs > 0) { - break; + if (numSentMsgs >= fNumMsgs) + { + break; + } } + ++numSentMsgs; + } + } + else + { + FairMQMessagePtr msg(fTransportFactory->CreateMessage(fMsgSize)); + + if (dataOutChannel.Send(msg) >= 0) + { + if (fNumMsgs > 0) + { + if (numSentMsgs >= fNumMsgs) + { + break; + } + } + ++numSentMsgs; } } @@ -82,8 +104,7 @@ void FairMQBenchmarkSampler::Run() auto tEnd = chrono::high_resolution_clock::now(); - LOG(INFO) << "Sent " << numSentMsgs << " messages, leaving RUNNING state."; - LOG(INFO) << "Sending time: " << chrono::duration(tEnd - tStart).count() << " ms"; + LOG(INFO) << "Leaving RUNNING state. Sent " << numSentMsgs << " messages in " << chrono::duration(tEnd - tStart).count() << "ms."; // resetMsgCounter.join(); } diff --git a/fairmq/devices/FairMQBenchmarkSampler.h b/fairmq/devices/FairMQBenchmarkSampler.h index e0000249..f7301ab4 100644 --- a/fairmq/devices/FairMQBenchmarkSampler.h +++ b/fairmq/devices/FairMQBenchmarkSampler.h @@ -32,10 +32,11 @@ class FairMQBenchmarkSampler : public FairMQDevice void ResetMsgCounter(); protected: + bool fSameMessage; int fMsgSize; - int fNumMsgs; int fMsgCounter; int fMsgRate; + uint64_t fNumMsgs; std::string fOutChannelName; virtual void InitTask(); diff --git a/fairmq/devices/FairMQSink.cxx b/fairmq/devices/FairMQSink.cxx index b85dd015..21e26371 100644 --- a/fairmq/devices/FairMQSink.cxx +++ b/fairmq/devices/FairMQSink.cxx @@ -34,7 +34,7 @@ void FairMQSink::InitTask() void FairMQSink::Run() { - int numReceivedMsgs = 0; + uint64_t numReceivedMsgs = 0; // store the channel reference to avoid traversing the map on every loop iteration const FairMQChannel& dataInChannel = fChannels.at(fInChannelName).at(0); @@ -43,25 +43,24 @@ void FairMQSink::Run() while (CheckCurrentState(RUNNING)) { - std::unique_ptr msg(fTransportFactory->CreateMessage()); + FairMQMessagePtr msg(fTransportFactory->CreateMessage()); if (dataInChannel.Receive(msg) >= 0) { if (fNumMsgs > 0) { - numReceivedMsgs++; if (numReceivedMsgs >= fNumMsgs) { break; } } + numReceivedMsgs++; } } auto tEnd = chrono::high_resolution_clock::now(); - LOG(INFO) << "Received " << numReceivedMsgs << " messages, leaving RUNNING state."; - LOG(INFO) << "Receiving time: " << chrono::duration(tEnd - tStart).count() << " ms"; + LOG(INFO) << "Leaving RUNNING state. Received " << numReceivedMsgs << " messages in " << chrono::duration(tEnd - tStart).count() << "ms."; } FairMQSink::~FairMQSink() diff --git a/fairmq/nanomsg/FairMQMessageNN.cxx b/fairmq/nanomsg/FairMQMessageNN.cxx index ef0188d6..605500af 100644 --- a/fairmq/nanomsg/FairMQMessageNN.cxx +++ b/fairmq/nanomsg/FairMQMessageNN.cxx @@ -22,8 +22,10 @@ using namespace std; +string FairMQMessageNN::fDeviceID = string(); + FairMQMessageNN::FairMQMessageNN() - : fMessage(NULL) + : fMessage(nullptr) , fSize(0) , fReceiving(false) { @@ -35,7 +37,7 @@ FairMQMessageNN::FairMQMessageNN() } FairMQMessageNN::FairMQMessageNN(const size_t size) - : fMessage(NULL) + : fMessage(nullptr) , fSize(0) , fReceiving(false) { @@ -54,7 +56,7 @@ FairMQMessageNN::FairMQMessageNN(const size_t size) * possible TODO: make this zero copy (will should then be as efficient as ZeroMQ). */ FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) - : fMessage(NULL) + : fMessage(nullptr) , fSize(0) , fReceiving(false) { @@ -71,6 +73,10 @@ FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* { ffn(data, hint); } + else + { + free(data); + } } } @@ -134,30 +140,9 @@ void FairMQMessageNN::SetMessage(void* data, const size_t size) fSize = size; } -void FairMQMessageNN::Copy(FairMQMessage* msg) +void FairMQMessageNN::SetDeviceId(const string& deviceId) { - // DEPRECATED: Use Copy(const unique_ptr&) - - if (fMessage) - { - if (nn_freemsg(fMessage) < 0) - { - LOG(ERROR) << "failed freeing message, reason: " << nn_strerror(errno); - } - } - - size_t size = msg->GetSize(); - - fMessage = nn_allocmsg(size, 0); - if (!fMessage) - { - LOG(ERROR) << "failed allocating message, reason: " << nn_strerror(errno); - } - else - { - memcpy(fMessage, msg->GetMessage(), size); - fSize = size; - } + fDeviceID = deviceId; } void FairMQMessageNN::Copy(const unique_ptr& msg) @@ -192,7 +177,7 @@ inline void FairMQMessageNN::Clear() } else { - fMessage = NULL; + fMessage = nullptr; fSize = 0; } } @@ -208,7 +193,7 @@ FairMQMessageNN::~FairMQMessageNN() } else { - fMessage = NULL; + fMessage = nullptr; fSize = 0; } } diff --git a/fairmq/nanomsg/FairMQMessageNN.h b/fairmq/nanomsg/FairMQMessageNN.h index 0cd826bb..e649e18f 100644 --- a/fairmq/nanomsg/FairMQMessageNN.h +++ b/fairmq/nanomsg/FairMQMessageNN.h @@ -16,6 +16,7 @@ #define FAIRMQMESSAGENN_H_ #include +#include #include "FairMQMessage.h" @@ -24,13 +25,13 @@ class FairMQMessageNN : public FairMQMessage public: FairMQMessageNN(); FairMQMessageNN(const size_t size); - FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL); + FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); FairMQMessageNN(const FairMQMessageNN&) = delete; FairMQMessageNN operator=(const FairMQMessageNN&) = delete; virtual void Rebuild(); virtual void Rebuild(const size_t size); - virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL); + virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); virtual void* GetMessage(); virtual void* GetData(); @@ -38,8 +39,8 @@ class FairMQMessageNN : public FairMQMessage virtual void SetMessage(void* data, const size_t size); - virtual void CloseMessage() {}; - virtual void Copy(FairMQMessage* msg); + virtual void SetDeviceId(const std::string& deviceId); + virtual void Copy(const std::unique_ptr& msg); virtual ~FairMQMessageNN(); @@ -50,6 +51,7 @@ class FairMQMessageNN : public FairMQMessage void* fMessage; size_t fSize; bool fReceiving; + static std::string fDeviceID; void Clear(); }; diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index 6f0ce1ba..bbd36bbd 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -110,26 +110,7 @@ void FairMQSocketNN::Connect(const string& address) int FairMQSocketNN::Send(FairMQMessage* msg, const string& flag) { - void* ptr = msg->GetMessage(); - int nbytes = nn_send(fSocket, &ptr, NN_MSG, GetConstant(flag)); - if (nbytes >= 0) - { - fBytesTx += nbytes; - ++fMessagesTx; - static_cast(msg)->fReceiving = false; - return nbytes; - } - if (nn_errno() == EAGAIN) - { - return -2; - } - if (nn_errno() == ETERM) - { - LOG(INFO) << "terminating socket " << fId; - return -1; - } - LOG(ERROR) << "Failed sending on socket " << fId << ", reason: " << nn_strerror(errno); - return nbytes; + return Send(msg, GetConstant(flag)); } int FairMQSocketNN::Send(FairMQMessage* msg, const int flags) @@ -198,28 +179,7 @@ int64_t FairMQSocketNN::Send(const vector>& msgVec, co int FairMQSocketNN::Receive(FairMQMessage* msg, const string& flag) { - void* ptr = NULL; - int nbytes = nn_recv(fSocket, &ptr, NN_MSG, GetConstant(flag)); - if (nbytes >= 0) - { - fBytesRx += nbytes; - ++fMessagesRx; - msg->Rebuild(); - msg->SetMessage(ptr, nbytes); - static_cast(msg)->fReceiving = true; - return nbytes; - } - if (nn_errno() == EAGAIN) - { - return -2; - } - if (nn_errno() == ETERM) - { - LOG(INFO) << "terminating socket " << fId; - return -1; - } - LOG(ERROR) << "Failed receiving on socket " << fId << ", reason: " << nn_strerror(errno); - return nbytes; + return Receive(msg, GetConstant(flag)); } int FairMQSocketNN::Receive(FairMQMessage* msg, const int flags) @@ -318,6 +278,14 @@ void FairMQSocketNN::Terminate() nn_term(); } +void FairMQSocketNN::Interrupt() +{ +} + +void FairMQSocketNN::Resume() +{ +} + void* FairMQSocketNN::GetSocket() const { return NULL; // dummy method to comply with the interface. functionality not possible in zeromq. @@ -330,6 +298,21 @@ int FairMQSocketNN::GetSocket(int /*nothing*/) const void FairMQSocketNN::SetOption(const string& option, const void* value, size_t valueSize) { + if (option == "snd-size" || option == "rcv-size") + { + int val = *(static_cast(const_cast(value))); + if (val <= 0) + { + LOG(WARN) << "nanomsg: value for sndKernelSize/rcvKernelSize should be greater than 0, using defaults (128kB)."; + return; + } + } + + if (option == "snd-hwm" || option == "rcv-hwm") + { + return; + } + int rc = nn_setsockopt(fSocket, NN_SOL_SOCKET, GetConstant(option), value, valueSize); if (rc < 0) { diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h index 09cb0a2f..181028d4 100644 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ b/fairmq/nanomsg/FairMQSocketNN.h @@ -45,6 +45,9 @@ class FairMQSocketNN : public FairMQSocket virtual void Close(); virtual void Terminate(); + virtual void Interrupt(); + virtual void Resume(); + virtual void SetOption(const std::string& option, const void* value, size_t valueSize); virtual void GetOption(const std::string& option, void* value, size_t* valueSize); diff --git a/fairmq/options/FairMQParser.cxx b/fairmq/options/FairMQParser.cxx index 388147d9..4fc152ae 100644 --- a/fairmq/options/FairMQParser.cxx +++ b/fairmq/options/FairMQParser.cxx @@ -230,6 +230,8 @@ void ChannelParser(const boost::property_tree::ptree& tree, FairMQMap& channelMa commonChannel.UpdateAddress(q.second.get("address", commonChannel.GetAddress())); commonChannel.UpdateSndBufSize(q.second.get("sndBufSize", commonChannel.GetSndBufSize())); commonChannel.UpdateRcvBufSize(q.second.get("rcvBufSize", commonChannel.GetRcvBufSize())); + commonChannel.UpdateSndKernelSize(q.second.get("sndKernelSize", commonChannel.GetSndKernelSize())); + commonChannel.UpdateRcvKernelSize(q.second.get("rcvKernelSize", commonChannel.GetRcvKernelSize())); commonChannel.UpdateRateLogging(q.second.get("rateLogging", commonChannel.GetRateLogging())); // temporary FairMQChannel container @@ -241,12 +243,14 @@ void ChannelParser(const boost::property_tree::ptree& tree, FairMQMap& channelMa LOG(DEBUG) << "\tnumSockets of " << numSockets << " specified,"; LOG(DEBUG) << "\tapplying common settings to each:"; - LOG(DEBUG) << "\ttype = " << commonChannel.GetType(); - LOG(DEBUG) << "\tmethod = " << commonChannel.GetMethod(); - LOG(DEBUG) << "\taddress = " << commonChannel.GetAddress(); - LOG(DEBUG) << "\tsndBufSize = " << commonChannel.GetSndBufSize(); - LOG(DEBUG) << "\trcvBufSize = " << commonChannel.GetRcvBufSize(); - LOG(DEBUG) << "\trateLogging = " << commonChannel.GetRateLogging(); + LOG(DEBUG) << "\ttype = " << commonChannel.GetType(); + LOG(DEBUG) << "\tmethod = " << commonChannel.GetMethod(); + LOG(DEBUG) << "\taddress = " << commonChannel.GetAddress(); + LOG(DEBUG) << "\tsndBufSize = " << commonChannel.GetSndBufSize(); + LOG(DEBUG) << "\trcvBufSize = " << commonChannel.GetRcvBufSize(); + LOG(DEBUG) << "\tsndKernelSize = " << commonChannel.GetSndKernelSize(); + LOG(DEBUG) << "\trcvKernelSize = " << commonChannel.GetRcvKernelSize(); + LOG(DEBUG) << "\trateLogging = " << commonChannel.GetRateLogging(); for (int i = 0; i < numSockets; ++i) { @@ -287,6 +291,8 @@ void ChannelParser(const boost::property_tree::ptree& tree, FairMQMap& channelMa commonChannel.UpdateAddress(p.second.get("address", commonChannel.GetAddress())); commonChannel.UpdateSndBufSize(p.second.get("sndBufSize", commonChannel.GetSndBufSize())); commonChannel.UpdateRcvBufSize(p.second.get("rcvBufSize", commonChannel.GetRcvBufSize())); + commonChannel.UpdateSndKernelSize(p.second.get("sndKernelSize", commonChannel.GetSndKernelSize())); + commonChannel.UpdateRcvKernelSize(p.second.get("rcvKernelSize", commonChannel.GetRcvKernelSize())); commonChannel.UpdateRateLogging(p.second.get("rateLogging", commonChannel.GetRateLogging())); } @@ -299,12 +305,14 @@ void ChannelParser(const boost::property_tree::ptree& tree, FairMQMap& channelMa LOG(DEBUG) << "\tnumSockets of " << numSockets << " specified,"; LOG(DEBUG) << "\tapplying common settings to each:"; - LOG(DEBUG) << "\ttype = " << commonChannel.GetType(); - LOG(DEBUG) << "\tmethod = " << commonChannel.GetMethod(); - LOG(DEBUG) << "\taddress = " << commonChannel.GetAddress(); - LOG(DEBUG) << "\tsndBufSize = " << commonChannel.GetSndBufSize(); - LOG(DEBUG) << "\trcvBufSize = " << commonChannel.GetRcvBufSize(); - LOG(DEBUG) << "\trateLogging = " << commonChannel.GetRateLogging(); + LOG(DEBUG) << "\ttype = " << commonChannel.GetType(); + LOG(DEBUG) << "\tmethod = " << commonChannel.GetMethod(); + LOG(DEBUG) << "\taddress = " << commonChannel.GetAddress(); + LOG(DEBUG) << "\tsndBufSize = " << commonChannel.GetSndBufSize(); + LOG(DEBUG) << "\trcvBufSize = " << commonChannel.GetRcvBufSize(); + LOG(DEBUG) << "\tsndKernelSize = " << commonChannel.GetSndKernelSize(); + LOG(DEBUG) << "\trcvKernelSize = " << commonChannel.GetRcvKernelSize(); + LOG(DEBUG) << "\trateLogging = " << commonChannel.GetRateLogging(); for (int i = 0; i < numSockets; ++i) { @@ -342,15 +350,19 @@ void SocketParser(const boost::property_tree::ptree& tree, vector channel.UpdateAddress(q.second.get("address", channel.GetAddress())); channel.UpdateSndBufSize(q.second.get("sndBufSize", channel.GetSndBufSize())); channel.UpdateRcvBufSize(q.second.get("rcvBufSize", channel.GetRcvBufSize())); + channel.UpdateSndKernelSize(q.second.get("sndKernelSize", channel.GetSndKernelSize())); + channel.UpdateRcvKernelSize(q.second.get("rcvKernelSize", channel.GetRcvKernelSize())); channel.UpdateRateLogging(q.second.get("rateLogging", channel.GetRateLogging())); LOG(DEBUG) << "" << channelName << "[" << socketCounter << "]:"; - LOG(DEBUG) << "\ttype = " << channel.GetType(); - LOG(DEBUG) << "\tmethod = " << channel.GetMethod(); - LOG(DEBUG) << "\taddress = " << channel.GetAddress(); - LOG(DEBUG) << "\tsndBufSize = " << channel.GetSndBufSize(); - LOG(DEBUG) << "\trcvBufSize = " << channel.GetRcvBufSize(); - LOG(DEBUG) << "\trateLogging = " << channel.GetRateLogging(); + LOG(DEBUG) << "\ttype = " << channel.GetType(); + LOG(DEBUG) << "\tmethod = " << channel.GetMethod(); + LOG(DEBUG) << "\taddress = " << channel.GetAddress(); + LOG(DEBUG) << "\tsndBufSize = " << channel.GetSndBufSize(); + LOG(DEBUG) << "\trcvBufSize = " << channel.GetRcvBufSize(); + LOG(DEBUG) << "\tsndKernelSize = " << channel.GetSndKernelSize(); + LOG(DEBUG) << "\trcvKernelSize = " << channel.GetRcvKernelSize(); + LOG(DEBUG) << "\trateLogging = " << channel.GetRateLogging(); channelList.push_back(channel); ++socketCounter; @@ -368,15 +380,19 @@ void SocketParser(const boost::property_tree::ptree& tree, vector channel.UpdateAddress(p.second.get("address", channel.GetAddress())); channel.UpdateSndBufSize(p.second.get("sndBufSize", channel.GetSndBufSize())); channel.UpdateRcvBufSize(p.second.get("rcvBufSize", channel.GetRcvBufSize())); + channel.UpdateSndKernelSize(p.second.get("sndKernelSize", channel.GetSndKernelSize())); + channel.UpdateRcvKernelSize(p.second.get("rcvKernelSize", channel.GetRcvKernelSize())); channel.UpdateRateLogging(p.second.get("rateLogging", channel.GetRateLogging())); LOG(DEBUG) << "" << channelName << "[" << socketCounter << "]:"; - LOG(DEBUG) << "\ttype = " << channel.GetType(); - LOG(DEBUG) << "\tmethod = " << channel.GetMethod(); - LOG(DEBUG) << "\taddress = " << channel.GetAddress(); - LOG(DEBUG) << "\tsndBufSize = " << channel.GetSndBufSize(); - LOG(DEBUG) << "\trcvBufSize = " << channel.GetRcvBufSize(); - LOG(DEBUG) << "\trateLogging = " << channel.GetRateLogging(); + LOG(DEBUG) << "\ttype = " << channel.GetType(); + LOG(DEBUG) << "\tmethod = " << channel.GetMethod(); + LOG(DEBUG) << "\taddress = " << channel.GetAddress(); + LOG(DEBUG) << "\tsndBufSize = " << channel.GetSndBufSize(); + LOG(DEBUG) << "\trcvBufSize = " << channel.GetRcvBufSize(); + LOG(DEBUG) << "\tsndKernelSize = " << channel.GetSndKernelSize(); + LOG(DEBUG) << "\trcvKernelSize = " << channel.GetRcvKernelSize(); + LOG(DEBUG) << "\trateLogging = " << channel.GetRateLogging(); channelList.push_back(channel); ++socketCounter; @@ -395,12 +411,14 @@ void SocketParser(const boost::property_tree::ptree& tree, vector FairMQChannel channel(commonChannel); - LOG(DEBUG) << "\ttype = " << channel.GetType(); - LOG(DEBUG) << "\tmethod = " << channel.GetMethod(); - LOG(DEBUG) << "\taddress = " << channel.GetAddress(); - LOG(DEBUG) << "\tsndBufSize = " << channel.GetSndBufSize(); - LOG(DEBUG) << "\trcvBufSize = " << channel.GetRcvBufSize(); - LOG(DEBUG) << "\trateLogging = " << channel.GetRateLogging(); + LOG(DEBUG) << "\ttype = " << channel.GetType(); + LOG(DEBUG) << "\tmethod = " << channel.GetMethod(); + LOG(DEBUG) << "\taddress = " << channel.GetAddress(); + LOG(DEBUG) << "\tsndBufSize = " << channel.GetSndBufSize(); + LOG(DEBUG) << "\trcvBufSize = " << channel.GetRcvBufSize(); + LOG(DEBUG) << "\tsndKernelSize = " << channel.GetSndKernelSize(); + LOG(DEBUG) << "\trcvKernelSize = " << channel.GetRcvKernelSize(); + LOG(DEBUG) << "\trateLogging = " << channel.GetRateLogging(); channelList.push_back(channel); } diff --git a/fairmq/options/FairMQProgOptions.cxx b/fairmq/options/FairMQProgOptions.cxx index 2e981892..b228a076 100644 --- a/fairmq/options/FairMQProgOptions.cxx +++ b/fairmq/options/FairMQProgOptions.cxx @@ -235,6 +235,8 @@ void FairMQProgOptions::UpdateMQValues() string addressKey = p.first + "." + to_string(index) + ".address"; string sndBufSizeKey = p.first + "." + to_string(index) + ".sndBufSize"; string rcvBufSizeKey = p.first + "." + to_string(index) + ".rcvBufSize"; + string sndKernelSizeKey = p.first + "." + to_string(index) + ".sndKernelSize"; + string rcvKernelSizeKey = p.first + "." + to_string(index) + ".rcvKernelSize"; string rateLoggingKey = p.first + "." + to_string(index) + ".rateLogging"; fMQKeyMap[typeKey] = make_tuple(p.first, index, "type"); @@ -242,6 +244,8 @@ void FairMQProgOptions::UpdateMQValues() fMQKeyMap[addressKey] = make_tuple(p.first, index, "address"); fMQKeyMap[sndBufSizeKey] = make_tuple(p.first, index, "sndBufSize"); fMQKeyMap[rcvBufSizeKey] = make_tuple(p.first, index, "rcvBufSize"); + fMQKeyMap[sndKernelSizeKey] = make_tuple(p.first, index, "sndKernelSize"); + fMQKeyMap[rcvKernelSizeKey] = make_tuple(p.first, index, "rcvkernelSize"); fMQKeyMap[rateLoggingKey] = make_tuple(p.first, index, "rateLogging"); UpdateVarMap(typeKey, channel.GetType()); @@ -255,6 +259,12 @@ void FairMQProgOptions::UpdateMQValues() //UpdateVarMap(rcvBufSizeKey, to_string(channel.GetRcvBufSize()));// string API UpdateVarMap(rcvBufSizeKey, channel.GetRcvBufSize()); + //UpdateVarMap(sndKernelSizeKey, to_string(channel.GetSndKernelSize()));// string API + UpdateVarMap(sndKernelSizeKey, channel.GetSndKernelSize()); + + //UpdateVarMap(rcvKernelSizeKey, to_string(channel.GetRcvKernelSize()));// string API + UpdateVarMap(rcvKernelSizeKey, channel.GetRcvKernelSize()); + //UpdateVarMap(rateLoggingKey,to_string(channel.GetRateLogging()));// string API UpdateVarMap(rateLoggingKey, channel.GetRateLogging()); @@ -265,6 +275,8 @@ void FairMQProgOptions::UpdateMQValues() LOG(DEBUG) << "key = " << addressKey <<"\t value = " << GetValue(addressKey); LOG(DEBUG) << "key = " << sndBufSizeKey << "\t value = " << GetValue(sndBufSizeKey); LOG(DEBUG) << "key = " << rcvBufSizeKey <<"\t value = " << GetValue(rcvBufSizeKey); + LOG(DEBUG) << "key = " << sndKernelSizeKey << "\t value = " << GetValue(sndKernelSizeKey); + LOG(DEBUG) << "key = " << rcvKernelSizeKey <<"\t value = " << GetValue(rcvKernelSizeKey); LOG(DEBUG) << "key = " << rateLoggingKey <<"\t value = " << GetValue(rateLoggingKey); */ index++; @@ -376,7 +388,7 @@ int FairMQProgOptions::UpdateChannelMap(const string& channelName, int index, co { //if we get there it means something is wrong LOG(ERROR) << "update of FairMQChannel map failed for the following key: " - << channelName<<"."<()->default_value("data"), "Name of the output channel") + ("same-msg", bpo::value()->default_value(true), "Re-send the same message (default), or recreate for each iteration") ("msg-size", bpo::value()->default_value(1000), "Message size in bytes") - ("num-msgs", bpo::value()->default_value(0), "Number of messages to send") + ("num-msgs", bpo::value()->default_value(0), "Number of messages to send") ("msg-rate", bpo::value()->default_value(0), "Msg rate limit in maximum number of messages per second"); } diff --git a/fairmq/run/runDDSCommandUI.cxx b/fairmq/run/runDDSCommandUI.cxx index 1781f2d9..deeb4817 100644 --- a/fairmq/run/runDDSCommandUI.cxx +++ b/fairmq/run/runDDSCommandUI.cxx @@ -26,7 +26,8 @@ int main(int argc, char* argv[]) CIntercomService service; CCustomCmd ddsCustomCmd(service); - service.subscribeOnError([](const EErrorCode errorCode, const string& errorMsg) { + service.subscribeOnError([](const EErrorCode errorCode, const string& errorMsg) + { cout << "DDS error received: error code: " << errorCode << ", error message: " << errorMsg << endl; }); @@ -46,10 +47,14 @@ int main(int argc, char* argv[]) t.c_lflag &= ~ICANON; // disable canonical input tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings - if ( argc != 2 ) + if (argc != 2) + { PrintControlsHelp(); + } else + { cin.putback(argv[1][0]); + } while (cin >> c) { diff --git a/fairmq/run/startMQBenchmark.sh.in b/fairmq/run/startMQBenchmark.sh.in index 5648db56..af1a6729 100755 --- a/fairmq/run/startMQBenchmark.sh.in +++ b/fairmq/run/startMQBenchmark.sh.in @@ -2,17 +2,28 @@ numMsgs="0" msgSize="1000000" +transport="zeromq" +sameMsg="true" if [[ $1 =~ ^[0-9]+$ ]]; then msgSize=$1 fi -echo "Starting benchmark with message size of $msgSize bytes." - if [[ $2 =~ ^[0-9]+$ ]]; then numMsgs=$2 fi +if [[ $3 =~ ^[a-z]+$ ]]; then + transport=$3 +fi + +if [[ $4 =~ ^[a-z]+$ ]]; then + sameMsg=$4 +fi + +echo "Starting benchmark with message size of $msgSize bytes ($numMsgs messages) and $transport transport." +echo "Using $transport transport." + if [ $numMsgs = 0 ]; then echo "Unlimited number of messages." else @@ -20,14 +31,15 @@ else fi echo "" -echo "Usage: startBenchmark [message size=1000000] [number of messages=0]" +echo "Usage: startBenchmark [message size=1000000] [number of messages=0] [transport=zeromq/nanomsg/shmem] [resend same message=true]" SAMPLER="bsampler" SAMPLER+=" --id bsampler1" #SAMPLER+=" --io-threads 2" #SAMPLER+=" --control static" -#SAMPLER+=" --transport nanomsg" +SAMPLER+=" --transport $transport" SAMPLER+=" --msg-size $msgSize" +SAMPLER+=" --same-msg $sameMsg" # SAMPLER+=" --msg-rate 1000" SAMPLER+=" --num-msgs $numMsgs" SAMPLER+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/benchmark.json" @@ -37,7 +49,7 @@ SINK="sink" SINK+=" --id sink1" #SINK+=" --io-threads 2" #SINK+=" --control static" -#SINK+=" --transport nanomsg" +SINK+=" --transport $transport" SINK+=" --num-msgs $numMsgs" SINK+=" --mq-config @CMAKE_BINARY_DIR@/bin/config/benchmark.json" xterm -geometry 80x23+500+0 -hold -e @CMAKE_BINARY_DIR@/bin/$SINK & diff --git a/fairmq/shmem/FairMQContextSHM.cxx b/fairmq/shmem/FairMQContextSHM.cxx new file mode 100644 index 00000000..07e3f2b1 --- /dev/null +++ b/fairmq/shmem/FairMQContextSHM.cxx @@ -0,0 +1,81 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +#include + +#include + +#include "FairMQLogger.h" +#include "FairMQContextSHM.h" +#include "FairMQShmManager.h" + +using namespace FairMQ::shmem; + +FairMQContextSHM::FairMQContextSHM(int numIoThreads) + : fContext() +{ + fContext = zmq_ctx_new(); + if (fContext == NULL) + { + LOG(ERROR) << "failed creating context, reason: " << zmq_strerror(errno); + exit(EXIT_FAILURE); + } + + if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0) + { + LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno); + } + + // Set the maximum number of allowed sockets on the context. + if (zmq_ctx_set(fContext, ZMQ_MAX_SOCKETS, 10000) != 0) + { + LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno); + } + + Manager::Instance().InitializeSegment("open_or_create", "FairMQSharedMemory", 2000000000); + LOG(INFO) << "Created/Opened shared memory segment of 2,000,000,000 bytes. Available are " << Manager::Instance().Segment()->get_free_memory() << " bytes."; +} + +FairMQContextSHM::~FairMQContextSHM() +{ + Close(); + + if (boost::interprocess::shared_memory_object::remove("FairMQSharedMemory")) + { + LOG(INFO) << "Successfully removed shared memory after the device has stopped."; + } + else + { + LOG(INFO) << "Did not remove shared memory after the device stopped. Still in use?"; + } +} + +void* FairMQContextSHM::GetContext() +{ + return fContext; +} + +void FairMQContextSHM::Close() +{ + if (fContext == NULL) + { + return; + } + + if (zmq_ctx_destroy(fContext) != 0) + { + if (errno == EINTR) + { + LOG(ERROR) << " failed closing context, reason: " << zmq_strerror(errno); + } + else + { + fContext = NULL; + return; + } + } +} diff --git a/fairmq/shmem/FairMQContextSHM.h b/fairmq/shmem/FairMQContextSHM.h new file mode 100644 index 00000000..04113dbc --- /dev/null +++ b/fairmq/shmem/FairMQContextSHM.h @@ -0,0 +1,27 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +#ifndef FAIRMQCONTEXTSHM_H_ +#define FAIRMQCONTEXTSHM_H_ + +class FairMQContextSHM +{ + public: + /// Constructor + FairMQContextSHM(int numIoThreads); + FairMQContextSHM(const FairMQContextSHM&) = delete; + FairMQContextSHM operator=(const FairMQContextSHM&) = delete; + + virtual ~FairMQContextSHM(); + void* GetContext(); + void Close(); + + private: + void* fContext; +}; + +#endif /* FAIRMQCONTEXTSHM_H_ */ diff --git a/fairmq/shmem/FairMQMessageSHM.cxx b/fairmq/shmem/FairMQMessageSHM.cxx new file mode 100644 index 00000000..b92f14de --- /dev/null +++ b/fairmq/shmem/FairMQMessageSHM.cxx @@ -0,0 +1,292 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +#include +#include + +#include "FairMQMessageSHM.h" +#include "FairMQLogger.h" + +using namespace std; +using namespace FairMQ::shmem; + +uint64_t FairMQMessageSHM::fMessageID = 0; +string FairMQMessageSHM::fDeviceID = string(); +atomic FairMQMessageSHM::fInterrupted(false); + +FairMQMessageSHM::FairMQMessageSHM() + : fMessage() + , fOwner(nullptr) + , fReceiving(false) + , fQueued(false) +{ + if (zmq_msg_init(&fMessage) != 0) + { + LOG(ERROR) << "failed initializing message, reason: " << zmq_strerror(errno); + } +} + +void FairMQMessageSHM::StringDeleter(void* /*data*/, void* str) +{ + delete static_cast(str); +} + +FairMQMessageSHM::FairMQMessageSHM(const size_t size) + : fMessage() + , fOwner(nullptr) + , fReceiving(false) + , fQueued(false) +{ + InitializeChunk(size); +} + +FairMQMessageSHM::FairMQMessageSHM(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) + : fMessage() + , fOwner(nullptr) + , fReceiving(false) + , fQueued(false) +{ + InitializeChunk(size); + + memcpy(fOwner->fPtr->GetData(), data, size); + if (ffn) + { + ffn(data, hint); + } + else + { + free(data); + } +} + +void FairMQMessageSHM::InitializeChunk(const size_t size) +{ + string chunkID = fDeviceID + "c" + to_string(fMessageID); + string* ownerID = new string(fDeviceID + "o" + to_string(fMessageID)); + + bool success = false; + + while (!success) + { + try + { + fOwner = Manager::Instance().Segment()->construct(ownerID->c_str())( + make_managed_shared_ptr(Manager::Instance().Segment()->construct(chunkID.c_str())(size), + *(Manager::Instance().Segment()))); + success = true; + } + catch (bipc::bad_alloc& ba) + { + LOG(WARN) << "Shared memory full..."; + this_thread::sleep_for(chrono::milliseconds(50)); + if (fInterrupted) + { + break; + } + else + { + continue; + } + } + } + + if (zmq_msg_init_data(&fMessage, const_cast(ownerID->c_str()), ownerID->length(), StringDeleter, ownerID) != 0) + { + LOG(ERROR) << "failed initializing meta message, reason: " << zmq_strerror(errno); + } + + ++fMessageID; +} + +void FairMQMessageSHM::Rebuild() +{ + CloseMessage(); + + fReceiving = false; + fQueued = false; + + if (zmq_msg_init(&fMessage) != 0) + { + LOG(ERROR) << "failed initializing message, reason: " << zmq_strerror(errno); + } +} + +void FairMQMessageSHM::Rebuild(const size_t size) +{ + CloseMessage(); + + fReceiving = false; + fQueued = false; + + InitializeChunk(size); +} + +void FairMQMessageSHM::Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) +{ + CloseMessage(); + + fReceiving = false; + fQueued = false; + + InitializeChunk(size); + + memcpy(fOwner->fPtr->GetData(), data, size); + if (ffn) + { + ffn(data, hint); + } + else + { + free(data); + } +} + +void* FairMQMessageSHM::GetMessage() +{ + return &fMessage; +} + +void* FairMQMessageSHM::GetData() +{ + if (fOwner) + { + return fOwner->fPtr->GetData(); + } + else + { + LOG(ERROR) << "Trying to get data of an empty shared memory message"; + exit(EXIT_FAILURE); + } +} + +size_t FairMQMessageSHM::GetSize() +{ + if (fOwner) + { + return fOwner->fPtr->GetSize(); + } + else + { + return 0; + } +} + +void FairMQMessageSHM::SetMessage(void*, const size_t) +{ + // dummy method to comply with the interface. functionality not allowed in zeromq. +} + +void FairMQMessageSHM::SetDeviceId(const string& deviceId) +{ + fDeviceID = deviceId; +} + +void FairMQMessageSHM::Copy(const unique_ptr& msg) +{ + if (!fOwner) + { + FairMQ::shmem::ShPtrOwner* otherOwner = static_cast(msg.get())->fOwner; + if (otherOwner) + { + InitializeChunk(otherOwner->fPtr->GetSize()); + + memcpy(fOwner->fPtr->GetData(), otherOwner->fPtr->GetData(), otherOwner->fPtr->GetSize()); + } + else + { + LOG(ERROR) << "FairMQMessageSHM::Copy() fail: source message not initialized!"; + } + } + else + { + LOG(ERROR) << "FairMQMessageSHM::Copy() fail: target message already initialized!"; + } + + // version with sharing the sent data + // if (!fOwner) + // { + // if (static_cast(msg.get())->fOwner) + // { + // string* ownerID = new string(fDeviceID + "o" + to_string(fMessageID)); + + // bool success = false; + + // do + // { + // try + // { + // fOwner = Manager::Instance().Segment()->construct(ownerID->c_str())(*(static_cast(msg.get())->fOwner)); + // success = true; + // } + // catch (bipc::bad_alloc& ba) + // { + // LOG(WARN) << "Shared memory full..."; + // this_thread::sleep_for(chrono::milliseconds(10)); + // if (fInterrupted) + // { + // break; + // } + // else + // { + // continue; + // } + // } + // } + // while (!success); + + // if (zmq_msg_init_data(&fMessage, const_cast(ownerID->c_str()), ownerID->length(), StringDeleter, ownerID) != 0) + // { + // LOG(ERROR) << "failed initializing meta message, reason: " << zmq_strerror(errno); + // } + + // ++fMessageID; + // } + // else + // { + // LOG(ERROR) << "FairMQMessageSHM::Copy() fail: source message not initialized!"; + // } + // } + // else + // { + // LOG(ERROR) << "FairMQMessageSHM::Copy() fail: target message already initialized!"; + // } +} + +void FairMQMessageSHM::CloseMessage() +{ + if (fReceiving) + { + if (fOwner) + { + Manager::Instance().Segment()->destroy_ptr(fOwner); + fOwner = nullptr; + } + else + { + LOG(ERROR) << "No shared pointer owner when closing a received message"; + } + } + else + { + if (fOwner && !fQueued) + { + LOG(WARN) << "Destroying unsent message"; + Manager::Instance().Segment()->destroy_ptr(fOwner); + fOwner = nullptr; + } + } + + if (zmq_msg_close(&fMessage) != 0) + { + LOG(ERROR) << "failed closing message, reason: " << zmq_strerror(errno); + } +} + +FairMQMessageSHM::~FairMQMessageSHM() +{ + CloseMessage(); +} diff --git a/fairmq/shmem/FairMQMessageSHM.h b/fairmq/shmem/FairMQMessageSHM.h new file mode 100644 index 00000000..b0c88248 --- /dev/null +++ b/fairmq/shmem/FairMQMessageSHM.h @@ -0,0 +1,63 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +#ifndef FAIRMQMESSAGESHM_H_ +#define FAIRMQMESSAGESHM_H_ + +#include +#include +#include + +#include + +#include "FairMQMessage.h" +#include "FairMQShmManager.h" + +class FairMQMessageSHM : public FairMQMessage +{ + friend class FairMQSocketSHM; + + public: + FairMQMessageSHM(); + FairMQMessageSHM(const size_t size); + FairMQMessageSHM(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); + FairMQMessageSHM(const FairMQMessageSHM&) = delete; + FairMQMessageSHM operator=(const FairMQMessageSHM&) = delete; + + void InitializeChunk(const size_t size); + + virtual void Rebuild(); + virtual void Rebuild(const size_t size); + virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); + + virtual void* GetMessage(); + virtual void* GetData(); + virtual size_t GetSize(); + + virtual void SetMessage(void* data, const size_t size); + + virtual void SetDeviceId(const std::string& deviceId); + + virtual void Copy(const std::unique_ptr& msg); + + void CloseMessage(); + + virtual ~FairMQMessageSHM(); + + static void StringDeleter(void* data, void* str); + + private: + zmq_msg_t fMessage; + FairMQ::shmem::ShPtrOwner* fOwner; + static uint64_t fMessageID; + static std::string fDeviceID; + bool fReceiving; + bool fQueued; + static std::atomic fInterrupted; +}; + +#endif /* FAIRMQMESSAGESHM_H_ */ diff --git a/fairmq/shmem/FairMQPollerSHM.cxx b/fairmq/shmem/FairMQPollerSHM.cxx new file mode 100644 index 00000000..926d9881 --- /dev/null +++ b/fairmq/shmem/FairMQPollerSHM.cxx @@ -0,0 +1,240 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQPollerSHM.cxx + * + * @since 2014-01-23 + * @author A. Rybalchenko + */ + +#include + +#include "FairMQPollerSHM.h" +#include "FairMQLogger.h" + +using namespace std; + +FairMQPollerSHM::FairMQPollerSHM(const vector& channels) + : items() + , fNumItems(0) + , fOffsetMap() +{ + fNumItems = channels.size(); + items = new zmq_pollitem_t[fNumItems]; + + for (int i = 0; i < fNumItems; ++i) + { + items[i].socket = channels.at(i).fSocket->GetSocket(); + items[i].fd = 0; + items[i].revents = 0; + + int type = 0; + size_t size = sizeof(type); + zmq_getsockopt (channels.at(i).fSocket->GetSocket(), ZMQ_TYPE, &type, &size); + + if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) + { + items[i].events = ZMQ_POLLIN|ZMQ_POLLOUT; + } + else if (type == ZMQ_PUSH || type == ZMQ_PUB || type == ZMQ_XPUB) + { + items[i].events = ZMQ_POLLOUT; + } + else if (type == ZMQ_PULL || type == ZMQ_SUB || type == ZMQ_XSUB) + { + items[i].events = ZMQ_POLLIN; + } + else + { + LOG(ERROR) << "invalid poller configuration, exiting."; + exit(EXIT_FAILURE); + } + } +} + +FairMQPollerSHM::FairMQPollerSHM(const unordered_map>& channelsMap, const vector& channelList) + : items() + , fNumItems(0) + , fOffsetMap() +{ + int offset = 0; + + try + { + // calculate offsets and the total size of the poll item set + for (string channel : channelList) + { + fOffsetMap[channel] = offset; + offset += channelsMap.at(channel).size(); + fNumItems += channelsMap.at(channel).size(); + } + + items = new zmq_pollitem_t[fNumItems]; + + int index = 0; + for (string channel : channelList) + { + for (unsigned int i = 0; i < channelsMap.at(channel).size(); ++i) + { + index = fOffsetMap[channel] + i; + + items[index].socket = channelsMap.at(channel).at(i).fSocket->GetSocket(); + items[index].fd = 0; + items[index].revents = 0; + + int type = 0; + size_t size = sizeof(type); + zmq_getsockopt (channelsMap.at(channel).at(i).fSocket->GetSocket(), ZMQ_TYPE, &type, &size); + + if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) + { + items[index].events = ZMQ_POLLIN|ZMQ_POLLOUT; + } + else if (type == ZMQ_PUSH || type == ZMQ_PUB || type == ZMQ_XPUB) + { + items[index].events = ZMQ_POLLOUT; + } + else if (type == ZMQ_PULL || type == ZMQ_SUB || type == ZMQ_XSUB) + { + items[index].events = ZMQ_POLLIN; + } + else + { + LOG(ERROR) << "invalid poller configuration, exiting."; + exit(EXIT_FAILURE); + } + } + } + } + catch (const std::out_of_range& oor) + { + LOG(ERROR) << "At least one of the provided channel keys for poller initialization is invalid"; + LOG(ERROR) << "Out of Range error: " << oor.what() << '\n'; + exit(EXIT_FAILURE); + } +} + +FairMQPollerSHM::FairMQPollerSHM(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) + : items() + , fNumItems(2) + , fOffsetMap() +{ + items = new zmq_pollitem_t[fNumItems]; + + items[0].socket = cmdSocket.GetSocket(); + items[0].fd = 0; + items[0].events = ZMQ_POLLIN; + items[0].revents = 0; + + items[1].socket = dataSocket.GetSocket(); + items[1].fd = 0; + items[1].revents = 0; + + int type = 0; + size_t size = sizeof(type); + zmq_getsockopt(dataSocket.GetSocket(), ZMQ_TYPE, &type, &size); + + if (type == ZMQ_REQ || type == ZMQ_REP || type == ZMQ_PAIR || type == ZMQ_DEALER || type == ZMQ_ROUTER) + { + items[1].events = ZMQ_POLLIN|ZMQ_POLLOUT; + } + else if (type == ZMQ_PUSH || type == ZMQ_PUB || type == ZMQ_XPUB) + { + items[1].events = ZMQ_POLLOUT; + } + else if (type == ZMQ_PULL || type == ZMQ_SUB || type == ZMQ_XSUB) + { + items[1].events = ZMQ_POLLIN; + } + else + { + LOG(ERROR) << "invalid poller configuration, exiting."; + exit(EXIT_FAILURE); + } +} + +void FairMQPollerSHM::Poll(const int timeout) +{ + if (zmq_poll(items, fNumItems, timeout) < 0) + { + if (errno == ETERM) + { + LOG(DEBUG) << "polling exited, reason: " << zmq_strerror(errno); + } + else + { + LOG(ERROR) << "polling failed, reason: " << zmq_strerror(errno); + } + } +} + +bool FairMQPollerSHM::CheckInput(const int index) +{ + if (items[index].revents & ZMQ_POLLIN) + { + return true; + } + + return false; +} + +bool FairMQPollerSHM::CheckOutput(const int index) +{ + if (items[index].revents & ZMQ_POLLOUT) + { + return true; + } + + return false; +} + +bool FairMQPollerSHM::CheckInput(const string channelKey, const int index) +{ + try + { + if (items[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLIN) + { + return true; + } + + return false; + } + catch (const std::out_of_range& oor) + { + LOG(ERROR) << "Invalid channel key: \"" << channelKey << "\""; + LOG(ERROR) << "Out of Range error: " << oor.what() << '\n'; + exit(EXIT_FAILURE); + } +} + +bool FairMQPollerSHM::CheckOutput(const string channelKey, const int index) +{ + try + { + if (items[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLOUT) + { + return true; + } + + return false; + } + catch (const std::out_of_range& oor) + { + LOG(ERROR) << "Invalid channel key: \"" << channelKey << "\""; + LOG(ERROR) << "Out of Range error: " << oor.what() << '\n'; + exit(EXIT_FAILURE); + } +} + +FairMQPollerSHM::~FairMQPollerSHM() +{ + if (items != NULL) + { + delete[] items; + } +} diff --git a/fairmq/shmem/FairMQPollerSHM.h b/fairmq/shmem/FairMQPollerSHM.h new file mode 100644 index 00000000..ec68c9ed --- /dev/null +++ b/fairmq/shmem/FairMQPollerSHM.h @@ -0,0 +1,51 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +#ifndef FAIRMQPOLLERSHM_H_ +#define FAIRMQPOLLERSHM_H_ + +#include +#include +#include + +#include + +#include "FairMQPoller.h" +#include "FairMQChannel.h" +#include "FairMQTransportFactorySHM.h" + +class FairMQChannel; + +class FairMQPollerSHM : public FairMQPoller +{ + friend class FairMQChannel; + friend class FairMQTransportFactorySHM; + + public: + FairMQPollerSHM(const std::vector& channels); + FairMQPollerSHM(const std::unordered_map>& channelsMap, const std::vector& channelList); + FairMQPollerSHM(const FairMQPollerSHM&) = delete; + FairMQPollerSHM operator=(const FairMQPollerSHM&) = delete; + + virtual void Poll(const int timeout); + virtual bool CheckInput(const int index); + virtual bool CheckOutput(const int index); + virtual bool CheckInput(const std::string channelKey, const int index); + virtual bool CheckOutput(const std::string channelKey, const int index); + + virtual ~FairMQPollerSHM(); + + private: + FairMQPollerSHM(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket); + + zmq_pollitem_t* items; + int fNumItems; + + std::unordered_map fOffsetMap; +}; + +#endif /* FAIRMQPOLLERSHM_H_ */ \ No newline at end of file diff --git a/fairmq/shmem/FairMQShmManager.h b/fairmq/shmem/FairMQShmManager.h new file mode 100644 index 00000000..82cdd503 --- /dev/null +++ b/fairmq/shmem/FairMQShmManager.h @@ -0,0 +1,185 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +/** + * FairMQShmManager.h + * + * @since 2016-04-08 + * @author A. Rybalchenko + */ + +#ifndef FAIRMQSHMMANAGER_H_ +#define FAIRMQSHMMANAGER_H_ + +#include +#include + +#include +#include + +#include "FairMQLogger.h" + +namespace bipc = boost::interprocess; + +namespace FairMQ +{ +namespace shmem +{ + +class Manager +{ + public: + static Manager& Instance() + { + static Manager man; + return man; + } + + void InitializeSegment(const std::string& op, const std::string& name, const size_t size = 0) + { + if (!fSegment) + { + try + { + if (op == "open_or_create") + { + fSegment = new bipc::managed_shared_memory(bipc::open_or_create, name.c_str(), size); + } + else if (op == "create_only") + { + fSegment = new bipc::managed_shared_memory(bipc::create_only, name.c_str(), size); + } + else if (op == "open_only") + { + int numTries = 0; + bool success = false; + + do + { + try + { + fSegment = new bipc::managed_shared_memory(bipc::open_only, name.c_str()); + success = true; + } + catch (bipc::interprocess_exception& ie) + { + if (++numTries == 5) + { + LOG(ERROR) << "Could not open shared memory after " << numTries << " attempts, exiting!"; + exit(EXIT_FAILURE); + } + else + { + LOG(DEBUG) << "Could not open shared memory segment on try " << numTries << ". Retrying in 1 second..."; + LOG(DEBUG) << ie.what(); + + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + } + } + } + while (!success); + } + else + { + LOG(ERROR) << "Unknown operation when initializing shared memory segment: " << op; + } + } + catch (std::exception& e) + { + LOG(ERROR) << "Exception during shared memory segment initialization: " << e.what() << ", application will now exit"; + exit(EXIT_FAILURE); + } + } + else + { + LOG(INFO) << "Segment already initialized"; + } + } + + bipc::managed_shared_memory* Segment() const + { + if (fSegment) + { + return fSegment; + } + else + { + LOG(ERROR) << "Segment not initialized"; + exit(EXIT_FAILURE); + } + } + + private: + Manager() + : fSegment(nullptr) + {} + + bipc::managed_shared_memory* fSegment; +}; + +class Chunk +{ + public: + Chunk() + : fHandle() + , fSize(0) + { + } + + Chunk(const size_t size) + : fHandle() + , fSize(size) + { + void* ptr = Manager::Instance().Segment()->allocate(size); + fHandle = Manager::Instance().Segment()->get_handle_from_address(ptr); + } + + ~Chunk() + { + Manager::Instance().Segment()->deallocate(Manager::Instance().Segment()->get_address_from_handle(fHandle)); + } + + // bipc::managed_shared_memory::handle_t GetHandle() const + // { + // return fHandle; + // } + + void* GetData() const + { + return Manager::Instance().Segment()->get_address_from_handle(fHandle); + } + + size_t GetSize() const + { + return fSize; + } + + private: + bipc::managed_shared_memory::handle_t fHandle; + size_t fSize; +}; + +typedef bipc::managed_shared_ptr::type ShPtrType; + +struct ShPtrOwner +{ + ShPtrOwner(const ShPtrType& other) + : fPtr(other) + {} + + ShPtrOwner(const ShPtrOwner& other) + : fPtr(other.fPtr) + {} + + ShPtrType fPtr; +}; + +} // namespace shmem + +} // namespace FairMQ + +#endif /* FAIRMQSHMMANAGER_H_ */ diff --git a/fairmq/shmem/FairMQSocketSHM.cxx b/fairmq/shmem/FairMQSocketSHM.cxx new file mode 100644 index 00000000..e55fc684 --- /dev/null +++ b/fairmq/shmem/FairMQSocketSHM.cxx @@ -0,0 +1,591 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +#include + +#include + +#include "FairMQSocketSHM.h" +#include "FairMQMessageSHM.h" +#include "FairMQLogger.h" + +using namespace std; +using namespace FairMQ::shmem; + +// Context to hold the ZeroMQ sockets +unique_ptr FairMQSocketSHM::fContext = unique_ptr(new FairMQContextSHM(1)); +// bool FairMQSocketSHM::fContextInitialized = false; + +FairMQSocketSHM::FairMQSocketSHM(const string& type, const string& name, const int numIoThreads, const string& id /*= ""*/) + : FairMQSocket(ZMQ_SNDMORE, ZMQ_RCVMORE, ZMQ_DONTWAIT) + , fSocket(NULL) + , fId() + , fBytesTx(0) + , fBytesRx(0) + , fMessagesTx(0) + , fMessagesRx(0) +{ + fId = id + "." + name + "." + type; + + // if (!fContextInitialized) + // { + // fContext = unique_ptr(new FairMQContextSHM(1)); + // fContextInitialized = true; + // } + + if (zmq_ctx_set(fContext->GetContext(), ZMQ_IO_THREADS, numIoThreads) != 0) + { + LOG(ERROR) << "Failed configuring context, reason: " << zmq_strerror(errno); + } + + fSocket = zmq_socket(fContext->GetContext(), GetConstant(type)); + + if (fSocket == NULL) + { + LOG(ERROR) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno); + exit(EXIT_FAILURE); + } + + if (zmq_setsockopt(fSocket, ZMQ_IDENTITY, fId.c_str(), fId.length()) != 0) + { + LOG(ERROR) << "Failed setting ZMQ_IDENTITY socket option, reason: " << zmq_strerror(errno); + } + + // Tell socket to try and send/receive outstanding messages for milliseconds before terminating. + // Default value for ZeroMQ is -1, which is to wait forever. + int linger = 500; + if (zmq_setsockopt(fSocket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) + { + LOG(ERROR) << "Failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno); + } + + int kernelSndSize = 10000; + if (zmq_setsockopt(fSocket, ZMQ_SNDBUF, &kernelSndSize, sizeof(kernelSndSize)) != 0) + { + LOG(ERROR) << "Failed setting ZMQ_SNDBUF socket option, reason: " << zmq_strerror(errno); + } + + int kernelRcvSize = 10000; + if (zmq_setsockopt(fSocket, ZMQ_RCVBUF, &kernelRcvSize, sizeof(kernelRcvSize)) != 0) + { + LOG(ERROR) << "Failed setting ZMQ_RCVBUF socket option, reason: " << zmq_strerror(errno); + } + + if (type == "sub") + { + if (zmq_setsockopt(fSocket, ZMQ_SUBSCRIBE, NULL, 0) != 0) + { + LOG(ERROR) << "Failed setting ZMQ_SUBSCRIBE socket option, reason: " << zmq_strerror(errno); + } + } + + // LOG(INFO) << "created socket " << fId; +} + +string FairMQSocketSHM::GetId() +{ + return fId; +} + +bool FairMQSocketSHM::Bind(const string& address) +{ + // LOG(INFO) << "bind socket " << fId << " on " << address; + + if (zmq_bind(fSocket, address.c_str()) != 0) + { + if (errno == EADDRINUSE) { + // do not print error in this case, this is handled by FairMQDevice in case no connection could be established after trying a number of random ports from a range. + return false; + } + LOG(ERROR) << "Failed binding socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + return true; +} + +void FairMQSocketSHM::Connect(const string& address) +{ + // LOG(INFO) << "connect socket " << fId << " on " << address; + + if (zmq_connect(fSocket, address.c_str()) != 0) + { + LOG(ERROR) << "Failed connecting socket " << fId << ", reason: " << zmq_strerror(errno); + // error here means incorrect configuration. exit if it happens. + exit(EXIT_FAILURE); + } +} + +int FairMQSocketSHM::Send(FairMQMessage* msg, const string& flag) +{ + return Send(msg, GetConstant(flag)); +} + +int FairMQSocketSHM::Send(FairMQMessage* msg, const int flags) +{ + int nbytes = zmq_msg_send(static_cast(msg->GetMessage()), fSocket, flags); + if (nbytes >= 0) + { + static_cast(msg)->fReceiving = false; + static_cast(msg)->fQueued = true; + size_t size = msg->GetSize(); + + fBytesTx += size; + ++fMessagesTx; + + return size; + } + if (zmq_errno() == EAGAIN) + { + return -2; + } + if (zmq_errno() == ETERM) + { + LOG(INFO) << "terminating socket " << fId; + return -1; + } + LOG(ERROR) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); + return nbytes; +} + +int64_t FairMQSocketSHM::Send(const vector& msgVec, const int flags) +{ + // Sending vector typicaly handles more then one part + if (msgVec.size() > 1) + { + int64_t totalSize = 0; + + for (unsigned int i = 0; i < msgVec.size() - 1; ++i) + { + int nbytes = zmq_msg_send(static_cast(msgVec[i]->GetMessage()), fSocket, ZMQ_SNDMORE|flags); + if (nbytes >= 0) + { + static_cast(msgVec[i].get())->fReceiving = false; + static_cast(msgVec[i].get())->fQueued = true; + size_t size = msgVec[i]->GetSize(); + + totalSize += size; + fBytesTx += size; + } + else + { + if (zmq_errno() == EAGAIN) + { + return -2; + } + if (zmq_errno() == ETERM) + { + LOG(INFO) << "terminating socket " << fId; + return -1; + } + LOG(ERROR) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); + return nbytes; + } + } + + int nbytes = zmq_msg_send(static_cast(msgVec.back()->GetMessage()), fSocket, flags); + if (nbytes >= 0) + { + static_cast(msgVec.back().get())->fReceiving = false; + static_cast(msgVec.back().get())->fQueued = true; + size_t size = msgVec.back()->GetSize(); + + totalSize += size; + fBytesTx += size; + } + else + { + if (zmq_errno() == EAGAIN) + { + return -2; + } + if (zmq_errno() == ETERM) + { + LOG(INFO) << "terminating socket " << fId; + return -1; + } + LOG(ERROR) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); + return nbytes; + } + + // store statistics on how many messages have been sent (handle all parts as a single message) + ++fMessagesTx; + return totalSize; + } // If there's only one part, send it as a regular message + else if (msgVec.size() == 1) + { + return Send(msgVec.back().get(), flags); + } + else // if the vector is empty, something might be wrong + { + LOG(WARN) << "Will not send empty vector"; + return -1; + } +} + +int FairMQSocketSHM::Receive(FairMQMessage* msg, const string& flag) +{ + return Receive(msg, GetConstant(flag)); +} + +int FairMQSocketSHM::Receive(FairMQMessage* msg, const int flags) +{ + zmq_msg_t* msgPtr = static_cast(msg->GetMessage()); + int nbytes = zmq_msg_recv(msgPtr, fSocket, flags); + if (nbytes == 0) + { + ++fMessagesRx; + return nbytes; + } + else if (nbytes > 0) + { + string ownerID(static_cast(zmq_msg_data(msgPtr)), zmq_msg_size(msgPtr)); + ShPtrOwner* owner = Manager::Instance().Segment()->find(ownerID.c_str()).first; + size_t size = 0; + if (owner) + { + static_cast(msg)->fOwner = owner; + static_cast(msg)->fReceiving = true; + size = msg->GetSize(); + + fBytesRx += size; + ++fMessagesRx; + + return size; + } + else + { + LOG(ERROR) << "Received meta data, but could not find corresponding chunk"; + return -1; + } + } + if (zmq_errno() == EAGAIN) + { + return -2; + } + if (zmq_errno() == ETERM) + { + LOG(INFO) << "terminating socket " << fId; + return -1; + } + LOG(ERROR) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno); + return nbytes; +} + +int64_t FairMQSocketSHM::Receive(vector& msgVec, const int flags) +{ + // Warn if the vector is filled before Receive() and empty it. + if (msgVec.size() > 0) + { + LOG(WARN) << "Message vector contains elements before Receive(), they will be deleted!"; + msgVec.clear(); + } + + int64_t totalSize = 0; + int64_t more = 0; + + do + { + FairMQMessagePtr part(new FairMQMessageSHM()); + zmq_msg_t* msgPtr = static_cast(part->GetMessage()); + + int nbytes = zmq_msg_recv(msgPtr, fSocket, flags); + if (nbytes == 0) + { + msgVec.push_back(move(part)); + } + else if (nbytes > 0) + { + string ownerID(static_cast(zmq_msg_data(msgPtr)), zmq_msg_size(msgPtr)); + ShPtrOwner* owner = Manager::Instance().Segment()->find(ownerID.c_str()).first; + size_t size = 0; + if (owner) + { + static_cast(part.get())->fOwner = owner; + static_cast(part.get())->fReceiving = true; + size = part->GetSize(); + + msgVec.push_back(move(part)); + + fBytesRx += size; + totalSize += size; + } + else + { + LOG(ERROR) << "Received meta data, but could not find corresponding chunk"; + return -1; + } + } + else + { + return nbytes; + } + + size_t more_size = sizeof(more); + zmq_getsockopt(fSocket, ZMQ_RCVMORE, &more, &more_size); + } + while (more); + + // store statistics on how many messages have been received (handle all parts as a single message) + ++fMessagesRx; + return totalSize; +} + +void FairMQSocketSHM::Close() +{ + // LOG(DEBUG) << "Closing socket " << fId; + + if (fSocket == NULL) + { + return; + } + + if (zmq_close(fSocket) != 0) + { + LOG(ERROR) << "Failed closing socket " << fId << ", reason: " << zmq_strerror(errno); + } + + fSocket = NULL; +} + +void FairMQSocketSHM::Terminate() +{ + if (zmq_ctx_destroy(fContext->GetContext()) != 0) + { + LOG(ERROR) << "Failed terminating context, reason: " << zmq_strerror(errno); + } +} + +void FairMQSocketSHM::Interrupt() +{ + FairMQMessageSHM::fInterrupted = true; +} + +void FairMQSocketSHM::Resume() +{ + FairMQMessageSHM::fInterrupted = false; +} + +void* FairMQSocketSHM::GetSocket() const +{ + return fSocket; +} + +int FairMQSocketSHM::GetSocket(int) const +{ + // dummy method to comply with the interface. functionality not possible in zeromq. + return -1; +} + +void FairMQSocketSHM::SetOption(const string& option, const void* value, size_t valueSize) +{ + if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0) + { + LOG(ERROR) << "Failed setting socket option, reason: " << zmq_strerror(errno); + } +} + +void FairMQSocketSHM::GetOption(const string& option, void* value, size_t* valueSize) +{ + if (zmq_getsockopt(fSocket, GetConstant(option), value, valueSize) < 0) + { + LOG(ERROR) << "Failed getting socket option, reason: " << zmq_strerror(errno); + } +} + +unsigned long FairMQSocketSHM::GetBytesTx() const +{ + return fBytesTx; +} + +unsigned long FairMQSocketSHM::GetBytesRx() const +{ + return fBytesRx; +} + +unsigned long FairMQSocketSHM::GetMessagesTx() const +{ + return fMessagesTx; +} + +unsigned long FairMQSocketSHM::GetMessagesRx() const +{ + return fMessagesRx; +} + +bool FairMQSocketSHM::SetSendTimeout(const int timeout, const string& address, const string& method) +{ + if (method == "bind") + { + if (zmq_unbind(fSocket, address.c_str()) != 0) + { + LOG(ERROR) << "Failed unbinding socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &timeout, sizeof(int)) != 0) + { + LOG(ERROR) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + if (zmq_bind(fSocket, address.c_str()) != 0) + { + LOG(ERROR) << "Failed binding socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + } + else if (method == "connect") + { + if (zmq_disconnect(fSocket, address.c_str()) != 0) + { + LOG(ERROR) << "Failed disconnecting socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &timeout, sizeof(int)) != 0) + { + LOG(ERROR) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + if (zmq_connect(fSocket, address.c_str()) != 0) + { + LOG(ERROR) << "Failed connecting socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + } + else + { + LOG(ERROR) << "SetSendTimeout() failed - unknown method provided!"; + return false; + } + + return true; +} + +int FairMQSocketSHM::GetSendTimeout() const +{ + int timeout = -1; + size_t size = sizeof(timeout); + + if (zmq_getsockopt(fSocket, ZMQ_SNDTIMEO, &timeout, &size) != 0) + { + LOG(ERROR) << "Failed getting option 'receive timeout' on socket " << fId << ", reason: " << zmq_strerror(errno); + } + + return timeout; +} + +bool FairMQSocketSHM::SetReceiveTimeout(const int timeout, const string& address, const string& method) +{ + if (method == "bind") + { + if (zmq_unbind(fSocket, address.c_str()) != 0) + { + LOG(ERROR) << "Failed unbinding socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &timeout, sizeof(int)) != 0) + { + LOG(ERROR) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + if (zmq_bind(fSocket, address.c_str()) != 0) + { + LOG(ERROR) << "Failed binding socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + } + else if (method == "connect") + { + if (zmq_disconnect(fSocket, address.c_str()) != 0) + { + LOG(ERROR) << "Failed disconnecting socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &timeout, sizeof(int)) != 0) + { + LOG(ERROR) << "Failed setting option on socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + if (zmq_connect(fSocket, address.c_str()) != 0) + { + LOG(ERROR) << "Failed connecting socket " << fId << ", reason: " << zmq_strerror(errno); + return false; + } + } + else + { + LOG(ERROR) << "SetReceiveTimeout() failed - unknown method provided!"; + return false; + } + + return true; +} + +int FairMQSocketSHM::GetReceiveTimeout() const +{ + int timeout = -1; + size_t size = sizeof(timeout); + + if (zmq_getsockopt(fSocket, ZMQ_RCVTIMEO, &timeout, &size) != 0) + { + LOG(ERROR) << "Failed getting option 'receive timeout' on socket " << fId << ", reason: " << zmq_strerror(errno); + } + + return timeout; +} + +int FairMQSocketSHM::GetConstant(const string& constant) +{ + if (constant == "") + return 0; + if (constant == "sub") + return ZMQ_SUB; + if (constant == "pub") + return ZMQ_PUB; + if (constant == "xsub") + return ZMQ_XSUB; + if (constant == "xpub") + return ZMQ_XPUB; + if (constant == "push") + return ZMQ_PUSH; + if (constant == "pull") + return ZMQ_PULL; + if (constant == "req") + return ZMQ_REQ; + if (constant == "rep") + return ZMQ_REP; + if (constant == "dealer") + return ZMQ_DEALER; + if (constant == "router") + return ZMQ_ROUTER; + if (constant == "pair") + return ZMQ_PAIR; + + if (constant == "snd-hwm") + return ZMQ_SNDHWM; + if (constant == "rcv-hwm") + return ZMQ_RCVHWM; + if (constant == "snd-size") + return ZMQ_SNDBUF; + if (constant == "rcv-size") + return ZMQ_RCVBUF; + if (constant == "snd-more") + return ZMQ_SNDMORE; + if (constant == "rcv-more") + return ZMQ_RCVMORE; + + if (constant == "linger") + return ZMQ_LINGER; + if (constant == "no-block") + return ZMQ_DONTWAIT; + if (constant == "snd-more no-block") + return ZMQ_DONTWAIT|ZMQ_SNDMORE; + + return -1; +} + +FairMQSocketSHM::~FairMQSocketSHM() +{ +} diff --git a/fairmq/shmem/FairMQSocketSHM.h b/fairmq/shmem/FairMQSocketSHM.h new file mode 100644 index 00000000..59e4fc60 --- /dev/null +++ b/fairmq/shmem/FairMQSocketSHM.h @@ -0,0 +1,76 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +#ifndef FAIRMQSOCKETSHM_H_ +#define FAIRMQSOCKETSHM_H_ + +#include + +#include // unique_ptr + +#include "FairMQSocket.h" +#include "FairMQContextSHM.h" +#include "FairMQShmManager.h" + +class FairMQSocketSHM : public FairMQSocket +{ + public: + FairMQSocketSHM(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = ""); + FairMQSocketSHM(const FairMQSocketSHM&) = delete; + FairMQSocketSHM operator=(const FairMQSocketSHM&) = delete; + + virtual std::string GetId(); + + virtual bool Bind(const std::string& address); + virtual void Connect(const std::string& address); + + virtual int Send(FairMQMessage* msg, const std::string& flag = ""); + virtual int Send(FairMQMessage* msg, const int flags = 0); + virtual int64_t Send(const std::vector>& msgVec, const int flags = 0); + + virtual int Receive(FairMQMessage* msg, const std::string& flag = ""); + virtual int Receive(FairMQMessage* msg, const int flags = 0); + virtual int64_t Receive(std::vector>& msgVec, const int flags = 0); + + virtual void* GetSocket() const; + virtual int GetSocket(int nothing) const; + virtual void Close(); + virtual void Terminate(); + + virtual void Interrupt(); + virtual void Resume(); + + virtual void SetOption(const std::string& option, const void* value, size_t valueSize); + virtual void GetOption(const std::string& option, void* value, size_t* valueSize); + + virtual unsigned long GetBytesTx() const; + virtual unsigned long GetBytesRx() const; + virtual unsigned long GetMessagesTx() const; + virtual unsigned long GetMessagesRx() const; + + virtual bool SetSendTimeout(const int timeout, const std::string& address, const std::string& method); + virtual int GetSendTimeout() const; + virtual bool SetReceiveTimeout(const int timeout, const std::string& address, const std::string& method); + virtual int GetReceiveTimeout() const; + + static int GetConstant(const std::string& constant); + + virtual ~FairMQSocketSHM(); + + private: + void* fSocket; + std::string fId; + std::atomic fBytesTx; + std::atomic fBytesRx; + std::atomic fMessagesTx; + std::atomic fMessagesRx; + + static std::unique_ptr fContext; + // static bool fContextInitialized; +}; + +#endif /* FAIRMQSOCKETSHM_H_ */ diff --git a/fairmq/shmem/FairMQTransportFactorySHM.cxx b/fairmq/shmem/FairMQTransportFactorySHM.cxx new file mode 100644 index 00000000..3e2a194f --- /dev/null +++ b/fairmq/shmem/FairMQTransportFactorySHM.cxx @@ -0,0 +1,56 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +#include "zmq.h" +#include + +#include "FairMQTransportFactorySHM.h" + +using namespace std; + +FairMQTransportFactorySHM::FairMQTransportFactorySHM() +{ + int major, minor, patch; + zmq_version(&major, &minor, &patch); + LOG(DEBUG) << "Using ZeroMQ (" << major << "." << minor << "." << patch << ") & " + << "boost::interprocess (" << (BOOST_VERSION / 100000) << "." << (BOOST_VERSION / 100 % 1000) << "." << (BOOST_VERSION % 100) << ")"; +} + +FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage() const +{ + return unique_ptr(new FairMQMessageSHM()); +} + +FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(const size_t size) const +{ + return unique_ptr(new FairMQMessageSHM(size)); +} + +FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint) const +{ + return unique_ptr(new FairMQMessageSHM(data, size, ffn, hint)); +} + +FairMQSocketPtr FairMQTransportFactorySHM::CreateSocket(const string& type, const string& name, const int numIoThreads, const string& id /*= ""*/) const +{ + return unique_ptr(new FairMQSocketSHM(type, name, numIoThreads, id)); +} + +FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const vector& channels) const +{ + return unique_ptr(new FairMQPollerSHM(channels)); +} + +FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const unordered_map>& channelsMap, const vector& channelList) const +{ + return unique_ptr(new FairMQPollerSHM(channelsMap, channelList)); +} + +FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const +{ + return unique_ptr(new FairMQPollerSHM(cmdSocket, dataSocket)); +} diff --git a/fairmq/shmem/FairMQTransportFactorySHM.h b/fairmq/shmem/FairMQTransportFactorySHM.h new file mode 100644 index 00000000..e0759ce3 --- /dev/null +++ b/fairmq/shmem/FairMQTransportFactorySHM.h @@ -0,0 +1,37 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +#ifndef FAIRMQTRANSPORTFACTORYSHM_H_ +#define FAIRMQTRANSPORTFACTORYSHM_H_ + +#include + +#include "FairMQTransportFactory.h" +#include "FairMQContextSHM.h" +#include "FairMQMessageSHM.h" +#include "FairMQSocketSHM.h" +#include "FairMQPollerSHM.h" + +class FairMQTransportFactorySHM : public FairMQTransportFactory +{ + public: + FairMQTransportFactorySHM(); + + virtual FairMQMessagePtr CreateMessage() const; + virtual FairMQMessagePtr CreateMessage(const size_t size) const; + virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL) const; + + virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = "") const; + + virtual FairMQPollerPtr CreatePoller(const std::vector& channels) const; + virtual FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const; + virtual FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const; + + virtual ~FairMQTransportFactorySHM() {}; +}; + +#endif /* FAIRMQTRANSPORTFACTORYSHM_H_ */ diff --git a/fairmq/shmem/README.md b/fairmq/shmem/README.md new file mode 100644 index 00000000..b3637f10 --- /dev/null +++ b/fairmq/shmem/README.md @@ -0,0 +1,10 @@ +# Shared Memory transport + +First version of the shared memory transport. To try with existing devices, run the devices with `--transport shmem` option. + +The transport manages shared memory via boost::interprocess library. The transfer of the meta data, required to locate the content in the share memory, is done via ZeroMQ. The transport supports all communication patterns where a single message is received by a single receiver. For multiple receivers for the same message, the message has to be copied. + +Under development: +- Cleanup of the shared memory segment in case all devices crash. Currently at least one device has to stop properly for a cleanup. +- Implement more than one transport per device. +- Configuration of the shared memory size (currently hard-coded). diff --git a/fairmq/tools/runSimpleMQStateMachine.h b/fairmq/tools/runSimpleMQStateMachine.h index 2328ae94..8e5f56d3 100644 --- a/fairmq/tools/runSimpleMQStateMachine.h +++ b/fairmq/tools/runSimpleMQStateMachine.h @@ -96,6 +96,11 @@ inline int runStateMachine(TMQDevice& device, FairMQProgOptions& cfg) LOG(ERROR) << "Cannot load fairmqControlPlugin(): " << dlsymError; fairmqControlPlugin = nullptr; dlclose(ldControlHandle); + // also close the config plugin before quiting with error. + if (ldConfigHandle) + { + dlclose(ldConfigHandle); + } return 1; } diff --git a/fairmq/zeromq/FairMQContextZMQ.cxx b/fairmq/zeromq/FairMQContextZMQ.cxx index 6ece7d63..4cb52841 100644 --- a/fairmq/zeromq/FairMQContextZMQ.cxx +++ b/fairmq/zeromq/FairMQContextZMQ.cxx @@ -12,8 +12,6 @@ * @author D. Klein, A. Rybalchenko */ -#include - #include #include "FairMQLogger.h" @@ -60,9 +58,12 @@ void FairMQContextZMQ::Close() if (zmq_ctx_destroy(fContext) != 0) { - if (errno == EINTR) { + if (errno == EINTR) + { LOG(ERROR) << " failed closing context, reason: " << zmq_strerror(errno); - } else { + } + else + { fContext = NULL; return; } diff --git a/fairmq/zeromq/FairMQMessageZMQ.cxx b/fairmq/zeromq/FairMQMessageZMQ.cxx index 56bae5c8..cebf713e 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.cxx +++ b/fairmq/zeromq/FairMQMessageZMQ.cxx @@ -20,6 +20,8 @@ using namespace std; +string FairMQMessageZMQ::fDeviceID = string(); + FairMQMessageZMQ::FairMQMessageZMQ() : fMessage() { @@ -94,22 +96,9 @@ void FairMQMessageZMQ::SetMessage(void*, const size_t) // dummy method to comply with the interface. functionality not allowed in zeromq. } -void FairMQMessageZMQ::Copy(FairMQMessage* msg) +void FairMQMessageZMQ::SetDeviceId(const string& deviceId) { - // DEPRECATED: Use Copy(const unique_ptr&) - - // Shares the message buffer between msg and this fMessage. - if (zmq_msg_copy(&fMessage, static_cast(msg->GetMessage())) != 0) - { - LOG(ERROR) << "failed copying message, reason: " << zmq_strerror(errno); - } - - // Alternatively, following code does a hard copy of the message, which allows to modify the original after making a copy, without affecting the new msg. - - // CloseMessage(); - // size_t size = msg->GetSize(); - // zmq_msg_init_size(&fMessage, size); - // memcpy(zmq_msg_data(&fMessage), msg->GetData(), size); + fDeviceID = deviceId; } void FairMQMessageZMQ::Copy(const unique_ptr& msg) @@ -128,7 +117,7 @@ void FairMQMessageZMQ::Copy(const unique_ptr& msg) // memcpy(zmq_msg_data(&fMessage), msg->GetData(), size); } -inline void FairMQMessageZMQ::CloseMessage() +void FairMQMessageZMQ::CloseMessage() { if (zmq_msg_close(&fMessage) != 0) { diff --git a/fairmq/zeromq/FairMQMessageZMQ.h b/fairmq/zeromq/FairMQMessageZMQ.h index 1b87b4c1..0674aaa8 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.h +++ b/fairmq/zeromq/FairMQMessageZMQ.h @@ -16,6 +16,7 @@ #define FAIRMQMESSAGEZMQ_H_ #include +#include #include @@ -26,11 +27,11 @@ class FairMQMessageZMQ : public FairMQMessage public: FairMQMessageZMQ(); FairMQMessageZMQ(const size_t size); - FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL); + FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); virtual void Rebuild(); virtual void Rebuild(const size_t size); - virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL); + virtual void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); virtual void* GetMessage(); virtual void* GetData(); @@ -38,14 +39,17 @@ class FairMQMessageZMQ : public FairMQMessage virtual void SetMessage(void* data, const size_t size); - virtual void CloseMessage(); - virtual void Copy(FairMQMessage* msg); + virtual void SetDeviceId(const std::string& deviceId); + virtual void Copy(const std::unique_ptr& msg); + void CloseMessage(); + virtual ~FairMQMessageZMQ(); private: zmq_msg_t fMessage; + static std::string fDeviceID; }; #endif /* FAIRMQMESSAGEZMQ_H_ */ diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index 2959035a..15ce5d2c 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -108,24 +108,7 @@ void FairMQSocketZMQ::Connect(const string& address) int FairMQSocketZMQ::Send(FairMQMessage* msg, const string& flag) { - int nbytes = zmq_msg_send(static_cast(msg->GetMessage()), fSocket, GetConstant(flag)); - if (nbytes >= 0) - { - fBytesTx += nbytes; - ++fMessagesTx; - return nbytes; - } - if (zmq_errno() == EAGAIN) - { - return -2; - } - if (zmq_errno() == ETERM) - { - LOG(INFO) << "terminating socket " << fId; - return -1; - } - LOG(ERROR) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); - return nbytes; + return Send(msg, GetConstant(flag)); } int FairMQSocketZMQ::Send(FairMQMessage* msg, const int flags) @@ -218,24 +201,7 @@ int64_t FairMQSocketZMQ::Send(const vector>& msgVec, c int FairMQSocketZMQ::Receive(FairMQMessage* msg, const string& flag) { - int nbytes = zmq_msg_recv(static_cast(msg->GetMessage()), fSocket, GetConstant(flag)); - if (nbytes >= 0) - { - fBytesRx += nbytes; - ++fMessagesRx; - return nbytes; - } - if (zmq_errno() == EAGAIN) - { - return -2; - } - if (zmq_errno() == ETERM) - { - LOG(INFO) << "terminating socket " << fId; - return -1; - } - LOG(ERROR) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno); - return nbytes; + return Receive(msg, GetConstant(flag)); } int FairMQSocketZMQ::Receive(FairMQMessage* msg, const int flags) @@ -323,6 +289,14 @@ void FairMQSocketZMQ::Terminate() } } +void FairMQSocketZMQ::Interrupt() +{ +} + +void FairMQSocketZMQ::Resume() +{ +} + void* FairMQSocketZMQ::GetSocket() const { return fSocket; diff --git a/fairmq/zeromq/FairMQSocketZMQ.h b/fairmq/zeromq/FairMQSocketZMQ.h index 6ec7c47c..d57e1b0c 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.h +++ b/fairmq/zeromq/FairMQSocketZMQ.h @@ -47,6 +47,9 @@ class FairMQSocketZMQ : public FairMQSocket virtual void Close(); virtual void Terminate(); + virtual void Interrupt(); + virtual void Resume(); + virtual void SetOption(const std::string& option, const void* value, size_t valueSize); virtual void GetOption(const std::string& option, void* value, size_t* valueSize);