diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index c5403495..b4b46dc4 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -88,6 +88,8 @@ set(FAIRMQ_HEADER_FILES shmem/FairMQPollerSHM.h shmem/FairMQSocketSHM.h shmem/FairMQTransportFactorySHM.h + shmem/FairMQShmMonitor.h + shmem/FairMQShmDeviceCounter.h tools/FairMQTools.h tools/runSimpleMQStateMachine.h zeromq/FairMQMessageZMQ.h @@ -137,6 +139,7 @@ set(FAIRMQ_SOURCE_FILES shmem/FairMQPollerSHM.cxx shmem/FairMQSocketSHM.cxx shmem/FairMQTransportFactorySHM.cxx + shmem/FairMQShmMonitor.cxx zeromq/FairMQMessageZMQ.cxx zeromq/FairMQPollerZMQ.cxx zeromq/FairMQSocketZMQ.cxx @@ -197,6 +200,7 @@ target_link_libraries(FairMQ Boost::filesystem Boost::regex Boost::date_time + $<$:rt> PRIVATE # only libFairMQ links against private dependencies ZeroMQ @@ -208,6 +212,7 @@ target_link_libraries(FairMQ ############### # executables # ############### + add_executable(bsampler run/runBenchmarkSampler.cxx) target_link_libraries(bsampler FairMQ) @@ -229,6 +234,9 @@ target_link_libraries(splitter FairMQ) add_executable(runConfigExample options/runConfigEx.cxx) target_link_libraries(runConfigExample FairMQ) +add_executable(shmmonitor shmem/runFairMQShmMonitor.cxx) +target_link_libraries(shmmonitor FairMQ) + #################### # aggregate target # @@ -242,6 +250,7 @@ set(FAIRMQ_FULL_TARGETS proxy sink splitter + shmmonitor ) add_custom_target(FairMQFull DEPENDS ${FAIRMQ_FULL_TARGETS}) # all targets including tests, if enabled diff --git a/fairmq/shmem/FairMQMessageSHM.cxx b/fairmq/shmem/FairMQMessageSHM.cxx index 41788643..ea27884f 100644 --- a/fairmq/shmem/FairMQMessageSHM.cxx +++ b/fairmq/shmem/FairMQMessageSHM.cxx @@ -12,7 +12,7 @@ #include "FairMQLogger.h" using namespace std; -using namespace FairMQ::shmem; +using namespace fair::mq::shmem; // uint64_t FairMQMessageSHM::fMessageID = 0; // string FairMQMessageSHM::fDeviceID = string(); diff --git a/fairmq/shmem/FairMQShmDeviceCounter.h b/fairmq/shmem/FairMQShmDeviceCounter.h index 1f4e3e89..e5827544 100644 --- a/fairmq/shmem/FairMQShmDeviceCounter.h +++ b/fairmq/shmem/FairMQShmDeviceCounter.h @@ -10,20 +10,23 @@ #include -namespace FairMQ +namespace fair +{ +namespace mq { namespace shmem { -struct FairMQShmDeviceCounter +struct DeviceCounter { - FairMQShmDeviceCounter(unsigned int c) + DeviceCounter(unsigned int c) : count(c) {} std::atomic count; }; } // namespace shmem -} // namespace FairMQ +} // namespace mq +} // namespace fair #endif /* FAIRMQSHMDEVICECOUNTER_H_ */ diff --git a/fairmq/shmem/FairMQShmManager.h b/fairmq/shmem/FairMQShmManager.h index e5a4a0e1..84900183 100644 --- a/fairmq/shmem/FairMQShmManager.h +++ b/fairmq/shmem/FairMQShmManager.h @@ -25,7 +25,9 @@ namespace bipc = boost::interprocess; -namespace FairMQ +namespace fair +{ +namespace mq { namespace shmem { @@ -185,7 +187,7 @@ struct alignas(16) MetaHeader // }; } // namespace shmem - -} // namespace FairMQ +} // namespace mq +} // namespace fair #endif /* FAIRMQSHMMANAGER_H_ */ diff --git a/fairmq/shmem/FairMQShmMonitor.cxx b/fairmq/shmem/FairMQShmMonitor.cxx new file mode 100644 index 00000000..45533217 --- /dev/null +++ b/fairmq/shmem/FairMQShmMonitor.cxx @@ -0,0 +1,334 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#include "FairMQShmMonitor.h" +#include "FairMQShmDeviceCounter.h" + +#include +#include +#include +#include + +#include +#include + +#include +#include + +#include +#include + +using namespace std; +namespace bipc = boost::interprocess; +namespace bpt = boost::posix_time; + +using CharAllocator = bipc::allocator; +using String = bipc::basic_string, CharAllocator>; +using StringAllocator = bipc::allocator; +using StringVector = bipc::vector; + +namespace fair +{ +namespace mq +{ +namespace shmem +{ + +Monitor::Monitor(const string& segmentName) + : fSegmentName(segmentName) + , fTerminating(false) + , fHeartbeatTriggered(false) + , fLastHeartbeat() + , fHeartbeatThread() +{ + if (bipc::message_queue::remove("fairmq_shmem_control_queue")) + { + // cout << "successfully removed control queue" << endl; + } + else + { + // cout << "could not remove control queue" << endl; + } +} + +void Monitor::PrintHeader() +{ + cout << "| " + << "\033[01;32m" << setw(18) << "name" << "\033[0m" << " | " + << "\033[01;32m" << setw(10) << "size" << "\033[0m" << " | " + << "\033[01;32m" << setw(10) << "free" << "\033[0m" << " | " + // << "\033[01;32m" << setw(15) << "all deallocated" << "\033[0m" << " | " + << "\033[01;32m" << setw(2) << "ok" << "\033[0m" << " | " + // << "\033[01;32m" << setw(10) << "# named" << "\033[0m" << " | " + << "\033[01;32m" << setw(10) << "# devices" << "\033[0m" << " | " + // << "\033[01;32m" << setw(10) << "# unique" << "\033[0m" << " |" + << "\033[01;32m" << setw(10) << "ms since" << "\033[0m" << " |" + << endl; +} + +void Monitor::PrintHelp() +{ + cout << "controls: [x] close memory, [p] print queues, [h] help, [q] quit." << endl; +} + +void Monitor::CloseMemory() +{ + if (bipc::shared_memory_object::remove(fSegmentName.c_str())) + { + cout << "Successfully removed shared memory \"" << fSegmentName.c_str() << "\"." << endl; + } + else + { + cout << "Did not remove shared memory. Already removed?" << endl; + } +} + +void Monitor::MonitorHeartbeats() +{ + try + { + bipc::message_queue mq(bipc::open_or_create, "fairmq_shmem_control_queue", 1000, sizeof(bool)); + + unsigned int priority; + bipc::message_queue::size_type recvdSize; + + while (!fTerminating) + { + bool heartbeat; + bpt::ptime rcvTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(100); + if (mq.timed_receive(&heartbeat, sizeof(heartbeat), recvdSize, priority, rcvTill)) + { + fHeartbeatTriggered = true; + fLastHeartbeat = chrono::high_resolution_clock::now(); + } + else + { + // cout << "control queue timeout" << endl; + } + } + } + catch (bipc::interprocess_exception& ie) + { + cout << ie.what() << endl; + } + + if (bipc::message_queue::remove("fairmq_shmem_control_queue")) + { + cout << "successfully removed control queue" << endl; + } + else + { + cout << "could not remove control queue" << endl; + } +} + +void Monitor::Run() +{ + thread heartbeatThread(&Monitor::MonitorHeartbeats, this); + + char input; + pollfd inputFd[1]; + inputFd[0].fd = fileno(stdin); + inputFd[0].events = POLLIN; + + struct termios t; + tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure + t.c_lflag &= ~ICANON; // disable canonical input + tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings + + cout << endl; + PrintHelp(); + cout << endl; + PrintHeader(); + + while (!fTerminating) + { + if (poll(inputFd, 1, 100)) + { + input = getchar(); + + switch (input) + { + case 'q': + cout << "[q] --> quitting." << endl; + fTerminating = true; + break; + case 'p': + cout << "[p] --> active queues:" << endl; + PrintQueues(); + break; + case 'x': + cout << "[x] --> closing shared memory:" << endl; + CloseMemory(); + break; + case 'h': + cout << "[h] --> help:" << endl << endl; + PrintHelp(); + cout << endl; + break; + case '\n': + cout << "[\\n] --> invalid input." << endl; + break; + default: + cout << "[" << input << "] --> invalid input." << endl; + break; + } + + if (fTerminating) + { + break; + } + + PrintHeader(); + } + + CheckSegment(); + + cout << "\r"; + } + + tcgetattr(STDIN_FILENO, &t); // get the current terminal I/O structure + t.c_lflag |= ICANON; // re-enable canonical input + tcsetattr(STDIN_FILENO, TCSANOW, &t); // apply the new settings + + heartbeatThread.join(); +} + +void Monitor::CheckSegment() +{ + static uint64_t counter = 0; + + char c = '#'; + int mod = counter++ % 5; + switch (mod) + { + case 0: + c = '-'; + break; + case 1: + c = '\\'; + break; + case 2: + c = '|'; + break; + case 3: + c = '-'; + break; + case 4: + c = '/'; + break; + default: + break; + } + + try + { + bipc::managed_shared_memory segment(bipc::open_only, fSegmentName.c_str()); + + unsigned int numDevices = 0; + + pair result = segment.find(bipc::unique_instance); + if (result.first != nullptr) + { + numDevices = result.first->count; + } + + auto now = chrono::high_resolution_clock::now(); + unsigned int duration = chrono::duration_cast(now - fLastHeartbeat).count(); + + if (fHeartbeatTriggered && duration > 5000) + { + cout << "no heartbeats since over 5 seconds, cleaning..." << endl; + CloseMemory(); + fHeartbeatTriggered = false; + } + + cout << "| " + << setw(18) << fSegmentName << " | " + << setw(10) << segment.get_size() << " | " + << setw(10) << segment.get_free_memory() << " | " + // << setw(15) << segment.all_memory_deallocated() << " | " + << setw(2) << segment.check_sanity() << " | " + // << setw(10) << segment.get_num_named_objects() << " | " + << setw(10) << numDevices << " | " + // << setw(10) << segment.get_num_unique_objects() << " |" + << setw(10) << duration << " |" + << c + << flush; + } + catch (bipc::interprocess_exception& ie) + { + fHeartbeatTriggered = false; + cout << "| " + << setw(18) << "-" << " | " + << setw(10) << "-" << " | " + << setw(10) << "-" << " | " + // << setw(15) << "-" << " | " + << setw(2) << "-" << " | " + << setw(10) << "-" << " | " + << setw(10) << "-" << " |" + << c + << flush; + } +} + +void Monitor::Cleanup(const string& segmentName) +{ + if (bipc::shared_memory_object::remove(segmentName.c_str())) + { + cout << "Successfully removed shared memory \"" << segmentName.c_str() << "\"." << endl; + } + else + { + cout << "Did not remove shared memory. Already removed?" << endl; + } +} + +void Monitor::PrintQueues() +{ + cout << '\n'; + + try + { + bipc::managed_shared_memory segment(bipc::open_only, fSegmentName.c_str()); + pair queues = segment.find("fairmq_shmem_queues"); + if (queues.first != nullptr) + { + cout << "found " << queues.first->size() << " queue(s):" << endl; + + for (int i = 0; i < queues.first->size(); ++i) + { + string name(queues.first->at(i).c_str()); + cout << '\t' << name << " : "; + pair*, size_t> queueSize = segment.find>(name.c_str()); + if (queueSize.first != nullptr) + { + cout << *(queueSize.first) << " messages" << endl; + } + else + { + cout << "\tqueue does not have a queue size entry." << endl; + } + } + } + else + { + cout << "\tno queues found" << endl; + } + } + catch (bipc::interprocess_exception& ie) + { + cout << "\tno queues found" << endl; + } + + cout << endl; +} + +} // namespace shmem +} // namespace mq +} // namespace fair diff --git a/fairmq/shmem/FairMQShmMonitor.h b/fairmq/shmem/FairMQShmMonitor.h new file mode 100644 index 00000000..cc89153c --- /dev/null +++ b/fairmq/shmem/FairMQShmMonitor.h @@ -0,0 +1,55 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +#ifndef FAIRMQSHMMONITOR_H_ +#define FAIRMQSHMMONITOR_H_ + +#include +#include +#include +#include + +namespace fair +{ +namespace mq +{ +namespace shmem +{ + +class Monitor +{ + public: + Monitor(const std::string& segmentName); + Monitor(const Monitor&) = delete; + Monitor operator=(const Monitor&) = delete; + + void Run(); + + virtual ~Monitor() {} + + private: + void PrintHeader(); + void PrintHelp(); + void PrintQueues(); + void CloseMemory(); + void MonitorHeartbeats(); + void CheckSegment(); + + static void Cleanup(const std::string& segmentName); + + std::string fSegmentName; + std::atomic fTerminating; + std::atomic fHeartbeatTriggered; + std::chrono::high_resolution_clock::time_point fLastHeartbeat; + std::thread fHeartbeatThread; +}; + +} // namespace shmem +} // namespace mq +} // namespace fair + +#endif /* FAIRMQSHMMONITOR_H_ */ \ No newline at end of file diff --git a/fairmq/shmem/FairMQSocketSHM.cxx b/fairmq/shmem/FairMQSocketSHM.cxx index 66a9d08b..a3d10797 100644 --- a/fairmq/shmem/FairMQSocketSHM.cxx +++ b/fairmq/shmem/FairMQSocketSHM.cxx @@ -14,7 +14,7 @@ #include "FairMQLogger.h" using namespace std; -using namespace FairMQ::shmem; +using namespace fair::mq::shmem; atomic FairMQSocketSHM::fInterrupted(false); diff --git a/fairmq/shmem/FairMQTransportFactorySHM.cxx b/fairmq/shmem/FairMQTransportFactorySHM.cxx index d8027c6b..538a715a 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.cxx +++ b/fairmq/shmem/FairMQTransportFactorySHM.cxx @@ -23,8 +23,9 @@ #include "../options/FairMQProgOptions.h" using namespace std; -using namespace FairMQ::shmem; +using namespace fair::mq::shmem; namespace bipc = boost::interprocess; +namespace bpt = boost::posix_time; FairMQ::Transport FairMQTransportFactorySHM::fTransportType = FairMQ::Transport::SHM; @@ -85,7 +86,7 @@ void FairMQTransportFactorySHM::Initialize(const FairMQProgOptions* config) { // mutex scope bipc::scoped_lock lock(fShMutex); - pair result = Manager::Instance().Segment()->find(bipc::unique_instance); + pair result = Manager::Instance().Segment()->find(bipc::unique_instance); if (result.first != nullptr) { fDeviceCounter = result.first; @@ -96,7 +97,7 @@ void FairMQTransportFactorySHM::Initialize(const FairMQProgOptions* config) else { LOG(DEBUG) << "shmem: no device counter found, creating one and initializing with 1"; - fDeviceCounter = Manager::Instance().Segment()->construct(bipc::unique_instance)(1); + fDeviceCounter = Manager::Instance().Segment()->construct(bipc::unique_instance)(1); LOG(DEBUG) << "shmem: initialized device counter with: " << fDeviceCounter->count; } } @@ -110,7 +111,7 @@ void FairMQTransportFactorySHM::SendHeartbeats() { bipc::message_queue mq(bipc::open_only, "fairmq_shmem_control_queue"); bool heartbeat = true; - boost::posix_time::ptime sndTill = boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(100); + bpt::ptime sndTill = bpt::microsec_clock::universal_time() + bpt::milliseconds(100); if (mq.timed_send(&heartbeat, sizeof(heartbeat), 0, sndTill)) { this_thread::sleep_for(chrono::milliseconds(100)); diff --git a/fairmq/shmem/FairMQTransportFactorySHM.h b/fairmq/shmem/FairMQTransportFactorySHM.h index 3a32166a..3ad20e3b 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.h +++ b/fairmq/shmem/FairMQTransportFactorySHM.h @@ -54,7 +54,7 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory std::thread fHeartbeatThread; std::atomic fSendHeartbeats; boost::interprocess::named_mutex fShMutex; - FairMQ::shmem::FairMQShmDeviceCounter* fDeviceCounter; + fair::mq::shmem::DeviceCounter* fDeviceCounter; }; #endif /* FAIRMQTRANSPORTFACTORYSHM_H_ */ diff --git a/fairmq/shmem/runFairMQShmMonitor.cxx b/fairmq/shmem/runFairMQShmMonitor.cxx new file mode 100644 index 00000000..50d1612c --- /dev/null +++ b/fairmq/shmem/runFairMQShmMonitor.cxx @@ -0,0 +1,28 @@ +/******************************************************************************** + * Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence version 3 (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ +#include "FairMQShmMonitor.h" + +#include +#include + +int main(int argc, char** argv) +{ + std::string segmentName = "fairmq_shmem_main"; + + if (argc == 2) + { + segmentName = argv[1]; + } + std::cout << "Looking for shared memory segment \"" << segmentName << "\"..." << std::endl; + + fair::mq::shmem::Monitor monitor{segmentName}; + + monitor.Run(); + + return 0; +}