Browse Source

[WIP] Refactor transport activity managers. Add telemetry TTL for device state.

pull/9980/head
Dmytro Skarzhynets 3 years ago
parent
commit
a941deaef9
  1. 8
      application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java
  2. 2
      application/src/main/resources/thingsboard.yml
  3. 6
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java
  4. 44
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/activity/FirstAndLastTransportActivityManager.java
  5. 44
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/activity/FirstOnlyTransportActivityManager.java
  6. 34
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/activity/LastOnlyTransportActivityManager.java

8
application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java

@ -185,6 +185,10 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
@Getter
private int initFetchPackSize;
@Value("${state.telemetryTtl:0}")
@Getter
private int telemetryTtl;
private ListeningExecutorService deviceStateExecutor;
final ConcurrentMap<DeviceId, DeviceStateData> deviceStates = new ConcurrentHashMap<>();
@ -803,7 +807,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
tsSubService.saveAndNotifyInternal(
TenantId.SYS_TENANT_ID, deviceId,
Collections.singletonList(new BasicTsKvEntry(getCurrentTimeMillis(), new LongDataEntry(key, value))),
new TelemetrySaveCallback<>(deviceId, key, value));
telemetryTtl, new TelemetrySaveCallback<>(deviceId, key, value));
} else {
tsSubService.saveAttrAndNotify(TenantId.SYS_TENANT_ID, deviceId, SERVER_SCOPE, key, value, new TelemetrySaveCallback<>(deviceId, key, value));
}
@ -814,7 +818,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
tsSubService.saveAndNotifyInternal(
TenantId.SYS_TENANT_ID, deviceId,
Collections.singletonList(new BasicTsKvEntry(getCurrentTimeMillis(), new BooleanDataEntry(key, value))),
new TelemetrySaveCallback<>(deviceId, key, value));
telemetryTtl, new TelemetrySaveCallback<>(deviceId, key, value));
} else {
tsSubService.saveAttrAndNotify(TenantId.SYS_TENANT_ID, deviceId, SERVER_SCOPE, key, value, new TelemetrySaveCallback<>(deviceId, key, value));
}

2
application/src/main/resources/thingsboard.yml

@ -785,6 +785,8 @@ state:
# If 'persistToTelemetry' is changed from 'false' to 'true': 'CREATE OR REPLACE VIEW device_info_view AS SELECT * FROM device_info_active_ts_view;'
# If 'persistToTelemetry' is changed from 'true' to 'false': 'CREATE OR REPLACE VIEW device_info_view AS SELECT * FROM device_info_active_attribute_view;'
persistToTelemetry: "${PERSIST_STATE_TO_TELEMETRY:false}"
# Time-to-live for device state telemetry data (e.g. 'active', 'lastActivityTime'). Used only when state.persistToTelemetry is set to 'true'.
telemetryTtl: "${STATE_TELEMETRY_TTL:0}"
# Tbel parameters
tbel:

6
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java

