From 88dbcbe4fdec491451adb1e54a9c029987fad206 Mon Sep 17 00:00:00 2001 From: Alexey Rybalchenko Date: Tue, 29 Oct 2019 13:55:46 +0100 Subject: [PATCH] Formatting, some refactoring --- examples/multipart/Sampler.cxx | 14 +- examples/multipart/Sampler.h | 2 +- examples/multipart/Sink.cxx | 12 +- examples/multipart/Sink.h | 4 +- fairmq/shmem/FairMQMessageSHM.cxx | 140 +++-------- fairmq/shmem/FairMQMessageSHM.h | 8 +- fairmq/shmem/FairMQSocketSHM.cxx | 286 +++++++--------------- fairmq/shmem/FairMQSocketSHM.h | 12 +- fairmq/shmem/FairMQUnmanagedRegionSHM.cxx | 17 +- fairmq/shmem/FairMQUnmanagedRegionSHM.h | 6 +- 10 files changed, 151 insertions(+), 350 deletions(-) diff --git a/examples/multipart/Sampler.cxx b/examples/multipart/Sampler.cxx index 038ab686..d354dfc8 100644 --- a/examples/multipart/Sampler.cxx +++ b/examples/multipart/Sampler.cxx @@ -40,8 +40,7 @@ bool Sampler::ConditionalRun() header.stopFlag = 0; // Set stopFlag to 1 for last message. - if (fMaxIterations > 0 && fNumIterations == fMaxIterations - 1) - { + if (fMaxIterations > 0 && fNumIterations == fMaxIterations - 1) { header.stopFlag = 1; } LOG(info) << "Sending header with stopFlag: " << header.stopFlag; @@ -60,13 +59,16 @@ bool Sampler::ConditionalRun() assert(auxData.Size() == 0); assert(parts.Size() == 5); + parts.AddPart(NewMessage()); + + assert(parts.Size() == 6); + LOG(info) << "Sending body of size: " << parts.At(1)->GetSize(); Send(parts, "data"); // Go out of the sending loop if the stopFlag was sent. - if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) - { + if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) { LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state."; return false; } @@ -77,8 +79,4 @@ bool Sampler::ConditionalRun() return true; } -Sampler::~Sampler() -{ -} - } // namespace example_multipart diff --git a/examples/multipart/Sampler.h b/examples/multipart/Sampler.h index da059184..6d5904fe 100644 --- a/examples/multipart/Sampler.h +++ b/examples/multipart/Sampler.h @@ -24,7 +24,7 @@ class Sampler : public FairMQDevice { public: Sampler(); - virtual ~Sampler(); + virtual ~Sampler() {} protected: virtual void InitTask(); diff --git a/examples/multipart/Sink.cxx b/examples/multipart/Sink.cxx index 82931238..47a5053a 100644 --- a/examples/multipart/Sink.cxx +++ b/examples/multipart/Sink.cxx @@ -20,11 +20,6 @@ using namespace std; namespace example_multipart { -Sink::Sink() -{ - OnData("data", &Sink::HandleData); -} - bool Sink::HandleData(FairMQParts& parts, int /*index*/) { Header header; @@ -35,8 +30,7 @@ bool Sink::HandleData(FairMQParts& parts, int /*index*/) LOG(info) << "Received header with stopFlag: " << header.stopFlag; LOG(info) << "Received body of size: " << parts.At(1)->GetSize(); - if (header.stopFlag == 1) - { + if (header.stopFlag == 1) { LOG(info) << "stopFlag is 1, going IDLE"; return false; } @@ -44,8 +38,4 @@ bool Sink::HandleData(FairMQParts& parts, int /*index*/) return true; } -Sink::~Sink() -{ -} - } diff --git a/examples/multipart/Sink.h b/examples/multipart/Sink.h index 8496ba6a..4b24f91e 100644 --- a/examples/multipart/Sink.h +++ b/examples/multipart/Sink.h @@ -23,8 +23,8 @@ namespace example_multipart class Sink : public FairMQDevice { public: - Sink(); - virtual ~Sink(); + Sink() { OnData("data", &Sink::HandleData); } + virtual ~Sink() {} protected: bool HandleData(FairMQParts&, int); diff --git a/fairmq/shmem/FairMQMessageSHM.cxx b/fairmq/shmem/FairMQMessageSHM.cxx index 99a509b4..d2dbd649 100644 --- a/fairmq/shmem/FairMQMessageSHM.cxx +++ b/fairmq/shmem/FairMQMessageSHM.cxx @@ -5,8 +5,8 @@ * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include -#include +#include "Common.h" +#include "Region.h" #include "FairMQMessageSHM.h" #include "FairMQUnmanagedRegionSHM.h" @@ -38,8 +38,7 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQTransportFactory* fac , fHint(0) , fLocalPtr(nullptr) { - if (zmq_msg_init(&fMessage) != 0) - { + if (zmq_msg_init(&fMessage) != 0) { LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno); } fMetaCreated = true; @@ -74,15 +73,11 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, void* data, const size_t si , fHint(0) , fLocalPtr(nullptr) { - if (InitializeChunk(size)) - { + if (InitializeChunk(size)) { memcpy(fLocalPtr, data, size); - if (ffn) - { + if (ffn) { ffn(data, hint); - } - else - { + } else { free(data); } } @@ -102,16 +97,12 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQUnmanagedRegionPtr& r , fLocalPtr(static_cast(data)) { if (reinterpret_cast(data) >= reinterpret_cast(region->GetData()) || - reinterpret_cast(data) <= reinterpret_cast(region->GetData()) + region->GetSize()) - { + reinterpret_cast(data) <= reinterpret_cast(region->GetData()) + region->GetSize()) { fHandle = (bipc::managed_shared_memory::handle_t)(reinterpret_cast(data) - reinterpret_cast(region->GetData())); - if (zmq_msg_init_size(&fMessage, sizeof(MetaHeader)) != 0) - { + if (zmq_msg_init_size(&fMessage, sizeof(MetaHeader)) != 0) { LOG(error) << "failed initializing meta message, reason: " << zmq_strerror(errno); - } - else - { + } else { MetaHeader header; header.fSize = size; header.fHandle = fHandle; @@ -121,9 +112,7 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQUnmanagedRegionPtr& r fMetaCreated = true; } - } - else - { + } else { LOG(error) << "trying to create region message with data from outside the region"; throw runtime_error("trying to create region message with data from outside the region"); } @@ -131,24 +120,17 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQUnmanagedRegionPtr& r bool FairMQMessageSHM::InitializeChunk(const size_t size) { - while (fHandle < 0) - { - try - { + while (fHandle < 0) { + try { bipc::managed_shared_memory::size_type actualSize = size; char* hint = 0; // unused for bipc::allocate_new fLocalPtr = fManager.Segment().allocation_command(bipc::allocate_new, size, actualSize, hint); - } - catch (bipc::bad_alloc& ba) - { + } catch (bipc::bad_alloc& ba) { // LOG(warn) << "Shared memory full..."; this_thread::sleep_for(chrono::milliseconds(50)); - if (fInterrupted) - { + if (fInterrupted) { return false; - } - else - { + } else { continue; } } @@ -157,8 +139,7 @@ bool FairMQMessageSHM::InitializeChunk(const size_t size) fSize = size; - if (zmq_msg_init_size(&fMessage, sizeof(MetaHeader)) != 0) - { + if (zmq_msg_init_size(&fMessage, sizeof(MetaHeader)) != 0) { LOG(error) << "failed initializing meta message, reason: " << zmq_strerror(errno); return false; } @@ -180,8 +161,7 @@ void FairMQMessageSHM::Rebuild() fQueued = false; - if (zmq_msg_init(&fMessage) != 0) - { + if (zmq_msg_init(&fMessage) != 0) { LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno); } fMetaCreated = true; @@ -190,9 +170,7 @@ void FairMQMessageSHM::Rebuild() void FairMQMessageSHM::Rebuild(const size_t size) { CloseMessage(); - fQueued = false; - InitializeChunk(size); } @@ -202,32 +180,25 @@ void FairMQMessageSHM::Rebuild(void* data, const size_t size, fairmq_free_fn* ff fQueued = false; - if (InitializeChunk(size)) - { + if (InitializeChunk(size)) { memcpy(fLocalPtr, data, size); - if (ffn) - { + if (ffn) { ffn(data, hint); - } - else - { + } else { free(data); } } } -zmq_msg_t* FairMQMessageSHM::GetMessage() -{ - return &fMessage; -} - void* FairMQMessageSHM::GetData() const { - if (fLocalPtr) { - return fLocalPtr; - } else { + if (!fLocalPtr) { if (fRegionId == 0) { - return fManager.Segment().get_address_from_handle(fHandle); + if (fSize > 0) { + fLocalPtr = reinterpret_cast(fManager.Segment().get_address_from_handle(fHandle)); + } else { + fLocalPtr = nullptr; + } } else { fRegionPtr = fManager.GetRemoteRegion(fRegionId); if (fRegionPtr) { @@ -236,14 +207,10 @@ void* FairMQMessageSHM::GetData() const // LOG(warn) << "could not get pointer from a region message"; fLocalPtr = nullptr; } - return fLocalPtr; } } -} -size_t FairMQMessageSHM::GetSize() const -{ - return fSize; + return fLocalPtr; } bool FairMQMessageSHM::SetUsedSize(const size_t size) @@ -270,72 +237,45 @@ bool FairMQMessageSHM::SetUsedSize(const size_t size) } } -fair::mq::Transport FairMQMessageSHM::GetType() const -{ - return fTransportType; -} - void FairMQMessageSHM::Copy(const FairMQMessage& msg) { - if (fHandle < 0) - { + if (fHandle < 0) { bipc::managed_shared_memory::handle_t otherHandle = static_cast(msg).fHandle; - if (otherHandle) - { - if (InitializeChunk(msg.GetSize())) - { + if (otherHandle) { + if (InitializeChunk(msg.GetSize())) { memcpy(GetData(), msg.GetData(), msg.GetSize()); } - } - else - { + } else { LOG(error) << "copy fail: source message not initialized!"; } - } - else - { + } else { LOG(error) << "copy fail: target message already initialized!"; } } void FairMQMessageSHM::CloseMessage() { - if (fHandle >= 0 && !fQueued) - { - if (fRegionId == 0) - { + if (fHandle >= 0 && !fQueued) { + if (fRegionId == 0) { fManager.Segment().deallocate(fManager.Segment().get_address_from_handle(fHandle)); fHandle = -1; - } - else - { - if (!fRegionPtr) - { + } else { + if (!fRegionPtr) { fRegionPtr = fManager.GetRemoteRegion(fRegionId); } - if (fRegionPtr) - { + if (fRegionPtr) { fRegionPtr->ReleaseBlock({fHandle, fSize, fHint}); - } - else - { + } else { LOG(warn) << "region ack queue for id " << fRegionId << " no longer exist. Not sending ack"; } } } - if (fMetaCreated) - { - if (zmq_msg_close(&fMessage) != 0) - { + if (fMetaCreated) { + if (zmq_msg_close(&fMessage) != 0) { LOG(error) << "failed closing message, reason: " << zmq_strerror(errno); } fMetaCreated = false; } } - -FairMQMessageSHM::~FairMQMessageSHM() -{ - CloseMessage(); -} diff --git a/fairmq/shmem/FairMQMessageSHM.h b/fairmq/shmem/FairMQMessageSHM.h index f96b9e83..505fadc6 100644 --- a/fairmq/shmem/FairMQMessageSHM.h +++ b/fairmq/shmem/FairMQMessageSHM.h @@ -40,15 +40,15 @@ class FairMQMessageSHM final : public FairMQMessage void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override; void* GetData() const override; - size_t GetSize() const override; + size_t GetSize() const override { return fSize; } bool SetUsedSize(const size_t size) override; - fair::mq::Transport GetType() const override; + fair::mq::Transport GetType() const override { return fTransportType; } void Copy(const FairMQMessage& msg) override; - ~FairMQMessageSHM() override; + ~FairMQMessageSHM() override { CloseMessage(); } private: fair::mq::shmem::Manager& fManager; @@ -65,7 +65,7 @@ class FairMQMessageSHM final : public FairMQMessage mutable char* fLocalPtr; bool InitializeChunk(const size_t size); - zmq_msg_t* GetMessage(); + zmq_msg_t* GetMessage() { return &fMessage; } void CloseMessage(); }; diff --git a/fairmq/shmem/FairMQSocketSHM.cxx b/fairmq/shmem/FairMQSocketSHM.cxx index 4b4b5d9c..b6ce2476 100644 --- a/fairmq/shmem/FairMQSocketSHM.cxx +++ b/fairmq/shmem/FairMQSocketSHM.cxx @@ -5,7 +5,7 @@ * GNU Lesser General Public Licence (LGPL) version 3, * * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include +#include "Common.h" #include "FairMQSocketSHM.h" #include "FairMQMessageSHM.h" @@ -38,32 +38,27 @@ FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const str assert(context); fSocket = zmq_socket(context, GetConstant(type)); - if (fSocket == nullptr) - { + if (fSocket == nullptr) { LOG(error) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno); exit(EXIT_FAILURE); } - if (zmq_setsockopt(fSocket, ZMQ_IDENTITY, fId.c_str(), fId.length()) != 0) - { + if (zmq_setsockopt(fSocket, ZMQ_IDENTITY, fId.c_str(), fId.length()) != 0) { LOG(error) << "Failed setting ZMQ_IDENTITY socket option, reason: " << zmq_strerror(errno); } // Tell socket to try and send/receive outstanding messages for milliseconds before terminating. // Default value for ZeroMQ is -1, which is to wait forever. int linger = 1000; - if (zmq_setsockopt(fSocket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) - { + if (zmq_setsockopt(fSocket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) { LOG(error) << "Failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno); } - if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) - { + if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) { LOG(error) << "Failed setting ZMQ_SNDTIMEO socket option, reason: " << zmq_strerror(errno); } - if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) - { + if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) { LOG(error) << "Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno); } @@ -75,8 +70,7 @@ FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const str // } // } - if (type == "sub" || type == "pub") - { + if (type == "sub" || type == "pub") { LOG(error) << "PUB/SUB socket type is not supported for shared memory transport"; throw fair::mq::SocketError("PUB/SUB socket type is not supported for shared memory transport"); } @@ -86,10 +80,9 @@ FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const str bool FairMQSocketSHM::Bind(const string& address) { - // LOG(info) << "bind socket " << fId << " on " << address; + // LOG(info) << "binding socket " << fId << " on " << address; - if (zmq_bind(fSocket, address.c_str()) != 0) - { + if (zmq_bind(fSocket, address.c_str()) != 0) { if (errno == EADDRINUSE) { // do not print error in this case, this is handled by FairMQDevice in case no connection could be established after trying a number of random ports from a range. return false; @@ -102,10 +95,9 @@ bool FairMQSocketSHM::Bind(const string& address) bool FairMQSocketSHM::Connect(const string& address) { - // LOG(info) << "connect socket " << fId << " on " << address; + // LOG(info) << "connecting socket " << fId << " on " << address; - if (zmq_connect(fSocket, address.c_str()) != 0) - { + if (zmq_connect(fSocket, address.c_str()) != 0) { LOG(error) << "Failed connecting socket " << fId << ", reason: " << zmq_strerror(errno); return false; } @@ -116,55 +108,38 @@ bool FairMQSocketSHM::Connect(const string& address) int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int timeout) { int flags = 0; - if (timeout == 0) - { + if (timeout == 0) { flags = ZMQ_DONTWAIT; } int elapsed = 0; - while (true && !fInterrupted) - { + while (true && !fInterrupted) { int nbytes = zmq_msg_send(static_cast(msg.get())->GetMessage(), fSocket, flags); - if (nbytes == 0) - { + if (nbytes == 0) { + ++fMessagesTx; return nbytes; - } - else if (nbytes > 0) - { + } else if (nbytes > 0) { static_cast(msg.get())->fQueued = true; - size_t size = msg->GetSize(); fBytesTx += size; ++fMessagesTx; - return size; - } - else if (zmq_errno() == EAGAIN) - { - if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) - { - if (timeout > 0) - { + } else if (zmq_errno() == EAGAIN) { + if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { + if (timeout > 0) { elapsed += fSndTimeout; - if (elapsed >= timeout) - { + if (elapsed >= timeout) { return -2; } } continue; - } - else - { + } else { return -2; } - } - else if (zmq_errno() == ETERM) - { + } else if (zmq_errno() == ETERM) { LOG(info) << "terminating socket " << fId; return -1; - } - else - { + } else { LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); return nbytes; } @@ -176,72 +151,53 @@ int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int timeout) int FairMQSocketSHM::Receive(FairMQMessagePtr& msg, const int timeout) { int flags = 0; - if (timeout == 0) - { + if (timeout == 0) { flags = ZMQ_DONTWAIT; } int elapsed = 0; - zmq_msg_t* msgPtr = static_cast(msg.get())->GetMessage(); - while (true) - { - int nbytes = zmq_msg_recv(msgPtr, fSocket, flags); - if (nbytes == 0) - { + while (true) { + FairMQMessageSHM* shmMsg = static_cast(msg.get()); + zmq_msg_t* zmqMsg = shmMsg->GetMessage(); + int nbytes = zmq_msg_recv(zmqMsg, fSocket, flags); + if (nbytes == 0) { ++fMessagesRx; - return nbytes; - } - else if (nbytes > 0) - { - // check for number of receiving messages. must be 1 + } else if (nbytes > 0) { + // check for number of received messages. must be 1 const auto numMsgs = nbytes / sizeof(MetaHeader); - if (numMsgs > 1) - { + if (numMsgs > 1) { LOG(error) << "Receiving SHM multipart with a single message receive call"; } - assert (numMsgs == 1); + assert(numMsgs == 1); - MetaHeader* hdr = static_cast(zmq_msg_data(msgPtr)); - size_t size = 0; - static_cast(msg.get())->fHandle = hdr->fHandle; - static_cast(msg.get())->fSize = hdr->fSize; - static_cast(msg.get())->fRegionId = hdr->fRegionId; - static_cast(msg.get())->fHint = hdr->fHint; - size = msg->GetSize(); + MetaHeader* hdr = static_cast(zmq_msg_data(zmqMsg)); + size_t size = hdr->fSize; + shmMsg->fHandle = hdr->fHandle; + shmMsg->fSize = size; + shmMsg->fRegionId = hdr->fRegionId; + shmMsg->fHint = hdr->fHint; fBytesRx += size; ++fMessagesRx; - return size; - } - else if (zmq_errno() == EAGAIN) - { - if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) - { - if (timeout > 0) - { + } else if (zmq_errno() == EAGAIN) { + if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { + if (timeout > 0) { elapsed += fRcvTimeout; - if (elapsed >= timeout) - { + if (elapsed >= timeout) { return -2; } } continue; - } - else - { + } else { return -2; } - } - else if (zmq_errno() == ETERM) - { + } else if (zmq_errno() == ETERM) { LOG(info) << "terminating socket " << fId; return -1; - } - else - { + } else { LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno); return nbytes; } @@ -251,8 +207,7 @@ int FairMQSocketSHM::Receive(FairMQMessagePtr& msg, const int timeout) int64_t FairMQSocketSHM::Send(vector& msgVec, const int timeout) { int flags = 0; - if (timeout == 0) - { + if (timeout == 0) { flags = ZMQ_DONTWAIT; } const unsigned int vecSize = msgVec.size(); @@ -269,8 +224,7 @@ int64_t FairMQSocketSHM::Send(vector& msgVec, const int timeou // prepare the message with shm metas MetaHeader* metas = static_cast(zmq_msg_data(&zmqMsg)); - for (auto& msg : msgVec) - { + for (auto& msg : msgVec) { zmq_msg_t* metaMsg = static_cast(msg.get())->GetMessage(); if (zmq_msg_size(metaMsg) > 0) { memcpy(metas++, zmq_msg_data(metaMsg), sizeof(MetaHeader)); @@ -286,21 +240,16 @@ int64_t FairMQSocketSHM::Send(vector& msgVec, const int timeou } } - while (!fInterrupted) - { + while (!fInterrupted) { int64_t totalSize = 0; int nbytes = zmq_msg_send(&zmqMsg, fSocket, flags); - if (nbytes == 0) - { + if (nbytes == 0) { zmq_msg_close(&zmqMsg); return nbytes; - } - else if (nbytes > 0) - { + } else if (nbytes > 0) { assert(static_cast(nbytes) == (vecSize * sizeof(MetaHeader))); // all or nothing - for (auto& msg : msgVec) - { + for (auto& msg : msgVec) { FairMQMessageSHM* shmMsg = static_cast(msg.get()); shmMsg->fQueued = true; totalSize += shmMsg->fSize; @@ -312,36 +261,25 @@ int64_t FairMQSocketSHM::Send(vector& msgVec, const int timeou zmq_msg_close(&zmqMsg); return totalSize; - } - else if (zmq_errno() == EAGAIN) - { - if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) - { - if (timeout > 0) - { + } else if (zmq_errno() == EAGAIN) { + if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { + if (timeout > 0) { elapsed += fSndTimeout; - if (elapsed >= timeout) - { + if (elapsed >= timeout) { zmq_msg_close(&zmqMsg); return -2; } } continue; - } - else - { + } else { zmq_msg_close(&zmqMsg); return -2; } - } - else if (zmq_errno() == ETERM) - { + } else if (zmq_errno() == ETERM) { zmq_msg_close(&zmqMsg); LOG(info) << "terminating socket " << fId; return -1; - } - else - { + } else { zmq_msg_close(&zmqMsg); LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); return nbytes; @@ -355,8 +293,7 @@ int64_t FairMQSocketSHM::Send(vector& msgVec, const int timeou int64_t FairMQSocketSHM::Receive(vector& msgVec, const int timeout) { int flags = 0; - if (timeout == 0) - { + if (timeout == 0) { flags = ZMQ_DONTWAIT; } int elapsed = 0; @@ -364,17 +301,13 @@ int64_t FairMQSocketSHM::Receive(vector& msgVec, const int tim zmq_msg_t zmqMsg; zmq_msg_init(&zmqMsg); - while (!fInterrupted) - { + while (!fInterrupted) { int64_t totalSize = 0; int nbytes = zmq_msg_recv(&zmqMsg, fSocket, flags); - if (nbytes == 0) - { + if (nbytes == 0) { zmq_msg_close(&zmqMsg); return 0; - } - else if (nbytes > 0) - { + } else if (nbytes > 0) { MetaHeader* hdrVec = static_cast(zmq_msg_data(&zmqMsg)); const auto hdrVecSize = zmq_msg_size(&zmqMsg); assert(hdrVecSize > 0); @@ -384,24 +317,22 @@ int64_t FairMQSocketSHM::Receive(vector& msgVec, const int tim msgVec.reserve(numMessages); - for (size_t m = 0; m < numMessages; m++) - { - MetaHeader hdr; - memcpy(&hdr, &hdrVec[m], sizeof(MetaHeader)); + for (size_t m = 0; m < numMessages; m++) { + // get the meta data pointer + MetaHeader* hdr = &hdrVec[m]; - msgVec.emplace_back(fair::mq::tools::make_unique(fManager, GetTransport())); + // create new message (part) + msgVec.emplace_back(tools::make_unique(fManager, GetTransport())); + FairMQMessageSHM* shmMsg = static_cast(msgVec.back().get()); + // fill the zmq buffer with the delivered meta data + memcpy(zmq_msg_data(shmMsg->GetMessage()), hdr, sizeof(MetaHeader)); + // set the message members with the meta data + shmMsg->fHandle = hdr->fHandle; + shmMsg->fSize = hdr->fSize; + shmMsg->fRegionId = hdr->fRegionId; + shmMsg->fHint = hdr->fHint; - FairMQMessageSHM* msg = static_cast(msgVec.back().get()); - MetaHeader* msgHdr = static_cast(zmq_msg_data(msg->GetMessage())); - - memcpy(msgHdr, &hdr, sizeof(MetaHeader)); - - msg->fHandle = hdr.fHandle; - msg->fSize = hdr.fSize; - msg->fRegionId = hdr.fRegionId; - msg->fHint = hdr.fHint; - - totalSize += msg->GetSize(); + totalSize += shmMsg->GetSize(); } // store statistics on how many messages have been received (handle all parts as a single message) @@ -410,30 +341,21 @@ int64_t FairMQSocketSHM::Receive(vector& msgVec, const int tim zmq_msg_close(&zmqMsg); return totalSize; - } - else if (zmq_errno() == EAGAIN) - { - if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) - { - if (timeout > 0) - { + } else if (zmq_errno() == EAGAIN) { + if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) { + if (timeout > 0) { elapsed += fRcvTimeout; - if (elapsed >= timeout) - { + if (elapsed >= timeout) { zmq_msg_close(&zmqMsg); return -2; } } continue; - } - else - { + } else { zmq_msg_close(&zmqMsg); return -2; } - } - else - { + } else { zmq_msg_close(&zmqMsg); LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno); return nbytes; @@ -448,13 +370,11 @@ void FairMQSocketSHM::Close() { // LOG(debug) << "Closing socket " << fId; - if (fSocket == nullptr) - { + if (fSocket == nullptr) { return; } - if (zmq_close(fSocket) != 0) - { + if (zmq_close(fSocket) != 0) { LOG(error) << "Failed closing socket " << fId << ", reason: " << zmq_strerror(errno); } @@ -475,23 +395,16 @@ void FairMQSocketSHM::Resume() fInterrupted = false; } -void* FairMQSocketSHM::GetSocket() const -{ - return fSocket; -} - void FairMQSocketSHM::SetOption(const string& option, const void* value, size_t valueSize) { - if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0) - { + if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0) { LOG(error) << "Failed setting socket option, reason: " << zmq_strerror(errno); } } void FairMQSocketSHM::GetOption(const string& option, void* value, size_t* valueSize) { - if (zmq_getsockopt(fSocket, GetConstant(option), value, valueSize) < 0) - { + if (zmq_getsockopt(fSocket, GetConstant(option), value, valueSize) < 0) { LOG(error) << "Failed getting socket option, reason: " << zmq_strerror(errno); } } @@ -581,26 +494,6 @@ int FairMQSocketSHM::GetRcvKernelSize() const return value; } -unsigned long FairMQSocketSHM::GetBytesTx() const -{ - return fBytesTx; -} - -unsigned long FairMQSocketSHM::GetBytesRx() const -{ - return fBytesRx; -} - -unsigned long FairMQSocketSHM::GetMessagesTx() const -{ - return fMessagesTx; -} - -unsigned long FairMQSocketSHM::GetMessagesRx() const -{ - return fMessagesRx; -} - int FairMQSocketSHM::GetConstant(const string& constant) { if (constant == "") return 0; @@ -629,8 +522,3 @@ int FairMQSocketSHM::GetConstant(const string& constant) return -1; } - -FairMQSocketSHM::~FairMQSocketSHM() -{ - Close(); -} diff --git a/fairmq/shmem/FairMQSocketSHM.h b/fairmq/shmem/FairMQSocketSHM.h index 4bb0e4b6..df9d7c55 100644 --- a/fairmq/shmem/FairMQSocketSHM.h +++ b/fairmq/shmem/FairMQSocketSHM.h @@ -34,7 +34,7 @@ class FairMQSocketSHM final : public FairMQSocket int64_t Send(std::vector>& msgVec, const int timeout = -1) override; int64_t Receive(std::vector>& msgVec, const int timeout = -1) override; - void* GetSocket() const; + void* GetSocket() const { return fSocket; } void Close() override; @@ -55,14 +55,14 @@ class FairMQSocketSHM final : public FairMQSocket void SetRcvKernelSize(const int value) override; int GetRcvKernelSize() const override; - unsigned long GetBytesTx() const override; - unsigned long GetBytesRx() const override; - unsigned long GetMessagesTx() const override; - unsigned long GetMessagesRx() const override; + unsigned long GetBytesTx() const override { return fBytesTx; } + unsigned long GetBytesRx() const override { return fBytesRx; } + unsigned long GetMessagesTx() const override { return fMessagesTx; } + unsigned long GetMessagesRx() const override { return fMessagesRx; } static int GetConstant(const std::string& constant); - ~FairMQSocketSHM() override; + ~FairMQSocketSHM() override { Close(); } private: void* fSocket; diff --git a/fairmq/shmem/FairMQUnmanagedRegionSHM.cxx b/fairmq/shmem/FairMQUnmanagedRegionSHM.cxx index 5aa9efe1..1a45147b 100644 --- a/fairmq/shmem/FairMQUnmanagedRegionSHM.cxx +++ b/fairmq/shmem/FairMQUnmanagedRegionSHM.cxx @@ -6,7 +6,7 @@ * copied verbatim in the file "LICENSE" * ********************************************************************************/ -#include +#include "Common.h" #include "FairMQUnmanagedRegionSHM.h" @@ -41,18 +41,3 @@ FairMQUnmanagedRegionSHM::FairMQUnmanagedRegionSHM(Manager& manager, const size_ throw; } } - -void* FairMQUnmanagedRegionSHM::GetData() const -{ - return fRegion->get_address(); -} - -size_t FairMQUnmanagedRegionSHM::GetSize() const -{ - return fRegion->get_size(); -} - -FairMQUnmanagedRegionSHM::~FairMQUnmanagedRegionSHM() -{ - fManager.RemoveRegion(fRegionId); -} diff --git a/fairmq/shmem/FairMQUnmanagedRegionSHM.h b/fairmq/shmem/FairMQUnmanagedRegionSHM.h index 4334f891..8dea7523 100644 --- a/fairmq/shmem/FairMQUnmanagedRegionSHM.h +++ b/fairmq/shmem/FairMQUnmanagedRegionSHM.h @@ -28,10 +28,10 @@ class FairMQUnmanagedRegionSHM final : public FairMQUnmanagedRegion public: FairMQUnmanagedRegionSHM(fair::mq::shmem::Manager& manager, const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0); - void* GetData() const override; - size_t GetSize() const override; + void* GetData() const override { return fRegion->get_address(); } + size_t GetSize() const override { return fRegion->get_size(); } - ~FairMQUnmanagedRegionSHM() override; + ~FairMQUnmanagedRegionSHM() override { fManager.RemoveRegion(fRegionId); } private: fair::mq::shmem::Manager& fManager;