mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 17:41:45 +00:00
Compare commits
18 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
78b1c188bf | ||
|
66bc7ba762 | ||
|
88bc1f7a06 | ||
|
f70201610b | ||
|
fc7f6f1116 | ||
|
8125489776 | ||
|
6dd0a44308 | ||
|
afe2dcaa02 | ||
|
aeab9e5407 | ||
|
539e5602a6 | ||
|
beb510ded8 | ||
|
d1c51e0f1f | ||
|
f885b4618e | ||
|
3364da9541 | ||
|
7aec6f91de | ||
|
9e2a002942 | ||
|
52c6264faf | ||
|
79489bb501 |
@@ -32,6 +32,7 @@ Set(configure_options "${configure_options};-DBUILD_SDK=ON")
|
||||
Set(configure_options "${configure_options};-DBUILD_SDK_COMMANDS=ON")
|
||||
Set(configure_options "${configure_options};-DFAST_BUILD=ON")
|
||||
Set(configure_options "${configure_options};-DCOTIRE_MAXIMUM_NUMBER_OF_UNITY_INCLUDES=-j$ENV{number_of_processors}")
|
||||
Set(configure_options "${configure_options};-DBoost_NO_BOOST_CMAKE=ON")
|
||||
|
||||
Set(EXTRA_FLAGS $ENV{EXTRA_FLAGS})
|
||||
If(EXTRA_FLAGS)
|
||||
|
@@ -145,7 +145,7 @@ macro(set_fairmq_defaults)
|
||||
# Configure build types
|
||||
set(CMAKE_CONFIGURATION_TYPES "Debug" "Release" "RelWithDebInfo" "Nightly" "Profile" "Experimental" "AddressSan" "ThreadSan")
|
||||
set(_warnings "-Wshadow -Wall -Wextra -Wpedantic")
|
||||
set(CMAKE_CXX_FLAGS_DEBUG "-g ${_warnings}")
|
||||
set(CMAKE_CXX_FLAGS_DEBUG "-Og -g ${_warnings}")
|
||||
set(CMAKE_CXX_FLAGS_RELEASE "-O2 -DNDEBUG")
|
||||
set(CMAKE_CXX_FLAGS_RELWITHDEBINFO "-O2 -g ${_warnings} -DNDEBUG")
|
||||
set(CMAKE_CXX_FLAGS_NIGHTLY "-O2 -g ${_warnings}")
|
||||
|
@@ -17,8 +17,7 @@ Sampler::Sampler()
|
||||
: fText()
|
||||
, fMaxIterations(0)
|
||||
, fNumIterations(0)
|
||||
{
|
||||
}
|
||||
{}
|
||||
|
||||
void Sampler::InitTask()
|
||||
{
|
||||
@@ -32,25 +31,22 @@ bool Sampler::ConditionalRun()
|
||||
// create a copy of the data with new(), that will be deleted after the transfer is complete
|
||||
string* text = new string(fText);
|
||||
|
||||
// create message object with a pointer to the data buffer,
|
||||
// its size,
|
||||
// create message object with a pointer to the data buffer, its size,
|
||||
// custom deletion function (called when transfer is done),
|
||||
// and pointer to the object managing the data buffer
|
||||
FairMQMessagePtr msg(NewMessage(const_cast<char*>(text->c_str()),
|
||||
text->length(),
|
||||
[](void* /*data*/, void* object) { delete static_cast<string*>(object); },
|
||||
text));
|
||||
FairMQMessagePtr msg(NewMessage(
|
||||
const_cast<char*>(text->c_str()),
|
||||
text->length(),
|
||||
[](void* /*data*/, void* object) { delete static_cast<string*>(object); },
|
||||
text));
|
||||
|
||||
LOG(info) << "Sending \"" << fText << "\"";
|
||||
|
||||
// in case of error or transfer interruption, return false to go to IDLE state
|
||||
// in case of error or transfer interruption, return false to go to the Ready state
|
||||
// successful transfer will return the number of bytes transferred (can be 0 if sending an empty message).
|
||||
if (Send(msg, "data") < 0)
|
||||
{
|
||||
if (Send(msg, "data") < 0) {
|
||||
return false;
|
||||
}
|
||||
else if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
|
||||
{
|
||||
} else if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
|
||||
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
|
||||
return false;
|
||||
}
|
||||
@@ -58,8 +54,6 @@ bool Sampler::ConditionalRun()
|
||||
return true;
|
||||
}
|
||||
|
||||
Sampler::~Sampler()
|
||||
{
|
||||
}
|
||||
Sampler::~Sampler() {}
|
||||
|
||||
} // namespace example_1_1
|
||||
|
@@ -33,23 +33,22 @@ void Sink::InitTask()
|
||||
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
|
||||
}
|
||||
|
||||
// handler is called whenever a message arrives on "data", with a reference to the message and a sub-channel index (here 0)
|
||||
// handler is called whenever a message arrives on "data", with a reference to the message and a
|
||||
// sub-channel index (here 0)
|
||||
bool Sink::HandleData(FairMQMessagePtr& msg, int /*index*/)
|
||||
{
|
||||
LOG(info) << "Received: \"" << string(static_cast<char*>(msg->GetData()), msg->GetSize()) << "\"";
|
||||
|
||||
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations)
|
||||
{
|
||||
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
|
||||
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
|
||||
return false;
|
||||
}
|
||||
|
||||
// return true if want to be called again (otherwise return false go to IDLE state)
|
||||
// return true if you want the handler to be called again (otherwise return false to go to the
|
||||
// Ready state)
|
||||
return true;
|
||||
}
|
||||
|
||||
Sink::~Sink()
|
||||
{
|
||||
}
|
||||
Sink::~Sink() {}
|
||||
|
||||
} // namespace example_1_1
|
||||
|
@@ -23,36 +23,40 @@ namespace example_region
|
||||
|
||||
Sampler::Sampler()
|
||||
: fMsgSize(10000)
|
||||
, fLinger(100)
|
||||
, fMaxIterations(0)
|
||||
, fNumIterations(0)
|
||||
, fRegion(nullptr)
|
||||
, fNumUnackedMsgs(0)
|
||||
{
|
||||
}
|
||||
{}
|
||||
|
||||
void Sampler::InitTask()
|
||||
{
|
||||
fMsgSize = fConfig->GetProperty<int>("msg-size");
|
||||
fLinger = fConfig->GetProperty<uint32_t>("region-linger");
|
||||
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
|
||||
|
||||
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
|
||||
LOG(warn) << ">>>" << info.event;
|
||||
LOG(warn) << "id: " << info.id;
|
||||
LOG(warn) << "ptr: " << info.ptr;
|
||||
LOG(warn) << "size: " << info.size;
|
||||
LOG(warn) << "flags: " << info.flags;
|
||||
LOG(info) << "Region event: " << info.event
|
||||
<< ", id: " << info.id
|
||||
<< ", ptr: " << info.ptr
|
||||
<< ", size: " << info.size
|
||||
<< ", flags: " << info.flags;
|
||||
});
|
||||
|
||||
fRegion = FairMQUnmanagedRegionPtr(NewUnmanagedRegionFor("data",
|
||||
0,
|
||||
10000000,
|
||||
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
fNumUnackedMsgs -= blocks.size();
|
||||
|
||||
if (fMaxIterations > 0) {
|
||||
LOG(debug) << "Received " << blocks.size() << " acks";
|
||||
LOG(info) << "Received " << blocks.size() << " acks";
|
||||
}
|
||||
}
|
||||
));
|
||||
fRegion->SetLinger(fLinger);
|
||||
}
|
||||
|
||||
bool Sampler::ConditionalRun()
|
||||
@@ -69,27 +73,30 @@ bool Sampler::ConditionalRun()
|
||||
// LOG(info) << "check: " << static_cast<char*>(fRegion->GetData())[3];
|
||||
// std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
if (Send(msg, "data", 0) > 0) {
|
||||
++fNumUnackedMsgs;
|
||||
|
||||
if (fMaxIterations > 0 && ++fNumIterations >= fMaxIterations) {
|
||||
LOG(info) << "Configured maximum number of iterations reached. Leaving RUNNING state.";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
++fNumUnackedMsgs;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void Sampler::ResetTask()
|
||||
{
|
||||
// if not all messages acknowledged, wait for a bit. But only once, since receiver could be already dead.
|
||||
if (fNumUnackedMsgs != 0) {
|
||||
LOG(debug) << "waiting for all acknowledgements... (" << fNumUnackedMsgs << ")";
|
||||
this_thread::sleep_for(chrono::milliseconds(500));
|
||||
LOG(debug) << "done, still unacked: " << fNumUnackedMsgs;
|
||||
}
|
||||
// On destruction UnmanagedRegion will try to TODO
|
||||
fRegion.reset();
|
||||
{
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
if (fNumUnackedMsgs != 0) {
|
||||
LOG(info) << "Done, still not acknowledged: " << fNumUnackedMsgs;
|
||||
} else {
|
||||
LOG(info) << "All acknowledgements received.";
|
||||
}
|
||||
}
|
||||
fChannels.at("data").at(0).Transport()->UnsubscribeFromRegionEvents();
|
||||
}
|
||||
|
||||
|
@@ -15,7 +15,8 @@
|
||||
#ifndef FAIRMQEXAMPLEREGIONSAMPLER_H
|
||||
#define FAIRMQEXAMPLEREGIONSAMPLER_H
|
||||
|
||||
#include <atomic>
|
||||
#include <mutex>
|
||||
#include <cstdint>
|
||||
|
||||
#include "FairMQDevice.h"
|
||||
|
||||
@@ -35,10 +36,12 @@ class Sampler : public FairMQDevice
|
||||
|
||||
private:
|
||||
int fMsgSize;
|
||||
uint32_t fLinger;
|
||||
uint64_t fMaxIterations;
|
||||
uint64_t fNumIterations;
|
||||
FairMQUnmanagedRegionPtr fRegion;
|
||||
std::atomic<uint64_t> fNumUnackedMsgs;
|
||||
std::mutex fMtx;
|
||||
uint64_t fNumUnackedMsgs;
|
||||
};
|
||||
|
||||
} // namespace example_region
|
||||
|
@@ -30,11 +30,11 @@ void Sink::InitTask()
|
||||
// Get the fMaxIterations value from the command line options (via fConfig)
|
||||
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
|
||||
fChannels.at("data").at(0).Transport()->SubscribeToRegionEvents([](FairMQRegionInfo info) {
|
||||
LOG(warn) << ">>>" << info.event;
|
||||
LOG(warn) << "id: " << info.id;
|
||||
LOG(warn) << "ptr: " << info.ptr;
|
||||
LOG(warn) << "size: " << info.size;
|
||||
LOG(warn) << "flags: " << info.flags;
|
||||
LOG(info) << "Region event: " << info.event
|
||||
<< ", id: " << info.id
|
||||
<< ", ptr: " << info.ptr
|
||||
<< ", size: " << info.size
|
||||
<< ", flags: " << info.flags;
|
||||
});
|
||||
}
|
||||
|
||||
|
@@ -15,6 +15,7 @@ void addCustomOptions(bpo::options_description& options)
|
||||
{
|
||||
options.add_options()
|
||||
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
|
||||
("region-linger", bpo::value<uint32_t>()->default_value(100), "Linger period for regions")
|
||||
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)");
|
||||
}
|
||||
|
||||
|
@@ -23,6 +23,7 @@ SAMPLER+=" --verbosity veryhigh"
|
||||
SAMPLER+=" --control static --color false"
|
||||
SAMPLER+=" --max-iterations 1"
|
||||
SAMPLER+=" --msg-size $msgSize"
|
||||
SAMPLER+=" --region-linger 500"
|
||||
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777"
|
||||
@CMAKE_CURRENT_BINARY_DIR@/$SAMPLER &
|
||||
SAMPLER_PID=$!
|
||||
|
@@ -170,6 +170,7 @@ if(BUILD_FAIRMQ)
|
||||
PluginManager.h
|
||||
PluginServices.h
|
||||
runFairMQDevice.h
|
||||
shmem/Monitor.h
|
||||
)
|
||||
|
||||
set(FAIRMQ_PRIVATE_HEADER_FILES
|
||||
@@ -234,6 +235,7 @@ if(BUILD_FAIRMQ)
|
||||
plugins/config/Config.cxx
|
||||
plugins/Control.cxx
|
||||
MemoryResources.cxx
|
||||
shmem/Monitor.cxx
|
||||
)
|
||||
|
||||
if(BUILD_OFI_TRANSPORT)
|
||||
|
@@ -671,7 +671,6 @@ void FairMQChannel::Init()
|
||||
bool FairMQChannel::ConnectEndpoint(const string& endpoint)
|
||||
{
|
||||
lock_guard<mutex> lock(fMtx);
|
||||
|
||||
return fSocket->Connect(endpoint);
|
||||
}
|
||||
|
||||
@@ -683,6 +682,13 @@ bool FairMQChannel::BindEndpoint(string& endpoint)
|
||||
if (fSocket->Bind(endpoint)) {
|
||||
return true;
|
||||
} else {
|
||||
// auto-bind only implemented for TCP
|
||||
size_t protocolPos = endpoint.find(':');
|
||||
string protocol = endpoint.substr(0, protocolPos);
|
||||
if (protocol != "tcp") {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (fAutoBind) {
|
||||
// number of attempts when choosing a random port
|
||||
int numAttempts = 0;
|
||||
|
@@ -11,6 +11,7 @@
|
||||
|
||||
#include <cstddef> // for size_t
|
||||
#include <memory> // unique_ptr
|
||||
#include <stdexcept>
|
||||
|
||||
#include <fairmq/Transports.h>
|
||||
|
||||
|
@@ -9,9 +9,10 @@
|
||||
#ifndef FAIRMQSOCKET_H_
|
||||
#define FAIRMQSOCKET_H_
|
||||
|
||||
#include <memory>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include <memory>
|
||||
|
||||
#include "FairMQMessage.h"
|
||||
class FairMQTransportFactory;
|
||||
|
@@ -10,6 +10,7 @@
|
||||
#define FAIRMQUNMANAGEDREGION_H_
|
||||
|
||||
#include <cstddef> // size_t
|
||||
#include <cstdint> // uint32_t
|
||||
#include <memory> // std::unique_ptr
|
||||
#include <functional> // std::function
|
||||
#include <ostream> // std::ostream
|
||||
@@ -72,6 +73,8 @@ class FairMQUnmanagedRegion
|
||||
virtual void* GetData() const = 0;
|
||||
virtual size_t GetSize() const = 0;
|
||||
virtual uint64_t GetId() const = 0;
|
||||
virtual void SetLinger(uint32_t linger) = 0;
|
||||
virtual uint32_t GetLinger() const = 0;
|
||||
|
||||
FairMQTransportFactory* GetTransport() { return fTransport; }
|
||||
void SetTransport(FairMQTransportFactory* transport) { fTransport = transport; }
|
||||
|
@@ -17,6 +17,7 @@
|
||||
#include <chrono>
|
||||
#include <cstddef> // size_t
|
||||
#include <cstdint> // uint64_t
|
||||
#include <cstring> // memset
|
||||
#include <string>
|
||||
|
||||
/**
|
||||
@@ -28,6 +29,7 @@ class FairMQBenchmarkSampler : public FairMQDevice
|
||||
public:
|
||||
FairMQBenchmarkSampler()
|
||||
: fMultipart(false)
|
||||
, fMemSet(false)
|
||||
, fNumParts(1)
|
||||
, fMsgSize(10000)
|
||||
, fMsgRate(0)
|
||||
@@ -39,6 +41,7 @@ class FairMQBenchmarkSampler : public FairMQDevice
|
||||
void InitTask() override
|
||||
{
|
||||
fMultipart = fConfig->GetProperty<bool>("multipart");
|
||||
fMemSet = fConfig->GetProperty<bool>("memset");
|
||||
fNumParts = fConfig->GetProperty<size_t>("num-parts");
|
||||
fMsgSize = fConfig->GetProperty<size_t>("msg-size");
|
||||
fMsgRate = fConfig->GetProperty<float>("msg-rate");
|
||||
@@ -51,8 +54,6 @@ class FairMQBenchmarkSampler : public FairMQDevice
|
||||
// store the channel reference to avoid traversing the map on every loop iteration
|
||||
FairMQChannel& dataOutChannel = fChannels.at(fOutChannelName).at(0);
|
||||
|
||||
FairMQMessagePtr baseMsg(dataOutChannel.NewMessage(fMsgSize));
|
||||
|
||||
LOG(info) << "Starting the benchmark with message size of " << fMsgSize << " and " << fMaxIterations << " iterations.";
|
||||
auto tStart = std::chrono::high_resolution_clock::now();
|
||||
|
||||
@@ -64,6 +65,9 @@ class FairMQBenchmarkSampler : public FairMQDevice
|
||||
|
||||
for (size_t i = 0; i < fNumParts; ++i) {
|
||||
parts.AddPart(dataOutChannel.NewMessage(fMsgSize));
|
||||
if (fMemSet) {
|
||||
std::memset(parts.At(i)->GetData(), 0, parts.At(i)->GetSize());
|
||||
}
|
||||
}
|
||||
|
||||
if (dataOutChannel.Send(parts) >= 0) {
|
||||
@@ -76,6 +80,9 @@ class FairMQBenchmarkSampler : public FairMQDevice
|
||||
}
|
||||
} else {
|
||||
FairMQMessagePtr msg(dataOutChannel.NewMessage(fMsgSize));
|
||||
if (fMemSet) {
|
||||
std::memset(msg->GetData(), 0, msg->GetSize());
|
||||
}
|
||||
|
||||
if (dataOutChannel.Send(msg) >= 0) {
|
||||
if (fMaxIterations > 0) {
|
||||
@@ -101,6 +108,7 @@ class FairMQBenchmarkSampler : public FairMQDevice
|
||||
|
||||
protected:
|
||||
bool fMultipart;
|
||||
bool fMemSet;
|
||||
size_t fNumParts;
|
||||
size_t fMsgSize;
|
||||
std::atomic<int> fMsgCounter;
|
||||
|
@@ -15,8 +15,8 @@ void addCustomOptions(bpo::options_description& options)
|
||||
{
|
||||
options.add_options()
|
||||
("out-channel", bpo::value<std::string>()->default_value("data"), "Name of the output channel")
|
||||
("same-msg", bpo::value<bool>()->default_value(false), "Re-send the same message, or recreate for each iteration")
|
||||
("multipart", bpo::value<bool>()->default_value(false), "Handle multipart payloads")
|
||||
("memset", bpo::value<bool>()->default_value(false), "Memset allocated buffers to 0")
|
||||
("num-parts", bpo::value<size_t>()->default_value(1), "Number of parts to send. 1 will send single messages, not parts")
|
||||
("msg-size", bpo::value<size_t>()->default_value(1000000), "Message size in bytes")
|
||||
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Number of run iterations (0 - infinite)")
|
||||
|
@@ -12,7 +12,6 @@
|
||||
|
||||
#include <atomic>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
|
||||
#include <boost/interprocess/managed_shared_memory.hpp>
|
||||
#include <boost/interprocess/allocators/allocator.hpp>
|
||||
@@ -113,8 +112,11 @@ struct RegionBlock
|
||||
inline std::string buildShmIdFromSessionIdAndUserId(const std::string& sessionId)
|
||||
{
|
||||
std::string seed((std::to_string(geteuid()) + sessionId));
|
||||
std::string shmId = picosha2::hash256_hex_string(seed);
|
||||
shmId.resize(10, '_');
|
||||
// generate an 8-digit hex value out of sha256 hash
|
||||
std::vector<unsigned char> hash(4);
|
||||
picosha2::hash256(seed.begin(), seed.end(), hash.begin(), hash.end());
|
||||
std::string shmId = picosha2::bytes_to_hex_string(hash.begin(), hash.end());
|
||||
|
||||
return shmId;
|
||||
}
|
||||
|
||||
|
@@ -17,6 +17,7 @@
|
||||
|
||||
#include "Common.h"
|
||||
#include "Region.h"
|
||||
#include "Monitor.h"
|
||||
|
||||
#include <FairMQLogger.h>
|
||||
#include <FairMQMessage.h>
|
||||
@@ -32,6 +33,8 @@
|
||||
#include <boost/interprocess/ipc/message_queue.hpp>
|
||||
|
||||
#include <cstdlib> // getenv
|
||||
#include <condition_variable>
|
||||
#include <mutex>
|
||||
#include <set>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
@@ -55,10 +58,8 @@ class Manager
|
||||
Manager(std::string id, std::string deviceId, size_t size, bool throwOnBadAlloc)
|
||||
: fShmId(std::move(id))
|
||||
, fDeviceId(std::move(deviceId))
|
||||
, fSegmentName("fmq_" + fShmId + "_main")
|
||||
, fManagementSegmentName("fmq_" + fShmId + "_mng")
|
||||
, fSegment(boost::interprocess::open_or_create, fSegmentName.c_str(), size)
|
||||
, fManagementSegment(boost::interprocess::open_or_create, fManagementSegmentName.c_str(), 655360)
|
||||
, fSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_main").c_str(), size)
|
||||
, fManagementSegment(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mng").c_str(), 655360)
|
||||
, fShmVoidAlloc(fManagementSegment.get_segment_manager())
|
||||
, fShmMtx(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_mtx").c_str())
|
||||
, fRegionEventsCV(boost::interprocess::open_or_create, std::string("fmq_" + fShmId + "_cv").c_str())
|
||||
@@ -72,7 +73,7 @@ class Manager
|
||||
, fThrowOnBadAlloc(throwOnBadAlloc)
|
||||
{
|
||||
using namespace boost::interprocess;
|
||||
LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << size << " bytes. Available are " << fSegment.get_free_memory() << " bytes.";
|
||||
LOG(debug) << "created/opened shared memory segment '" << "fmq_" << fShmId << "_main" << "' of " << fSegment.get_size() << " bytes. Available are " << fSegment.get_free_memory() << " bytes.";
|
||||
|
||||
fRegionInfos = fManagementSegment.find_or_construct<Uint64RegionInfoMap>(unique_instance)(fShmVoidAlloc);
|
||||
// store info about the managed segment as region with id 0
|
||||
@@ -100,39 +101,6 @@ class Manager
|
||||
Manager(const Manager&) = delete;
|
||||
Manager operator=(const Manager&) = delete;
|
||||
|
||||
~Manager()
|
||||
{
|
||||
using namespace boost::interprocess;
|
||||
bool lastRemoved = false;
|
||||
|
||||
UnsubscribeFromRegionEvents();
|
||||
|
||||
fSendHeartbeats = false;
|
||||
fHeartbeatThread.join();
|
||||
|
||||
try {
|
||||
boost::interprocess::scoped_lock<named_mutex> lock(fShmMtx);
|
||||
|
||||
(fDeviceCounter->fCount)--;
|
||||
|
||||
if (fDeviceCounter->fCount == 0) {
|
||||
LOG(debug) << "last segment user, removing segment.";
|
||||
|
||||
RemoveSegments();
|
||||
lastRemoved = true;
|
||||
} else {
|
||||
LOG(debug) << "other segment users present (" << fDeviceCounter->fCount << "), not removing it.";
|
||||
}
|
||||
} catch(interprocess_exception& e) {
|
||||
LOG(error) << "error while acquiring lock in Manager destructor: " << e.what();
|
||||
}
|
||||
|
||||
if (lastRemoved) {
|
||||
named_mutex::remove(std::string("fmq_" + fShmId + "_mtx").c_str());
|
||||
named_condition::remove(std::string("fmq_" + fShmId + "_cv").c_str());
|
||||
}
|
||||
}
|
||||
|
||||
boost::interprocess::managed_shared_memory& Segment() { return fSegment; }
|
||||
boost::interprocess::managed_shared_memory& ManagementSegment() { return fManagementSegment; }
|
||||
|
||||
@@ -265,8 +233,12 @@ class Manager
|
||||
|
||||
auto r = fRegions.emplace(id, tools::make_unique<Region>(fShmId, id, 0, true, nullptr, nullptr, path, flags));
|
||||
return r.first->second.get();
|
||||
} catch (std::out_of_range& oor) {
|
||||
LOG(error) << "Could not get remote region with id '" << id << "'. Does the region creator run with the same session id?";
|
||||
LOG(error) << oor.what();
|
||||
return nullptr;
|
||||
} catch (boost::interprocess::interprocess_exception& e) {
|
||||
LOG(warn) << "Could not get remote region for id: " << id;
|
||||
LOG(warn) << "Could not get remote region for id '" << id << "'";
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
@@ -274,9 +246,9 @@ class Manager
|
||||
|
||||
void RemoveRegion(const uint64_t id)
|
||||
{
|
||||
fRegions.erase(id);
|
||||
{
|
||||
boost::interprocess::scoped_lock<boost::interprocess::named_mutex> lock(fShmMtx);
|
||||
fRegions.erase(id);
|
||||
fRegionInfos->at(id).fDestroyed = true;
|
||||
}
|
||||
fRegionEventsCV.notify_all();
|
||||
@@ -381,48 +353,67 @@ class Manager
|
||||
void IncrementMsgCounter() { ++fMsgCounter; }
|
||||
void DecrementMsgCounter() { --fMsgCounter; }
|
||||
|
||||
void RemoveSegments()
|
||||
{
|
||||
using namespace boost::interprocess;
|
||||
if (shared_memory_object::remove(fSegmentName.c_str())) {
|
||||
LOG(debug) << "successfully removed '" << fSegmentName << "' segment after the device has stopped.";
|
||||
} else {
|
||||
LOG(debug) << "did not remove " << fSegmentName << " segment after the device stopped. Already removed?";
|
||||
}
|
||||
|
||||
if (shared_memory_object::remove(fManagementSegmentName.c_str())) {
|
||||
LOG(debug) << "successfully removed '" << fManagementSegmentName << "' segment after the device has stopped.";
|
||||
} else {
|
||||
LOG(debug) << "did not remove '" << fManagementSegmentName << "' segment after the device stopped. Already removed?";
|
||||
}
|
||||
}
|
||||
|
||||
void SendHeartbeats()
|
||||
{
|
||||
std::string controlQueueName("fmq_" + fShmId + "_cq");
|
||||
std::unique_lock<std::mutex> lock(fHeartbeatsMtx);
|
||||
while (fSendHeartbeats) {
|
||||
try {
|
||||
boost::interprocess::message_queue mq(boost::interprocess::open_only, controlQueueName.c_str());
|
||||
boost::posix_time::ptime sndTill = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(100);
|
||||
if (mq.timed_send(fDeviceId.c_str(), fDeviceId.size(), 0, sndTill)) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
fHeartbeatsCV.wait_for(lock, std::chrono::milliseconds(100), [&]() { return !fSendHeartbeats; });
|
||||
} else {
|
||||
LOG(debug) << "control queue timeout";
|
||||
}
|
||||
} catch (boost::interprocess::interprocess_exception& ie) {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(500));
|
||||
// LOG(warn) << "no " << controlQueueName << " found";
|
||||
fHeartbeatsCV.wait_for(lock, std::chrono::milliseconds(500), [&]() { return !fSendHeartbeats; });
|
||||
// LOG(debug) << "no " << controlQueueName << " found";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool ThrowingOnBadAlloc() const { return fThrowOnBadAlloc; }
|
||||
|
||||
~Manager()
|
||||
{
|
||||
using namespace boost::interprocess;
|
||||
bool lastRemoved = false;
|
||||
|
||||
UnsubscribeFromRegionEvents();
|
||||
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(fHeartbeatsMtx);
|
||||
fSendHeartbeats = false;
|
||||
}
|
||||
fHeartbeatsCV.notify_one();
|
||||
if (fHeartbeatThread.joinable()) {
|
||||
fHeartbeatThread.join();
|
||||
}
|
||||
|
||||
try {
|
||||
boost::interprocess::scoped_lock<named_mutex> lock(fShmMtx);
|
||||
|
||||
(fDeviceCounter->fCount)--;
|
||||
|
||||
if (fDeviceCounter->fCount == 0) {
|
||||
LOG(debug) << "Last segment user, removing segment.";
|
||||
lastRemoved = true;
|
||||
} else {
|
||||
LOG(debug) << "Other segment users present (" << fDeviceCounter->fCount << "), skipping removal.";
|
||||
}
|
||||
} catch (interprocess_exception& e) {
|
||||
LOG(error) << "Manager could not acquire lock: " << e.what();
|
||||
}
|
||||
|
||||
if (lastRemoved) {
|
||||
Monitor::Cleanup(ShmId{fShmId});
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
std::string fShmId;
|
||||
std::string fDeviceId;
|
||||
std::string fSegmentName;
|
||||
std::string fManagementSegmentName;
|
||||
boost::interprocess::managed_shared_memory fSegment;
|
||||
boost::interprocess::managed_shared_memory fManagementSegment;
|
||||
VoidAlloc fShmVoidAlloc;
|
||||
@@ -442,7 +433,10 @@ class Manager
|
||||
std::atomic<int32_t> fMsgCounter; // TODO: find a better lifetime solution instead of the counter
|
||||
|
||||
std::thread fHeartbeatThread;
|
||||
std::atomic<bool> fSendHeartbeats;
|
||||
bool fSendHeartbeats;
|
||||
std::mutex fHeartbeatsMtx;
|
||||
std::condition_variable fHeartbeatsCV;
|
||||
|
||||
bool fThrowOnBadAlloc;
|
||||
};
|
||||
|
||||
|
@@ -251,6 +251,10 @@ class Message final : public fair::mq::Message
|
||||
// boost::interprocess::managed_shared_memory::size_type actualSize = size;
|
||||
// char* hint = 0; // unused for boost::interprocess::allocate_new
|
||||
// fLocalPtr = fManager.Segment().allocation_command<char>(boost::interprocess::allocate_new, size, actualSize, hint);
|
||||
size_t segmentSize = fManager.Segment().get_size();
|
||||
if (size > segmentSize) {
|
||||
throw MessageBadAlloc(tools::ToString("Requested message size (", size, ") exceeds segment size (", segmentSize, ")"));
|
||||
}
|
||||
if (alignment == 0) {
|
||||
fLocalPtr = reinterpret_cast<char*>(fManager.Segment().allocate(size));
|
||||
} else {
|
||||
|
@@ -64,7 +64,6 @@ Monitor::Monitor(const string& shmId, bool selfDestruct, bool interactive, bool
|
||||
, fHeartbeatTriggered(false)
|
||||
, fLastHeartbeat(chrono::high_resolution_clock::now())
|
||||
, fSignalThread()
|
||||
, fManagementSegment(bipc::open_or_create, fManagementSegmentName.c_str(), 65536)
|
||||
, fDeviceHeartbeats()
|
||||
{
|
||||
if (!fViewOnly) {
|
||||
@@ -203,7 +202,7 @@ void Monitor::Interactive()
|
||||
case 'x':
|
||||
cout << "\n[x] --> closing shared memory:" << endl;
|
||||
if (!fViewOnly) {
|
||||
Cleanup(fShmId);
|
||||
Cleanup(ShmId{fShmId});
|
||||
} else {
|
||||
cout << "cannot close because in view only mode" << endl;
|
||||
}
|
||||
@@ -288,7 +287,7 @@ void Monitor::CheckSegment()
|
||||
|
||||
if (fHeartbeatTriggered && duration > fTimeoutInMS) {
|
||||
cout << "no heartbeats since over " << fTimeoutInMS << " milliseconds, cleaning..." << endl;
|
||||
Cleanup(fShmId);
|
||||
Cleanup(ShmId{fShmId});
|
||||
fHeartbeatTriggered = false;
|
||||
if (fSelfDestruct) {
|
||||
cout << "\nself destructing" << endl;
|
||||
@@ -321,7 +320,7 @@ void Monitor::CheckSegment()
|
||||
unsigned int duration = chrono::duration_cast<chrono::milliseconds>(now - fLastHeartbeat).count();
|
||||
|
||||
if (fIsDaemon && duration > fTimeoutInMS * 2) {
|
||||
Cleanup(fShmId);
|
||||
Cleanup(ShmId{fShmId});
|
||||
fHeartbeatTriggered = false;
|
||||
if (fSelfDestruct) {
|
||||
cout << "\nself destructing" << endl;
|
||||
@@ -395,51 +394,52 @@ void Monitor::PrintHelp()
|
||||
void Monitor::RemoveObject(const string& name)
|
||||
{
|
||||
if (bipc::shared_memory_object::remove(name.c_str())) {
|
||||
cout << "Successfully removed \"" << name << "\"." << endl;
|
||||
cout << "Successfully removed '" << name << "'." << endl;
|
||||
} else {
|
||||
cout << "Did not remove \"" << name << "\". Already removed?" << endl;
|
||||
cout << "Did not remove '" << name << "'. Already removed?" << endl;
|
||||
}
|
||||
}
|
||||
|
||||
void Monitor::RemoveFileMapping(const string& name)
|
||||
{
|
||||
if (bipc::file_mapping::remove(name.c_str())) {
|
||||
cout << "Successfully removed \"" << name << "\"." << endl;
|
||||
cout << "Successfully removed '" << name << "'." << endl;
|
||||
} else {
|
||||
cout << "Did not remove \"" << name << "\". Already removed?" << endl;
|
||||
cout << "Did not remove '" << name << "'. Already removed?" << endl;
|
||||
}
|
||||
}
|
||||
|
||||
void Monitor::RemoveQueue(const string& name)
|
||||
{
|
||||
if (bipc::message_queue::remove(name.c_str())) {
|
||||
cout << "Successfully removed \"" << name << "\"." << endl;
|
||||
cout << "Successfully removed '" << name << "'." << endl;
|
||||
} else {
|
||||
cout << "Did not remove \"" << name << "\". Already removed?" << endl;
|
||||
cout << "Did not remove '" << name << "'. Already removed?" << endl;
|
||||
}
|
||||
}
|
||||
|
||||
void Monitor::RemoveMutex(const string& name)
|
||||
{
|
||||
if (bipc::named_mutex::remove(name.c_str())) {
|
||||
cout << "Successfully removed \"" << name << "\"." << endl;
|
||||
cout << "Successfully removed '" << name << "'." << endl;
|
||||
} else {
|
||||
cout << "Did not remove \"" << name << "\". Already removed?" << endl;
|
||||
cout << "Did not remove '" << name << "'. Already removed?" << endl;
|
||||
}
|
||||
}
|
||||
|
||||
void Monitor::RemoveCondition(const string& name)
|
||||
{
|
||||
if (bipc::named_condition::remove(name.c_str())) {
|
||||
cout << "Successfully removed \"" << name << "\"." << endl;
|
||||
cout << "Successfully removed '" << name << "'." << endl;
|
||||
} else {
|
||||
cout << "Did not remove \"" << name << "\". Already removed?" << endl;
|
||||
cout << "Did not remove '" << name << "'. Already removed?" << endl;
|
||||
}
|
||||
}
|
||||
|
||||
void Monitor::Cleanup(const string& shmId)
|
||||
void Monitor::Cleanup(const ShmId& shmId)
|
||||
{
|
||||
string managementSegmentName("fmq_" + shmId + "_mng");
|
||||
cout << "Cleaning up for shared memory id '" << shmId.shmId << "'..." << endl;
|
||||
string managementSegmentName("fmq_" + shmId.shmId + "_mng");
|
||||
try {
|
||||
bipc::managed_shared_memory managementSegment(bipc::open_only, managementSegmentName.c_str());
|
||||
RegionCounter* rc = managementSegment.find<RegionCounter>(bipc::unique_instance).first;
|
||||
@@ -454,20 +454,20 @@ void Monitor::Cleanup(const string& shmId)
|
||||
RegionInfo ri = m->at(i);
|
||||
string path = ri.fPath.c_str();
|
||||
int flags = ri.fFlags;
|
||||
cout << "Found RegionInfo with path: '" << path << "', flags: " << flags << "'." << endl;
|
||||
cout << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << ri.fDestroyed << "." << endl;
|
||||
if (path != "") {
|
||||
RemoveFileMapping(tools::ToString(path, "fmq_" + shmId + "_rg_" + to_string(i)));
|
||||
RemoveFileMapping(tools::ToString(path, "fmq_" + shmId.shmId + "_rg_" + to_string(i)));
|
||||
} else {
|
||||
RemoveObject("fmq_" + shmId + "_rg_" + to_string(i));
|
||||
RemoveObject("fmq_" + shmId.shmId + "_rg_" + to_string(i));
|
||||
}
|
||||
} else {
|
||||
RemoveObject("fmq_" + shmId + "_rg_" + to_string(i));
|
||||
RemoveObject("fmq_" + shmId.shmId + "_rg_" + to_string(i));
|
||||
}
|
||||
|
||||
RemoveQueue(string("fmq_" + shmId + "_rgq_" + to_string(i)));
|
||||
RemoveQueue(string("fmq_" + shmId.shmId + "_rgq_" + to_string(i)));
|
||||
}
|
||||
} else {
|
||||
cout << "No region counter found. no regions to cleanup." << endl;
|
||||
cout << "No region counter found. No regions to cleanup." << endl;
|
||||
}
|
||||
|
||||
RemoveObject(managementSegmentName.c_str());
|
||||
@@ -477,20 +477,41 @@ void Monitor::Cleanup(const string& shmId)
|
||||
cout << "Could not locate element in the region map, out of range: " << oor.what() << endl;
|
||||
}
|
||||
|
||||
RemoveObject("fmq_" + shmId + "_main");
|
||||
RemoveMutex("fmq_" + shmId + "_mtx");
|
||||
RemoveCondition("fmq_" + shmId + "_cv");
|
||||
RemoveObject("fmq_" + shmId.shmId + "_main");
|
||||
RemoveMutex("fmq_" + shmId.shmId + "_mtx");
|
||||
RemoveCondition("fmq_" + shmId.shmId + "_cv");
|
||||
|
||||
cout << endl;
|
||||
}
|
||||
|
||||
void Monitor::Cleanup(const SessionId& sessionId)
|
||||
{
|
||||
ShmId shmId{buildShmIdFromSessionIdAndUserId(sessionId.sessionId)};
|
||||
cout << "Cleanup called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'" << endl;
|
||||
Cleanup(shmId);
|
||||
}
|
||||
|
||||
void Monitor::CleanupFull(const ShmId& shmId)
|
||||
{
|
||||
Cleanup(shmId);
|
||||
RemoveMutex("fmq_" + shmId.shmId + "_ms");
|
||||
RemoveQueue("fmq_" + shmId.shmId + "_cq");
|
||||
}
|
||||
|
||||
void Monitor::CleanupFull(const SessionId& sessionId)
|
||||
{
|
||||
ShmId shmId{buildShmIdFromSessionIdAndUserId(sessionId.sessionId)};
|
||||
cout << "Cleanup called with session id '" << sessionId.sessionId << "', translating to shared memory id '" << shmId.shmId << "'" << endl;
|
||||
CleanupFull(shmId);
|
||||
}
|
||||
|
||||
Monitor::~Monitor()
|
||||
{
|
||||
if (fSignalThread.joinable()) {
|
||||
fSignalThread.join();
|
||||
}
|
||||
if (fCleanOnExit) {
|
||||
Cleanup(fShmId);
|
||||
Cleanup(ShmId{fShmId});
|
||||
}
|
||||
if (!fViewOnly) {
|
||||
RemoveMutex("fmq_" + fShmId + "_ms");
|
||||
|
@@ -8,8 +8,6 @@
|
||||
#ifndef FAIR_MQ_SHMEM_MONITOR_H_
|
||||
#define FAIR_MQ_SHMEM_MONITOR_H_
|
||||
|
||||
#include <boost/interprocess/managed_shared_memory.hpp>
|
||||
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
#include <atomic>
|
||||
@@ -24,6 +22,18 @@ namespace mq
|
||||
namespace shmem
|
||||
{
|
||||
|
||||
/// @brief Thin wrapper around a session id string.
///
/// Gives session ids their own type so that overloads can tell them apart from
/// other strings (see the Cleanup overloads of Monitor that take a SessionId).
struct SessionId
{
    /// The wrapped session id.
    std::string sessionId;

    /// Explicit conversion yielding a copy of the wrapped string.
    explicit operator std::string() const
    {
        return sessionId;
    }
};
|
||||
|
||||
/// @brief Thin wrapper around a shared memory id string.
///
/// Gives shared memory ids their own type so that overloads can tell them apart
/// from other strings (see the Cleanup overloads of Monitor that take a ShmId).
struct ShmId
{
    /// The wrapped shared memory id.
    std::string shmId;

    /// Explicit conversion yielding a copy of the wrapped string.
    explicit operator std::string() const
    {
        return shmId;
    }
};
|
||||
|
||||
class Monitor
|
||||
{
|
||||
public:
|
||||
@@ -37,7 +47,19 @@ class Monitor
|
||||
void CatchSignals();
|
||||
void Run();
|
||||
|
||||
static void Cleanup(const std::string& shmId);
|
||||
/// @brief Cleanup all shared memory artifacts created by devices
|
||||
/// @param shmId shared memory id
|
||||
static void Cleanup(const ShmId& shmId);
|
||||
/// @brief Cleanup all shared memory artifacts created by devices
|
||||
/// @param sessionId session id
|
||||
static void Cleanup(const SessionId& sessionId);
|
||||
/// @brief Cleanup all shared memory artifacts created by devices and monitors
|
||||
/// @param shmId shared memory id
|
||||
static void CleanupFull(const ShmId& shmId);
|
||||
/// @brief Cleanup all shared memory artifacts created by devices and monitors
|
||||
/// @param sessionId session id
|
||||
static void CleanupFull(const SessionId& sessionId);
|
||||
|
||||
static void RemoveObject(const std::string&);
|
||||
static void RemoveFileMapping(const std::string&);
|
||||
static void RemoveQueue(const std::string&);
|
||||
@@ -70,7 +92,6 @@ class Monitor
|
||||
std::atomic<bool> fHeartbeatTriggered;
|
||||
std::chrono::high_resolution_clock::time_point fLastHeartbeat;
|
||||
std::thread fSignalThread;
|
||||
boost::interprocess::managed_shared_memory fManagementSegment;
|
||||
std::unordered_map<std::string, std::chrono::high_resolution_clock::time_point> fDeviceHeartbeats;
|
||||
};
|
||||
|
||||
|
@@ -6,6 +6,23 @@ The transport manages shared memory via boost::interprocess library. The transfe
|
||||
|
||||
Devices track and clean up shared memory on shutdown. For more information on the current shared memory segment and additional cleanup options, see the following section.
|
||||
|
||||
# Shared Memory objects / files
|
||||
|
||||
FairMQ Shared Memory currently uses the following names to register shared memory on the system:
|
||||
|
||||
| name | info | created by | used by |
|
||||
| ------------------------- | ---------------------------------------------- | ------------------ | ------------------------------ |
|
||||
| `fmq_<shmId>_main` | main segment (user data) | one of the devices | devices |
|
||||
| `fmq_<shmId>_mng` | management segment (management data) | one of the devices | devices |
|
||||
| `fmq_<shmId>_mtx` | mutex | one of the devices | devices |
|
||||
| `fmq_<shmId>_cv` | condition variable | one of the devices | devices with unmanaged regions |
|
||||
| `fmq_<shmId>_rg_<index>` | unmanaged region(s) | one of the devices | devices with unmanaged regions |
|
||||
| `fmq_<shmId>_rgq_<index>` | unmanaged region queue(s) | one of the devices | devices with unmanaged regions |
|
||||
| `fmq_<shmId>_ms` | shmmonitor status | shmmonitor | devices, shmmonitor |
|
||||
| `fmq_<shmId>_cq` | message queue between transport and shmmonitor | shmmonitor | devices, shmmonitor |
|
||||
|
||||
The shmId is generated from the session id and the user id.
|
||||
|
||||
## Shared memory monitor
|
||||
|
||||
The shared memory monitor tool, supplied with the shared memory transport, can be used to monitor shared memory use and automatically clean up shared memory in case of device crashes.
|
||||
@@ -25,13 +42,3 @@ Without the `--self-destruct` option, the monitor will run continuously, moitori
|
||||
A possible further implementation would be to run the monitor with `--self-destruct` with each topology.
|
||||
|
||||
The Monitor class can also be used independently from the supplied executable (built from `runMonitor.cxx`), allowing integration on any level. For example, invoking the monitor could be a functionality that a device offers.
|
||||
|
||||
FairMQ Shared Memory currently uses following names to register shared memory on the system:
|
||||
|
||||
`fmq_<shmId>_main` - main segment name, used for user data (the shmId is generated out of session id and user id).
|
||||
`fmq_<shmId>_mng` - management segment name, used for storing management data.
|
||||
`fmq_<shmId>_cq` - message queue for communicating between shm transport and shm monitor (exists independent of above segments).
|
||||
`fmq_<shmId>_mtx` - boost::interprocess::named_mutex for management purposes (exists independent of above segments).
|
||||
`fmq_<shmId>_ms` - shmmonitor status used to signal if it is active or not (exists independent of above segments).
|
||||
`fmq_<shmId>_rg_<index>` - names of unmanaged regions.
|
||||
`fmq_<shmId>_rgq_<index>` - names of queues for the unmanaged regions.
|
||||
|
@@ -30,6 +30,7 @@
|
||||
#include <boost/interprocess/ipc/message_queue.hpp>
|
||||
|
||||
#include <algorithm> // min
|
||||
#include <atomic>
|
||||
#include <thread>
|
||||
#include <mutex>
|
||||
#include <condition_variable>
|
||||
@@ -49,6 +50,7 @@ struct Region
|
||||
{
|
||||
Region(const std::string& shmId, uint64_t id, uint64_t size, bool remote, RegionCallback callback, RegionBulkCallback bulkCallback, const std::string& path, int flags)
|
||||
: fRemote(remote)
|
||||
, fLinger(100)
|
||||
, fStop(false)
|
||||
, fName("fmq_" + shmId + "_rg_" + std::to_string(id))
|
||||
, fQueueName("fmq_" + shmId + "_rgq_" + std::to_string(id))
|
||||
@@ -56,8 +58,8 @@ struct Region
|
||||
, fFile(nullptr)
|
||||
, fFileMapping()
|
||||
, fQueue(nullptr)
|
||||
, fReceiveAcksWorker()
|
||||
, fSendAcksWorker()
|
||||
, fAcksReceiver()
|
||||
, fAcksSender()
|
||||
, fCallback(callback)
|
||||
, fBulkCallback(bulkCallback)
|
||||
{
|
||||
@@ -118,38 +120,35 @@ struct Region
|
||||
LOG(debug) << "shmem: initialized region queue: " << fQueueName;
|
||||
}
|
||||
|
||||
void StartSendingAcks()
|
||||
{
|
||||
fSendAcksWorker = std::thread(&Region::SendAcks, this);
|
||||
}
|
||||
|
||||
void StartSendingAcks() { fAcksSender = std::thread(&Region::SendAcks, this); }
|
||||
void SendAcks()
|
||||
{
|
||||
std::unique_ptr<RegionBlock[]> blocks = tools::make_unique<RegionBlock[]>(fAckBunchSize);
|
||||
size_t blocksToSend = 0;
|
||||
|
||||
while (true) { // we'll try to send all acks before stopping
|
||||
size_t blocksToSend = 0;
|
||||
|
||||
{ // mutex locking block
|
||||
while (true) {
|
||||
blocksToSend = 0;
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(fBlockMtx);
|
||||
|
||||
// try to get more blocks without waiting (we can miss a notify from CloseMessage())
|
||||
if (!fStop && (fBlocksToFree.size() < fAckBunchSize)) {
|
||||
// cv.wait() timeout: send whatever blocks we have
|
||||
// try to get <fAckBunchSize> blocks
|
||||
if (fBlocksToFree.size() < fAckBunchSize) {
|
||||
fBlockSendCV.wait_for(lock, std::chrono::milliseconds(500));
|
||||
}
|
||||
|
||||
// send whatever blocks we have
|
||||
blocksToSend = std::min(fBlocksToFree.size(), fAckBunchSize);
|
||||
|
||||
copy_n(fBlocksToFree.end() - blocksToSend, blocksToSend, blocks.get());
|
||||
fBlocksToFree.resize(fBlocksToFree.size() - blocksToSend);
|
||||
} // unlock the block mutex here while sending over IPC
|
||||
}
|
||||
|
||||
if (blocksToSend > 0) {
|
||||
while (!fQueue->try_send(blocks.get(), blocksToSend * sizeof(RegionBlock), 0) && !fStop) {
|
||||
// receiver slow? yield and try again...
|
||||
std::this_thread::yield();
|
||||
}
|
||||
// LOG(debug) << "Sent " << blocksToSend << " blocks.";
|
||||
} else { // blocksToSend == 0
|
||||
if (fStop) {
|
||||
break;
|
||||
@@ -157,14 +156,11 @@ struct Region
|
||||
}
|
||||
}
|
||||
|
||||
LOG(debug) << "send ack worker for " << fName << " leaving.";
|
||||
}
|
||||
|
||||
void StartReceivingAcks()
|
||||
{
|
||||
fReceiveAcksWorker = std::thread(&Region::ReceiveAcks, this);
|
||||
LOG(debug) << "AcksSender for " << fName << " leaving " << "(blocks left to free: " << fBlocksToFree.size() << ", "
|
||||
<< " blocks left to send: " << blocksToSend << ").";
|
||||
}
|
||||
|
||||
void StartReceivingAcks() { fAcksReceiver = std::thread(&Region::ReceiveAcks, this); }
|
||||
void ReceiveAcks()
|
||||
{
|
||||
unsigned int priority;
|
||||
@@ -173,12 +169,18 @@ struct Region
|
||||
std::vector<fair::mq::RegionBlock> result;
|
||||
result.reserve(fAckBunchSize);
|
||||
|
||||
while (!fStop) { // end thread condition (should exist until region is destroyed)
|
||||
auto rcvTill = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(500);
|
||||
while (true) {
|
||||
uint32_t timeout = 100;
|
||||
bool leave = false;
|
||||
if (fStop) {
|
||||
timeout = fLinger;
|
||||
leave = true;
|
||||
}
|
||||
auto rcvTill = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(timeout);
|
||||
|
||||
while (fQueue->timed_receive(blocks.get(), fAckBunchSize * sizeof(RegionBlock), recvdSize, priority, rcvTill)) {
|
||||
// LOG(debug) << "received: " << block.fHandle << " " << block.fSize << " " << block.fMessageId;
|
||||
const auto numBlocks = recvdSize / sizeof(RegionBlock);
|
||||
// LOG(debug) << "Received " << numBlocks << " blocks (recvdSize: " << recvdSize << "). (remaining queue size: " << fQueue->get_num_msg() << ").";
|
||||
if (fBulkCallback) {
|
||||
result.clear();
|
||||
for (size_t i = 0; i < numBlocks; i++) {
|
||||
@@ -191,9 +193,13 @@ struct Region
|
||||
}
|
||||
}
|
||||
}
|
||||
} // while !fStop
|
||||
|
||||
LOG(debug) << "ReceiveAcks() worker for " << fName << " leaving.";
|
||||
if (leave) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
LOG(debug) << "AcksReceiver for " << fName << " leaving (remaining queue size: " << fQueue->get_num_msg() << ").";
|
||||
}
|
||||
|
||||
void ReleaseBlock(const RegionBlock& block)
|
||||
@@ -203,31 +209,34 @@ struct Region
|
||||
fBlocksToFree.emplace_back(block);
|
||||
|
||||
if (fBlocksToFree.size() >= fAckBunchSize) {
|
||||
lock.unlock(); // reduces contention on fBlockMtx
|
||||
lock.unlock();
|
||||
fBlockSendCV.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
// Stores the given linger value in fLinger.
void SetLinger(uint32_t linger) { fLinger = linger; }
|
||||
// Returns the currently stored linger value (fLinger).
uint32_t GetLinger() const { return fLinger; }
|
||||
|
||||
~Region()
|
||||
{
|
||||
fStop = true;
|
||||
|
||||
if (fSendAcksWorker.joinable()) {
|
||||
if (fAcksSender.joinable()) {
|
||||
fBlockSendCV.notify_one();
|
||||
fSendAcksWorker.join();
|
||||
fAcksSender.join();
|
||||
}
|
||||
|
||||
if (!fRemote) {
|
||||
if (fReceiveAcksWorker.joinable()) {
|
||||
fReceiveAcksWorker.join();
|
||||
if (fAcksReceiver.joinable()) {
|
||||
fAcksReceiver.join();
|
||||
}
|
||||
|
||||
if (boost::interprocess::shared_memory_object::remove(fName.c_str())) {
|
||||
LOG(debug) << "shmem: destroyed region " << fName;
|
||||
LOG(debug) << "Region '" << fName << "' destroyed.";
|
||||
}
|
||||
|
||||
if (boost::interprocess::file_mapping::remove(fName.c_str())) {
|
||||
LOG(debug) << "shmem: destroyed file mapping " << fName;
|
||||
LOG(debug) << "File mapping '" << fName << "' destroyed.";
|
||||
}
|
||||
|
||||
if (fFile) {
|
||||
@@ -235,16 +244,19 @@ struct Region
|
||||
}
|
||||
|
||||
if (boost::interprocess::message_queue::remove(fQueueName.c_str())) {
|
||||
LOG(debug) << "shmem: removed region queue " << fQueueName;
|
||||
LOG(debug) << "Region queue '" << fQueueName << "' destroyed.";
|
||||
}
|
||||
} else {
|
||||
// LOG(debug) << "shmem: region '" << fName << "' is remote, no cleanup necessary.";
|
||||
LOG(debug) << "shmem: region queue '" << fQueueName << "' is remote, no cleanup necessary";
|
||||
LOG(debug) << "Region queue '" << fQueueName << "' is remote, no cleanup necessary";
|
||||
}
|
||||
|
||||
LOG(debug) << "Region '" << fName << "' (" << (fRemote ? "remote" : "local") << ") destructed.";
|
||||
}
|
||||
|
||||
bool fRemote;
|
||||
bool fStop;
|
||||
uint32_t fLinger;
|
||||
std::atomic<bool> fStop;
|
||||
std::string fName;
|
||||
std::string fQueueName;
|
||||
boost::interprocess::shared_memory_object fShmemObject;
|
||||
@@ -258,8 +270,8 @@ struct Region
|
||||
const std::size_t fAckBunchSize = 256;
|
||||
std::unique_ptr<boost::interprocess::message_queue> fQueue;
|
||||
|
||||
std::thread fReceiveAcksWorker;
|
||||
std::thread fSendAcksWorker;
|
||||
std::thread fAcksReceiver;
|
||||
std::thread fAcksSender;
|
||||
RegionCallback fCallback;
|
||||
RegionBulkCallback fBulkCallback;
|
||||
};
|
||||
|
@@ -55,8 +55,7 @@ class Socket final : public fair::mq::Socket
|
||||
, fBytesRx(0)
|
||||
, fMessagesTx(0)
|
||||
, fMessagesRx(0)
|
||||
, fSndTimeout(100)
|
||||
, fRcvTimeout(100)
|
||||
, fTimeout(100)
|
||||
{
|
||||
assert(context);
|
||||
fSocket = zmq_socket(context, GetConstant(type));
|
||||
@@ -77,11 +76,11 @@ class Socket final : public fair::mq::Socket
|
||||
LOG(error) << "Failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
|
||||
if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) {
|
||||
if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fTimeout, sizeof(fTimeout)) != 0) {
|
||||
LOG(error) << "Failed setting ZMQ_SNDTIMEO socket option, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
|
||||
if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) {
|
||||
if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fTimeout, sizeof(fTimeout)) != 0) {
|
||||
LOG(error) << "Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
|
||||
@@ -129,6 +128,35 @@ class Socket final : public fair::mq::Socket
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ShouldRetry(int flags, int timeout, int& elapsed) const
|
||||
{
|
||||
if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
|
||||
if (timeout > 0) {
|
||||
elapsed += fTimeout;
|
||||
if (elapsed >= timeout) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
int HandleErrors() const
|
||||
{
|
||||
if (zmq_errno() == ETERM) {
|
||||
LOG(debug) << "Terminating socket " << fId;
|
||||
return -1;
|
||||
} else if (zmq_errno() == EINTR) {
|
||||
LOG(debug) << "Transfer interrupted by system call";
|
||||
return -1;
|
||||
} else {
|
||||
LOG(error) << "Failed transfer on socket " << fId << ", reason: " << zmq_strerror(errno);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
int Send(MessagePtr& msg, const int timeout = -1) override
|
||||
{
|
||||
int flags = 0;
|
||||
@@ -150,26 +178,13 @@ class Socket final : public fair::mq::Socket
|
||||
fBytesTx += size;
|
||||
return size;
|
||||
} else if (zmq_errno() == EAGAIN) {
|
||||
if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
|
||||
if (timeout > 0) {
|
||||
elapsed += fSndTimeout;
|
||||
if (elapsed >= timeout) {
|
||||
return -2;
|
||||
}
|
||||
}
|
||||
if (ShouldRetry(flags, timeout, elapsed)) {
|
||||
continue;
|
||||
} else {
|
||||
return -2;
|
||||
}
|
||||
} else if (zmq_errno() == ETERM) {
|
||||
LOG(info) << "terminating socket " << fId;
|
||||
return -1;
|
||||
} else if (zmq_errno() == EINTR) {
|
||||
LOG(debug) << "Send interrupted by system call";
|
||||
return nbytes;
|
||||
}else {
|
||||
LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes;
|
||||
return nbytes;
|
||||
} else {
|
||||
return HandleErrors();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -206,26 +221,13 @@ class Socket final : public fair::mq::Socket
|
||||
++fMessagesRx;
|
||||
return size;
|
||||
} else if (zmq_errno() == EAGAIN) {
|
||||
if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
|
||||
if (timeout > 0) {
|
||||
elapsed += fRcvTimeout;
|
||||
if (elapsed >= timeout) {
|
||||
return -2;
|
||||
}
|
||||
}
|
||||
if (ShouldRetry(flags, timeout, elapsed)) {
|
||||
continue;
|
||||
} else {
|
||||
return -2;
|
||||
}
|
||||
} else if (zmq_errno() == ETERM) {
|
||||
LOG(info) << "terminating socket " << fId;
|
||||
return -1;
|
||||
} else if (zmq_errno() == EINTR) {
|
||||
LOG(debug) << "Receive interrupted by system call";
|
||||
return nbytes;
|
||||
}else {
|
||||
LOG(error) << "Failed receiving on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes;
|
||||
return nbytes;
|
||||
} else {
|
||||
return HandleErrors();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -268,26 +270,13 @@ class Socket final : public fair::mq::Socket
|
||||
|
||||
return totalSize;
|
||||
} else if (zmq_errno() == EAGAIN) {
|
||||
if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
|
||||
if (timeout > 0) {
|
||||
elapsed += fSndTimeout;
|
||||
if (elapsed >= timeout) {
|
||||
return -2;
|
||||
}
|
||||
}
|
||||
if (ShouldRetry(flags, timeout, elapsed)) {
|
||||
continue;
|
||||
} else {
|
||||
return -2;
|
||||
}
|
||||
} else if (zmq_errno() == ETERM) {
|
||||
LOG(info) << "terminating socket " << fId;
|
||||
return -1;
|
||||
} else if (zmq_errno() == EINTR) {
|
||||
LOG(debug) << "Send interrupted by system call";
|
||||
return nbytes;
|
||||
}else {
|
||||
LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes;
|
||||
return nbytes;
|
||||
} else {
|
||||
return HandleErrors();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -335,23 +324,13 @@ class Socket final : public fair::mq::Socket
|
||||
|
||||
return totalSize;
|
||||
} else if (zmq_errno() == EAGAIN) {
|
||||
if (!fManager.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
|
||||
if (timeout > 0) {
|
||||
elapsed += fRcvTimeout;
|
||||
if (elapsed >= timeout) {
|
||||
return -2;
|
||||
}
|
||||
}
|
||||
if (ShouldRetry(flags, timeout, elapsed)) {
|
||||
continue;
|
||||
} else {
|
||||
return -2;
|
||||
}
|
||||
} else if (zmq_errno() == EINTR) {
|
||||
LOG(debug) << "Receive interrupted by system call";
|
||||
return nbytes;
|
||||
} else {
|
||||
LOG(error) << "Failed receiving on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno) << ", nbytes = " << nbytes;
|
||||
return nbytes;
|
||||
return HandleErrors();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -521,8 +500,7 @@ class Socket final : public fair::mq::Socket
|
||||
std::atomic<unsigned long> fMessagesTx;
|
||||
std::atomic<unsigned long> fMessagesRx;
|
||||
|
||||
int fSndTimeout;
|
||||
int fRcvTimeout;
|
||||
int fTimeout;
|
||||
};
|
||||
|
||||
}
|
||||
|
@@ -57,7 +57,7 @@ class TransportFactory final : public fair::mq::TransportFactory
|
||||
|
||||
int numIoThreads = 1;
|
||||
std::string sessionName = "default";
|
||||
size_t segmentSize = 2000000000;
|
||||
size_t segmentSize = 2ULL << 30;
|
||||
bool autolaunchMonitor = false;
|
||||
bool throwOnBadAlloc = true;
|
||||
if (config) {
|
||||
@@ -71,6 +71,7 @@ class TransportFactory final : public fair::mq::TransportFactory
|
||||
}
|
||||
|
||||
fShmId = buildShmIdFromSessionIdAndUserId(sessionName);
|
||||
LOG(debug) << "Generated shmid '" << fShmId << "' out of session id '" << sessionName << "'.";
|
||||
|
||||
try {
|
||||
if (zmq_ctx_set(fZMQContext, ZMQ_IO_THREADS, numIoThreads) != 0) {
|
||||
|
@@ -57,6 +57,8 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
|
||||
void* GetData() const override { return fRegion->get_address(); }
|
||||
size_t GetSize() const override { return fRegion->get_size(); }
|
||||
uint64_t GetId() const override { return fRegionId; }
|
||||
// Forwards the linger value to the region that fManager returns for fRegionId.
void SetLinger(uint32_t linger) override { fManager.GetRegion(fRegionId)->SetLinger(linger); }
|
||||
// Returns the linger value of the region that fManager returns for fRegionId.
uint32_t GetLinger() const override { return fManager.GetRegion(fRegionId)->GetLinger(); }
|
||||
|
||||
~UnmanagedRegion() override { fManager.RemoveRegion(fRegionId); }
|
||||
|
||||
|
@@ -111,9 +111,7 @@ int main(int argc, char** argv)
|
||||
}
|
||||
|
||||
if (cleanup) {
|
||||
cout << "Cleaning up \"" << shmId << "\"..." << endl;
|
||||
Monitor::Cleanup(shmId);
|
||||
Monitor::RemoveQueue("fmq_" + shmId + "_cq");
|
||||
Monitor::CleanupFull(ShmId{shmId});
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@@ -12,13 +12,15 @@
|
||||
#include <FairMQLogger.h>
|
||||
#include <FairMQMessage.h>
|
||||
#include <FairMQSocket.h>
|
||||
#include <atomic>
|
||||
#include <fairmq/Tools.h>
|
||||
#include <fairmq/zeromq/Context.h>
|
||||
#include <fairmq/zeromq/Message.h>
|
||||
#include <memory> // unique_ptr
|
||||
|
||||
#include <zmq.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <memory> // unique_ptr
|
||||
|
||||
namespace fair {
|
||||
namespace mq {
|
||||
namespace zmq {
|
||||
@@ -35,8 +37,7 @@ class Socket final : public fair::mq::Socket
|
||||
, fBytesRx(0)
|
||||
, fMessagesTx(0)
|
||||
, fMessagesRx(0)
|
||||
, fSndTimeout(100)
|
||||
, fRcvTimeout(100)
|
||||
, fTimeout(100)
|
||||
{
|
||||
if (fSocket == nullptr) {
|
||||
LOG(error) << "Failed creating socket " << fId << ", reason: " << zmq_strerror(errno);
|
||||
@@ -54,11 +55,11 @@ class Socket final : public fair::mq::Socket
|
||||
LOG(error) << "Failed setting ZMQ_LINGER socket option, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
|
||||
if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fSndTimeout, sizeof(fSndTimeout)) != 0) {
|
||||
if (zmq_setsockopt(fSocket, ZMQ_SNDTIMEO, &fTimeout, sizeof(fTimeout)) != 0) {
|
||||
LOG(error) << "Failed setting ZMQ_SNDTIMEO socket option, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
|
||||
if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fRcvTimeout, sizeof(fRcvTimeout)) != 0) {
|
||||
if (zmq_setsockopt(fSocket, ZMQ_RCVTIMEO, &fTimeout, sizeof(fTimeout)) != 0) {
|
||||
LOG(error) << "Failed setting ZMQ_RCVTIMEO socket option, reason: " << zmq_strerror(errno);
|
||||
}
|
||||
|
||||
@@ -78,13 +79,12 @@ class Socket final : public fair::mq::Socket
|
||||
|
||||
bool Bind(const std::string& address) override
|
||||
{
|
||||
// LOG(info) << "bind socket " << fId << " on " << address;
|
||||
// LOG(debug) << "Binding socket " << fId << " on " << address;
|
||||
|
||||
if (zmq_bind(fSocket, address.c_str()) != 0) {
|
||||
if (errno == EADDRINUSE) {
|
||||
// do not print error in this case, this is handled by FairMQDevice in case no
|
||||
// connection could be established after trying a number of random ports from a
|
||||
// range.
|
||||
// connection could be established after trying a number of random ports from a range.
|
||||
return false;
|
||||
}
|
||||
LOG(error) << "Failed binding socket " << fId << ", address: " << address << ", reason: " << zmq_strerror(errno);
|
||||
@@ -96,7 +96,7 @@ class Socket final : public fair::mq::Socket
|
||||
|
||||
bool Connect(const std::string& address) override
|
||||
{
|
||||
// LOG(info) << "connect socket " << fId << " on " << address;
|
||||
// LOG(debug) << "Connecting socket " << fId << " on " << address;
|
||||
|
||||
if (zmq_connect(fSocket, address.c_str()) != 0) {
|
||||
LOG(error) << "Failed connecting socket " << fId << ", address: " << address << ", reason: " << zmq_strerror(errno);
|
||||
@@ -106,6 +106,35 @@ class Socket final : public fair::mq::Socket
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ShouldRetry(int flags, int timeout, int& elapsed) const
|
||||
{
|
||||
if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
|
||||
if (timeout > 0) {
|
||||
elapsed += fTimeout;
|
||||
if (elapsed >= timeout) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
int HandleErrors() const
|
||||
{
|
||||
if (zmq_errno() == ETERM) {
|
||||
LOG(debug) << "Terminating socket " << fId;
|
||||
return -1;
|
||||
} else if (zmq_errno() == EINTR) {
|
||||
LOG(debug) << "Transfer interrupted by system call";
|
||||
return -1;
|
||||
} else {
|
||||
LOG(error) << "Failed transfer on socket " << fId << ", errno: " << errno << ", reason: " << zmq_strerror(errno);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
int Send(MessagePtr& msg, const int timeout = -1) override
|
||||
{
|
||||
int flags = 0;
|
||||
@@ -121,29 +150,15 @@ class Socket final : public fair::mq::Socket
|
||||
if (nbytes >= 0) {
|
||||
fBytesTx += nbytes;
|
||||
++fMessagesTx;
|
||||
|
||||
return nbytes;
|
||||
} else if (zmq_errno() == EAGAIN) {
|
||||
if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
|
||||
if (timeout > 0) {
|
||||
elapsed += fSndTimeout;
|
||||
if (elapsed >= timeout) {
|
||||
return -2;
|
||||
}
|
||||
}
|
||||
if (ShouldRetry(flags, timeout, elapsed)) {
|
||||
continue;
|
||||
} else {
|
||||
return -2;
|
||||
}
|
||||
} else if (zmq_errno() == ETERM) {
|
||||
LOG(info) << "terminating socket " << fId;
|
||||
return -1;
|
||||
} else if (zmq_errno() == EINTR) {
|
||||
LOG(debug) << "Send interrupted by system call";
|
||||
return nbytes;
|
||||
} else {
|
||||
LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
|
||||
return nbytes;
|
||||
return HandleErrors();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -163,26 +178,13 @@ class Socket final : public fair::mq::Socket
|
||||
++fMessagesRx;
|
||||
return nbytes;
|
||||
} else if (zmq_errno() == EAGAIN) {
|
||||
if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
|
||||
if (timeout > 0) {
|
||||
elapsed += fRcvTimeout;
|
||||
if (elapsed >= timeout) {
|
||||
return -2;
|
||||
}
|
||||
}
|
||||
if (ShouldRetry(flags, timeout, elapsed)) {
|
||||
continue;
|
||||
} else {
|
||||
return -2;
|
||||
}
|
||||
} else if (zmq_errno() == ETERM) {
|
||||
LOG(info) << "terminating socket " << fId;
|
||||
return -1;
|
||||
} else if (zmq_errno() == EINTR) {
|
||||
LOG(debug) << "Receive interrupted by system call";
|
||||
return nbytes;
|
||||
} else {
|
||||
LOG(error) << "Failed receiving on socket " << fId << ", reason: " << zmq_strerror(errno);
|
||||
return nbytes;
|
||||
return HandleErrors();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -210,32 +212,15 @@ class Socket final : public fair::mq::Socket
|
||||
int nbytes = zmq_msg_send(static_cast<Message*>(msgVec[i].get())->GetMessage(), fSocket, (i < vecSize - 1) ? ZMQ_SNDMORE | flags : flags);
|
||||
if (nbytes >= 0) {
|
||||
totalSize += nbytes;
|
||||
} else {
|
||||
// according to ZMQ docs, this can only occur for the first part
|
||||
if (zmq_errno() == EAGAIN) {
|
||||
if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
|
||||
if (timeout > 0) {
|
||||
elapsed += fSndTimeout;
|
||||
if (elapsed >= timeout) {
|
||||
return -2;
|
||||
}
|
||||
}
|
||||
repeat = true;
|
||||
break;
|
||||
} else {
|
||||
return -2;
|
||||
}
|
||||
}
|
||||
if (zmq_errno() == ETERM) {
|
||||
LOG(info) << "terminating socket " << fId;
|
||||
return -1;
|
||||
} else if (zmq_errno() == EINTR) {
|
||||
LOG(debug) << "Receive interrupted by system call";
|
||||
return nbytes;
|
||||
} else if (zmq_errno() == EAGAIN) {
|
||||
if (ShouldRetry(flags, timeout, elapsed)) {
|
||||
repeat = true;
|
||||
break;
|
||||
} else {
|
||||
LOG(error) << "Failed sending on socket " << fId << ", reason: " << zmq_strerror(errno);
|
||||
return nbytes;
|
||||
return -2;
|
||||
}
|
||||
} else {
|
||||
return HandleErrors();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -243,16 +228,14 @@ class Socket final : public fair::mq::Socket
|
||||
continue;
|
||||
}
|
||||
|
||||
// store statistics on how many messages have been sent (handle all parts as a
|
||||
// single message)
|
||||
// store statistics on how many messages have been sent (handle all parts as a single message)
|
||||
++fMessagesTx;
|
||||
fBytesTx += totalSize;
|
||||
return totalSize;
|
||||
}
|
||||
} // If there's only one part, send it as a regular message
|
||||
else if (vecSize == 1) {
|
||||
} else if (vecSize == 1) { // If there's only one part, send it as a regular message
|
||||
return Send(msgVec.back(), timeout);
|
||||
} else { // if the vector is empty, something might be wrong
|
||||
} else { // if the vector is empty, something might be wrong
|
||||
LOG(warn) << "Will not send empty vector";
|
||||
return -1;
|
||||
}
|
||||
@@ -279,23 +262,14 @@ class Socket final : public fair::mq::Socket
|
||||
msgVec.push_back(move(part));
|
||||
totalSize += nbytes;
|
||||
} else if (zmq_errno() == EAGAIN) {
|
||||
if (!fCtx.Interrupted() && ((flags & ZMQ_DONTWAIT) == 0)) {
|
||||
if (timeout > 0) {
|
||||
elapsed += fRcvTimeout;
|
||||
if (elapsed >= timeout) {
|
||||
return -2;
|
||||
}
|
||||
}
|
||||
if (ShouldRetry(flags, timeout, elapsed)) {
|
||||
repeat = true;
|
||||
break;
|
||||
} else {
|
||||
return -2;
|
||||
}
|
||||
} else if (zmq_errno() == EINTR) {
|
||||
LOG(debug) << "Receive interrupted by system call";
|
||||
return nbytes;
|
||||
} else {
|
||||
return nbytes;
|
||||
return HandleErrors();
|
||||
}
|
||||
|
||||
size_t moreSize = sizeof(more);
|
||||
@@ -306,8 +280,7 @@ class Socket final : public fair::mq::Socket
|
||||
continue;
|
||||
}
|
||||
|
||||
// store statistics on how many messages have been received (handle all parts as a
|
||||
// single message)
|
||||
// store statistics on how many messages have been received (handle all parts as a single message)
|
||||
++fMessagesRx;
|
||||
fBytesRx += totalSize;
|
||||
return totalSize;
|
||||
@@ -475,8 +448,7 @@ class Socket final : public fair::mq::Socket
|
||||
std::atomic<unsigned long> fMessagesTx;
|
||||
std::atomic<unsigned long> fMessagesRx;
|
||||
|
||||
int fSndTimeout;
|
||||
int fRcvTimeout;
|
||||
int fTimeout;
|
||||
};
|
||||
|
||||
} // namespace zmq
|
||||
|
@@ -55,7 +55,7 @@ class TransportFactory final : public FairMQTransportFactory
|
||||
{
|
||||
return tools::make_unique<Message>(this);
|
||||
}
|
||||
|
||||
|
||||
MessagePtr CreateMessage(Alignment alignment) override
|
||||
{
|
||||
return tools::make_unique<Message>(alignment, this);
|
||||
|
@@ -52,6 +52,8 @@ class UnmanagedRegion final : public fair::mq::UnmanagedRegion
|
||||
virtual size_t GetSize() const override { return fSize; }
|
||||
uint64_t GetId() const override { return fId; }
|
||||
int64_t GetUserFlags() const { return fUserFlags; }
|
||||
// Linger is not implemented for this region type: the argument is ignored and only a debug message is logged.
void SetLinger(uint32_t /* linger */) override { LOG(debug) << "ZeroMQ UnmanagedRegion linger option not implemented. Acknowledgements are local."; }
|
||||
// Linger is not implemented for this region type: logs a debug message and always returns 0.
uint32_t GetLinger() const override { LOG(debug) << "ZeroMQ UnmanagedRegion linger option not implemented. Acknowledgements are local."; return 0; }
|
||||
|
||||
virtual ~UnmanagedRegion()
|
||||
{
|
||||
|
@@ -163,9 +163,9 @@ TEST(PushPull, Multipart_ST_ipc_zeromq)
|
||||
RunSingleThreadedMultipart("zeromq", "ipc://test_Multipart_ST_ipc_zeromq_1", "ipc://test_Multipart_ST_ipc_zeromq_2");
|
||||
}
|
||||
|
||||
TEST(PushPull, Multipart_ST_ipc_shmen)
|
||||
TEST(PushPull, Multipart_ST_ipc_shmem)
|
||||
{
|
||||
RunSingleThreadedMultipart("shmem", "ipc://test_Multipart_ST_ipc_shmen_1", "ipc://test_Multipart_ST_ipc_shmen_2");
|
||||
RunSingleThreadedMultipart("shmem", "ipc://test_Multipart_ST_ipc_shmem_1", "ipc://test_Multipart_ST_ipc_shmem_2");
|
||||
}
|
||||
|
||||
TEST(PushPull, Multipart_MT_inproc_zeromq)
|
||||
|
Reference in New Issue
Block a user