Compare commits

..

9 Commits

Author SHA1 Message Date
Dennis Klein
7cacf471b9 CI: Disable sdk until DDS 2.6 2019-07-29 09:22:02 +02:00
Dennis Klein
7316b0e7f2 Example.DDS: Run example as unit test
Part of #185
2019-07-29 09:22:02 +02:00
Dennis Klein
1fa82f5f22 Example.DDS: Make example topologies pass xml validation 2019-07-29 09:22:02 +02:00
Dennis Klein
1bb77bf47b DDS plugin: Automatically set session and device id if not provided
Resolves #187
2019-07-29 09:22:02 +02:00
Dennis Klein
07fe02a0a0 Tests.SDK: Add another test 2019-07-29 09:22:02 +02:00
Dennis Klein
9cbccface7 DDS plugin: Synchronize FillChannelContainers and DDSKeyValue updates
This was a regression after introducing external control mode in f7cdf5e.
2019-07-29 09:22:02 +02:00
Dennis Klein
7b773cde51 SDK: Improve error handling in case state-change fails on a device
Replace the log message with

1. Nothing, if the device is already in the target state
2. Abort and call the completion callback with error otherwise
2019-07-29 09:22:02 +02:00
Dennis Klein
fd282fa950 SDK: Track channel to task id association 2019-07-29 09:22:02 +02:00
Dennis Klein
008be36125 PluginServices: Do not throw if device control cannot be released 2019-07-29 09:22:02 +02:00
17 changed files with 180 additions and 63 deletions

View File

@@ -29,7 +29,7 @@ Set(configure_options "${configure_options};-DCMAKE_PREFIX_PATH=$ENV{SIMPATH}")
Set(configure_options "${configure_options};-DBUILD_NANOMSG_TRANSPORT=ON") Set(configure_options "${configure_options};-DBUILD_NANOMSG_TRANSPORT=ON")
# Set(configure_options "${configure_options};-DBUILD_OFI_TRANSPORT=ON") # Set(configure_options "${configure_options};-DBUILD_OFI_TRANSPORT=ON")
Set(configure_options "${configure_options};-DBUILD_DDS_PLUGIN=ON") Set(configure_options "${configure_options};-DBUILD_DDS_PLUGIN=ON")
Set(configure_options "${configure_options};-DBUILD_SDK=ON") Set(configure_options "${configure_options};-DBUILD_SDK=OFF")
Set(configure_options "${configure_options};-DFAST_BUILD=ON") Set(configure_options "${configure_options};-DFAST_BUILD=ON")
Set(configure_options "${configure_options};-DCOTIRE_MAXIMUM_NUMBER_OF_UNITY_INCLUDES=-j$ENV{number_of_processors}") Set(configure_options "${configure_options};-DCOTIRE_MAXIMUM_NUMBER_OF_UNITY_INCLUDES=-j$ENV{number_of_processors}")

View File

@@ -37,6 +37,12 @@ configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-ex-dds-env.sh ${CMAKE_CURRENT_
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-dds.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-dds.sh @ONLY) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/fairmq-start-ex-dds.sh.in ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-dds.sh @ONLY)
# test # test
add_test(NAME Example.DDS.localhost COMMAND ${CMAKE_CURRENT_BINARY_DIR}/fairmq-start-ex-dds.sh localhost)
set_tests_properties(Example.DDS.localhost PROPERTIES
TIMEOUT 15
RUN_SERIAL true
PASS_REGULAR_EXPRESSION "Example successful"
)
# install # install

View File

