mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-12 16:21:13 +00:00
generate id if no device id available
* CreateSocket factory no longer accepts id param, but the TransportFactory has an id member instead
This commit is contained in:
parent
eb614b6005
commit
3be2f297f3
|
@ -76,7 +76,7 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str
|
|||
}
|
||||
|
||||
FairMQChannel::FairMQChannel(const string& name, const string& type, std::shared_ptr<FairMQTransportFactory> factory)
|
||||
: fSocket(factory->CreateSocket(type, name, name)) // TODO whats id and whats name?
|
||||
: fSocket(factory->CreateSocket(type, name))
|
||||
, fType(type)
|
||||
, fMethod("unspecified")
|
||||
, fAddress("unspecified")
|
||||
|
@ -665,7 +665,7 @@ void FairMQChannel::InitTransport(shared_ptr<FairMQTransportFactory> factory)
|
|||
|
||||
bool FairMQChannel::InitCommandInterface()
|
||||
{
|
||||
fChannelCmdSocket = fTransportFactory->CreateSocket("sub", "device-commands", "internal");
|
||||
fChannelCmdSocket = fTransportFactory->CreateSocket("sub", "device-commands");
|
||||
if (fChannelCmdSocket)
|
||||
{
|
||||
fChannelCmdSocket->Connect("inproc://commands");
|
||||
|
|
|
@ -155,7 +155,7 @@ void FairMQDevice::InitWrapper()
|
|||
|
||||
if (fDeviceCmdSockets.empty())
|
||||
{
|
||||
auto p = fDeviceCmdSockets.emplace(fTransportFactory->GetType(), fTransportFactory->CreateSocket("pub", "device-commands", fId));
|
||||
auto p = fDeviceCmdSockets.emplace(fTransportFactory->GetType(), fTransportFactory->CreateSocket("pub", "device-commands"));
|
||||
if (p.second)
|
||||
{
|
||||
p.first->second->Bind("inproc://commands");
|
||||
|
@ -303,7 +303,7 @@ bool FairMQDevice::AttachChannel(FairMQChannel& ch)
|
|||
//(re-)init socket
|
||||
if (!ch.fSocket)
|
||||
{
|
||||
ch.fSocket = ch.fTransportFactory->CreateSocket(ch.fType, ch.fName, fId);
|
||||
ch.fSocket = ch.fTransportFactory->CreateSocket(ch.fType, ch.fName);
|
||||
}
|
||||
|
||||
// set high water marks
|
||||
|
@ -874,7 +874,7 @@ shared_ptr<FairMQTransportFactory> FairMQDevice::AddTransport(const string& tran
|
|||
|
||||
if (i == fTransports.end())
|
||||
{
|
||||
auto tr = FairMQTransportFactory::CreateTransportFactory(transport);
|
||||
auto tr = FairMQTransportFactory::CreateTransportFactory(transport, fId);
|
||||
|
||||
LOG(DEBUG) << "Adding '" << transport << "' transport to the device.";
|
||||
|
||||
|
@ -882,7 +882,7 @@ shared_ptr<FairMQTransportFactory> FairMQDevice::AddTransport(const string& tran
|
|||
tr->Initialize(fConfig);
|
||||
fTransports.insert(trPair);
|
||||
|
||||
auto p = fDeviceCmdSockets.emplace(tr->GetType(), tr->CreateSocket("pub", "device-commands", fId));
|
||||
auto p = fDeviceCmdSockets.emplace(tr->GetType(), tr->CreateSocket("pub", "device-commands"));
|
||||
if (p.second)
|
||||
{
|
||||
p.first->second->Bind("inproc://commands");
|
||||
|
|
|
@ -14,21 +14,41 @@
|
|||
#endif /* NANOMSG_FOUND */
|
||||
#include <FairMQLogger.h>
|
||||
#include <memory>
|
||||
#include <boost/uuid/uuid.hpp>
|
||||
#include <boost/uuid/uuid_generators.hpp>
|
||||
#include <boost/uuid/uuid_io.hpp>
|
||||
#include <string>
|
||||
#include <sstream>
|
||||
|
||||
auto FairMQTransportFactory::CreateTransportFactory(const std::string& type) -> std::shared_ptr<FairMQTransportFactory>
|
||||
FairMQTransportFactory::FairMQTransportFactory(const std::string& id)
|
||||
: fkId(id)
|
||||
{
|
||||
}
|
||||
|
||||
auto FairMQTransportFactory::CreateTransportFactory(const std::string& type, const std::string& id) -> std::shared_ptr<FairMQTransportFactory>
|
||||
{
|
||||
using namespace std;
|
||||
|
||||
auto final_id = id;
|
||||
|
||||
// Generate uuid if empty
|
||||
if(final_id == "")
|
||||
{
|
||||
final_id = boost::uuids::to_string(boost::uuids::random_generator()());
|
||||
}
|
||||
|
||||
if (type == "zeromq")
|
||||
{
|
||||
return std::make_shared<FairMQTransportFactoryZMQ>();
|
||||
return std::make_shared<FairMQTransportFactoryZMQ>(final_id);
|
||||
}
|
||||
else if (type == "shmem")
|
||||
{
|
||||
return std::make_shared<FairMQTransportFactorySHM>();
|
||||
return std::make_shared<FairMQTransportFactorySHM>(final_id);
|
||||
}
|
||||
#ifdef NANOMSG_FOUND
|
||||
else if (type == "nanomsg")
|
||||
{
|
||||
return std::make_shared<FairMQTransportFactoryNN>();
|
||||
return std::make_shared<FairMQTransportFactoryNN>(final_id);
|
||||
}
|
||||
#endif /* NANOMSG_FOUND */
|
||||
else
|
||||
|
|
|
@ -1,37 +1,40 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQTransportFactory.h
|
||||
*
|
||||
* @since 2014-01-20
|
||||
* @author: A. Rybalchenko
|
||||
*/
|
||||
|
||||
#ifndef FAIRMQTRANSPORTFACTORY_H_
|
||||
#define FAIRMQTRANSPORTFACTORY_H_
|
||||
|
||||
#include <string>
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
#include <unordered_map>
|
||||
|
||||
#include "FairMQMessage.h"
|
||||
#include "FairMQSocket.h"
|
||||
#include "FairMQPoller.h"
|
||||
#include "FairMQLogger.h"
|
||||
#include "FairMQTransports.h"
|
||||
#include <string>
|
||||
#include <memory>
|
||||
#include <vector>
|
||||
#include <unordered_map>
|
||||
|
||||
class FairMQChannel;
|
||||
class FairMQProgOptions;
|
||||
|
||||
class FairMQTransportFactory
|
||||
{
|
||||
private:
|
||||
/// Topology wide unique id
|
||||
const std::string fkId;
|
||||
|
||||
public:
|
||||
/// ctor
|
||||
/// @param id Topology wide unique id, usually the device id.
|
||||
FairMQTransportFactory(const std::string& id);
|
||||
|
||||
auto GetId() const -> const std::string { return fkId; };
|
||||
|
||||
/// Initialize transport
|
||||
virtual void Initialize(const FairMQProgOptions* config) = 0;
|
||||
|
||||
|
@ -51,7 +54,7 @@ class FairMQTransportFactory
|
|||
virtual FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const = 0;
|
||||
|
||||
/// Create a socket
|
||||
virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name, const std::string& id = "") const = 0;
|
||||
virtual FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const = 0;
|
||||
|
||||
/// Create a poller for all device channels
|
||||
virtual FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const = 0;
|
||||
|
@ -70,7 +73,7 @@ class FairMQTransportFactory
|
|||
|
||||
virtual ~FairMQTransportFactory() {};
|
||||
|
||||
static auto CreateTransportFactory(const std::string& type) -> std::shared_ptr<FairMQTransportFactory>;
|
||||
static auto CreateTransportFactory(const std::string& type, const std::string& id = "") -> std::shared_ptr<FairMQTransportFactory>;
|
||||
|
||||
static void FairMQNoCleanup(void* /*data*/, void* /*obj*/)
|
||||
{
|
||||
|
|
|
@ -1,16 +1,10 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2017 GSI Helmholtzzentrum fuer Schwerionenforschung Gmb *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQTransportFactoryNN.cxx
|
||||
*
|
||||
* @since 2014-01-20
|
||||
* @author: A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include "FairMQTransportFactoryNN.h"
|
||||
#include "../options/FairMQProgOptions.h"
|
||||
|
@ -21,7 +15,8 @@ using namespace std;
|
|||
|
||||
FairMQ::Transport FairMQTransportFactoryNN::fTransportType = FairMQ::Transport::NN;
|
||||
|
||||
FairMQTransportFactoryNN::FairMQTransportFactoryNN()
|
||||
FairMQTransportFactoryNN::FairMQTransportFactoryNN(const string& id)
|
||||
: FairMQTransportFactory(id)
|
||||
{
|
||||
LOG(DEBUG) << "Transport: Using nanomsg library";
|
||||
}
|
||||
|
@ -46,9 +41,9 @@ FairMQMessagePtr FairMQTransportFactoryNN::CreateMessage(void* data, const size_
|
|||
return unique_ptr<FairMQMessage>(new FairMQMessageNN(data, size, ffn, hint));
|
||||
}
|
||||
|
||||
FairMQSocketPtr FairMQTransportFactoryNN::CreateSocket(const string& type, const string& name, const string& id /*= ""*/) const
|
||||
FairMQSocketPtr FairMQTransportFactoryNN::CreateSocket(const string& type, const string& name) const
|
||||
{
|
||||
return unique_ptr<FairMQSocket>(new FairMQSocketNN(type, name, id));
|
||||
return unique_ptr<FairMQSocket>(new FairMQSocketNN(type, name, GetId()));
|
||||
}
|
||||
|
||||
FairMQPollerPtr FairMQTransportFactoryNN::CreatePoller(const vector<FairMQChannel>& channels) const
|
||||
|
|
|
@ -1,32 +1,26 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQTransportFactoryNN.h
|
||||
*
|
||||
* @since 2014-01-20
|
||||
* @author: A. Rybalchenko
|
||||
*/
|
||||
|
||||
#ifndef FAIRMQTRANSPORTFACTORYNN_H_
|
||||
#define FAIRMQTRANSPORTFACTORYNN_H_
|
||||
|
||||
#include <vector>
|
||||
#include <string>
|
||||
|
||||
#include "FairMQTransportFactory.h"
|
||||
#include "FairMQMessageNN.h"
|
||||
#include "FairMQSocketNN.h"
|
||||
#include "FairMQPollerNN.h"
|
||||
|
||||
#include <vector>
|
||||
#include <string>
|
||||
|
||||
class FairMQTransportFactoryNN : public FairMQTransportFactory
|
||||
{
|
||||
public:
|
||||
FairMQTransportFactoryNN();
|
||||
FairMQTransportFactoryNN(const std::string& id = "");
|
||||
~FairMQTransportFactoryNN() override;
|
||||
|
||||
void Initialize(const FairMQProgOptions* config) override;
|
||||
|
@ -35,7 +29,7 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory
|
|||
FairMQMessagePtr CreateMessage(const size_t size) const override;
|
||||
FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const override;
|
||||
|
||||
FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name, const std::string& id = "") const override;
|
||||
FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override;
|
||||
|
||||
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override;
|
||||
FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
|
||||
|
|
|
@ -1,10 +1,16 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2016-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#include "FairMQLogger.h"
|
||||
#include "FairMQShmManager.h"
|
||||
#include "FairMQTransportFactorySHM.h"
|
||||
#include "../options/FairMQProgOptions.h"
|
||||
|
||||
#include <zmq.h>
|
||||
|
||||
#include <boost/version.hpp>
|
||||
|
@ -17,11 +23,6 @@
|
|||
|
||||
#include <chrono>
|
||||
|
||||
#include "FairMQLogger.h"
|
||||
#include "FairMQShmManager.h"
|
||||
#include "FairMQTransportFactorySHM.h"
|
||||
#include "../options/FairMQProgOptions.h"
|
||||
|
||||
using namespace std;
|
||||
using namespace fair::mq::shmem;
|
||||
namespace bipc = boost::interprocess;
|
||||
|
@ -29,8 +30,9 @@ namespace bpt = boost::posix_time;
|
|||
|
||||
FairMQ::Transport FairMQTransportFactorySHM::fTransportType = FairMQ::Transport::SHM;
|
||||
|
||||
FairMQTransportFactorySHM::FairMQTransportFactorySHM()
|
||||
: fContext(nullptr)
|
||||
FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id)
|
||||
: FairMQTransportFactory(id)
|
||||
, fContext(nullptr)
|
||||
, fHeartbeatSocket(nullptr)
|
||||
, fHeartbeatThread()
|
||||
, fSendHeartbeats(true)
|
||||
|
@ -144,10 +146,10 @@ FairMQMessagePtr FairMQTransportFactorySHM::CreateMessage(void* data, const size
|
|||
return unique_ptr<FairMQMessage>(new FairMQMessageSHM(data, size, ffn, hint));
|
||||
}
|
||||
|
||||
FairMQSocketPtr FairMQTransportFactorySHM::CreateSocket(const string& type, const string& name, const string& id /*= ""*/) const
|
||||
FairMQSocketPtr FairMQTransportFactorySHM::CreateSocket(const string& type, const string& name) const
|
||||
{
|
||||
assert(fContext);
|
||||
return unique_ptr<FairMQSocket>(new FairMQSocketSHM(type, name, id, fContext));
|
||||
return unique_ptr<FairMQSocket>(new FairMQSocketSHM(type, name, GetId(), fContext));
|
||||
}
|
||||
|
||||
FairMQPollerPtr FairMQTransportFactorySHM::CreatePoller(const vector<FairMQChannel>& channels) const
|
||||
|
|
|
@ -1,13 +1,20 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2016-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
|
||||
#ifndef FAIRMQTRANSPORTFACTORYSHM_H_
|
||||
#define FAIRMQTRANSPORTFACTORYSHM_H_
|
||||
|
||||
#include "FairMQTransportFactory.h"
|
||||
#include "FairMQMessageSHM.h"
|
||||
#include "FairMQSocketSHM.h"
|
||||
#include "FairMQPollerSHM.h"
|
||||
#include "FairMQShmDeviceCounter.h"
|
||||
|
||||
#include <vector>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
@ -15,16 +22,11 @@
|
|||
|
||||
#include <boost/interprocess/sync/named_mutex.hpp>
|
||||
|
||||
#include "FairMQTransportFactory.h"
|
||||
#include "FairMQMessageSHM.h"
|
||||
#include "FairMQSocketSHM.h"
|
||||
#include "FairMQPollerSHM.h"
|
||||
#include "FairMQShmDeviceCounter.h"
|
||||
|
||||
class FairMQTransportFactorySHM : public FairMQTransportFactory
|
||||
{
|
||||
public:
|
||||
FairMQTransportFactorySHM();
|
||||
FairMQTransportFactorySHM(const std::string& id = "");
|
||||
|
||||
void Initialize(const FairMQProgOptions* config) override;
|
||||
|
||||
|
@ -32,7 +34,7 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory
|
|||
FairMQMessagePtr CreateMessage(const size_t size) const override;
|
||||
FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const override;
|
||||
|
||||
FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name, const std::string& id = "") const override;
|
||||
FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override;
|
||||
|
||||
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override;
|
||||
FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
|
||||
|
|
|
@ -1,28 +1,22 @@
|
|||
/********************************************************************************
|
||||
* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* Copyright (C) 2014-2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH *
|
||||
* *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence version 3 (LGPL) version 3, *
|
||||
* This software is distributed under the terms of the *
|
||||
* GNU Lesser General Public Licence (LGPL) version 3, *
|
||||
* copied verbatim in the file "LICENSE" *
|
||||
********************************************************************************/
|
||||
/**
|
||||
* FairMQTransportFactoryZMQ.cxx
|
||||
*
|
||||
* @since 2014-01-20
|
||||
* @author: A. Rybalchenko
|
||||
*/
|
||||
|
||||
#include "zmq.h"
|
||||
|
||||
#include "FairMQTransportFactoryZMQ.h"
|
||||
#include "../options/FairMQProgOptions.h"
|
||||
#include <zmq.h>
|
||||
|
||||
using namespace std;
|
||||
|
||||
FairMQ::Transport FairMQTransportFactoryZMQ::fTransportType = FairMQ::Transport::ZMQ;
|
||||
|
||||
FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ()
|
||||
: fContext(zmq_ctx_new())
|
||||
FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ(const string& id)
|
||||
: FairMQTransportFactory(id)
|
||||
, fContext(zmq_ctx_new())
|
||||
{
|
||||
int major, minor, patch;
|
||||
zmq_version(&major, &minor, &patch);
|
||||
|
@ -73,10 +67,10 @@ FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage(void* data, const size
|
|||
return unique_ptr<FairMQMessage>(new FairMQMessageZMQ(data, size, ffn, hint));
|
||||
}
|
||||
|
||||
FairMQSocketPtr FairMQTransportFactoryZMQ::CreateSocket(const string& type, const string& name, const string& id /*= ""*/) const
|
||||
FairMQSocketPtr FairMQTransportFactoryZMQ::CreateSocket(const string& type, const string& name) const
|
||||
{
|
||||
assert(fContext);
|
||||
return unique_ptr<FairMQSocket>(new FairMQSocketZMQ(type, name, id, fContext));
|
||||
return unique_ptr<FairMQSocket>(new FairMQSocketZMQ(type, name, GetId(), fContext));
|
||||
}
|
||||
|
||||
FairMQPollerPtr FairMQTransportFactoryZMQ::CreatePoller(const vector<FairMQChannel>& channels) const
|
||||
|
|
|
@ -26,7 +26,7 @@
|
|||
class FairMQTransportFactoryZMQ : public FairMQTransportFactory
|
||||
{
|
||||
public:
|
||||
FairMQTransportFactoryZMQ();
|
||||
FairMQTransportFactoryZMQ(const std::string& id = "");
|
||||
~FairMQTransportFactoryZMQ() override;
|
||||
|
||||
void Initialize(const FairMQProgOptions* config) override;
|
||||
|
@ -35,7 +35,7 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory
|
|||
FairMQMessagePtr CreateMessage(const size_t size) const override;
|
||||
FairMQMessagePtr CreateMessage(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) const override;
|
||||
|
||||
FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name, const std::string& id = "") const override;
|
||||
FairMQSocketPtr CreateSocket(const std::string& type, const std::string& name) const override;
|
||||
|
||||
FairMQPollerPtr CreatePoller(const std::vector<FairMQChannel>& channels) const override;
|
||||
FairMQPollerPtr CreatePoller(const std::unordered_map<std::string, std::vector<FairMQChannel>>& channelsMap, const std::vector<std::string>& channelList) const override;
|
||||
|
|
Loading…
Reference in New Issue
Block a user