diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 0f864aee..248acaf0 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -62,6 +62,7 @@ set(FAIRMQ_HEADER_FILES FairMQMessage.h FairMQParts.h FairMQPoller.h + FairMQRegion.h FairMQSocket.h FairMQStateMachine.h FairMQTransportFactory.h @@ -87,10 +88,11 @@ set(FAIRMQ_HEADER_FILES runFairMQDevice.h shmem/FairMQMessageSHM.h shmem/FairMQPollerSHM.h + shmem/FairMQRegionSHM.h shmem/FairMQSocketSHM.h shmem/FairMQTransportFactorySHM.h shmem/FairMQShmMonitor.h - shmem/FairMQShmDeviceCounter.h + shmem/FairMQShmCommon.h tools/CppSTL.h tools/Network.h tools/runSimpleMQStateMachine.h @@ -98,6 +100,7 @@ set(FAIRMQ_HEADER_FILES tools/Version.h zeromq/FairMQMessageZMQ.h zeromq/FairMQPollerZMQ.h + zeromq/FairMQRegionZMQ.h zeromq/FairMQSocketZMQ.h zeromq/FairMQTransportFactoryZMQ.h ) @@ -106,6 +109,7 @@ if(NANOMSG_FOUND) set(FAIRMQ_HEADER_FILES ${FAIRMQ_HEADER_FILES} nanomsg/FairMQMessageNN.h nanomsg/FairMQPollerNN.h + nanomsg/FairMQRegionNN.h nanomsg/FairMQSocketNN.h nanomsg/FairMQTransportFactoryNN.h ) @@ -144,11 +148,13 @@ set(FAIRMQ_SOURCE_FILES PluginServices.cxx shmem/FairMQMessageSHM.cxx shmem/FairMQPollerSHM.cxx + shmem/FairMQRegionSHM.cxx shmem/FairMQSocketSHM.cxx shmem/FairMQTransportFactorySHM.cxx shmem/FairMQShmMonitor.cxx zeromq/FairMQMessageZMQ.cxx zeromq/FairMQPollerZMQ.cxx + zeromq/FairMQRegionZMQ.cxx zeromq/FairMQSocketZMQ.cxx zeromq/FairMQTransportFactoryZMQ.cxx ) @@ -157,6 +163,7 @@ if(NANOMSG_FOUND) set(FAIRMQ_SOURCE_FILES ${FAIRMQ_SOURCE_FILES} nanomsg/FairMQMessageNN.cxx nanomsg/FairMQPollerNN.cxx + nanomsg/FairMQRegionNN.cxx nanomsg/FairMQSocketNN.cxx nanomsg/FairMQTransportFactoryNN.cxx ) diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index 2aeed993..e8e9074a 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -47,12 +47,12 @@ static void CallSignalHandler(int signal) } FairMQDevice::FairMQDevice() - : fChannels() + : fTransportFactory(nullptr) + , fTransports() + , fChannels() , fConfig(nullptr) , fId() , fNumIoThreads(1) - , fTransportFactory(nullptr) - , fTransports() , fInitialValidationFinished(false) , fInitialValidationCondition() , fInitialValidationMutex() @@ -69,6 +69,7 @@ FairMQDevice::FairMQDevice() , fMsgInputs() , fMultipartInputs() , fMultitransportInputs() + , fChannelRegistry() , fInputChannelKeys() , fMultitransportMutex() , fMultitransportProceed(false) @@ -78,12 +79,12 @@ FairMQDevice::FairMQDevice() } FairMQDevice::FairMQDevice(const fair::mq::tools::Version version) - : fChannels() + : fTransportFactory(nullptr) + , fTransports() + , fChannels() , fConfig(nullptr) , fId() , fNumIoThreads(1) - , fTransportFactory(nullptr) - , fTransports() , fInitialValidationFinished(false) , fInitialValidationCondition() , fInitialValidationMutex() @@ -100,6 +101,7 @@ FairMQDevice::FairMQDevice(const fair::mq::tools::Version version) , fMsgInputs() , fMultipartInputs() , fMultitransportInputs() + , fChannelRegistry() , fInputChannelKeys() , fMultitransportMutex() , fMultitransportProceed(false) @@ -1293,8 +1295,6 @@ const FairMQChannel& FairMQDevice::GetChannel(const std::string& channelName, co void FairMQDevice::Exit() { - LOG(DEBUG) << "All transports are shut down."; - if (!fExternalConfig && fConfig) { delete fConfig; @@ -1303,5 +1303,5 @@ void FairMQDevice::Exit() FairMQDevice::~FairMQDevice() { - LOG(DEBUG) << "Device destroyed"; + LOG(DEBUG) << "Destructing device " << fId; } diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 70e8494b..519fb5fe 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -18,6 +18,7 @@ #include "FairMQChannel.h" #include "FairMQMessage.h" #include "FairMQParts.h" +#include "FairMQRegion.h" #include #include // unique_ptr @@ -242,6 +243,16 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable return fChannels.at(channel).at(index).NewSimpleMessage(data); } + FairMQRegionPtr NewRegion(const size_t size) + { + return Transport()->CreateRegion(size); + } + + FairMQRegionPtr NewRegionFor(const std::string& channel, int index, const size_t size) + { + return fChannels.at(channel).at(index).Transport()->CreateRegion(size); + } + template FairMQPollerPtr NewPoller(const Ts&... inputs) { @@ -272,7 +283,7 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable { FairMQ::Transport type = channels.at(0)->Transport()->GetType(); - for (int i = 1; i < channels.size(); ++i) + for (unsigned int i = 1; i < channels.size(); ++i) { if (type != channels.at(i)->Transport()->GetType()) { diff --git a/fairmq/shmem/FairMQShmDeviceCounter.h b/fairmq/FairMQRegion.h similarity index 58% rename from fairmq/shmem/FairMQShmDeviceCounter.h rename to fairmq/FairMQRegion.h index fa589704..c18f6121 100644 --- a/fairmq/shmem/FairMQShmDeviceCounter.h +++ b/fairmq/FairMQRegion.h @@ -1,33 +1,26 @@ /******************************************************************************** * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#ifndef FAIRMQSHMDEVICECOUNTER_H_ -#define FAIRMQSHMDEVICECOUNTER_H_ -#include +#ifndef FAIRMQREGION_H_ +#define FAIRMQREGION_H_ -namespace fair -{ -namespace mq -{ -namespace shmem -{ +#include // size_t +#include // unique_ptr -struct DeviceCounter +class FairMQRegion { - DeviceCounter(unsigned int c) - : count(c) - {} + public: + virtual void* GetData() const = 0; + virtual size_t GetSize() const = 0; - std::atomic count; + virtual ~FairMQRegion() {}; }; -} // namespace shmem -} // namespace mq -} // namespace fair +using FairMQRegionPtr = std::unique_ptr; -#endif /* FAIRMQSHMDEVICECOUNTER_H_ */ +#endif /* FAIRMQREGION_H_ */ diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 52d14f18..6570b964 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -12,6 +12,7 @@ #include "FairMQMessage.h" #include "FairMQSocket.h" #include "FairMQPoller.h" +#include "FairMQRegion.h" #include "FairMQLogger.h" #include "FairMQTransports.h" #include @@ -50,6 +51,8 @@ class FairMQTransportFactory /// @return pointer to FairMQMessage virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const = 0; + virtual FairMQMessagePtr CreateMessage(FairMQRegionPtr& region, void* data, const size_t size) const = 0; + /// Create a socket virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const = 0; @@ -62,6 +65,8 @@ class FairMQTransportFactory /// Create a poller for two sockets virtual FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const = 0; + virtual FairMQRegionPtr CreateRegion(const size_t size) const = 0; + /// Get transport type virtual FairMQ::Transport GetType() const = 0; diff --git a/fairmq/docs/Development.md b/fairmq/docs/Development.md index cedbd848..d19e1a2d 100644 --- a/fairmq/docs/Development.md +++ b/fairmq/docs/Development.md @@ -31,4 +31,11 @@ Possible further implementation would be to run the monitor with `--self-destruc The FairMQShmMonitor class can also be used independently from the supplied executable (built from `runFairMQShmMonitor.cxx`), allowing integration on any level. For example invoking the monitor could be a functionality that a device offers. +FairMQ Shared Memory currently uses following names to register shared memory on the system: + +`fairmq_shmem_main` - main segment name, used for user data (this name can be overridden via `--shm-segment-name`). +`fairmq_shmem_management` - management segment name, used for storing management data. +`fairmq_shmem_control_queue` - message queue for communicating between shm transport and shm monitor (exists independent of above segments). +`fairmq_shmem_mutex` - boost::interprocess::named_mutex for management purposes (exists independent of above segments). + ← [Back](../README.md) diff --git a/fairmq/nanomsg/FairMQMessageNN.cxx b/fairmq/nanomsg/FairMQMessageNN.cxx index 01553a99..a8066bb0 100644 --- a/fairmq/nanomsg/FairMQMessageNN.cxx +++ b/fairmq/nanomsg/FairMQMessageNN.cxx @@ -29,6 +29,7 @@ FairMQMessageNN::FairMQMessageNN() : fMessage(nullptr) , fSize(0) , fReceiving(false) + , fRegion(false) { fMessage = nn_allocmsg(0, 0); if (!fMessage) @@ -41,6 +42,7 @@ FairMQMessageNN::FairMQMessageNN(const size_t size) : fMessage(nullptr) , fSize(0) , fReceiving(false) + , fRegion(false) { fMessage = nn_allocmsg(size, 0); if (!fMessage) @@ -60,6 +62,7 @@ FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* : fMessage(nullptr) , fSize(0) , fReceiving(false) + , fRegion(false) { fMessage = nn_allocmsg(size, 0); if (!fMessage) @@ -81,6 +84,15 @@ FairMQMessageNN::FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* } } +FairMQMessageNN::FairMQMessageNN(FairMQRegionPtr& region, void* data, const size_t size) + : fMessage(data) + , fSize(size) + , fReceiving(false) + , fRegion(true) +{ + // currently nanomsg will copy the buffer (data) inside nn_sendmsg() +} + void FairMQMessageNN::Rebuild() { Clear(); diff --git a/fairmq/nanomsg/FairMQMessageNN.h b/fairmq/nanomsg/FairMQMessageNN.h index 3b507f4b..71109801 100644 --- a/fairmq/nanomsg/FairMQMessageNN.h +++ b/fairmq/nanomsg/FairMQMessageNN.h @@ -19,6 +19,7 @@ #include #include "FairMQMessage.h" +#include "FairMQRegion.h" class FairMQMessageNN : public FairMQMessage { @@ -26,6 +27,8 @@ class FairMQMessageNN : public FairMQMessage FairMQMessageNN(); FairMQMessageNN(const size_t size); FairMQMessageNN(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); + FairMQMessageNN(FairMQRegionPtr& region, void* data, const size_t size); + FairMQMessageNN(const FairMQMessageNN&) = delete; FairMQMessageNN operator=(const FairMQMessageNN&) = delete; @@ -53,6 +56,7 @@ class FairMQMessageNN : public FairMQMessage void* fMessage; size_t fSize; bool fReceiving; + bool fRegion; static std::string fDeviceID; static FairMQ::Transport fTransportType; diff --git a/fairmq/nanomsg/FairMQRegionNN.cxx b/fairmq/nanomsg/FairMQRegionNN.cxx new file mode 100644 index 00000000..4e9338c8 --- /dev/null +++ b/fairmq/nanomsg/FairMQRegionNN.cxx @@ -0,0 +1,34 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "FairMQRegionNN.h" +#include "FairMQLogger.h" + +using namespace std; + +FairMQRegionNN::FairMQRegionNN(const size_t size) + : fBuffer(malloc(size)) + , fSize(size) +{ +} + +void* FairMQRegionNN::GetData() const +{ + return fBuffer; +} + +size_t FairMQRegionNN::GetSize() const +{ + return fSize; +} + +FairMQRegionNN::~FairMQRegionNN() +{ + LOG(DEBUG) << "destroying region"; + free(fBuffer); +} diff --git a/fairmq/nanomsg/FairMQRegionNN.h b/fairmq/nanomsg/FairMQRegionNN.h new file mode 100644 index 00000000..b027757e --- /dev/null +++ b/fairmq/nanomsg/FairMQRegionNN.h @@ -0,0 +1,35 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIRMQREGIONNN_H_ +#define FAIRMQREGIONNN_H_ + +#include "FairMQRegion.h" + +#include // size_t + +class FairMQRegionNN : public FairMQRegion +{ + friend class FairMQSocketNN; + + public: + FairMQRegionNN(const size_t size); + FairMQRegionNN(const FairMQRegionNN&) = delete; + FairMQRegionNN operator=(const FairMQRegionNN&) = delete; + + virtual void* GetData() const override; + virtual size_t GetSize() const override; + + virtual ~FairMQRegionNN(); + + private: + void* fBuffer; + size_t fSize; +}; + +#endif /* FAIRMQREGIONNN_H_ */ \ No newline at end of file diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index aa54f3fb..24054382 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -127,7 +127,14 @@ int FairMQSocketNN::Send(FairMQMessagePtr& msg, const int flags) while (true) { void* ptr = msg->GetMessage(); - nbytes = nn_send(fSocket, &ptr, NN_MSG, flags); + if (static_cast(msg.get())->fRegion == false) + { + nbytes = nn_send(fSocket, &ptr, NN_MSG, flags); + } + else + { + nbytes = nn_send(fSocket, ptr, msg->GetSize(), flags); + } if (nbytes >= 0) { fBytesTx += nbytes; diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index 51b4e117..01ae7192 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -35,6 +35,11 @@ FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage(void* data, const size_ return unique_ptr(new FairMQMessageNN(data, size, ffn, hint)); } +FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage(FairMQRegionPtr& region, void* data, const size_t size) const +{ + return unique_ptr(new FairMQMessageNN(region, data, size)); +} + FairMQSocketPtr FairMQTransportFactoryNN::CreateSocket(const string& type, const string& name) const { return unique_ptr(new FairMQSocketNN(type, name, GetId())); @@ -60,6 +65,11 @@ FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const FairMQSocket& cmdSo return unique_ptr(new FairMQPollerNN(cmdSocket, dataSocket)); } +FairMQRegionPtr FairMQTransportFactoryNN::CreateRegion(const size_t size) const +{ + return unique_ptr(new FairMQRegionNN(size)); +} + FairMQ::Transport FairMQTransportFactoryNN::GetType() const { return fTransportType; diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h index a47d2b24..c2e5c908 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.h +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.h @@ -13,6 +13,7 @@ #include "FairMQMessageNN.h" #include "FairMQSocketNN.h" #include "FairMQPollerNN.h" +#include "FairMQRegionNN.h" #include #include @@ -27,6 +28,7 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory FairMQMessagePtr CreateMessage() const override; FairMQMessagePtr CreateMessage(const size_t size) const override; FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const override; + FairMQMessagePtr CreateMessage(FairMQRegionPtr& region, void* data, const size_t size) const override; FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override; @@ -35,6 +37,8 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const override; + FairMQRegionPtr CreateRegion(const size_t size) const override; + FairMQ::Transport GetType() const override; private: diff --git a/fairmq/shmem/FairMQMessageSHM.cxx b/fairmq/shmem/FairMQMessageSHM.cxx index ea27884f..c2cb506f 100644 --- a/fairmq/shmem/FairMQMessageSHM.cxx +++ b/fairmq/shmem/FairMQMessageSHM.cxx @@ -9,11 +9,15 @@ #include #include "FairMQMessageSHM.h" +#include "FairMQRegionSHM.h" #include "FairMQLogger.h" +#include "FairMQShmCommon.h" using namespace std; using namespace fair::mq::shmem; +namespace bipc = boost::interprocess; + // uint64_t FairMQMessageSHM::fMessageID = 0; // string FairMQMessageSHM::fDeviceID = string(); atomic FairMQMessageSHM::fInterrupted(false); @@ -25,9 +29,11 @@ FairMQMessageSHM::FairMQMessageSHM() // , fReceiving(false) , fQueued(false) , fMetaCreated(false) + , fRegionId(0) , fHandle() - , fChunkSize(0) + , fSize(0) , fLocalPtr(nullptr) + , fRemoteRegion(nullptr) { if (zmq_msg_init(&fMessage) != 0) { @@ -47,9 +53,11 @@ FairMQMessageSHM::FairMQMessageSHM(const size_t size) // , fReceiving(false) , fQueued(false) , fMetaCreated(false) + , fRegionId(0) , fHandle() - , fChunkSize(0) + , fSize(0) , fLocalPtr(nullptr) + , fRemoteRegion(nullptr) { InitializeChunk(size); } @@ -60,9 +68,11 @@ FairMQMessageSHM::FairMQMessageSHM(void* data, const size_t size, fairmq_free_fn // , fReceiving(false) , fQueued(false) , fMetaCreated(false) + , fRegionId(0) , fHandle() - , fChunkSize(0) + , fSize(0) , fLocalPtr(nullptr) + , fRemoteRegion(nullptr) { if (InitializeChunk(size)) { @@ -78,6 +88,35 @@ FairMQMessageSHM::FairMQMessageSHM(void* data, const size_t size, fairmq_free_fn } } +FairMQMessageSHM::FairMQMessageSHM(FairMQRegionPtr& region, void* data, const size_t size) + : fMessage() + // , fOwner(nullptr) + // , fReceiving(false) + , fQueued(false) + , fMetaCreated(false) + , fRegionId(static_cast(region.get())->fRegionId) + , fHandle() + , fSize(size) + , fLocalPtr(data) + , fRemoteRegion(nullptr) +{ + fHandle = (bipc::managed_shared_memory::handle_t)(reinterpret_cast(data) - reinterpret_cast(region->GetData())); + + if (zmq_msg_init_size(&fMessage, sizeof(MetaHeader)) != 0) + { + LOG(ERROR) << "failed initializing meta message, reason: " << zmq_strerror(errno); + } + else + { + MetaHeader* metaPtr = new(zmq_msg_data(&fMessage)) MetaHeader(); + metaPtr->fSize = size; + metaPtr->fHandle = fHandle; + metaPtr->fRegionId = fRegionId; + + fMetaCreated = true; + } +} + bool FairMQMessageSHM::InitializeChunk(const size_t size) { // string chunkID = fDeviceID + "c" + to_string(fMessageID); @@ -109,7 +148,7 @@ bool FairMQMessageSHM::InitializeChunk(const size_t size) fHandle = Manager::Instance().Segment()->get_handle_from_address(fLocalPtr); } - fChunkSize = size; + fSize = size; if (zmq_msg_init_size(&fMessage, sizeof(MetaHeader)) != 0) { @@ -119,6 +158,7 @@ bool FairMQMessageSHM::InitializeChunk(const size_t size) MetaHeader* metaPtr = new(zmq_msg_data(&fMessage)) MetaHeader(); metaPtr->fSize = size; metaPtr->fHandle = fHandle; + metaPtr->fRegionId = fRegionId; // if (zmq_msg_init_data(&fMessage, const_cast(ownerID->c_str()), ownerID->length(), StringDeleter, ownerID) != 0) // { @@ -187,14 +227,21 @@ void* FairMQMessageSHM::GetData() { return fLocalPtr; } - else if (fHandle) - { - return Manager::Instance().Segment()->get_address_from_handle(fHandle); - } else { - // LOG(ERROR) << "Trying to get data of an empty shared memory message"; - return nullptr; + if (fRegionId == 0) + { + return Manager::Instance().Segment()->get_address_from_handle(fHandle); + } + else + { + if (!fRemoteRegion) + { + fRemoteRegion = FairMQRegionPtr(new FairMQRegionSHM(fRegionId, true)); + } + fLocalPtr = reinterpret_cast(fRemoteRegion->GetData()) + fHandle; + return fLocalPtr; + } } // if (fOwner) @@ -210,7 +257,7 @@ void* FairMQMessageSHM::GetData() size_t FairMQMessageSHM::GetSize() { - return fChunkSize; + return fSize; // if (fOwner) // { // return fOwner->fPtr->GetSize(); @@ -324,7 +371,7 @@ void FairMQMessageSHM::CloseMessage() // } // else // { - if (fHandle && !fQueued) + if (fHandle && !fQueued && fRegionId == 0) { // LOG(WARN) << "Destroying unsent message"; // Manager::Instance().Segment()->destroy_ptr(fHandle); diff --git a/fairmq/shmem/FairMQMessageSHM.h b/fairmq/shmem/FairMQMessageSHM.h index 49b0da8c..df094e5e 100644 --- a/fairmq/shmem/FairMQMessageSHM.h +++ b/fairmq/shmem/FairMQMessageSHM.h @@ -8,13 +8,17 @@ #ifndef FAIRMQMESSAGESHM_H_ #define FAIRMQMESSAGESHM_H_ -#include +#include // size_t #include #include #include +#include +#include + #include "FairMQMessage.h" +#include "FairMQRegion.h" #include "FairMQShmManager.h" class FairMQMessageSHM : public FairMQMessage @@ -25,6 +29,8 @@ class FairMQMessageSHM : public FairMQMessage FairMQMessageSHM(); FairMQMessageSHM(const size_t size); FairMQMessageSHM(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); + FairMQMessageSHM(FairMQRegionPtr& region, void* data, const size_t size); + FairMQMessageSHM(const FairMQMessageSHM&) = delete; FairMQMessageSHM operator=(const FairMQMessageSHM&) = delete; @@ -62,9 +68,11 @@ class FairMQMessageSHM : public FairMQMessage bool fMetaCreated; static std::atomic fInterrupted; static FairMQ::Transport fTransportType; + uint64_t fRegionId; bipc::managed_shared_memory::handle_t fHandle; - size_t fChunkSize; + size_t fSize; void* fLocalPtr; + FairMQRegionPtr fRemoteRegion; }; #endif /* FAIRMQMESSAGESHM_H_ */ diff --git a/fairmq/shmem/FairMQRegionSHM.cxx b/fairmq/shmem/FairMQRegionSHM.cxx new file mode 100644 index 00000000..3fd5f6c0 --- /dev/null +++ b/fairmq/shmem/FairMQRegionSHM.cxx @@ -0,0 +1,97 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "FairMQRegionSHM.h" +#include "FairMQShmManager.h" +#include "FairMQShmCommon.h" + +using namespace std; +using namespace fair::mq::shmem; + +namespace bipc = boost::interprocess; + +atomic FairMQRegionSHM::fInterrupted(false); + +FairMQRegionSHM::FairMQRegionSHM(const size_t size) + : fShmemObject() + , fRegion() + , fRegionId(0) + , fRegionIdStr() + , fRemote(false) +{ + try + { + RegionCounter* rc = Manager::Instance().ManagementSegment().find(bipc::unique_instance).first; + if (rc) + { + LOG(DEBUG) << "shmem: region counter found, with value of " << rc->fCount << ". incrementing."; + (rc->fCount)++; + LOG(DEBUG) << "shmem: incremented region counter, now: " << rc->fCount; + } + else + { + LOG(DEBUG) << "shmem: no region counter found, creating one and initializing with 1"; + rc = Manager::Instance().ManagementSegment().construct(bipc::unique_instance)(1); + LOG(DEBUG) << "shmem: initialized region counter with: " << rc->fCount; + } + + fRegionId = rc->fCount; + fRegionIdStr = "fairmq_shmem_region_" + std::to_string(fRegionId); + + fShmemObject = unique_ptr(new bipc::shared_memory_object(bipc::create_only, fRegionIdStr.c_str(), bipc::read_write)); + fShmemObject->truncate(size); + fRegion = unique_ptr(new bipc::mapped_region(*fShmemObject, bipc::read_write)); // TODO: add HUGEPAGES flag here + } + catch (bipc::interprocess_exception& e) + { + LOG(ERROR) << "shmem: cannot create region. Already created/not cleaned up?"; + LOG(ERROR) << e.what(); + exit(EXIT_FAILURE); + } +} + +FairMQRegionSHM::FairMQRegionSHM(const uint64_t id, bool remote) + : fShmemObject() + , fRegion() + , fRegionId(id) + , fRegionIdStr() + , fRemote(remote) +{ + try + { + fRegionIdStr = "fairmq_shmem_region_" + std::to_string(fRegionId); + + fShmemObject = unique_ptr(new bipc::shared_memory_object(bipc::open_only, fRegionIdStr.c_str(), bipc::read_write)); + fRegion = unique_ptr(new bipc::mapped_region(*fShmemObject, bipc::read_write)); // TODO: add HUGEPAGES flag here + } + catch (bipc::interprocess_exception& e) + { + LOG(ERROR) << "shmem: cannot open region. Already closed?"; + LOG(ERROR) << e.what(); + exit(EXIT_FAILURE); + } +} + +void* FairMQRegionSHM::GetData() const +{ + return fRegion->get_address(); +} + +size_t FairMQRegionSHM::GetSize() const +{ + return fRegion->get_size(); +} + +FairMQRegionSHM::~FairMQRegionSHM() +{ + if (!fRemote) + { + LOG(DEBUG) << "destroying region"; + bipc::shared_memory_object::remove(fRegionIdStr.c_str()); + } +} diff --git a/fairmq/shmem/FairMQRegionSHM.h b/fairmq/shmem/FairMQRegionSHM.h new file mode 100644 index 00000000..28ad1ff2 --- /dev/null +++ b/fairmq/shmem/FairMQRegionSHM.h @@ -0,0 +1,46 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIRMQREGIONSHM_H_ +#define FAIRMQREGIONSHM_H_ + +#include "FairMQRegion.h" + +#include +#include + +#include // size_t +#include +#include +#include + +class FairMQRegionSHM : public FairMQRegion +{ + friend class FairMQSocketSHM; + friend class FairMQMessageSHM; + + public: + FairMQRegionSHM(const size_t size); + + virtual void* GetData() const override; + virtual size_t GetSize() const override; + + virtual ~FairMQRegionSHM(); + + private: + FairMQRegionSHM(const uint64_t id, bool remote); + + static std::atomic fInterrupted; + std::unique_ptr fShmemObject; + std::unique_ptr fRegion; + uint64_t fRegionId; + std::string fRegionIdStr; + bool fRemote; +}; + +#endif /* FAIRMQREGIONSHM_H_ */ \ No newline at end of file diff --git a/fairmq/shmem/FairMQShmCommon.h b/fairmq/shmem/FairMQShmCommon.h new file mode 100644 index 00000000..c43d6a19 --- /dev/null +++ b/fairmq/shmem/FairMQShmCommon.h @@ -0,0 +1,60 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +#ifndef FAIR_MQ_SHMEM_COMMON_H_ +#define FAIR_MQ_SHMEM_COMMON_H_ + +#include + +#include + +namespace fair +{ +namespace mq +{ +namespace shmem +{ + +struct DeviceCounter +{ + DeviceCounter(unsigned int c) + : fCount(c) + {} + + std::atomic fCount; +}; + +struct RegionCounter +{ + RegionCounter(unsigned int c) + : fCount(c) + {} + + std::atomic fCount; +}; + +struct MonitorStatus +{ + MonitorStatus() + : fActive(true) + {} + + bool fActive; +}; + +struct alignas(32) MetaHeader +{ + uint64_t fSize; + uint64_t fRegionId; + boost::interprocess::managed_shared_memory::handle_t fHandle; +}; + +} // namespace shmem +} // namespace mq +} // namespace fair + +#endif /* FAIR_MQ_SHMEM_COMMON_H_ */ diff --git a/fairmq/shmem/FairMQShmManager.h b/fairmq/shmem/FairMQShmManager.h index 84900183..4ca13ebc 100644 --- a/fairmq/shmem/FairMQShmManager.h +++ b/fairmq/shmem/FairMQShmManager.h @@ -115,18 +115,42 @@ class Manager } } + void Remove() + { + if (bipc::shared_memory_object::remove("fairmq_shmem_main")) + { + LOG(DEBUG) << "shmem: successfully removed \"fairmq_shmem_main\" segment after the device has stopped."; + } + else + { + LOG(DEBUG) << "shmem: did not remove \"fairmq_shmem_main\" segment after the device stopped. Already removed?"; + } + + if (bipc::shared_memory_object::remove("fairmq_shmem_management")) + { + LOG(DEBUG) << "shmem: successfully removed \"fairmq_shmem_management\" segment after the device has stopped."; + } + else + { + LOG(DEBUG) << "shmem: did not remove \"fairmq_shmem_management\" segment after the device stopped. Already removed?"; + } + } + + bipc::managed_shared_memory& ManagementSegment() + { + return fManagementSegment; + } + private: Manager() : fSegment(nullptr) + , fManagementSegment(bipc::open_or_create, "fairmq_shmem_management", 65536) {} + Manager(const Manager&) = delete; + Manager operator=(const Manager&) = delete; bipc::managed_shared_memory* fSegment; -}; - -struct alignas(16) MetaHeader -{ - uint64_t fSize; - bipc::managed_shared_memory::handle_t fHandle; + bipc::managed_shared_memory fManagementSegment; }; // class Chunk diff --git a/fairmq/shmem/FairMQShmMonitor.cxx b/fairmq/shmem/FairMQShmMonitor.cxx index f12412f6..9ae12045 100644 --- a/fairmq/shmem/FairMQShmMonitor.cxx +++ b/fairmq/shmem/FairMQShmMonitor.cxx @@ -7,16 +7,18 @@ ********************************************************************************/ #include "FairMQShmMonitor.h" -#include "FairMQShmDeviceCounter.h" +#include "FairMQShmCommon.h" #include #include #include #include +#include #include #include +#include #include #include @@ -32,6 +34,11 @@ using String = bipc::basic_string, CharAll using StringAllocator = bipc::allocator; using StringVector = bipc::vector; +namespace +{ + volatile std::sig_atomic_t gSignalStatus; +} + namespace fair { namespace mq @@ -39,6 +46,11 @@ namespace mq namespace shmem { +void signalHandler(int signal) +{ + gSignalStatus = signal; +} + Monitor::Monitor(const string& segmentName, bool selfDestruct, bool interactive, unsigned int timeoutInMS) : fSelfDestruct(selfDestruct) , fInteractive(interactive) @@ -48,15 +60,43 @@ Monitor::Monitor(const string& segmentName, bool selfDestruct, bool interactive, , fTerminating(false) , fHeartbeatTriggered(false) , fLastHeartbeat() - , fHeartbeatThread() + , fSignalThread() + , fManagementSegment(bipc::open_or_create, "fairmq_shmem_management", 65536) { - if (bipc::message_queue::remove("fairmq_shmem_control_queue")) + MonitorStatus* monitorStatus = fManagementSegment.find(bipc::unique_instance).first; + if (monitorStatus != nullptr) { - // cout << "successfully removed control queue" << endl; + cout << "shmmonitor already started or not properly exited. Try `shmmonitor --cleanup`" << endl; + exit(EXIT_FAILURE); } - else + fManagementSegment.construct(bipc::unique_instance)(); + + CleanupControlQueues(); +} + +void Monitor::CatchSignals() +{ + signal(SIGINT, signalHandler); + signal(SIGTERM, signalHandler); + fSignalThread = thread(&Monitor::SignalMonitor, this); +} + +void Monitor::SignalMonitor() +{ + while (true) { - // cout << "could not remove control queue" << endl; + if (gSignalStatus != 0) + { + fTerminating = true; + cout << "signal: " << gSignalStatus << endl; + break; + } + else if (fTerminating) + { + break; + } + + this_thread::sleep_for(chrono::milliseconds(100)); } } @@ -109,22 +149,15 @@ void Monitor::MonitorHeartbeats() cout << ie.what() << endl; } - if (bipc::message_queue::remove("fairmq_shmem_control_queue")) - { - // cout << "successfully removed control queue" << endl; - } - else - { - cout << "could not remove control queue" << endl; - } + CleanupControlQueues(); } void Monitor::Interactive() { - char input; - pollfd inputFd[1]; - inputFd[0].fd = fileno(stdin); - inputFd[0].events = POLLIN; + char c; + pollfd cinfd[1]; + cinfd[0].fd = fileno(stdin); + cinfd[0].events = POLLIN; struct termios t; tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure @@ -138,11 +171,16 @@ void Monitor::Interactive() while (!fTerminating) { - if (poll(inputFd, 1, 100)) + if (poll(cinfd, 1, 100)) { - input = getchar(); + if (fTerminating || gSignalStatus != 0) + { + break; + } - switch (input) + c = getchar(); + + switch (c) { case 'q': cout << "[q] --> quitting." << endl; @@ -165,7 +203,7 @@ void Monitor::Interactive() cout << "[\\n] --> invalid input." << endl; break; default: - cout << "[" << input << "] --> invalid input." << endl; + cout << "[" << c << "] --> invalid input." << endl; break; } @@ -177,6 +215,11 @@ void Monitor::Interactive() PrintHeader(); } + if (fTerminating) + { + break; + } + CheckSegment(); if (!fTerminating) @@ -228,10 +271,10 @@ void Monitor::CheckSegment() unsigned int numDevices = 0; - pair result = segment.find(bipc::unique_instance); - if (result.first != nullptr) + fair::mq::shmem::DeviceCounter* dc = segment.find(bipc::unique_instance).first; + if (dc) { - numDevices = result.first->count; + numDevices = dc->fCount; } auto now = chrono::high_resolution_clock::now(); @@ -294,13 +337,57 @@ void Monitor::CheckSegment() void Monitor::Cleanup(const string& segmentName) { - if (bipc::shared_memory_object::remove(segmentName.c_str())) + try { - cout << "Successfully removed shared memory \"" << segmentName.c_str() << "\"." << endl; + bipc::managed_shared_memory managementSegment(bipc::open_only, "fairmq_shmem_management"); + RegionCounter* rc = managementSegment.find(bipc::unique_instance).first; + if (rc) + { + cout << "Region counter found: " << rc->fCount << endl; + unsigned int regionCount = rc->fCount; + for (int i = 1; i <= regionCount; ++i) + { + RemoveObject("fairmq_shmem_region_" + to_string(regionCount)); + } + } + else + { + cout << "shmem: no region counter found. no regions to cleanup." << endl; + } + + RemoveObject("fairmq_shmem_management"); + } + catch (bipc::interprocess_exception& ie) + { + cout << "Did not find \"fairmq_shmem_management\" shared memory segment. No regions to cleanup." << endl; + } + + RemoveObject(segmentName); + + boost::interprocess::named_mutex::remove("fairmq_shmem_mutex"); +} + +void Monitor::RemoveObject(const std::string& name) +{ + if (bipc::shared_memory_object::remove(name.c_str())) + { + cout << "Successfully removed \"" << name << "\" shared memory segment." << endl; } else { - cout << "Did not remove shared memory. Already removed?" << endl; + cout << "Did not remove \"" << name << "\" shared memory segment. Already removed?" << endl; + } +} + +void Monitor::CleanupControlQueues() +{ + if (bipc::message_queue::remove("fairmq_shmem_control_queue")) + { + // cout << "successfully removed control queue" << endl; + } + else + { + // cout << "could not remove control queue" << endl; } } @@ -311,19 +398,19 @@ void Monitor::PrintQueues() try { bipc::managed_shared_memory segment(bipc::open_only, fSegmentName.c_str()); - pair queues = segment.find("fairmq_shmem_queues"); - if (queues.first != nullptr) + StringVector* queues = segment.find("fairmq_shmem_queues").first; + if (queues) { - cout << "found " << queues.first->size() << " queue(s):" << endl; + cout << "found " << queues->size() << " queue(s):" << endl; - for (int i = 0; i < queues.first->size(); ++i) + for (int i = 0; i < queues->size(); ++i) { - string name(queues.first->at(i).c_str()); + string name(queues->at(i).c_str()); cout << '\t' << name << " : "; - pair*, size_t> queueSize = segment.find>(name.c_str()); - if (queueSize.first != nullptr) + atomic* queueSize = segment.find>(name.c_str()).first; + if (queueSize) { - cout << *(queueSize.first) << " messages" << endl; + cout << *queueSize << " messages" << endl; } else { @@ -368,6 +455,15 @@ void Monitor::PrintHelp() cout << "controls: [x] close memory, [p] print queues, [h] help, [q] quit." << endl; } +Monitor::~Monitor() +{ + fManagementSegment.destroy(bipc::unique_instance); + if (fSignalThread.joinable()) + { + fSignalThread.join(); + } +} + } // namespace shmem } // namespace mq } // namespace fair diff --git a/fairmq/shmem/FairMQShmMonitor.h b/fairmq/shmem/FairMQShmMonitor.h index de5c273e..dd20f385 100644 --- a/fairmq/shmem/FairMQShmMonitor.h +++ b/fairmq/shmem/FairMQShmMonitor.h @@ -5,8 +5,10 @@ * GNU Lesser General Public Licence version 3 (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#ifndef FAIRMQSHMMONITOR_H_ -#define FAIRMQSHMMONITOR_H_ +#ifndef FAIR_MQ_SHMEM_MONITOR_H_ +#define FAIR_MQ_SHMEM_MONITOR_H_ + +#include #include #include @@ -28,11 +30,13 @@ class Monitor Monitor(const Monitor&) = delete; Monitor operator=(const Monitor&) = delete; + void CatchSignals(); void Run(); - virtual ~Monitor() {} + virtual ~Monitor(); static void Cleanup(const std::string& segmentName); + static void CleanupControlQueues(); private: void PrintHeader(); @@ -41,6 +45,8 @@ class Monitor void MonitorHeartbeats(); void CheckSegment(); void Interactive(); + void SignalMonitor(); + static void RemoveObject(const std::string&); bool fSelfDestruct; // will self-destruct after the memory has been closed bool fInteractive; // running in interactive mode @@ -50,11 +56,12 @@ class Monitor std::atomic fTerminating; std::atomic fHeartbeatTriggered; std::chrono::high_resolution_clock::time_point fLastHeartbeat; - std::thread fHeartbeatThread; + std::thread fSignalThread; + boost::interprocess::managed_shared_memory fManagementSegment; }; } // namespace shmem } // namespace mq } // namespace fair -#endif /* FAIRMQSHMMONITOR_H_ */ \ No newline at end of file +#endif /* FAIR_MQ_SHMEM_MONITOR_H_ */ \ No newline at end of file diff --git a/fairmq/shmem/FairMQSocketSHM.cxx b/fairmq/shmem/FairMQSocketSHM.cxx index a1bcc605..da2c3aef 100644 --- a/fairmq/shmem/FairMQSocketSHM.cxx +++ b/fairmq/shmem/FairMQSocketSHM.cxx @@ -11,7 +11,9 @@ #include "FairMQSocketSHM.h" #include "FairMQMessageSHM.h" +#include "FairMQRegionSHM.h" #include "FairMQLogger.h" +#include "FairMQShmCommon.h" using namespace std; using namespace fair::mq::shmem; @@ -173,24 +175,22 @@ int FairMQSocketSHM::Receive(FairMQMessagePtr& msg, const int flags) // ShPtrOwner* owner = Manager::Instance().Segment()->find(ownerID.c_str()).first; MetaHeader* hdr = static_cast(zmq_msg_data(msgPtr)); size_t size = 0; - if (hdr->fHandle) - { - static_cast(msg.get())->fHandle = hdr->fHandle; - static_cast(msg.get())->fChunkSize = hdr->fSize; - // static_cast(msg.get())->fOwner = owner; - // static_cast(msg.get())->fReceiving = true; - size = msg->GetSize(); + static_cast(msg.get())->fHandle = hdr->fHandle; + static_cast(msg.get())->fSize = hdr->fSize; + static_cast(msg.get())->fRegionId = hdr->fRegionId; + // static_cast(msg.get())->fOwner = owner; + // static_cast(msg.get())->fReceiving = true; + size = msg->GetSize(); - fBytesRx += size; - ++fMessagesRx; + fBytesRx += size; + ++fMessagesRx; - return size; - } - else - { - LOG(ERROR) << "Received meta data, but could not find corresponding chunk"; - return -1; - } + return size; + // else + // { + // LOG(ERROR) << "Received meta data, but could not find corresponding chunk"; + // return -1; + // } } else if (zmq_errno() == EAGAIN) { @@ -238,7 +238,6 @@ int64_t FairMQSocketSHM::Send(vector& msgVec, const int flags) { static_cast(msgVec[i].get())->fQueued = true; // static_cast(msgVec[i].get())->fReceiving = false; - // static_cast(msgVec[i].get())->fQueued = true; size_t size = msgVec[i]->GetSize(); totalSize += size; @@ -327,23 +326,21 @@ int64_t FairMQSocketSHM::Receive(vector& msgVec, const int fla // ShPtrOwner* owner = Manager::Instance().Segment()->find(ownerID.c_str()).first; MetaHeader* hdr = static_cast(zmq_msg_data(msgPtr)); size_t size = 0; - if (hdr->fHandle) - { - static_cast(part.get())->fHandle = hdr->fHandle; - static_cast(part.get())->fChunkSize = hdr->fSize; - // static_cast(msg.get())->fOwner = owner; - // static_cast(msg.get())->fReceiving = true; - size = part->GetSize(); + static_cast(part.get())->fHandle = hdr->fHandle; + static_cast(part.get())->fSize = hdr->fSize; + static_cast(part.get())->fRegionId = hdr->fRegionId; + // static_cast(part.get())->fOwner = owner; + // static_cast(part.get())->fReceiving = true; + size = part->GetSize(); - msgVec.push_back(move(part)); + msgVec.push_back(move(part)); - totalSize += size; - } - else - { - LOG(ERROR) << "Received meta data, but could not find corresponding chunk"; - return -1; - } + totalSize += size; + // else + // { + // LOG(ERROR) << "Received meta data, but could not find corresponding chunk"; + // return -1; + // } } else if (zmq_errno() == EAGAIN) { @@ -399,12 +396,14 @@ void FairMQSocketSHM::Close() void FairMQSocketSHM::Interrupt() { FairMQMessageSHM::fInterrupted = true; + FairMQRegionSHM::fInterrupted = true; fInterrupted = true; } void FairMQSocketSHM::Resume() { FairMQMessageSHM::fInterrupted = false; + FairMQRegionSHM::fInterrupted = true; fInterrupted = false; } diff --git a/fairmq/shmem/FairMQTransportFactorySHM.cxx b/fairmq/shmem/FairMQTransportFactorySHM.cxx index b8461cff..53072de1 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.cxx +++ b/fairmq/shmem/FairMQTransportFactorySHM.cxx @@ -13,6 +13,7 @@ #include #include +#include #include #include @@ -21,10 +22,13 @@ #include #include +#include +#include // std::system using namespace std; using namespace fair::mq::shmem; namespace bipc = boost::interprocess; +namespace bfs = boost::filesystem; namespace bpt = boost::posix_time; FairMQ::Transport FairMQTransportFactorySHM::fTransportType = FairMQ::Transport::SHM; @@ -75,30 +79,84 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai LOG(ERROR) << "shmem: failed configuring context, reason: " << zmq_strerror(errno); } - fSendHeartbeats = true; - fHeartbeatThread = thread(&FairMQTransportFactorySHM::SendHeartbeats, this); - Manager::Instance().InitializeSegment("open_or_create", segmentName, segmentSize); LOG(DEBUG) << "shmem: created/opened shared memory segment of " << segmentSize << " bytes. Available are " << Manager::Instance().Segment()->get_free_memory() << " bytes."; - { // mutex scope + { bipc::scoped_lock lock(fShMutex); - pair result = Manager::Instance().Segment()->find(bipc::unique_instance); - if (result.first != nullptr) + fDeviceCounter = Manager::Instance().Segment()->find(bipc::unique_instance).first; + if (fDeviceCounter) { - fDeviceCounter = result.first; - LOG(DEBUG) << "shmem: device counter found, with value of " << fDeviceCounter->count << ". incrementing."; - (fDeviceCounter->count)++; - LOG(DEBUG) << "shmem: incremented device counter, now: " << fDeviceCounter->count; + LOG(DEBUG) << "shmem: device counter found, with value of " << fDeviceCounter->fCount << ". incrementing."; + (fDeviceCounter->fCount)++; + LOG(DEBUG) << "shmem: incremented device counter, now: " << fDeviceCounter->fCount; } else { LOG(DEBUG) << "shmem: no device counter found, creating one and initializing with 1"; fDeviceCounter = Manager::Instance().Segment()->construct(bipc::unique_instance)(1); - LOG(DEBUG) << "shmem: initialized device counter with: " << fDeviceCounter->count; + LOG(DEBUG) << "shmem: initialized device counter with: " << fDeviceCounter->fCount; + } + + // start shm monitor + // try + // { + // MonitorStatus* monitorStatus = fManagementSegment.find(bipc::unique_instance).first; + // if (monitorStatus == nullptr) + // { + // LOG(DEBUG) << "shmem: no shmmonitor found, starting..."; + // StartMonitor(); + // } + // else + // { + // LOG(DEBUG) << "shmem: found shmmonitor in fairmq_shmem_management."; + // } + // } + // catch (std::exception& e) + // { + // LOG(ERROR) << "shmem: Exception during shmmonitor initialization: " << e.what() << ", application will now exit"; + // exit(EXIT_FAILURE); + // } + } + + fSendHeartbeats = true; + fHeartbeatThread = thread(&FairMQTransportFactorySHM::SendHeartbeats, this); +} + +void FairMQTransportFactorySHM::StartMonitor() +{ + int numTries = 0; + + if (!bfs::exists(bfs::path("shmmonitor"))) + { + LOG(ERROR) << "Could not find shmmonitor. Is it in the PATH? Monitor not started"; + return; + } + + // TODO: replace with Boost.Process once boost 1.64 is available + int r = system("shmmonitor --self-destruct &"); + LOG(DEBUG) << r; + + do + { + MonitorStatus* monitorStatus = Manager::Instance().ManagementSegment().find(bipc::unique_instance).first; + if (monitorStatus) + { + LOG(DEBUG) << "shmem: shmmonitor started"; + break; + } + else + { + this_thread::sleep_for(std::chrono::milliseconds(10)); + if (++numTries > 100) + { + LOG(ERROR) << "Did not get response from shmmonitor after " << 10 * 100 << " milliseconds. Exiting."; + exit(EXIT_FAILURE); + } } } + while (true); } void FairMQTransportFactorySHM::SendHeartbeats() @@ -142,6 +200,11 @@ FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(void* data, const size return unique_ptr(new FairMQMessageSHM(data, size, ffn, hint)); } +FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(FairMQRegionPtr& region, void* data, const size_t size) const +{ + return unique_ptr(new FairMQMessageSHM(region, data, size)); +} + FairMQSocketPtr FairMQTransportFactorySHM::CreateSocket(const string& type, const string& name) const { assert(fContext); @@ -168,6 +231,11 @@ FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const FairMQSocket& cmdS return unique_ptr(new FairMQPollerSHM(cmdSocket, dataSocket)); } +FairMQRegionPtr FairMQTransportFactorySHM::CreateRegion(const size_t size) const +{ + return unique_ptr(new FairMQRegionSHM(size)); +} + FairMQTransportFactorySHM::~FairMQTransportFactorySHM() { fSendHeartbeats = false; @@ -196,24 +264,17 @@ FairMQTransportFactorySHM::~FairMQTransportFactorySHM() { // mutex scope bipc::scoped_lock lock(fShMutex); - (fDeviceCounter->count)--; + (fDeviceCounter->fCount)--; - if (fDeviceCounter->count == 0) + if (fDeviceCounter->fCount == 0) { LOG(DEBUG) << "shmem: last 'fairmq_shmem_main' user, removing segment."; - if (bipc::shared_memory_object::remove("fairmq_shmem_main")) - { - LOG(DEBUG) << "shmem: successfully removed shared memory segment after the device has stopped."; - } - else - { - LOG(DEBUG) << "shmem: did not remove shared memory segment after the device stopped. Already removed?"; - } + Manager::Instance().Remove(); } else { - LOG(DEBUG) << "shmem: other 'fairmq_shmem_main' users present (" << fDeviceCounter->count << "), not removing it."; + LOG(DEBUG) << "shmem: other 'fairmq_shmem_main' users present (" << fDeviceCounter->fCount << "), not removing it."; } } } diff --git a/fairmq/shmem/FairMQTransportFactorySHM.h b/fairmq/shmem/FairMQTransportFactorySHM.h index 1a69490d..eb8e7c45 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.h +++ b/fairmq/shmem/FairMQTransportFactorySHM.h @@ -13,8 +13,9 @@ #include "FairMQMessageSHM.h" #include "FairMQSocketSHM.h" #include "FairMQPollerSHM.h" -#include "FairMQShmDeviceCounter.h" +#include "FairMQShmCommon.h" #include +#include "FairMQRegionSHM.h" #include #include @@ -23,15 +24,17 @@ #include - class FairMQTransportFactorySHM : public FairMQTransportFactory { public: FairMQTransportFactorySHM(const std::string& id = "", const FairMQProgOptions* config = nullptr); + FairMQTransportFactorySHM(const FairMQTransportFactorySHM&) = delete; + FairMQTransportFactorySHM operator=(const FairMQTransportFactorySHM&) = delete; FairMQMessagePtr CreateMessage() const override; FairMQMessagePtr CreateMessage(const size_t size) const override; FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const override; + FairMQMessagePtr CreateMessage(FairMQRegionPtr& region, void* data, const size_t size) const override; FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override; @@ -40,13 +43,16 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const override; - FairMQ::Transport GetType() const override; + FairMQRegionPtr CreateRegion(const size_t size) const override; - void SendHeartbeats(); + FairMQ::Transport GetType() const override; ~FairMQTransportFactorySHM() override; private: + void SendHeartbeats(); + void StartMonitor(); + static FairMQ::Transport fTransportType; void* fContext; void* fHeartbeatSocket; diff --git a/fairmq/shmem/README.md b/fairmq/shmem/README.md index b3637f10..747e316f 100644 --- a/fairmq/shmem/README.md +++ b/fairmq/shmem/README.md @@ -6,5 +6,3 @@ The transport manages shared memory via boost::interprocess library. The transfe Under development: - Cleanup of the shared memory segment in case all devices crash. Currently at least one device has to stop properly for a cleanup. -- Implement more than one transport per device. -- Configuration of the shared memory size (currently hard-coded). diff --git a/fairmq/shmem/runFairMQShmMonitor.cxx b/fairmq/shmem/runFairMQShmMonitor.cxx index 9d341677..825bb746 100644 --- a/fairmq/shmem/runFairMQShmMonitor.cxx +++ b/fairmq/shmem/runFairMQShmMonitor.cxx @@ -49,6 +49,7 @@ int main(int argc, char** argv) { cout << "Cleaning up \"" << segmentName << "\"..." << endl; fair::mq::shmem::Monitor::Cleanup(segmentName); + fair::mq::shmem::Monitor::CleanupControlQueues(); return 0; } @@ -56,6 +57,7 @@ int main(int argc, char** argv) fair::mq::shmem::Monitor monitor{segmentName, selfDestruct, interactive, timeoutInMS}; + monitor.CatchSignals(); monitor.Run(); } catch (exception& e) diff --git a/fairmq/zeromq/FairMQMessageZMQ.cxx b/fairmq/zeromq/FairMQMessageZMQ.cxx index 9a2e1e62..3ac453fb 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.cxx +++ b/fairmq/zeromq/FairMQMessageZMQ.cxx @@ -50,6 +50,25 @@ FairMQMessageZMQ::FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn } } +FairMQMessageZMQ::FairMQMessageZMQ(FairMQRegionPtr& region, void* data, const size_t size) + : fMessage() +{ + // FIXME: make this zero-copy: + // simply taking over the provided buffer can casue premature delete, since region could be destroyed before the message is sent out. + // Needs lifetime extension for the ZMQ region. + if (zmq_msg_init_size(&fMessage, size) != 0) + { + LOG(ERROR) << "failed initializing message with size, reason: " << zmq_strerror(errno); + } + + memcpy(zmq_msg_data(&fMessage), data, size); + + // if (zmq_msg_init_data(&fMessage, data, size, [](void*, void*){}, nullptr) != 0) + // { + // LOG(ERROR) << "failed initializing message with data, reason: " << zmq_strerror(errno); + // } +} + void FairMQMessageZMQ::Rebuild() { CloseMessage(); diff --git a/fairmq/zeromq/FairMQMessageZMQ.h b/fairmq/zeromq/FairMQMessageZMQ.h index f725891e..0a6cccbb 100644 --- a/fairmq/zeromq/FairMQMessageZMQ.h +++ b/fairmq/zeromq/FairMQMessageZMQ.h @@ -21,6 +21,7 @@ #include #include "FairMQMessage.h" +#include "FairMQRegion.h" class FairMQMessageZMQ : public FairMQMessage { @@ -28,6 +29,7 @@ class FairMQMessageZMQ : public FairMQMessage FairMQMessageZMQ(); FairMQMessageZMQ(const size_t size); FairMQMessageZMQ(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); + FairMQMessageZMQ(FairMQRegionPtr& region, void* data, const size_t size); virtual void Rebuild(); virtual void Rebuild(const size_t size); diff --git a/fairmq/zeromq/FairMQRegionZMQ.cxx b/fairmq/zeromq/FairMQRegionZMQ.cxx new file mode 100644 index 00000000..36a87744 --- /dev/null +++ b/fairmq/zeromq/FairMQRegionZMQ.cxx @@ -0,0 +1,34 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "FairMQRegionZMQ.h" +#include "FairMQLogger.h" + +using namespace std; + +FairMQRegionZMQ::FairMQRegionZMQ(const size_t size) + : fBuffer(malloc(size)) + , fSize(size) +{ +} + +void* FairMQRegionZMQ::GetData() const +{ + return fBuffer; +} + +size_t FairMQRegionZMQ::GetSize() const +{ + return fSize; +} + +FairMQRegionZMQ::~FairMQRegionZMQ() +{ + LOG(DEBUG) << "destroying region"; + free(fBuffer); +} diff --git a/fairmq/zeromq/FairMQRegionZMQ.h b/fairmq/zeromq/FairMQRegionZMQ.h new file mode 100644 index 00000000..0a9c0469 --- /dev/null +++ b/fairmq/zeromq/FairMQRegionZMQ.h @@ -0,0 +1,35 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIRMQREGIONZMQ_H_ +#define FAIRMQREGIONZMQ_H_ + +#include "FairMQRegion.h" + +#include // size_t + +class FairMQRegionZMQ : public FairMQRegion +{ + friend class FairMQSocketSHM; + + public: + FairMQRegionZMQ(const size_t size); + FairMQRegionZMQ(const FairMQRegionZMQ&) = delete; + FairMQRegionZMQ operator=(const FairMQRegionZMQ&) = delete; + + virtual void* GetData() const override; + virtual size_t GetSize() const override; + + virtual ~FairMQRegionZMQ(); + + private: + void* fBuffer; + size_t fSize; +}; + +#endif /* FAIRMQREGIONZMQ_H_ */ \ No newline at end of file diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx index 224ccec4..330a7e0f 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx @@ -14,8 +14,8 @@ using namespace std; FairMQ::Transport FairMQTransportFactoryZMQ::fTransportType = FairMQ::Transport::ZMQ; FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ(const string& id, const FairMQProgOptions* config) -: FairMQTransportFactory(id) -, fContext(zmq_ctx_new()) + : FairMQTransportFactory(id) + , fContext(zmq_ctx_new()) { int major, minor, patch; zmq_version(&major, &minor, &patch); @@ -64,6 +64,11 @@ FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(void* data, const size return unique_ptr(new FairMQMessageZMQ(data, size, ffn, hint)); } +FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(FairMQRegionPtr& region, void* data, const size_t size) const +{ + return unique_ptr(new FairMQMessageZMQ(region, data, size)); +} + FairMQSocketPtr FairMQTransportFactoryZMQ::CreateSocket(const string& type, const string& name) const { assert(fContext); @@ -90,6 +95,11 @@ FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const FairMQSocket& cmdS return unique_ptr(new FairMQPollerZMQ(cmdSocket, dataSocket)); } +FairMQRegionPtr FairMQTransportFactoryZMQ::CreateRegion(const size_t size) const +{ + return unique_ptr(new FairMQRegionZMQ(size)); +} + FairMQ::Transport FairMQTransportFactoryZMQ::GetType() const { return fTransportType; @@ -114,6 +124,6 @@ FairMQTransportFactoryZMQ::~FairMQTransportFactoryZMQ() } else { - LOG(ERROR) << "shmem: Terminate(): context now available for shutdown"; + LOG(ERROR) << "Terminate(): context not available for shutdown"; } } diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index 248e173a..ab63a07b 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -22,17 +22,22 @@ #include "FairMQMessageZMQ.h" #include "FairMQSocketZMQ.h" #include "FairMQPollerZMQ.h" +#include "FairMQRegionZMQ.h" #include class FairMQTransportFactoryZMQ : public FairMQTransportFactory { public: FairMQTransportFactoryZMQ(const std::string& id = "", const FairMQProgOptions* config = nullptr); + FairMQTransportFactoryZMQ(const FairMQTransportFactoryZMQ&) = delete; + FairMQTransportFactoryZMQ operator=(const FairMQTransportFactoryZMQ&) = delete; + ~FairMQTransportFactoryZMQ() override; FairMQMessagePtr CreateMessage() const override; FairMQMessagePtr CreateMessage(const size_t size) const override; FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const override; + FairMQMessagePtr CreateMessage(FairMQRegionPtr& region, void* data, const size_t size) const override; FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override; @@ -41,6 +46,8 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const override; FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const override; + FairMQRegionPtr CreateRegion(const size_t size) const override; + FairMQ::Transport GetType() const override; private: static FairMQ::Transport fTransportType;