@@ -8,8 +8,8 @@
<declrequirement name="SinkWorker" type="wnname" value="sink"/> <declrequirement name="SinkWorker" type="wnname" value="sink"/>
<decltask name="Sampler"> <decltask name="Sampler">
<exe>fairmq-ex-dds-sampler --color false --channel-config name=data1,type=push,method=bind --rate 100 -P dds</exe>
<env reachable="false">fairmq-ex-dds-env.sh</env> <env reachable="false">fairmq-ex-dds-env.sh</env>
<exe>fairmq-ex-dds-sampler --id sampler --color false --channel-config name=data1,type=push,method=bind --rate 100 -P dds</exe>
<requirements> <requirements>
<name>SamplerWorker</name> <name>SamplerWorker</name>
</requirements> </requirements>
@@ -19,10 +19,10 @@
</decltask> </decltask>
<decltask name="Processor"> <decltask name="Processor">
<exe>fairmq-ex-dds-processor --color false --channel-config name=data1,type=pull,method=connect name=data2,type=push,method=connect -P dds</exe>
<env reachable="false">fairmq-ex-dds-env.sh</env> <env reachable="false">fairmq-ex-dds-env.sh</env>
<exe>fairmq-ex-dds-processor --id processor_%taskIndex% --color false --channel-config name=data1,type=pull,method=connect name=data2,type=push,method=connect -P dds</exe>
<requirements> <requirements>
<id>ProcessorWorker</id> <name>ProcessorWorker</name>
</requirements> </requirements>
<properties> <properties>
<name access="read">data1</name> <name access="read">data1</name>
@@ -31,8 +31,8 @@
</decltask> </decltask>
<decltask name="Sink"> <decltask name="Sink">
<exe>fairmq-ex-dds-sink --color false --channel-config name=data2,type=pull,method=bind -P dds</exe>
<env reachable="false">fairmq-ex-dds-env.sh</env> <env reachable="false">fairmq-ex-dds-env.sh</env>
<exe>fairmq-ex-dds-sink --id sink --color false --channel-config name=data2,type=pull,method=bind -P dds</exe>
<requirements> <requirements>
<name>SinkWorker</name> <name>SinkWorker</name>
</requirements> </requirements>

View File

@@ -8,8 +8,8 @@
<declrequirement name="SinkWorker" type="wnname" value="sink"/> <declrequirement name="SinkWorker" type="wnname" value="sink"/>
<decltask name="Sampler"> <decltask name="Sampler">
<exe>fairmq-ex-dds-sampler --color false --channel-config name=data1,type=push,method=bind -P dds --iterations 10</exe>
<env reachable="false">fairmq-ex-dds-env.sh</env> <env reachable="false">fairmq-ex-dds-env.sh</env>
<exe>fairmq-ex-dds-sampler --id sampler --color false --channel-config name=data1,type=push,method=bind -P dds --iterations 10</exe>
<requirements> <requirements>
<name>SamplerWorker</name> <name>SamplerWorker</name>
</requirements> </requirements>
@@ -19,10 +19,10 @@
</decltask> </decltask>
<decltask name="Processor"> <decltask name="Processor">
<exe>fairmq-ex-dds-processor --color false --channel-config name=data1,type=pull,method=connect name=data2,type=push,method=connect -P dds</exe>
<env reachable="false">fairmq-ex-dds-env.sh</env> <env reachable="false">fairmq-ex-dds-env.sh</env>
<exe>fairmq-ex-dds-processor --id processor_%taskIndex% --color false --channel-config name=data1,type=pull,method=connect name=data2,type=push,method=connect -P dds</exe>
<requirements> <requirements>
<id>ProcessorWorker</id> <name>ProcessorWorker</name>
</requirements> </requirements>
<properties> <properties>
<name access="read">data1</name> <name access="read">data1</name>
@@ -31,8 +31,8 @@
</decltask> </decltask>
<decltask name="Sink"> <decltask name="Sink">
<exe>fairmq-ex-dds-sink --color false --channel-config name=data2,type=pull,method=bind -P dds --iterations 10</exe>
<env reachable="false">fairmq-ex-dds-env.sh</env> <env reachable="false">fairmq-ex-dds-env.sh</env>
<exe>fairmq-ex-dds-sink --id sink --color false --channel-config name=data2,type=pull,method=bind -P dds --iterations 10</exe>
<requirements> <requirements>
<name>SinkWorker</name> <name>SinkWorker</name>
</requirements> </requirements>

View File

