mirror of
https://github.com/FairRootGroup/FairMQ.git
synced 2025-10-15 09:31:45 +00:00
Implement old_state->new_state notifications
This commit is contained in:
committed by
Dennis Klein
parent
d966a0a991
commit
14980d7486
@@ -45,6 +45,8 @@ DDS::DDS(const string& name, const Plugin::Version version, const string& mainta
|
||||
, fEvents()
|
||||
, fEventsMutex()
|
||||
, fNewEvent()
|
||||
, fCurrentState(DeviceState::Idle)
|
||||
, fLastState(DeviceState::Idle)
|
||||
, fDeviceTerminationRequested(false)
|
||||
, fHeartbeatInterval{100}
|
||||
{
|
||||
@@ -62,6 +64,15 @@ DDS::DDS(const string& name, const Plugin::Version version, const string& mainta
|
||||
auto DDS::HandleControl() -> void
|
||||
{
|
||||
try {
|
||||
LOG(debug) << "$DDS_TASK_PATH: " << getenv("DDS_TASK_PATH");
|
||||
LOG(debug) << "$DDS_GROUP_NAME: " << getenv("DDS_GROUP_NAME");
|
||||
LOG(debug) << "$DDS_COLLECTION_NAME: " << getenv("DDS_COLLECTION_NAME");
|
||||
LOG(debug) << "$DDS_TASK_NAME: " << getenv("DDS_TASK_NAME");
|
||||
LOG(debug) << "$DDS_TASK_INDEX: " << getenv("DDS_TASK_INDEX");
|
||||
LOG(debug) << "$DDS_COLLECTION_INDEX: " << getenv("DDS_COLLECTION_INDEX");
|
||||
string dds_session_id(getenv("DDS_SESSION_ID"));
|
||||
LOG(debug) << "$DDS_SESSION_ID: " << dds_session_id;
|
||||
|
||||
// subscribe for state changes from DDS (subscriptions start firing after fService.start() is called)
|
||||
SubscribeForCustomCommands();
|
||||
|
||||
@@ -70,10 +81,12 @@ auto DDS::HandleControl() -> void
|
||||
LOG(error) << "DDS Error received: error code: " << errorCode << ", error message: " << errorMsg << endl;
|
||||
});
|
||||
|
||||
LOG(debug) << "Subscribing for DDS properties.";
|
||||
SubscribeForConnectingChannels();
|
||||
|
||||
// subscribe to device state changes, pushing new state chenges into the event queue
|
||||
// start DDS service - subscriptions will only start firing after this step
|
||||
fService.start(dds_session_id);
|
||||
|
||||
// subscribe to device state changes, pushing new state changes into the event queue
|
||||
SubscribeToDeviceStateChange([&](DeviceState newState) {
|
||||
{
|
||||
lock_guard<mutex> lock{fEventsMutex};
|
||||
@@ -87,9 +100,14 @@ auto DDS::HandleControl() -> void
|
||||
{
|
||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||
string id = GetProperty<string>("id");
|
||||
fLastState = fCurrentState;
|
||||
fCurrentState = newState;
|
||||
for (auto subscriberId : fStateChangeSubscribers) {
|
||||
LOG(debug) << "Publishing state-change: " << newState << " to " << subscriberId;
|
||||
fDDSCustomCmd.send("state-change: " + id + "," + ToStr(newState), to_string(subscriberId));
|
||||
LOG(debug) << "Publishing state-change: " << fLastState << "->" << newState
|
||||
<< " to " << subscriberId;
|
||||
fDDSCustomCmd.send("state-change: " + id + "," + ToStr(fLastState) + "->"
|
||||
+ ToStr(newState),
|
||||
to_string(subscriberId));
|
||||
}
|
||||
}
|
||||
});
|
||||
@@ -105,16 +123,6 @@ auto DDS::HandleControl() -> void
|
||||
// and propagate addresses of bound channels to DDS.
|
||||
FillChannelContainers();
|
||||
|
||||
LOG(debug) << "$DDS_TASK_PATH: " << getenv("DDS_TASK_PATH");
|
||||
LOG(debug) << "$DDS_GROUP_NAME: " << getenv("DDS_GROUP_NAME");
|
||||
LOG(debug) << "$DDS_COLLECTION_NAME: " << getenv("DDS_COLLECTION_NAME");
|
||||
LOG(debug) << "$DDS_TASK_NAME: " << getenv("DDS_TASK_NAME");
|
||||
LOG(debug) << "$DDS_TASK_INDEX: " << getenv("DDS_TASK_INDEX");
|
||||
LOG(debug) << "$DDS_COLLECTION_INDEX: " << getenv("DDS_COLLECTION_INDEX");
|
||||
|
||||
// start DDS service - subscriptions will only start firing after this step
|
||||
fService.start();
|
||||
|
||||
// publish bound addresses via DDS at keys corresponding to the channel prefixes, e.g. 'data' in data[i]
|
||||
PublishBoundChannels();
|
||||
|
||||
@@ -219,6 +227,8 @@ auto DDS::FillChannelContainers() -> void
|
||||
|
||||
auto DDS::SubscribeForConnectingChannels() -> void
|
||||
{
|
||||
LOG(debug) << "Subscribing for DDS properties.";
|
||||
|
||||
fDDSKeyValue.subscribe([&] (const string& propertyId, const string& value, uint64_t senderTaskID) {
|
||||
try {
|
||||
LOG(debug) << "Received update for " << propertyId << ": value=" << value << ", senderTaskID=" << senderTaskID;
|
||||
@@ -285,14 +295,13 @@ auto DDS::PublishBoundChannels() -> void
|
||||
auto DDS::HeartbeatSender() -> void
|
||||
{
|
||||
string id = GetProperty<string>("id");
|
||||
string pid(to_string(getpid()));
|
||||
|
||||
while (!fDeviceTerminationRequested) {
|
||||
{
|
||||
lock_guard<mutex> lock{fHeartbeatSubscriberMutex};
|
||||
|
||||
for (const auto subscriberId : fHeartbeatSubscribers) {
|
||||
fDDSCustomCmd.send("heartbeat: " + id + "," + pid, to_string(subscriberId));
|
||||
fDDSCustomCmd.send("heartbeat: " + id , to_string(subscriberId));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -302,14 +311,15 @@ auto DDS::HeartbeatSender() -> void
|
||||
|
||||
auto DDS::SubscribeForCustomCommands() -> void
|
||||
{
|
||||
string id = GetProperty<string>("id");
|
||||
string pid(to_string(getpid()));
|
||||
LOG(debug) << "Subscribing for DDS custom commands.";
|
||||
|
||||
fDDSCustomCmd.subscribe([id, pid, this](const string& cmd, const string& cond, uint64_t senderId) {
|
||||
string id = GetProperty<string>("id");
|
||||
|
||||
fDDSCustomCmd.subscribe([id, this](const string& cmd, const string& cond, uint64_t senderId) {
|
||||
LOG(info) << "Received command: " << cmd;
|
||||
|
||||
if (cmd == "check-state") {
|
||||
fDDSCustomCmd.send(id + ": " + ToStr(GetCurrentDeviceState()) + " (pid: " + pid + ")", to_string(senderId));
|
||||
fDDSCustomCmd.send(id + ": " + ToStr(GetCurrentDeviceState()), to_string(senderId));
|
||||
} else if (cmd == "INIT DEVICE") {
|
||||
if (ChangeDeviceState(ToDeviceStateTransition(cmd))) {
|
||||
fDDSCustomCmd.send(id + ": queued " + cmd + " transition", to_string(senderId));
|
||||
@@ -360,9 +370,14 @@ auto DDS::SubscribeForCustomCommands() -> void
|
||||
fStateChangeSubscribers.insert(senderId);
|
||||
}
|
||||
fDDSCustomCmd.send("state-changes-subscription: " + id + ",OK", to_string(senderId));
|
||||
auto state = GetCurrentDeviceState();
|
||||
LOG(debug) << "Publishing state-change: " << state << " to " << senderId;
|
||||
fDDSCustomCmd.send("state-change: " + id + "," + ToStr(state), to_string(senderId));
|
||||
{
|
||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||
LOG(debug) << "Publishing state-change: " << fLastState << "->" << fCurrentState
|
||||
<< " to " << senderId;
|
||||
fDDSCustomCmd.send("state-change: " + id + "," + ToStr(fLastState) + "->"
|
||||
+ ToStr(fCurrentState),
|
||||
to_string(senderId));
|
||||
}
|
||||
} else if (cmd == "unsubscribe-from-state-changes") {
|
||||
{
|
||||
lock_guard<mutex> lock{fStateChangeSubscriberMutex};
|
||||
|
Reference in New Issue
Block a user