diff --git a/fairmq/FairMQChannel.cxx b/fairmq/FairMQChannel.cxx index d4d966b9..a8fd2b57 100644 --- a/fairmq/FairMQChannel.cxx +++ b/fairmq/FairMQChannel.cxx @@ -75,6 +75,31 @@ FairMQChannel::FairMQChannel(const string& type, const string& method, const str { } +FairMQChannel::FairMQChannel(const string& name, const string& type, std::shared_ptr factory) + : fSocket(factory->CreateSocket(type, name, name)) // TODO whats id and whats name? + , fType(type) + , fMethod("unspecified") + , fAddress("unspecified") + , fTransport("default") // TODO refactor, either use string representation or enum type + , fSndBufSize(1000) + , fRcvBufSize(1000) + , fSndKernelSize(0) + , fRcvKernelSize(0) + , fRateLogging(1) + , fName(name) + , fIsValid(false) + , fPoller(nullptr) + , fChannelCmdSocket(nullptr) + , fTransportType(factory->GetType()) + , fTransportFactory(factory) + , fNoBlockFlag(0) + , fSndMoreFlag(0) + , fMultipart(false) + , fModified(true) + , fReset(false) +{ +} + FairMQChannel::FairMQChannel(const FairMQChannel& chan) : fSocket(nullptr) , fType(chan.fType) diff --git a/fairmq/FairMQChannel.h b/fairmq/FairMQChannel.h index 60971135..8d2f3111 100644 --- a/fairmq/FairMQChannel.h +++ b/fairmq/FairMQChannel.h @@ -42,6 +42,12 @@ class FairMQChannel /// @param address Network address to bind/connect to (e.g. "tcp://127.0.0.1:5555" or "ipc://abc") FairMQChannel(const std::string& type, const std::string& method, const std::string& address); + /// Constructor + /// @param name Channel name + /// @param type Socket type (push/pull/pub/sub/spub/xsub/pair/req/rep/dealer/router/) + /// @param factory TransportFactory + FairMQChannel(const std::string& name, const std::string& type, std::shared_ptr factory); + /// Copy Constructor FairMQChannel(const FairMQChannel&); @@ -53,6 +59,20 @@ class FairMQChannel FairMQSocket const & GetSocket() const; + auto Bind(const std::string& address) -> bool + { + fMethod = "bind"; + fAddress = address; + return fSocket->Bind(address); + } + + auto Connect(const std::string& address) -> void + { + fMethod = "connect"; + fAddress = address; + return fSocket->Connect(address); + } + /// Get channel name /// @return Returns full channel name (e.g. "data[0]") std::string GetChannelName() const; diff --git a/fairmq/FairMQDevice.cxx b/fairmq/FairMQDevice.cxx index d2d91100..fea8fa7e 100644 --- a/fairmq/FairMQDevice.cxx +++ b/fairmq/FairMQDevice.cxx @@ -1235,45 +1235,7 @@ void FairMQDevice::Exit() t.second->Shutdown(); } - LOG(DEBUG) << "Closing sockets..."; - - // iterate over the channels - for (auto& c : fChannels) - { - // iterate over the sub-channels - for (auto& sc : c.second) - { - if (sc.fSocket) - { - sc.fSocket->Close(); - sc.fSocket = nullptr; - } - if (sc.fChannelCmdSocket) - { - sc.fChannelCmdSocket->Close(); - sc.fChannelCmdSocket = nullptr; - } - if (sc.fPoller) - { - sc.fPoller = nullptr; - } - } - } - - for (auto& s : fDeviceCmdSockets) - { - s.second->Close(); - } - - LOG(DEBUG) << "Closed all sockets!"; - - // ask transports to terminate - for (const auto& t : fTransports) - { - t.second->Terminate(); - } - - LOG(DEBUG) << "All transports exited."; + LOG(DEBUG) << "All transports are shut down."; } FairMQDevice::~FairMQDevice() diff --git a/fairmq/FairMQDevice.h b/fairmq/FairMQDevice.h index 17842eb5..332d8496 100644 --- a/fairmq/FairMQDevice.h +++ b/fairmq/FairMQDevice.h @@ -301,9 +301,6 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable /// @param rhs Left hand side value for comparison static bool SortSocketsByAddress(const FairMQChannel &lhs, const FairMQChannel &rhs); - std::unordered_map> fChannels; ///< Device channels - FairMQProgOptions* fConfig; ///< Program options configuration - template void OnData(const std::string& channelName, bool (T::* memberFunction)(FairMQMessagePtr& msg, int index)) { @@ -340,6 +337,14 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable bool Terminated(); + protected: + std::shared_ptr fTransportFactory; ///< Transport factory + std::unordered_map> fTransports; ///< Container for transports + + public: + std::unordered_map> fChannels; ///< Device channels + FairMQProgOptions* fConfig; ///< Program options configuration + protected: std::string fId; ///< Device ID std::string fNetworkInterface; ///< Network interface to use for dynamic binding @@ -352,8 +357,6 @@ class FairMQDevice : public FairMQStateMachine, public FairMQConfigurable int fPortRangeMin; ///< Minimum value for the port range (if dynamic) int fPortRangeMax; ///< Maximum value for the port range (if dynamic) - std::shared_ptr fTransportFactory; ///< Transport factory - std::unordered_map> fTransports; ///< Container for transports std::unordered_map fDeviceCmdSockets; ///< Sockets used for the internal unblocking mechanism /// Additional user initialization (can be overloaded in child classes). Prefer to use InitTask(). diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx index 288bc3da..0fde95df 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.cxx +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.cxx @@ -80,3 +80,8 @@ FairMQ::Transport FairMQTransportFactoryNN::GetType() const { return fTransportType; } + +FairMQTransportFactoryNN::~FairMQTransportFactoryNN() +{ + Terminate(); +} diff --git a/fairmq/nanomsg/FairMQTransportFactoryNN.h b/fairmq/nanomsg/FairMQTransportFactoryNN.h index 58957852..586fce21 100644 --- a/fairmq/nanomsg/FairMQTransportFactoryNN.h +++ b/fairmq/nanomsg/FairMQTransportFactoryNN.h @@ -27,6 +27,7 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory { public: FairMQTransportFactoryNN(); + ~FairMQTransportFactoryNN() override; void Initialize(const FairMQProgOptions* config) override; @@ -45,8 +46,6 @@ class FairMQTransportFactoryNN : public FairMQTransportFactory void Shutdown() override; void Terminate() override; - ~FairMQTransportFactoryNN() override {}; - private: static FairMQ::Transport fTransportType; }; diff --git a/fairmq/shmem/FairMQSocketSHM.cxx b/fairmq/shmem/FairMQSocketSHM.cxx index 0f8448b2..a1bcc605 100644 --- a/fairmq/shmem/FairMQSocketSHM.cxx +++ b/fairmq/shmem/FairMQSocketSHM.cxx @@ -627,4 +627,5 @@ int FairMQSocketSHM::GetConstant(const string& constant) FairMQSocketSHM::~FairMQSocketSHM() { + Close(); } diff --git a/fairmq/shmem/FairMQTransportFactorySHM.cxx b/fairmq/shmem/FairMQTransportFactorySHM.cxx index 538a715a..ff665fa6 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.cxx +++ b/fairmq/shmem/FairMQTransportFactorySHM.cxx @@ -228,3 +228,7 @@ FairMQ::Transport FairMQTransportFactorySHM::GetType() const return fTransportType; } +FairMQTransportFactorySHM::~FairMQTransportFactorySHM() +{ + Terminate(); +} diff --git a/fairmq/shmem/FairMQTransportFactorySHM.h b/fairmq/shmem/FairMQTransportFactorySHM.h index 3ad20e3b..e9c84b97 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.h +++ b/fairmq/shmem/FairMQTransportFactorySHM.h @@ -45,7 +45,7 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory void SendHeartbeats(); - ~FairMQTransportFactorySHM() override {}; + ~FairMQTransportFactorySHM() override; private: static FairMQ::Transport fTransportType; diff --git a/fairmq/zeromq/FairMQSocketZMQ.cxx b/fairmq/zeromq/FairMQSocketZMQ.cxx index 04c2935b..7e610f74 100644 --- a/fairmq/zeromq/FairMQSocketZMQ.cxx +++ b/fairmq/zeromq/FairMQSocketZMQ.cxx @@ -565,4 +565,5 @@ int FairMQSocketZMQ::GetConstant(const string& constant) FairMQSocketZMQ::~FairMQSocketZMQ() { + Close(); } diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx index 78f19f78..5216532f 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.cxx @@ -33,6 +33,11 @@ FairMQTransportFactoryZMQ::FairMQTransportFactoryZMQ() LOG(ERROR) << "failed creating context, reason: " << zmq_strerror(errno); exit(EXIT_FAILURE); } + + if (zmq_ctx_set(fContext, ZMQ_MAX_SOCKETS, 10000) != 0) + { + LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno); + } } void FairMQTransportFactoryZMQ::Initialize(const FairMQProgOptions* config) @@ -51,12 +56,6 @@ void FairMQTransportFactoryZMQ::Initialize(const FairMQProgOptions* config) { LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno); } - - // Set the maximum number of allowed sockets on the context. - if (zmq_ctx_set(fContext, ZMQ_MAX_SOCKETS, 10000) != 0) - { - LOG(ERROR) << "failed configuring context, reason: " << zmq_strerror(errno); - } } FairMQMessagePtr FairMQTransportFactoryZMQ::CreateMessage() const @@ -130,3 +129,8 @@ void FairMQTransportFactoryZMQ::Terminate() LOG(ERROR) << "shmem: Terminate(): context now available for shutdown"; } } + +FairMQTransportFactoryZMQ::~FairMQTransportFactoryZMQ() +{ + Terminate(); +} diff --git a/fairmq/zeromq/FairMQTransportFactoryZMQ.h b/fairmq/zeromq/FairMQTransportFactoryZMQ.h index e7088dc8..0c1835c8 100644 --- a/fairmq/zeromq/FairMQTransportFactoryZMQ.h +++ b/fairmq/zeromq/FairMQTransportFactoryZMQ.h @@ -27,6 +27,7 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory { public: FairMQTransportFactoryZMQ(); + ~FairMQTransportFactoryZMQ() override; void Initialize(const FairMQProgOptions* config) override; @@ -45,8 +46,6 @@ class FairMQTransportFactoryZMQ : public FairMQTransportFactory void Shutdown() override; void Terminate() override; - ~FairMQTransportFactoryZMQ() override {}; - private: static FairMQ::Transport fTransportType; void* fContext;