From b90522cc9d61837330fb9d76edb3edf382eaafeb Mon Sep 17 00:00:00 2001 From: Dmytro Skarzhynets Date: Fri, 28 Feb 2025 15:56:21 +0200 Subject: [PATCH] Save time series strategies: tests for inactivity timeout update notification --- .../DefaultTelemetrySubscriptionService.java | 2 +- .../DefaultTbCoreConsumerServiceTest.java | 93 +++++++++++ .../state/DefaultDeviceStateManagerTest.java | 25 +++ ...faultTelemetrySubscriptionServiceTest.java | 158 ++++++++++++++++++ 4 files changed, 277 insertions(+), 1 deletion(-) diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 54e87348bb..b643413bfa 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -153,7 +153,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer saveFuture = Futures.immediateFuture(0); } - if (entityId.getEntityType() == EntityType.DEVICE && request.getStrategy().saveLatest()) { + if (entityId.getEntityType() == EntityType.DEVICE && request.getStrategy().saveLatest() /* Device State Service reads from the latest values when initializing */) { findNewInactivityTimeout(request.getEntries()).ifPresent(newInactivityTimeout -> addMainCallback(saveFuture, __ -> deviceStateManager.onDeviceInactivityTimeoutUpdate( tenantId, new DeviceId(entityId.getId()), newInactivityTimeout, TbCallback.EMPTY) diff --git a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java index fe222237c6..26832f6176 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java @@ -532,6 +532,98 @@ public class DefaultTbCoreConsumerServiceTest { then(statsMock).should(never()).log(inactivityMsg); } + @Test + public void givenProcessingSuccess_whenForwardingInactivityTimeoutUpdateMsgToStateService_thenOnSuccessCallbackIsCalled() { + // GIVEN + var inactivityTimeoutUpdateMsg = TransportProtos.DeviceInactivityTimeoutUpdateProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setInactivityTimeout(time) + .build(); + + doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock); + + // WHEN + defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock); + + // THEN + then(stateServiceMock).should().onDeviceInactivityTimeoutUpdate(tenantId, deviceId, time); + then(tbCallbackMock).should().onSuccess(); + then(tbCallbackMock).should(never()).onFailure(any()); + } + + @Test + public void givenProcessingFailure_whenForwardingInactivityTimeoutUpdateMsgToStateService_thenOnFailureCallbackIsCalled() { + // GIVEN + var inactivityTimeoutUpdateMsg = TransportProtos.DeviceInactivityTimeoutUpdateProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setInactivityTimeout(time) + .build(); + + doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock); + + var runtimeException = new RuntimeException("Something bad happened!"); + doThrow(runtimeException).when(stateServiceMock).onDeviceInactivityTimeoutUpdate(tenantId, deviceId, time); + + // WHEN + defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock); + + // THEN + then(tbCallbackMock).should(never()).onSuccess(); + then(tbCallbackMock).should().onFailure(runtimeException); + } + + @Test + public void givenStatsEnabled_whenForwardingInactivityTimeoutUpdateMsgToStateService_thenStatsAreRecorded() { + // GIVEN + ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock); + ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", true); + + var inactivityTimeoutUpdateMsg = TransportProtos.DeviceInactivityTimeoutUpdateProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setInactivityTimeout(time) + .build(); + + doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock); + + // WHEN + defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock); + + // THEN + then(statsMock).should().log(inactivityTimeoutUpdateMsg); + } + + @Test + public void givenStatsDisabled_whenForwardingInactivityTimeoutUpdateMsgToStateService_thenStatsAreNotRecorded() { + // GIVEN + ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock); + ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", false); + + var inactivityTimeoutUpdateMsg = TransportProtos.DeviceInactivityTimeoutUpdateProto.newBuilder() + .setTenantIdMSB(tenantId.getId().getMostSignificantBits()) + .setTenantIdLSB(tenantId.getId().getLeastSignificantBits()) + .setDeviceIdMSB(deviceId.getId().getMostSignificantBits()) + .setDeviceIdLSB(deviceId.getId().getLeastSignificantBits()) + .setInactivityTimeout(time) + .build(); + + doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock); + + // WHEN + defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock); + + // THEN + then(statsMock).should(never()).log(inactivityTimeoutUpdateMsg); + } + @Test public void givenRestApiCallResponseMsgProto_whenForwardToRuleEngineCallService_thenCallOnQueueMsg() { // GIVEN @@ -545,4 +637,5 @@ public class DefaultTbCoreConsumerServiceTest { // THEN then(ruleEngineCallServiceMock).should().onQueueMsg(restApiCallResponseMsgProto, tbCallbackMock); } + } diff --git a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateManagerTest.java b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateManagerTest.java index 8d2a747d2d..6cf0ab9053 100644 --- a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateManagerTest.java @@ -123,6 +123,10 @@ public class DefaultDeviceStateManagerTest { Arguments.of( (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), (Consumer) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS) + ), + Arguments.of( + (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivityTimeoutUpdate(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), + (Consumer) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceInactivityTimeoutUpdate(TENANT_ID, DEVICE_ID, EVENT_TS) ) ); } @@ -172,6 +176,11 @@ public class DefaultDeviceStateManagerTest { (Consumer) deviceStateServiceMock -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS), (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), (Consumer) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS) + ), + Arguments.of( + (Consumer) deviceStateServiceMock -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceInactivityTimeoutUpdate(TENANT_ID, DEVICE_ID, EVENT_TS), + (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivityTimeoutUpdate(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), + (Consumer) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceInactivityTimeoutUpdate(TENANT_ID, DEVICE_ID, EVENT_TS) ) ); } @@ -265,6 +274,22 @@ public class DefaultDeviceStateManagerTest { .build(); then(clusterServiceMock).should().pushMsgToCore(eq(EXTERNAL_TPI), any(UUID.class), eq(toCoreMsg), queueCallbackCaptor.capture()); } + ), + Arguments.of( + (BiConsumer) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivityTimeoutUpdate(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock), + (BiConsumer>) (clusterServiceMock, queueCallbackCaptor) -> { + var deviceInactivityTimeoutUpdateMsg = TransportProtos.DeviceInactivityTimeoutUpdateProto.newBuilder() + .setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits()) + .setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits()) + .setDeviceIdMSB(DEVICE_ID.getId().getMostSignificantBits()) + .setDeviceIdLSB(DEVICE_ID.getId().getLeastSignificantBits()) + .setInactivityTimeout(EVENT_TS) + .build(); + var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder() + .setDeviceInactivityTimeoutUpdateMsg(deviceInactivityTimeoutUpdateMsg) + .build(); + then(clusterServiceMock).should().pushMsgToCore(eq(EXTERNAL_TPI), any(UUID.class), eq(toCoreMsg), queueCallbackCaptor.capture()); + } ) ); } diff --git a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java index c04b92e6e4..5537cfbb4b 100644 --- a/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java @@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -35,15 +36,19 @@ import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.ApiUsageState; import org.thingsboard.server.common.data.ApiUsageStateValue; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; +import org.thingsboard.server.common.data.id.ApiUsageStateId; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.objects.AttributesEntityView; import org.thingsboard.server.common.data.objects.TelemetryEntityView; @@ -72,11 +77,16 @@ import java.util.concurrent.ExecutorService; import java.util.stream.LongStream; import java.util.stream.Stream; +import static com.google.common.util.concurrent.Futures.immediateFailedFuture; import static com.google.common.util.concurrent.Futures.immediateFuture; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.never; @ExtendWith(MockitoExtension.class) class DefaultTelemetrySubscriptionServiceTest { @@ -368,6 +378,154 @@ class DefaultTelemetrySubscriptionServiceTest { ); } + @Test + void shouldThrowErrorWhenTryingToSaveTimeseriesForApiUsageState() { + // GIVEN + var request = TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .customerId(customerId) + .entityId(new ApiUsageStateId(UUID.randomUUID())) + .entries(sampleTelemetry) + .strategy(TimeseriesSaveRequest.Strategy.SAVE_ALL) + .callback(emptyCallback) + .build(); + + // WHEN + assertThatThrownBy(() -> telemetryService.saveTimeseries(request)) + .isInstanceOf(RuntimeException.class) + .hasMessage("Can't update API Usage State!"); + + // THEN + then(tsService).shouldHaveNoInteractions(); + then(deviceStateManager).shouldHaveNoInteractions(); + } + + @Test + void shouldNotifyDeviceStateManagerWhenDeviceInactivityTimeoutTimeseriesWasSavedToLatest() { + // GIVEN + var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088"); + var inactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("inactivityTimeout", 5000L)); + + var request = TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .customerId(customerId) + .entityId(deviceId) + .entry(inactivityTimeout) + .strategy(new TimeseriesSaveRequest.Strategy(false, true, false)) + .callback(emptyCallback) + .build(); + + given(tsService.saveLatest(tenantId, deviceId, List.of(inactivityTimeout))).willReturn(immediateFuture(listOfNNumbers(1))); + + // WHEN + telemetryService.saveTimeseries(request); + + // THEN + then(deviceStateManager).should().onDeviceInactivityTimeoutUpdate(tenantId, deviceId, 5000L, TbCallback.EMPTY); + } + + @ParameterizedTest + @EnumSource( + value = EntityType.class, + names = {"DEVICE", "API_USAGE_STATE"}, // API usage state excluded due to coverage in another test + mode = EnumSource.Mode.EXCLUDE + ) + void shouldNotNotifyDeviceStateManagerWhenInactivityTimeoutTimeseriesWasUpdatedButEntityTypeIsNotDevice(EntityType entityType) { + // GIVEN + var nonDeviceId = EntityIdFactory.getByTypeAndUuid(entityType, "cc51e450-53e1-11ee-883e-e56b48fd2088"); + var inactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("inactivityTimeout", 5000L)); + + var request = TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .customerId(customerId) + .entityId(nonDeviceId) + .entry(inactivityTimeout) + .strategy(new TimeseriesSaveRequest.Strategy(false, true, false)) + .callback(emptyCallback) + .build(); + + given(tsService.saveLatest(tenantId, nonDeviceId, List.of(inactivityTimeout))).willReturn(immediateFuture(listOfNNumbers(1))); + lenient().when(tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, nonDeviceId)).thenReturn(immediateFuture(Collections.emptyList())); + + // WHEN + telemetryService.saveTimeseries(request); + + // THEN + then(deviceStateManager).should(never()).onDeviceInactivityTimeoutUpdate(any(), any(), anyLong(), any()); + } + + @Test + void shouldNotNotifyDeviceStateManagerWhenDeviceInactivityTimeoutTimeseriesWasNotSavedToLatest() { + // GIVEN + var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088"); + var inactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("inactivityTimeout", 5000L)); + + var request = TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .customerId(customerId) + .entityId(deviceId) + .entry(inactivityTimeout) + .strategy(new TimeseriesSaveRequest.Strategy(true, false, true)) + .callback(emptyCallback) + .build(); + + given(tsService.saveWithoutLatest(tenantId, deviceId, List.of(inactivityTimeout), 0L)).willReturn(immediateFuture(1)); + + // WHEN + telemetryService.saveTimeseries(request); + + // THEN + then(deviceStateManager).should(never()).onDeviceInactivityTimeoutUpdate(any(), any(), anyLong(), any()); + } + + @Test + void shouldNotNotifyDeviceStateManagerWhenInactivityTimeoutTimeseriesWasNotUpdated() { + // GIVEN + var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088"); + var notInactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("notInactivityTimeout", 5000L)); + + var request = TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .customerId(customerId) + .entityId(deviceId) + .entry(notInactivityTimeout) + .strategy(new TimeseriesSaveRequest.Strategy(false, true, false)) + .callback(emptyCallback) + .build(); + + given(tsService.saveLatest(tenantId, deviceId, List.of(notInactivityTimeout))).willReturn(immediateFuture(listOfNNumbers(1))); + + // WHEN + telemetryService.saveTimeseries(request); + + // THEN + then(deviceStateManager).should(never()).onDeviceInactivityTimeoutUpdate(any(), any(), anyLong(), any()); + } + + @Test + void shouldNotNotifyDeviceStateManagerWhenDeviceInactivityTimeoutTimeseriesSaveFailed() { + // GIVEN + var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088"); + var inactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("inactivityTimeout", 5000L)); + + var request = TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .customerId(customerId) + .entityId(deviceId) + .entry(inactivityTimeout) + .strategy(new TimeseriesSaveRequest.Strategy(false, true, false)) + .callback(emptyCallback) + .build(); + + given(tsService.saveLatest(tenantId, deviceId, List.of(inactivityTimeout))).willReturn(immediateFailedFuture(new RuntimeException("failed to save"))); + + // WHEN + telemetryService.saveTimeseries(request); + + // THEN + then(deviceStateManager).should(never()).onDeviceInactivityTimeoutUpdate(any(), any(), anyLong(), any()); + } + // used to emulate sequence numbers returned by save latest API private static List listOfNNumbers(int N) { return LongStream.range(0, N).boxed().toList();