mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-13 00:31:14 +00:00
Formatting
This commit is contained in:
parent
edbdc57332
commit
1f26883b75
|
@ -486,7 +486,6 @@ void FairMQDevice::RunWrapper()
|
||||||
}
|
}
|
||||||
|
|
||||||
PostRun();
|
PostRun();
|
||||||
|
|
||||||
} catch (const out_of_range& oor) {
|
} catch (const out_of_range& oor) {
|
||||||
LOG(error) << "out of range: " << oor.what();
|
LOG(error) << "out of range: " << oor.what();
|
||||||
LOG(error) << "incorrect/incomplete channel configuration?";
|
LOG(error) << "incorrect/incomplete channel configuration?";
|
||||||
|
@ -504,17 +503,12 @@ void FairMQDevice::HandleSingleChannelInput()
|
||||||
{
|
{
|
||||||
bool proceed = true;
|
bool proceed = true;
|
||||||
|
|
||||||
if (fMsgInputs.size() > 0)
|
if (fMsgInputs.size() > 0) {
|
||||||
{
|
while (!NewStatePending() && proceed) {
|
||||||
while (!NewStatePending() && proceed)
|
|
||||||
{
|
|
||||||
proceed = HandleMsgInput(fInputChannelKeys.at(0), fMsgInputs.begin()->second, 0);
|
proceed = HandleMsgInput(fInputChannelKeys.at(0), fMsgInputs.begin()->second, 0);
|
||||||
}
|
}
|
||||||
}
|
} else if (fMultipartInputs.size() > 0) {
|
||||||
else if (fMultipartInputs.size() > 0)
|
while (!NewStatePending() && proceed) {
|
||||||
{
|
|
||||||
while (!NewStatePending() && proceed)
|
|
||||||
{
|
|
||||||
proceed = HandleMultipartInput(fInputChannelKeys.at(0), fMultipartInputs.begin()->second, 0);
|
proceed = HandleMultipartInput(fInputChannelKeys.at(0), fMultipartInputs.begin()->second, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -524,75 +518,55 @@ void FairMQDevice::HandleMultipleChannelInput()
|
||||||
{
|
{
|
||||||
// check if more than one transport is used
|
// check if more than one transport is used
|
||||||
fMultitransportInputs.clear();
|
fMultitransportInputs.clear();
|
||||||
for (const auto& k : fInputChannelKeys)
|
for (const auto& k : fInputChannelKeys) {
|
||||||
{
|
|
||||||
fair::mq::Transport t = fChannels.at(k).at(0).fTransportType;
|
fair::mq::Transport t = fChannels.at(k).at(0).fTransportType;
|
||||||
if (fMultitransportInputs.find(t) == fMultitransportInputs.end())
|
if (fMultitransportInputs.find(t) == fMultitransportInputs.end()) {
|
||||||
{
|
|
||||||
fMultitransportInputs.insert(pair<fair::mq::Transport, vector<string>>(t, vector<string>()));
|
fMultitransportInputs.insert(pair<fair::mq::Transport, vector<string>>(t, vector<string>()));
|
||||||
fMultitransportInputs.at(t).push_back(k);
|
fMultitransportInputs.at(t).push_back(k);
|
||||||
}
|
} else {
|
||||||
else
|
|
||||||
{
|
|
||||||
fMultitransportInputs.at(t).push_back(k);
|
fMultitransportInputs.at(t).push_back(k);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const auto& mi : fMsgInputs)
|
for (const auto& mi : fMsgInputs) {
|
||||||
{
|
for (auto& i : fChannels.at(mi.first)) {
|
||||||
for (auto& i : fChannels.at(mi.first))
|
|
||||||
{
|
|
||||||
i.fMultipart = false;
|
i.fMultipart = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const auto& mi : fMultipartInputs)
|
for (const auto& mi : fMultipartInputs) {
|
||||||
{
|
for (auto& i : fChannels.at(mi.first)) {
|
||||||
for (auto& i : fChannels.at(mi.first))
|
|
||||||
{
|
|
||||||
i.fMultipart = true;
|
i.fMultipart = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// if more than one transport is used, handle poll of each in a separate thread
|
// if more than one transport is used, handle poll of each in a separate thread
|
||||||
if (fMultitransportInputs.size() > 1)
|
if (fMultitransportInputs.size() > 1) {
|
||||||
{
|
|
||||||
HandleMultipleTransportInput();
|
HandleMultipleTransportInput();
|
||||||
}
|
} else { // otherwise poll directly
|
||||||
else // otherwise poll directly
|
|
||||||
{
|
|
||||||
bool proceed = true;
|
bool proceed = true;
|
||||||
|
|
||||||
FairMQPollerPtr poller(fChannels.at(fInputChannelKeys.at(0)).at(0).fTransportFactory->CreatePoller(fChannels, fInputChannelKeys));
|
FairMQPollerPtr poller(fChannels.at(fInputChannelKeys.at(0)).at(0).fTransportFactory->CreatePoller(fChannels, fInputChannelKeys));
|
||||||
|
|
||||||
while (!NewStatePending() && proceed)
|
while (!NewStatePending() && proceed) {
|
||||||
{
|
|
||||||
poller->Poll(200);
|
poller->Poll(200);
|
||||||
|
|
||||||
// check which inputs are ready and call their data handlers if they are.
|
// check which inputs are ready and call their data handlers if they are.
|
||||||
for (const auto& ch : fInputChannelKeys)
|
for (const auto& ch : fInputChannelKeys) {
|
||||||
{
|
for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i) {
|
||||||
for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i)
|
if (poller->CheckInput(ch, i)) {
|
||||||
{
|
if (fChannels.at(ch).at(i).fMultipart) {
|
||||||
if (poller->CheckInput(ch, i))
|
|
||||||
{
|
|
||||||
if (fChannels.at(ch).at(i).fMultipart)
|
|
||||||
{
|
|
||||||
proceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i);
|
proceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i);
|
||||||
}
|
} else {
|
||||||
else
|
|
||||||
{
|
|
||||||
proceed = HandleMsgInput(ch, fMsgInputs.at(ch), i);
|
proceed = HandleMsgInput(ch, fMsgInputs.at(ch), i);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!proceed)
|
if (!proceed) {
|
||||||
{
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!proceed)
|
if (!proceed) {
|
||||||
{
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -606,64 +580,49 @@ void FairMQDevice::HandleMultipleTransportInput()
|
||||||
|
|
||||||
fMultitransportProceed = true;
|
fMultitransportProceed = true;
|
||||||
|
|
||||||
for (const auto& i : fMultitransportInputs)
|
for (const auto& i : fMultitransportInputs) {
|
||||||
{
|
|
||||||
threads.emplace_back(thread(&FairMQDevice::PollForTransport, this, fTransports.at(i.first).get(), i.second));
|
threads.emplace_back(thread(&FairMQDevice::PollForTransport, this, fTransports.at(i.first).get(), i.second));
|
||||||
}
|
}
|
||||||
|
|
||||||
for (thread& t : threads)
|
for (thread& t : threads) {
|
||||||
{
|
|
||||||
t.join();
|
t.join();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void FairMQDevice::PollForTransport(const FairMQTransportFactory* factory, const vector<string>& channelKeys)
|
void FairMQDevice::PollForTransport(const FairMQTransportFactory* factory, const vector<string>& channelKeys)
|
||||||
{
|
{
|
||||||
try
|
try {
|
||||||
{
|
|
||||||
FairMQPollerPtr poller(factory->CreatePoller(fChannels, channelKeys));
|
FairMQPollerPtr poller(factory->CreatePoller(fChannels, channelKeys));
|
||||||
|
|
||||||
while (!NewStatePending() && fMultitransportProceed)
|
while (!NewStatePending() && fMultitransportProceed) {
|
||||||
{
|
|
||||||
poller->Poll(500);
|
poller->Poll(500);
|
||||||
|
|
||||||
for (const auto& ch : channelKeys)
|
for (const auto& ch : channelKeys) {
|
||||||
{
|
for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i) {
|
||||||
for (unsigned int i = 0; i < fChannels.at(ch).size(); ++i)
|
if (poller->CheckInput(ch, i)) {
|
||||||
{
|
|
||||||
if (poller->CheckInput(ch, i))
|
|
||||||
{
|
|
||||||
lock_guard<mutex> lock(fMultitransportMutex);
|
lock_guard<mutex> lock(fMultitransportMutex);
|
||||||
|
|
||||||
if (!fMultitransportProceed)
|
if (!fMultitransportProceed) {
|
||||||
{
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fChannels.at(ch).at(i).fMultipart)
|
if (fChannels.at(ch).at(i).fMultipart) {
|
||||||
{
|
|
||||||
fMultitransportProceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i);
|
fMultitransportProceed = HandleMultipartInput(ch, fMultipartInputs.at(ch), i);
|
||||||
}
|
} else {
|
||||||
else
|
|
||||||
{
|
|
||||||
fMultitransportProceed = HandleMsgInput(ch, fMsgInputs.at(ch), i);
|
fMultitransportProceed = HandleMsgInput(ch, fMsgInputs.at(ch), i);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!fMultitransportProceed)
|
if (!fMultitransportProceed) {
|
||||||
{
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!fMultitransportProceed)
|
if (!fMultitransportProceed) {
|
||||||
{
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
} catch (exception& e) {
|
||||||
catch (exception& e)
|
|
||||||
{
|
|
||||||
LOG(error) << "FairMQDevice::PollForTransport() failed: " << e.what() << ", going to ERROR state.";
|
LOG(error) << "FairMQDevice::PollForTransport() failed: " << e.what() << ", going to ERROR state.";
|
||||||
throw runtime_error(tools::ToString("FairMQDevice::PollForTransport() failed: ", e.what(), ", going to ERROR state."));
|
throw runtime_error(tools::ToString("FairMQDevice::PollForTransport() failed: ", e.what(), ", going to ERROR state."));
|
||||||
}
|
}
|
||||||
|
@ -673,12 +632,9 @@ bool FairMQDevice::HandleMsgInput(const string& chName, const InputMsgCallback&
|
||||||
{
|
{
|
||||||
unique_ptr<FairMQMessage> input(fChannels.at(chName).at(i).fTransportFactory->CreateMessage());
|
unique_ptr<FairMQMessage> input(fChannels.at(chName).at(i).fTransportFactory->CreateMessage());
|
||||||
|
|
||||||
if (Receive(input, chName, i) >= 0)
|
if (Receive(input, chName, i) >= 0) {
|
||||||
{
|
|
||||||
return callback(input, i);
|
return callback(input, i);
|
||||||
}
|
} else {
|
||||||
else
|
|
||||||
{
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -687,12 +643,9 @@ bool FairMQDevice::HandleMultipartInput(const string& chName, const InputMultipa
|
||||||
{
|
{
|
||||||
FairMQParts input;
|
FairMQParts input;
|
||||||
|
|
||||||
if (Receive(input, chName, i) >= 0)
|
if (Receive(input, chName, i) >= 0) {
|
||||||
{
|
|
||||||
return callback(input, 0);
|
return callback(input, 0);
|
||||||
}
|
} else {
|
||||||
else
|
|
||||||
{
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -152,7 +152,7 @@ struct Region
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG(debug) << "AcksSender for " << fName << " leaving " << "(blocks left to free: " << fBlocksToFree.size() << ", "
|
LOG(trace) << "AcksSender for " << fName << " leaving " << "(blocks left to free: " << fBlocksToFree.size() << ", "
|
||||||
<< " blocks left to send: " << blocksToSend << ").";
|
<< " blocks left to send: " << blocksToSend << ").";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -195,7 +195,7 @@ struct Region
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG(debug) << "AcksReceiver for " << fName << " leaving (remaining queue size: " << fQueue->get_num_msg() << ").";
|
LOG(trace) << "AcksReceiver for " << fName << " leaving (remaining queue size: " << fQueue->get_num_msg() << ").";
|
||||||
}
|
}
|
||||||
|
|
||||||
void ReleaseBlock(const RegionBlock& block)
|
void ReleaseBlock(const RegionBlock& block)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user