Formatting, some refactoring

This commit is contained in:
Alexey Rybalchenko 2019-10-29 13:55:46 +01:00 committed by Dennis Klein
parent 85a3a254d4
commit 88dbcbe4fd
10 changed files with 151 additions and 350 deletions

View File

@ -40,8 +40,7 @@ bool Sampler::ConditionalRun()
header.stopFlag = 0; header.stopFlag = 0;
// Set stopFlag to 1 for last message. // Set stopFlag to 1 for last message.
if (fMaxIterations > 0 && fNumIterations == fMaxIterations - 1) if (fMaxIterations > 0 && fNumIterations == fMaxIterations - 1) {
{
header.stopFlag = 1; header.stopFlag = 1;
} }
LOG(info) << "Sending header with stopFlag: " << header.stopFlag; LOG(info) << "Sending header with stopFlag: " << header.stopFlag;
@ -60,13 +59,16 @@ bool Sampler::ConditionalRun()
assert(auxData.Size() == 0); assert(auxData.Size() == 0);
assert(parts.Size() == 5); assert(parts.Size() == 5);
parts.AddPart(NewMessage());
assert(parts.Size() == 6);
LOG(info) << "Sending body of size: " << parts.At(1)->GetSize(); LOG(info) << "Sending body of size: " << parts.At(1)->GetSize();
Send(parts, "data"); Send(parts, "data");
// Go out of the sending loop if the stopFlag was sent. // Go out of the sending loop if the stopFlag was sent.
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
{
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state."; LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
return false; return false;
} }
@ -77,8 +79,4 @@ bool Sampler::ConditionalRun()
return true; return true;
} }
Sampler::~Sampler()
{
}
} // namespace example_multipart } // namespace example_multipart

View File

