Browse Source

Save attributes strategies: ensure shared attributes subscription update is sent even if WS updates are disabled

pull/12764/head
Dmytro Skarzhynets 1 year ago
parent
commit
da4e231ea3
No known key found for this signature in database GPG Key ID: 2B51652F224037DF
  1. 4
      application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java
  2. 17
      application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
  3. 205
      application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java
  4. 6
      common/util/src/main/java/org/thingsboard/common/util/DonAsynchron.java

4
application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java

@ -217,10 +217,6 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
if (entityId.getEntityType() == EntityType.DEVICE) {
if (TbAttributeSubscriptionScope.SERVER_SCOPE.name().equalsIgnoreCase(scope)) {
updateDeviceInactivityTimeout(tenantId, entityId, attributes);
} else if (TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(scope) && notifyDevice) {
clusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onUpdate(tenantId,
new DeviceId(entityId.getId()), DataConstants.SHARED_SCOPE, new ArrayList<>(attributes))
, null);
}
}
callback.onSuccess();

17
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java

@ -35,9 +35,11 @@ import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
@ -45,6 +47,7 @@ import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.common.stats.TbApiUsageReportClient;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
@ -52,6 +55,7 @@ import org.thingsboard.server.dao.util.KvUtils;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
import org.thingsboard.server.service.cf.CalculatedFieldQueueService;
import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService;
import org.thingsboard.server.service.subscription.TbAttributeSubscriptionScope;
import org.thingsboard.server.service.subscription.TbSubscriptionUtils;
import java.util.ArrayList;
@ -197,6 +201,15 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
}
}, t -> request.getCallback().onFailure(t));
if (strategy.saveAttributes()
&& entityId.getEntityType() == EntityType.DEVICE
&& TbAttributeSubscriptionScope.SHARED_SCOPE.name().equalsIgnoreCase(request.getScope().name())
&& request.isNotifyDevice()) {
addMainCallback(resultFuture, success -> clusterService.pushMsgToCore(
DeviceAttributesEventNotificationMsg.onUpdate(tenantId, new DeviceId(entityId.getId()), DataConstants.SHARED_SCOPE, request.getEntries()), null
));
}
if (strategy.sendWsUpdate()) {
addWsCallback(resultFuture, success -> onAttributesUpdate(tenantId, entityId, request.getScope().name(), request.getEntries(), request.isNotifyDevice()));
}
@ -342,6 +355,10 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
addMainCallback(saveFuture, result -> callback.onSuccess(null), callback::onFailure);
}
private <S> void addMainCallback(ListenableFuture<S> saveFuture, Consumer<S> onSuccess) {
DonAsynchron.withCallback(saveFuture, onSuccess, null, tsCallBackExecutor);
}
private <S> void addMainCallback(ListenableFuture<S> saveFuture, Consumer<S> onSuccess, Consumer<Throwable> onFailure) {
DonAsynchron.withCallback(saveFuture, onSuccess, onFailure, tsCallBackExecutor);
}

205
application/src/test/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionServiceTest.java