@ -255,11 +255,7 @@ public class DefaultTransportService implements TransportService {
transportNotificationsConsumer.subscribe(Collections.singleton(tpi));
transportApiRequestTemplate.init();
mainConsumerExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("transport-consumer"));
activityManager.setName("transport-activity-manager");
activityManager.setReportingPeriod(sessionReportTimeout);
activityManager.setActivityReporter(activityReporter);
activityManager.init();
activityManager.init("transport-activity-manager", sessionReportTimeout, activityReporter);
}
private final ActivityStateReporter<UUID, TransportActivityState> activityReporter = (sessionId, timeToReport, state, reportCallback) -> {

44
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/activity/FirstAndLastTransportActivityManager.java

@ -43,14 +43,15 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.util.Pair;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.activity.AbstractActivityManager;
import org.thingsboard.server.common.transport.activity.ActivityReportCallback;
import org.thingsboard.server.common.transport.service.SessionMetaData;
import org.thingsboard.server.common.transport.service.TransportActivityState;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -86,19 +87,16 @@ public class FirstAndLastTransportActivityManager extends AbstractActivityManage
SettableFuture<Pair<UUID, Long>> reportCompletedFuture = SettableFuture.create();
states.compute(sessionId, (key, activityStateWrapper) -> {
if (activityStateWrapper == null) {
TransportActivityState activityState = newStateSupplier.get();
activityState.setLastRecordedTime(newLastRecordedTime);
activityState.setLastReportedTime(0L);
activityStateWrapper = new ActivityStateWrapper();
activityStateWrapper.setState(activityState);
activityStateWrapper.setAlreadyBeenReported(false);
} else {
activityStateWrapper.getState().setLastRecordedTime(newLastRecordedTime);
activityStateWrapper.setState(newStateSupplier.get());
}
var activityState = activityStateWrapper.getState();
if (activityState.getLastRecordedTime() < newLastRecordedTime) {
activityState.setLastRecordedTime(newLastRecordedTime);
}
if (activityStateWrapper.isAlreadyBeenReported()) {
return activityStateWrapper;
}
var activityState = activityStateWrapper.getState();
if (activityState.getLastReportedTime() < activityState.getLastRecordedTime()) {
reporter.report(key, activityState.getLastRecordedTime(), activityState, new ActivityReportCallback<>() {
@Override
@ -123,14 +121,14 @@ public class FirstAndLastTransportActivityManager extends AbstractActivityManage
@Override
public void onFailure(@NonNull Throwable t) {
log.debug("[{}] Failed to report first activity event in a period for session.", sessionId);
log.debug("[{}] Failed to report first activity event in a period.", sessionId);
}
}, MoreExecutors.directExecutor());
}
@Override
protected void onReportingPeriodEnd() {
long expirationTime = System.currentTimeMillis() - sessionInactivityTimeout;
Set<UUID> statesToRemove = new HashSet<>();
for (Map.Entry<UUID, ActivityStateWrapper> entry : states.entrySet()) {
var sessionId = entry.getKey();
var activityStateWrapper = entry.getValue();
@ -140,7 +138,7 @@ public class FirstAndLastTransportActivityManager extends AbstractActivityManage
if (sessionMetaData != null) {
activityState.setSessionInfoProto(sessionMetaData.getSessionInfo());
} else {
states.remove(sessionId);
statesToRemove.add(sessionId);
}
long lastActivityTime = activityState.getLastRecordedTime();
@ -157,25 +155,18 @@ public class FirstAndLastTransportActivityManager extends AbstractActivityManage
}
}
long expirationTime = System.currentTimeMillis() - sessionInactivityTimeout;
boolean hasExpired = sessionMetaData != null && lastActivityTime < expirationTime;
if (hasExpired) {
if (log.isDebugEnabled()) {
log.debug("[{}] Session has expired due to last activity time: [{}]!", sessionId, lastActivityTime);
log.debug("[{}] Session has expired due to last activity time: [{}].", sessionId, lastActivityTime);
}
statesToRemove.add(sessionId);
transportService.deregisterSession(sessionInfo);
transportService.process(sessionInfo, SESSION_EVENT_MSG_CLOSED, new TransportServiceCallback<>() {
@Override
public void onSuccess(Void msgAcknowledged) {
states.remove(sessionId);
}
@Override
public void onError(Throwable e) {
states.remove(sessionId);
}
});
transportService.process(sessionInfo, SESSION_EVENT_MSG_CLOSED, null);
sessionMetaData.getListener().onRemoteSessionCloseCommand(sessionId, SESSION_EXPIRED_NOTIFICATION_PROTO);
} else if (activityState.getLastReportedTime() < lastActivityTime) {
}
if (activityState.getLastReportedTime() < lastActivityTime) {
reporter.report(sessionId, lastActivityTime, activityState, new ActivityReportCallback<>() {
@Override
public void onSuccess(UUID key, long reportedTime) {
@ -184,12 +175,13 @@ public class FirstAndLastTransportActivityManager extends AbstractActivityManage
@Override
public void onFailure(UUID key, Throwable t) {
log.debug("[{}] Failed to report last activity event in a period for session.", sessionId);
log.debug("[{}] Failed to report last activity event in a period.", sessionId);
}
});
}
activityStateWrapper.setAlreadyBeenReported(false);
}
statesToRemove.forEach(states::remove);
}
private void updateLastReportedTime(UUID key, long newLastReportedTime) {

44
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/activity/FirstOnlyTransportActivityManager.java

@ -43,14 +43,15 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.util.Pair;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.activity.AbstractActivityManager;
import org.thingsboard.server.common.transport.activity.ActivityReportCallback;
import org.thingsboard.server.common.transport.service.SessionMetaData;
import org.thingsboard.server.common.transport.service.TransportActivityState;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -86,19 +87,16 @@ public class FirstOnlyTransportActivityManager extends AbstractActivityManager<U
SettableFuture<Pair<UUID, Long>> reportCompletedFuture = SettableFuture.create();
states.compute(sessionId, (key, activityStateWrapper) -> {
if (activityStateWrapper == null) {
TransportActivityState activityState = newStateSupplier.get();
activityState.setLastRecordedTime(newLastRecordedTime);
activityState.setLastReportedTime(0L);
activityStateWrapper = new ActivityStateWrapper();
activityStateWrapper.setState(activityState);
activityStateWrapper.setAlreadyBeenReported(false);
} else {
activityStateWrapper.getState().setLastRecordedTime(newLastRecordedTime);
activityStateWrapper.setState(newStateSupplier.get());
}
var activityState = activityStateWrapper.getState();
if (activityState.getLastRecordedTime() < newLastRecordedTime) {
activityState.setLastRecordedTime(newLastRecordedTime);
}
if (activityStateWrapper.isAlreadyBeenReported()) {
return activityStateWrapper;
}
var activityState = activityStateWrapper.getState();
if (activityState.getLastReportedTime() < activityState.getLastRecordedTime()) {
reporter.report(key, activityState.getLastRecordedTime(), activityState, new ActivityReportCallback<>() {
@Override
@ -123,14 +121,14 @@ public class FirstOnlyTransportActivityManager extends AbstractActivityManager<U
@Override
public void onFailure(@NonNull Throwable t) {
log.debug("[{}] Failed to report first activity event in a period for session.", sessionId);
log.debug("[{}] Failed to report first activity event in a period .", sessionId);
}
}, MoreExecutors.directExecutor());
}
@Override
protected void onReportingPeriodEnd() {
long expirationTime = System.currentTimeMillis() - sessionInactivityTimeout;
Set<UUID> statesToRemove = new HashSet<>();
for (Map.Entry<UUID, ActivityStateWrapper> entry : states.entrySet()) {
var sessionId = entry.getKey();
var activityStateWrapper = entry.getValue();
@ -140,7 +138,7 @@ public class FirstOnlyTransportActivityManager extends AbstractActivityManager<U
if (sessionMetaData != null) {
activityState.setSessionInfoProto(sessionMetaData.getSessionInfo());
} else {
states.remove(sessionId);
statesToRemove.add(sessionId);
}
long lastActivityTime = activityState.getLastRecordedTime();
@ -157,26 +155,19 @@ public class FirstOnlyTransportActivityManager extends AbstractActivityManager<U
}
}
long expirationTime = System.currentTimeMillis() - sessionInactivityTimeout;
boolean hasExpired = sessionMetaData != null && lastActivityTime < expirationTime;
if (hasExpired) {
if (log.isDebugEnabled()) {
log.debug("[{}] Session has expired due to last activity time: {}!", sessionId, lastActivityTime);
log.debug("[{}] Session has expired due to last activity time: {}.", sessionId, lastActivityTime);
}
statesToRemove.add(sessionId);
transportService.deregisterSession(sessionInfo);
transportService.process(sessionInfo, SESSION_EVENT_MSG_CLOSED, new TransportServiceCallback<>() {
@Override
public void onSuccess(Void msgAcknowledged) {
states.remove(sessionId);
}
@Override
public void onError(Throwable e) {
states.remove(sessionId);
}
});
transportService.process(sessionInfo, SESSION_EVENT_MSG_CLOSED, null);
sessionMetaData.getListener().onRemoteSessionCloseCommand(sessionId, SESSION_EXPIRED_NOTIFICATION_PROTO);
}
if ((sessionMetaData == null || hasExpired) && activityState.getLastReportedTime() < lastActivityTime) {
boolean shouldReportLeftoverEvents = sessionMetaData == null || hasExpired;
if (shouldReportLeftoverEvents && activityState.getLastReportedTime() < lastActivityTime) {
reporter.report(sessionId, lastActivityTime, activityState, new ActivityReportCallback<>() {
@Override
public void onSuccess(UUID key, long reportedTime) {
@ -185,12 +176,13 @@ public class FirstOnlyTransportActivityManager extends AbstractActivityManager<U
@Override
public void onFailure(UUID key, Throwable t) {
log.debug("[{}] Failed to report last activity event in a period for session.", sessionId);
log.debug("[{}] Failed to report last activity event in a period.", sessionId);
}
});
}
activityStateWrapper.setAlreadyBeenReported(false);
}
statesToRemove.forEach(states::remove);
}
private void updateLastReportedTime(UUID key, long newLastReportedTime) {

34
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/activity/LastOnlyTransportActivityManager.java

@ -36,14 +36,15 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.transport.TransportService;
import org.thingsboard.server.common.transport.TransportServiceCallback;
import org.thingsboard.server.common.transport.activity.AbstractActivityManager;
import org.thingsboard.server.common.transport.activity.ActivityReportCallback;
import org.thingsboard.server.common.transport.service.SessionMetaData;
import org.thingsboard.server.common.transport.service.TransportActivityState;
import org.thingsboard.server.gen.transport.TransportProtos;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -71,9 +72,8 @@ public class LastOnlyTransportActivityManager extends AbstractActivityManager<UU
states.compute(activityKey, (__, activityState) -> {
if (activityState == null) {
activityState = newStateSupplier.get();
activityState.setLastRecordedTime(newLastRecordedTime);
activityState.setLastReportedTime(0L);
} else {
}
if (activityState.getLastRecordedTime() < newLastRecordedTime) {
activityState.setLastRecordedTime(newLastRecordedTime);
}
return activityState;
@ -82,7 +82,7 @@ public class LastOnlyTransportActivityManager extends AbstractActivityManager<UU
@Override
protected void onReportingPeriodEnd() {
long expirationTime = System.currentTimeMillis() - sessionInactivityTimeout;
Set<UUID> statesToRemove = new HashSet<>();
for (Map.Entry<UUID, TransportActivityState> entry : states.entrySet()) {
var sessionId = entry.getKey();
var activityState = entry.getValue();
@ -91,7 +91,7 @@ public class LastOnlyTransportActivityManager extends AbstractActivityManager<UU
if (sessionMetaData != null) {
activityState.setSessionInfoProto(sessionMetaData.getSessionInfo());
} else {
states.remove(sessionId);
statesToRemove.add(sessionId);
}
long lastActivityTime = activityState.getLastRecordedTime();
@ -108,25 +108,18 @@ public class LastOnlyTransportActivityManager extends AbstractActivityManager<UU
}
}
long expirationTime = System.currentTimeMillis() - sessionInactivityTimeout;
boolean hasExpired = sessionMetaData != null && lastActivityTime < expirationTime;
if (hasExpired) {
if (log.isDebugEnabled()) {
log.debug("[{}] Session has expired due to last activity time: [{}]!", sessionId, lastActivityTime);
log.debug("[{}] Session has expired due to last activity time: [{}].", sessionId, lastActivityTime);
}
statesToRemove.add(sessionId);
transportService.deregisterSession(sessionInfo);
transportService.process(sessionInfo, SESSION_EVENT_MSG_CLOSED, new TransportServiceCallback<>() {
@Override
public void onSuccess(Void msgAcknowledged) {
states.remove(sessionId);
}
@Override
public void onError(Throwable e) {
states.remove(sessionId);
}
});
transportService.process(sessionInfo, SESSION_EVENT_MSG_CLOSED, null);
sessionMetaData.getListener().onRemoteSessionCloseCommand(sessionId, SESSION_EXPIRED_NOTIFICATION_PROTO);
} else if (activityState.getLastReportedTime() < lastActivityTime) {
}
if (activityState.getLastReportedTime() < lastActivityTime) {
reporter.report(sessionId, lastActivityTime, activityState, new ActivityReportCallback<>() {
@Override
public void onSuccess(UUID key, long reportedTime) {
@ -135,11 +128,12 @@ public class LastOnlyTransportActivityManager extends AbstractActivityManager<UU
@Override
public void onFailure(UUID key, Throwable t) {
log.debug("[{}] Failed to report last activity event in a period for session.", sessionId);
log.debug("[{}] Failed to report last activity event in a period.", sessionId);
}
});
}
}
statesToRemove.forEach(states::remove);
}
private void updateLastReportedTime(UUID key, long newLastReportedTime) {

Loading…
Cancel
Save