@@ -46,7 +46,7 @@ echo "TOPOLOGY FILE: ${topologyFile}"
# TODO Uncomment once DDS 2.6 is released # TODO Uncomment once DDS 2.6 is released
# dds-info --active-topology # dds-info --active-topology
dds-topology --disable-validation --activate ${topologyFile} dds-topology --activate ${topologyFile}
# dds-info --active-topology # dds-info --active-topology
# dds-info --wait-for-executing-agents ${requiredNofAgents} # dds-info --wait-for-executing-agents ${requiredNofAgents}
sleep 1 sleep 1
@@ -82,4 +82,7 @@ logDir="${wrkDir}/logs"
for file in $(find "${logDir}" -name "*.tar.gz"); do tar -xf ${file} -C "${logDir}" ; done for file in $(find "${logDir}" -name "*.tar.gz"); do tar -xf ${file} -C "${logDir}" ; done
echo "AGENT LOG FILES IN: ${logDir}" echo "AGENT LOG FILES IN: ${logDir}"
# This string is used by ctest to detect success
echo "Example successful :)"
# Cleanup function is called by EXIT trap # Cleanup function is called by EXIT trap

View File

@@ -63,8 +63,6 @@ bool DeviceRunner::HandleGeneralOptions(const fair::mq::ProgOptions& config, boo
<< " / __/ / /_/ / / / _ / / / / /_/ / " << FAIRMQ_REPO_URL << endl << " / __/ / /_/ / / / _ / / / / /_/ / " << FAIRMQ_REPO_URL << endl
<< " /_/ \\__,_/_/_/ /_/ /_/ \\___\\_\\ " << FAIRMQ_LICENSE << " © " << FAIRMQ_COPYRIGHT << endl; << " /_/ \\__,_/_/_/ /_/ /_/ \\___\\_\\ " << FAIRMQ_LICENSE << " © " << FAIRMQ_COPYRIGHT << endl;
} }
config.PrintOptions();
} }
return true; return true;
@@ -169,6 +167,9 @@ auto DeviceRunner::Run() -> int
// Instantiate and run plugins // Instantiate and run plugins
fPluginManager.InstantiatePlugins(); fPluginManager.InstantiatePlugins();
// Log IDLE configuration
fConfig.PrintOptions();
// Run the device // Run the device
fDevice->RunStateMachine(); fDevice->RunStateMachine();

View File

@@ -74,7 +74,8 @@ auto PluginServices::ReleaseDeviceControl(const string& controller) -> void
if (fDeviceController == controller) { if (fDeviceController == controller) {
fDeviceController = boost::none; fDeviceController = boost::none;
} else { } else {
throw DeviceControlError{tools::ToString("Plugin '", controller, "' cannot release control because it has not taken over control.")}; LOG(debug) << "Plugin '" << controller << "' cannot release control "
<< "because it has no control.";
} }
} }

View File

