Compare commits

..

3 Commits

Author SHA1 Message Date
Alexey Rybalchenko
af0d668951 Shm: fix region init with external regions 2022-09-09 15:40:33 +02:00
Alexey Rybalchenko
072d7cb744 shm: add some debug output 2022-09-09 15:40:33 +02:00
Alexey Rybalchenko
f5c46ce018 region example: add options for testing with externally-created regions 2022-09-09 15:40:33 +02:00
5 changed files with 40 additions and 24 deletions

View File

@@ -19,6 +19,10 @@ SAMPLER+=" --severity debug"
SAMPLER+=" --msg-size $msgSize" SAMPLER+=" --msg-size $msgSize"
# SAMPLER+=" --rate 10" # SAMPLER+=" --rate 10"
SAMPLER+=" --transport $transport" SAMPLER+=" --transport $transport"
# SAMPLER+=" --external-region true"
# SAMPLER+=" --shm-no-cleaup true"
# SAMPLER+=" --shm-monitor false"
# SAMPLER+=" --shmid 1"
SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992" SAMPLER+=" --channel-config name=data,type=push,method=bind,address=tcp://127.0.0.1:7777,sndKernelSize=212992"
xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER & xterm -geometry 120x60+0+0 -hold -e @EX_BIN_DIR@/$SAMPLER &
@@ -26,5 +30,8 @@ SINK="fairmq-ex-region-sink"
SINK+=" --id sink1" SINK+=" --id sink1"
SINK+=" --severity debug" SINK+=" --severity debug"
SINK+=" --transport $transport" SINK+=" --transport $transport"
# SINK+=" --shm-no-cleaup true"
# SINK+=" --shm-monitor false"
# SINK+=" --shmid 1"
SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992" SINK+=" --channel-config name=data,type=pull,method=connect,address=tcp://127.0.0.1:7777,rcvKernelSize=212992"
xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$SINK & xterm -geometry 120x60+750+0 -hold -e @EX_BIN_DIR@/$SINK &

View File

@@ -19,6 +19,7 @@ struct Sampler : fair::mq::Device
{ {
void InitTask() override void InitTask() override
{ {
fExternalRegion = fConfig->GetProperty<bool>("external-region");
fMsgSize = fConfig->GetProperty<int>("msg-size"); fMsgSize = fConfig->GetProperty<int>("msg-size");
fLinger = fConfig->GetProperty<uint32_t>("region-linger"); fLinger = fConfig->GetProperty<uint32_t>("region-linger");
fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations"); fMaxIterations = fConfig->GetProperty<uint64_t>("max-iterations");
@@ -34,9 +35,18 @@ struct Sampler : fair::mq::Device
fair::mq::RegionConfig regionCfg; fair::mq::RegionConfig regionCfg;
regionCfg.linger = fLinger; // delay in ms before region destruction to collect outstanding events regionCfg.linger = fLinger; // delay in ms before region destruction to collect outstanding events
// options for testing with an externally-created -region
if (fExternalRegion) {
regionCfg.id = 1;
regionCfg.removeOnDestruction = false;
regionCfg.lock = false; // mlock region after creation
regionCfg.lock = false; // mlock region after creation
} else {
regionCfg.lock = true; // mlock region after creation regionCfg.lock = true; // mlock region after creation
regionCfg.zero = true; // zero region content after creation regionCfg.zero = true; // zero region content after creation
fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor("data", // region is created using the transport of this channel... }
fRegion = fair::mq::UnmanagedRegionPtr(NewUnmanagedRegionFor(
"data", // region is created using the transport of this channel...
0, // ... and this sub-channel 0, // ... and this sub-channel
10000000, // region size 10000000, // region size
[this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport [this](const std::vector<fair::mq::RegionBlock>& blocks) { // callback to be called when message buffers no longer needed by transport
@@ -45,7 +55,9 @@ struct Sampler : fair::mq::Device
if (fMaxIterations > 0) { if (fMaxIterations > 0) {
LOG(info) << "Received " << blocks.size() << " acks"; LOG(info) << "Received " << blocks.size() << " acks";
} }
}, regionCfg)); },
regionCfg
));
} }
bool ConditionalRun() override bool ConditionalRun() override
@@ -91,6 +103,7 @@ struct Sampler : fair::mq::Device
} }
private: private:
int fExternalRegion = false;
int fMsgSize = 10000; int fMsgSize = 10000;
uint32_t fLinger = 100; uint32_t fLinger = 100;
uint64_t fMaxIterations = 0; uint64_t fMaxIterations = 0;
@@ -105,7 +118,8 @@ void addCustomOptions(bpo::options_description& options)
options.add_options() options.add_options()
("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes") ("msg-size", bpo::value<int>()->default_value(1000), "Message size in bytes")
("region-linger", bpo::value<uint32_t>()->default_value(100), "Linger period for regions") ("region-linger", bpo::value<uint32_t>()->default_value(100), "Linger period for regions")
("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)"); ("max-iterations", bpo::value<uint64_t>()->default_value(0), "Maximum number of iterations of Run/ConditionalRun/OnData (0 - infinite)")
("external-region", bpo::value<bool>()->default_value(false), "Use region created by another process");
} }
std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/) std::unique_ptr<fair::mq::Device> getDevice(fair::mq::ProgOptions& /*config*/)

