From a941deaef9306e8a7c214a2fa3fd23545565be47 Mon Sep 17 00:00:00 2001 From: Dmytro Skarzhynets Date: Fri, 8 Dec 2023 11:12:24 +0200 Subject: [PATCH] [WIP] Refactor transport activity managers. Add telemetry TTL for device state. --- .../state/DefaultDeviceStateService.java | 8 +++- .../src/main/resources/thingsboard.yml | 2 + .../service/DefaultTransportService.java | 6 +-- .../FirstAndLastTransportActivityManager.java | 44 ++++++++----------- .../FirstOnlyTransportActivityManager.java | 44 ++++++++----------- .../LastOnlyTransportActivityManager.java | 34 ++++++-------- 6 files changed, 59 insertions(+), 79 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index 37070718a3..e010cf3050 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -185,6 +185,10 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService deviceStates = new ConcurrentHashMap<>(); @@ -803,7 +807,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService(deviceId, key, value)); + telemetryTtl, new TelemetrySaveCallback<>(deviceId, key, value)); } else { tsSubService.saveAttrAndNotify(TenantId.SYS_TENANT_ID, deviceId, SERVER_SCOPE, key, value, new TelemetrySaveCallback<>(deviceId, key, value)); } @@ -814,7 +818,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService(deviceId, key, value)); + telemetryTtl, new TelemetrySaveCallback<>(deviceId, key, value)); } else { tsSubService.saveAttrAndNotify(TenantId.SYS_TENANT_ID, deviceId, SERVER_SCOPE, key, value, new TelemetrySaveCallback<>(deviceId, key, value)); } diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 73f0622cd1..16af7b1618 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -785,6 +785,8 @@ state: # If 'persistToTelemetry' is changed from 'false' to 'true': 'CREATE OR REPLACE VIEW device_info_view AS SELECT * FROM device_info_active_ts_view;' # If 'persistToTelemetry' is changed from 'true' to 'false': 'CREATE OR REPLACE VIEW device_info_view AS SELECT * FROM device_info_active_attribute_view;' persistToTelemetry: "${PERSIST_STATE_TO_TELEMETRY:false}" + # Time-to-live for device state telemetry data (e.g. 'active', 'lastActivityTime'). Used only when state.persistToTelemetry is set to 'true'. + telemetryTtl: "${STATE_TELEMETRY_TTL:0}" # Tbel parameters tbel: diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java index 6453cbd24c..692421b0ab 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java @@ -255,11 +255,7 @@ public class DefaultTransportService implements TransportService { transportNotificationsConsumer.subscribe(Collections.singleton(tpi)); transportApiRequestTemplate.init(); mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("transport-consumer")); - - activityManager.setName("transport-activity-manager"); - activityManager.setReportingPeriod(sessionReportTimeout); - activityManager.setActivityReporter(activityReporter); - activityManager.init(); + activityManager.init("transport-activity-manager", sessionReportTimeout, activityReporter); } private final ActivityStateReporter activityReporter = (sessionId, timeToReport, state, reportCallback) -> { diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/activity/FirstAndLastTransportActivityManager.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/activity/FirstAndLastTransportActivityManager.java index e6bffa4dfd..6370272813 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/activity/FirstAndLastTransportActivityManager.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/activity/FirstAndLastTransportActivityManager.java @@ -43,14 +43,15 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.data.util.Pair; import org.springframework.stereotype.Component; import org.thingsboard.server.common.transport.TransportService; -import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.activity.AbstractActivityManager; import org.thingsboard.server.common.transport.activity.ActivityReportCallback; import org.thingsboard.server.common.transport.service.SessionMetaData; import org.thingsboard.server.common.transport.service.TransportActivityState; import org.thingsboard.server.gen.transport.TransportProtos; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -86,19 +87,16 @@ public class FirstAndLastTransportActivityManager extends AbstractActivityManage SettableFuture> reportCompletedFuture = SettableFuture.create(); states.compute(sessionId, (key, activityStateWrapper) -> { if (activityStateWrapper == null) { - TransportActivityState activityState = newStateSupplier.get(); - activityState.setLastRecordedTime(newLastRecordedTime); - activityState.setLastReportedTime(0L); activityStateWrapper = new ActivityStateWrapper(); - activityStateWrapper.setState(activityState); - activityStateWrapper.setAlreadyBeenReported(false); - } else { - activityStateWrapper.getState().setLastRecordedTime(newLastRecordedTime); + activityStateWrapper.setState(newStateSupplier.get()); + } + var activityState = activityStateWrapper.getState(); + if (activityState.getLastRecordedTime() < newLastRecordedTime) { + activityState.setLastRecordedTime(newLastRecordedTime); } if (activityStateWrapper.isAlreadyBeenReported()) { return activityStateWrapper; } - var activityState = activityStateWrapper.getState(); if (activityState.getLastReportedTime() < activityState.getLastRecordedTime()) { reporter.report(key, activityState.getLastRecordedTime(), activityState, new ActivityReportCallback<>() { @Override @@ -123,14 +121,14 @@ public class FirstAndLastTransportActivityManager extends AbstractActivityManage @Override public void onFailure(@NonNull Throwable t) { - log.debug("[{}] Failed to report first activity event in a period for session.", sessionId); + log.debug("[{}] Failed to report first activity event in a period.", sessionId); } }, MoreExecutors.directExecutor()); } @Override protected void onReportingPeriodEnd() { - long expirationTime = System.currentTimeMillis() - sessionInactivityTimeout; + Set statesToRemove = new HashSet<>(); for (Map.Entry entry : states.entrySet()) { var sessionId = entry.getKey(); var activityStateWrapper = entry.getValue(); @@ -140,7 +138,7 @@ public class FirstAndLastTransportActivityManager extends AbstractActivityManage if (sessionMetaData != null) { activityState.setSessionInfoProto(sessionMetaData.getSessionInfo()); } else { - states.remove(sessionId); + statesToRemove.add(sessionId); } long lastActivityTime = activityState.getLastRecordedTime(); @@ -157,25 +155,18 @@ public class FirstAndLastTransportActivityManager extends AbstractActivityManage } } + long expirationTime = System.currentTimeMillis() - sessionInactivityTimeout; boolean hasExpired = sessionMetaData != null && lastActivityTime < expirationTime; if (hasExpired) { if (log.isDebugEnabled()) { - log.debug("[{}] Session has expired due to last activity time: [{}]!", sessionId, lastActivityTime); + log.debug("[{}] Session has expired due to last activity time: [{}].", sessionId, lastActivityTime); } + statesToRemove.add(sessionId); transportService.deregisterSession(sessionInfo); - transportService.process(sessionInfo, SESSION_EVENT_MSG_CLOSED, new TransportServiceCallback<>() { - @Override - public void onSuccess(Void msgAcknowledged) { - states.remove(sessionId); - } - - @Override - public void onError(Throwable e) { - states.remove(sessionId); - } - }); + transportService.process(sessionInfo, SESSION_EVENT_MSG_CLOSED, null); sessionMetaData.getListener().onRemoteSessionCloseCommand(sessionId, SESSION_EXPIRED_NOTIFICATION_PROTO); - } else if (activityState.getLastReportedTime() < lastActivityTime) { + } + if (activityState.getLastReportedTime() < lastActivityTime) { reporter.report(sessionId, lastActivityTime, activityState, new ActivityReportCallback<>() { @Override public void onSuccess(UUID key, long reportedTime) { @@ -184,12 +175,13 @@ public class FirstAndLastTransportActivityManager extends AbstractActivityManage @Override public void onFailure(UUID key, Throwable t) { - log.debug("[{}] Failed to report last activity event in a period for session.", sessionId); + log.debug("[{}] Failed to report last activity event in a period.", sessionId); } }); } activityStateWrapper.setAlreadyBeenReported(false); } + statesToRemove.forEach(states::remove); } private void updateLastReportedTime(UUID key, long newLastReportedTime) { diff --git a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/activity/FirstOnlyTransportActivityManager.java b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/activity/FirstOnlyTransportActivityManager.java index 8cefbb0b46..768796b2be 100644 --- a/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/activity/FirstOnlyTransportActivityManager.java +++ b/common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/activity/FirstOnlyTransportActivityManager.java @@ -43,14 +43,15 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.data.util.Pair; import org.springframework.stereotype.Component; import org.thingsboard.server.common.transport.TransportService; -import org.thingsboard.server.common.transport.TransportServiceCallback; import org.thingsboard.server.common.transport.activity.AbstractActivityManager; import org.thingsboard.server.common.transport.activity.ActivityReportCallback; import org.thingsboard.server.common.transport.service.SessionMetaData; import org.thingsboard.server.common.transport.service.TransportActivityState; import org.thingsboard.server.gen.transport.TransportProtos; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -86,19 +87,16 @@ public class FirstOnlyTransportActivityManager extends AbstractActivityManager> reportCompletedFuture = SettableFuture.create(); states.compute(sessionId, (key, activityStateWrapper) -> { if (activityStateWrapper == null) { - TransportActivityState activityState = newStateSupplier.get(); - activityState.setLastRecordedTime(newLastRecordedTime); - activityState.setLastReportedTime(0L); activityStateWrapper = new ActivityStateWrapper(); - activityStateWrapper.setState(activityState); - activityStateWrapper.setAlreadyBeenReported(false); - } else { - activityStateWrapper.getState().setLastRecordedTime(newLastRecordedTime); + activityStateWrapper.setState(newStateSupplier.get()); + } + var activityState = activityStateWrapper.getState(); + if (activityState.getLastRecordedTime() < newLastRecordedTime) { + activityState.setLastRecordedTime(newLastRecordedTime); } if (activityStateWrapper.isAlreadyBeenReported()) { return activityStateWrapper; } - var activityState = activityStateWrapper.getState(); if (activityState.getLastReportedTime() < activityState.getLastRecordedTime()) { reporter.report(key, activityState.getLastRecordedTime(), activityState, new ActivityReportCallback<>() { @Override @@ -123,14 +121,14 @@ public class FirstOnlyTransportActivityManager extends AbstractActivityManager statesToRemove = new HashSet<>(); for (Map.Entry entry : states.entrySet()) { var sessionId = entry.getKey(); var activityStateWrapper = entry.getValue(); @@ -140,7 +138,7 @@ public class FirstOnlyTransportActivityManager extends AbstractActivityManager() { - @Override - public void onSuccess(Void msgAcknowledged) { - states.remove(sessionId); - } - - @Override - public void onError(Throwable e) { - states.remove(sessionId); - } - }); + transportService.process(sessionInfo, SESSION_EVENT_MSG_CLOSED, null); sessionMetaData.getListener().onRemoteSessionCloseCommand(sessionId, SESSION_EXPIRED_NOTIFICATION_PROTO); } - if ((sessionMetaData == null || hasExpired) && activityState.getLastReportedTime() < lastActivityTime) { + boolean shouldReportLeftoverEvents = sessionMetaData == null || hasExpired; + if (shouldReportLeftoverEvents && activityState.getLastReportedTime() < lastActivityTime) { reporter.report(sessionId, lastActivityTime, activityState, new ActivityReportCallback<>() { @Override public void onSuccess(UUID key, long reportedTime) { @@ -185,12 +176,13 @@ public class FirstOnlyTransportActivityManager extends AbstractActivityManager { if (activityState == null) { activityState = newStateSupplier.get(); - activityState.setLastRecordedTime(newLastRecordedTime); - activityState.setLastReportedTime(0L); - } else { + } + if (activityState.getLastRecordedTime() < newLastRecordedTime) { activityState.setLastRecordedTime(newLastRecordedTime); } return activityState; @@ -82,7 +82,7 @@ public class LastOnlyTransportActivityManager extends AbstractActivityManager statesToRemove = new HashSet<>(); for (Map.Entry entry : states.entrySet()) { var sessionId = entry.getKey(); var activityState = entry.getValue(); @@ -91,7 +91,7 @@ public class LastOnlyTransportActivityManager extends AbstractActivityManager() { - @Override - public void onSuccess(Void msgAcknowledged) { - states.remove(sessionId); - } - - @Override - public void onError(Throwable e) { - states.remove(sessionId); - } - }); + transportService.process(sessionInfo, SESSION_EVENT_MSG_CLOSED, null); sessionMetaData.getListener().onRemoteSessionCloseCommand(sessionId, SESSION_EXPIRED_NOTIFICATION_PROTO); - } else if (activityState.getLastReportedTime() < lastActivityTime) { + } + if (activityState.getLastReportedTime() < lastActivityTime) { reporter.report(sessionId, lastActivityTime, activityState, new ActivityReportCallback<>() { @Override public void onSuccess(UUID key, long reportedTime) { @@ -135,11 +128,12 @@ public class LastOnlyTransportActivityManager extends AbstractActivityManager