@@ -51,21 +51,22 @@ DDS::DDS(const string& name,
, fLastState(DeviceState::Idle) , fLastState(DeviceState::Idle)
, fDeviceTerminationRequested(false) , fDeviceTerminationRequested(false)
, fHeartbeatInterval(100) , fHeartbeatInterval(100)
, fUpdatesAllowed(false)
{ {
try { try {
TakeDeviceControl(); TakeDeviceControl();
fControllerThread = thread(&DDS::HandleControl, this);
fHeartbeatThread = thread(&DDS::HeartbeatSender, this);
} catch (PluginServices::DeviceControlError& e) {
LOG(debug) << e.what();
} catch (exception& e) {
LOG(error) << "Error in plugin initialization: " << e.what();
}
}
auto DDS::HandleControl() -> void fHeartbeatThread = thread(&DDS::HeartbeatSender, this);
{
try { std::string deviceId(GetProperty<std::string>("id"));
if (deviceId.empty()) {
SetProperty<std::string>("id", dds::env_prop<dds::task_path>());
}
std::string sessionId(GetProperty<std::string>("session"));
if (sessionId == "default") {
SetProperty<std::string>("session", dds::env_prop<dds::dds_session_id>());
}
auto control = GetProperty<string>("control"); auto control = GetProperty<string>("control");
bool staticMode(false); bool staticMode(false);
if (control == "static") { if (control == "static") {
@@ -85,22 +86,28 @@ auto DDS::HandleControl() -> void
// subscribe to device state changes, pushing new state changes into the event queue // subscribe to device state changes, pushing new state changes into the event queue
SubscribeToDeviceStateChange([&](DeviceState newState) { SubscribeToDeviceStateChange([&](DeviceState newState) {
fStateQueue.Push(newState); fStateQueue.Push(newState);
switch(newState) { switch (newState) {
case DeviceState::Bound: case DeviceState::Bound:
// Receive addresses of connecting channels from DDS // Receive addresses of connecting channels from DDS
// and propagate addresses of bound channels to DDS. // and propagate addresses of bound channels to DDS.
FillChannelContainers(); FillChannelContainers();
// publish bound addresses via DDS at keys corresponding to the channel prefixes, e.g. 'data' in data[i] // publish bound addresses via DDS at keys corresponding to the channel
PublishBoundChannels(); // prefixes, e.g. 'data' in data[i]
break; PublishBoundChannels();
case DeviceState::Exiting: break;
fDeviceTerminationRequested = true; case DeviceState::ResettingDevice: {
UnsubscribeFromDeviceStateChange(); std::lock_guard<std::mutex> lk(fUpdateMutex);
ReleaseDeviceControl(); fUpdatesAllowed = false;
break; break;
default: }
break; case DeviceState::Exiting:
fDeviceTerminationRequested = true;
UnsubscribeFromDeviceStateChange();
ReleaseDeviceControl();
break;
default:
break;
} }
lock_guard<mutex> lock{fStateChangeSubscriberMutex}; lock_guard<mutex> lock{fStateChangeSubscriberMutex};
@@ -114,15 +121,26 @@ auto DDS::HandleControl() -> void
}); });
if (staticMode) { if (staticMode) {
TransitionDeviceStateTo(DeviceState::Running); fControllerThread = thread(&DDS::StaticControl, this);
// wait until stop signal
unique_lock<mutex> lock(fStopMutex);
while (!fDeviceTerminationRequested) {
fStopCondition.wait_for(lock, chrono::seconds(1));
}
LOG(debug) << "Stopping DDS control plugin";
} }
} catch (PluginServices::DeviceControlError& e) {
LOG(debug) << e.what();
} catch (exception& e) {
LOG(error) << "Error in plugin initialization: " << e.what();
}
}
auto DDS::StaticControl() -> void
{
try {
TransitionDeviceStateTo(DeviceState::Running);
// wait until stop signal
unique_lock<mutex> lock(fStopMutex);
while (!fDeviceTerminationRequested) {
fStopCondition.wait_for(lock, chrono::seconds(1));
}
LOG(debug) << "Stopping DDS plugin static controller";
} catch (DeviceErrorState&) { } catch (DeviceErrorState&) {
ReleaseDeviceControl(); ReleaseDeviceControl();
} catch (exception& e) { } catch (exception& e) {
@@ -197,6 +215,11 @@ auto DDS::FillChannelContainers() -> void
LOG(debug) << "dds-i-n: adding " << chanName << " -> i: " << i << " n: " << n; LOG(debug) << "dds-i-n: adding " << chanName << " -> i: " << i << " n: " << n;
fIofN.insert(make_pair(chanName, IofN(i, n))); fIofN.insert(make_pair(chanName, IofN(i, n)));
} }
{
std::lock_guard<std::mutex> lk(fUpdateMutex);
fUpdatesAllowed = true;
}
fUpdateCondition.notify_one();
} catch (const exception& e) { } catch (const exception& e) {
LOG(error) << "Error filling channel containers: " << e.what(); LOG(error) << "Error filling channel containers: " << e.what();
} }
@@ -209,6 +232,10 @@ auto DDS::SubscribeForConnectingChannels() -> void
fDDS.SubscribeKeyValue([&] (const string& propertyId, const string& value, uint64_t senderTaskID) { fDDS.SubscribeKeyValue([&] (const string& propertyId, const string& value, uint64_t senderTaskID) {
try { try {
LOG(debug) << "Received update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID; LOG(debug) << "Received update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID;
std::unique_lock<std::mutex> lk(fUpdateMutex);
fUpdateCondition.wait(lk, [&]{ return fUpdatesAllowed; });
string val = value; string val = value;
// check if it is to handle as one out of multiple values // check if it is to handle as one out of multiple values
auto it = fIofN.find(propertyId); auto it = fIofN.find(propertyId);

View File

@@ -63,6 +63,9 @@ struct DDSSubscription
LOG(error) << "DDS Error received: error code: " << errorCode << ", error message: " << errorMsg; LOG(error) << "DDS Error received: error code: " << errorCode << ", error message: " << errorMsg;
}); });
// fDDSCustomCmd.subscribe([](const std::string& cmd, const std::string& cond, uint64_t senderId) {
// LOG(debug) << "cmd: " << cmd << ", cond: " << cond << ", senderId: " << senderId;
// });
assert(!dds_session_id.empty()); assert(!dds_session_id.empty());
} }
@@ -125,7 +128,7 @@ class DDS : public Plugin
~DDS(); ~DDS();
private: private:
auto HandleControl() -> void; auto StaticControl() -> void;
auto FillChannelContainers() -> void; auto FillChannelContainers() -> void;
auto SubscribeForConnectingChannels() -> void; auto SubscribeForConnectingChannels() -> void;
@@ -160,6 +163,10 @@ class DDS : public Plugin
std::thread fHeartbeatThread; std::thread fHeartbeatThread;
std::chrono::milliseconds fHeartbeatInterval; std::chrono::milliseconds fHeartbeatInterval;
bool fUpdatesAllowed;
std::mutex fUpdateMutex;
std::condition_variable fUpdateCondition;
}; };
Plugin::ProgOptions DDSProgramOptions() Plugin::ProgOptions DDSProgramOptions()

View File

@@ -273,7 +273,7 @@ int main(int argc, char* argv[])
cerr << "state-changes-unsubscription failed with return code: " << parts[2]; cerr << "state-changes-unsubscription failed with return code: " << parts[2];
} }
} else { } else {
// cout << "Received: " << msg << endl; cout << "Received: " << msg << endl;
} }
}); });

