Browse Source

Save time series strategies: tests for inactivity timeout update notification

pull/12789/head
Dmytro Skarzhynets 1 year ago
parent
commit
b90522cc9d
No known key found for this signature in database GPG Key ID: 2B51652F224037DF
  1. 2
      application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
  2. 93
      application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java
  3. 25
      application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateManagerTest.java
  4. 158
      application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java

2
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java

@ -153,7 +153,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
saveFuture = Futures.immediateFuture(0);
}
if (entityId.getEntityType() == EntityType.DEVICE && request.getStrategy().saveLatest()) {
if (entityId.getEntityType() == EntityType.DEVICE && request.getStrategy().saveLatest() /* Device State Service reads from the latest values when initializing */) {
findNewInactivityTimeout(request.getEntries()).ifPresent(newInactivityTimeout ->
addMainCallback(saveFuture, __ -> deviceStateManager.onDeviceInactivityTimeoutUpdate(
tenantId, new DeviceId(entityId.getId()), newInactivityTimeout, TbCallback.EMPTY)

93
application/src/test/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerServiceTest.java

@ -532,6 +532,98 @@ public class DefaultTbCoreConsumerServiceTest {
then(statsMock).should(never()).log(inactivityMsg);
}
@Test
public void givenProcessingSuccess_whenForwardingInactivityTimeoutUpdateMsgToStateService_thenOnSuccessCallbackIsCalled() {
// GIVEN
var inactivityTimeoutUpdateMsg = TransportProtos.DeviceInactivityTimeoutUpdateProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setInactivityTimeout(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock);
// THEN
then(stateServiceMock).should().onDeviceInactivityTimeoutUpdate(tenantId, deviceId, time);
then(tbCallbackMock).should().onSuccess();
then(tbCallbackMock).should(never()).onFailure(any());
}
@Test
public void givenProcessingFailure_whenForwardingInactivityTimeoutUpdateMsgToStateService_thenOnFailureCallbackIsCalled() {
// GIVEN
var inactivityTimeoutUpdateMsg = TransportProtos.DeviceInactivityTimeoutUpdateProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setInactivityTimeout(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock);
var runtimeException = new RuntimeException("Something bad happened!");
doThrow(runtimeException).when(stateServiceMock).onDeviceInactivityTimeoutUpdate(tenantId, deviceId, time);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock);
// THEN
then(tbCallbackMock).should(never()).onSuccess();
then(tbCallbackMock).should().onFailure(runtimeException);
}
@Test
public void givenStatsEnabled_whenForwardingInactivityTimeoutUpdateMsgToStateService_thenStatsAreRecorded() {
// GIVEN
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock);
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", true);
var inactivityTimeoutUpdateMsg = TransportProtos.DeviceInactivityTimeoutUpdateProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setInactivityTimeout(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock);
// THEN
then(statsMock).should().log(inactivityTimeoutUpdateMsg);
}
@Test
public void givenStatsDisabled_whenForwardingInactivityTimeoutUpdateMsgToStateService_thenStatsAreNotRecorded() {
// GIVEN
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "stats", statsMock);
ReflectionTestUtils.setField(defaultTbCoreConsumerServiceMock, "statsEnabled", false);
var inactivityTimeoutUpdateMsg = TransportProtos.DeviceInactivityTimeoutUpdateProto.newBuilder()
.setTenantIdMSB(tenantId.getId().getMostSignificantBits())
.setTenantIdLSB(tenantId.getId().getLeastSignificantBits())
.setDeviceIdMSB(deviceId.getId().getMostSignificantBits())
.setDeviceIdLSB(deviceId.getId().getLeastSignificantBits())
.setInactivityTimeout(time)
.build();
doCallRealMethod().when(defaultTbCoreConsumerServiceMock).forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock);
// WHEN
defaultTbCoreConsumerServiceMock.forwardToStateService(inactivityTimeoutUpdateMsg, tbCallbackMock);
// THEN
then(statsMock).should(never()).log(inactivityTimeoutUpdateMsg);
}
@Test
public void givenRestApiCallResponseMsgProto_whenForwardToRuleEngineCallService_thenCallOnQueueMsg() {
// GIVEN
@ -545,4 +637,5 @@ public class DefaultTbCoreConsumerServiceTest {
// THEN
then(ruleEngineCallServiceMock).should().onQueueMsg(restApiCallResponseMsgProto, tbCallbackMock);
}
}

25
application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateManagerTest.java