@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@ -35,15 +36,21 @@ import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.ApiUsageState;
import org.thingsboard.server.common.data.ApiUsageStateValue;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.id.ApiUsageStateId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.EntityViewId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.objects.AttributesEntityView;
@ -51,6 +58,7 @@ import org.thingsboard.server.common.data.objects.TelemetryEntityView;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.rule.engine.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.common.stats.TbApiUsageReportClient;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
@ -74,13 +82,17 @@ import java.util.concurrent.ExecutorService;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.then;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.never;
@ExtendWith(MockitoExtension.class)
class DefaultTelemetrySubscriptionServiceTest {
@ -428,6 +440,199 @@ class DefaultTelemetrySubscriptionServiceTest {
);
}
@Test
void shouldThrowErrorWhenTryingToSaveAttributesForApiUsageState() {
// GIVEN
var request = AttributesSaveRequest.builder()
.tenantId(tenantId)
.entityId(new ApiUsageStateId(UUID.randomUUID()))
.scope(AttributeScope.SHARED_SCOPE)
.entry(new DoubleDataEntry("temperature", 65.2))
.notifyDevice(true)
.strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build();
// WHEN
assertThatThrownBy(() -> telemetryService.saveAttributes(request))
.isInstanceOf(RuntimeException.class)
.hasMessage("Can't update API Usage State!");
// THEN
then(attrService).shouldHaveNoInteractions();
}
@Test
void shouldSendAttributesUpdateNotificationWhenDeviceSharedAttributesAreSavedAndNotifyDeviceIsTrue() {
// GIVEN
var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088");
List<AttributeKvEntry> entries = List.of(
new BaseAttributeKvEntry(123L, new DoubleDataEntry("shared1", 65.2)),
new BaseAttributeKvEntry(456L, new StringDataEntry("shared2", "test"))
);
var request = AttributesSaveRequest.builder()
.tenantId(tenantId)
.entityId(deviceId)
.scope(AttributeScope.SHARED_SCOPE)
.entries(entries)
.notifyDevice(true)
.strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build();
given(attrService.save(tenantId, deviceId, request.getScope(), entries)).willReturn(immediateFuture(listOfNNumbers(entries.size())));
// WHEN
telemetryService.saveAttributes(request);
// THEN
var expectedAttributesUpdateMsg = DeviceAttributesEventNotificationMsg.onUpdate(tenantId, deviceId, "SHARED_SCOPE", entries);
then(clusterService).should().pushMsgToCore(eq(expectedAttributesUpdateMsg), isNull());
}
@ParameterizedTest
@EnumSource(
value = EntityType.class,
names = {"DEVICE", "API_USAGE_STATE"}, // API usage state excluded due to coverage in another test
mode = EnumSource.Mode.EXCLUDE
)
void shouldNotSendAttributesUpdateNotificationWhenEntityIsNotDevice(EntityType entityType) {
// GIVEN
var nonDeviceId = EntityIdFactory.getByTypeAndUuid(entityType, "cc51e450-53e1-11ee-883e-e56b48fd2088");
List<AttributeKvEntry> entries = List.of(
new BaseAttributeKvEntry(123L, new DoubleDataEntry("shared1", 65.2)),
new BaseAttributeKvEntry(456L, new StringDataEntry("shared2", "test"))
);
var request = AttributesSaveRequest.builder()
.tenantId(tenantId)
.entityId(nonDeviceId)
.scope(AttributeScope.SHARED_SCOPE)
.entries(entries)
.notifyDevice(true)
.strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build();
given(attrService.save(tenantId, nonDeviceId, request.getScope(), entries)).willReturn(immediateFuture(listOfNNumbers(entries.size())));
// WHEN
telemetryService.saveAttributes(request);
// THEN
then(clusterService).should(never()).pushMsgToCore(any(), any());
}
@ParameterizedTest
@EnumSource(
value = AttributeScope.class,
names = "SHARED_SCOPE",
mode = EnumSource.Mode.EXCLUDE
)
void shouldNotSendAttributesUpdateNotificationWhenAttributesAreNotShared(AttributeScope notSharedScope) {
// GIVEN
var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088");
List<AttributeKvEntry> entries = List.of(
new BaseAttributeKvEntry(123L, new DoubleDataEntry("shared1", 65.2)),
new BaseAttributeKvEntry(456L, new StringDataEntry("shared2", "test"))
);
var request = AttributesSaveRequest.builder()
.tenantId(tenantId)
.entityId(deviceId)
.scope(notSharedScope)
.entries(entries)
.notifyDevice(true)
.strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build();
given(attrService.save(tenantId, deviceId, request.getScope(), entries)).willReturn(immediateFuture(listOfNNumbers(entries.size())));
// WHEN
telemetryService.saveAttributes(request);
// THEN
then(clusterService).should(never()).pushMsgToCore(any(), any());
}
@Test
void shouldNotSendAttributesUpdateNotificationWhenNotifyDeviceIsFalse() {
// GIVEN
var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088");
List<AttributeKvEntry> entries = List.of(
new BaseAttributeKvEntry(123L, new DoubleDataEntry("shared1", 65.2)),
new BaseAttributeKvEntry(456L, new StringDataEntry("shared2", "test"))
);
var request = AttributesSaveRequest.builder()
.tenantId(tenantId)
.entityId(deviceId)
.scope(AttributeScope.SHARED_SCOPE)
.entries(entries)
.notifyDevice(false)
.strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build();
given(attrService.save(tenantId, deviceId, request.getScope(), entries)).willReturn(immediateFuture(listOfNNumbers(entries.size())));
// WHEN
telemetryService.saveAttributes(request);
// THEN
then(clusterService).should(never()).pushMsgToCore(any(), any());
}
@Test
void shouldNotSendAttributesUpdateNotificationWhenAttributesSaveWasSkipped() {
// GIVEN
var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088");
List<AttributeKvEntry> entries = List.of(
new BaseAttributeKvEntry(123L, new DoubleDataEntry("shared1", 65.2)),
new BaseAttributeKvEntry(456L, new StringDataEntry("shared2", "test"))
);
var request = AttributesSaveRequest.builder()
.tenantId(tenantId)
.entityId(deviceId)
.scope(AttributeScope.SHARED_SCOPE)
.entries(entries)
.notifyDevice(true)
.strategy(new AttributesSaveRequest.Strategy(false, false, false))
.build();
// WHEN
telemetryService.saveAttributes(request);
// THEN
then(clusterService).should(never()).pushMsgToCore(any(), any());
}
@Test
void shouldNotSendAttributesUpdateNotificationWhenAttributesSaveFailed() {
// GIVEN
var deviceId = DeviceId.fromString("cc51e450-53e1-11ee-883e-e56b48fd2088");
List<AttributeKvEntry> entries = List.of(
new BaseAttributeKvEntry(123L, new DoubleDataEntry("shared1", 65.2)),
new BaseAttributeKvEntry(456L, new StringDataEntry("shared2", "test"))
);
var request = AttributesSaveRequest.builder()
.tenantId(tenantId)
.entityId(deviceId)
.scope(AttributeScope.SHARED_SCOPE)
.entries(entries)
.notifyDevice(true)
.strategy(new AttributesSaveRequest.Strategy(true, false, false))
.build();
given(attrService.save(tenantId, deviceId, request.getScope(), entries)).willReturn(immediateFailedFuture(new RuntimeException("failed to save")));
// WHEN
telemetryService.saveAttributes(request);
// THEN
then(clusterService).should(never()).pushMsgToCore(any(), any());
}
// used to emulate versions returned by save APIs
private static List<Long> listOfNNumbers(int N) {
return LongStream.range(0, N).boxed().toList();

6
common/util/src/main/java/org/thingsboard/common/util/DonAsynchron.java

@ -36,6 +36,9 @@ public class DonAsynchron {
FutureCallback<T> callback = new FutureCallback<T>() {
@Override
public void onSuccess(T result) {
if (onSuccess == null) {
return;
}
try {
onSuccess.accept(result);
} catch (Throwable th) {
@ -45,6 +48,9 @@ public class DonAsynchron {
@Override
public void onFailure(Throwable t) {
if (onFailure == null) {
return;
}
onFailure.accept(t);
}
};

Loading…
Cancel
Save