View File

@@ -20,7 +20,9 @@
#include <cassert> #include <cassert>
#include <cstdlib> #include <cstdlib>
#include <mutex>
#include <sstream> #include <sstream>
#include <unordered_map>
#include <utility> #include <utility>
namespace fair { namespace fair {
@@ -134,6 +136,8 @@ struct DDSSession::Impl
dds::intercom_api::CCustomCmd fDDSCustomCmd; dds::intercom_api::CCustomCmd fDDSCustomCmd;
Id fId; Id fId;
bool fStopOnDestruction; bool fStopOnDestruction;
mutable std::mutex fMtx;
std::unordered_map<DDSChannel::Id, DDSTask::Id> fTaskIdByChannelIdMap;
}; };
DDSSession::DDSSession(DDSEnvironment env) DDSSession::DDSSession(DDSEnvironment env)
@@ -316,6 +320,18 @@ void DDSSession::UnsubscribeFromCommands()
void DDSSession::SendCommand(const std::string& cmd) { fImpl->fDDSCustomCmd.send(cmd, ""); } void DDSSession::SendCommand(const std::string& cmd) { fImpl->fDDSCustomCmd.send(cmd, ""); }
auto DDSSession::UpdateChannelToTaskAssociation(DDSChannel::Id channelId, DDSTask::Id taskId) -> void
{
std::lock_guard<std::mutex> lk(fImpl->fMtx);
fImpl->fTaskIdByChannelIdMap[channelId] = taskId;
}
auto DDSSession::GetTaskId(DDSChannel::Id channelId) const -> DDSTask::Id
{
std::lock_guard<std::mutex> lk(fImpl->fMtx);
return fImpl->fTaskIdByChannelIdMap.at(channelId);
}
auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream& auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&
{ {
return os << "$DDS_SESSION_ID: " << session.GetId(); return os << "$DDS_SESSION_ID: " << session.GetId();

View File

@@ -43,6 +43,18 @@ auto operator>>(std::istream& is, DDSRMSPlugin& plugin) -> std::istream&;
class DDSTopology; class DDSTopology;
class DDSAgent; class DDSAgent;
class DDSTask
{
public:
using Id = std::uint64_t;
};
class DDSChannel
{
public:
using Id = std::uint64_t;
};
/** /**
* @class DDSSession DDSSession.h <fairmq/sdk/DDSSession.h> * @class DDSSession DDSSession.h <fairmq/sdk/DDSSession.h>
* @brief Represents a DDS session * @brief Represents a DDS session
@@ -95,6 +107,8 @@ class DDSSession
void SubscribeToCommands(std::function<void(const std::string& msg, const std::string& condition, uint64_t senderId)>); void SubscribeToCommands(std::function<void(const std::string& msg, const std::string& condition, uint64_t senderId)>);
void UnsubscribeFromCommands(); void UnsubscribeFromCommands();
void SendCommand(const std::string&); void SendCommand(const std::string&);
auto UpdateChannelToTaskAssociation(DDSChannel::Id, DDSTask::Id) -> void;
auto GetTaskId(DDSChannel::Id) const -> DDSTask::Id;
friend auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&; friend auto operator<<(std::ostream& os, const DDSSession& session) -> std::ostream&;

View File

@@ -75,7 +75,7 @@ Topology::Topology(DDSTopology topo, DDSSession session)
// LOG(debug) << "Adding device " << d; // LOG(debug) << "Adding device " << d;
fState.emplace(d, DeviceStatus{ false, DeviceState::Ok }); fState.emplace(d, DeviceStatus{ false, DeviceState::Ok });
} }
fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& /* condition */, uint64_t senderId) { fDDSSession.SubscribeToCommands([this](const std::string& msg, const std::string& /* condition */, DDSChannel::Id senderId) {
// LOG(debug) << "Received from " << senderId << ": " << msg; // LOG(debug) << "Received from " << senderId << ": " << msg;
std::vector<std::string> parts; std::vector<std::string> parts;
boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,")); boost::algorithm::split(parts, msg, boost::algorithm::is_any_of(":,"));
@@ -85,7 +85,9 @@ Topology::Topology(DDSTopology topo, DDSSession session)
} }
if (parts[0] == "state-change") { if (parts[0] == "state-change") {
AddNewStateEntry(std::stoull(parts[2]), parts[3]); DDSTask::Id taskId(std::stoull(parts[2]));
fDDSSession.UpdateChannelToTaskAssociation(senderId, taskId);
AddNewStateEntry(taskId, parts[3]);
} else if (parts[0] == "state-changes-subscription") { } else if (parts[0] == "state-changes-subscription") {
LOG(debug) << "Received from " << senderId << ": " << msg; LOG(debug) << "Received from " << senderId << ": " << msg;
if (parts[2] != "OK") { if (parts[2] != "OK") {
@@ -96,7 +98,16 @@ Topology::Topology(DDSTopology topo, DDSSession session)
LOG(error) << "state-changes-unsubscription failed with return code: " << parts[2]; LOG(error) << "state-changes-unsubscription failed with return code: " << parts[2];
} }
} else if (parts[1] == "could not queue") { } else if (parts[1] == "could not queue") {
LOG(warn) << "Could not queue " << parts[2] << " transition on " << senderId; std::unique_lock<std::mutex> lock(fMtx);
if (fStateChangeOngoing) {
if (fState.at(fDDSSession.GetTaskId(senderId)).state != fTargetState) {
fStateChangeError =
tools::ToString("Could not queue ", parts[2], " transition on ", senderId);
lock.unlock();
fCV.notify_one();
}
}
} }
}); });
fDDSSession.StartDDSService(); fDDSSession.StartDDSService();
@@ -122,13 +133,13 @@ auto Topology::ChangeState(TopologyTransition transition, ChangeStateCallback cb
std::unique_lock<std::mutex> lock(fMtx); std::unique_lock<std::mutex> lock(fMtx);
if (fStateChangeOngoing) { if (fStateChangeOngoing) {
throw std::runtime_error("A state change request is already in progress, concurrent requests are currently not supported"); throw std::runtime_error("A state change request is already in progress, concurrent requests are currently not supported");
lock.unlock();
} }
LOG(debug) << "Initiating ChangeState with " << transition << " to " << expectedState.at(transition); LOG(debug) << "Initiating ChangeState with " << transition << " to " << expectedState.at(transition);
fStateChangeOngoing = true; fStateChangeOngoing = true;
fChangeStateCallback = cb; fChangeStateCallback = cb;
fStateChangeTimeout = timeout; fStateChangeTimeout = timeout;
fTargetState = expectedState.at(transition); fTargetState = expectedState.at(transition);
fStateChangeError.clear();
fDDSSession.SendCommand(GetTransitionName(transition)); fDDSSession.SendCommand(GetTransitionName(transition));
} }
@@ -155,12 +166,15 @@ void Topology::WaitForState()
while (!fShutdown) { while (!fShutdown) {
if (fStateChangeOngoing) { if (fStateChangeOngoing) {
try { try {
std::unique_lock<std::mutex> lock(fMtx);
auto condition = [&] { auto condition = [&] {
// LOG(info) << "checking condition"; // LOG(info) << "checking condition";
// LOG(info) << "fShutdown: " << fShutdown; // LOG(info) << "fShutdown: " << fShutdown;
// LOG(info) << "condition: " << std::all_of(fState.cbegin(), fState.cend(), // LOG(info) << "condition: " << std::all_of(fState.cbegin(), fState.cend(),
// [&](TopologyState::value_type i) { return i.second.state == fTargetState; }); // [&](TopologyState::value_type i) { return i.second.state == fTargetState; });
return fShutdown return fShutdown
|| !fStateChangeError.empty()
|| std::all_of( || std::all_of(
fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) { fState.cbegin(), fState.cend(), [&](TopologyState::value_type i) {
// TODO Check, if we can make sure that EXITING state change event are not missed // TODO Check, if we can make sure that EXITING state change event are not missed
@@ -170,8 +184,6 @@ void Topology::WaitForState()
}); });
}; };
std::unique_lock<std::mutex> lock(fMtx);
if (fStateChangeTimeout > std::chrono::milliseconds(0)) { if (fStateChangeTimeout > std::chrono::milliseconds(0)) {
if (!fCV.wait_for(lock, fStateChangeTimeout, condition)) { if (!fCV.wait_for(lock, fStateChangeTimeout, condition)) {
// LOG(debug) << "timeout"; // LOG(debug) << "timeout";
@@ -187,6 +199,15 @@ void Topology::WaitForState()
} }
fStateChangeOngoing = false; fStateChangeOngoing = false;
if (!fStateChangeError.empty()) {
TopologyState state = fState;
lock.unlock();
fChangeStateCallback(
{{AsyncOpResultCode::Error, fStateChangeError}, std::move(state)});
break;
}
if (fShutdown) { if (fShutdown) {
LOG(debug) << "Aborting because a shutdown was requested"; LOG(debug) << "Aborting because a shutdown was requested";
TopologyState state = fState; TopologyState state = fState;

View File

@@ -56,7 +56,7 @@ struct DeviceStatus
DeviceState state; DeviceState state;
}; };
using TopologyState = std::unordered_map<uint64_t, DeviceStatus>; using TopologyState = std::unordered_map<DDSTask::Id, DeviceStatus>;
using TopologyTransition = fair::mq::Transition; using TopologyTransition = fair::mq::Transition;
struct MixedState : std::runtime_error { using std::runtime_error::runtime_error; }; struct MixedState : std::runtime_error { using std::runtime_error::runtime_error; };
@@ -150,6 +150,7 @@ class Topology
ChangeStateCallback fChangeStateCallback; ChangeStateCallback fChangeStateCallback;
std::chrono::milliseconds fStateChangeTimeout; std::chrono::milliseconds fStateChangeTimeout;
bool fShutdown; bool fShutdown;
std::string fStateChangeError;
void WaitForState(); void WaitForState();
void AddNewStateEntry(uint64_t senderId, const std::string& state); void AddNewStateEntry(uint64_t senderId, const std::string& state);

View File

@@ -26,10 +26,7 @@ TEST_F(PluginServices, OnlySingleController)
mServices.ChangeDeviceState("bar", DeviceStateTransition::InitDevice), mServices.ChangeDeviceState("bar", DeviceStateTransition::InitDevice),
fair::mq::PluginServices::DeviceControlError fair::mq::PluginServices::DeviceControlError
); );
ASSERT_THROW( // no control for bar ASSERT_NO_THROW(mServices.ReleaseDeviceControl("bar"));
mServices.ReleaseDeviceControl("bar"),
fair::mq::PluginServices::DeviceControlError
);
ASSERT_NO_THROW(mServices.ReleaseDeviceControl("foo")); ASSERT_NO_THROW(mServices.ReleaseDeviceControl("foo"));
ASSERT_FALSE(mServices.GetDeviceController()); ASSERT_FALSE(mServices.GetDeviceController());

View File

@@ -105,7 +105,7 @@ TEST_F(Topology, ChangeStateTimeout)
blocker.Wait(); blocker.Wait();
} }
TEST_F(Topology, ChangeStateFullDeviceLifetime) TEST_F(Topology, ChangeStateFullDeviceLifecycle)
{ {
using fair::mq::sdk::Topology; using fair::mq::sdk::Topology;
using fair::mq::sdk::TopologyTransition; using fair::mq::sdk::TopologyTransition;
@@ -125,4 +125,27 @@ TEST_F(Topology, ChangeStateFullDeviceLifetime)
} }
} }
TEST_F(Topology, ChangeStateFullDeviceLifecycle2)
{
using fair::mq::sdk::Topology;
using fair::mq::sdk::TopologyTransition;
Topology topo(mDDSTopo, mDDSSession);
for (int i(0); i < 10; ++i) {
for (auto transition : {TopologyTransition::InitDevice,
TopologyTransition::CompleteInit,
TopologyTransition::Bind,
TopologyTransition::Connect,
TopologyTransition::InitTask,
TopologyTransition::Run}) {
ASSERT_EQ(topo.ChangeState(transition).rc, fair::mq::AsyncOpResultCode::Ok);
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
for (auto transition : {TopologyTransition::Stop,
TopologyTransition::ResetTask,
TopologyTransition::ResetDevice}) {
ASSERT_EQ(topo.ChangeState(transition).rc, fair::mq::AsyncOpResultCode::Ok);
}
}
}
} // namespace } // namespace

View File

@@ -6,7 +6,7 @@
<declrequirement name="SinkWorker" type="wnname" value="sink"/> <declrequirement name="SinkWorker" type="wnname" value="sink"/>
<decltask name="Sampler"> <decltask name="Sampler">
<exe reachable="true">fairmq-bsampler --id sampler --color false --channel-config name=data,type=push,method=bind -P dds --msg-rate 10</exe> <exe reachable="true">fairmq-bsampler --color false --channel-config name=data,type=push,method=bind -P dds --msg-rate 10</exe>
<requirements> <requirements>
<name>SamplerWorker</name> <name>SamplerWorker</name>
</requirements> </requirements>
@@ -16,7 +16,7 @@
</decltask> </decltask>
<decltask name="Sink"> <decltask name="Sink">
<exe reachable="true">fairmq-sink --id sink_%taskIndex% --color false --channel-config name=data,type=pull,method=connect -P dds</exe> <exe reachable="true">fairmq-sink --color false --channel-config name=data,type=pull,method=connect -P dds</exe>
<requirements> <requirements>
<name>SinkWorker</name> <name>SinkWorker</name>
</requirements> </requirements>