View File

@@ -395,18 +395,10 @@ class Manager
const uint16_t id = cfg.id.value(); const uint16_t id = cfg.id.value();
UnmanagedRegion* region = nullptr;
bool newRegionCreated = false;
std::lock_guard<std::mutex> lock(fLocalRegionsMtx); std::lock_guard<std::mutex> lock(fLocalRegionsMtx);
auto res = fRegions.emplace(id, std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg)); auto& region = fRegions[id] = std::make_unique<UnmanagedRegion>(fShmId, size, false, cfg);
newRegionCreated = res.second;
region = res.first->second.get();
// LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'"; // LOG(debug) << "Created region with id '" << id << "', path: '" << cfg.path << "', flags: '" << cfg.creationFlags << "'";
if (!newRegionCreated) {
region->fRemote = false; // TODO: this should be more clear, refactor it.
}
// start ack receiver only if a callback has been provided. // start ack receiver only if a callback has been provided.
if (callback || bulkCallback) { if (callback || bulkCallback) {
region->SetCallbacks(callback, bulkCallback); region->SetCallbacks(callback, bulkCallback);
@@ -414,7 +406,7 @@ class Manager
region->StartAckSender(); region->StartAckSender();
region->StartAckReceiver(); region->StartAckReceiver();
} }
result.first = region; result.first = region.get();
result.second = id; result.second = id;
} }
fRegionsGen += 1; // signal TL cache invalidation fRegionsGen += 1; // signal TL cache invalidation

View File

@@ -670,7 +670,7 @@ std::vector<std::pair<std::string, bool>> Monitor::Cleanup(const ShmId& shmIdT,
string path = info.fPath.c_str(); string path = info.fPath.c_str();
int flags = info.fCreationFlags; int flags = info.fCreationFlags;
if (verbose) { if (verbose) {
LOG(info) << "Found RegionInfo with path: '" << path << "', flags: " << flags << ", fDestroyed: " << info.fDestroyed << "."; LOG(info) << "Found UnmanagedRegion with id: " << id << ", path: '" << path << "', flags: " << flags << ", fDestroyed: " << info.fDestroyed << ".";
} }
if (!path.empty()) { if (!path.empty()) {
result.emplace_back(Remove<bipc::file_mapping>(path + "fmq_" + shmId + "_rg_" + to_string(id), verbose)); result.emplace_back(Remove<bipc::file_mapping>(path + "fmq_" + shmId + "_rg_" + to_string(id), verbose));

View File

@@ -74,6 +74,8 @@ struct UnmanagedRegion
// TODO: refactor this // TODO: refactor this
cfg.size = size; cfg.size = size;
LOG(debug) << "UnmanagedRegion(): " << fName << " | remote: " << remote << ".";
if (!cfg.path.empty()) { if (!cfg.path.empty()) {
fName = std::string(cfg.path + fName); fName = std::string(cfg.path + fName);
@@ -171,6 +173,7 @@ struct UnmanagedRegion
~UnmanagedRegion() ~UnmanagedRegion()
{ {
LOG(debug) << "~UnmanagedRegion(): " << fName << " | remote: " << fRemote << ".";
fStopAcks = true; fStopAcks = true;
if (fAcksSender.joinable()) { if (fAcksSender.joinable()) {