From da4e231ea3857f25d78cce47cc34f5c13a572d50 Mon Sep 17 00:00:00 2001 From: Dmytro Skarzhynets Date: Mon, 3 Mar 2025 14:47:51 +0200 Subject: [PATCH] Save attributes strategies: ensure shared attributes subscription update is sent even if WS updates are disabled --- .../DefaultSubscriptionManagerService.java | 4 - .../DefaultTelemetrySubscriptionService.java | 17 ++ ...faultTelemetrySubscriptionServiceTest.java | 205 ++++++++++++++++++ .../thingsboard/common/util/DonAsynchron.java | 6 + 4 files changed, 228 insertions(+), 4 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java index 23adacfc2e..d5a5318798 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java @@ -217,10 +217,6 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene if (entityId.getEntityType() == EntityType.DEVICE) { if (TbAttributeSubscriptionScope.SERVER_SCOPE.name().equalsIgnoreCase(scope)) { updateDeviceInactivityTimeout(tenantId, entityId, attributes); - } else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope) && notifyDevice) { - clusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onUpdate(tenantId, - new DeviceId(entityId.getId()), DataConstants.SHARED_SCOPE, new ArrayList<>(attributes)) - , null); } } callback.onSuccess(); diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index d60d1621b7..3af2c04643 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -35,9 +35,11 @@ import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.common.data.ApiUsageRecordKey; +import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; @@ -45,6 +47,7 @@ import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; import org.thingsboard.server.common.stats.TbApiUsageReportClient; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.timeseries.TimeseriesService; @@ -52,6 +55,7 @@ import org.thingsboard.server.dao.util.KvUtils; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; import org.thingsboard.server.service.cf.CalculatedFieldQueueService; import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; +import org.thingsboard.server.service.subscription.TbAttributeSubscriptionScope; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; import java.util.ArrayList; @@ -197,6 +201,15 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } }, t -> request.getCallback().onFailure(t)); + if (strategy.saveAttributes() + && entityId.getEntityType() == EntityType.DEVICE + && TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(request.getScope().name()) + && request.isNotifyDevice()) { + addMainCallback(resultFuture, success -> clusterService.pushMsgToCore( + DeviceAttributesEventNotificationMsg.onUpdate(tenantId, new DeviceId(entityId.getId()), DataConstants.SHARED_SCOPE, request.getEntries()), null + )); + } + if (strategy.sendWsUpdate()) { addWsCallback(resultFuture, success -> onAttributesUpdate(tenantId, entityId, request.getScope().name(), request.getEntries(), request.isNotifyDevice())); } @@ -342,6 +355,10 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer addMainCallback(saveFuture, result -> callback.onSuccess(null), callback::onFailure); } + private void addMainCallback(ListenableFuture saveFuture, Consumer onSuccess) { + DonAsynchron.withCallback(saveFuture, onSuccess, null, tsCallBackExecutor); + } + private void addMainCallback(ListenableFuture saveFuture, Consumer onSuccess, Consumer onFailure) { DonAsynchron.withCallback(saveFuture, onSuccess, onFailure, tsCallBackExecutor); } diff --git a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java index c96b96b2e0..8bdcbc1349 100644 --- a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java @@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -35,15 +36,21 @@ import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.ApiUsageStateValue; import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; +import org.thingsboard.server.common.data.id.ApiUsageStateId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.objects.AttributesEntityView; @@ -51,6 +58,7 @@ import org.thingsboard.server.common.data.objects.TelemetryEntityView; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg; import org.thingsboard.server.common.stats.TbApiUsageReportClient; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.timeseries.TimeseriesService; @@ -74,13 +82,17 @@ import java.util.concurrent.ExecutorService; import java.util.stream.LongStream; import java.util.stream.Stream; +import static com.google.common.util.concurrent.Futures.immediateFailedFuture; import static com.google.common.util.concurrent.Futures.immediateFuture; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.never; @ExtendWith(MockitoExtension.class) class DefaultTelemetrySubscriptionServiceTest { @@ -428,6 +440,199 @@ class DefaultTelemetrySubscriptionServiceTest { ); } + @Test + void shouldThrowErrorWhenTryingToSaveAttributesForApiUsageState() { + // GIVEN + var request = AttributesSaveRequest.builder() + .tenantId(tenantId) + .entityId(new ApiUsageStateId(UUID.randomUUID())) + .scope(AttributeScope.SHARED_SCOPE) + .entry(new DoubleDataEntry("temperature", 65.2)) + .notifyDevice(true) + .strategy(new AttributesSaveRequest.Strategy(true, false, false)) + .build(); + + // WHEN + assertThatThrownBy(() -> telemetryService.saveAttributes(request)) + .isInstanceOf(RuntimeException.class) + .hasMessage("Can't update API Usage State!"); + + // THEN + then(attrService).shouldHaveNoInteractions(); + } + + @Test + void shouldSendAttributesUpdateNotificationWhenDeviceSharedAttributesAreSavedAndNotifyDeviceIsTrue() { + // GIVEN + var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088"); + List entries = List.of( + new BaseAttributeKvEntry(123L, new DoubleDataEntry("shared1", 65.2)), + new BaseAttributeKvEntry(456L, new StringDataEntry("shared2", "test")) + ); + + var request = AttributesSaveRequest.builder() + .tenantId(tenantId) + .entityId(deviceId) + .scope(AttributeScope.SHARED_SCOPE) + .entries(entries) + .notifyDevice(true) + .strategy(new AttributesSaveRequest.Strategy(true, false, false)) + .build(); + + given(attrService.save(tenantId, deviceId, request.getScope(), entries)).willReturn(immediateFuture(listOfNNumbers(entries.size()))); + + // WHEN + telemetryService.saveAttributes(request); + + // THEN + var expectedAttributesUpdateMsg = DeviceAttributesEventNotificationMsg.onUpdate(tenantId, deviceId, "SHARED_SCOPE", entries); + + then(clusterService).should().pushMsgToCore(eq(expectedAttributesUpdateMsg), isNull()); + } + + @ParameterizedTest + @EnumSource( + value = EntityType.class, + names = {"DEVICE", "API_USAGE_STATE"}, // API usage state excluded due to coverage in another test + mode = EnumSource.Mode.EXCLUDE + ) + void shouldNotSendAttributesUpdateNotificationWhenEntityIsNotDevice(EntityType entityType) { + // GIVEN + var nonDeviceId = EntityIdFactory.getByTypeAndUuid(entityType, "cc51e450-53e1-11ee-883e-e56b48fd2088"); + List entries = List.of( + new BaseAttributeKvEntry(123L, new DoubleDataEntry("shared1", 65.2)), + new BaseAttributeKvEntry(456L, new StringDataEntry("shared2", "test")) + ); + + var request = AttributesSaveRequest.builder() + .tenantId(tenantId) + .entityId(nonDeviceId) + .scope(AttributeScope.SHARED_SCOPE) + .entries(entries) + .notifyDevice(true) + .strategy(new AttributesSaveRequest.Strategy(true, false, false)) + .build(); + + given(attrService.save(tenantId, nonDeviceId, request.getScope(), entries)).willReturn(immediateFuture(listOfNNumbers(entries.size()))); + + // WHEN + telemetryService.saveAttributes(request); + + // THEN + then(clusterService).should(never()).pushMsgToCore(any(), any()); + } + + @ParameterizedTest + @EnumSource( + value = AttributeScope.class, + names = "SHARED_SCOPE", + mode = EnumSource.Mode.EXCLUDE + ) + void shouldNotSendAttributesUpdateNotificationWhenAttributesAreNotShared(AttributeScope notSharedScope) { + // GIVEN + var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088"); + List entries = List.of( + new BaseAttributeKvEntry(123L, new DoubleDataEntry("shared1", 65.2)), + new BaseAttributeKvEntry(456L, new StringDataEntry("shared2", "test")) + ); + + var request = AttributesSaveRequest.builder() + .tenantId(tenantId) + .entityId(deviceId) + .scope(notSharedScope) + .entries(entries) + .notifyDevice(true) + .strategy(new AttributesSaveRequest.Strategy(true, false, false)) + .build(); + + given(attrService.save(tenantId, deviceId, request.getScope(), entries)).willReturn(immediateFuture(listOfNNumbers(entries.size()))); + + // WHEN + telemetryService.saveAttributes(request); + + // THEN + then(clusterService).should(never()).pushMsgToCore(any(), any()); + } + + @Test + void shouldNotSendAttributesUpdateNotificationWhenNotifyDeviceIsFalse() { + // GIVEN + var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088"); + List entries = List.of( + new BaseAttributeKvEntry(123L, new DoubleDataEntry("shared1", 65.2)), + new BaseAttributeKvEntry(456L, new StringDataEntry("shared2", "test")) + ); + + var request = AttributesSaveRequest.builder() + .tenantId(tenantId) + .entityId(deviceId) + .scope(AttributeScope.SHARED_SCOPE) + .entries(entries) + .notifyDevice(false) + .strategy(new AttributesSaveRequest.Strategy(true, false, false)) + .build(); + + given(attrService.save(tenantId, deviceId, request.getScope(), entries)).willReturn(immediateFuture(listOfNNumbers(entries.size()))); + + // WHEN + telemetryService.saveAttributes(request); + + // THEN + then(clusterService).should(never()).pushMsgToCore(any(), any()); + } + + @Test + void shouldNotSendAttributesUpdateNotificationWhenAttributesSaveWasSkipped() { + // GIVEN + var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088"); + List entries = List.of( + new BaseAttributeKvEntry(123L, new DoubleDataEntry("shared1", 65.2)), + new BaseAttributeKvEntry(456L, new StringDataEntry("shared2", "test")) + ); + + var request = AttributesSaveRequest.builder() + .tenantId(tenantId) + .entityId(deviceId) + .scope(AttributeScope.SHARED_SCOPE) + .entries(entries) + .notifyDevice(true) + .strategy(new AttributesSaveRequest.Strategy(false, false, false)) + .build(); + + // WHEN + telemetryService.saveAttributes(request); + + // THEN + then(clusterService).should(never()).pushMsgToCore(any(), any()); + } + + @Test + void shouldNotSendAttributesUpdateNotificationWhenAttributesSaveFailed() { + // GIVEN + var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088"); + List entries = List.of( + new BaseAttributeKvEntry(123L, new DoubleDataEntry("shared1", 65.2)), + new BaseAttributeKvEntry(456L, new StringDataEntry("shared2", "test")) + ); + + var request = AttributesSaveRequest.builder() + .tenantId(tenantId) + .entityId(deviceId) + .scope(AttributeScope.SHARED_SCOPE) + .entries(entries) + .notifyDevice(true) + .strategy(new AttributesSaveRequest.Strategy(true, false, false)) + .build(); + + given(attrService.save(tenantId, deviceId, request.getScope(), entries)).willReturn(immediateFailedFuture(new RuntimeException("failed to save"))); + + // WHEN + telemetryService.saveAttributes(request); + + // THEN + then(clusterService).should(never()).pushMsgToCore(any(), any()); + } + // used to emulate versions returned by save APIs private static List listOfNNumbers(int N) { return LongStream.range(0, N).boxed().toList(); diff --git a/common/util/src/main/java/org/thingsboard/common/util/DonAsynchron.java b/common/util/src/main/java/org/thingsboard/common/util/DonAsynchron.java index 008158246e..0f1a56cb17 100644 --- a/common/util/src/main/java/org/thingsboard/common/util/DonAsynchron.java +++ b/common/util/src/main/java/org/thingsboard/common/util/DonAsynchron.java @@ -36,6 +36,9 @@ public class DonAsynchron { FutureCallback callback = new FutureCallback() { @Override public void onSuccess(T result) { + if (onSuccess == null) { + return; + } try { onSuccess.accept(result); } catch (Throwable th) { @@ -45,6 +48,9 @@ public class DonAsynchron { @Override public void onFailure(Throwable t) { + if (onFailure == null) { + return; + } onFailure.accept(t); } };