diff --git a/fairmq/CMakeLists.txt b/fairmq/CMakeLists.txt index 240385a5..2cd0deb3 100644 --- a/fairmq/CMakeLists.txt +++ b/fairmq/CMakeLists.txt @@ -110,6 +110,7 @@ set(FAIRMQ_HEADER_FILES tools/Network.h tools/Strings.h tools/Version.h + tools/Unique.h zeromq/FairMQMessageZMQ.h zeromq/FairMQPollerZMQ.h zeromq/FairMQUnmanagedRegionZMQ.h @@ -269,6 +270,8 @@ target_link_libraries(runConfigExample FairMQ) add_executable(shmmonitor shmem/runMonitor.cxx) target_link_libraries(shmmonitor FairMQ) +add_executable(uuidGen run/runUuidGenerator.cxx) +target_link_libraries(uuidGen FairMQ) #################### # aggregate target # diff --git a/fairmq/FairMQTransportFactory.cxx b/fairmq/FairMQTransportFactory.cxx index a36abc43..7496e3c5 100644 --- a/fairmq/FairMQTransportFactory.cxx +++ b/fairmq/FairMQTransportFactory.cxx @@ -13,10 +13,7 @@ #include #endif /* NANOMSG_FOUND */ #include - -#include -#include -#include +#include #include #include @@ -36,7 +33,7 @@ auto FairMQTransportFactory::CreateTransportFactory(const std::string& type, con // Generate uuid if empty if (finalId == "") { - finalId = boost::uuids::to_string(boost::uuids::random_generator()()); + finalId = fair::mq::tools::Uuid(); } if (type == "zeromq") diff --git a/fairmq/Tools.h b/fairmq/Tools.h index 9d04afce..6882b20f 100644 --- a/fairmq/Tools.h +++ b/fairmq/Tools.h @@ -14,6 +14,7 @@ #include #include #include +#include // IWYU pragma: end_exports #endif // FAIR_MQ_TOOLS_H diff --git a/fairmq/run/runUuidGenerator.cxx b/fairmq/run/runUuidGenerator.cxx new file mode 100644 index 00000000..df974f37 --- /dev/null +++ b/fairmq/run/runUuidGenerator.cxx @@ -0,0 +1,58 @@ +/******************************************************************************** +* Copyright (C) 2014 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * +* * +* This software is distributed under the terms of the * +* GNU Lesser General Public Licence version 3 (LGPL) version 3, * +* copied verbatim in the file "LICENSE" * +********************************************************************************/ +#include + +#include + +#include +#include + +using namespace std; +using namespace boost::program_options; + +int main(int argc, char** argv) +{ + try + { + bool hash = false; + + options_description desc("Options"); + desc.add_options() + ("hash,h", value(&hash)->implicit_value(true), "Generates UUID and returns its hash.") + ("help", "Print help"); + + variables_map vm; + store(parse_command_line(argc, argv, desc), vm); + + if (vm.count("help")) + { + cout << "UUID generator" << endl << desc << endl; + return 0; + } + + notify(vm); + + if (hash) + { + std::cout << fair::mq::tools::UuidHash() << std::endl; + } + else + { + std::cout << fair::mq::tools::Uuid() << std::endl; + } + + return 0; + } + catch (exception& e) + { + cerr << "Unhandled Exception reached the top of main: " << e.what() << ", application will now exit" << endl; + return 2; + } + + return 0; +} diff --git a/fairmq/shmem/FairMQTransportFactorySHM.cxx b/fairmq/shmem/FairMQTransportFactorySHM.cxx index 2829437e..4a5c42a6 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.cxx +++ b/fairmq/shmem/FairMQTransportFactorySHM.cxx @@ -35,12 +35,12 @@ FairMQ::Transport FairMQTransportFactorySHM::fTransportType = FairMQ::Transport: FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const FairMQProgOptions* config) : FairMQTransportFactory(id) - , fSessionName("default") + , fSessionName() , fContext(nullptr) , fHeartbeatSocket(nullptr) , fHeartbeatThread() , fSendHeartbeats(true) - , fShMutex(bipc::open_or_create, std::string("fmq_shm_" + fSessionName + "_mutex").c_str()) + , fShMutex(nullptr) , fDeviceCounter(nullptr) , fManager(nullptr) { @@ -62,6 +62,7 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai { numIoThreads = config->GetValue("io-threads"); fSessionName = config->GetValue("session"); + fSessionName.resize(8); // shorten the session name, to acomodate for name size limit on some systems (MacOS) // fSegmentName = "fmq_shm_" + fSessionName + "_main"; segmentSize = config->GetValue("shm-segment-size"); } @@ -70,56 +71,66 @@ FairMQTransportFactorySHM::FairMQTransportFactorySHM(const string& id, const Fai LOG(WARN) << "shmem: FairMQProgOptions not available! Using defaults."; } - if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0) + try { - LOG(ERROR) << "shmem: failed configuring context, reason: " << zmq_strerror(errno); - } + fShMutex = fair::mq::tools::make_unique(bipc::open_or_create, std::string("fmq_shm_" + fSessionName + "_mutex").c_str()); - // Set the maximum number of allowed sockets on the context. - if (zmq_ctx_set(fContext, ZMQ_MAX_SOCKETS, 10000) != 0) - { - LOG(ERROR) << "shmem: failed configuring context, reason: " << zmq_strerror(errno); - } - - fManager = fair::mq::tools::make_unique(fSessionName, segmentSize); - LOG(DEBUG) << "shmem: created/opened shared memory segment of " << segmentSize << " bytes. Available are " << fManager->Segment().get_free_memory() << " bytes."; - - { - bipc::scoped_lock lock(fShMutex); - - fDeviceCounter = fManager->Segment().find(bipc::unique_instance).first; - if (fDeviceCounter) + if (zmq_ctx_set(fContext, ZMQ_IO_THREADS, numIoThreads) != 0) { - LOG(DEBUG) << "shmem: device counter found, with value of " << fDeviceCounter->fCount << ". incrementing."; - (fDeviceCounter->fCount)++; - LOG(DEBUG) << "shmem: incremented device counter, now: " << fDeviceCounter->fCount; - } - else - { - LOG(DEBUG) << "shmem: no device counter found, creating one and initializing with 1"; - fDeviceCounter = fManager->Segment().construct(bipc::unique_instance)(1); - LOG(DEBUG) << "shmem: initialized device counter with: " << fDeviceCounter->fCount; + LOG(ERROR) << "shmem: failed configuring context, reason: " << zmq_strerror(errno); } - // start shm monitor - // try - // { - // MonitorStatus* monitorStatus = fManagementSegment.find(bipc::unique_instance).first; - // if (monitorStatus == nullptr) - // { - // LOG(DEBUG) << "shmem: no shmmonitor found, starting..."; - // StartMonitor(); - // } - // else - // { - // LOG(DEBUG) << "shmem: found shmmonitor."; - // } - // } - // catch (std::exception& e) - // { - // LOG(ERROR) << "shmem: Exception during shmmonitor initialization: " << e.what() << ", application will now exit"; - // exit(EXIT_FAILURE); - // } + // Set the maximum number of allowed sockets on the context. + if (zmq_ctx_set(fContext, ZMQ_MAX_SOCKETS, 10000) != 0) + { + LOG(ERROR) << "shmem: failed configuring context, reason: " << zmq_strerror(errno); + } + + fManager = fair::mq::tools::make_unique(fSessionName, segmentSize); + LOG(DEBUG) << "shmem: created/opened shared memory segment of " << segmentSize << " bytes. Available are " << fManager->Segment().get_free_memory() << " bytes."; + + { + bipc::scoped_lock lock(*fShMutex); + + fDeviceCounter = fManager->Segment().find(bipc::unique_instance).first; + if (fDeviceCounter) + { + LOG(DEBUG) << "shmem: device counter found, with value of " << fDeviceCounter->fCount << ". incrementing."; + (fDeviceCounter->fCount)++; + LOG(DEBUG) << "shmem: incremented device counter, now: " << fDeviceCounter->fCount; + } + else + { + LOG(DEBUG) << "shmem: no device counter found, creating one and initializing with 1"; + fDeviceCounter = fManager->Segment().construct(bipc::unique_instance)(1); + LOG(DEBUG) << "shmem: initialized device counter with: " << fDeviceCounter->fCount; + } + + // start shm monitor + // try + // { + // MonitorStatus* monitorStatus = fManagementSegment.find(bipc::unique_instance).first; + // if (monitorStatus == nullptr) + // { + // LOG(DEBUG) << "shmem: no shmmonitor found, starting..."; + // StartMonitor(); + // } + // else + // { + // LOG(DEBUG) << "shmem: found shmmonitor."; + // } + // } + // catch (std::exception& e) + // { + // LOG(ERROR) << "shmem: Exception during shmmonitor initialization: " << e.what() << ", application will now exit"; + // exit(EXIT_FAILURE); + // } + } + } + catch(bipc::interprocess_exception& e) + { + LOG(ERROR) << "Could not initialize shared memory transport: " << e.what(); + throw runtime_error("Cannot update configuration. Socket method (bind/connect) not specified."); } fSendHeartbeats = true; @@ -267,7 +278,7 @@ FairMQTransportFactorySHM::~FairMQTransportFactorySHM() bool lastRemoved = false; { // mutex scope - bipc::scoped_lock lock(fShMutex); + bipc::scoped_lock lock(*fShMutex); (fDeviceCounter->fCount)--; diff --git a/fairmq/shmem/FairMQTransportFactorySHM.h b/fairmq/shmem/FairMQTransportFactorySHM.h index 26b1509d..392a26a5 100644 --- a/fairmq/shmem/FairMQTransportFactorySHM.h +++ b/fairmq/shmem/FairMQTransportFactorySHM.h @@ -60,7 +60,7 @@ class FairMQTransportFactorySHM : public FairMQTransportFactory void* fHeartbeatSocket; std::thread fHeartbeatThread; std::atomic fSendHeartbeats; - boost::interprocess::named_mutex fShMutex; + std::unique_ptr fShMutex; fair::mq::shmem::DeviceCounter* fDeviceCounter; std::unique_ptr fManager; }; diff --git a/fairmq/test/protocols/_poller.cxx b/fairmq/test/protocols/_poller.cxx index 2f9d8961..7c78e580 100644 --- a/fairmq/test/protocols/_poller.cxx +++ b/fairmq/test/protocols/_poller.cxx @@ -7,6 +7,7 @@ ********************************************************************************/ #include "runner.h" +#include #include #include // std::stringstream #include @@ -19,13 +20,15 @@ using namespace fair::mq::test; auto RunPoller(string transport, int pollType) -> void { + size_t session{fair::mq::tools::UuidHash()}; + auto pollout = execute_result{"", 0}; thread poll_out_thread([&]() { stringstream cmd; cmd << runTestDevice << " --id pollout_"<< transport << " --control static --verbosity DEBUG --log-color false" - << " --mq-config \"" << mqConfig << "\""; + << " --session " << session << " --mq-config \"" << mqConfig << "\""; pollout = execute(cmd.str(), "[POLLOUT]"); }); @@ -35,7 +38,7 @@ auto RunPoller(string transport, int pollType) -> void cmd << runTestDevice << " --id pollin_" << transport << " --control static --verbosity DEBUG --log-color false" - << " --mq-config \"" << mqConfig << "\" --poll-type " << pollType; + << " --session " << session << " --mq-config \"" << mqConfig << "\" --poll-type " << pollType; pollin = execute(cmd.str(), "[POLLIN]"); }); diff --git a/fairmq/test/protocols/_pub_sub.cxx b/fairmq/test/protocols/_pub_sub.cxx index 8e0b97e0..31a3f455 100644 --- a/fairmq/test/protocols/_pub_sub.cxx +++ b/fairmq/test/protocols/_pub_sub.cxx @@ -7,6 +7,7 @@ ********************************************************************************/ #include "runner.h" +#include #include #include // std::stringstream #include @@ -19,11 +20,13 @@ using namespace fair::mq::test; auto RunPubSub(string transport) -> void { + size_t session{fair::mq::tools::UuidHash()}; + auto pub = execute_result{"", 0}; thread pub_thread([&]() { stringstream cmd; cmd << runTestDevice << " --id pub_" << transport << " --control static --verbosity DEBUG " - << "--log-color false --mq-config \"" << mqConfig << "\""; + << "--session " << session << " --log-color false --mq-config \"" << mqConfig << "\""; pub = execute(cmd.str(), "[PUB]"); }); @@ -31,7 +34,7 @@ auto RunPubSub(string transport) -> void thread sub1_thread([&]() { stringstream cmd; cmd << runTestDevice << " --id sub_1" << transport << " --control static --verbosity DEBUG " - << "--log-color false --mq-config \"" << mqConfig << "\""; + << "--session " << session << " --log-color false --mq-config \"" << mqConfig << "\""; sub1 = execute(cmd.str(), "[SUB1]"); }); @@ -39,7 +42,7 @@ auto RunPubSub(string transport) -> void thread sub2_thread([&]() { stringstream cmd; cmd << runTestDevice << " --id sub_2" << transport << " --control static --verbosity DEBUG " - << "--log-color false --mq-config \"" << mqConfig << "\""; + << "--session " << session << " --log-color false --mq-config \"" << mqConfig << "\""; sub2 = execute(cmd.str(), "[SUB2]"); }); diff --git a/fairmq/test/protocols/_push_pull.cxx b/fairmq/test/protocols/_push_pull.cxx index b4de75ac..772c5417 100644 --- a/fairmq/test/protocols/_push_pull.cxx +++ b/fairmq/test/protocols/_push_pull.cxx @@ -7,6 +7,7 @@ ********************************************************************************/ #include "runner.h" +#include #include #include // std::stringstream #include @@ -19,11 +20,13 @@ using namespace fair::mq::test; auto RunPushPull(string transport) -> void { + size_t session{fair::mq::tools::UuidHash()}; + auto push = execute_result{"", 100}; thread push_thread([&]() { stringstream cmd; cmd << runTestDevice << " --id push_" << transport << " --control static --verbosity DEBUG " - << "--log-color false --mq-config \"" << mqConfig << "\""; + << "--session " << session << " --log-color false --mq-config \"" << mqConfig << "\""; push = execute(cmd.str(), "[PUSH]"); }); @@ -31,7 +34,7 @@ auto RunPushPull(string transport) -> void thread pull_thread([&]() { stringstream cmd; cmd << runTestDevice << " --id pull_" << transport << " --control static --verbosity DEBUG " - << "--log-color false --mq-config \"" << mqConfig << "\""; + << "--session " << session << " --log-color false --mq-config \"" << mqConfig << "\""; pull = execute(cmd.str(), "[PULL]"); }); diff --git a/fairmq/test/protocols/_push_pull_multipart.cxx b/fairmq/test/protocols/_push_pull_multipart.cxx index 02f652b2..f6599739 100644 --- a/fairmq/test/protocols/_push_pull_multipart.cxx +++ b/fairmq/test/protocols/_push_pull_multipart.cxx @@ -11,6 +11,9 @@ #include #include #include +#include +#include + #include #include #include @@ -23,7 +26,14 @@ namespace using namespace std; auto RunSingleThreadedMultipart(string transport, string address) -> void { - auto factory = FairMQTransportFactory::CreateTransportFactory(transport); + + size_t session{fair::mq::tools::UuidHash()}; + + FairMQProgOptions config; + config.SetValue("session", std::to_string(session)); + config.SetValue("io-threads", 1); + config.SetValue("shm-segment-size", 20000000); + auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config); auto push = FairMQChannel{"Push", "push", factory}; ASSERT_TRUE(push.Bind(address)); auto pull = FairMQChannel{"Pull", "pull", factory}; @@ -55,7 +65,13 @@ auto RunSingleThreadedMultipart(string transport, string address) -> void { auto RunMultiThreadedMultipart(string transport, string address) -> void { - auto factory = FairMQTransportFactory::CreateTransportFactory(transport); + size_t session{fair::mq::tools::UuidHash()}; + + FairMQProgOptions config; + config.SetValue("session", std::to_string(session)); + config.SetValue("io-threads", 1); + config.SetValue("shm-segment-size", 20000000); + auto factory = FairMQTransportFactory::CreateTransportFactory(transport, fair::mq::tools::Uuid(), &config); auto push = FairMQChannel{"Push", "push", factory}; ASSERT_TRUE(push.Bind(address)); auto pull = FairMQChannel{"Pull", "pull", factory}; diff --git a/fairmq/test/protocols/_req_rep.cxx b/fairmq/test/protocols/_req_rep.cxx index 28851347..3f913692 100644 --- a/fairmq/test/protocols/_req_rep.cxx +++ b/fairmq/test/protocols/_req_rep.cxx @@ -7,6 +7,7 @@ ********************************************************************************/ #include "runner.h" +#include #include #include // std::stringstream #include @@ -19,11 +20,13 @@ using namespace fair::mq::test; auto RunReqRep(string transport) -> void { + size_t session{fair::mq::tools::UuidHash()}; + auto rep = execute_result{ "", 0 }; thread rep_thread([&]() { stringstream cmd; cmd << runTestDevice << " --id rep_" << transport << " --control static --verbosity DEBUG " - << "--log-color false --mq-config \"" << mqConfig << "\""; + << "--session " << session << " --log-color false --mq-config \"" << mqConfig << "\""; rep = execute(cmd.str(), "[REP]"); }); @@ -31,7 +34,7 @@ auto RunReqRep(string transport) -> void thread req1_thread([&]() { stringstream cmd; cmd << runTestDevice << " --id req_1" << transport << " --control static --verbosity DEBUG " - << "--log-color false --mq-config \"" << mqConfig << "\""; + << "--session " << session << " --log-color false --mq-config \"" << mqConfig << "\""; req1 = execute(cmd.str(), "[REQ1]"); }); @@ -39,7 +42,7 @@ auto RunReqRep(string transport) -> void thread req2_thread([&]() { stringstream cmd; cmd << runTestDevice << " --id req_2" << transport << " --control static --verbosity DEBUG " - << "--log-color false --mq-config \"" << mqConfig << "\""; + << "--session " << session << " --log-color false --mq-config \"" << mqConfig << "\""; req2 = execute(cmd.str(), "[REQ2]"); }); diff --git a/fairmq/test/protocols/_transfer_timeout.cxx b/fairmq/test/protocols/_transfer_timeout.cxx index 13bc900e..568771cc 100644 --- a/fairmq/test/protocols/_transfer_timeout.cxx +++ b/fairmq/test/protocols/_transfer_timeout.cxx @@ -7,6 +7,7 @@ ********************************************************************************/ #include "runner.h" +#include #include #include // std::stringstream @@ -18,9 +19,10 @@ using namespace fair::mq::test; auto RunTransferTimeout(string transport) -> void { + size_t session{fair::mq::tools::UuidHash()}; stringstream cmd; cmd << runTestDevice << " --id transfer_timeout_" << transport << " --control static --verbosity DEBUG " - << "--log-color false --mq-config \"" << mqConfig << "\""; + << "--session " << session << " --log-color false --mq-config \"" << mqConfig << "\""; auto res = execute(cmd.str()); cerr << res.error_out; diff --git a/fairmq/tools/FairMQTools.h b/fairmq/tools/FairMQTools.h index 6c5fb1d1..287dc3ab 100644 --- a/fairmq/tools/FairMQTools.h +++ b/fairmq/tools/FairMQTools.h @@ -7,6 +7,7 @@ #include #include #include +#include namespace FairMQ { @@ -22,6 +23,9 @@ using fair::mq::tools::getDefaultRouteNetworkInterface; using fair::mq::tools::S; +using fair::mq::tools::Uuid; +using fair::mq::tools::UuidHash; + using fair::mq::tools::Version; } // namespace tools diff --git a/fairmq/tools/Unique.h b/fairmq/tools/Unique.h new file mode 100644 index 00000000..0462ca96 --- /dev/null +++ b/fairmq/tools/Unique.h @@ -0,0 +1,47 @@ +/******************************************************************************** + * Copyright (C) 2017 GSI Helmholtzzentrum fuer Schwerionenforschung GmbH * + * * + * This software is distributed under the terms of the * + * GNU Lesser General Public Licence (LGPL) version 3, * + * copied verbatim in the file "LICENSE" * + ********************************************************************************/ + +#ifndef FAIR_MQ_TOOLS_UNIQUE_H +#define FAIR_MQ_TOOLS_UNIQUE_H + +#include +#include +#include +#include + +#include + +namespace fair +{ +namespace mq +{ +namespace tools +{ + +// generates UUID string +inline std::string Uuid() +{ + boost::uuids::random_generator gen; + boost::uuids::uuid u = gen(); + return boost::uuids::to_string(u); +} + +// generates UUID and returns its hash +inline std::size_t UuidHash() +{ + boost::uuids::random_generator gen; + boost::hash uuid_hasher; + boost::uuids::uuid u = gen(); + return uuid_hasher(u); +} + +} /* namespace tools */ +} /* namespace mq */ +} /* namespace fair */ + +#endif /* FAIR_MQ_TOOLS_UNIQUE_H */