diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index d84fe9a7..76f44f02 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -1,16 +1,10 @@ /******************************************************************************** - * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * Copyright (C) 2012-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * * * - * This software is distributed under the terms of the * - * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -/** - * FairMQDevice.cxx - * - * @since 2012-10-25 - * @author D. Klein, A. Rybalchenko - */ #include #include // catching system signals @@ -851,35 +845,17 @@ int FairMQDevice::GetProperty(const int key, const int default_ /*= 0*/) } } -// DEPRECATED, use the string version -void FairMQDevice::SetTransport(FairMQTransportFactory* factory) -{ - if (fTransports.empty()) - { - fTransportFactory = shared_ptr(factory); - pair> t(fTransportFactory->GetType(), fTransportFactory); - fTransportFactory->Initialize(fConfig); - fTransports.insert(t); - } - else - { - LOG(ERROR) << "Transports container is not empty when setting transport. Setting twice?"; - ChangeState(ERROR_FOUND); - } -} - shared_ptr FairMQDevice::AddTransport(const string& transport) { unordered_map>::const_iterator i = fTransports.find(FairMQ::TransportTypes.at(transport)); if (i == fTransports.end()) { - auto tr = FairMQTransportFactory::CreateTransportFactory(transport, fId); + auto tr = FairMQTransportFactory::CreateTransportFactory(transport, fId, fConfig); LOG(DEBUG) << "Adding '" << transport << "' transport to the device."; pair> trPair(FairMQ::TransportTypes.at(transport), tr); - tr->Initialize(fConfig); fTransports.insert(trPair); auto p = fDeviceCmdSockets.emplace(tr->GetType(), tr->CreateSocket("pub", "device-commands")); @@ -934,8 +910,6 @@ unique_ptr FairMQDevice::MakeTransport(const string& tra return tr; } - tr->Initialize(nullptr); - return move(tr); } @@ -1232,15 +1206,8 @@ const FairMQChannel& FairMQDevice::GetChannel(const std::string& channelName, co return fChannels.at(channelName).at(index); } - void FairMQDevice::Exit() { - // ask transports to terminate transfers - for (const auto& t : fTransports) - { - t.second->Shutdown(); - } - LOG(DEBUG) << "All transports are shut down."; } diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 2dfcb1f4..aae43165 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -319,9 +319,6 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// Print all properties of this and the parent class to LOG(INFO) virtual void ListProperties(); - /// Configures the device with a transport factory (DEPRECATED) - /// @param factory Pointer to the transport factory object - void SetTransport(FairMQTransportFactory* factory); /// Adds a transport to the device if it doesn't exist /// @param transport Transport string ("zeromq"/"nanomsg"/"shmem") std::shared_ptr AddTransport(const std::string& transport); diff --git a/fairmq/FairMQTransportFactory.cxx b/fairmq/FairMQTransportFactory.cxx index f7fe04c4..59feed20 100644 --- a/fairmq/FairMQTransportFactory.cxx +++ b/fairmq/FairMQTransportFactory.cxx @@ -25,7 +25,7 @@ FairMQTransportFactory::FairMQTransportFactory(const std::string& id) { } -auto FairMQTransportFactory::CreateTransportFactory(const std::string& type, const std::string& id) -> std::shared_ptr +auto FairMQTransportFactory::CreateTransportFactory(const std::string& type, const std::string& id, const FairMQProgOptions* config) -> std::shared_ptr { using namespace std; @@ -39,16 +39,16 @@ auto FairMQTransportFactory::CreateTransportFactory(const std::string& type, con if (type == "zeromq") { - return std::make_shared(final_id); + return std::make_shared(final_id, config); } else if (type == "shmem") { - return std::make_shared(final_id); + return std::make_shared(final_id, config); } #ifdef NANOMSG_FOUND else if (type == "nanomsg") { - return std::make_shared(final_id); + return std::make_shared(final_id, config); } #endif /* NANOMSG_FOUND */ else diff --git a/fairmq/FairMQTransportFactory.h b/fairmq/FairMQTransportFactory.h index 6489241d..52d14f18 100644 --- a/fairmq/FairMQTransportFactory.h +++ b/fairmq/FairMQTransportFactory.h @@ -35,9 +35,6 @@ class FairMQTransportFactory auto GetId() const -> const std::string { return fkId; }; - /// Initialize transport - virtual void Initialize(const FairMQProgOptions* config) = 0; - /// @brief Create empty FairMQMessage /// @return pointer to FairMQMessage virtual FairMQMessagePtr CreateMessage() const = 0; @@ -68,14 +65,9 @@ class FairMQTransportFactory /// Get transport type virtual FairMQ::Transport GetType() const = 0; - /// Shutdown transport (stop transfers, get ready for complete shutdown) - virtual void Shutdown() = 0; - /// Terminate transport (complete shutdown) - virtual void Terminate() = 0; - virtual ~FairMQTransportFactory() {}; - static auto CreateTransportFactory(const std::string& type, const std::string& id = "") -> std::shared_ptr; + static auto CreateTransportFactory(const std::string& type, const std::string& id = "", const FairMQProgOptions* config = nullptr) -> std::shared_ptr; static void FairMQNoCleanup(void* /*data*/, void* /*obj*/) { diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index e5c38c50..51b4e117 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -7,7 +7,6 @@ ********************************************************************************/ #include "FairMQTransportFactoryNN.h" -#include "../options/FairMQProgOptions.h" #include @@ -15,17 +14,12 @@ using namespace std; FairMQ::Transport FairMQTransportFactoryNN::fTransportType = FairMQ::Transport::NN; -FairMQTransportFactoryNN::FairMQTransportFactoryNN(const string& id) +FairMQTransportFactoryNN::FairMQTransportFactoryNN(const string& id, const FairMQProgOptions* config) : FairMQTransportFactory(id) { LOG(DEBUG) << "Transport: Using nanomsg library"; } -void FairMQTransportFactoryNN::Initialize(const FairMQProgOptions* config) -{ - // nothing to do for nanomsg, transport is ready to be used any time (until nn_term()). -} - FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage() const { return unique_ptr(new FairMQMessageNN()); @@ -66,17 +60,6 @@ FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const FairMQSocket& cmdSo return unique_ptr(new FairMQPollerNN(cmdSocket, dataSocket)); } -void FairMQTransportFactoryNN::Shutdown() -{ - // nn_term(); - // see https://www.freelists.org/post/nanomsg/Getting-rid-of-nn-init-and-nn-term,8 -} - -void FairMQTransportFactoryNN::Terminate() -{ - // nothing to do for nanomsg -} - FairMQ::Transport FairMQTransportFactoryNN::GetType() const { return fTransportType; @@ -84,5 +67,6 @@ FairMQ::Transport FairMQTransportFactoryNN::GetType() const FairMQTransportFactoryNN::~FairMQTransportFactoryNN() { - Terminate(); + // nn_term(); + // see https://www.freelists.org/post/nanomsg/Getting-rid-of-nn-init-and-nn-term,8 } diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h index c4179781..a47d2b24 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.h +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.h @@ -13,6 +13,7 @@ #include "FairMQMessageNN.h" #include "FairMQSocketNN.h" #include "FairMQPollerNN.h" +#include #include #include @@ -20,11 +21,9 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory { public: - FairMQTransportFactoryNN(const std::string& id = ""); + FairMQTransportFactoryNN(const std::string& id = "", const FairMQProgOptions* config = nullptr); ~FairMQTransportFactoryNN() override; - void Initialize(const FairMQProgOptions* config) override; - FairMQMessagePtr CreateMessage() const override; FairMQMessagePtr CreateMessage(const size_t size) const override; FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const override; @@ -38,9 +37,6 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory FairMQ::Transport GetType() const override; - void Shutdown() override; - void Terminate() override; - private: static FairMQ::Transport fTransportType; }; diff --git a/fairmq/shmem/FairMQTransportFactorySHM.cxx b/fairmq/shmem/FairMQTransportFactorySHM.cxx index e2a1db1c..b8461cff 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.cxx +++ b/fairmq/shmem/FairMQTransportFactorySHM.cxx @@ -9,7 +9,6 @@ #include "FairMQLogger.h" #include "FairMQShmManager.h" #include "FairMQTransportFactorySHM.h" -#include "../options/FairMQProgOptions.h" #include @@ -30,7 +29,7 @@ namespace bpt = boost::posix_time; FairMQ::Transport FairMQTransportFactorySHM::fTransportType = FairMQ::Transport::SHM; -FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id) +FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const FairMQProgOptions* config) : FairMQTransportFactory(id) , fContext(nullptr) , fHeartbeatSocket(nullptr) @@ -50,10 +49,7 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id) LOG(ERROR) << "failed creating context, reason: " << zmq_strerror(errno); exit(EXIT_FAILURE); } -} - -void FairMQTransportFactorySHM::Initialize(const FairMQProgOptions* config) -{ + int numIoThreads = 1; size_t segmentSize = 2000000000; string segmentName = "fairmq_shmem_main"; @@ -172,19 +168,11 @@ FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const FairMQSocket& cmdS return unique_ptr(new FairMQPollerSHM(cmdSocket, dataSocket)); } -void FairMQTransportFactorySHM::Shutdown() +FairMQTransportFactorySHM::~FairMQTransportFactorySHM() { - if (zmq_ctx_shutdown(fContext) != 0) - { - LOG(ERROR) << "shmem: failed shutting down context, reason: " << zmq_strerror(errno); - } - fSendHeartbeats = false; fHeartbeatThread.join(); -} -void FairMQTransportFactorySHM::Terminate() -{ if (fContext) { if (zmq_ctx_term(fContext) != 0) @@ -234,8 +222,3 @@ FairMQ::Transport FairMQTransportFactorySHM::GetType() const { return fTransportType; } - -FairMQTransportFactorySHM::~FairMQTransportFactorySHM() -{ - Terminate(); -} diff --git a/fairmq/shmem/FairMQTransportFactorySHM.h b/fairmq/shmem/FairMQTransportFactorySHM.h index 6089a834..1a69490d 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.h +++ b/fairmq/shmem/FairMQTransportFactorySHM.h @@ -14,6 +14,7 @@ #include "FairMQSocketSHM.h" #include "FairMQPollerSHM.h" #include "FairMQShmDeviceCounter.h" +#include #include #include @@ -26,9 +27,7 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory { public: - FairMQTransportFactorySHM(const std::string& id = ""); - - void Initialize(const FairMQProgOptions* config) override; + FairMQTransportFactorySHM(const std::string& id = "", const FairMQProgOptions* config = nullptr); FairMQMessagePtr CreateMessage() const override; FairMQMessagePtr CreateMessage(const size_t size) const override; @@ -43,9 +42,6 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory FairMQ::Transport GetType() const override; - void Shutdown() override; - void Terminate() override; - void SendHeartbeats(); ~FairMQTransportFactorySHM() override; diff --git a/fairmq/test/protocols/_push_pull_multipart.cxx b/fairmq/test/protocols/_push_pull_multipart.cxx index f9c1a0e5..02f652b2 100644 --- a/fairmq/test/protocols/_push_pull_multipart.cxx +++ b/fairmq/test/protocols/_push_pull_multipart.cxx @@ -24,7 +24,6 @@ using namespace std; auto RunSingleThreadedMultipart(string transport, string address) -> void { auto factory = FairMQTransportFactory::CreateTransportFactory(transport); - factory->Initialize(nullptr); auto push = FairMQChannel{"Push", "push", factory}; ASSERT_TRUE(push.Bind(address)); auto pull = FairMQChannel{"Pull", "pull", factory}; @@ -52,14 +51,11 @@ auto RunSingleThreadedMultipart(string transport, string address) -> void { out << string{static_cast(part->GetData()), part->GetSize()}; }); ASSERT_EQ(out.str(), "123"); - - factory->Shutdown(); } auto RunMultiThreadedMultipart(string transport, string address) -> void { auto factory = FairMQTransportFactory::CreateTransportFactory(transport); - factory->Initialize(nullptr); auto push = FairMQChannel{"Push", "push", factory}; ASSERT_TRUE(push.Bind(address)); auto pull = FairMQChannel{"Pull", "pull", factory}; @@ -91,8 +87,6 @@ auto RunMultiThreadedMultipart(string transport, string address) -> void pusher.join(); puller.join(); - - factory->Shutdown(); } TEST(PushPull, ST_ZeroMQ__inproc_Multipart) diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx index c0b2176f..224ccec4 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx @@ -7,14 +7,13 @@ ********************************************************************************/ #include "FairMQTransportFactoryZMQ.h" -#include "../options/FairMQProgOptions.h" #include using namespace std; FairMQ::Transport FairMQTransportFactoryZMQ::fTransportType = FairMQ::Transport::ZMQ; -FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ(const string& id) +FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ(const string& id, const FairMQProgOptions* config) : FairMQTransportFactory(id) , fContext(zmq_ctx_new()) { @@ -32,10 +31,7 @@ FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ(const string& id) { LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno); } -} -void FairMQTransportFactoryZMQ::Initialize(const FairMQProgOptions* config) -{ int numIoThreads = 1; if (config) { @@ -50,6 +46,7 @@ void FairMQTransportFactoryZMQ::Initialize(const FairMQProgOptions* config) { LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno); } + } FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage() const @@ -98,15 +95,7 @@ FairMQ::Transport FairMQTransportFactoryZMQ::GetType() const return fTransportType; } -void FairMQTransportFactoryZMQ::Shutdown() -{ - if (zmq_ctx_shutdown(fContext) != 0) - { - LOG(ERROR) << "zeromq: failed shutting down context, reason: " << zmq_strerror(errno); - } -} - -void FairMQTransportFactoryZMQ::Terminate() +FairMQTransportFactoryZMQ::~FairMQTransportFactoryZMQ() { if (fContext) { @@ -128,8 +117,3 @@ void FairMQTransportFactoryZMQ::Terminate() LOG(ERROR) << "shmem: Terminate(): context now available for shutdown"; } } - -FairMQTransportFactoryZMQ::~FairMQTransportFactoryZMQ() -{ - Terminate(); -} diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index 892ca491..248e173a 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -22,15 +22,14 @@ #include "FairMQMessageZMQ.h" #include "FairMQSocketZMQ.h" #include "FairMQPollerZMQ.h" +#include class FairMQTransportFactoryZMQ : public FairMQTransportFactory { public: - FairMQTransportFactoryZMQ(const std::string& id = ""); + FairMQTransportFactoryZMQ(const std::string& id = "", const FairMQProgOptions* config = nullptr); ~FairMQTransportFactoryZMQ() override; - void Initialize(const FairMQProgOptions* config) override; - FairMQMessagePtr CreateMessage() const override; FairMQMessagePtr CreateMessage(const size_t size) const override; FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const override; @@ -43,10 +42,6 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const override; FairMQ::Transport GetType() const override; - - void Shutdown() override; - void Terminate() override; - private: static FairMQ::Transport fTransportType; void* fContext;