@ -24,7 +24,7 @@ class Sampler : public FairMQDevice
{ {
public: public:
Sampler(); Sampler();
virtual ~Sampler(); virtual ~Sampler() {}
protected: protected:
virtual void InitTask(); virtual void InitTask();

View File

@ -20,11 +20,6 @@ using namespace std;
namespace example_multipart namespace example_multipart
{ {
Sink::Sink()
{
OnData("data", &Sink::HandleData);
}
bool Sink::HandleData(FairMQParts& parts, int /*index*/) bool Sink::HandleData(FairMQParts& parts, int /*index*/)
{ {
Header header; Header header;
@ -35,8 +30,7 @@ bool Sink::HandleData(FairMQParts& parts, int /*index*/)
LOG(info) << "Received header with stopFlag: " << header.stopFlag; LOG(info) << "Received header with stopFlag: " << header.stopFlag;
LOG(info) << "Received body of size: " << parts.At(1)->GetSize(); LOG(info) << "Received body of size: " << parts.At(1)->GetSize();
if (header.stopFlag == 1) if (header.stopFlag == 1) {
{
LOG(info) << "stopFlag is 1, going IDLE"; LOG(info) << "stopFlag is 1, going IDLE";
return false; return false;
} }
@ -44,8 +38,4 @@ bool Sink::HandleData(FairMQParts& parts, int /*index*/)
return true; return true;
} }
Sink::~Sink()
{
}
} }

View File

@ -23,8 +23,8 @@ namespace example_multipart
class Sink : public FairMQDevice class Sink : public FairMQDevice
{ {
public: public:
Sink(); Sink() { OnData("data", &Sink::HandleData); }
virtual ~Sink(); virtual ~Sink() {}
protected: protected:
bool HandleData(FairMQParts&, int); bool HandleData(FairMQParts&, int);

View File

@ -5,8 +5,8 @@
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" * * copied verbatim in the file "LICENSE" *
********************************************************************************/ ********************************************************************************/
#include <fairmq/shmem/Common.h> #include "Common.h"
#include <fairmq/shmem/Region.h> #include "Region.h"
#include "FairMQMessageSHM.h" #include "FairMQMessageSHM.h"
#include "FairMQUnmanagedRegionSHM.h" #include "FairMQUnmanagedRegionSHM.h"
@ -38,8 +38,7 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQTransportFactory* fac
, fHint(0) , fHint(0)
, fLocalPtr(nullptr) , fLocalPtr(nullptr)
{ {
if (zmq_msg_init(&fMessage) != 0) if (zmq_msg_init(&fMessage) != 0) {
{
LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno); LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno);
} }
fMetaCreated = true; fMetaCreated = true;
@ -74,15 +73,11 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, void* data, const size_t si
, fHint(0) , fHint(0)
, fLocalPtr(nullptr) , fLocalPtr(nullptr)
{ {
if (InitializeChunk(size)) if (InitializeChunk(size)) {
{
memcpy(fLocalPtr, data, size); memcpy(fLocalPtr, data, size);
if (ffn) if (ffn) {
{
ffn(data, hint); ffn(data, hint);
} } else {
else
{
free(data); free(data);
} }
} }
@ -102,16 +97,12 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQUnmanagedRegionPtr& r
, fLocalPtr(static_cast<char*>(data)) , fLocalPtr(static_cast<char*>(data))
{ {
if (reinterpret_cast<const char*>(data) >= reinterpret_cast<const char*>(region->GetData()) || if (reinterpret_cast<const char*>(data) >= reinterpret_cast<const char*>(region->GetData()) ||
reinterpret_cast<const char*>(data) <= reinterpret_cast<const char*>(region->GetData()) + region->GetSize()) reinterpret_cast<const char*>(data) <= reinterpret_cast<const char*>(region->GetData()) + region->GetSize()) {
{
fHandle = (bipc::managed_shared_memory::handle_t)(reinterpret_cast<const char*>(data) - reinterpret_cast<const char*>(region->GetData())); fHandle = (bipc::managed_shared_memory::handle_t)(reinterpret_cast<const char*>(data) - reinterpret_cast<const char*>(region->GetData()));
if (zmq_msg_init_size(&fMessage, sizeof(MetaHeader)) != 0) if (zmq_msg_init_size(&fMessage, sizeof(MetaHeader)) != 0) {
{
LOG(error) << "failed initializing meta message, reason: " << zmq_strerror(errno); LOG(error) << "failed initializing meta message, reason: " << zmq_strerror(errno);
} } else {
else
{
MetaHeader header; MetaHeader header;
header.fSize = size; header.fSize = size;
header.fHandle = fHandle; header.fHandle = fHandle;
@ -121,9 +112,7 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQUnmanagedRegionPtr& r
fMetaCreated = true; fMetaCreated = true;
} }
} } else {
else
{
LOG(error) << "trying to create region message with data from outside the region"; LOG(error) << "trying to create region message with data from outside the region";
throw runtime_error("trying to create region message with data from outside the region"); throw runtime_error("trying to create region message with data from outside the region");
} }
@ -131,24 +120,17 @@ FairMQMessageSHM::FairMQMessageSHM(Manager& manager, FairMQUnmanagedRegionPtr& r
bool FairMQMessageSHM::InitializeChunk(const size_t size) bool FairMQMessageSHM::InitializeChunk(const size_t size)
{ {
while (fHandle < 0) while (fHandle < 0) {
{ try {
try
{
bipc::managed_shared_memory::size_type actualSize = size; bipc::managed_shared_memory::size_type actualSize = size;
char* hint = 0; // unused for bipc::allocate_new char* hint = 0; // unused for bipc::allocate_new
fLocalPtr = fManager.Segment().allocation_command<char>(bipc::allocate_new, size, actualSize, hint); fLocalPtr = fManager.Segment().allocation_command<char>(bipc::allocate_new, size, actualSize, hint);
} } catch (bipc::bad_alloc& ba) {
catch (bipc::bad_alloc& ba)
{
// LOG(warn) << "Shared memory full..."; // LOG(warn) << "Shared memory full...";
this_thread::sleep_for(chrono::milliseconds(50)); this_thread::sleep_for(chrono::milliseconds(50));
if (fInterrupted) if (fInterrupted) {
{
return false; return false;
} } else {
else
{
continue; continue;
} }
} }
@ -157,8 +139,7 @@ bool FairMQMessageSHM::InitializeChunk(const size_t size)
fSize = size; fSize = size;
if (zmq_msg_init_size(&fMessage, sizeof(MetaHeader)) != 0) if (zmq_msg_init_size(&fMessage, sizeof(MetaHeader)) != 0) {
{
LOG(error) << "failed initializing meta message, reason: " << zmq_strerror(errno); LOG(error) << "failed initializing meta message, reason: " << zmq_strerror(errno);
return false; return false;
} }
@ -180,8 +161,7 @@ void FairMQMessageSHM::Rebuild()
fQueued = false; fQueued = false;
if (zmq_msg_init(&fMessage) != 0) if (zmq_msg_init(&fMessage) != 0) {
{
LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno); LOG(error) << "failed initializing message, reason: " << zmq_strerror(errno);
} }
fMetaCreated = true; fMetaCreated = true;
@ -190,9 +170,7 @@ void FairMQMessageSHM::Rebuild()
void FairMQMessageSHM::Rebuild(const size_t size) void FairMQMessageSHM::Rebuild(const size_t size)
{ {
CloseMessage(); CloseMessage();
fQueued = false; fQueued = false;
InitializeChunk(size); InitializeChunk(size);
} }
@ -202,32 +180,25 @@ void FairMQMessageSHM::Rebuild(void* data, const size_t size, fairmq_free_fn* ff
fQueued = false; fQueued = false;
if (InitializeChunk(size)) if (InitializeChunk(size)) {
{
memcpy(fLocalPtr, data, size); memcpy(fLocalPtr, data, size);
if (ffn) if (ffn) {
{
ffn(data, hint); ffn(data, hint);
} } else {
else
{
free(data); free(data);
} }
} }
} }
zmq_msg_t* FairMQMessageSHM::GetMessage()
{
return &fMessage;
}
void* FairMQMessageSHM::GetData() const void* FairMQMessageSHM::GetData() const
{ {
if (fLocalPtr) { if (!fLocalPtr) {
return fLocalPtr;
} else {
if (fRegionId == 0) { if (fRegionId == 0) {
return fManager.Segment().get_address_from_handle(fHandle); if (fSize > 0) {
fLocalPtr = reinterpret_cast<char*>(fManager.Segment().get_address_from_handle(fHandle));
} else {
fLocalPtr = nullptr;
}
} else { } else {
fRegionPtr = fManager.GetRemoteRegion(fRegionId); fRegionPtr = fManager.GetRemoteRegion(fRegionId);
if (fRegionPtr) { if (fRegionPtr) {
@ -236,14 +207,10 @@ void* FairMQMessageSHM::GetData() const
// LOG(warn) << "could not get pointer from a region message"; // LOG(warn) << "could not get pointer from a region message";
fLocalPtr = nullptr; fLocalPtr = nullptr;
} }
return fLocalPtr;
} }
} }
}
size_t FairMQMessageSHM::GetSize() const return fLocalPtr;
{
return fSize;
} }
bool FairMQMessageSHM::SetUsedSize(const size_t size) bool FairMQMessageSHM::SetUsedSize(const size_t size)
@ -270,72 +237,45 @@ bool FairMQMessageSHM::SetUsedSize(const size_t size)
} }
} }
fair::mq::Transport FairMQMessageSHM::GetType() const
{
return fTransportType;
}
void FairMQMessageSHM::Copy(const FairMQMessage& msg) void FairMQMessageSHM::Copy(const FairMQMessage& msg)
{ {
if (fHandle < 0) if (fHandle < 0) {
{
bipc::managed_shared_memory::handle_t otherHandle = static_cast<const FairMQMessageSHM&>(msg).fHandle; bipc::managed_shared_memory::handle_t otherHandle = static_cast<const FairMQMessageSHM&>(msg).fHandle;
if (otherHandle) if (otherHandle) {
{ if (InitializeChunk(msg.GetSize())) {
if (InitializeChunk(msg.GetSize()))
{
memcpy(GetData(), msg.GetData(), msg.GetSize()); memcpy(GetData(), msg.GetData(), msg.GetSize());
} }
} } else {
else
{
LOG(error) << "copy fail: source message not initialized!"; LOG(error) << "copy fail: source message not initialized!";
} }
} } else {
else
{
LOG(error) << "copy fail: target message already initialized!"; LOG(error) << "copy fail: target message already initialized!";
} }
} }
void FairMQMessageSHM::CloseMessage() void FairMQMessageSHM::CloseMessage()
{ {
if (fHandle >= 0 && !fQueued) if (fHandle >= 0 && !fQueued) {
{ if (fRegionId == 0) {
if (fRegionId == 0)
{
fManager.Segment().deallocate(fManager.Segment().get_address_from_handle(fHandle)); fManager.Segment().deallocate(fManager.Segment().get_address_from_handle(fHandle));
fHandle = -1; fHandle = -1;
} } else {
else if (!fRegionPtr) {
{
if (!fRegionPtr)
{
fRegionPtr = fManager.GetRemoteRegion(fRegionId); fRegionPtr = fManager.GetRemoteRegion(fRegionId);
} }
if (fRegionPtr) if (fRegionPtr) {
{
fRegionPtr->ReleaseBlock({fHandle, fSize, fHint}); fRegionPtr->ReleaseBlock({fHandle, fSize, fHint});
} } else {
else
{
LOG(warn) << "region ack queue for id " << fRegionId << " no longer exist. Not sending ack"; LOG(warn) << "region ack queue for id " << fRegionId << " no longer exist. Not sending ack";
} }
} }
} }
if (fMetaCreated) if (fMetaCreated) {
{ if (zmq_msg_close(&fMessage) != 0) {
if (zmq_msg_close(&fMessage) != 0)
{
LOG(error) << "failed closing message, reason: " << zmq_strerror(errno); LOG(error) << "failed closing message, reason: " << zmq_strerror(errno);
} }
fMetaCreated = false; fMetaCreated = false;
} }
} }
FairMQMessageSHM::~FairMQMessageSHM()
{
CloseMessage();
}

View File

@ -40,15 +40,15 @@ class FairMQMessageSHM final : public FairMQMessage
void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override; void Rebuild(void* data, const size_t size, fairmq_free_fn* ffn, void* hint = nullptr) override;
void* GetData() const override; void* GetData() const override;
size_t GetSize() const override; size_t GetSize() const override { return fSize; }
bool SetUsedSize(const size_t size) override; bool SetUsedSize(const size_t size) override;
fair::mq::Transport GetType() const override; fair::mq::Transport GetType() const override { return fTransportType; }
void Copy(const FairMQMessage& msg) override; void Copy(const FairMQMessage& msg) override;
~FairMQMessageSHM() override; ~FairMQMessageSHM() override { CloseMessage(); }
private: private:
fair::mq::shmem::Manager& fManager; fair::mq::shmem::Manager& fManager;
@ -65,7 +65,7 @@ class FairMQMessageSHM final : public FairMQMessage
mutable char* fLocalPtr; mutable char* fLocalPtr;
bool InitializeChunk(const size_t size); bool InitializeChunk(const size_t size);
zmq_msg_t* GetMessage(); zmq_msg_t* GetMessage() { return &fMessage; }
void CloseMessage(); void CloseMessage();
}; };

View File

@ -5,7 +5,7 @@
* GNU Lesser General Public Licence (LGPL) version 3, * * GNU Lesser General Public Licence (LGPL) version 3, *
* copied verbatim in the file "LICENSE" * * copied verbatim in the file "LICENSE" *
********************************************************************************/ ********************************************************************************/
#include <fairmq/shmem/Common.h> #include "Common.h"
#include "FairMQSocketSHM.h" #include "FairMQSocketSHM.h"
#include "FairMQMessageSHM.h" #include "FairMQMessageSHM.h"
@ -38,32 +38,27 @@ FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const str
assert(context); assert(context);
fSocket = zmq_socket(context, GetConstant(type)); fSocket = zmq_socket(context, GetConstant(type));
if (fSocket == nullptr) if (fSocket == nullptr) {
{
LOG(error) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno); LOG(error) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
if (zmq_setsockopt(fSocket, ZMQ_IDENTITY, fId.c_str(), fId.length()) != 0) if (zmq_setsockopt(fSocket, ZMQ_IDENTITY, fId.c_str(), fId.length()) != 0) {
{
LOG(error) << "Failed setting ZMQ_IDENTITY socket option, reason: " << zmq_strerror(errno); LOG(error) << "Failed setting ZMQ_IDENTITY socket option, reason: " << zmq_strerror(errno);
} }
// Tell socket to try and send/receive outstanding messages for <linger> milliseconds before terminating. // Tell socket to try and send/receive outstanding messages for <linger> milliseconds before terminating.
// Default value for ZeroMQ is -1, which is to wait forever. // Default value for ZeroMQ is -1, which is to wait forever.
int linger = 1000; int linger = 1000;
if (zmq_setsockopt(fSocket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) if (zmq_setsockopt(fSocket, ZMQ_LINGER, &linger, sizeof(linger)) != 0) {
{
LOG(error) << "Failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno); LOG(error) << "Failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno);
} }
if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) {
{
LOG(error) << "Failed setting ZMQ_SNDTIMEO socket option, reason: " << zmq_strerror(errno); LOG(error) << "Failed setting ZMQ_SNDTIMEO socket option, reason: " << zmq_strerror(errno);
} }
if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) {
{
LOG(error) << "Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno); LOG(error) << "Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno);
} }
@ -75,8 +70,7 @@ FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const str
// } // }
// } // }
if (type == "sub" || type == "pub") if (type == "sub" || type == "pub") {
{
LOG(error) << "PUB/SUB socket type is not supported for shared memory transport"; LOG(error) << "PUB/SUB socket type is not supported for shared memory transport";
throw fair::mq::SocketError("PUB/SUB socket type is not supported for shared memory transport"); throw fair::mq::SocketError("PUB/SUB socket type is not supported for shared memory transport");
} }
@ -86,10 +80,9 @@ FairMQSocketSHM::FairMQSocketSHM(Manager& manager, const string& type, const str
bool FairMQSocketSHM::Bind(const string& address) bool FairMQSocketSHM::Bind(const string& address)
{ {
// LOG(info) << "bind socket " << fId << " on " << address; // LOG(info) << "binding socket " << fId << " on " << address;
if (zmq_bind(fSocket, address.c_str()) != 0) if (zmq_bind(fSocket, address.c_str()) != 0) {
{
if (errno == EADDRINUSE) { if (errno == EADDRINUSE) {
// do not print error in this case, this is handled by FairMQDevice in case no connection could be established after trying a number of random ports from a range. // do not print error in this case, this is handled by FairMQDevice in case no connection could be established after trying a number of random ports from a range.
return false; return false;
@ -102,10 +95,9 @@ bool FairMQSocketSHM::Bind(const string& address)
bool FairMQSocketSHM::Connect(const string& address) bool FairMQSocketSHM::Connect(const string& address)
{ {
// LOG(info) << "connect socket " << fId << " on " << address; // LOG(info) << "connecting socket " << fId << " on " << address;
if (zmq_connect(fSocket, address.c_str()) != 0) if (zmq_connect(fSocket, address.c_str()) != 0) {
{
LOG(error) << "Failed connecting socket " << fId << ", reason: " << zmq_strerror(errno); LOG(error) << "Failed connecting socket " << fId << ", reason: " << zmq_strerror(errno);
return false; return false;
} }
@ -116,55 +108,38 @@ bool FairMQSocketSHM::Connect(const string& address)
int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int timeout) int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int timeout)
{ {
int flags = 0; int flags = 0;
if (timeout == 0) if (timeout == 0) {
{
flags = ZMQ_DONTWAIT; flags = ZMQ_DONTWAIT;
} }
int elapsed = 0; int elapsed = 0;
while (true && !fInterrupted) while (true && !fInterrupted) {
{
int nbytes = zmq_msg_send(static_cast<FairMQMessageSHM*>(msg.get())->GetMessage(), fSocket, flags); int nbytes = zmq_msg_send(static_cast<FairMQMessageSHM*>(msg.get())->GetMessage(), fSocket, flags);
if (nbytes == 0) if (nbytes == 0) {
{ ++fMessagesTx;
return nbytes; return nbytes;
} } else if (nbytes > 0) {
else if (nbytes > 0)
{
static_cast<FairMQMessageSHM*>(msg.get())->fQueued = true; static_cast<FairMQMessageSHM*>(msg.get())->fQueued = true;
size_t size = msg->GetSize(); size_t size = msg->GetSize();
fBytesTx += size; fBytesTx += size;
++fMessagesTx; ++fMessagesTx;
return size; return size;
} } else if (zmq_errno() == EAGAIN) {
else if (zmq_errno() == EAGAIN) if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) {
{ if (timeout > 0) {
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0))
{
if (timeout > 0)
{
elapsed += fSndTimeout; elapsed += fSndTimeout;
if (elapsed >= timeout) if (elapsed >= timeout) {
{
return -2; return -2;
} }
} }
continue; continue;
} } else {
else
{
return -2; return -2;
} }
} } else if (zmq_errno() == ETERM) {
else if (zmq_errno() == ETERM)
{
LOG(info) << "terminating socket " << fId; LOG(info) << "terminating socket " << fId;
return -1; return -1;
} } else {
else
{
LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
return nbytes; return nbytes;
} }
@ -176,72 +151,53 @@ int FairMQSocketSHM::Send(FairMQMessagePtr& msg, const int timeout)
int FairMQSocketSHM::Receive(FairMQMessagePtr& msg, const int timeout) int FairMQSocketSHM::Receive(FairMQMessagePtr& msg, const int timeout)
{ {
int flags = 0; int flags = 0;
if (timeout == 0) if (timeout == 0) {
{
flags = ZMQ_DONTWAIT; flags = ZMQ_DONTWAIT;
} }
int elapsed = 0; int elapsed = 0;
zmq_msg_t* msgPtr = static_cast<FairMQMessageSHM*>(msg.get())->GetMessage(); while (true) {
while (true) FairMQMessageSHM* shmMsg = static_cast<FairMQMessageSHM*>(msg.get());
{ zmq_msg_t* zmqMsg = shmMsg->GetMessage();
int nbytes = zmq_msg_recv(msgPtr, fSocket, flags); int nbytes = zmq_msg_recv(zmqMsg, fSocket, flags);
if (nbytes == 0) if (nbytes == 0) {
{
++fMessagesRx; ++fMessagesRx;
return nbytes; return nbytes;
} } else if (nbytes > 0) {
else if (nbytes > 0) // check for number of received messages. must be 1
{
// check for number of receiving messages. must be 1
const auto numMsgs = nbytes / sizeof(MetaHeader); const auto numMsgs = nbytes / sizeof(MetaHeader);
if (numMsgs > 1) if (numMsgs > 1) {
{
LOG(error) << "Receiving SHM multipart with a single message receive call"; LOG(error) << "Receiving SHM multipart with a single message receive call";
} }
assert (numMsgs == 1); assert(numMsgs == 1);
MetaHeader* hdr = static_cast<MetaHeader*>(zmq_msg_data(msgPtr)); MetaHeader* hdr = static_cast<MetaHeader*>(zmq_msg_data(zmqMsg));
size_t size = 0; size_t size = hdr->fSize;
static_cast<FairMQMessageSHM*>(msg.get())->fHandle = hdr->fHandle; shmMsg->fHandle = hdr->fHandle;
static_cast<FairMQMessageSHM*>(msg.get())->fSize = hdr->fSize; shmMsg->fSize = size;
static_cast<FairMQMessageSHM*>(msg.get())->fRegionId = hdr->fRegionId; shmMsg->fRegionId = hdr->fRegionId;
static_cast<FairMQMessageSHM*>(msg.get())->fHint = hdr->fHint; shmMsg->fHint = hdr->fHint;
size = msg->GetSize();
fBytesRx += size; fBytesRx += size;
++fMessagesRx; ++fMessagesRx;
return size; return size;
} } else if (zmq_errno() == EAGAIN) {
else if (zmq_errno() == EAGAIN) if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) {
{ if (timeout > 0) {
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0))
{
if (timeout > 0)
{
elapsed += fRcvTimeout; elapsed += fRcvTimeout;
if (elapsed >= timeout) if (elapsed >= timeout) {
{
return -2; return -2;
} }
} }
continue; continue;
} } else {
else
{
return -2; return -2;
} }
} } else if (zmq_errno() == ETERM) {
else if (zmq_errno() == ETERM)
{
LOG(info) << "terminating socket " << fId; LOG(info) << "terminating socket " << fId;
return -1; return -1;
} } else {
else
{
LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno); LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno);
return nbytes; return nbytes;
} }
@ -251,8 +207,7 @@ int FairMQSocketSHM::Receive(FairMQMessagePtr& msg, const int timeout)
int64_t FairMQSocketSHM::Send(vector<FairMQMessagePtr>& msgVec, const int timeout) int64_t FairMQSocketSHM::Send(vector<FairMQMessagePtr>& msgVec, const int timeout)
{ {
int flags = 0; int flags = 0;
if (timeout == 0) if (timeout == 0) {
{
flags = ZMQ_DONTWAIT; flags = ZMQ_DONTWAIT;
} }
const unsigned int vecSize = msgVec.size(); const unsigned int vecSize = msgVec.size();
@ -269,8 +224,7 @@ int64_t FairMQSocketSHM::Send(vector<FairMQMessagePtr>& msgVec, const int timeou
// prepare the message with shm metas // prepare the message with shm metas
MetaHeader* metas = static_cast<MetaHeader*>(zmq_msg_data(&zmqMsg)); MetaHeader* metas = static_cast<MetaHeader*>(zmq_msg_data(&zmqMsg));
for (auto& msg : msgVec) for (auto& msg : msgVec) {
{
zmq_msg_t* metaMsg = static_cast<FairMQMessageSHM*>(msg.get())->GetMessage(); zmq_msg_t* metaMsg = static_cast<FairMQMessageSHM*>(msg.get())->GetMessage();
if (zmq_msg_size(metaMsg) > 0) { if (zmq_msg_size(metaMsg) > 0) {
memcpy(metas++, zmq_msg_data(metaMsg), sizeof(MetaHeader)); memcpy(metas++, zmq_msg_data(metaMsg), sizeof(MetaHeader));
@ -286,21 +240,16 @@ int64_t FairMQSocketSHM::Send(vector<FairMQMessagePtr>& msgVec, const int timeou
} }
} }
while (!fInterrupted) while (!fInterrupted) {
{
int64_t totalSize = 0; int64_t totalSize = 0;
int nbytes = zmq_msg_send(&zmqMsg, fSocket, flags); int nbytes = zmq_msg_send(&zmqMsg, fSocket, flags);
if (nbytes == 0) if (nbytes == 0) {
{
zmq_msg_close(&zmqMsg); zmq_msg_close(&zmqMsg);
return nbytes; return nbytes;
} } else if (nbytes > 0) {
else if (nbytes > 0)
{
assert(static_cast<unsigned int>(nbytes) == (vecSize * sizeof(MetaHeader))); // all or nothing assert(static_cast<unsigned int>(nbytes) == (vecSize * sizeof(MetaHeader))); // all or nothing
for (auto& msg : msgVec) for (auto& msg : msgVec) {
{
FairMQMessageSHM* shmMsg = static_cast<FairMQMessageSHM*>(msg.get()); FairMQMessageSHM* shmMsg = static_cast<FairMQMessageSHM*>(msg.get());
shmMsg->fQueued = true; shmMsg->fQueued = true;
totalSize += shmMsg->fSize; totalSize += shmMsg->fSize;
@ -312,36 +261,25 @@ int64_t FairMQSocketSHM::Send(vector<FairMQMessagePtr>& msgVec, const int timeou
zmq_msg_close(&zmqMsg); zmq_msg_close(&zmqMsg);
return totalSize; return totalSize;
} } else if (zmq_errno() == EAGAIN) {
else if (zmq_errno() == EAGAIN) if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) {
{ if (timeout > 0) {
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0))
{
if (timeout > 0)
{
elapsed += fSndTimeout; elapsed += fSndTimeout;
if (elapsed >= timeout) if (elapsed >= timeout) {
{
zmq_msg_close(&zmqMsg); zmq_msg_close(&zmqMsg);
return -2; return -2;
} }
} }
continue; continue;
} } else {
else
{
zmq_msg_close(&zmqMsg); zmq_msg_close(&zmqMsg);
return -2; return -2;
} }
} } else if (zmq_errno() == ETERM) {
else if (zmq_errno() == ETERM)
{
zmq_msg_close(&zmqMsg); zmq_msg_close(&zmqMsg);
LOG(info) << "terminating socket " << fId; LOG(info) << "terminating socket " << fId;
return -1; return -1;
} } else {
else
{
zmq_msg_close(&zmqMsg); zmq_msg_close(&zmqMsg);
LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno); LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
return nbytes; return nbytes;
@ -355,8 +293,7 @@ int64_t FairMQSocketSHM::Send(vector<FairMQMessagePtr>& msgVec, const int timeou
int64_t FairMQSocketSHM::Receive(vector<FairMQMessagePtr>& msgVec, const int timeout) int64_t FairMQSocketSHM::Receive(vector<FairMQMessagePtr>& msgVec, const int timeout)
{ {
int flags = 0; int flags = 0;
if (timeout == 0) if (timeout == 0) {
{
flags = ZMQ_DONTWAIT; flags = ZMQ_DONTWAIT;
} }
int elapsed = 0; int elapsed = 0;
@ -364,17 +301,13 @@ int64_t FairMQSocketSHM::Receive(vector<FairMQMessagePtr>& msgVec, const int tim
zmq_msg_t zmqMsg; zmq_msg_t zmqMsg;
zmq_msg_init(&zmqMsg); zmq_msg_init(&zmqMsg);
while (!fInterrupted) while (!fInterrupted) {
{
int64_t totalSize = 0; int64_t totalSize = 0;
int nbytes = zmq_msg_recv(&zmqMsg, fSocket, flags); int nbytes = zmq_msg_recv(&zmqMsg, fSocket, flags);
if (nbytes == 0) if (nbytes == 0) {
{
zmq_msg_close(&zmqMsg); zmq_msg_close(&zmqMsg);
return 0; return 0;
} } else if (nbytes > 0) {
else if (nbytes > 0)
{
MetaHeader* hdrVec = static_cast<MetaHeader*>(zmq_msg_data(&zmqMsg)); MetaHeader* hdrVec = static_cast<MetaHeader*>(zmq_msg_data(&zmqMsg));
const auto hdrVecSize = zmq_msg_size(&zmqMsg); const auto hdrVecSize = zmq_msg_size(&zmqMsg);
assert(hdrVecSize > 0); assert(hdrVecSize > 0);
@ -384,24 +317,22 @@ int64_t FairMQSocketSHM::Receive(vector<FairMQMessagePtr>& msgVec, const int tim
msgVec.reserve(numMessages); msgVec.reserve(numMessages);
for (size_t m = 0; m < numMessages; m++) for (size_t m = 0; m < numMessages; m++) {
{ // get the meta data pointer
MetaHeader hdr; MetaHeader* hdr = &hdrVec[m];
memcpy(&hdr, &hdrVec[m], sizeof(MetaHeader));
msgVec.emplace_back(fair::mq::tools::make_unique<FairMQMessageSHM>(fManager, GetTransport())); // create new message (part)
msgVec.emplace_back(tools::make_unique<FairMQMessageSHM>(fManager, GetTransport()));
FairMQMessageSHM* shmMsg = static_cast<FairMQMessageSHM*>(msgVec.back().get());
// fill the zmq buffer with the delivered meta data
memcpy(zmq_msg_data(shmMsg->GetMessage()), hdr, sizeof(MetaHeader));
// set the message members with the meta data
shmMsg->fHandle = hdr->fHandle;
shmMsg->fSize = hdr->fSize;
shmMsg->fRegionId = hdr->fRegionId;
shmMsg->fHint = hdr->fHint;
FairMQMessageSHM* msg = static_cast<FairMQMessageSHM*>(msgVec.back().get()); totalSize += shmMsg->GetSize();
MetaHeader* msgHdr = static_cast<MetaHeader*>(zmq_msg_data(msg->GetMessage()));
memcpy(msgHdr, &hdr, sizeof(MetaHeader));
msg->fHandle = hdr.fHandle;
msg->fSize = hdr.fSize;
msg->fRegionId = hdr.fRegionId;
msg->fHint = hdr.fHint;
totalSize += msg->GetSize();
} }
// store statistics on how many messages have been received (handle all parts as a single message) // store statistics on how many messages have been received (handle all parts as a single message)
@ -410,30 +341,21 @@ int64_t FairMQSocketSHM::Receive(vector<FairMQMessagePtr>& msgVec, const int tim
zmq_msg_close(&zmqMsg); zmq_msg_close(&zmqMsg);
return totalSize; return totalSize;
} } else if (zmq_errno() == EAGAIN) {
else if (zmq_errno() == EAGAIN) if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0)) {
{ if (timeout > 0) {
if (!fInterrupted && ((flags & ZMQ_DONTWAIT) == 0))
{
if (timeout > 0)
{
elapsed += fRcvTimeout; elapsed += fRcvTimeout;
if (elapsed >= timeout) if (elapsed >= timeout) {
{
zmq_msg_close(&zmqMsg); zmq_msg_close(&zmqMsg);
return -2; return -2;
} }
} }
continue; continue;
} } else {
else
{
zmq_msg_close(&zmqMsg); zmq_msg_close(&zmqMsg);
return -2; return -2;
} }
} } else {
else
{
zmq_msg_close(&zmqMsg); zmq_msg_close(&zmqMsg);
LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno); LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno);
return nbytes; return nbytes;
@ -448,13 +370,11 @@ void FairMQSocketSHM::Close()
{ {
// LOG(debug) << "Closing socket " << fId; // LOG(debug) << "Closing socket " << fId;
if (fSocket == nullptr) if (fSocket == nullptr) {
{
return; return;
} }
if (zmq_close(fSocket) != 0) if (zmq_close(fSocket) != 0) {
{
LOG(error) << "Failed closing socket " << fId << ", reason: " << zmq_strerror(errno); LOG(error) << "Failed closing socket " << fId << ", reason: " << zmq_strerror(errno);
} }
@ -475,23 +395,16 @@ void FairMQSocketSHM::Resume()
fInterrupted = false; fInterrupted = false;
} }
void* FairMQSocketSHM::GetSocket() const
{
return fSocket;
}
void FairMQSocketSHM::SetOption(const string& option, const void* value, size_t valueSize) void FairMQSocketSHM::SetOption(const string& option, const void* value, size_t valueSize)
{ {
if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0) if (zmq_setsockopt(fSocket, GetConstant(option), value, valueSize) < 0) {
{
LOG(error) << "Failed setting socket option, reason: " << zmq_strerror(errno); LOG(error) << "Failed setting socket option, reason: " << zmq_strerror(errno);
} }
} }
void FairMQSocketSHM::GetOption(const string& option, void* value, size_t* valueSize) void FairMQSocketSHM::GetOption(const string& option, void* value, size_t* valueSize)
{ {
if (zmq_getsockopt(fSocket, GetConstant(option), value, valueSize) < 0) if (zmq_getsockopt(fSocket, GetConstant(option), value, valueSize) < 0) {
{
LOG(error) << "Failed getting socket option, reason: " << zmq_strerror(errno); LOG(error) << "Failed getting socket option, reason: " << zmq_strerror(errno);
} }
} }
@ -581,26 +494,6 @@ int FairMQSocketSHM::GetRcvKernelSize() const
return value; return value;
} }
unsigned long FairMQSocketSHM::GetBytesTx() const
{
return fBytesTx;
}
unsigned long FairMQSocketSHM::GetBytesRx() const
{
return fBytesRx;
}
unsigned long FairMQSocketSHM::GetMessagesTx() const
{
return fMessagesTx;
}
unsigned long FairMQSocketSHM::GetMessagesRx() const
{
return fMessagesRx;
}
int FairMQSocketSHM::GetConstant(const string& constant) int FairMQSocketSHM::GetConstant(const string& constant)
{ {
if (constant == "") return 0; if (constant == "") return 0;
@ -629,8 +522,3 @@ int FairMQSocketSHM::GetConstant(const string& constant)
return -1; return -1;
} }
FairMQSocketSHM::~FairMQSocketSHM()
{
Close();
}

View File

@ -34,7 +34,7 @@ class FairMQSocketSHM final : public FairMQSocket
int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int timeout = -1) override; int64_t Send(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int timeout = -1) override;
int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int timeout = -1) override; int64_t Receive(std::vector<std::unique_ptr<FairMQMessage>>& msgVec, const int timeout = -1) override;
void* GetSocket() const; void* GetSocket() const { return fSocket; }
void Close() override; void Close() override;
@ -55,14 +55,14 @@ class FairMQSocketSHM final : public FairMQSocket
void SetRcvKernelSize(const int value) override; void SetRcvKernelSize(const int value) override;
int GetRcvKernelSize() const override; int GetRcvKernelSize() const override;
unsigned long GetBytesTx() const override; unsigned long GetBytesTx() const override { return fBytesTx; }
unsigned long GetBytesRx() const override; unsigned long GetBytesRx() const override { return fBytesRx; }
unsigned long GetMessagesTx() const override; unsigned long GetMessagesTx() const override { return fMessagesTx; }
unsigned long GetMessagesRx() const override; unsigned long GetMessagesRx() const override { return fMessagesRx; }
static int GetConstant(const std::string& constant); static int GetConstant(const std::string& constant);
~FairMQSocketSHM() override; ~FairMQSocketSHM() override { Close(); }
private: private:
void* fSocket; void* fSocket;

View File

@ -6,7 +6,7 @@
* copied verbatim in the file "LICENSE" * * copied verbatim in the file "LICENSE" *
********************************************************************************/ ********************************************************************************/
#include <fairmq/shmem/Common.h> #include "Common.h"
#include "FairMQUnmanagedRegionSHM.h" #include "FairMQUnmanagedRegionSHM.h"
@ -41,18 +41,3 @@ FairMQUnmanagedRegionSHM::FairMQUnmanagedRegionSHM(Manager& manager, const size_
throw; throw;
} }
} }
void* FairMQUnmanagedRegionSHM::GetData() const
{
return fRegion->get_address();
}
size_t FairMQUnmanagedRegionSHM::GetSize() const
{
return fRegion->get_size();
}
FairMQUnmanagedRegionSHM::~FairMQUnmanagedRegionSHM()
{
fManager.RemoveRegion(fRegionId);
}

View File

@ -28,10 +28,10 @@ class FairMQUnmanagedRegionSHM final : public FairMQUnmanagedRegion
public: public:
FairMQUnmanagedRegionSHM(fair::mq::shmem::Manager& manager, const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0); FairMQUnmanagedRegionSHM(fair::mq::shmem::Manager& manager, const size_t size, FairMQRegionCallback callback = nullptr, const std::string& path = "", int flags = 0);
void* GetData() const override; void* GetData() const override { return fRegion->get_address(); }
size_t GetSize() const override; size_t GetSize() const override { return fRegion->get_size(); }
~FairMQUnmanagedRegionSHM() override; ~FairMQUnmanagedRegionSHM() override { fManager.RemoveRegion(fRegionId); }
private: private:
fair::mq::shmem::Manager& fManager; fair::mq::shmem::Manager& fManager;