mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-12 16:21:13 +00:00
Refactor TransportFactory to RAII
* Remove explicit Initialize and Terminate states, map them onto ctor/dtor * Remove no longer needed Shutdown state * Remove deprecated SetTransport()
This commit is contained in:
parent
733794657c
commit
8bc21675af
|
@ -1,16 +1,10 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2012-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQDevice.cxx
|
||||
*
|
||||
* @since 2012-10-25
|
||||
* @author D. Klein, A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include <list>
|
||||
#include <csignal> // catching system signals
|
||||
|
@ -851,35 +845,17 @@ int FairMQDevice::GetProperty(const int key, const int default_ /*= 0*/)
|
|||
}
|
||||
}
|
||||
|
||||
// DEPRECATED, use the string version
|
||||
void FairMQDevice::SetTransport(FairMQTransportFactory* factory)
|
||||
{
|
||||
if (fTransports.empty())
|
||||
{
|
||||
fTransportFactory = shared_ptr<FairMQTransportFactory>(factory);
|
||||
pair<FairMQ::Transport, shared_ptr<FairMQTransportFactory>> t(fTransportFactory->GetType(), fTransportFactory);
|
||||
fTransportFactory->Initialize(fConfig);
|
||||
fTransports.insert(t);
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG(ERROR) << "Transports container is not empty when setting transport. Setting twice?";
|
||||
ChangeState(ERROR_FOUND);
|
||||
}
|
||||
}
|
||||
|
||||
shared_ptr<FairMQTransportFactory> FairMQDevice::AddTransport(const string& transport)
|
||||
{
|
||||
unordered_map<FairMQ::Transport, shared_ptr<FairMQTransportFactory>>::const_iterator i = fTransports.find(FairMQ::TransportTypes.at(transport));
|
||||
|
||||
if (i == fTransports.end())
|
||||
{
|
||||
auto tr = FairMQTransportFactory::CreateTransportFactory(transport, fId);
|
||||
auto tr = FairMQTransportFactory::CreateTransportFactory(transport, fId, fConfig);
|
||||
|
||||
LOG(DEBUG) << "Adding '" << transport << "' transport to the device.";
|
||||
|
||||
pair<FairMQ::Transport, shared_ptr<FairMQTransportFactory>> trPair(FairMQ::TransportTypes.at(transport), tr);
|
||||
tr->Initialize(fConfig);
|
||||
fTransports.insert(trPair);
|
||||
|
||||
auto p = fDeviceCmdSockets.emplace(tr->GetType(), tr->CreateSocket("pub", "device-commands"));
|
||||
|
@ -934,8 +910,6 @@ unique_ptr<FairMQTransportFactory> FairMQDevice::MakeTransport(const string& tra
|
|||
return tr;
|
||||
}
|
||||
|
||||
tr->Initialize(nullptr);
|
||||
|
||||
return move(tr);
|
||||
}
|
||||
|
||||
|
@ -1232,15 +1206,8 @@ const FairMQChannel& FairMQDevice::GetChannel(const std::string& channelName, co
|
|||
return fChannels.at(channelName).at(index);
|
||||
}
|
||||
|
||||
|
||||
void FairMQDevice::Exit()
|
||||
{
|
||||
// ask transports to terminate transfers
|
||||
for (const auto& t : fTransports)
|
||||
{
|
||||
t.second->Shutdown();
|
||||
}
|
||||
|
||||
LOG(DEBUG) << "All transports are shut down.";
|
||||
}
|
||||
|
||||
|
|
|
@ -319,9 +319,6 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable
|
|||
/// Print all properties of this and the parent class to LOG(INFO)
|
||||
virtual void ListProperties();
|
||||
|
||||
/// Configures the device with a transport factory (DEPRECATED)
|
||||
/// @param factory Pointer to the transport factory object
|
||||
void SetTransport(FairMQTransportFactory* factory);
|
||||
/// Adds a transport to the device if it doesn't exist
|
||||
/// @param transport Transport string ("zeromq"/"nanomsg"/"shmem")
|
||||
std::shared_ptr<FairMQTransportFactory> AddTransport(const std::string& transport);
|
||||
|
|
|
@ -25,7 +25,7 @@ FairMQTransportFactory::FairMQTransportFactory(const std::string& id)
|
|||
{
|
||||
}
|
||||
|
||||
auto FairMQTransportFactory::CreateTransportFactory(const std::string& type, const std::string& id) -> std::shared_ptr<FairMQTransportFactory>
|
||||
auto FairMQTransportFactory::CreateTransportFactory(const std::string& type, const std::string& id, const FairMQProgOptions* config) -> std::shared_ptr<FairMQTransportFactory>
|
||||
{
|
||||
using namespace std;
|
||||
|
||||
|
@ -39,16 +39,16 @@ auto FairMQTransportFactory::CreateTransportFactory(const std::string& type, con
|
|||
|
||||
if (type == "zeromq")
|
||||
{
|
||||
return std::make_shared<FairMQTransportFactoryZMQ>(final_id);
|
||||
return std::make_shared<FairMQTransportFactoryZMQ>(final_id, config);
|
||||
}
|
||||
else if (type == "shmem")
|
||||
{
|
||||
return std::make_shared<FairMQTransportFactorySHM>(final_id);
|
||||
return std::make_shared<FairMQTransportFactorySHM>(final_id, config);
|
||||
}
|
||||
#ifdef NANOMSG_FOUND
|
||||
else if (type == "nanomsg")
|
||||
{
|
||||
return std::make_shared<FairMQTransportFactoryNN>(final_id);
|
||||
return std::make_shared<FairMQTransportFactoryNN>(final_id, config);
|
||||
}
|
||||
#endif /* NANOMSG_FOUND */
|
||||
else
|
||||
|
|
|
@ -35,9 +35,6 @@ class FairMQTransportFactory
|
|||
|
||||
auto GetId() const -> const std::string { return fkId; };
|
||||
|
||||
/// Initialize transport
|
||||
virtual void Initialize(const FairMQProgOptions* config) = 0;
|
||||
|
||||
/// @brief Create empty FairMQMessage
|
||||
/// @return pointer to FairMQMessage
|
||||
virtual FairMQMessagePtr CreateMessage() const = 0;
|
||||
|
@ -68,14 +65,9 @@ class FairMQTransportFactory
|
|||
/// Get transport type
|
||||
virtual FairMQ::Transport GetType() const = 0;
|
||||
|
||||
/// Shutdown transport (stop transfers, get ready for complete shutdown)
|
||||
virtual void Shutdown() = 0;
|
||||
/// Terminate transport (complete shutdown)
|
||||
virtual void Terminate() = 0;
|
||||
|
||||
virtual ~FairMQTransportFactory() {};
|
||||
|
||||
static auto CreateTransportFactory(const std::string& type, const std::string& id = "") -> std::shared_ptr<FairMQTransportFactory>;
|
||||
static auto CreateTransportFactory(const std::string& type, const std::string& id = "", const FairMQProgOptions* config = nullptr) -> std::shared_ptr<FairMQTransportFactory>;
|
||||
|
||||
static void FairMQNoCleanup(void* /*data*/, void* /*obj*/)
|
||||
{
|
||||
|
|
|
@ -7,7 +7,6 @@
|
|||
********************************************************************************/
|
||||
|
||||
#include "FairMQTransportFactoryNN.h"
|
||||
#include "../options/FairMQProgOptions.h"
|
||||
|
||||
#include <nanomsg/nn.h>
|
||||
|
||||
|
@ -15,17 +14,12 @@ using namespace std;
|
|||
|
||||
FairMQ::Transport FairMQTransportFactoryNN::fTransportType = FairMQ::Transport::NN;
|
||||
|
||||
FairMQTransportFactoryNN::FairMQTransportFactoryNN(const string& id)
|
||||
FairMQTransportFactoryNN::FairMQTransportFactoryNN(const string& id, const FairMQProgOptions* config)
|
||||
: FairMQTransportFactory(id)
|
||||
{
|
||||
LOG(DEBUG) << "Transport: Using nanomsg library";
|
||||
}
|
||||
|
||||
void FairMQTransportFactoryNN::Initialize(const FairMQProgOptions* config)
|
||||
{
|
||||
// nothing to do for nanomsg, transport is ready to be used any time (until nn_term()).
|
||||
}
|
||||
|
||||
FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage() const
|
||||
{
|
||||
return unique_ptr<FairMQMessage>(new FairMQMessageNN());
|
||||
|
@ -66,17 +60,6 @@ FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const FairMQSocket& cmdSo
|
|||
return unique_ptr<FairMQPoller>(new FairMQPollerNN(cmdSocket, dataSocket));
|
||||
}
|
||||
|
||||
void FairMQTransportFactoryNN::Shutdown()
|
||||
{
|
||||
// nn_term();
|
||||
// see https://www.freelists.org/post/nanomsg/Getting-rid-of-nn-init-and-nn-term,8
|
||||
}
|
||||
|
||||
void FairMQTransportFactoryNN::Terminate()
|
||||
{
|
||||
// nothing to do for nanomsg
|
||||
}
|
||||
|
||||
FairMQ::Transport FairMQTransportFactoryNN::GetType() const
|
||||
{
|
||||
return fTransportType;
|
||||
|
@ -84,5 +67,6 @@ FairMQ::Transport FairMQTransportFactoryNN::GetType() const
|
|||
|
||||
FairMQTransportFactoryNN::~FairMQTransportFactoryNN()
|
||||
{
|
||||
Terminate();
|
||||
// nn_term();
|
||||
// see https://www.freelists.org/post/nanomsg/Getting-rid-of-nn-init-and-nn-term,8
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
#include "FairMQMessageNN.h"
|
||||
#include "FairMQSocketNN.h"
|
||||
#include "FairMQPollerNN.h"
|
||||
#include <options/FairMQProgOptions.h>
|
||||
|
||||
#include <vector>
|
||||
#include <string>
|
||||
|
@ -20,11 +21,9 @@
|
|||
class FairMQTransportFactoryNN : public FairMQTransportFactory
|
||||
{
|
||||
public:
|
||||
FairMQTransportFactoryNN(const std::string& id = "");
|
||||
FairMQTransportFactoryNN(const std::string& id = "", const FairMQProgOptions* config = nullptr);
|
||||
~FairMQTransportFactoryNN() override;
|
||||
|
||||
void Initialize(const FairMQProgOptions* config) override;
|
||||
|
||||
FairMQMessagePtr CreateMessage() const override;
|
||||
FairMQMessagePtr CreateMessage(const size_t size) const override;
|
||||
FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const override;
|
||||
|
@ -38,9 +37,6 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory
|
|||
|
||||
FairMQ::Transport GetType() const override;
|
||||
|
||||
void Shutdown() override;
|
||||
void Terminate() override;
|
||||
|
||||
private:
|
||||
static FairMQ::Transport fTransportType;
|
||||
};
|
||||
|
|
|
@ -9,7 +9,6 @@
|
|||
#include "FairMQLogger.h"
|
||||
#include "FairMQShmManager.h"
|
||||
#include "FairMQTransportFactorySHM.h"
|
||||
#include "../options/FairMQProgOptions.h"
|
||||
|
||||
#include <zmq.h>
|
||||
|
||||
|
@ -30,7 +29,7 @@ namespace bpt = boost::posix_time;
|
|||
|
||||
FairMQ::Transport FairMQTransportFactorySHM::fTransportType = FairMQ::Transport::SHM;
|
||||
|
||||
FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id)
|
||||
FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const FairMQProgOptions* config)
|
||||
: FairMQTransportFactory(id)
|
||||
, fContext(nullptr)
|
||||
, fHeartbeatSocket(nullptr)
|
||||
|
@ -50,10 +49,7 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id)
|
|||
LOG(ERROR) << "failed creating context, reason: " << zmq_strerror(errno);
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
}
|
||||
|
||||
void FairMQTransportFactorySHM::Initialize(const FairMQProgOptions* config)
|
||||
{
|
||||
int numIoThreads = 1;
|
||||
size_t segmentSize = 2000000000;
|
||||
string segmentName = "fairmq_shmem_main";
|
||||
|
@ -172,19 +168,11 @@ FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const FairMQSocket& cmdS
|
|||
return unique_ptr<FairMQPoller>(new FairMQPollerSHM(cmdSocket, dataSocket));
|
||||
}
|
||||
|
||||
void FairMQTransportFactorySHM::Shutdown()
|
||||
FairMQTransportFactorySHM::~FairMQTransportFactorySHM()
|
||||
{
|
||||
if (zmq_ctx_shutdown(fContext) != 0)
|
||||
{
|
||||
LOG(ERROR) << "shmem: failed shutting down context, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
|
||||
fSendHeartbeats = false;
|
||||
fHeartbeatThread.join();
|
||||
}
|
||||
|
||||
void FairMQTransportFactorySHM::Terminate()
|
||||
{
|
||||
if (fContext)
|
||||
{
|
||||
if (zmq_ctx_term(fContext) != 0)
|
||||
|
@ -234,8 +222,3 @@ FairMQ::Transport FairMQTransportFactorySHM::GetType() const
|
|||
{
|
||||
return fTransportType;
|
||||
}
|
||||
|
||||
FairMQTransportFactorySHM::~FairMQTransportFactorySHM()
|
||||
{
|
||||
Terminate();
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
#include "FairMQSocketSHM.h"
|
||||
#include "FairMQPollerSHM.h"
|
||||
#include "FairMQShmDeviceCounter.h"
|
||||
#include <options/FairMQProgOptions.h>
|
||||
|
||||
#include <vector>
|
||||
#include <string>
|
||||
|
@ -26,9 +27,7 @@
|
|||
class FairMQTransportFactorySHM : public FairMQTransportFactory
|
||||
{
|
||||
public:
|
||||
FairMQTransportFactorySHM(const std::string& id = "");
|
||||
|
||||
void Initialize(const FairMQProgOptions* config) override;
|
||||
FairMQTransportFactorySHM(const std::string& id = "", const FairMQProgOptions* config = nullptr);
|
||||
|
||||
FairMQMessagePtr CreateMessage() const override;
|
||||
FairMQMessagePtr CreateMessage(const size_t size) const override;
|
||||
|
@ -43,9 +42,6 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory
|
|||
|
||||
FairMQ::Transport GetType() const override;
|
||||
|
||||
void Shutdown() override;
|
||||
void Terminate() override;
|
||||
|
||||
void SendHeartbeats();
|
||||
|
||||
~FairMQTransportFactorySHM() override;
|
||||
|
|
|
@ -24,7 +24,6 @@ using namespace std;
|
|||
|
||||
auto RunSingleThreadedMultipart(string transport, string address) -> void {
|
||||
auto factory = FairMQTransportFactory::CreateTransportFactory(transport);
|
||||
factory->Initialize(nullptr);
|
||||
auto push = FairMQChannel{"Push", "push", factory};
|
||||
ASSERT_TRUE(push.Bind(address));
|
||||
auto pull = FairMQChannel{"Pull", "pull", factory};
|
||||
|
@ -52,14 +51,11 @@ auto RunSingleThreadedMultipart(string transport, string address) -> void {
|
|||
out << string{static_cast<char*>(part->GetData()), part->GetSize()};
|
||||
});
|
||||
ASSERT_EQ(out.str(), "123");
|
||||
|
||||
factory->Shutdown();
|
||||
}
|
||||
|
||||
auto RunMultiThreadedMultipart(string transport, string address) -> void
|
||||
{
|
||||
auto factory = FairMQTransportFactory::CreateTransportFactory(transport);
|
||||
factory->Initialize(nullptr);
|
||||
auto push = FairMQChannel{"Push", "push", factory};
|
||||
ASSERT_TRUE(push.Bind(address));
|
||||
auto pull = FairMQChannel{"Pull", "pull", factory};
|
||||
|
@ -91,8 +87,6 @@ auto RunMultiThreadedMultipart(string transport, string address) -> void
|
|||
|
||||
pusher.join();
|
||||
puller.join();
|
||||
|
||||
factory->Shutdown();
|
||||
}
|
||||
|
||||
TEST(PushPull, ST_ZeroMQ__inproc_Multipart)
|
||||
|
|
|
@ -7,14 +7,13 @@
|
|||
********************************************************************************/
|
||||
|
||||
#include "FairMQTransportFactoryZMQ.h"
|
||||
#include "../options/FairMQProgOptions.h"
|
||||
#include <zmq.h>
|
||||
|
||||
using namespace std;
|
||||
|
||||
FairMQ::Transport FairMQTransportFactoryZMQ::fTransportType = FairMQ::Transport::ZMQ;
|
||||
|
||||
FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ(const string& id)
|
||||
FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ(const string& id, const FairMQProgOptions* config)
|
||||
: FairMQTransportFactory(id)
|
||||
, fContext(zmq_ctx_new())
|
||||
{
|
||||
|
@ -32,10 +31,7 @@ FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ(const string& id)
|
|||
{
|
||||
LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
}
|
||||
|
||||
void FairMQTransportFactoryZMQ::Initialize(const FairMQProgOptions* config)
|
||||
{
|
||||
int numIoThreads = 1;
|
||||
if (config)
|
||||
{
|
||||
|
@ -50,6 +46,7 @@ void FairMQTransportFactoryZMQ::Initialize(const FairMQProgOptions* config)
|
|||
{
|
||||
LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage() const
|
||||
|
@ -98,15 +95,7 @@ FairMQ::Transport FairMQTransportFactoryZMQ::GetType() const
|
|||
return fTransportType;
|
||||
}
|
||||
|
||||
void FairMQTransportFactoryZMQ::Shutdown()
|
||||
{
|
||||
if (zmq_ctx_shutdown(fContext) != 0)
|
||||
{
|
||||
LOG(ERROR) << "zeromq: failed shutting down context, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
}
|
||||
|
||||
void FairMQTransportFactoryZMQ::Terminate()
|
||||
FairMQTransportFactoryZMQ::~FairMQTransportFactoryZMQ()
|
||||
{
|
||||
if (fContext)
|
||||
{
|
||||
|
@ -128,8 +117,3 @@ void FairMQTransportFactoryZMQ::Terminate()
|
|||
LOG(ERROR) << "shmem: Terminate(): context now available for shutdown";
|
||||
}
|
||||
}
|
||||
|
||||
FairMQTransportFactoryZMQ::~FairMQTransportFactoryZMQ()
|
||||
{
|
||||
Terminate();
|
||||
}
|
||||
|
|
|
@ -22,15 +22,14 @@
|
|||
#include "FairMQMessageZMQ.h"
|
||||
#include "FairMQSocketZMQ.h"
|
||||
#include "FairMQPollerZMQ.h"
|
||||
#include <options/FairMQProgOptions.h>
|
||||
|
||||
class FairMQTransportFactoryZMQ : public FairMQTransportFactory
|
||||
{
|
||||
public:
|
||||
FairMQTransportFactoryZMQ(const std::string& id = "");
|
||||
FairMQTransportFactoryZMQ(const std::string& id = "", const FairMQProgOptions* config = nullptr);
|
||||
~FairMQTransportFactoryZMQ() override;
|
||||
|
||||
void Initialize(const FairMQProgOptions* config) override;
|
||||
|
||||
FairMQMessagePtr CreateMessage() const override;
|
||||
FairMQMessagePtr CreateMessage(const size_t size) const override;
|
||||
FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const override;
|
||||
|
@ -43,10 +42,6 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory
|
|||
FairMQPollerPtr CreatePoller(const FairMQSocket& cmdSocket, const FairMQSocket& dataSocket) const override;
|
||||
|
||||
FairMQ::Transport GetType() const override;
|
||||
|
||||
void Shutdown() override;
|
||||
void Terminate() override;
|
||||
|
||||
private:
|
||||
static FairMQ::Transport fTransportType;
|
||||
void* fContext;
|
||||
|
|
Loading…
Reference in New Issue
Block a user