SDK: Add test for timeout, concurrent call. Implement TODOs

This commit is contained in:
Alexey Rybalchenko 2019-07-24 14:16:35 +02:00 committed by Dennis Klein
parent dc55272317
commit 5d535163f1
3 changed files with 116 additions and 67 deletions

View File

@ -39,26 +39,28 @@ auto operator<<(std::ostream& os, AsyncOpResultCode v) -> std::ostream&
auto operator<<(std::ostream& os, AsyncOpResult v) -> std::ostream& auto operator<<(std::ostream& os, AsyncOpResult v) -> std::ostream&
{ {
(void)(os << "[" << v.code << "]"); os << "[" << v.code << "]";
if (v.msg.empty()) { if (!v.msg.empty()) {
(void)(os << " " << v.msg); os << " " << v.msg;
} }
return os; return os;
} }
namespace sdk { namespace sdk {
const std::unordered_map<DeviceTransition, DeviceState, tools::HashEnum<DeviceTransition>> const std::unordered_map<DeviceTransition, DeviceState, tools::HashEnum<DeviceTransition>> expectedState =
expectedState = {{Transition::InitDevice, DeviceState::InitializingDevice}, {
{Transition::CompleteInit, DeviceState::Initialized}, { Transition::InitDevice, DeviceState::InitializingDevice },
{Transition::Bind, DeviceState::Bound}, { Transition::CompleteInit, DeviceState::Initialized },
{Transition::Connect, DeviceState::DeviceReady}, { Transition::Bind, DeviceState::Bound },
{Transition::InitTask, DeviceState::InitializingTask}, { Transition::Connect, DeviceState::DeviceReady },
{Transition::Run, DeviceState::Running}, { Transition::InitTask, DeviceState::InitializingTask },
{Transition::Stop, DeviceState::Ready}, { Transition::Run, DeviceState::Running },
{Transition::ResetTask, DeviceState::DeviceReady}, { Transition::Stop, DeviceState::Ready },
{Transition::ResetDevice, DeviceState::Idle}, { Transition::ResetTask, DeviceState::DeviceReady },
{Transition::End, DeviceState::Exiting}}; { Transition::ResetDevice, DeviceState::Idle },
{ Transition::End, DeviceState::Exiting }
};
Topology::Topology(DDSTopology topo, DDSSession session) Topology::Topology(DDSTopology topo, DDSSession session)
: fDDSSession(std::move(session)) : fDDSSession(std::move(session))
@ -70,8 +72,8 @@ Topology::Topology(DDSTopology topo, DDSSession session)
{ {
std::vector<uint64_t> deviceList = fDDSTopo.GetDeviceList(); std::vector<uint64_t> deviceList = fDDSTopo.GetDeviceList();
for (const auto& d : deviceList) { for (const auto& d : deviceList) {
LOG(info) << "fair::mq::Topology Adding device " << d; LOG(info) << "Adding device " << d;
fTopologyState.emplace(d, DeviceStatus{ false, DeviceState::Ok }); fState.emplace(d, DeviceStatus{ false, DeviceState::Ok });
} }
fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& /* condition */, uint64_t senderId) { fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& /* condition */, uint64_t senderId) {
LOG(debug) << "Received from " << senderId << ": " << msg; LOG(debug) << "Received from " << senderId << ": " << msg;
@ -83,7 +85,6 @@ Topology::Topology(DDSTopology topo, DDSSession session)
} }
if (parts[0] == "state-change") { if (parts[0] == "state-change") {
boost::trim(parts[3]);
AddNewStateEntry(std::stoull(parts[2]), parts[3]); AddNewStateEntry(std::stoull(parts[2]), parts[3]);
} else if (parts[0] == "state-changes-subscription") { } else if (parts[0] == "state-changes-subscription") {
if (parts[2] != "OK") { if (parts[2] != "OK") {
@ -104,10 +105,12 @@ Topology::Topology(DDSTopology topo, DDSSession session)
auto Topology::ChangeState(TopologyTransition transition, ChangeStateCallback cb, Duration timeout) -> void auto Topology::ChangeState(TopologyTransition transition, ChangeStateCallback cb, Duration timeout) -> void
{ {
{ {
std::lock_guard<std::mutex> guard(fMtx); std::unique_lock<std::mutex> lock(fMtx);
if (fStateChangeOngoing) { if (fStateChangeOngoing) {
LOG(error) << "State change already in progress, concurrent requested not yet supported"; LOG(error) << "A state change request is already in progress, concurrent requests are currently not supported";
return; // TODO call the callback with error msg lock.unlock();
cb({{AsyncOpResultCode::Error, "A state change request is already in progress, concurrent requests are currently not supported"}, fState});
return;
} }
LOG(info) << "Initiating ChangeState with " << transition << " to " << expectedState.at(transition); LOG(info) << "Initiating ChangeState with " << transition << " to " << expectedState.at(transition);
fStateChangeOngoing = true; fStateChangeOngoing = true;
@ -143,38 +146,42 @@ void Topology::WaitForState()
auto condition = [&] { auto condition = [&] {
// LOG(info) << "checking condition"; // LOG(info) << "checking condition";
// LOG(info) << "fShutdown: " << fShutdown; // LOG(info) << "fShutdown: " << fShutdown;
// LOG(info) << "condition: " << std::all_of(fTopologyState.cbegin(), fTopologyState.cend(), [&](TopologyState::value_type i) { return i.second.state == fTargetState; }); // LOG(info) << "condition: " << std::all_of(fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) { return i.second.state == fTargetState; });
return fShutdown || std::all_of(fTopologyState.cbegin(), fTopologyState.cend(), [&](TopologyState::value_type i) { return fShutdown || std::all_of(fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) {
return i.second.state == fTargetState; return i.second.state == fTargetState;
}); });
}; };
std::unique_lock<std::mutex> lock(fMtx); std::unique_lock<std::mutex> lock(fMtx);
// TODO Fix the timeout version
if (fStateChangeTimeout > std::chrono::milliseconds(0)) { if (fStateChangeTimeout > std::chrono::milliseconds(0)) {
LOG(debug) << "initiating wait with timeout";
if (!fCV.wait_for(lock, fStateChangeTimeout, condition)) { if (!fCV.wait_for(lock, fStateChangeTimeout, condition)) {
LOG(debug) << "timeout"; // LOG(debug) << "timeout";
fStateChangeOngoing = false; fStateChangeOngoing = false;
TopologyState state = fState;
lock.unlock();
fChangeStateCallback({{AsyncOpResultCode::Timeout, "timeout"}, std::move(state)});
break; break;
} }
} else { } else {
LOG(debug) << "initiating wait without timeout";
fCV.wait(lock, condition); fCV.wait(lock, condition);
} }
fStateChangeOngoing = false; fStateChangeOngoing = false;
if (fShutdown) { if (fShutdown) {
// TODO call the callback here with Aborted result LOG(debug) << "Aborting because a shutdown was requested";
TopologyState state = fState;
lock.unlock();
fChangeStateCallback({{AsyncOpResultCode::Aborted, "Aborted because a shutdown was requested"}, std::move(state)});
break; break;
} }
} catch(std::exception& e) { } catch (std::exception& e) {
fStateChangeOngoing = false;
LOG(error) << "Error while processing state request: " << e.what(); LOG(error) << "Error while processing state request: " << e.what();
fChangeStateCallback({{AsyncOpResultCode::Error, ""}, fTopologyState}); fChangeStateCallback({{AsyncOpResultCode::Error, tools::ToString("Exception thrown: ", e.what())}, fState});
} }
fChangeStateCallback({{AsyncOpResultCode::Ok, ""}, fTopologyState}); fChangeStateCallback({{AsyncOpResultCode::Ok, "success"}, fState});
} else { } else {
std::unique_lock<std::mutex> lock(fExecutionMtx); std::unique_lock<std::mutex> lock(fExecutionMtx);
fExecutionCV.wait(lock); fExecutionCV.wait(lock);
@ -191,14 +198,14 @@ void Topology::AddNewStateEntry(uint64_t senderId, const std::string& state)
{ {
try { try {
std::unique_lock<std::mutex> lock(fMtx); std::unique_lock<std::mutex> lock(fMtx);
fTopologyState[senderId] = DeviceStatus{ true, fair::mq::GetState(endState) }; fState[senderId] = DeviceStatus{ true, fair::mq::GetState(endState) };
} catch(const std::exception& e) { } catch (const std::exception& e) {
LOG(error) << "Exception in AddNewStateEntry: " << e.what(); LOG(error) << "Exception in AddNewStateEntry: " << e.what();
} }
// LOG(info) << "fTopologyState after update: "; // LOG(info) << "fState after update: ";
// for (auto& e : fTopologyState) { // for (auto& e : fState) {
// LOG(info) << e.first << ": " << e.second.state; // LOG(info) << e.first << ": " << e.second.state;
// } // }
} }
fCV.notify_one(); fCV.notify_one();
@ -208,7 +215,7 @@ Topology::~Topology()
{ {
fDDSSession.UnsubscribeFromCommands(); fDDSSession.UnsubscribeFromCommands();
{ {
std::lock_guard<std::mutex> guard(fMtx); std::lock_guard<std::mutex> guard(fExecutionMtx);
fShutdown = true; fShutdown = true;
} }
fExecutionCV.notify_one(); fExecutionCV.notify_one();

View File

@ -22,11 +22,13 @@
#include <ostream> #include <ostream>
#include <string> #include <string>
#include <vector> #include <vector>
#include <stdexcept>
namespace fair { namespace fair {
namespace mq { namespace mq {
enum class AsyncOpResultCode { enum class AsyncOpResultCode
{
Ok, Ok,
Timeout, Timeout,
Error, Error,
@ -57,6 +59,26 @@ struct DeviceStatus
using TopologyState = std::unordered_map<uint64_t, DeviceStatus>; using TopologyState = std::unordered_map<uint64_t, DeviceStatus>;
using TopologyTransition = fair::mq::Transition; using TopologyTransition = fair::mq::Transition;
struct MixedState : std::runtime_error { using std::runtime_error::runtime_error; };
DeviceState AggregateState(const TopologyState& topologyState)
{
DeviceState first = topologyState.begin()->second.state;
if (std::all_of(topologyState.cbegin(), topologyState.cend(), [&](TopologyState::value_type i) {
return i.second.state == first;
})) {
return first;
} else {
throw MixedState("State is not uniform");
}
}
bool StateEqualsTo(const TopologyState& topologyState, DeviceState state)
{
return AggregateState(topologyState) == state;
}
/** /**
* @class Topology Topology.h <fairmq/sdk/Topology.h> * @class Topology Topology.h <fairmq/sdk/Topology.h>
* @brief Represents a FairMQ topology * @brief Represents a FairMQ topology
@ -95,14 +117,22 @@ class Topology
/// @return The result of the state transition /// @return The result of the state transition
auto ChangeState(TopologyTransition t, Duration timeout = std::chrono::milliseconds(0)) -> ChangeStateResult; auto ChangeState(TopologyTransition t, Duration timeout = std::chrono::milliseconds(0)) -> ChangeStateResult;
/// @brief Returns the current state of the topology
/// @return map of id : DeviceStatus (initialized, state)
TopologyState GetCurrentState() const { std::lock_guard<std::mutex> guard(fMtx); return fState; }
DeviceState AggregateState() { return sdk::AggregateState(fState); }
bool StateEqualsTo(DeviceState state) { return sdk::StateEqualsTo(fState, state); }
private: private:
DDSSession fDDSSession; DDSSession fDDSSession;
DDSTopology fDDSTopo; DDSTopology fDDSTopo;
TopologyState fTopologyState; TopologyState fState;
bool fStateChangeOngoing; bool fStateChangeOngoing;
DeviceState fTargetState; DeviceState fTargetState;
std::mutex fMtx; mutable std::mutex fMtx;
std::mutex fExecutionMtx; mutable std::mutex fExecutionMtx;
std::condition_variable fCV; std::condition_variable fCV;
std::condition_variable fExecutionCV; std::condition_variable fExecutionCV;
std::thread fExecutionThread; std::thread fExecutionThread;

View File

@ -21,26 +21,24 @@ TEST_F(Topology, Construction)
fair::mq::sdk::Topology topo(mDDSTopo, mDDSSession); fair::mq::sdk::Topology topo(mDDSTopo, mDDSSession);
} }
TEST_F(Topology, ChangeState_async1) TEST_F(Topology, ChangeStateAsync)
{ {
using fair::mq::sdk::Topology; using fair::mq::sdk::Topology;
using fair::mq::sdk::TopologyTransition; using fair::mq::sdk::TopologyTransition;
Topology topo(mDDSTopo, mDDSSession); Topology topo(mDDSTopo, mDDSSession);
fair::mq::tools::Semaphore blocker; fair::mq::tools::Semaphore blocker;
topo.ChangeState(TopologyTransition::Stop, [&blocker](Topology::ChangeStateResult result) { topo.ChangeState(TopologyTransition::Stop, [&blocker, &topo](Topology::ChangeStateResult result) {
LOG(info) << result; LOG(info) << result;
EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok); EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok);
// TODO add the helper to check state consistency EXPECT_NO_THROW(fair::mq::sdk::AggregateState(result.state));
for (const auto& e : result.state) { EXPECT_EQ(fair::mq::sdk::StateEqualsTo(result.state, fair::mq::sdk::DeviceState::Ready), true);
EXPECT_EQ(e.second.state, fair::mq::sdk::DeviceState::Ready);
}
blocker.Signal(); blocker.Signal();
}); });
blocker.Wait(); blocker.Wait();
} }
TEST_F(Topology, ChangeState_sync) TEST_F(Topology, ChangeStateSync)
{ {
using fair::mq::sdk::Topology; using fair::mq::sdk::Topology;
using fair::mq::sdk::TopologyTransition; using fair::mq::sdk::TopologyTransition;
@ -49,27 +47,41 @@ TEST_F(Topology, ChangeState_sync)
auto result(topo.ChangeState(TopologyTransition::Stop)); auto result(topo.ChangeState(TopologyTransition::Stop));
EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok); EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Ok);
// TODO add the helper to check state consistency EXPECT_NO_THROW(fair::mq::sdk::AggregateState(result.state));
for (const auto& e : result.state) { EXPECT_EQ(fair::mq::sdk::StateEqualsTo(result.state, fair::mq::sdk::DeviceState::Ready), true);
EXPECT_EQ(e.second.state, fair::mq::sdk::DeviceState::Ready);
}
} }
// TEST_F(Topology, Timeout)
// {
// using fair::mq::sdk::Topology;
// using fair::mq::sdk::TopologyTransition;
// Topology topo(mDDSTopo, mDDSSession); TEST_F(Topology, ChangeStateConcurrent)
// Topology::ChangeStateResult r; {
// fair::mq::tools::Semaphore blocker; using fair::mq::sdk::Topology;
// topo.ChangeState(TopologyTransition::End, [&](Topology::ChangeStateResult result) { using fair::mq::sdk::TopologyTransition;
// LOG(info) << result;
// blocker.Signal(); Topology topo(mDDSTopo, mDDSSession);
// }, std::chrono::milliseconds(100)); fair::mq::tools::Semaphore blocker;
// blocker.Wait(); topo.ChangeState(TopologyTransition::Stop, [&blocker](Topology::ChangeStateResult result) {
// for (const auto& e : r.rc) { LOG(info) << "result for valid ChangeState: " << result;
// EXPECT_EQ(e.second.state, fair::mq::sdk::DeviceState::Ok); blocker.Signal();
// } });
// } topo.ChangeState(TopologyTransition::Stop, [&blocker](Topology::ChangeStateResult result) {
LOG(info) << "result for invalid ChangeState: " << result;
EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Error);
});
blocker.Wait();
}
TEST_F(Topology, ChangeStateTimeout)
{
using fair::mq::sdk::Topology;
using fair::mq::sdk::TopologyTransition;
Topology topo(mDDSTopo, mDDSSession);
fair::mq::tools::Semaphore blocker;
topo.ChangeState(TopologyTransition::End, [&](Topology::ChangeStateResult result) {
LOG(info) << result;
EXPECT_EQ(result.rc, fair::mq::AsyncOpResultCode::Timeout);
blocker.Signal();
}, std::chrono::milliseconds(1));
blocker.Wait();
}
} // namespace } // namespace