@ -123,6 +123,10 @@ public class DefaultDeviceStateManagerTest {
Arguments.of(
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Consumer<DeviceStateService>) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivityTimeoutUpdate(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Consumer<DeviceStateService>) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceInactivityTimeoutUpdate(TENANT_ID, DEVICE_ID, EVENT_TS)
)
);
}
@ -172,6 +176,11 @@ public class DefaultDeviceStateManagerTest {
(Consumer<DeviceStateService>) deviceStateServiceMock -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Consumer<DeviceStateService>) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceInactivity(TENANT_ID, DEVICE_ID, EVENT_TS)
),
Arguments.of(
(Consumer<DeviceStateService>) deviceStateServiceMock -> doThrow(RUNTIME_EXCEPTION).when(deviceStateServiceMock).onDeviceInactivityTimeoutUpdate(TENANT_ID, DEVICE_ID, EVENT_TS),
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivityTimeoutUpdate(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(Consumer<DeviceStateService>) deviceStateServiceMock -> then(deviceStateServiceMock).should().onDeviceInactivityTimeoutUpdate(TENANT_ID, DEVICE_ID, EVENT_TS)
)
);
}
@ -265,6 +274,22 @@ public class DefaultDeviceStateManagerTest {
.build();
then(clusterServiceMock).should().pushMsgToCore(eq(EXTERNAL_TPI), any(UUID.class), eq(toCoreMsg), queueCallbackCaptor.capture());
}
),
Arguments.of(
(BiConsumer<DefaultDeviceStateManager, TbCallback>) (deviceStateManager, tbCallbackMock) -> deviceStateManager.onDeviceInactivityTimeoutUpdate(TENANT_ID, DEVICE_ID, EVENT_TS, tbCallbackMock),
(BiConsumer<TbClusterService, ArgumentCaptor<TbQueueCallback>>) (clusterServiceMock, queueCallbackCaptor) -> {
var deviceInactivityTimeoutUpdateMsg = TransportProtos.DeviceInactivityTimeoutUpdateProto.newBuilder()
.setTenantIdMSB(TENANT_ID.getId().getMostSignificantBits())
.setTenantIdLSB(TENANT_ID.getId().getLeastSignificantBits())
.setDeviceIdMSB(DEVICE_ID.getId().getMostSignificantBits())
.setDeviceIdLSB(DEVICE_ID.getId().getLeastSignificantBits())
.setInactivityTimeout(EVENT_TS)
.build();
var toCoreMsg = TransportProtos.ToCoreMsg.newBuilder()
.setDeviceInactivityTimeoutUpdateMsg(deviceInactivityTimeoutUpdateMsg)
.build();
then(clusterServiceMock).should().pushMsgToCore(eq(EXTERNAL_TPI), any(UUID.class), eq(toCoreMsg), queueCallbackCaptor.capture());
}
)
);
}

158
application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java

