diff --git a/cmake/FairMQDependencies.cmake b/cmake/FairMQDependencies.cmake index 4ac7e5eb..ab536844 100644 --- a/cmake/FairMQDependencies.cmake +++ b/cmake/FairMQDependencies.cmake @@ -19,7 +19,7 @@ if(BUILD_FAIRMQ OR BUILD_SDK) endif() if(BUILD_OFI_TRANSPORT) - find_package2(PRIVATE asiofi REQUIRED VERSION 0.3.1) + find_package2(PRIVATE asiofi REQUIRED VERSION 0.5) find_package2(PRIVATE OFI REQUIRED) endif() diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index f0d5633c..f2d584d2 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -143,6 +143,7 @@ if(BUILD_FAIRMQ) # libFairMQ header files # ########################## set(FAIRMQ_PUBLIC_HEADER_FILES + Channel.h Device.h DeviceRunner.h EventManager.h @@ -152,22 +153,26 @@ if(BUILD_FAIRMQ) FairMQMessage.h FairMQParts.h FairMQPoller.h - FairMQUnmanagedRegion.h FairMQSocket.h FairMQTransportFactory.h - MemoryResources.h - MemoryResourceTools.h - Transports.h - options/FairMQProgOptions.h + FairMQUnmanagedRegion.h JSONParser.h - ProgOptionsFwd.h - ProgOptions.h - Properties.h - PropertyOutput.h - SuboptParser.h + MemoryResourceTools.h + MemoryResources.h + Message.h Plugin.h PluginManager.h PluginServices.h + Poller.h + ProgOptions.h + ProgOptionsFwd.h + Properties.h + PropertyOutput.h + SuboptParser.h + TransportFactory.h + Transports.h + UnmanagedRegion.h + options/FairMQProgOptions.h runDevice.h runFairMQDevice.h shmem/Monitor.h @@ -202,8 +207,8 @@ if(BUILD_FAIRMQ) if(BUILD_OFI_TRANSPORT) set(FAIRMQ_PRIVATE_HEADER_FILES ${FAIRMQ_PRIVATE_HEADER_FILES} ofi/Context.h + ofi/ControlMessages.h ofi/Message.h - ofi/Poller.h ofi/Socket.h ofi/TransportFactory.h ) @@ -231,6 +236,7 @@ if(BUILD_FAIRMQ) plugins/config/Config.cxx plugins/control/Control.cxx MemoryResources.cxx + shmem/Manager.cxx shmem/Monitor.cxx ) @@ -238,9 +244,7 @@ if(BUILD_FAIRMQ) set(FAIRMQ_SOURCE_FILES ${FAIRMQ_SOURCE_FILES} ofi/Context.cxx ofi/Message.cxx - ofi/Poller.cxx ofi/Socket.cxx - ofi/TransportFactory.cxx ) endif() @@ -295,7 +299,6 @@ if(BUILD_FAIRMQ) if(BUILD_OFI_TRANSPORT) set(OFI_DEPS asiofi::asiofi - Boost::container ) endif() diff --git a/fairmq/Channel.h b/fairmq/Channel.h new file mode 100644 index 00000000..e10e6cc8 --- /dev/null +++ b/fairmq/Channel.h @@ -0,0 +1,20 @@ +/******************************************************************************** + * Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_CHANNEL_H +#define FAIR_MQ_CHANNEL_H + +#include + +namespace fair::mq { + +using Channel = FairMQChannel; + +} // namespace fair::mq + +#endif // FAIR_MQ_CHANNEL_H diff --git a/fairmq/FairMQUnmanagedRegion.h b/fairmq/FairMQUnmanagedRegion.h index 5940da17..2f240926 100644 --- a/fairmq/FairMQUnmanagedRegion.h +++ b/fairmq/FairMQUnmanagedRegion.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -9,11 +9,12 @@ #ifndef FAIRMQUNMANAGEDREGION_H_ #define FAIRMQUNMANAGEDREGION_H_ -#include // size_t -#include // uint32_t -#include // std::unique_ptr -#include // std::function -#include // std::ostream +#include // size_t +#include // uint32_t +#include +#include // std::function +#include // std::unique_ptr +#include // std::ostream #include class FairMQTransportFactory; diff --git a/fairmq/Message.h b/fairmq/Message.h new file mode 100644 index 00000000..f638134a --- /dev/null +++ b/fairmq/Message.h @@ -0,0 +1,14 @@ +/******************************************************************************** + * Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_MESSAGE_H +#define FAIR_MQ_MESSAGE_H + +#include + +#endif // FAIR_MQ_MESSAGE_H diff --git a/fairmq/Poller.h b/fairmq/Poller.h new file mode 100644 index 00000000..c5b57d59 --- /dev/null +++ b/fairmq/Poller.h @@ -0,0 +1,14 @@ +/******************************************************************************** + * Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_POLLER_H +#define FAIR_MQ_POLLER_H + +#include + +#endif // FAIR_MQ_POLLER_H diff --git a/fairmq/Socket.h b/fairmq/Socket.h new file mode 100644 index 00000000..f9376c84 --- /dev/null +++ b/fairmq/Socket.h @@ -0,0 +1,14 @@ +/******************************************************************************** + * Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_SOCKET_H +#define FAIR_MQ_SOCKET_H + +#include + +#endif // FAIR_MQ_SOCKET_H diff --git a/fairmq/TransportFactory.h b/fairmq/TransportFactory.h new file mode 100644 index 00000000..ca5ef255 --- /dev/null +++ b/fairmq/TransportFactory.h @@ -0,0 +1,14 @@ +/******************************************************************************** + * Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_TRANSPORTFACTORY_H +#define FAIR_MQ_TRANSPORTFACTORY_H + +#include + +#endif // FAIR_MQ_TRANSPORTFACTORY_H diff --git a/fairmq/UnmanagedRegion.h b/fairmq/UnmanagedRegion.h new file mode 100644 index 00000000..ea77cf9b --- /dev/null +++ b/fairmq/UnmanagedRegion.h @@ -0,0 +1,14 @@ +/******************************************************************************** + * Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_UNMANAGEDREGION_H +#define FAIR_MQ_UNMANAGEDREGION_H + +#include + +#endif // FAIR_MQ_UNMANAGEDREGION_H diff --git a/fairmq/ofi/Context.cxx b/fairmq/ofi/Context.cxx index aafec40d..44ef36b2 100644 --- a/fairmq/ofi/Context.cxx +++ b/fairmq/ofi/Context.cxx @@ -1,25 +1,22 @@ /******************************************************************************** - * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include -#include -#include - -#include #include -#include +#include #include #include +#include +#include +#include #include #include #include #include -#include #include namespace fair::mq::ofi diff --git a/fairmq/ofi/Context.h b/fairmq/ofi/Context.h index b6a821d9..1f0ad79c 100644 --- a/fairmq/ofi/Context.h +++ b/fairmq/ofi/Context.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -9,13 +9,12 @@ #ifndef FAIR_MQ_OFI_CONTEXT_H #define FAIR_MQ_OFI_CONTEXT_H -#include -#include - +#include #include #include #include -#include +#include +#include #include #include #include @@ -58,7 +57,7 @@ class Context ~Context(); auto GetAsiofiVersion() const -> std::string; - auto GetIoContext() -> boost::asio::io_context& { return fIoContext; } + auto GetIoContext() -> asio::io_context& { return fIoContext; } static auto ConvertAddress(std::string address) -> Address; static auto ConvertAddress(Address address) -> sockaddr_in; static auto ConvertAddress(sockaddr_in address) -> Address; @@ -72,8 +71,8 @@ class Context auto SetSizeHint(size_t size) -> void { fSizeHint = size; } private: - boost::asio::io_context fIoContext; - boost::asio::io_context::work fIoWork; + asio::io_context fIoContext; + asio::io_context::work fIoWork; std::vector fThreadPool; FairMQTransportFactory& fReceiveFactory; FairMQTransportFactory& fSendFactory; diff --git a/fairmq/ofi/ControlMessages.h b/fairmq/ofi/ControlMessages.h index f8082fe3..c68dc366 100644 --- a/fairmq/ofi/ControlMessages.h +++ b/fairmq/ofi/ControlMessages.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -9,24 +9,24 @@ #ifndef FAIR_MQ_OFI_CONTROLMESSAGES_H #define FAIR_MQ_OFI_CONTROLMESSAGES_H -#include -#include -#include +#include #include +#include #include #include +#include #include -namespace boost::asio +namespace asio { template -auto buffer(const PodType& obj) -> boost::asio::const_buffer +auto buffer(const PodType& obj) -> asio::const_buffer { - return boost::asio::const_buffer(static_cast(&obj), sizeof(PodType)); + return asio::const_buffer(static_cast(&obj), sizeof(PodType)); } -} // namespace boost::asio +} // namespace asio namespace fair::mq::ofi { @@ -68,7 +68,7 @@ template using unique_ptr = std::unique_ptr>; template -auto MakeControlMessageWithPmr(boost::container::pmr::memory_resource& pmr, Args&&... args) +auto MakeControlMessageWithPmr(std::pmr::memory_resource& pmr, Args&&... args) -> ofi::unique_ptr { void* mem = pmr.allocate(sizeof(ControlMessage)); @@ -109,4 +109,4 @@ auto MakeControlMessage(Args&&... args) -> ControlMessage } // namespace fair::mq::ofi -#endif /* FAIR_MQ_OFI_CONTROLMESSAGES_H */ +#endif /* FAIR_MQ_OFI_CONTROLMESSAGES_H */ diff --git a/fairmq/ofi/Message.cxx b/fairmq/ofi/Message.cxx index 9ff9df5b..ba50bfd7 100644 --- a/fairmq/ofi/Message.cxx +++ b/fairmq/ofi/Message.cxx @@ -1,17 +1,16 @@ /******************************************************************************** - * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include -#include - #include #include #include +#include +#include #include namespace fair::mq::ofi @@ -19,7 +18,7 @@ namespace fair::mq::ofi using namespace std; -Message::Message(boost::container::pmr::memory_resource* pmr) +Message::Message(pmr::memory_resource* pmr) : fInitialSize(0) , fSize(0) , fData(nullptr) @@ -29,7 +28,7 @@ Message::Message(boost::container::pmr::memory_resource* pmr) { } -Message::Message(boost::container::pmr::memory_resource* pmr, Alignment /* alignment */) +Message::Message(pmr::memory_resource* pmr, Alignment /* alignment */) : fInitialSize(0) , fSize(0) , fData(nullptr) @@ -39,7 +38,7 @@ Message::Message(boost::container::pmr::memory_resource* pmr, Alignment /* align { } -Message::Message(boost::container::pmr::memory_resource* pmr, const size_t size) +Message::Message(pmr::memory_resource* pmr, const size_t size) : fInitialSize(size) , fSize(size) , fData(nullptr) @@ -53,7 +52,7 @@ Message::Message(boost::container::pmr::memory_resource* pmr, const size_t size) } } -Message::Message(boost::container::pmr::memory_resource* pmr, const size_t size, Alignment /* alignment */) +Message::Message(pmr::memory_resource* pmr, const size_t size, Alignment /* alignment */) : fInitialSize(size) , fSize(size) , fData(nullptr) @@ -67,7 +66,7 @@ Message::Message(boost::container::pmr::memory_resource* pmr, const size_t size, } } -Message::Message(boost::container::pmr::memory_resource* pmr, +Message::Message(pmr::memory_resource* pmr, void* data, const size_t size, fairmq_free_fn* ffn, @@ -80,8 +79,8 @@ Message::Message(boost::container::pmr::memory_resource* pmr, , fPmr(pmr) {} -Message::Message(boost::container::pmr::memory_resource* /*pmr*/, - FairMQUnmanagedRegionPtr& /*region*/, +Message::Message(pmr::memory_resource* /*pmr*/, + fair::mq::UnmanagedRegionPtr& /*region*/, void* /*data*/, const size_t /*size*/, void* /*hint*/) diff --git a/fairmq/ofi/Message.h b/fairmq/ofi/Message.h index db18f6f0..80fe37d9 100644 --- a/fairmq/ofi/Message.h +++ b/fairmq/ofi/Message.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -9,12 +9,13 @@ #ifndef FAIR_MQ_OFI_MESSAGE_H #define FAIR_MQ_OFI_MESSAGE_H -#include -#include - #include #include #include // size_t +#include +#include +#include +#include #include namespace fair::mq::ofi @@ -22,24 +23,24 @@ namespace fair::mq::ofi /** * @class Message Message.h - * @brief + * @brief * * @todo TODO insert long description */ class Message final : public fair::mq::Message { public: - Message(boost::container::pmr::memory_resource* pmr); - Message(boost::container::pmr::memory_resource* pmr, Alignment alignment); - Message(boost::container::pmr::memory_resource* pmr, const size_t size); - Message(boost::container::pmr::memory_resource* pmr, const size_t size, Alignment alignment); - Message(boost::container::pmr::memory_resource* pmr, + Message(std::pmr::memory_resource* pmr); + Message(std::pmr::memory_resource* pmr, Alignment alignment); + Message(std::pmr::memory_resource* pmr, const size_t size); + Message(std::pmr::memory_resource* pmr, const size_t size, Alignment alignment); + Message(std::pmr::memory_resource* pmr, void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr); - Message(boost::container::pmr::memory_resource* pmr, - FairMQUnmanagedRegionPtr& region, + Message(std::pmr::memory_resource* pmr, + fair::mq::UnmanagedRegionPtr& region, void* data, const size_t size, void* hint = 0); @@ -70,8 +71,8 @@ class Message final : public fair::mq::Message void* fData; fairmq_free_fn* fFreeFunction; void* fHint; - boost::container::pmr::memory_resource* fPmr; -}; /* class Message */ + std::pmr::memory_resource* fPmr; +}; /* class Message */ } // namespace fair::mq::ofi diff --git a/fairmq/ofi/Poller.cxx b/fairmq/ofi/Poller.cxx deleted file mode 100644 index 0dad03a1..00000000 --- a/fairmq/ofi/Poller.cxx +++ /dev/null @@ -1,152 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#include -#include -#include -#include - -#include - -namespace fair::mq::ofi -{ - -using namespace std; - -Poller::Poller(const vector& channels) -{ - fNumItems = channels.size(); - fItems = new zmq_pollitem_t[fNumItems]; - - for (int i = 0; i < fNumItems; ++i) { - fItems[i].socket = static_cast(&(channels.at(i).GetSocket()))->GetSocket(); - fItems[i].fd = 0; - fItems[i].revents = 0; - - int type = 0; - size_t size = sizeof(type); - zmq_getsockopt(static_cast(&(channels.at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); - - SetItemEvents(fItems[i], type); - } -} - -Poller::Poller(const vector& channels) -{ - fNumItems = channels.size(); - fItems = new zmq_pollitem_t[fNumItems]; - - for (int i = 0; i < fNumItems; ++i) { - fItems[i].socket = static_cast(&(channels.at(i)->GetSocket()))->GetSocket(); - fItems[i].fd = 0; - fItems[i].revents = 0; - - int type = 0; - size_t size = sizeof(type); - zmq_getsockopt(static_cast(&(channels.at(i)->GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); - - SetItemEvents(fItems[i], type); - } -} - -Poller::Poller(const unordered_map>& channelsMap, const vector& channelList) -{ - try { - int offset = 0; - // calculate offsets and the total size of the poll item set - for (string channel : channelList) { - fOffsetMap[channel] = offset; - offset += channelsMap.at(channel).size(); - fNumItems += channelsMap.at(channel).size(); - } - - fItems = new zmq_pollitem_t[fNumItems]; - - int index = 0; - for (string channel : channelList) { - for (unsigned int i = 0; i < channelsMap.at(channel).size(); ++i) { - index = fOffsetMap[channel] + i; - - fItems[index].socket = static_cast(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(); - fItems[index].fd = 0; - fItems[index].revents = 0; - - int type = 0; - size_t size = sizeof(type); - zmq_getsockopt(static_cast(&(channelsMap.at(channel).at(i).GetSocket()))->GetSocket(), ZMQ_TYPE, &type, &size); - - SetItemEvents(fItems[index], type); - } - } - } - catch (const std::out_of_range& oor) { - throw PollerError{tools::ToString("At least one of the provided channel keys for poller initialization is invalid. ", - "Out of range error: ", oor.what())}; - } -} - -auto Poller::SetItemEvents(zmq_pollitem_t& item, const int type) -> void -{ - if (type == ZMQ_PAIR) { - item.events = ZMQ_POLLIN|ZMQ_POLLOUT; - } else { - throw PollerError{"Invalid poller configuration."}; - } -} - -auto Poller::Poll(const int timeout) -> void -{ - if (zmq_poll(fItems, fNumItems, timeout) < 0) { - if (errno == ETERM) { - LOG(debug) << "polling exited, reason: " << zmq_strerror(errno); - } else { - throw PollerError{tools::ToString("Polling failed, reason: ", zmq_strerror(errno))}; - } - } -} - -auto Poller::CheckInput(const int index) -> bool -{ - return fItems[index].revents & ZMQ_POLLIN; -} - -auto Poller::CheckOutput(const int index) -> bool -{ - return fItems[index].revents & ZMQ_POLLOUT; -} - -auto Poller::CheckInput(const string& channelKey, const int index) -> bool -{ - try { - return fItems[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLIN; - } catch (const std::out_of_range& oor) { - throw PollerError{tools::ToString( - "Invalid channel key: '", channelKey, "', ", - "Out of range error: ", oor.what() - )}; - } -} - -auto Poller::CheckOutput(const string& channelKey, const int index) -> bool -{ - try { - return fItems[fOffsetMap.at(channelKey) + index].revents & ZMQ_POLLOUT; - } catch (const std::out_of_range& oor) { - throw PollerError{tools::ToString( - "Invalid channel key: '", channelKey, "', ", - "Out of range error: ", oor.what() - )}; - } -} - -Poller::~Poller() -{ - delete[] fItems; -} - -} // namespace fair::mq::ofi diff --git a/fairmq/ofi/Poller.h b/fairmq/ofi/Poller.h deleted file mode 100644 index cb264f18..00000000 --- a/fairmq/ofi/Poller.h +++ /dev/null @@ -1,64 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#ifndef FAIR_MQ_OFI_POLLER_H -#define FAIR_MQ_OFI_POLLER_H - -#include -#include -#include - -#include -#include - -#include - -namespace fair::mq::ofi -{ - -class TransportFactory; - -/** - * @class Poller Poller.h - * @brief - * - * @todo TODO insert long description - */ -class Poller final : public FairMQPoller -{ - friend class FairMQChannel; - friend class TransportFactory; - - public: - Poller(const std::vector& channels); - Poller(const std::vector& channels); - Poller(const std::unordered_map>& channelsMap, const std::vector& channelList); - - Poller(const Poller&) = delete; - Poller operator=(const Poller&) = delete; - - auto SetItemEvents(zmq_pollitem_t& item, const int type) -> void; - - auto Poll(const int timeout) -> void override; - auto CheckInput(const int index) -> bool override; - auto CheckOutput(const int index) -> bool override; - auto CheckInput(const std::string& channelKey, const int index) -> bool override; - auto CheckOutput(const std::string& channelKey, const int index) -> bool override; - - ~Poller() override; - - private: - zmq_pollitem_t* fItems; - int fNumItems; - - std::unordered_map fOffsetMap; -}; /* class Poller */ - -} // namespace fair::mq::ofi - -#endif /* FAIR_MQ_OFI_POLLER_H */ diff --git a/fairmq/ofi/Socket.cxx b/fairmq/ofi/Socket.cxx index 809f69b3..ed10dcde 100644 --- a/fairmq/ofi/Socket.cxx +++ b/fairmq/ofi/Socket.cxx @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -13,9 +13,9 @@ #include #include -#include -#include -#include +#include +#include +#include #include #include #include @@ -151,14 +151,14 @@ auto Socket::BindDataEndpoint() -> void LOG(debug) << "OFI transport (" << fId << "): data band connection accepted."; if (fContext.GetSizeHint()) { - boost::asio::post(fContext.GetIoContext(), + asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReaderStatic, this)); - boost::asio::post(fContext.GetIoContext(), + asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvQueueReaderStatic, this)); } else { - boost::asio::post(fContext.GetIoContext(), + asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this)); - boost::asio::post(fContext.GetIoContext(), + asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this)); } }); @@ -180,11 +180,11 @@ try { ConnectEndpoint(fDataEndpoint, Band::Data); if (fContext.GetSizeHint()) { - boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReaderStatic, this)); - boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvQueueReaderStatic, this)); + asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReaderStatic, this)); + asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvQueueReaderStatic, this)); } else { - boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this)); - boost::asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this)); + asio::post(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this)); + asio::post(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this)); } return true; @@ -307,7 +307,7 @@ auto Socket::SendQueueReader() -> void } // Send control message - boost::asio::mutable_buffer ctrlMsg(ctrl.get(), sizeof(ControlMessage)); + asio::mutable_buffer ctrlMsg(ctrl.get(), sizeof(ControlMessage)); if (fNeedOfiMemoryRegistration) { asiofi::memory_region mr(*fOfiDomain, ctrlMsg, asiofi::mr::access::send); @@ -315,17 +315,17 @@ auto Socket::SendQueueReader() -> void fControlEndpoint->send(ctrlMsg, desc, [&, ctrl2 = std::move(ctrlMsg), mr2 = std::move(mr)]( - boost::asio::mutable_buffer) mutable {}); + asio::mutable_buffer) mutable {}); } else { fControlEndpoint->send( - ctrlMsg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable {}); + ctrlMsg, [&, ctrl2 = std::move(ctrl)](asio::mutable_buffer) mutable {}); } // Send data message const auto size = msg->GetSize(); if (size) { - boost::asio::mutable_buffer buffer(msg->GetData(), size); + asio::mutable_buffer buffer(msg->GetData(), size); if (fNeedOfiMemoryRegistration) { asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::send); @@ -334,14 +334,14 @@ auto Socket::SendQueueReader() -> void fDataEndpoint->send(buffer, desc, [&, size, msg2 = std::move(msg), mr2 = std::move(mr)]( - boost::asio::mutable_buffer) mutable { + asio::mutable_buffer) mutable { fBytesTx += size; fMessagesTx++; fSendPushSem.signal(); }); } else { fDataEndpoint->send( - buffer, [&, size, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable { + buffer, [&, size, msg2 = std::move(msg)](asio::mutable_buffer) mutable { fBytesTx += size; fMessagesTx++; fSendPushSem.signal(); @@ -353,7 +353,7 @@ auto Socket::SendQueueReader() -> void } } - boost::asio::dispatch(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this)); + asio::dispatch(fContext.GetIoContext(), std::bind(&Socket::SendQueueReader, this)); }); } @@ -377,7 +377,7 @@ auto Socket::SendQueueReaderStatic() -> void const auto size = msg->GetSize(); if (size) { - boost::asio::mutable_buffer buffer(msg->GetData(), size); + asio::mutable_buffer buffer(msg->GetData(), size); if (fNeedOfiMemoryRegistration) { asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::send); @@ -386,14 +386,14 @@ auto Socket::SendQueueReaderStatic() -> void fDataEndpoint->send(buffer, desc, [&, size, msg2 = std::move(msg), mr2 = std::move(mr)]( - boost::asio::mutable_buffer) mutable { + asio::mutable_buffer) mutable { fBytesTx += size; fMessagesTx++; fSendPushSem.signal(); }); } else { fDataEndpoint->send( - buffer, [&, size, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable { + buffer, [&, size, msg2 = std::move(msg)](asio::mutable_buffer) mutable { fBytesTx += size; fMessagesTx++; fSendPushSem.signal(); @@ -404,7 +404,7 @@ auto Socket::SendQueueReaderStatic() -> void fSendPushSem.signal(); } - boost::asio::dispatch(fContext.GetIoContext(), std::bind(&Socket::SendQueueReaderStatic, this)); + asio::dispatch(fContext.GetIoContext(), std::bind(&Socket::SendQueueReaderStatic, this)); }); } @@ -460,7 +460,7 @@ auto Socket::RecvControlQueueReader() -> void fRecvPushSem.async_wait([&] { // Receive control message ofi::unique_ptr ctrl(MakeControlMessageWithPmr(fControlMemPool)); - boost::asio::mutable_buffer ctrlMsg(ctrl.get(), sizeof(ControlMessage)); + asio::mutable_buffer ctrlMsg(ctrl.get(), sizeof(ControlMessage)); if (fNeedOfiMemoryRegistration) { asiofi::memory_region mr(*fOfiDomain, ctrlMsg, asiofi::mr::access::recv); @@ -470,10 +470,10 @@ auto Socket::RecvControlQueueReader() -> void ctrlMsg, desc, [&, ctrl2 = std::move(ctrl), mr2 = std::move(mr)]( - boost::asio::mutable_buffer) mutable { OnRecvControl(std::move(ctrl2)); }); + asio::mutable_buffer) mutable { OnRecvControl(std::move(ctrl2)); }); } else { fControlEndpoint->recv( - ctrlMsg, [&, ctrl2 = std::move(ctrl)](boost::asio::mutable_buffer) mutable { + ctrlMsg, [&, ctrl2 = std::move(ctrl)](asio::mutable_buffer) mutable { OnRecvControl(std::move(ctrl2)); }); } @@ -507,7 +507,7 @@ auto Socket::OnRecvControl(ofi::unique_ptr ctrl) -> void auto msg = fContext.MakeReceiveMessage(size); if (size) { - boost::asio::mutable_buffer buffer(msg->GetData(), size); + asio::mutable_buffer buffer(msg->GetData(), size); if (fNeedOfiMemoryRegistration) { asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::recv); @@ -517,11 +517,11 @@ auto Socket::OnRecvControl(ofi::unique_ptr ctrl) -> void buffer, desc, [&, msg2 = std::move(msg), mr2 = std::move(mr)]( - boost::asio::mutable_buffer) mutable { DataMessageReceived(std::move(msg2)); }); + asio::mutable_buffer) mutable { DataMessageReceived(std::move(msg2)); }); } else { fDataEndpoint->recv(buffer, - [&, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable { + [&, msg2 = std::move(msg)](asio::mutable_buffer) mutable { DataMessageReceived(std::move(msg2)); }); } @@ -529,7 +529,7 @@ auto Socket::OnRecvControl(ofi::unique_ptr ctrl) -> void DataMessageReceived(std::move(msg)); } - boost::asio::dispatch(fContext.GetIoContext(), + asio::dispatch(fContext.GetIoContext(), std::bind(&Socket::RecvControlQueueReader, this)); } @@ -541,7 +541,7 @@ auto Socket::RecvQueueReaderStatic() -> void auto msg = fContext.MakeReceiveMessage(size); if (size) { - boost::asio::mutable_buffer buffer(msg->GetData(), size); + asio::mutable_buffer buffer(msg->GetData(), size); if (fNeedOfiMemoryRegistration) { asiofi::memory_region mr(*fOfiDomain, buffer, asiofi::mr::access::recv); @@ -550,13 +550,13 @@ auto Socket::RecvQueueReaderStatic() -> void fDataEndpoint->recv(buffer, desc, [&, msg2 = std::move(msg), mr2 = std::move(mr)]( - boost::asio::mutable_buffer) mutable { + asio::mutable_buffer) mutable { DataMessageReceived(std::move(msg2)); }); } else { fDataEndpoint->recv( - buffer, [&, msg2 = std::move(msg)](boost::asio::mutable_buffer) mutable { + buffer, [&, msg2 = std::move(msg)](asio::mutable_buffer) mutable { DataMessageReceived(std::move(msg2)); }); } @@ -564,7 +564,7 @@ auto Socket::RecvQueueReaderStatic() -> void DataMessageReceived(std::move(msg)); } - boost::asio::dispatch(fContext.GetIoContext(), + asio::dispatch(fContext.GetIoContext(), std::bind(&Socket::RecvQueueReaderStatic, this)); }); } diff --git a/fairmq/ofi/Socket.h b/fairmq/ofi/Socket.h index 9d8408a4..fe71d866 100644 --- a/fairmq/ofi/Socket.h +++ b/fairmq/ofi/Socket.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -18,7 +18,6 @@ #include #include #include -#include #include // unique_ptr #include diff --git a/fairmq/ofi/TransportFactory.cxx b/fairmq/ofi/TransportFactory.cxx deleted file mode 100644 index a786de48..00000000 --- a/fairmq/ofi/TransportFactory.cxx +++ /dev/null @@ -1,120 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ - -#include -#include -#include -#include - -#include - -namespace fair::mq::ofi -{ - -using namespace std; - -TransportFactory::TransportFactory(const string& id, const fair::mq::ProgOptions* config) -try : FairMQTransportFactory(id) - , fContext(*this, *this, 1) -{ - LOG(debug) << "OFI transport: asiofi (" << fContext.GetAsiofiVersion() << ")"; - - if (config) { - fContext.SetSizeHint(config->GetProperty("ofi-size-hint", 0)); - } -} catch (ContextError& e) { - throw TransportFactoryError{e.what()}; -} - -auto TransportFactory::CreateMessage() -> MessagePtr -{ - return MessagePtr{new Message(&fMemoryResource)}; -} - -auto TransportFactory::CreateMessage(Alignment /* alignment */) -> MessagePtr -{ - // TODO Do not ignore alignment - return MessagePtr{new Message(&fMemoryResource)}; -} - -auto TransportFactory::CreateMessage(const size_t size) -> MessagePtr -{ - return MessagePtr{new Message(&fMemoryResource, size)}; -} - -auto TransportFactory::CreateMessage(const size_t size, Alignment /* alignment */) -> MessagePtr -{ - // TODO Do not ignore alignment - return MessagePtr{new Message(&fMemoryResource, size)}; -} - -auto TransportFactory::CreateMessage(void* data, - const size_t size, - fairmq_free_fn* ffn, - void* hint) -> MessagePtr -{ - return MessagePtr{new Message(&fMemoryResource, data, size, ffn, hint)}; -} - -auto TransportFactory::CreateMessage(UnmanagedRegionPtr& region, - void* data, - const size_t size, - void* hint) -> MessagePtr -{ - return MessagePtr{new Message(&fMemoryResource, region, data, size, hint)}; -} - -auto TransportFactory::CreateSocket(const string& type, const string& name) -> SocketPtr -{ - return SocketPtr{new Socket(fContext, type, name, GetId())}; -} - -auto TransportFactory::CreatePoller(const vector& /*channels*/) const -> PollerPtr -{ - throw runtime_error{"Not yet implemented (Poller)."}; - // return PollerPtr{new Poller(channels)}; -} - -auto TransportFactory::CreatePoller(const vector& /*channels*/) const -> PollerPtr -{ - throw runtime_error{"Not yet implemented (Poller)."}; - // return PollerPtr{new Poller(channels)}; -} - -auto TransportFactory::CreatePoller(const unordered_map>& /*channelsMap*/, const vector& /*channelList*/) const -> PollerPtr -{ - throw runtime_error{"Not yet implemented (Poller)."}; - // return PollerPtr{new Poller(channelsMap, channelList)}; -} - -auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr -{ - throw runtime_error{"Not yet implemented UMR."}; -} - -auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr -{ - throw runtime_error{"Not yet implemented UMR."}; -} - -auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr -{ - throw runtime_error{"Not yet implemented UMR."}; -} - -auto TransportFactory::CreateUnmanagedRegion(const size_t /*size*/, const int64_t /*userFlags*/, FairMQRegionBulkCallback /*callback*/, const std::string& /* path = "" */, int /* flags = 0 */, fair::mq::RegionConfig /* cfg = fair::mq::RegionConfig() */) -> UnmanagedRegionPtr -{ - throw runtime_error{"Not yet implemented UMR."}; -} - -auto TransportFactory::GetType() const -> Transport -{ - return Transport::OFI; -} - -} // namespace fair::mq::ofi diff --git a/fairmq/ofi/TransportFactory.h b/fairmq/ofi/TransportFactory.h index ba76ff28..2163f9f2 100644 --- a/fairmq/ofi/TransportFactory.h +++ b/fairmq/ofi/TransportFactory.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2018 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2018-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -9,14 +9,28 @@ #ifndef FAIR_MQ_OFI_TRANSPORTFACTORY_H #define FAIR_MQ_OFI_TRANSPORTFACTORY_H -#include -#include -#include - #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include -namespace fair::mq::ofi -{ +namespace fair::mq::ofi { /** * @class TransportFactory TransportFactory.h @@ -24,37 +38,155 @@ namespace fair::mq::ofi * * @todo TODO insert long description */ -class TransportFactory final : public FairMQTransportFactory +struct TransportFactory final : mq::TransportFactory { - public: - TransportFactory(const std::string& id = "", const fair::mq::ProgOptions* config = nullptr); - TransportFactory(const TransportFactory&) = delete; - TransportFactory operator=(const TransportFactory&) = delete; + TransportFactory(std::string const& id = "", ProgOptions const* config = nullptr) + : mq::TransportFactory(id) + , fContext(*this, *this, 1) + { + try { + LOG(debug) << "OFI transport: asiofi (" << fContext.GetAsiofiVersion() << ")"; - auto CreateMessage() -> MessagePtr override; - auto CreateMessage(Alignment alignment) -> MessagePtr override; - auto CreateMessage(const std::size_t size) -> MessagePtr override; - auto CreateMessage(const std::size_t size, Alignment alignment) -> MessagePtr override; - auto CreateMessage(void* data, const std::size_t size, fairmq_free_fn* ffn, void* hint = nullptr) -> MessagePtr override; - auto CreateMessage(UnmanagedRegionPtr& region, void* data, const std::size_t size, void* hint = nullptr) -> MessagePtr override; + if (config) { + fContext.SetSizeHint(config->GetProperty("ofi-size-hint", 0)); + } + } catch (ContextError& e) { + throw TransportFactoryError(e.what()); + } + } - auto CreateSocket(const std::string& type, const std::string& name) -> SocketPtr override; + TransportFactory(TransportFactory const&) = delete; + TransportFactory& operator=(TransportFactory const&) = delete; + TransportFactory(TransportFactory&&) = default; + TransportFactory& operator=(TransportFactory&&) = default; - auto CreatePoller(const std::vector& channels) const -> PollerPtr override; - auto CreatePoller(const std::vector& channels) const -> PollerPtr override; - auto CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const -> PollerPtr override; + auto CreateMessage() -> std::unique_ptr override + { + return std::make_unique(&fMemoryResource); + } - auto CreateUnmanagedRegion(const size_t size, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) -> UnmanagedRegionPtr override; - auto CreateUnmanagedRegion(const size_t size, RegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) -> UnmanagedRegionPtr override; - auto CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) -> UnmanagedRegionPtr override; - auto CreateUnmanagedRegion(const size_t size, int64_t userFlags, RegionBulkCallback callback = nullptr, const std::string& path = "", int flags = 0, fair::mq::RegionConfig cfg = fair::mq::RegionConfig()) -> UnmanagedRegionPtr override; + auto CreateMessage(Alignment /*alignment*/) -> std::unique_ptr override + { + // TODO Do not ignore alignment + return std::make_unique(&fMemoryResource); + } - void SubscribeToRegionEvents(RegionEventCallback /* callback */) override { LOG(error) << "SubscribeToRegionEvents not yet implemented for OFI"; } - bool SubscribedToRegionEvents() override { LOG(error) << "Region event subscriptions not yet implemented for OFI"; return false; } - void UnsubscribeFromRegionEvents() override { LOG(error) << "UnsubscribeFromRegionEvents not yet implemented for OFI"; } - std::vector GetRegionInfo() override { LOG(error) << "GetRegionInfo not yet implemented for OFI, returning empty vector"; return std::vector(); } + auto CreateMessage(std::size_t size) -> std::unique_ptr override + { + return std::make_unique(&fMemoryResource, size); + } - auto GetType() const -> Transport override; + auto CreateMessage(std::size_t size, Alignment /*alignment*/) + -> std::unique_ptr override + { + // TODO Do not ignore alignment + return std::make_unique(&fMemoryResource, size); + } + + auto CreateMessage(void* data, std::size_t size, fairmq_free_fn* ffn, void* hint = nullptr) + -> std::unique_ptr override + { + return std::make_unique(&fMemoryResource, data, size, ffn, hint); + } + + auto CreateMessage(std::unique_ptr& region, + void* data, + std::size_t size, + void* hint = nullptr) -> std::unique_ptr override + { + return std::make_unique(&fMemoryResource, region, data, size, hint); + } + + auto CreateSocket(std::string const& type, std::string const& name) + -> std::unique_ptr override + { + return std::make_unique(fContext, type, name, GetId()); + } + + auto CreatePoller(std::vector const& /*channels*/) const + -> std::unique_ptr override + { + throw std::runtime_error("Not yet implemented (Poller)."); + } + + auto CreatePoller(std::vector const& /*channels*/) const + -> std::unique_ptr override + { + throw std::runtime_error("Not yet implemented (Poller)."); + } + + auto CreatePoller( + std::unordered_map> const& /*channelsMap*/, + std::vector const& /*channelList*/) const + -> std::unique_ptr override + { + throw std::runtime_error("Not yet implemented (Poller)."); + } + + auto CreateUnmanagedRegion(std::size_t /*size*/, + RegionCallback /*callback = nullptr*/, + std::string const& /*path = ""*/, + int /*flags = 0*/, + RegionConfig /*cfg = RegionConfig()*/) + -> std::unique_ptr override + { + throw std::runtime_error("Not yet implemented UMR."); + } + + auto CreateUnmanagedRegion(std::size_t /*size*/, + RegionBulkCallback /*callback = nullptr*/, + std::string const& /*path = ""*/, + int /*flags = 0*/, + RegionConfig /*cfg = RegionConfig()*/) + -> std::unique_ptr override + { + throw std::runtime_error("Not yet implemented UMR."); + } + + auto CreateUnmanagedRegion(std::size_t /*size*/, + int64_t /*userFlags*/, + RegionCallback /*callback = nullptr*/, + std::string const& /*path = ""*/, + int /*flags = 0*/, + RegionConfig /*cfg = RegionConfig()*/) + -> std::unique_ptr override + { + throw std::runtime_error("Not yet implemented UMR."); + } + + auto CreateUnmanagedRegion(std::size_t /*size*/, + int64_t /*userFlags*/, + RegionBulkCallback /*callback = nullptr*/, + std::string const& /*path = ""*/, + int /*flags = 0*/, + RegionConfig /*cfg = RegionConfig()*/) + -> std::unique_ptr override + { + throw std::runtime_error("Not yet implemented UMR."); + } + + auto SubscribeToRegionEvents(RegionEventCallback /*callback*/) -> void override + { + throw std::runtime_error("Not yet implemented."); + } + + auto SubscribedToRegionEvents() -> bool override + { + throw std::runtime_error("Not yet implemented."); + } + + auto UnsubscribeFromRegionEvents() -> void override + { + throw std::runtime_error("Not yet implemented."); + } + + auto GetRegionInfo() -> std::vector override + { + LOG(error) << "GetRegionInfo not yet implemented for OFI, returning empty vector"; + return std::vector(); + } + + auto GetType() const -> Transport override { return Transport::OFI; } void Interrupt() override { fContext.Interrupt(); } void Resume() override { fContext.Resume(); } @@ -63,8 +195,8 @@ class TransportFactory final : public FairMQTransportFactory private: mutable Context fContext; asiofi::allocated_pool_resource fMemoryResource; -}; /* class TransportFactory */ +}; /* class TransportFactory */ -} // namespace fair::mq::ofi +} // namespace fair::mq::ofi #endif /* FAIR_MQ_OFI_TRANSPORTFACTORY_H */ diff --git a/fairmq/shmem/Manager.cxx b/fairmq/shmem/Manager.cxx new file mode 100644 index 00000000..5d319f54 --- /dev/null +++ b/fairmq/shmem/Manager.cxx @@ -0,0 +1,48 @@ +/******************************************************************************** + * Copyright (C) 2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "Manager.h" + +// Needed to compile-firewall the header because it +// interferes with the header. So, let's factor +// the whole dependency to Boost.Process out of the header. +#include +#include + +namespace fair::mq::shmem { + +bool Manager::SpawnShmMonitor(const std::string& id) +{ + auto const env(boost::this_process::environment()); + std::string const fairmq_path_key("FAIRMQ_PATH"); + std::string const shmmonitor_exe_name("fairmq-shmmonitor"); + std::string const shmmonitor_verbose_key("FAIRMQ_SHMMONITOR_VERBOSE"); + auto path(boost::this_process::path()); + + if (env.count(fairmq_path_key)) { + path.emplace(path.begin(), env.at(fairmq_path_key).to_string()); + } + + auto exe(boost::process::search_path(shmmonitor_exe_name, path)); + if (exe.empty()) { + LOG(warn) << "could not find " << shmmonitor_exe_name << " in \"$" << fairmq_path_key + << ":$PATH\""; + return false; + } + + // TODO Move this to fairmq-shmmonitor itself ? + bool verbose(env.count(shmmonitor_verbose_key) + && env.at(shmmonitor_verbose_key).to_string() == "true"); + + boost::process::spawn( + exe, "-x", "-m", "--shmid", id, "-d", "-t", "2000", (verbose ? "--verbose" : ""), env); + + return true; +} + +} // namespace fair::mq::shmem diff --git a/fairmq/shmem/Manager.h b/fairmq/shmem/Manager.h index ac03eb8f..1595e9c3 100644 --- a/fairmq/shmem/Manager.h +++ b/fairmq/shmem/Manager.h @@ -1,5 +1,5 @@ /******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * * This software is distributed under the terms of the * * GNU Lesser General Public Licence (LGPL) version 3, * @@ -31,7 +31,6 @@ #include #include #include -#include #include #include @@ -220,6 +219,10 @@ class Manager Manager(const Manager&) = delete; Manager operator=(const Manager&) = delete; + private: + static bool SpawnShmMonitor(const std::string& id); + + public: static void StartMonitor(const std::string& id) { using namespace boost::interprocess; @@ -228,25 +231,8 @@ class Manager LOG(debug) << "Found fairmq-shmmonitor for shared memory id " << id; } catch (interprocess_exception&) { LOG(debug) << "no fairmq-shmmonitor found for shared memory id " << id << ", starting..."; - auto env = boost::this_process::environment(); - std::vector ownPath = boost::this_process::path(); - - if (const char* fmqp = getenv("FAIRMQ_PATH")) { - ownPath.insert(ownPath.begin(), boost::filesystem::path(fmqp)); - } - - boost::filesystem::path p = boost::process::search_path("fairmq-shmmonitor", ownPath); - - bool verbose = false; - if (const char* verboseEnv = getenv("FAIRMQ_SHMMONITOR_VERBOSE")) { - if (std::string(verboseEnv) == "true") { - verbose = true; - } - } - - if (!p.empty()) { - boost::process::spawn(p, "-x", "-m", "--shmid", id, "-d", "-t", "2000", (verbose ? "--verbose" : ""), env); + if (SpawnShmMonitor(id)) { int numTries = 0; do { try { @@ -261,8 +247,6 @@ class Manager } } } while (true); - } else { - LOG(warn) << "could not find fairmq-shmmonitor in the path"; } } } diff --git a/fairmq/shmem/Region.h b/fairmq/shmem/Region.h index 2b6a47d5..96d2ddd2 100644 --- a/fairmq/shmem/Region.h +++ b/fairmq/shmem/Region.h @@ -1,10 +1,10 @@ /******************************************************************************** -* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * -* * -* This software is distributed under the terms of the * -* GNU Lesser General Public Licence (LGPL) version 3, * -* copied verbatim in the file "LICENSE" * -********************************************************************************/ + * Copyright (C) 2014-2021 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ /** * Region.h * @@ -22,7 +22,6 @@ #include #include -#include #include #include #include