diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 80801f32..471ad2c8 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -77,14 +77,12 @@ set(FAIRMQ_HEADER_FILES options/FairProgOptions.h options/FairProgOptionsHelper.h runFairMQDevice.h - shmem/FairMQContextSHM.h shmem/FairMQMessageSHM.h shmem/FairMQPollerSHM.h shmem/FairMQSocketSHM.h shmem/FairMQTransportFactorySHM.h tools/FairMQTools.h tools/runSimpleMQStateMachine.h - zeromq/FairMQContextZMQ.h zeromq/FairMQMessageZMQ.h zeromq/FairMQPollerZMQ.h zeromq/FairMQSocketZMQ.h @@ -128,12 +126,10 @@ set(FAIRMQ_SOURCE_FILES options/FairMQProgOptions.cxx options/FairMQSuboptParser.cxx options/FairProgOptions.cxx - shmem/FairMQContextSHM.cxx shmem/FairMQMessageSHM.cxx shmem/FairMQPollerSHM.cxx shmem/FairMQSocketSHM.cxx shmem/FairMQTransportFactorySHM.cxx - zeromq/FairMQContextZMQ.cxx zeromq/FairMQMessageZMQ.cxx zeromq/FairMQPollerZMQ.cxx zeromq/FairMQSocketZMQ.cxx @@ -176,7 +172,6 @@ target_include_directories(FairMQ $ ) - ################## # link libraries # ################## @@ -195,7 +190,7 @@ target_link_libraries(FairMQ Boost::filesystem Boost::regex Boost::date_time - + PRIVATE # only libFairMQ links against private dependencies ZeroMQ $<$:nanomsg> diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index 1056f749..cdb81310 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -581,9 +581,9 @@ void FairMQChannel::InitTransport(shared_ptr factory) fTransportType = factory->GetType(); } -bool FairMQChannel::InitCommandInterface(int numIoThreads) +bool FairMQChannel::InitCommandInterface() { - fChannelCmdSocket = fTransportFactory->CreateSocket("sub", "device-commands", numIoThreads, "internal"); + fChannelCmdSocket = fTransportFactory->CreateSocket("sub", "device-commands", "internal"); if (fChannelCmdSocket) { fChannelCmdSocket->Connect("inproc://commands"); diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 26b2c7ab..38578e1d 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -258,7 +258,7 @@ class FairMQChannel bool CheckCompatibility(std::vector>& msgVec) const; void InitTransport(std::shared_ptr factory); - bool InitCommandInterface(int numIoThreads); + bool InitCommandInterface(); bool HandleUnblock() const; diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index d8fb3803..5aad7232 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -128,7 +128,7 @@ void FairMQDevice::AttachChannels(list& chans) { if (AttachChannel(**itr)) { - (*itr)->InitCommandInterface(fNumIoThreads); + (*itr)->InitCommandInterface(); chans.erase(itr++); } else @@ -154,7 +154,7 @@ void FairMQDevice::InitWrapper() if (fDeviceCmdSockets.empty()) { - auto p = fDeviceCmdSockets.emplace(fTransportFactory->GetType(), fTransportFactory->CreateSocket("pub", "device-commands", fNumIoThreads, fId)); + auto p = fDeviceCmdSockets.emplace(fTransportFactory->GetType(), fTransportFactory->CreateSocket("pub", "device-commands", fId)); if (p.second) { p.first->second->Bind("inproc://commands"); @@ -282,7 +282,7 @@ bool FairMQDevice::AttachChannel(FairMQChannel& ch) //(re-)init socket if (!ch.fSocket) { - ch.fSocket = ch.fTransportFactory->CreateSocket(ch.fType, ch.fName, fNumIoThreads, fId); + ch.fSocket = ch.fTransportFactory->CreateSocket(ch.fType, ch.fName, fId); } // set high water marks @@ -837,6 +837,7 @@ void FairMQDevice::SetTransport(FairMQTransportFactory* factory) { fTransportFactory = shared_ptr(factory); pair> t(fTransportFactory->GetType(), fTransportFactory); + fTransportFactory->Initialize(fConfig); fTransports.insert(t); } else @@ -870,9 +871,7 @@ shared_ptr FairMQDevice::AddTransport(const string& tran #endif else { - LOG(ERROR) << "Unavailable transport requested: " - << "\"" << transport << "\"" - << ". Available are: " + LOG(ERROR) << "Unavailable transport requested: " << "\"" << transport << "\"" << ". Available are: " << "\"zeromq\"" << "\"shmem\"" #ifdef NANOMSG_FOUND @@ -885,9 +884,10 @@ shared_ptr FairMQDevice::AddTransport(const string& tran LOG(DEBUG) << "Adding '" << transport << "' transport to the device."; pair> trPair(FairMQ::TransportTypes.at(transport), tr); + tr->Initialize(fConfig); fTransports.insert(trPair); - auto p = fDeviceCmdSockets.emplace(tr->GetType(), tr->CreateSocket("pub", "device-commands", fNumIoThreads, fId)); + auto p = fDeviceCmdSockets.emplace(tr->GetType(), tr->CreateSocket("pub", "device-commands", fId)); if (p.second) { p.first->second->Bind("inproc://commands"); @@ -997,6 +997,8 @@ void FairMQDevice::LogSocketRates() t0 = get_timestamp(); + LOG(DEBUG) << ": in: <#msgs> () out: <#msgs> ()"; + while (CheckCurrentState(RUNNING)) { t1 = get_timestamp(); @@ -1014,24 +1016,23 @@ void FairMQDevice::LogSocketRates() intervalCounters.at(i) = 0; bytesInNew.at(i) = vi->GetBytesRx(); - mbPerSecIn.at(i) = (static_cast(bytesInNew.at(i) - bytesIn.at(i)) / (1000. * 1000.)) / static_cast(msSinceLastLog) * 1000.; - bytesIn.at(i) = bytesInNew.at(i); - msgInNew.at(i) = vi->GetMessagesRx(); - msgPerSecIn.at(i) = static_cast(msgInNew.at(i) - msgIn.at(i)) / static_cast(msSinceLastLog) * 1000.; - msgIn.at(i) = msgInNew.at(i); - bytesOutNew.at(i) = vi->GetBytesTx(); - mbPerSecOut.at(i) = (static_cast(bytesOutNew.at(i) - bytesOut.at(i)) / (1000. * 1000.)) / static_cast(msSinceLastLog) * 1000.; - bytesOut.at(i) = bytesOutNew.at(i); - msgOutNew.at(i) = vi->GetMessagesTx(); + + mbPerSecIn.at(i) = (static_cast(bytesInNew.at(i) - bytesIn.at(i)) / (1000. * 1000.)) / static_cast(msSinceLastLog) * 1000.; + msgPerSecIn.at(i) = static_cast(msgInNew.at(i) - msgIn.at(i)) / static_cast(msSinceLastLog) * 1000.; + mbPerSecOut.at(i) = (static_cast(bytesOutNew.at(i) - bytesOut.at(i)) / (1000. * 1000.)) / static_cast(msSinceLastLog) * 1000.; msgPerSecOut.at(i) = static_cast(msgOutNew.at(i) - msgOut.at(i)) / static_cast(msSinceLastLog) * 1000.; + + bytesIn.at(i) = bytesInNew.at(i); + msgIn.at(i) = msgInNew.at(i); + bytesOut.at(i) = bytesOutNew.at(i); msgOut.at(i) = msgOutNew.at(i); LOG(DEBUG) << filteredChannelNames.at(i) << ": " - << "in: " << msgPerSecIn.at(i) << " msg (" << mbPerSecIn.at(i) << " MB), " - << "out: " << msgPerSecOut.at(i) << " msg (" << mbPerSecOut.at(i) << " MB)"; + << "in: " << msgPerSecIn.at(i) << " (" << mbPerSecIn.at(i) << " MB) " + << "out: " << msgPerSecOut.at(i) << " (" << mbPerSecOut.at(i) << " MB)"; } ++i; @@ -1195,36 +1196,29 @@ bool FairMQDevice::Terminated() return fTerminationRequested; } -void FairMQDevice::Terminate() +void FairMQDevice::Exit() { - // Termination signal has to be sent only once to any socket. - for (auto& kv : fDeviceCmdSockets) + // ask transports to terminate transfers + for (const auto& t : fTransports) { - kv.second->Terminate(); + t.second->Shutdown(); } - // if (!fDeviceCmdSockets.empty()) - // { - // fDeviceCmdSockets[0]->Terminate(); - // } -} -void FairMQDevice::Shutdown() -{ LOG(DEBUG) << "Closing sockets..."; - // iterate over the channels map - for (const auto& mi : fChannels) + // iterate over the channels + for (const auto& c : fChannels) { - // iterate over the channels vector - for (const auto& vi : mi.second) + // iterate over the sub-channels + for (const auto& sc : c.second) { - if (vi.fSocket) + if (sc.fSocket) { - vi.fSocket->Close(); + sc.fSocket->Close(); } - if (vi.fChannelCmdSocket) + if (sc.fChannelCmdSocket) { - vi.fChannelCmdSocket->Close(); + sc.fChannelCmdSocket->Close(); } } } @@ -1234,12 +1228,15 @@ void FairMQDevice::Shutdown() s.second->Close(); } - // if (!fDeviceCmdSockets.empty()) - // { - // fDeviceCmdSockets[0]->Close(); - // } - LOG(DEBUG) << "Closed all sockets!"; + + // ask transports to terminate + for (const auto& t : fTransports) + { + t.second->Terminate(); + } + + LOG(DEBUG) << "All transports exited."; } FairMQDevice::~FairMQDevice() diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index e0c6c30a..a40554de 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -337,7 +337,10 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable void SetTransport(const std::string& transport = "zeromq"); void SetConfig(FairMQProgOptions& config); - const FairMQProgOptions* GetConfig() const {return fConfig;} + const FairMQProgOptions* GetConfig() const + { + return fConfig; + } /// Implements the sort algorithm used in SortChannel() /// @param lhs Right hand side value for comparison @@ -442,20 +445,19 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable void ResetTaskWrapper(); /// Handles the Reset() method void ResetWrapper(); - /// Shuts down the device (closses socket connections) - void Shutdown(); - /// Terminates the transport interface - void Terminate(); /// Unblocks blocking channel send/receive calls void Unblock(); + /// Shuts down the transports and the device + void Exit(); + /// Attach (bind/connect) channels in the list void AttachChannels(std::list& chans); /// Sets up and connects/binds a socket to an endpoint /// return a string with the actual endpoint if it happens - //to stray from default. + /// to stray from default. bool ConnectEndpoint(FairMQSocket& socket, std::string& endpoint); bool BindEndpoint(FairMQSocket& socket, std::string& endpoint); /// Attaches the channel to all listed endpoints diff --git a/fairmq/FairMQSocket.h b/fairmq/FairMQSocket.h index 36280323..d4e55679 100644 --- a/fairmq/FairMQSocket.h +++ b/fairmq/FairMQSocket.h @@ -49,7 +49,6 @@ class FairMQSocket virtual void* GetSocket() const = 0; virtual int GetSocket(int nothing) const = 0; virtual void Close() = 0; - virtual void Terminate() = 0; virtual void Interrupt() = 0; virtual void Resume() = 0; diff --git a/fairmq/FairMQStateMachine.h b/fairmq/FairMQStateMachine.h index c2159e9f..60b4acfd 100644 --- a/fairmq/FairMQStateMachine.h +++ b/fairmq/FairMQStateMachine.h @@ -73,7 +73,6 @@ struct FairMQFSM_ : public msmf::state_machine_def public: FairMQFSM_() : fWorkerThread() - , fTerminateStateThread() , fWork() , fWorkAvailableCondition() , fWorkDoneCondition() @@ -329,9 +328,7 @@ struct FairMQFSM_ : public msmf::state_machine_def fsm.fWorkerThread.join(); } - fsm.fTerminateStateThread = std::thread(&FairMQFSM_::Terminate, &fsm); - fsm.Shutdown(); - fsm.fTerminateStateThread.join(); + fsm.Exit(); } }; @@ -358,8 +355,7 @@ struct FairMQFSM_ : public msmf::state_machine_def virtual void Reset() {} virtual void ResetTaskWrapper() {} virtual void ResetTask() {} - virtual void Shutdown() {} - virtual void Terminate() {} // Termination method called during StopFct action. + virtual void Exit() {} virtual void Unblock() {} // Method to send commands. void Worker() @@ -526,7 +522,6 @@ struct FairMQFSM_ : public msmf::state_machine_def // this is to run certain functions in a separate thread std::thread fWorkerThread; - std::thread fTerminateStateThread; // function to execute user states in a worker thread std::function fWork; diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 47069ff5..fdd72dc1 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -27,22 +27,39 @@ #include "FairMQTransports.h" class FairMQChannel; +class FairMQProgOptions; class FairMQTransportFactory { public: + /// Initialize transport + virtual void Initialize(const FairMQProgOptions* config) = 0; + + /// Create an empty message (e.g. for receiving) virtual FairMQMessagePtr CreateMessage() const = 0; + /// Create an empty of a specified size virtual FairMQMessagePtr CreateMessage(const size_t size) const = 0; + /// Create a message from a supplied buffer, size and a deallocation callback virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const = 0; - virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = "") const = 0; + /// Create a socket + virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name, const std::string& id = "") const = 0; + /// Create a poller for all device channels virtual FairMQPollerPtr CreatePoller(const std::vector& channels) const = 0; + /// Create a poller for all device channels virtual FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const = 0; + /// Create a poller for two sockets virtual FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const = 0; + /// Get transport type virtual FairMQ::Transport GetType() const = 0; + /// Shutdown transport (stop transfers, get ready for complete shutdown) + virtual void Shutdown() = 0; + /// Terminate transport (complete shutdown) + virtual void Terminate() = 0; + virtual ~FairMQTransportFactory() {}; }; diff --git a/fairmq/nanomsg/FairMQSocketNN.cxx b/fairmq/nanomsg/FairMQSocketNN.cxx index b219782b..ba856700 100644 --- a/fairmq/nanomsg/FairMQSocketNN.cxx +++ b/fairmq/nanomsg/FairMQSocketNN.cxx @@ -31,7 +31,7 @@ using namespace std; atomic FairMQSocketNN::fInterrupted(false); -FairMQSocketNN::FairMQSocketNN(const string& type, const string& name, const int numIoThreads, const string& id /*= ""*/) +FairMQSocketNN::FairMQSocketNN(const string& type, const string& name, const string& id /*= ""*/) : FairMQSocket(0, 0, NN_DONTWAIT) , fSocket(-1) , fId() @@ -42,11 +42,6 @@ FairMQSocketNN::FairMQSocketNN(const string& type, const string& name, const int { fId = id + "." + name + "." + type; - if (numIoThreads > 1) - { - LOG(INFO) << "number of I/O threads is not used in nanomsg"; - } - if (type == "router" || type == "dealer") { // Additional info about using the sockets ROUTER and DEALER with nanomsg can be found in: @@ -370,11 +365,6 @@ void FairMQSocketNN::Close() nn_close(fSocket); } -void FairMQSocketNN::Terminate() -{ - nn_term(); -} - void FairMQSocketNN::Interrupt() { fInterrupted = true; diff --git a/fairmq/nanomsg/FairMQSocketNN.h b/fairmq/nanomsg/FairMQSocketNN.h index 136f02f1..ae9b6c6b 100644 --- a/fairmq/nanomsg/FairMQSocketNN.h +++ b/fairmq/nanomsg/FairMQSocketNN.h @@ -24,7 +24,7 @@ class FairMQSocketNN : public FairMQSocket { public: - FairMQSocketNN(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = ""); // numIoThreads is not used in nanomsg. + FairMQSocketNN(const std::string& type, const std::string& name, const std::string& id = ""); FairMQSocketNN(const FairMQSocketNN&) = delete; FairMQSocketNN operator=(const FairMQSocketNN&) = delete; @@ -42,7 +42,6 @@ class FairMQSocketNN : public FairMQSocket virtual void* GetSocket() const; virtual int GetSocket(int nothing) const; virtual void Close(); - virtual void Terminate(); virtual void Interrupt(); virtual void Resume(); diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index 49cbc49d..a845682c 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -13,6 +13,9 @@ */ #include "FairMQTransportFactoryNN.h" +#include "../options/FairMQProgOptions.h" + +#include using namespace std; @@ -23,6 +26,11 @@ FairMQTransportFactoryNN::FairMQTransportFactoryNN() LOG(DEBUG) << "Transport: Using nanomsg library"; } +void FairMQTransportFactoryNN::Initialize(const FairMQProgOptions* config) +{ + // nothing to do for nanomsg, transport is ready to be used any time (until nn_term()). +} + FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage() const { return unique_ptr(new FairMQMessageNN()); @@ -38,9 +46,9 @@ FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage(void* data, const size_ return unique_ptr(new FairMQMessageNN(data, size, ffn, hint)); } -FairMQSocketPtr FairMQTransportFactoryNN::CreateSocket(const string& type, const string& name, const int numIoThreads, const string& id /*= ""*/) const +FairMQSocketPtr FairMQTransportFactoryNN::CreateSocket(const string& type, const string& name, const string& id /*= ""*/) const { - return unique_ptr(new FairMQSocketNN(type, name, numIoThreads, id)); + return unique_ptr(new FairMQSocketNN(type, name, id)); } FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const vector& channels) const @@ -58,6 +66,16 @@ FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const FairMQSocket& cmdSo return unique_ptr(new FairMQPollerNN(cmdSocket, dataSocket)); } +void FairMQTransportFactoryNN::Shutdown() +{ + // nothing to do for nanomsg, transport is ready to be terminated any time. +} + +void FairMQTransportFactoryNN::Terminate() +{ + nn_term(); +} + FairMQ::Transport FairMQTransportFactoryNN::GetType() const { return fTransportType; diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h index 4d7e8305..63aa16b1 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.h +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.h @@ -28,11 +28,13 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory public: FairMQTransportFactoryNN(); + virtual void Initialize(const FairMQProgOptions* config); + virtual FairMQMessagePtr CreateMessage() const; virtual FairMQMessagePtr CreateMessage(const size_t size) const; - virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL) const; + virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const; - virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = "") const; + virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name, const std::string& id = "") const; virtual FairMQPollerPtr CreatePoller(const std::vector& channels) const; virtual FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const; @@ -40,6 +42,9 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory virtual FairMQ::Transport GetType() const; + virtual void Shutdown(); + virtual void Terminate(); + virtual ~FairMQTransportFactoryNN() {}; private: diff --git a/fairmq/runFairMQDevice.h b/fairmq/runFairMQDevice.h index 09438203..8cf1b052 100644 --- a/fairmq/runFairMQDevice.h +++ b/fairmq/runFairMQDevice.h @@ -52,6 +52,11 @@ int main(int argc, char** argv) config.ParseAll(argc, argv); std::unique_ptr device(getDevice(config)); + if (!device) + { + LOG(ERROR) << "getDevice(): no valid device provided. Exiting."; + return 1; + } int result = runStateMachine(*device, config); if (result > 0) diff --git a/fairmq/shmem/FairMQContextSHM.cxx b/fairmq/shmem/FairMQContextSHM.cxx deleted file mode 100644 index 002f7bf2..00000000 --- a/fairmq/shmem/FairMQContextSHM.cxx +++ /dev/null @@ -1,83 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -#include - -#include - -#include - -#include "FairMQLogger.h" -#include "FairMQContextSHM.h" -#include "FairMQShmManager.h" - -using namespace FairMQ::shmem; - -FairMQContextSHM::FairMQContextSHM(int numIoThreads) - : fContext() -{ - fContext = zmq_ctx_new(); - if (fContext == NULL) - { - LOG(ERROR) << "failed creating context, reason: " << zmq_strerror(errno); - exit(EXIT_FAILURE); - } - - if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0) - { - LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno); - } - - // Set the maximum number of allowed sockets on the context. - if (zmq_ctx_set(fContext, ZMQ_MAX_SOCKETS, 10000) != 0) - { - LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno); - } - - Manager::Instance().InitializeSegment("open_or_create", "FairMQSharedMemory", 2000000000); - LOG(INFO) << "Created/Opened shared memory segment of 2,000,000,000 bytes. Available are " << Manager::Instance().Segment()->get_free_memory() << " bytes."; -} - -FairMQContextSHM::~FairMQContextSHM() -{ - Close(); - - if (boost::interprocess::shared_memory_object::remove("FairMQSharedMemory")) - { - fprintf(stderr, "Successfully removed shared memory after the device has stopped.\n"); - } - else - { - fprintf(stderr, "Did not remove shared memory after the device stopped. Already removed?\n"); - } -} - -void* FairMQContextSHM::GetContext() -{ - return fContext; -} - -void FairMQContextSHM::Close() -{ - if (fContext == NULL) - { - return; - } - - if (zmq_ctx_destroy(fContext) != 0) - { - if (errno == EINTR) - { - LOG(ERROR) << " failed closing context, reason: " << zmq_strerror(errno); - } - else - { - fContext = NULL; - return; - } - } -} diff --git a/fairmq/shmem/FairMQContextSHM.h b/fairmq/shmem/FairMQContextSHM.h deleted file mode 100644 index 04113dbc..00000000 --- a/fairmq/shmem/FairMQContextSHM.h +++ /dev/null @@ -1,27 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -#ifndef FAIRMQCONTEXTSHM_H_ -#define FAIRMQCONTEXTSHM_H_ - -class FairMQContextSHM -{ - public: - /// Constructor - FairMQContextSHM(int numIoThreads); - FairMQContextSHM(const FairMQContextSHM&) = delete; - FairMQContextSHM operator=(const FairMQContextSHM&) = delete; - - virtual ~FairMQContextSHM(); - void* GetContext(); - void Close(); - - private: - void* fContext; -}; - -#endif /* FAIRMQCONTEXTSHM_H_ */ diff --git a/fairmq/shmem/FairMQSocketSHM.cxx b/fairmq/shmem/FairMQSocketSHM.cxx index ad07f71d..66a9d08b 100644 --- a/fairmq/shmem/FairMQSocketSHM.cxx +++ b/fairmq/shmem/FairMQSocketSHM.cxx @@ -16,12 +16,9 @@ using namespace std; using namespace FairMQ::shmem; -// Context to hold the ZeroMQ sockets -unique_ptr FairMQSocketSHM::fContext; // = unique_ptr(new FairMQContextSHM(1)); -bool FairMQSocketSHM::fContextInitialized = false; atomic FairMQSocketSHM::fInterrupted(false); -FairMQSocketSHM::FairMQSocketSHM(const string& type, const string& name, const int numIoThreads, const string& id /*= ""*/) +FairMQSocketSHM::FairMQSocketSHM(const string& type, const string& name, const string& id /*= ""*/, void* context) : FairMQSocket(ZMQ_SNDMORE, ZMQ_RCVMORE, ZMQ_DONTWAIT) , fSocket(NULL) , fId() @@ -32,18 +29,8 @@ FairMQSocketSHM::FairMQSocketSHM(const string& type, const string& name, const i { fId = id + "." + name + "." + type; - if (!fContextInitialized) - { - fContext = unique_ptr(new FairMQContextSHM(1)); - fContextInitialized = true; - } - - if (zmq_ctx_set(fContext->GetContext(), ZMQ_IO_THREADS, numIoThreads) != 0) - { - LOG(ERROR) << "Failed configuring context, reason: " << zmq_strerror(errno); - } - - fSocket = zmq_socket(fContext->GetContext(), GetConstant(type)); + assert(context); + fSocket = zmq_socket(context, GetConstant(type)); if (fSocket == NULL) { @@ -409,14 +396,6 @@ void FairMQSocketSHM::Close() fSocket = NULL; } -void FairMQSocketSHM::Terminate() -{ - if (zmq_ctx_destroy(fContext->GetContext()) != 0) - { - LOG(ERROR) << "Failed terminating context, reason: " << zmq_strerror(errno); - } -} - void FairMQSocketSHM::Interrupt() { FairMQMessageSHM::fInterrupted = true; diff --git a/fairmq/shmem/FairMQSocketSHM.h b/fairmq/shmem/FairMQSocketSHM.h index 971b5dd6..c7b7f905 100644 --- a/fairmq/shmem/FairMQSocketSHM.h +++ b/fairmq/shmem/FairMQSocketSHM.h @@ -14,13 +14,12 @@ #include "FairMQSocket.h" #include "FairMQMessage.h" -#include "FairMQContextSHM.h" #include "FairMQShmManager.h" class FairMQSocketSHM : public FairMQSocket { public: - FairMQSocketSHM(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = ""); + FairMQSocketSHM(const std::string& type, const std::string& name, const std::string& id = "", void* context = nullptr); FairMQSocketSHM(const FairMQSocketSHM&) = delete; FairMQSocketSHM operator=(const FairMQSocketSHM&) = delete; @@ -38,7 +37,6 @@ class FairMQSocketSHM : public FairMQSocket virtual void* GetSocket() const; virtual int GetSocket(int nothing) const; virtual void Close(); - virtual void Terminate(); virtual void Interrupt(); virtual void Resume(); @@ -68,8 +66,6 @@ class FairMQSocketSHM : public FairMQSocket std::atomic fMessagesTx; std::atomic fMessagesRx; - static std::unique_ptr fContext; - static bool fContextInitialized; static std::atomic fInterrupted; }; diff --git a/fairmq/shmem/FairMQTransportFactorySHM.cxx b/fairmq/shmem/FairMQTransportFactorySHM.cxx index 7e87064d..39881937 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.cxx +++ b/fairmq/shmem/FairMQTransportFactorySHM.cxx @@ -7,19 +7,59 @@ ********************************************************************************/ #include "zmq.h" #include +#include +#include "FairMQLogger.h" +#include "FairMQShmManager.h" #include "FairMQTransportFactorySHM.h" +#include "../options/FairMQProgOptions.h" using namespace std; +using namespace FairMQ::shmem; FairMQ::Transport FairMQTransportFactorySHM::fTransportType = FairMQ::Transport::SHM; FairMQTransportFactorySHM::FairMQTransportFactorySHM() + : fContext(nullptr) { int major, minor, patch; zmq_version(&major, &minor, &patch); LOG(DEBUG) << "Transport: Using ZeroMQ (" << major << "." << minor << "." << patch << ") & " << "boost::interprocess (" << (BOOST_VERSION / 100000) << "." << (BOOST_VERSION / 100 % 1000) << "." << (BOOST_VERSION % 100) << ")"; + + fContext = zmq_ctx_new(); + if (!fContext) + { + LOG(ERROR) << "failed creating context, reason: " << zmq_strerror(errno); + exit(EXIT_FAILURE); + } +} + +void FairMQTransportFactorySHM::Initialize(const FairMQProgOptions* config) +{ + int numIoThreads = 1; + if (config) + { + numIoThreads = config->GetValue("io-threads"); + } + else + { + LOG(WARN) << "shmem: FairMQProgOptions not available! Using defaults."; + } + + if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0) + { + LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno); + } + + // Set the maximum number of allowed sockets on the context. + if (zmq_ctx_set(fContext, ZMQ_MAX_SOCKETS, 10000) != 0) + { + LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno); + } + + Manager::Instance().InitializeSegment("open_or_create", "FairMQSharedMemory", 2000000000); + LOG(DEBUG) << "shmem: created/opened shared memory segment of 2000000000 bytes. Available are " << Manager::Instance().Segment()->get_free_memory() << " bytes."; } FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage() const @@ -37,9 +77,10 @@ FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(void* data, const size return unique_ptr(new FairMQMessageSHM(data, size, ffn, hint)); } -FairMQSocketPtr FairMQTransportFactorySHM::CreateSocket(const string& type, const string& name, const int numIoThreads, const string& id /*= ""*/) const +FairMQSocketPtr FairMQTransportFactorySHM::CreateSocket(const string& type, const string& name, const string& id /*= ""*/) const { - return unique_ptr(new FairMQSocketSHM(type, name, numIoThreads, id)); + assert(fContext); + return unique_ptr(new FairMQSocketSHM(type, name, id, fContext)); } FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const vector& channels) const @@ -57,6 +98,46 @@ FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const FairMQSocket& cmdS return unique_ptr(new FairMQPollerSHM(cmdSocket, dataSocket)); } +void FairMQTransportFactorySHM::Shutdown() +{ + if (zmq_ctx_shutdown(fContext) != 0) + { + LOG(ERROR) << "shmem: failed shutting down context, reason: " << zmq_strerror(errno); + } +} + +void FairMQTransportFactorySHM::Terminate() +{ + if (fContext) + { + if (zmq_ctx_term(fContext) != 0) + { + if (errno == EINTR) + { + LOG(ERROR) << "shmem: failed closing context, reason: " << zmq_strerror(errno); + } + else + { + fContext = NULL; + return; + } + } + } + else + { + LOG(ERROR) << "shmem: Terminate(): context now available for shutdown"; + } + + if (boost::interprocess::shared_memory_object::remove("FairMQSharedMemory")) + { + LOG(DEBUG) << "shmem: successfully removed shared memory segment after the device has stopped."; + } + else + { + LOG(DEBUG) << "shmem: did not remove shared memory segment after the device stopped. Already removed?"; + } +} + FairMQ::Transport FairMQTransportFactorySHM::GetType() const { return fTransportType; diff --git a/fairmq/shmem/FairMQTransportFactorySHM.h b/fairmq/shmem/FairMQTransportFactorySHM.h index f83f037c..45321f5d 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.h +++ b/fairmq/shmem/FairMQTransportFactorySHM.h @@ -12,7 +12,6 @@ #include #include "FairMQTransportFactory.h" -#include "FairMQContextSHM.h" #include "FairMQMessageSHM.h" #include "FairMQSocketSHM.h" #include "FairMQPollerSHM.h" @@ -22,11 +21,13 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory public: FairMQTransportFactorySHM(); + virtual void Initialize(const FairMQProgOptions* config); + virtual FairMQMessagePtr CreateMessage() const; virtual FairMQMessagePtr CreateMessage(const size_t size) const; - virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL) const; + virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const; - virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = "") const; + virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name, const std::string& id = "") const; virtual FairMQPollerPtr CreatePoller(const std::vector& channels) const; virtual FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const; @@ -34,10 +35,14 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory virtual FairMQ::Transport GetType() const; + virtual void Shutdown(); + virtual void Terminate(); + virtual ~FairMQTransportFactorySHM() {}; private: static FairMQ::Transport fTransportType; + void* fContext; }; #endif /* FAIRMQTRANSPORTFACTORYSHM_H_ */ diff --git a/fairmq/zeromq/FairMQContextZMQ.cxx b/fairmq/zeromq/FairMQContextZMQ.cxx deleted file mode 100644 index 4cb52841..00000000 --- a/fairmq/zeromq/FairMQContextZMQ.cxx +++ /dev/null @@ -1,71 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQContextZMQ.cxx - * - * @since 2012-12-05 - * @author D. Klein, A. Rybalchenko - */ - -#include - -#include "FairMQLogger.h" -#include "FairMQContextZMQ.h" - -FairMQContextZMQ::FairMQContextZMQ(int numIoThreads) - : fContext() -{ - fContext = zmq_ctx_new(); - if (fContext == NULL) - { - LOG(ERROR) << "failed creating context, reason: " << zmq_strerror(errno); - exit(EXIT_FAILURE); - } - - if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0) - { - LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno); - } - - // Set the maximum number of allowed sockets on the context. - if (zmq_ctx_set(fContext, ZMQ_MAX_SOCKETS, 10000) != 0) - { - LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno); - } -} - -FairMQContextZMQ::~FairMQContextZMQ() -{ - Close(); -} - -void* FairMQContextZMQ::GetContext() -{ - return fContext; -} - -void FairMQContextZMQ::Close() -{ - if (fContext == NULL) - { - return; - } - - if (zmq_ctx_destroy(fContext) != 0) - { - if (errno == EINTR) - { - LOG(ERROR) << " failed closing context, reason: " << zmq_strerror(errno); - } - else - { - fContext = NULL; - return; - } - } -} diff --git a/fairmq/zeromq/FairMQContextZMQ.h b/fairmq/zeromq/FairMQContextZMQ.h deleted file mode 100644 index 5de7da82..00000000 --- a/fairmq/zeromq/FairMQContextZMQ.h +++ /dev/null @@ -1,34 +0,0 @@ -/******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * - * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * - * copied verbatim in the file "LICENSE" * - ********************************************************************************/ -/** - * FairMQContextZMQ.h - * - * @since 2012-12-05 - * @author D. Klein, A. Rybalchenko - */ - -#ifndef FAIRMQCONTEXTZMQ_H_ -#define FAIRMQCONTEXTZMQ_H_ - -class FairMQContextZMQ -{ - public: - /// Constructor - FairMQContextZMQ(int numIoThreads); - FairMQContextZMQ(const FairMQContextZMQ&) = delete; - FairMQContextZMQ operator=(const FairMQContextZMQ&) = delete; - - virtual ~FairMQContextZMQ(); - void* GetContext(); - void Close(); - - private: - void* fContext; -}; - -#endif /* FAIRMQCONTEXTZMQ_H_ */ diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index a19bd084..9016940d 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -22,11 +22,9 @@ using namespace std; -// Context to hold the ZeroMQ sockets -unique_ptr FairMQSocketZMQ::fContext = unique_ptr(new FairMQContextZMQ(1)); atomic FairMQSocketZMQ::fInterrupted(false); -FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const int numIoThreads, const string& id /*= ""*/) +FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const string& id /*= ""*/, void* context) : FairMQSocket(ZMQ_SNDMORE, ZMQ_RCVMORE, ZMQ_DONTWAIT) , fSocket(NULL) , fId() @@ -37,12 +35,8 @@ FairMQSocketZMQ::FairMQSocketZMQ(const string& type, const string& name, const i { fId = id + "." + name + "." + type; - if (zmq_ctx_set(fContext->GetContext(), ZMQ_IO_THREADS, numIoThreads) != 0) - { - LOG(ERROR) << "Failed configuring context, reason: " << zmq_strerror(errno); - } - - fSocket = zmq_socket(fContext->GetContext(), GetConstant(type)); + assert(context); + fSocket = zmq_socket(context, GetConstant(type)); if (fSocket == NULL) { @@ -346,14 +340,6 @@ void FairMQSocketZMQ::Close() fSocket = NULL; } -void FairMQSocketZMQ::Terminate() -{ - if (zmq_ctx_destroy(fContext->GetContext()) != 0) - { - LOG(ERROR) << "Failed terminating context, reason: " << zmq_strerror(errno); - } -} - void FairMQSocketZMQ::Interrupt() { fInterrupted = true; diff --git a/fairmq/zeromq/FairMQSocketZMQ.h b/fairmq/zeromq/FairMQSocketZMQ.h index 6e6bf226..3023f8e6 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.h +++ b/fairmq/zeromq/FairMQSocketZMQ.h @@ -21,12 +21,11 @@ #include "FairMQSocket.h" #include "FairMQMessage.h" -#include "FairMQContextZMQ.h" class FairMQSocketZMQ : public FairMQSocket { public: - FairMQSocketZMQ(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = ""); + FairMQSocketZMQ(const std::string& type, const std::string& name, const std::string& id = "", void* context = nullptr); FairMQSocketZMQ(const FairMQSocketZMQ&) = delete; FairMQSocketZMQ operator=(const FairMQSocketZMQ&) = delete; @@ -44,7 +43,6 @@ class FairMQSocketZMQ : public FairMQSocket virtual void* GetSocket() const; virtual int GetSocket(int nothing) const; virtual void Close(); - virtual void Terminate(); virtual void Interrupt(); virtual void Resume(); @@ -74,7 +72,6 @@ class FairMQSocketZMQ : public FairMQSocket std::atomic fMessagesTx; std::atomic fMessagesRx; - static std::unique_ptr fContext; static std::atomic fInterrupted; }; diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx index d9222879..78f19f78 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx @@ -15,16 +15,48 @@ #include "zmq.h" #include "FairMQTransportFactoryZMQ.h" +#include "../options/FairMQProgOptions.h" using namespace std; FairMQ::Transport FairMQTransportFactoryZMQ::fTransportType = FairMQ::Transport::ZMQ; FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ() + : fContext(zmq_ctx_new()) { int major, minor, patch; zmq_version(&major, &minor, &patch); LOG(DEBUG) << "Transport: Using ZeroMQ library, version: " << major << "." << minor << "." << patch; + + if (!fContext) + { + LOG(ERROR) << "failed creating context, reason: " << zmq_strerror(errno); + exit(EXIT_FAILURE); + } +} + +void FairMQTransportFactoryZMQ::Initialize(const FairMQProgOptions* config) +{ + int numIoThreads = 1; + if (config) + { + numIoThreads = config->GetValue("io-threads"); + } + else + { + LOG(WARN) << "zeromq: FairMQProgOptions not available! Using defaults."; + } + + if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0) + { + LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno); + } + + // Set the maximum number of allowed sockets on the context. + if (zmq_ctx_set(fContext, ZMQ_MAX_SOCKETS, 10000) != 0) + { + LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno); + } } FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage() const @@ -42,9 +74,10 @@ FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(void* data, const size return unique_ptr(new FairMQMessageZMQ(data, size, ffn, hint)); } -FairMQSocketPtr FairMQTransportFactoryZMQ::CreateSocket(const string& type, const string& name, const int numIoThreads, const string& id /*= ""*/) const +FairMQSocketPtr FairMQTransportFactoryZMQ::CreateSocket(const string& type, const string& name, const string& id /*= ""*/) const { - return unique_ptr(new FairMQSocketZMQ(type, name, numIoThreads, id)); + assert(fContext); + return unique_ptr(new FairMQSocketZMQ(type, name, id, fContext)); } FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const vector& channels) const @@ -66,3 +99,34 @@ FairMQ::Transport FairMQTransportFactoryZMQ::GetType() const { return fTransportType; } + +void FairMQTransportFactoryZMQ::Shutdown() +{ + if (zmq_ctx_shutdown(fContext) != 0) + { + LOG(ERROR) << "zeromq: failed shutting down context, reason: " << zmq_strerror(errno); + } +} + +void FairMQTransportFactoryZMQ::Terminate() +{ + if (fContext) + { + if (zmq_ctx_term(fContext) != 0) + { + if (errno == EINTR) + { + LOG(ERROR) << " failed closing context, reason: " << zmq_strerror(errno); + } + else + { + fContext = nullptr; + return; + } + } + } + else + { + LOG(ERROR) << "shmem: Terminate(): context now available for shutdown"; + } +} diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index 53258ffa..1be6f8a6 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -19,7 +19,6 @@ #include #include "FairMQTransportFactory.h" -#include "FairMQContextZMQ.h" #include "FairMQMessageZMQ.h" #include "FairMQSocketZMQ.h" #include "FairMQPollerZMQ.h" @@ -29,11 +28,13 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory public: FairMQTransportFactoryZMQ(); + virtual void Initialize(const FairMQProgOptions* config); + virtual FairMQMessagePtr CreateMessage() const; virtual FairMQMessagePtr CreateMessage(const size_t size) const; - virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = NULL) const; + virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const; - virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name, const int numIoThreads, const std::string& id = "") const; + virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name, const std::string& id = "") const; virtual FairMQPollerPtr CreatePoller(const std::vector& channels) const; virtual FairMQPollerPtr CreatePoller(const std::unordered_map>& channelsMap, const std::vector& channelList) const; @@ -41,10 +42,14 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory virtual FairMQ::Transport GetType() const; + virtual void Shutdown(); + virtual void Terminate(); + virtual ~FairMQTransportFactoryZMQ() {}; private: static FairMQ::Transport fTransportType; + void* fContext; }; #endif /* FAIRMQTRANSPORTFACTORYZMQ_H_ */