From eb7bc8695ba00451db345288f1d5ccc2019ec908 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Tue, 17 Dec 2024 17:27:41 +0200 Subject: [PATCH] Refactor deleteAndNotify and deleteAndNotifyInternal for attributes --- .../controller/TelemetryController.java | 45 ++++--- .../DefaultTbApiUsageStateService.java | 8 +- .../device/ClaimDevicesServiceImpl.java | 22 ++-- .../service/edge/rpc/EdgeGrpcService.java | 8 +- .../telemetry/BaseTelemetryProcessor.java | 2 +- .../DefaultTbEntityViewService.java | 77 +++++++----- .../ota/DefaultOtaPackageStateService.java | 18 ++- .../state/DefaultDeviceStateService.java | 4 +- .../DefaultRuleEngineStatisticsService.java | 4 +- .../csv/AbstractBulkImportService.java | 4 +- .../impl/BaseEntityImportService.java | 2 +- .../system/DefaultSystemInfoService.java | 2 +- .../DefaultTelemetrySubscriptionService.java | 114 +++++++---------- .../telemetry/InternalTelemetryService.java | 14 +-- .../server/controller/WebsocketApiTest.java | 4 +- .../state/DefaultDeviceStateServiceTest.java | 39 +++--- .../dao/attributes/AttributesService.java | 6 - .../dao/attributes/BaseAttributesService.java | 14 --- .../attributes/CachedAttributesService.java | 10 -- .../engine/api/AttributesDeleteRequest.java | 117 ++++++++++++++++++ .../api/RuleEngineTelemetryService.java | 12 +- .../engine/api/TimeseriesSaveRequest.java | 10 +- .../TbCopyAttributesToEntityViewNode.java | 12 +- .../rule/engine/math/TbMathNode.java | 4 +- .../engine/telemetry/TbMsgAttributesNode.java | 2 +- .../telemetry/TbMsgDeleteAttributesNode.java | 19 +-- .../engine/telemetry/TbMsgTimeseriesNode.java | 2 +- .../TbCopyAttributesToEntityViewNodeTest.java | 30 ++--- .../rule/engine/math/TbMathNodeTest.java | 12 +- .../telemetry/TbMsgAttributesNodeTest.java | 4 +- .../TbMsgDeleteAttributesNodeTest.java | 16 +-- .../telemetry/TbMsgTimeseriesNodeTest.java | 10 +- 32 files changed, 364 insertions(+), 283 deletions(-) create mode 100644 rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesDeleteRequest.java diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index b6d854399c..a518eaa1af 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -47,6 +47,7 @@ import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.common.adaptor.JsonConverter; @@ -589,24 +590,30 @@ public class TelemetryController extends BaseController { SecurityUser user = getCurrentUser(); return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.WRITE_ATTRIBUTES, entityIdSrc, (result, tenantId, entityId) -> { - tsSubService.deleteAndNotify(tenantId, entityId, scope, keys, new FutureCallback() { - @Override - public void onSuccess(@Nullable Void tmp) { - logAttributesDeleted(user, entityId, scope, keys, null); - if (entityIdSrc.getEntityType().equals(EntityType.DEVICE)) { - DeviceId deviceId = new DeviceId(entityId.getId()); - tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete( - user.getTenantId(), deviceId, scope.name(), keys), null); - } - result.setResult(new ResponseEntity<>(HttpStatus.OK)); - } + tsSubService.deleteAttributes(AttributesDeleteRequest.builder() + .tenantId(tenantId) + .entityId(entityId) + .scope(scope) + .keys(keys) + .callback(new FutureCallback<>() { + @Override + public void onSuccess(@Nullable Void tmp) { + logAttributesDeleted(user, entityId, scope, keys, null); + if (entityIdSrc.getEntityType().equals(EntityType.DEVICE)) { + DeviceId deviceId = new DeviceId(entityId.getId()); + tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete( + user.getTenantId(), deviceId, scope.name(), keys), null); + } + result.setResult(new ResponseEntity<>(HttpStatus.OK)); + } - @Override - public void onFailure(Throwable t) { - logAttributesDeleted(user, entityId, scope, keys, t); - result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR)); - } - }); + @Override + public void onFailure(Throwable t) { + logAttributesDeleted(user, entityId, scope, keys, t); + result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR)); + } + }) + .build()); }); } @@ -626,7 +633,7 @@ public class TelemetryController extends BaseController { } SecurityUser user = getCurrentUser(); return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.WRITE_ATTRIBUTES, entityIdSrc, (result, tenantId, entityId) -> { - tsSubService.save(AttributesSaveRequest.builder() + tsSubService.saveAttributes(AttributesSaveRequest.builder() .tenantId(tenantId) .entityId(entityId) .scope(scope) @@ -680,7 +687,7 @@ public class TelemetryController extends BaseController { TenantProfile tenantProfile = tenantProfileCache.get(tenantId); tenantTtl = TimeUnit.DAYS.toSeconds(((DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration()).getDefaultStorageTtlDays()); } - tsSubService.save(TimeseriesSaveRequest.builder() + tsSubService.saveTimeseries(TimeseriesSaveRequest.builder() .tenantId(tenantId) .customerId(user.getCustomerId()) .entityId(entityId) diff --git a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java index d9c5077ab7..3a351528e4 100644 --- a/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java @@ -215,7 +215,7 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService updateLock.unlock(); } log.trace("[{}][{}] Saving new stats: {}", tenantId, ownerId, updatedEntries); - tsWsService.saveInternal(TimeseriesSaveRequest.builder() + tsWsService.saveTimeseriesInternal(TimeseriesSaveRequest.builder() .tenantId(tenantId) .entityId(usageState.getApiUsageState().getId()) .entries(updatedEntries) @@ -327,7 +327,7 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService } } if (!profileThresholds.isEmpty()) { - tsWsService.saveInternal(TimeseriesSaveRequest.builder() + tsWsService.saveTimeseriesInternal(TimeseriesSaveRequest.builder() .tenantId(tenantId) .entityId(id) .entries(profileThresholds) @@ -359,7 +359,7 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService long ts = System.currentTimeMillis(); List stateTelemetry = new ArrayList<>(); result.forEach((apiFeature, aState) -> stateTelemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(apiFeature.getApiStateKey(), aState.name())))); - tsWsService.saveInternal(TimeseriesSaveRequest.builder() + tsWsService.saveTimeseriesInternal(TimeseriesSaveRequest.builder() .tenantId(state.getTenantId()) .entityId(state.getApiUsageState().getId()) .entries(stateTelemetry) @@ -452,7 +452,7 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService .map(key -> new BasicTsKvEntry(state.getCurrentCycleTs(), new LongDataEntry(key.getApiCountKey(), 0L))) .collect(Collectors.toList()); - tsWsService.saveInternal(TimeseriesSaveRequest.builder() + tsWsService.saveTimeseriesInternal(TimeseriesSaveRequest.builder() .tenantId(state.getTenantId()) .entityId(state.getApiUsageState().getId()) .entries(counts) diff --git a/application/src/main/java/org/thingsboard/server/service/device/ClaimDevicesServiceImpl.java b/application/src/main/java/org/thingsboard/server/service/device/ClaimDevicesServiceImpl.java index cd3e096ad8..f82f4c74d5 100644 --- a/application/src/main/java/org/thingsboard/server/service/device/ClaimDevicesServiceImpl.java +++ b/application/src/main/java/org/thingsboard/server/service/device/ClaimDevicesServiceImpl.java @@ -28,6 +28,7 @@ import org.springframework.cache.Cache; import org.springframework.cache.CacheManager; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.server.common.data.AttributeScope; @@ -178,7 +179,7 @@ public class ClaimDevicesServiceImpl implements ClaimDevicesService { return Futures.immediateFuture(new ReclaimResult(unassignedCustomer)); } SettableFuture result = SettableFuture.create(); - telemetryService.save(AttributesSaveRequest.builder() + telemetryService.saveAttributes(AttributesSaveRequest.builder() .tenantId(tenantId) .entityId(savedDevice.getId()) .scope(AttributeScope.SERVER_SCOPE) @@ -223,18 +224,13 @@ public class ClaimDevicesServiceImpl implements ClaimDevicesService { cache.evict(data.getKey()); } SettableFuture result = SettableFuture.create(); - telemetryService.deleteAndNotify(device.getTenantId(), - device.getId(), AttributeScope.SERVER_SCOPE, Arrays.asList(CLAIM_ATTRIBUTE_NAME, CLAIM_DATA_ATTRIBUTE_NAME), new FutureCallback<>() { - @Override - public void onSuccess(@Nullable Void tmp) { - result.set(tmp); - } - - @Override - public void onFailure(Throwable t) { - result.setException(t); - } - }); + telemetryService.deleteAttributes(AttributesDeleteRequest.builder() + .tenantId(device.getTenantId()) + .entityId(device.getId()) + .scope(AttributeScope.SERVER_SCOPE) + .keys(Arrays.asList(CLAIM_ATTRIBUTE_NAME, CLAIM_DATA_ATTRIBUTE_NAME)) + .future(result) + .build()); return result; } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 7551302d33..75cebfa394 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -503,14 +503,14 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private void save(TenantId tenantId, EdgeId edgeId, String key, long value) { log.debug("[{}][{}] Updating long edge telemetry [{}] [{}]", tenantId, edgeId, key, value); if (persistToTelemetry) { - tsSubService.save(TimeseriesSaveRequest.builder() + tsSubService.saveTimeseries(TimeseriesSaveRequest.builder() .tenantId(tenantId) .entityId(edgeId) .entry(new LongDataEntry(key, value)) .callback(new AttributeSaveCallback(tenantId, edgeId, key, value)) .build()); } else { - tsSubService.save(AttributesSaveRequest.builder() + tsSubService.saveAttributes(AttributesSaveRequest.builder() .tenantId(tenantId) .entityId(edgeId) .scope(AttributeScope.SERVER_SCOPE) @@ -523,14 +523,14 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private void save(TenantId tenantId, EdgeId edgeId, String key, boolean value) { log.debug("[{}][{}] Updating boolean edge telemetry [{}] [{}]", tenantId, edgeId, key, value); if (persistToTelemetry) { - tsSubService.save(TimeseriesSaveRequest.builder() + tsSubService.saveTimeseries(TimeseriesSaveRequest.builder() .tenantId(tenantId) .entityId(edgeId) .entry(new BooleanDataEntry(key, value)) .callback(new AttributeSaveCallback(tenantId, edgeId, key, value)) .build()); } else { - tsSubService.save(AttributesSaveRequest.builder() + tsSubService.saveAttributes(AttributesSaveRequest.builder() .tenantId(tenantId) .entityId(edgeId) .scope(AttributeScope.SERVER_SCOPE) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java index 9ef40280c9..bc3f660cbf 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java @@ -278,7 +278,7 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor { JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); List attributes = new ArrayList<>(JsonConverter.convertToAttributes(json)); String scope = metaData.getValue("scope"); - tsSubService.save(AttributesSaveRequest.builder() + tsSubService.saveAttributes(AttributesSaveRequest.builder() .tenantId(tenantId) .entityId(entityId) .scope(AttributeScope.valueOf(scope)) diff --git a/application/src/main/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewService.java b/application/src/main/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewService.java index 790b1086b4..7dffb43cea 100644 --- a/application/src/main/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewService.java +++ b/application/src/main/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewService.java @@ -25,7 +25,9 @@ import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.util.ConcurrentReferenceHashMap; +import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; +import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.EntityType; @@ -284,7 +286,7 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen (startTime == 0 && endTime > lastUpdateTs) || (startTime < lastUpdateTs && endTime > lastUpdateTs); }).collect(Collectors.toList()); - tsSubService.save(AttributesSaveRequest.builder() + tsSubService.saveAttributes(AttributesSaveRequest.builder() .tenantId(entityView.getTenantId()) .entityId(entityId) .scope(scope) @@ -340,15 +342,22 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen }, MoreExecutors.directExecutor()); return Futures.transform(latestFuture, latestValues -> { if (latestValues != null && !latestValues.isEmpty()) { - tsSubService.saveLatestAndNotify(entityView.getTenantId(), entityId, latestValues, new FutureCallback() { - @Override - public void onSuccess(@Nullable Void tmp) { - } - - @Override - public void onFailure(Throwable t) { - } - }); + tsSubService.saveTimeseries(TimeseriesSaveRequest.builder() + .tenantId(entityView.getTenantId()) + .entityId(entityId) + .entries(latestValues) + .onlyLatest(true) + .callback(new FutureCallback() { + @Override + public void onSuccess(@Nullable Void tmp) { + } + + @Override + public void onFailure(Throwable t) { + log.error("[{}][{}] Failed to save entity view latest timeseries: {}", tenantId, entityView.getId(), latestValues, t); + } + }) + .build()); } return null; }, MoreExecutors.directExecutor()); @@ -358,27 +367,33 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen EntityViewId entityId = entityView.getId(); SettableFuture resultFuture = SettableFuture.create(); if (keys != null && !keys.isEmpty()) { - tsSubService.deleteAndNotify(entityView.getTenantId(), entityId, scope, keys, new FutureCallback() { - @Override - public void onSuccess(@Nullable Void tmp) { - try { - logAttributesDeleted(entityView.getTenantId(), user, entityId, scope, keys, null); - } catch (ThingsboardException e) { - log.error("Failed to log attribute delete", e); - } - resultFuture.set(tmp); - } - - @Override - public void onFailure(Throwable t) { - try { - logAttributesDeleted(entityView.getTenantId(), user, entityId, scope, keys, t); - } catch (ThingsboardException e) { - log.error("Failed to log attribute delete", e); - } - resultFuture.setException(t); - } - }); + tsSubService.deleteAttributes(AttributesDeleteRequest.builder() + .tenantId(entityView.getTenantId()) + .entityId(entityId) + .scope(scope) + .keys(keys) + .callback(new FutureCallback<>() { + @Override + public void onSuccess(@Nullable Void tmp) { + try { + logAttributesDeleted(entityView.getTenantId(), user, entityId, scope, keys, null); + } catch (ThingsboardException e) { + log.error("Failed to log attribute delete", e); + } + resultFuture.set(tmp); + } + + @Override + public void onFailure(Throwable t) { + try { + logAttributesDeleted(entityView.getTenantId(), user, entityId, scope, keys, t); + } catch (ThingsboardException e) { + log.error("Failed to log attribute delete", e); + } + resultFuture.setException(t); + } + }) + .build()); } else { resultFuture.set(null); } diff --git a/application/src/main/java/org/thingsboard/server/service/ota/DefaultOtaPackageStateService.java b/application/src/main/java/org/thingsboard/server/service/ota/DefaultOtaPackageStateService.java index 3db4e53379..3aedad3ddb 100644 --- a/application/src/main/java/org/thingsboard/server/service/ota/DefaultOtaPackageStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/ota/DefaultOtaPackageStateService.java @@ -20,6 +20,7 @@ import jakarta.annotation.Nullable; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; +import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; @@ -262,7 +263,7 @@ public class DefaultOtaPackageStateService implements OtaPackageStateService { telemetry.add(new BasicTsKvEntry(ts, new LongDataEntry(getTargetTelemetryKey(firmware.getType(), TS), ts))); telemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(getTelemetryKey(firmware.getType(), STATE), OtaPackageUpdateStatus.QUEUED.name()))); - telemetryService.save(TimeseriesSaveRequest.builder() + telemetryService.saveTimeseries(TimeseriesSaveRequest.builder() .tenantId(tenantId) .entityId(deviceId) .entries(telemetry) @@ -288,7 +289,7 @@ public class DefaultOtaPackageStateService implements OtaPackageStateService { BasicTsKvEntry status = new BasicTsKvEntry(System.currentTimeMillis(), new StringDataEntry(getTelemetryKey(otaPackageType, STATE), OtaPackageUpdateStatus.INITIATED.name())); - telemetryService.save(TimeseriesSaveRequest.builder() + telemetryService.saveTimeseries(TimeseriesSaveRequest.builder() .tenantId(tenantId) .entityId(deviceId) .entry(status) @@ -347,7 +348,7 @@ public class DefaultOtaPackageStateService implements OtaPackageStateService { remove(device, otaPackageType, attrToRemove); - telemetryService.save(AttributesSaveRequest.builder() + telemetryService.saveAttributes(AttributesSaveRequest.builder() .tenantId(tenantId) .entityId(deviceId) .scope(AttributeScope.SHARED_SCOPE) @@ -371,8 +372,12 @@ public class DefaultOtaPackageStateService implements OtaPackageStateService { } private void remove(Device device, OtaPackageType otaPackageType, List attributesKeys) { - telemetryService.deleteAndNotify(device.getTenantId(), device.getId(), AttributeScope.SHARED_SCOPE, attributesKeys, - new FutureCallback<>() { + telemetryService.deleteAttributes(AttributesDeleteRequest.builder() + .tenantId(device.getTenantId()) + .entityId(device.getId()) + .scope(AttributeScope.SHARED_SCOPE) + .keys(attributesKeys) + .callback(new FutureCallback<>() { @Override public void onSuccess(@Nullable Void tmp) { log.trace("[{}] Success remove target {} attributes!", device.getId(), otaPackageType); @@ -383,7 +388,8 @@ public class DefaultOtaPackageStateService implements OtaPackageStateService { public void onFailure(Throwable t) { log.error("[{}] Failed to remove target {} attributes!", device.getId(), otaPackageType, t); } - }); + }) + .build()); } } diff --git a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java index 8280cf44b8..da62e1bd01 100644 --- a/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java +++ b/application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java @@ -877,7 +877,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService(deviceId, kvEntry)) .build()); } else { - tsSubService.save(AttributesSaveRequest.builder() + tsSubService.saveAttributes(AttributesSaveRequest.builder() .tenantId(TenantId.SYS_TENANT_ID) .entityId(deviceId) .scope(AttributeScope.SERVER_SCOPE) diff --git a/application/src/main/java/org/thingsboard/server/service/stats/DefaultRuleEngineStatisticsService.java b/application/src/main/java/org/thingsboard/server/service/stats/DefaultRuleEngineStatisticsService.java index 2a214388a1..91766c0f5d 100644 --- a/application/src/main/java/org/thingsboard/server/service/stats/DefaultRuleEngineStatisticsService.java +++ b/application/src/main/java/org/thingsboard/server/service/stats/DefaultRuleEngineStatisticsService.java @@ -89,7 +89,7 @@ public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsS if (!tsList.isEmpty()) { long ttl = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getQueueStatsTtlDays); ttl = TimeUnit.DAYS.toSeconds(ttl); - tsService.saveInternal(TimeseriesSaveRequest.builder() + tsService.saveTimeseriesInternal(TimeseriesSaveRequest.builder() .tenantId(tenantId) .entityId(queueStatsId) .entries(tsList) @@ -109,7 +109,7 @@ public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsS TsKvEntry tsKv = new BasicTsKvEntry(e.getTs(), new JsonDataEntry(RULE_ENGINE_EXCEPTION, e.toJsonString(maxErrorMessageLength))); long ttl = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getRuleEngineExceptionsTtlDays); ttl = TimeUnit.DAYS.toSeconds(ttl); - tsService.saveInternal(TimeseriesSaveRequest.builder() + tsService.saveTimeseriesInternal(TimeseriesSaveRequest.builder() .tenantId(tenantId) .entityId(getQueueStatsId(tenantId, queueName)) .entry(tsKv) diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/csv/AbstractBulkImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/csv/AbstractBulkImportService.java index f3e8266ce6..3edb7c9be0 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/csv/AbstractBulkImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/csv/AbstractBulkImportService.java @@ -208,7 +208,7 @@ public abstract class AbstractBulkImportService { TenantProfile tenantProfile = tenantProfileCache.get(tenantId); long tenantTtl = TimeUnit.DAYS.toSeconds(((DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration()).getDefaultStorageTtlDays()); - tsSubscriptionService.save(TimeseriesSaveRequest.builder() + tsSubscriptionService.saveTimeseries(TimeseriesSaveRequest.builder() .tenantId(tenantId) .customerId(user.getCustomerId()) .entityId(entityId) @@ -238,7 +238,7 @@ public abstract class AbstractBulkImportService attributes = new ArrayList<>(JsonConverter.convertToAttributes(kvsEntry.getValue())); accessValidator.validateEntityAndCallback(user, Operation.WRITE_ATTRIBUTES, entity.getId(), (result, tenantId, entityId) -> { - tsSubscriptionService.save(AttributesSaveRequest.builder() + tsSubscriptionService.saveAttributes(AttributesSaveRequest.builder() .tenantId(tenantId) .entityId(entityId) .scope(AttributeScope.valueOf(scope)) diff --git a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java index 199f54cd12..0d9c67823a 100644 --- a/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java +++ b/application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java @@ -258,7 +258,7 @@ public abstract class BaseEntityImportService telemetry) { ApiUsageState apiUsageState = apiUsageStateClient.getApiUsageState(TenantId.SYS_TENANT_ID); - telemetryService.saveInternal(TimeseriesSaveRequest.builder() + telemetryService.saveTimeseriesInternal(TimeseriesSaveRequest.builder() .tenantId(TenantId.SYS_TENANT_ID) .entityId(apiUsageState.getId()) .entries(telemetry) diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 793aabc6cc..ad7f963894 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -27,11 +27,11 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.common.data.ApiUsageRecordKey; -import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; import org.thingsboard.server.common.data.id.CustomerId; @@ -111,27 +111,31 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } @Override - public void save(TimeseriesSaveRequest request) { + public void saveTimeseries(TimeseriesSaveRequest request) { TenantId tenantId = request.getTenantId(); EntityId entityId = request.getEntityId(); checkInternalEntity(entityId); boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null; - if (sysTenant || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) { + if (sysTenant || request.isOnlyLatest() || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) { KvUtils.validate(request.getEntries(), valueNoXssValidation); - FutureCallback callback = getApiUsageCallback(tenantId, request.getCustomerId(), sysTenant, request.getCallback()); - ListenableFuture future = saveInternal(request); - Futures.addCallback(future, callback, tsCallBackExecutor); + ListenableFuture future = saveTimeseriesInternal(request); + if (!request.isOnlyLatest()) { + FutureCallback callback = getApiUsageCallback(tenantId, request.getCustomerId(), sysTenant, request.getCallback()); + Futures.addCallback(future, callback, tsCallBackExecutor); + } } else { request.getCallback().onFailure(new RuntimeException("DB storage writes are disabled due to API limits!")); } } @Override - public ListenableFuture saveInternal(TimeseriesSaveRequest request) { + public ListenableFuture saveTimeseriesInternal(TimeseriesSaveRequest request) { TenantId tenantId = request.getTenantId(); EntityId entityId = request.getEntityId(); ListenableFuture saveFuture; - if (request.isSaveLatest()) { + if (request.isOnlyLatest()) { + saveFuture = Futures.transform(tsService.saveLatest(tenantId, entityId, request.getEntries()), result -> 0, MoreExecutors.directExecutor()); + } else if (request.isSaveLatest()) { saveFuture = tsService.save(tenantId, entityId, request.getEntries(), request.getTtl()); } else { saveFuture = tsService.saveWithoutLatest(tenantId, entityId, request.getEntries(), request.getTtl()); @@ -139,63 +143,37 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer addMainCallback(saveFuture, request.getCallback()); addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries())); - if (request.isSaveLatest()) { + if (request.isSaveLatest() && !request.isOnlyLatest()) { addEntityViewCallback(tenantId, entityId, request.getEntries()); } return saveFuture; } @Override - public void save(AttributesSaveRequest request) { + public void saveAttributes(AttributesSaveRequest request) { checkInternalEntity(request.getEntityId()); - saveInternal(request); + saveAttributesInternal(request); } @Override - public void saveInternal(AttributesSaveRequest request) { + public void saveAttributesInternal(AttributesSaveRequest request) { log.trace("Executing saveInternal [{}]", request); ListenableFuture> saveFuture = attrService.save(request.getTenantId(), request.getEntityId(), request.getScope(), request.getEntries()); - addVoidCallback(saveFuture, request.getCallback()); + addMainCallback(saveFuture, request.getCallback()); addWsCallback(saveFuture, success -> onAttributesUpdate(request.getTenantId(), request.getEntityId(), request.getScope().name(), request.getEntries(), request.isNotifyDevice())); } @Override - public void saveLatestAndNotify(TenantId tenantId, EntityId entityId, List ts, FutureCallback callback) { - checkInternalEntity(entityId); - saveLatestAndNotifyInternal(tenantId, entityId, ts, callback); - } - - @Override - public void saveLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List ts, FutureCallback callback) { - ListenableFuture> saveFuture = tsService.saveLatest(tenantId, entityId, ts); - addVoidCallback(saveFuture, callback); - addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts)); - } - - @Override - public void deleteAndNotify(TenantId tenantId, EntityId entityId, AttributeScope scope, List keys, FutureCallback callback) { - checkInternalEntity(entityId); - deleteAndNotifyInternal(tenantId, entityId, scope, keys, false, callback); - } - - @Override - public void deleteAndNotify(TenantId tenantId, EntityId entityId, AttributeScope scope, List keys, boolean notifyDevice, FutureCallback callback) { - checkInternalEntity(entityId); - deleteAndNotifyInternal(tenantId, entityId, scope, keys, notifyDevice, callback); - } - - @Override - public void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List keys, boolean notifyDevice, FutureCallback callback) { - ListenableFuture> deleteFuture = attrService.removeAll(tenantId, entityId, scope, keys); - addVoidCallback(deleteFuture, callback); - addWsCallback(deleteFuture, success -> onAttributesDelete(tenantId, entityId, scope, keys, notifyDevice)); + public void deleteAttributes(AttributesDeleteRequest request) { + checkInternalEntity(request.getEntityId()); + deleteAttributesInternal(request); } @Override - public void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, AttributeScope scope, List keys, boolean notifyDevice, FutureCallback callback) { - ListenableFuture> deleteFuture = attrService.removeAll(tenantId, entityId, scope, keys); - addVoidCallback(deleteFuture, callback); - addWsCallback(deleteFuture, success -> onAttributesDelete(tenantId, entityId, scope.name(), keys, notifyDevice)); + public void deleteAttributesInternal(AttributesDeleteRequest request) { + ListenableFuture> deleteFuture = attrService.removeAll(request.getTenantId(), request.getEntityId(), request.getScope(), request.getKeys()); + addMainCallback(deleteFuture, request.getCallback()); + addWsCallback(deleteFuture, success -> onAttributesDelete(request.getTenantId(), request.getEntityId(), request.getScope().name(), request.getKeys(), request.isNotifyDevice())); } @Override @@ -207,7 +185,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Override public void deleteLatestInternal(TenantId tenantId, EntityId entityId, List keys, FutureCallback callback) { ListenableFuture> deleteFuture = tsService.removeLatest(tenantId, entityId, keys); - addVoidCallback(deleteFuture, callback); + addMainCallback(deleteFuture, callback); } @Override @@ -229,7 +207,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Override public void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List keys, List deleteTsKvQueries, FutureCallback callback) { ListenableFuture> deleteFuture = tsService.remove(tenantId, entityId, deleteTsKvQueries); - addVoidCallback(deleteFuture, callback); + addMainCallback(deleteFuture, callback); addWsCallback(deleteFuture, list -> onTimeSeriesDelete(tenantId, entityId, keys, list)); } @@ -260,15 +238,21 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } } if (!entityViewLatest.isEmpty()) { - saveLatestAndNotify(tenantId, entityView.getId(), entityViewLatest, new FutureCallback<>() { - @Override - public void onSuccess(@Nullable Void tmp) { - } - - @Override - public void onFailure(Throwable t) { - } - }); + saveTimeseries(TimeseriesSaveRequest.builder() + .tenantId(tenantId) + .entityId(entityView.getId()) + .entries(entityViewLatest) + .onlyLatest(true) + .callback(new FutureCallback<>() { + @Override + public void onSuccess(@Nullable Void tmp) {} + + @Override + public void onFailure(Throwable t) { + log.error("[{}][{}] Failed to save entity view latest timeseries: {}", tenantId, entityView.getId(), entityViewLatest, t); + } + }) + .build()); } } } @@ -328,22 +312,8 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer }); } - private void addVoidCallback(ListenableFuture saveFuture, final FutureCallback callback) { - if (callback == null) return; - Futures.addCallback(saveFuture, new FutureCallback() { - @Override - public void onSuccess(@Nullable S result) { - callback.onSuccess(null); - } - - @Override - public void onFailure(Throwable t) { - callback.onFailure(t); - } - }, tsCallBackExecutor); - } - private void addMainCallback(ListenableFuture saveFuture, final FutureCallback callback) { + if (callback == null) return; Futures.addCallback(saveFuture, new FutureCallback() { @Override public void onSuccess(@Nullable S result) { diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java index 20eff8b6b5..1db6a86507 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java @@ -17,13 +17,12 @@ package org.thingsboard.server.service.telemetry; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; -import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.kv.TsKvEntry; import java.util.List; @@ -32,16 +31,11 @@ import java.util.List; */ public interface InternalTelemetryService extends RuleEngineTelemetryService { - ListenableFuture saveInternal(TimeseriesSaveRequest request); + ListenableFuture saveTimeseriesInternal(TimeseriesSaveRequest request); - void saveInternal(AttributesSaveRequest request); + void saveAttributesInternal(AttributesSaveRequest request); - void saveLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List ts, FutureCallback callback); - - @Deprecated(since = "3.7.0") - void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List keys, boolean notifyDevice, FutureCallback callback); - - void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, AttributeScope scope, List keys, boolean notifyDevice, FutureCallback callback); + void deleteAttributesInternal(AttributesDeleteRequest request); void deleteLatestInternal(TenantId tenantId, EntityId entityId, List keys, FutureCallback callback); diff --git a/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java b/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java index 05e7132ce8..b64c87ccbd 100644 --- a/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java @@ -806,7 +806,7 @@ public class WebsocketApiTest extends AbstractControllerTest { private void sendTelemetry(Device device, List tsData) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - tsService.save(TimeseriesSaveRequest.builder() + tsService.saveTimeseries(TimeseriesSaveRequest.builder() .tenantId(device.getTenantId()) .entityId(device.getId()) .entries(tsData) @@ -833,7 +833,7 @@ public class WebsocketApiTest extends AbstractControllerTest { private void sendAttributes(TenantId tenantId, EntityId entityId, TbAttributeSubscriptionScope scope, List attrData) throws InterruptedException { CountDownLatch latch = new CountDownLatch(1); - tsService.save(AttributesSaveRequest.builder() + tsService.saveAttributes(AttributesSaveRequest.builder() .tenantId(tenantId) .entityId(entityId) .scope(scope.getAttributeScope()) diff --git a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java index b7207da23a..b58d19e0e5 100644 --- a/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java @@ -24,7 +24,6 @@ import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; -import org.mockito.ArgumentMatcher; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; @@ -211,7 +210,7 @@ public class DefaultDeviceStateServiceTest { service.onDeviceConnect(tenantId, deviceId, lastConnectTime); // THEN - then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher) request -> + then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> request.getTenantId().equals(TenantId.SYS_TENANT_ID) && request.getEntityId().equals(deviceId) && request.getScope().equals(AttributeScope.SERVER_SCOPE) && request.getEntries().get(0).getKey().equals(LAST_CONNECT_TIME) && @@ -298,7 +297,7 @@ public class DefaultDeviceStateServiceTest { service.onDeviceDisconnect(tenantId, deviceId, lastDisconnectTime); // THEN - then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher) request -> + then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> request.getTenantId().equals(TenantId.SYS_TENANT_ID) && request.getEntityId().equals(deviceId) && request.getScope().equals(AttributeScope.SERVER_SCOPE) && request.getEntries().get(0).getKey().equals(LAST_DISCONNECT_TIME) && @@ -421,13 +420,13 @@ public class DefaultDeviceStateServiceTest { service.onDeviceInactivity(tenantId, deviceId, lastInactivityTime); // THEN - then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher) request -> + then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> request.getTenantId().equals(TenantId.SYS_TENANT_ID) && request.getEntityId().equals(deviceId) && request.getScope().equals(AttributeScope.SERVER_SCOPE) && request.getEntries().get(0).getKey().equals(INACTIVITY_ALARM_TIME) && request.getEntries().get(0).getValue().equals(lastInactivityTime) )); - then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher) request -> + then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> request.getTenantId().equals(TenantId.SYS_TENANT_ID) && request.getEntityId().equals(deviceId) && request.getScope().equals(AttributeScope.SERVER_SCOPE) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) && @@ -465,12 +464,12 @@ public class DefaultDeviceStateServiceTest { service.updateInactivityStateIfExpired(System.currentTimeMillis(), deviceId, deviceStateData); // THEN - then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher) request -> + then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> request.getTenantId().equals(TenantId.SYS_TENANT_ID) && request.getEntityId().equals(deviceId) && request.getScope().equals(AttributeScope.SERVER_SCOPE) && request.getEntries().get(0).getKey().equals(INACTIVITY_ALARM_TIME) )); - then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher) request -> + then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> request.getTenantId().equals(TenantId.SYS_TENANT_ID) && request.getEntityId().equals(deviceId) && request.getScope().equals(AttributeScope.SERVER_SCOPE) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) && @@ -627,7 +626,7 @@ public class DefaultDeviceStateServiceTest { long newTimeout = System.currentTimeMillis() - deviceState.getLastActivityTime() + increase; service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newTimeout); - verify(telemetrySubscriptionService, never()).save(argThat((ArgumentMatcher) request -> + verify(telemetrySubscriptionService, never()).saveAttributes(argThat(request -> request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) )); Thread.sleep(defaultTimeout + increase); @@ -668,7 +667,7 @@ public class DefaultDeviceStateServiceTest { long newTimeout = 1; Thread.sleep(newTimeout); - verify(telemetrySubscriptionService, never()).save(argThat((ArgumentMatcher) request -> + verify(telemetrySubscriptionService, never()).saveAttributes(argThat(request -> request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) )); } @@ -730,13 +729,13 @@ public class DefaultDeviceStateServiceTest { long newTimeout = 1; service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newTimeout); - verify(telemetrySubscriptionService, never()).save(argThat((ArgumentMatcher) request -> + verify(telemetrySubscriptionService, never()).saveAttributes(argThat(request -> request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) )); } private void activityVerify(boolean isActive) { - verify(telemetrySubscriptionService).save(argThat((ArgumentMatcher) request -> + verify(telemetrySubscriptionService).saveAttributes(argThat(request -> request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) && request.getEntries().get(0).getValue().equals(isActive) @@ -786,7 +785,7 @@ public class DefaultDeviceStateServiceTest { // THEN assertThat(deviceState.isActive()).isEqualTo(true); assertThat(deviceState.getLastActivityTime()).isEqualTo(lastReportedActivity); - then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher) request -> + then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(LAST_ACTIVITY_TIME) && request.getEntries().get(0).getValue().equals(lastReportedActivity) @@ -794,7 +793,7 @@ public class DefaultDeviceStateServiceTest { assertThat(deviceState.getLastInactivityAlarmTime()).isEqualTo(expectedInactivityAlarmTime); if (shouldSetInactivityAlarmTimeToZero) { - then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher) request -> + then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(INACTIVITY_ALARM_TIME) && request.getEntries().get(0).getValue().equals(0L) @@ -802,7 +801,7 @@ public class DefaultDeviceStateServiceTest { } if (shouldUpdateActivityStateToActive) { - then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher) request -> + then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) && request.getEntries().get(0).getValue().equals(true) @@ -886,7 +885,7 @@ public class DefaultDeviceStateServiceTest { assertThat(deviceState.getInactivityTimeout()).isEqualTo(newInactivityTimeout); assertThat(deviceState.isActive()).isEqualTo(expectedActivityState); if (activityState && !expectedActivityState) { - then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher) request -> + then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) && request.getEntries().get(0).getValue().equals(false) )); @@ -984,7 +983,7 @@ public class DefaultDeviceStateServiceTest { assertThat(state.getLastInactivityAlarmTime()).isEqualTo(expectedLastInactivityAlarmTime); if (shouldUpdateActivityStateToInactive) { - then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher) request -> + then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) && request.getEntries().get(0).getValue().equals(false) )); @@ -1002,7 +1001,7 @@ public class DefaultDeviceStateServiceTest { assertThat(actualNotification.getDeviceId()).isEqualTo(deviceId); assertThat(actualNotification.isActive()).isFalse(); - then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher) request -> + then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> request.getTenantId().equals(TenantId.SYS_TENANT_ID) && request.getEntityId().equals(deviceId) && request.getScope().equals(AttributeScope.SERVER_SCOPE) && request.getEntries().get(0).getKey().equals(INACTIVITY_ALARM_TIME) && @@ -1133,7 +1132,7 @@ public class DefaultDeviceStateServiceTest { // THEN await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { assertThat(service.deviceStates.get(deviceId).getState().isActive()).isEqualTo(false); - then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher) request -> + then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) && request.getEntries().get(0).getValue().equals(false) )); @@ -1164,7 +1163,7 @@ public class DefaultDeviceStateServiceTest { // THEN ArgumentCaptor attributeRequestCaptor = ArgumentCaptor.forClass(AttributesSaveRequest.class); - then(telemetrySubscriptionService).should(times(2)).save(attributeRequestCaptor.capture()); + then(telemetrySubscriptionService).should(times(2)).saveAttributes(attributeRequestCaptor.capture()); await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { assertThat(service.deviceStates.get(deviceId).getState().isActive()).isEqualTo(true); @@ -1231,7 +1230,7 @@ public class DefaultDeviceStateServiceTest { // THEN await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> { assertThat(service.deviceStates.get(deviceId).getState().isActive()).isEqualTo(true); - then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher) request -> + then(telemetrySubscriptionService).should().saveAttributes(argThat(request -> request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) && request.getEntries().get(0).getValue().equals(true) )); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java index a205ee9b84..8668551fbb 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java @@ -37,16 +37,10 @@ public interface AttributesService { ListenableFuture> findAll(TenantId tenantId, EntityId entityId, AttributeScope scope); - @Deprecated(since = "3.7.0") - ListenableFuture> save(TenantId tenantId, EntityId entityId, String scope, List attributes); - ListenableFuture> save(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributes); ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute); - @Deprecated(since = "3.7.0") - ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, String scope, List attributeKeys); - ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributeKeys); List findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java index 88d6757de3..0694178540 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java @@ -101,14 +101,6 @@ public class BaseAttributesService implements AttributesService { return attributesDao.save(tenantId, entityId, scope, attribute); } - @Override - public ListenableFuture> save(TenantId tenantId, EntityId entityId, String scope, List attributes) { - validate(entityId, scope); - AttributeUtils.validate(attributes, valueNoXssValidation); - List> saveFutures = attributes.stream().map(attribute -> attributesDao.save(tenantId, entityId, AttributeScope.valueOf(scope), attribute)).collect(Collectors.toList()); - return Futures.allAsList(saveFutures); - } - @Override public ListenableFuture> save(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributes) { validate(entityId, scope); @@ -117,12 +109,6 @@ public class BaseAttributesService implements AttributesService { return Futures.allAsList(saveFutures); } - @Override - public ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, String scope, List attributeKeys) { - validate(entityId, scope); - return Futures.allAsList(attributesDao.removeAll(tenantId, entityId, AttributeScope.valueOf(scope), attributeKeys)); - } - @Override public ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributeKeys) { validate(entityId, scope); diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java index 3de90355ed..1ebd5c0ba4 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java @@ -222,11 +222,6 @@ public class CachedAttributesService implements AttributesService { return doSave(tenantId, entityId, scope, attribute); } - @Override - public ListenableFuture> save(TenantId tenantId, EntityId entityId, String scope, List attributes) { - return save(tenantId, entityId, AttributeScope.valueOf(scope), attributes); - } - @Override public ListenableFuture> save(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributes) { validate(entityId, scope); @@ -255,11 +250,6 @@ public class CachedAttributesService implements AttributesService { log.trace("[{}][{}][{}] after cache put.", entityId, scope, key); } - @Override - public ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, String scope, List attributeKeys) { - return removeAll(tenantId, entityId, AttributeScope.valueOf(scope), attributeKeys); - } - @Override public ListenableFuture> removeAll(TenantId tenantId, EntityId entityId, AttributeScope scope, List attributeKeys) { validate(entityId, scope); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesDeleteRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesDeleteRequest.java new file mode 100644 index 0000000000..118c62e78c --- /dev/null +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesDeleteRequest.java @@ -0,0 +1,117 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.api; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.SettableFuture; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.ToString; +import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; + +import java.util.List; + +@Getter +@ToString +@AllArgsConstructor(access = AccessLevel.PRIVATE) +public class AttributesDeleteRequest { + + private final TenantId tenantId; + private final EntityId entityId; + private final AttributeScope scope; + private final List keys; + private final boolean notifyDevice; + private final FutureCallback callback; + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + + private TenantId tenantId; + private EntityId entityId; + private AttributeScope scope; + private List keys; + private boolean notifyDevice; + private FutureCallback callback; + + Builder() {} + + public Builder tenantId(TenantId tenantId) { + this.tenantId = tenantId; + return this; + } + + public Builder entityId(EntityId entityId) { + this.entityId = entityId; + return this; + } + + public Builder scope(AttributeScope scope) { + this.scope = scope; + return this; + } + + @Deprecated + public Builder scope(String scope) { + try { + this.scope = AttributeScope.valueOf(scope); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Invalid attribute scope '" + scope + "'"); + } + return this; + } + + public Builder keys(List keys) { + this.keys = keys; + return this; + } + + public Builder notifyDevice(boolean notifyDevice) { + this.notifyDevice = notifyDevice; + return this; + } + + public Builder callback(FutureCallback callback) { + this.callback = callback; + return this; + } + + public Builder future(SettableFuture future) { + return callback(new FutureCallback<>() { + @Override + public void onSuccess(Void result) { + future.set(result); + } + + @Override + public void onFailure(Throwable t) { + future.setException(t); + } + }); + } + + public AttributesDeleteRequest build() { + return new AttributesDeleteRequest(tenantId, entityId, scope, keys, notifyDevice, callback); + } + + } + +} diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java index 420c0c659b..17ed6aaa13 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java @@ -16,11 +16,9 @@ package org.thingsboard.rule.engine.api; import com.google.common.util.concurrent.FutureCallback; -import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; -import org.thingsboard.server.common.data.kv.TsKvEntry; import java.util.Collection; import java.util.List; @@ -30,15 +28,11 @@ import java.util.List; */ public interface RuleEngineTelemetryService { - void save(TimeseriesSaveRequest request); + void saveTimeseries(TimeseriesSaveRequest request); - void save(AttributesSaveRequest request); + void saveAttributes(AttributesSaveRequest request); - void saveLatestAndNotify(TenantId tenantId, EntityId entityId, List ts, FutureCallback callback); - - void deleteAndNotify(TenantId tenantId, EntityId entityId, AttributeScope scope, List keys, FutureCallback callback); - - void deleteAndNotify(TenantId tenantId, EntityId entityId, AttributeScope scope, List keys, boolean notifyDevice, FutureCallback callback); + void deleteAttributes(AttributesDeleteRequest request); void deleteLatest(TenantId tenantId, EntityId entityId, List keys, FutureCallback callback); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java index 57fba9232f..2b5881212d 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java @@ -39,6 +39,7 @@ public class TimeseriesSaveRequest { private final List entries; private final long ttl; private final boolean saveLatest; + private final boolean onlyLatest; private final FutureCallback callback; public static Builder builder() { @@ -54,6 +55,7 @@ public class TimeseriesSaveRequest { private long ttl; private FutureCallback callback; private boolean saveLatest = true; + private boolean onlyLatest; Builder() {} @@ -95,6 +97,12 @@ public class TimeseriesSaveRequest { return this; } + public Builder onlyLatest(boolean onlyLatest) { + this.onlyLatest = onlyLatest; + this.saveLatest = true; + return this; + } + public Builder callback(FutureCallback callback) { this.callback = callback; return this; @@ -115,7 +123,7 @@ public class TimeseriesSaveRequest { } public TimeseriesSaveRequest build() { - return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, saveLatest, callback); + return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, saveLatest, onlyLatest, callback); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java index e18ced7ee0..a4b0227551 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java @@ -23,6 +23,7 @@ import com.google.gson.JsonPrimitive; import jakarta.annotation.Nullable; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.DonAsynchron; +import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.EmptyNodeConfiguration; import org.thingsboard.rule.engine.api.RuleNode; @@ -106,14 +107,19 @@ public class TbCopyAttributesToEntityViewNode implements TbNode { List filteredAttributes = attributes.stream().filter(attr -> attributeContainsInEntityView(scope, attr, entityView)).collect(Collectors.toList()); if (!filteredAttributes.isEmpty()) { - ctx.getTelemetryService().deleteAndNotify(ctx.getTenantId(), entityView.getId(), scope, filteredAttributes, - getFutureCallback(ctx, msg, entityView)); + ctx.getTelemetryService().deleteAttributes(AttributesDeleteRequest.builder() + .tenantId(ctx.getTenantId()) + .entityId(entityView.getId()) + .scope(scope) + .keys(filteredAttributes) + .callback(getFutureCallback(ctx, msg, entityView)) + .build()); } } else { Set attributes = JsonConverter.convertToAttributes(JsonParser.parseString(msg.getData())); List filteredAttributes = attributes.stream().filter(attr -> attributeContainsInEntityView(scope, attr.getKey(), entityView)).collect(Collectors.toList()); - ctx.getTelemetryService().save(AttributesSaveRequest.builder() + ctx.getTelemetryService().saveAttributes(AttributesSaveRequest.builder() .tenantId(ctx.getTenantId()) .entityId(entityView.getId()) .scope(scope) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java index 1d18290129..2ce2212054 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java @@ -145,7 +145,7 @@ public class TbMathNode implements TbNode { private ListenableFuture saveTimeSeries(TbContext ctx, TbMsg msg, double result, TbMathResult mathResultDef) { final BasicTsKvEntry basicTsKvEntry = new BasicTsKvEntry(System.currentTimeMillis(), new DoubleDataEntry(mathResultDef.getKey(), result)); SettableFuture future = SettableFuture.create(); - ctx.getTelemetryService().save(TimeseriesSaveRequest.builder() + ctx.getTelemetryService().saveTimeseries(TimeseriesSaveRequest.builder() .tenantId(ctx.getTenantId()) .entityId(msg.getOriginator()) .entry(basicTsKvEntry) @@ -165,7 +165,7 @@ public class TbMathNode implements TbNode { kvEntry = new DoubleDataEntry(mathResultDef.getKey(), value); } SettableFuture future = SettableFuture.create(); - ctx.getTelemetryService().save(AttributesSaveRequest.builder() + ctx.getTelemetryService().saveAttributes(AttributesSaveRequest.builder() .tenantId(ctx.getTenantId()) .entityId(msg.getOriginator()) .scope(attributeScope) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java index 2250e0bbdd..e83a125265 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java @@ -119,7 +119,7 @@ public class TbMsgAttributesNode implements TbNode { FutureCallback callback = sendAttributesUpdateNotification ? new AttributesUpdateNodeCallback(ctx, msg, scope.name(), attributes) : new TelemetryNodeCallback(ctx, msg); - ctx.getTelemetryService().save(AttributesSaveRequest.builder() + ctx.getTelemetryService().saveAttributes(AttributesSaveRequest.builder() .tenantId(ctx.getTenantId()) .entityId(msg.getOriginator()) .scope(scope) diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNode.java index 66faa80f9f..f1f1a29f3c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNode.java @@ -16,6 +16,7 @@ package org.thingsboard.rule.engine.telemetry; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; @@ -70,16 +71,16 @@ public class TbMsgDeleteAttributesNode implements TbNode { ctx.tellSuccess(msg); } else { AttributeScope scope = getScope(msg.getMetaData().getValue(SCOPE)); - ctx.getTelemetryService().deleteAndNotify( - ctx.getTenantId(), - msg.getOriginator(), - scope, - keysToDelete, - checkNotifyDevice(msg.getMetaData().getValue(NOTIFY_DEVICE_METADATA_KEY), scope), - config.isSendAttributesDeletedNotification() ? + ctx.getTelemetryService().deleteAttributes(AttributesDeleteRequest.builder() + .tenantId(ctx.getTenantId()) + .entityId(msg.getOriginator()) + .scope(scope) + .keys(keysToDelete) + .notifyDevice(checkNotifyDevice(msg.getMetaData().getValue(NOTIFY_DEVICE_METADATA_KEY), scope)) + .callback(config.isSendAttributesDeletedNotification() ? new AttributesDeleteNodeCallback(ctx, msg, scope.name(), keysToDelete) : - new TelemetryNodeCallback(ctx, msg) - ); + new TelemetryNodeCallback(ctx, msg)) + .build()); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java index b0d5e23b50..27f45feb47 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java @@ -105,7 +105,7 @@ public class TbMsgTimeseriesNode implements TbNode { if (ttl == 0L) { ttl = tenantProfileDefaultStorageTtl; } - ctx.getTelemetryService().save(TimeseriesSaveRequest.builder() + ctx.getTelemetryService().saveTimeseries(TimeseriesSaveRequest.builder() .tenantId(ctx.getTenantId()) .customerId(msg.getCustomerId()) .entityId(msg.getOriginator()) diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNodeTest.java index fcd6083eb2..24dd064b57 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNodeTest.java @@ -15,7 +15,6 @@ */ package org.thingsboard.rule.engine.action; -import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -24,9 +23,9 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; -import org.mockito.ThrowingConsumer; import org.mockito.junit.jupiter.MockitoExtension; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.EmptyNodeConfiguration; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; @@ -56,7 +55,6 @@ import java.util.UUID; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.assertArg; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; @@ -118,7 +116,7 @@ public class TbCopyAttributesToEntityViewNodeTest { AttributesSaveRequest request = invocation.getArgument(0); request.getCallback().onSuccess(null); return null; - }).when(telemetryServiceMock).save(any(AttributesSaveRequest.class)); + }).when(telemetryServiceMock).saveAttributes(any(AttributesSaveRequest.class)); TbMsg newMsg = TbMsg.newMsg(msg, msg.getQueueName(), msg.getRuleChainId(), msg.getRuleNodeId()); // TODO: use newMsg() with any(TbMsgType.class), replace in other tests as well. doAnswer(invocation -> newMsg).when(ctxMock).newMsg(any(), any(String.class), any(), any(), any(), any()); @@ -126,7 +124,7 @@ public class TbCopyAttributesToEntityViewNodeTest { node.onMsg(ctxMock, msg); verify(entityViewServiceMock).findEntityViewsByTenantIdAndEntityIdAsync(eq(TENANT_ID), eq(DEVICE_ID)); - verify(telemetryServiceMock).save(assertArg((ThrowingConsumer) request -> { + verify(telemetryServiceMock).saveAttributes(assertArg(request -> { assertThat(request.getTenantId()).isEqualTo(TENANT_ID); assertThat(request.getEntityId()).isEqualTo(ENTITY_VIEW_ID); assertThat(request.getScope()).isEqualTo(AttributeScope.CLIENT_SCOPE); @@ -151,21 +149,23 @@ public class TbCopyAttributesToEntityViewNodeTest { mockEntityViewLookup(entityView); when(ctxMock.getTelemetryService()).thenReturn(telemetryServiceMock); doAnswer(invocation -> { - FutureCallback callback = invocation.getArgument(4); - callback.onSuccess(null); + AttributesDeleteRequest request = invocation.getArgument(0); + request.getCallback().onSuccess(null); return null; - }).when(telemetryServiceMock).deleteAndNotify(any(), any(), any(AttributeScope.class), anyList(), any(FutureCallback.class)); + }).when(telemetryServiceMock).deleteAttributes(any()); TbMsg newMsg = TbMsg.newMsg(msg, msg.getQueueName(), msg.getRuleChainId(), msg.getRuleNodeId()); doAnswer(invocation -> newMsg).when(ctxMock).newMsg(any(), any(String.class), any(), any(), any(), any()); node.onMsg(ctxMock, msg); verify(entityViewServiceMock).findEntityViewsByTenantIdAndEntityIdAsync(eq(TENANT_ID), eq(DEVICE_ID)); - ArgumentCaptor> filteredAttributesCaptor = ArgumentCaptor.forClass(List.class); - verify(telemetryServiceMock).deleteAndNotify(eq(TENANT_ID), eq(ENTITY_VIEW_ID), eq(AttributeScope.SERVER_SCOPE), filteredAttributesCaptor.capture(), any(FutureCallback.class)); - List filteredAttributesCaptorValue = filteredAttributesCaptor.getValue(); - assertThat(filteredAttributesCaptorValue.size()).isEqualTo(1); - assertThat(filteredAttributesCaptorValue.get(0)).isEqualTo("serverAttribute1"); + verify(telemetryServiceMock).deleteAttributes(assertArg(request -> { + assertThat(request.getTenantId()).isEqualTo(TENANT_ID); + assertThat(request.getEntityId()).isEqualTo(ENTITY_VIEW_ID); + assertThat(request.getScope()).isEqualTo(AttributeScope.SERVER_SCOPE); + assertThat(request.getKeys().size()).isEqualTo(1); + assertThat(request.getKeys().get(0)).isEqualTo("serverAttribute1"); + })); verify(ctxMock).ack(eq(msg)); verify(ctxMock).enqueueForTellNext(eq(newMsg), eq(TbNodeConnectionType.SUCCESS)); verifyNoMoreInteractions(ctxMock, entityViewServiceMock, telemetryServiceMock); @@ -202,14 +202,14 @@ public class TbCopyAttributesToEntityViewNodeTest { AttributesSaveRequest request = invocation.getArgument(0); request.getCallback().onSuccess(null); return null; - }).when(telemetryServiceMock).save(any(AttributesSaveRequest.class)); + }).when(telemetryServiceMock).saveAttributes(any(AttributesSaveRequest.class)); TbMsg newMsg = TbMsg.newMsg(msg, msg.getQueueName(), msg.getRuleChainId(), msg.getRuleNodeId()); doAnswer(invocation -> newMsg).when(ctxMock).newMsg(any(), any(String.class), any(), any(), any(), any()); node.onMsg(ctxMock, msg); verify(entityViewServiceMock).findEntityViewsByTenantIdAndEntityIdAsync(eq(TENANT_ID), eq(DEVICE_ID)); - verify(telemetryServiceMock).save(assertArg((ThrowingConsumer) request -> { + verify(telemetryServiceMock).saveAttributes(assertArg(request -> { assertThat(request.getTenantId()).isEqualTo(TENANT_ID); assertThat(request.getEntityId()).isEqualTo(ENTITY_VIEW_ID); assertThat(request.getScope()).isEqualTo(AttributeScope.CLIENT_SCOPE); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java index bec8946277..ad607fb262 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java @@ -441,13 +441,13 @@ public class TbMathNodeTest { AttributesSaveRequest request = invocation.getArgument(0); request.getCallback().onSuccess(null); return null; - }).when(telemetryService).save(any(AttributesSaveRequest.class)); + }).when(telemetryService).saveAttributes(any(AttributesSaveRequest.class)); node.onMsg(ctx, msg); ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(TbMsg.class); verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture()); - verify(telemetryService, times(1)).save(assertArg((ThrowingConsumer) request -> { + verify(telemetryService, times(1)).saveAttributes(assertArg(request -> { assertThat(request.getEntries()).singleElement().extracting(KvEntry::getValue).isInstanceOf(Double.class); })); @@ -471,13 +471,13 @@ public class TbMathNodeTest { TimeseriesSaveRequest request = invocation.getArgument(0); request.getCallback().onSuccess(null); return null; - }).when(telemetryService).save(any(TimeseriesSaveRequest.class)); + }).when(telemetryService).saveTimeseries(any(TimeseriesSaveRequest.class)); node.onMsg(ctx, msg); ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(TbMsg.class); verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture()); - verify(telemetryService, times(1)).save(assertArg((ThrowingConsumer) request -> { + verify(telemetryService, times(1)).saveTimeseries(assertArg(request -> { assertThat(request.getEntries()).size().isOne(); assertThat(request.isSaveLatest()).isTrue(); })); @@ -502,13 +502,13 @@ public class TbMathNodeTest { TimeseriesSaveRequest request = invocation.getArgument(0); request.getCallback().onSuccess(null); return null; - }).when(telemetryService).save(any(TimeseriesSaveRequest.class)); + }).when(telemetryService).saveTimeseries(any(TimeseriesSaveRequest.class)); node.onMsg(ctx, msg); ArgumentCaptor msgCaptor = ArgumentCaptor.forClass(TbMsg.class); verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture()); - verify(telemetryService, times(1)).save(assertArg((ThrowingConsumer) request -> { + verify(telemetryService, times(1)).saveTimeseries(assertArg(request -> { assertThat(request.getEntries()).size().isOne(); assertThat(request.isSaveLatest()).isTrue(); })); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeTest.java index 74aec7b566..05c172d27f 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeTest.java @@ -22,10 +22,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.mockito.ThrowingConsumer; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; -import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; @@ -171,7 +169,7 @@ class TbMsgAttributesNodeTest extends AbstractRuleNodeUpgradeTest { node.saveAttr(testAttrList, ctxMock, testTbMsg, AttributeScope.SHARED_SCOPE, false); - verify(telemetryServiceMock, times(1)).save(assertArg((ThrowingConsumer) request -> { + verify(telemetryServiceMock, times(1)).saveAttributes(assertArg(request -> { assertThat(request.getTenantId()).isEqualTo(tenantId); assertThat(request.getEntityId()).isEqualTo(deviceId); assertThat(request.getScope()).isEqualTo(AttributeScope.SHARED_SCOPE); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNodeTest.java index 3d546c80f7..d1fd1690f6 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNodeTest.java @@ -21,11 +21,11 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; -import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.msg.TbMsgType; @@ -41,10 +41,9 @@ import java.util.function.Consumer; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyList; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.assertArg; import static org.mockito.BDDMockito.willAnswer; import static org.mockito.BDDMockito.willReturn; import static org.mockito.Mockito.mock; @@ -78,11 +77,10 @@ public class TbMsgDeleteAttributesNodeTest { willReturn(telemetryService).given(ctx).getTelemetryService(); willAnswer(invocation -> { - TelemetryNodeCallback callBack = invocation.getArgument(5); - callBack.onSuccess(null); + AttributesDeleteRequest request = invocation.getArgument(0); + request.getCallback().onSuccess(null); return null; - }).given(telemetryService).deleteAndNotify( - any(), any(), any(AttributeScope.class), anyList(), anyBoolean(), any()); + }).given(telemetryService).deleteAttributes(any()); } @AfterEach @@ -153,6 +151,8 @@ public class TbMsgDeleteAttributesNodeTest { } verify(ctx, times(1)).tellSuccess(newMsgCaptor.capture()); verify(ctx, never()).tellFailure(any(), any()); - verify(telemetryService, times(1)).deleteAndNotify(any(), any(), any(AttributeScope.class), anyList(), eq(notifyDevice || notifyDeviceMetadata), any()); + verify(telemetryService, times(1)).deleteAttributes(assertArg(request -> { + assertThat(request.isNotifyDevice()).isEqualTo(notifyDevice || notifyDeviceMetadata); + })); } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java index a66c27e06a..d963eb65f4 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java @@ -130,12 +130,12 @@ public class TbMsgTimeseriesNodeTest { TimeseriesSaveRequest request = invocation.getArgument(0); request.getCallback().onSuccess(null); return null; - }).when(telemetryServiceMock).save(any(TimeseriesSaveRequest.class)); + }).when(telemetryServiceMock).saveTimeseries(any(TimeseriesSaveRequest.class)); node.onMsg(ctxMock, msg); List expectedList = getTsKvEntriesListWithTs(data, System.currentTimeMillis()); - verify(telemetryServiceMock).save(assertArg((ThrowingConsumer) request -> { + verify(telemetryServiceMock).saveTimeseries(assertArg(request -> { assertThat(request.getTenantId()).isEqualTo(TENANT_ID); assertThat(request.getCustomerId()).isNull(); assertThat(request.getEntityId()).isEqualTo(DEVICE_ID); @@ -171,12 +171,12 @@ public class TbMsgTimeseriesNodeTest { TimeseriesSaveRequest request = invocation.getArgument(0); request.getCallback().onSuccess(null); return null; - }).when(telemetryServiceMock).save(any(TimeseriesSaveRequest.class)); + }).when(telemetryServiceMock).saveTimeseries(any(TimeseriesSaveRequest.class)); node.onMsg(ctxMock, msg); List expectedList = getTsKvEntriesListWithTs(data, ts); - verify(telemetryServiceMock).save(assertArg((ThrowingConsumer) request -> { + verify(telemetryServiceMock).saveTimeseries(assertArg(request -> { assertThat(request.getTenantId()).isEqualTo(TENANT_ID); assertThat(request.getCustomerId()).isNull(); assertThat(request.getEntityId()).isEqualTo(DEVICE_ID); @@ -209,7 +209,7 @@ public class TbMsgTimeseriesNodeTest { TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metadata, data); node.onMsg(ctxMock, msg); - verify(telemetryServiceMock).save(assertArg((ThrowingConsumer) request -> { + verify(telemetryServiceMock).saveTimeseries(assertArg(request -> { assertThat(request.getTenantId()).isEqualTo(TENANT_ID); assertThat(request.getCustomerId()).isNull(); assertThat(request.getEntityId()).isEqualTo(DEVICE_ID);