@ -25,6 +25,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ -35,15 +36,19 @@ import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.ApiUsageStateValue;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.id.ApiUsageStateId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.objects.AttributesEntityView;
import org.thingsboard.server.common.data.objects.TelemetryEntityView;
@ -72,11 +77,16 @@ import java.util.concurrent.ExecutorService;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.never;
@ExtendWith(MockitoExtension.class)
class DefaultTelemetrySubscriptionServiceTest {
@ -368,6 +378,154 @@ class DefaultTelemetrySubscriptionServiceTest {
);
}
@Test
void shouldThrowErrorWhenTryingToSaveTimeseriesForApiUsageState() {
// GIVEN
var request = TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.customerId(customerId)
.entityId(new ApiUsageStateId(UUID.randomUUID()))
.entries(sampleTelemetry)
.strategy(TimeseriesSaveRequest.Strategy.SAVE_ALL)
.callback(emptyCallback)
.build();
// WHEN
assertThatThrownBy(() -> telemetryService.saveTimeseries(request))
.isInstanceOf(RuntimeException.class)
.hasMessage("Can't update API Usage State!");
// THEN
then(tsService).shouldHaveNoInteractions();
then(deviceStateManager).shouldHaveNoInteractions();
}
@Test
void shouldNotifyDeviceStateManagerWhenDeviceInactivityTimeoutTimeseriesWasSavedToLatest() {
// GIVEN
var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088");
var inactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("inactivityTimeout", 5000L));
var request = TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.customerId(customerId)
.entityId(deviceId)
.entry(inactivityTimeout)
.strategy(new TimeseriesSaveRequest.Strategy(false, true, false))
.callback(emptyCallback)
.build();
given(tsService.saveLatest(tenantId, deviceId, List.of(inactivityTimeout))).willReturn(immediateFuture(listOfNNumbers(1)));
// WHEN
telemetryService.saveTimeseries(request);
// THEN
then(deviceStateManager).should().onDeviceInactivityTimeoutUpdate(tenantId, deviceId, 5000L, TbCallback.EMPTY);
}
@ParameterizedTest
@EnumSource(
value = EntityType.class,
names = {"DEVICE", "API_USAGE_STATE"}, // API usage state excluded due to coverage in another test
mode = EnumSource.Mode.EXCLUDE
)
void shouldNotNotifyDeviceStateManagerWhenInactivityTimeoutTimeseriesWasUpdatedButEntityTypeIsNotDevice(EntityType entityType) {
// GIVEN
var nonDeviceId = EntityIdFactory.getByTypeAndUuid(entityType, "cc51e450-53e1-11ee-883e-e56b48fd2088");
var inactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("inactivityTimeout", 5000L));
var request = TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.customerId(customerId)
.entityId(nonDeviceId)
.entry(inactivityTimeout)
.strategy(new TimeseriesSaveRequest.Strategy(false, true, false))
.callback(emptyCallback)
.build();
given(tsService.saveLatest(tenantId, nonDeviceId, List.of(inactivityTimeout))).willReturn(immediateFuture(listOfNNumbers(1)));
lenient().when(tbEntityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, nonDeviceId)).thenReturn(immediateFuture(Collections.emptyList()));
// WHEN
telemetryService.saveTimeseries(request);
// THEN
then(deviceStateManager).should(never()).onDeviceInactivityTimeoutUpdate(any(), any(), anyLong(), any());
}
@Test
void shouldNotNotifyDeviceStateManagerWhenDeviceInactivityTimeoutTimeseriesWasNotSavedToLatest() {
// GIVEN
var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088");
var inactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("inactivityTimeout", 5000L));
var request = TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.customerId(customerId)
.entityId(deviceId)
.entry(inactivityTimeout)
.strategy(new TimeseriesSaveRequest.Strategy(true, false, true))
.callback(emptyCallback)
.build();
given(tsService.saveWithoutLatest(tenantId, deviceId, List.of(inactivityTimeout), 0L)).willReturn(immediateFuture(1));
// WHEN
telemetryService.saveTimeseries(request);
// THEN
then(deviceStateManager).should(never()).onDeviceInactivityTimeoutUpdate(any(), any(), anyLong(), any());
}
@Test
void shouldNotNotifyDeviceStateManagerWhenInactivityTimeoutTimeseriesWasNotUpdated() {
// GIVEN
var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088");
var notInactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("notInactivityTimeout", 5000L));
var request = TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.customerId(customerId)
.entityId(deviceId)
.entry(notInactivityTimeout)
.strategy(new TimeseriesSaveRequest.Strategy(false, true, false))
.callback(emptyCallback)
.build();
given(tsService.saveLatest(tenantId, deviceId, List.of(notInactivityTimeout))).willReturn(immediateFuture(listOfNNumbers(1)));
// WHEN
telemetryService.saveTimeseries(request);
// THEN
then(deviceStateManager).should(never()).onDeviceInactivityTimeoutUpdate(any(), any(), anyLong(), any());
}
@Test
void shouldNotNotifyDeviceStateManagerWhenDeviceInactivityTimeoutTimeseriesSaveFailed() {
// GIVEN
var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088");
var inactivityTimeout = new BasicTsKvEntry(123L, new LongDataEntry("inactivityTimeout", 5000L));
var request = TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.customerId(customerId)
.entityId(deviceId)
.entry(inactivityTimeout)
.strategy(new TimeseriesSaveRequest.Strategy(false, true, false))
.callback(emptyCallback)
.build();
given(tsService.saveLatest(tenantId, deviceId, List.of(inactivityTimeout))).willReturn(immediateFailedFuture(new RuntimeException("failed to save")));
// WHEN
telemetryService.saveTimeseries(request);
// THEN
then(deviceStateManager).should(never()).onDeviceInactivityTimeoutUpdate(any(), any(), anyLong(), any());
}
// used to emulate sequence numbers returned by save latest API
private static List<Long> listOfNNumbers(int N) {
return LongStream.range(0, N).boxed().toList();

Loading…
Cancel
Save