Browse Source

Refactor deleteAndNotify and deleteAndNotifyInternal for attributes

pull/12297/head
ViacheslavKlimov 2 years ago
parent
commit
eb7bc8695b
  1. 45
      application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
  2. 8
      application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java
  3. 22
      application/src/main/java/org/thingsboard/server/service/device/ClaimDevicesServiceImpl.java
  4. 8
      application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java
  5. 2
      application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java
  6. 77
      application/src/main/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewService.java
  7. 18
      application/src/main/java/org/thingsboard/server/service/ota/DefaultOtaPackageStateService.java
  8. 4
      application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java
  9. 4
      application/src/main/java/org/thingsboard/server/service/stats/DefaultRuleEngineStatisticsService.java
  10. 4
      application/src/main/java/org/thingsboard/server/service/sync/ie/importing/csv/AbstractBulkImportService.java
  11. 2
      application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java
  12. 2
      application/src/main/java/org/thingsboard/server/service/system/DefaultSystemInfoService.java
  13. 114
      application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
  14. 14
      application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java
  15. 4
      application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java
  16. 39
      application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java
  17. 6
      common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java
  18. 14
      dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java
  19. 10
      dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java
  20. 117
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesDeleteRequest.java
  21. 12
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java
  22. 10
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java
  23. 12
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java
  24. 4
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java
  25. 2
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java
  26. 19
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNode.java
  27. 2
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
  28. 30
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNodeTest.java
  29. 12
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java
  30. 4
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeTest.java
  31. 16
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNodeTest.java
  32. 10
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java

45
application/src/main/java/org/thingsboard/server/controller/TelemetryController.java

@ -47,6 +47,7 @@ import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.adaptor.JsonConverter;
@ -589,24 +590,30 @@ public class TelemetryController extends BaseController {
SecurityUser user = getCurrentUser();
return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.WRITE_ATTRIBUTES, entityIdSrc, (result, tenantId, entityId) -> {
tsSubService.deleteAndNotify(tenantId, entityId, scope, keys, new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void tmp) {
logAttributesDeleted(user, entityId, scope, keys, null);
if (entityIdSrc.getEntityType().equals(EntityType.DEVICE)) {
DeviceId deviceId = new DeviceId(entityId.getId());
tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(
user.getTenantId(), deviceId, scope.name(), keys), null);
}
result.setResult(new ResponseEntity<>(HttpStatus.OK));
}
tsSubService.deleteAttributes(AttributesDeleteRequest.builder()
.tenantId(tenantId)
.entityId(entityId)
.scope(scope)
.keys(keys)
.callback(new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Void tmp) {
logAttributesDeleted(user, entityId, scope, keys, null);
if (entityIdSrc.getEntityType().equals(EntityType.DEVICE)) {
DeviceId deviceId = new DeviceId(entityId.getId());
tbClusterService.pushMsgToCore(DeviceAttributesEventNotificationMsg.onDelete(
user.getTenantId(), deviceId, scope.name(), keys), null);
}
result.setResult(new ResponseEntity<>(HttpStatus.OK));
}
@Override
public void onFailure(Throwable t) {
logAttributesDeleted(user, entityId, scope, keys, t);
result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
}
});
@Override
public void onFailure(Throwable t) {
logAttributesDeleted(user, entityId, scope, keys, t);
result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
}
})
.build());
});
}
@ -626,7 +633,7 @@ public class TelemetryController extends BaseController {
}
SecurityUser user = getCurrentUser();
return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.WRITE_ATTRIBUTES, entityIdSrc, (result, tenantId, entityId) -> {
tsSubService.save(AttributesSaveRequest.builder()
tsSubService.saveAttributes(AttributesSaveRequest.builder()
.tenantId(tenantId)
.entityId(entityId)
.scope(scope)
@ -680,7 +687,7 @@ public class TelemetryController extends BaseController {
TenantProfile tenantProfile = tenantProfileCache.get(tenantId);
tenantTtl = TimeUnit.DAYS.toSeconds(((DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration()).getDefaultStorageTtlDays());
}
tsSubService.save(TimeseriesSaveRequest.builder()
tsSubService.saveTimeseries(TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.customerId(user.getCustomerId())
.entityId(entityId)

8
application/src/main/java/org/thingsboard/server/service/apiusage/DefaultTbApiUsageStateService.java

@ -215,7 +215,7 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
updateLock.unlock();
}
log.trace("[{}][{}] Saving new stats: {}", tenantId, ownerId, updatedEntries);
tsWsService.saveInternal(TimeseriesSaveRequest.builder()
tsWsService.saveTimeseriesInternal(TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.entityId(usageState.getApiUsageState().getId())
.entries(updatedEntries)
@ -327,7 +327,7 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
}
}
if (!profileThresholds.isEmpty()) {
tsWsService.saveInternal(TimeseriesSaveRequest.builder()
tsWsService.saveTimeseriesInternal(TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.entityId(id)
.entries(profileThresholds)
@ -359,7 +359,7 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
long ts = System.currentTimeMillis();
List<TsKvEntry> stateTelemetry = new ArrayList<>();
result.forEach((apiFeature, aState) -> stateTelemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(apiFeature.getApiStateKey(), aState.name()))));
tsWsService.saveInternal(TimeseriesSaveRequest.builder()
tsWsService.saveTimeseriesInternal(TimeseriesSaveRequest.builder()
.tenantId(state.getTenantId())
.entityId(state.getApiUsageState().getId())
.entries(stateTelemetry)
@ -452,7 +452,7 @@ public class DefaultTbApiUsageStateService extends AbstractPartitionBasedService
.map(key -> new BasicTsKvEntry(state.getCurrentCycleTs(), new LongDataEntry(key.getApiCountKey(), 0L)))
.collect(Collectors.toList());
tsWsService.saveInternal(TimeseriesSaveRequest.builder()
tsWsService.saveTimeseriesInternal(TimeseriesSaveRequest.builder()
.tenantId(state.getTenantId())
.entityId(state.getApiUsageState().getId())
.entries(counts)

22
application/src/main/java/org/thingsboard/server/service/device/ClaimDevicesServiceImpl.java

@ -28,6 +28,7 @@ import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.server.common.data.AttributeScope;
@ -178,7 +179,7 @@ public class ClaimDevicesServiceImpl implements ClaimDevicesService {
return Futures.immediateFuture(new ReclaimResult(unassignedCustomer));
}
SettableFuture<ReclaimResult> result = SettableFuture.create();
telemetryService.save(AttributesSaveRequest.builder()
telemetryService.saveAttributes(AttributesSaveRequest.builder()
.tenantId(tenantId)
.entityId(savedDevice.getId())
.scope(AttributeScope.SERVER_SCOPE)
@ -223,18 +224,13 @@ public class ClaimDevicesServiceImpl implements ClaimDevicesService {
cache.evict(data.getKey());
}
SettableFuture<Void> result = SettableFuture.create();
telemetryService.deleteAndNotify(device.getTenantId(),
device.getId(), AttributeScope.SERVER_SCOPE, Arrays.asList(CLAIM_ATTRIBUTE_NAME, CLAIM_DATA_ATTRIBUTE_NAME), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Void tmp) {
result.set(tmp);
}
@Override
public void onFailure(Throwable t) {
result.setException(t);
}
});
telemetryService.deleteAttributes(AttributesDeleteRequest.builder()
.tenantId(device.getTenantId())
.entityId(device.getId())
.scope(AttributeScope.SERVER_SCOPE)
.keys(Arrays.asList(CLAIM_ATTRIBUTE_NAME, CLAIM_DATA_ATTRIBUTE_NAME))
.future(result)
.build());
return result;
}

8
application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java

@ -503,14 +503,14 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
private void save(TenantId tenantId, EdgeId edgeId, String key, long value) {
log.debug("[{}][{}] Updating long edge telemetry [{}] [{}]", tenantId, edgeId, key, value);
if (persistToTelemetry) {
tsSubService.save(TimeseriesSaveRequest.builder()
tsSubService.saveTimeseries(TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.entityId(edgeId)
.entry(new LongDataEntry(key, value))
.callback(new AttributeSaveCallback(tenantId, edgeId, key, value))
.build());
} else {
tsSubService.save(AttributesSaveRequest.builder()
tsSubService.saveAttributes(AttributesSaveRequest.builder()
.tenantId(tenantId)
.entityId(edgeId)
.scope(AttributeScope.SERVER_SCOPE)
@ -523,14 +523,14 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i
private void save(TenantId tenantId, EdgeId edgeId, String key, boolean value) {
log.debug("[{}][{}] Updating boolean edge telemetry [{}] [{}]", tenantId, edgeId, key, value);
if (persistToTelemetry) {
tsSubService.save(TimeseriesSaveRequest.builder()
tsSubService.saveTimeseries(TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.entityId(edgeId)
.entry(new BooleanDataEntry(key, value))
.callback(new AttributeSaveCallback(tenantId, edgeId, key, value))
.build());
} else {
tsSubService.save(AttributesSaveRequest.builder()
tsSubService.saveAttributes(AttributesSaveRequest.builder()
.tenantId(tenantId)
.entityId(edgeId)
.scope(AttributeScope.SERVER_SCOPE)

2
application/src/main/java/org/thingsboard/server/service/edge/rpc/processor/telemetry/BaseTelemetryProcessor.java

@ -278,7 +278,7 @@ public abstract class BaseTelemetryProcessor extends BaseEdgeProcessor {
JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(json));
String scope = metaData.getValue("scope");
tsSubService.save(AttributesSaveRequest.builder()
tsSubService.saveAttributes(AttributesSaveRequest.builder()
.tenantId(tenantId)
.entityId(entityId)
.scope(AttributeScope.valueOf(scope))

77
application/src/main/java/org/thingsboard/server/service/entitiy/entityview/DefaultTbEntityViewService.java

@ -25,7 +25,9 @@ import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.EntityType;
@ -284,7 +286,7 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen
(startTime == 0 && endTime > lastUpdateTs) ||
(startTime < lastUpdateTs && endTime > lastUpdateTs);
}).collect(Collectors.toList());
tsSubService.save(AttributesSaveRequest.builder()
tsSubService.saveAttributes(AttributesSaveRequest.builder()
.tenantId(entityView.getTenantId())
.entityId(entityId)
.scope(scope)
@ -340,15 +342,22 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen
}, MoreExecutors.directExecutor());
return Futures.transform(latestFuture, latestValues -> {
if (latestValues != null && !latestValues.isEmpty()) {
tsSubService.saveLatestAndNotify(entityView.getTenantId(), entityId, latestValues, new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void tmp) {
}
@Override
public void onFailure(Throwable t) {
}
});
tsSubService.saveTimeseries(TimeseriesSaveRequest.builder()
.tenantId(entityView.getTenantId())
.entityId(entityId)
.entries(latestValues)
.onlyLatest(true)
.callback(new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void tmp) {
}
@Override
public void onFailure(Throwable t) {
log.error("[{}][{}] Failed to save entity view latest timeseries: {}", tenantId, entityView.getId(), latestValues, t);
}
})
.build());
}
return null;
}, MoreExecutors.directExecutor());
@ -358,27 +367,33 @@ public class DefaultTbEntityViewService extends AbstractTbEntityService implemen
EntityViewId entityId = entityView.getId();
SettableFuture<Void> resultFuture = SettableFuture.create();
if (keys != null && !keys.isEmpty()) {
tsSubService.deleteAndNotify(entityView.getTenantId(), entityId, scope, keys, new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void tmp) {
try {
logAttributesDeleted(entityView.getTenantId(), user, entityId, scope, keys, null);
} catch (ThingsboardException e) {
log.error("Failed to log attribute delete", e);
}
resultFuture.set(tmp);
}
@Override
public void onFailure(Throwable t) {
try {
logAttributesDeleted(entityView.getTenantId(), user, entityId, scope, keys, t);
} catch (ThingsboardException e) {
log.error("Failed to log attribute delete", e);
}
resultFuture.setException(t);
}
});
tsSubService.deleteAttributes(AttributesDeleteRequest.builder()
.tenantId(entityView.getTenantId())
.entityId(entityId)
.scope(scope)
.keys(keys)
.callback(new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Void tmp) {
try {
logAttributesDeleted(entityView.getTenantId(), user, entityId, scope, keys, null);
} catch (ThingsboardException e) {
log.error("Failed to log attribute delete", e);
}
resultFuture.set(tmp);
}
@Override
public void onFailure(Throwable t) {
try {
logAttributesDeleted(entityView.getTenantId(), user, entityId, scope, keys, t);
} catch (ThingsboardException e) {
log.error("Failed to log attribute delete", e);
}
resultFuture.setException(t);
}
})
.build());
} else {
resultFuture.set(null);
}

18
application/src/main/java/org/thingsboard/server/service/ota/DefaultOtaPackageStateService.java

@ -20,6 +20,7 @@ import jakarta.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
@ -262,7 +263,7 @@ public class DefaultOtaPackageStateService implements OtaPackageStateService {
telemetry.add(new BasicTsKvEntry(ts, new LongDataEntry(getTargetTelemetryKey(firmware.getType(), TS), ts)));
telemetry.add(new BasicTsKvEntry(ts, new StringDataEntry(getTelemetryKey(firmware.getType(), STATE), OtaPackageUpdateStatus.QUEUED.name())));
telemetryService.save(TimeseriesSaveRequest.builder()
telemetryService.saveTimeseries(TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.entityId(deviceId)
.entries(telemetry)
@ -288,7 +289,7 @@ public class DefaultOtaPackageStateService implements OtaPackageStateService {
BasicTsKvEntry status = new BasicTsKvEntry(System.currentTimeMillis(), new StringDataEntry(getTelemetryKey(otaPackageType, STATE), OtaPackageUpdateStatus.INITIATED.name()));
telemetryService.save(TimeseriesSaveRequest.builder()
telemetryService.saveTimeseries(TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.entityId(deviceId)
.entry(status)
@ -347,7 +348,7 @@ public class DefaultOtaPackageStateService implements OtaPackageStateService {
remove(device, otaPackageType, attrToRemove);
telemetryService.save(AttributesSaveRequest.builder()
telemetryService.saveAttributes(AttributesSaveRequest.builder()
.tenantId(tenantId)
.entityId(deviceId)
.scope(AttributeScope.SHARED_SCOPE)
@ -371,8 +372,12 @@ public class DefaultOtaPackageStateService implements OtaPackageStateService {
}
private void remove(Device device, OtaPackageType otaPackageType, List<String> attributesKeys) {
telemetryService.deleteAndNotify(device.getTenantId(), device.getId(), AttributeScope.SHARED_SCOPE, attributesKeys,
new FutureCallback<>() {
telemetryService.deleteAttributes(AttributesDeleteRequest.builder()
.tenantId(device.getTenantId())
.entityId(device.getId())
.scope(AttributeScope.SHARED_SCOPE)
.keys(attributesKeys)
.callback(new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Void tmp) {
log.trace("[{}] Success remove target {} attributes!", device.getId(), otaPackageType);
@ -383,7 +388,8 @@ public class DefaultOtaPackageStateService implements OtaPackageStateService {
public void onFailure(Throwable t) {
log.error("[{}] Failed to remove target {} attributes!", device.getId(), otaPackageType, t);
}
});
})
.build());
}
}

4
application/src/main/java/org/thingsboard/server/service/state/DefaultDeviceStateService.java

@ -877,7 +877,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
private void save(DeviceId deviceId, KvEntry kvEntry, long ts) {
if (persistToTelemetry) {
tsSubService.saveInternal(TimeseriesSaveRequest.builder()
tsSubService.saveTimeseriesInternal(TimeseriesSaveRequest.builder()
.tenantId(TenantId.SYS_TENANT_ID)
.entityId(deviceId)
.entry(new BasicTsKvEntry(ts, kvEntry))
@ -885,7 +885,7 @@ public class DefaultDeviceStateService extends AbstractPartitionBasedService<Dev
.callback(new TelemetrySaveCallback<>(deviceId, kvEntry))
.build());
} else {
tsSubService.save(AttributesSaveRequest.builder()
tsSubService.saveAttributes(AttributesSaveRequest.builder()
.tenantId(TenantId.SYS_TENANT_ID)
.entityId(deviceId)
.scope(AttributeScope.SERVER_SCOPE)

4
application/src/main/java/org/thingsboard/server/service/stats/DefaultRuleEngineStatisticsService.java

@ -89,7 +89,7 @@ public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsS
if (!tsList.isEmpty()) {
long ttl = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getQueueStatsTtlDays);
ttl = TimeUnit.DAYS.toSeconds(ttl);
tsService.saveInternal(TimeseriesSaveRequest.builder()
tsService.saveTimeseriesInternal(TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.entityId(queueStatsId)
.entries(tsList)
@ -109,7 +109,7 @@ public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsS
TsKvEntry tsKv = new BasicTsKvEntry(e.getTs(), new JsonDataEntry(RULE_ENGINE_EXCEPTION, e.toJsonString(maxErrorMessageLength)));
long ttl = apiLimitService.getLimit(tenantId, DefaultTenantProfileConfiguration::getRuleEngineExceptionsTtlDays);
ttl = TimeUnit.DAYS.toSeconds(ttl);
tsService.saveInternal(TimeseriesSaveRequest.builder()
tsService.saveTimeseriesInternal(TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.entityId(getQueueStatsId(tenantId, queueName))
.entry(tsKv)

4
application/src/main/java/org/thingsboard/server/service/sync/ie/importing/csv/AbstractBulkImportService.java

@ -208,7 +208,7 @@ public abstract class AbstractBulkImportService<E extends HasId<? extends Entity
accessValidator.validateEntityAndCallback(user, Operation.WRITE_TELEMETRY, entity.getId(), (result, tenantId, entityId) -> {
TenantProfile tenantProfile = tenantProfileCache.get(tenantId);
long tenantTtl = TimeUnit.DAYS.toSeconds(((DefaultTenantProfileConfiguration) tenantProfile.getProfileData().getConfiguration()).getDefaultStorageTtlDays());
tsSubscriptionService.save(TimeseriesSaveRequest.builder()
tsSubscriptionService.saveTimeseries(TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.customerId(user.getCustomerId())
.entityId(entityId)
@ -238,7 +238,7 @@ public abstract class AbstractBulkImportService<E extends HasId<? extends Entity
List<AttributeKvEntry> attributes = new ArrayList<>(JsonConverter.convertToAttributes(kvsEntry.getValue()));
accessValidator.validateEntityAndCallback(user, Operation.WRITE_ATTRIBUTES, entity.getId(), (result, tenantId, entityId) -> {
tsSubscriptionService.save(AttributesSaveRequest.builder()
tsSubscriptionService.saveAttributes(AttributesSaveRequest.builder()
.tenantId(tenantId)
.entityId(entityId)
.scope(AttributeScope.valueOf(scope))

2
application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java

@ -258,7 +258,7 @@ public abstract class BaseEntityImportService<I extends EntityId, E extends Expo
})
.collect(Collectors.toList());
// fixme: attributes are saved outside the transaction
tsSubService.save(AttributesSaveRequest.builder()
tsSubService.saveAttributes(AttributesSaveRequest.builder()
.tenantId(user.getTenantId())
.entityId(entity.getId())
.scope(scope)

2
application/src/main/java/org/thingsboard/server/service/system/DefaultSystemInfoService.java

@ -201,7 +201,7 @@ public class DefaultSystemInfoService extends TbApplicationEventListener<Partiti
private void doSave(List<TsKvEntry> telemetry) {
ApiUsageState apiUsageState = apiUsageStateClient.getApiUsageState(TenantId.SYS_TENANT_ID);
telemetryService.saveInternal(TimeseriesSaveRequest.builder()
telemetryService.saveTimeseriesInternal(TimeseriesSaveRequest.builder()
.tenantId(TenantId.SYS_TENANT_ID)
.entityId(apiUsageState.getId())
.entries(telemetry)

114
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java

@ -27,11 +27,11 @@ import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.data.ApiUsageRecordKey;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.EntityView;
import org.thingsboard.server.common.data.id.CustomerId;
@ -111,27 +111,31 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
}
@Override
public void save(TimeseriesSaveRequest request) {
public void saveTimeseries(TimeseriesSaveRequest request) {
TenantId tenantId = request.getTenantId();
EntityId entityId = request.getEntityId();
checkInternalEntity(entityId);
boolean sysTenant = TenantId.SYS_TENANT_ID.equals(tenantId) || tenantId == null;
if (sysTenant || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) {
if (sysTenant || request.isOnlyLatest() || apiUsageStateService.getApiUsageState(tenantId).isDbStorageEnabled()) {
KvUtils.validate(request.getEntries(), valueNoXssValidation);
FutureCallback<Integer> callback = getApiUsageCallback(tenantId, request.getCustomerId(), sysTenant, request.getCallback());
ListenableFuture<Integer> future = saveInternal(request);
Futures.addCallback(future, callback, tsCallBackExecutor);
ListenableFuture<Integer> future = saveTimeseriesInternal(request);
if (!request.isOnlyLatest()) {
FutureCallback<Integer> callback = getApiUsageCallback(tenantId, request.getCustomerId(), sysTenant, request.getCallback());
Futures.addCallback(future, callback, tsCallBackExecutor);
}
} else {
request.getCallback().onFailure(new RuntimeException("DB storage writes are disabled due to API limits!"));
}
}
@Override
public ListenableFuture<Integer> saveInternal(TimeseriesSaveRequest request) {
public ListenableFuture<Integer> saveTimeseriesInternal(TimeseriesSaveRequest request) {
TenantId tenantId = request.getTenantId();
EntityId entityId = request.getEntityId();
ListenableFuture<Integer> saveFuture;
if (request.isSaveLatest()) {
if (request.isOnlyLatest()) {
saveFuture = Futures.transform(tsService.saveLatest(tenantId, entityId, request.getEntries()), result -> 0, MoreExecutors.directExecutor());
} else if (request.isSaveLatest()) {
saveFuture = tsService.save(tenantId, entityId, request.getEntries(), request.getTtl());
} else {
saveFuture = tsService.saveWithoutLatest(tenantId, entityId, request.getEntries(), request.getTtl());
@ -139,63 +143,37 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
addMainCallback(saveFuture, request.getCallback());
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, request.getEntries()));
if (request.isSaveLatest()) {
if (request.isSaveLatest() && !request.isOnlyLatest()) {
addEntityViewCallback(tenantId, entityId, request.getEntries());
}
return saveFuture;
}
@Override
public void save(AttributesSaveRequest request) {
public void saveAttributes(AttributesSaveRequest request) {
checkInternalEntity(request.getEntityId());
saveInternal(request);
saveAttributesInternal(request);
}
@Override
public void saveInternal(AttributesSaveRequest request) {
public void saveAttributesInternal(AttributesSaveRequest request) {
log.trace("Executing saveInternal [{}]", request);
ListenableFuture<List<Long>> saveFuture = attrService.save(request.getTenantId(), request.getEntityId(), request.getScope(), request.getEntries());
addVoidCallback(saveFuture, request.getCallback());
addMainCallback(saveFuture, request.getCallback());
addWsCallback(saveFuture, success -> onAttributesUpdate(request.getTenantId(), request.getEntityId(), request.getScope().name(), request.getEntries(), request.isNotifyDevice()));
}
@Override
public void saveLatestAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback) {
checkInternalEntity(entityId);
saveLatestAndNotifyInternal(tenantId, entityId, ts, callback);
}
@Override
public void saveLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback) {
ListenableFuture<List<Long>> saveFuture = tsService.saveLatest(tenantId, entityId, ts);
addVoidCallback(saveFuture, callback);
addWsCallback(saveFuture, success -> onTimeSeriesUpdate(tenantId, entityId, ts));
}
@Override
public void deleteAndNotify(TenantId tenantId, EntityId entityId, AttributeScope scope, List<String> keys, FutureCallback<Void> callback) {
checkInternalEntity(entityId);
deleteAndNotifyInternal(tenantId, entityId, scope, keys, false, callback);
}
@Override
public void deleteAndNotify(TenantId tenantId, EntityId entityId, AttributeScope scope, List<String> keys, boolean notifyDevice, FutureCallback<Void> callback) {
checkInternalEntity(entityId);
deleteAndNotifyInternal(tenantId, entityId, scope, keys, notifyDevice, callback);
}
@Override
public void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<String> keys, boolean notifyDevice, FutureCallback<Void> callback) {
ListenableFuture<List<String>> deleteFuture = attrService.removeAll(tenantId, entityId, scope, keys);
addVoidCallback(deleteFuture, callback);
addWsCallback(deleteFuture, success -> onAttributesDelete(tenantId, entityId, scope, keys, notifyDevice));
public void deleteAttributes(AttributesDeleteRequest request) {
checkInternalEntity(request.getEntityId());
deleteAttributesInternal(request);
}
@Override
public void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, AttributeScope scope, List<String> keys, boolean notifyDevice, FutureCallback<Void> callback) {
ListenableFuture<List<String>> deleteFuture = attrService.removeAll(tenantId, entityId, scope, keys);
addVoidCallback(deleteFuture, callback);
addWsCallback(deleteFuture, success -> onAttributesDelete(tenantId, entityId, scope.name(), keys, notifyDevice));
public void deleteAttributesInternal(AttributesDeleteRequest request) {
ListenableFuture<List<String>> deleteFuture = attrService.removeAll(request.getTenantId(), request.getEntityId(), request.getScope(), request.getKeys());
addMainCallback(deleteFuture, request.getCallback());
addWsCallback(deleteFuture, success -> onAttributesDelete(request.getTenantId(), request.getEntityId(), request.getScope().name(), request.getKeys(), request.isNotifyDevice()));
}
@Override
@ -207,7 +185,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
@Override
public void deleteLatestInternal(TenantId tenantId, EntityId entityId, List<String> keys, FutureCallback<Void> callback) {
ListenableFuture<List<TsKvLatestRemovingResult>> deleteFuture = tsService.removeLatest(tenantId, entityId, keys);
addVoidCallback(deleteFuture, callback);
addMainCallback(deleteFuture, callback);
}
@Override
@ -229,7 +207,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
@Override
public void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List<String> keys, List<DeleteTsKvQuery> deleteTsKvQueries, FutureCallback<Void> callback) {
ListenableFuture<List<TsKvLatestRemovingResult>> deleteFuture = tsService.remove(tenantId, entityId, deleteTsKvQueries);
addVoidCallback(deleteFuture, callback);
addMainCallback(deleteFuture, callback);
addWsCallback(deleteFuture, list -> onTimeSeriesDelete(tenantId, entityId, keys, list));
}
@ -260,15 +238,21 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
}
}
if (!entityViewLatest.isEmpty()) {
saveLatestAndNotify(tenantId, entityView.getId(), entityViewLatest, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Void tmp) {
}
@Override
public void onFailure(Throwable t) {
}
});
saveTimeseries(TimeseriesSaveRequest.builder()
.tenantId(tenantId)
.entityId(entityView.getId())
.entries(entityViewLatest)
.onlyLatest(true)
.callback(new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Void tmp) {}
@Override
public void onFailure(Throwable t) {
log.error("[{}][{}] Failed to save entity view latest timeseries: {}", tenantId, entityView.getId(), entityViewLatest, t);
}
})
.build());
}
}
}
@ -328,22 +312,8 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
});
}
private <S> void addVoidCallback(ListenableFuture<S> saveFuture, final FutureCallback<Void> callback) {
if (callback == null) return;
Futures.addCallback(saveFuture, new FutureCallback<S>() {
@Override
public void onSuccess(@Nullable S result) {
callback.onSuccess(null);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
}, tsCallBackExecutor);
}
private <S> void addMainCallback(ListenableFuture<S> saveFuture, final FutureCallback<Void> callback) {
if (callback == null) return;
Futures.addCallback(saveFuture, new FutureCallback<S>() {
@Override
public void onSuccess(@Nullable S result) {

14
application/src/main/java/org/thingsboard/server/service/telemetry/InternalTelemetryService.java

@ -17,13 +17,12 @@ package org.thingsboard.server.service.telemetry;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import java.util.List;
@ -32,16 +31,11 @@ import java.util.List;
*/
public interface InternalTelemetryService extends RuleEngineTelemetryService {
ListenableFuture<Integer> saveInternal(TimeseriesSaveRequest request);
ListenableFuture<Integer> saveTimeseriesInternal(TimeseriesSaveRequest request);
void saveInternal(AttributesSaveRequest request);
void saveAttributesInternal(AttributesSaveRequest request);
void saveLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
@Deprecated(since = "3.7.0")
void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<String> keys, boolean notifyDevice, FutureCallback<Void> callback);
void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, AttributeScope scope, List<String> keys, boolean notifyDevice, FutureCallback<Void> callback);
void deleteAttributesInternal(AttributesDeleteRequest request);
void deleteLatestInternal(TenantId tenantId, EntityId entityId, List<String> keys, FutureCallback<Void> callback);

4
application/src/test/java/org/thingsboard/server/controller/WebsocketApiTest.java

@ -806,7 +806,7 @@ public class WebsocketApiTest extends AbstractControllerTest {
private void sendTelemetry(Device device, List<TsKvEntry> tsData) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
tsService.save(TimeseriesSaveRequest.builder()
tsService.saveTimeseries(TimeseriesSaveRequest.builder()
.tenantId(device.getTenantId())
.entityId(device.getId())
.entries(tsData)
@ -833,7 +833,7 @@ public class WebsocketApiTest extends AbstractControllerTest {
private void sendAttributes(TenantId tenantId, EntityId entityId, TbAttributeSubscriptionScope scope, List<AttributeKvEntry> attrData) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
tsService.save(AttributesSaveRequest.builder()
tsService.saveAttributes(AttributesSaveRequest.builder()
.tenantId(tenantId)
.entityId(entityId)
.scope(scope.getAttributeScope())

39
application/src/test/java/org/thingsboard/server/service/state/DefaultDeviceStateServiceTest.java

@ -24,7 +24,6 @@ import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.test.util.ReflectionTestUtils;
@ -211,7 +210,7 @@ public class DefaultDeviceStateServiceTest {
service.onDeviceConnect(tenantId, deviceId, lastConnectTime);
// THEN
then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher<AttributesSaveRequest>) request ->
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
request.getTenantId().equals(TenantId.SYS_TENANT_ID) && request.getEntityId().equals(deviceId) &&
request.getScope().equals(AttributeScope.SERVER_SCOPE) &&
request.getEntries().get(0).getKey().equals(LAST_CONNECT_TIME) &&
@ -298,7 +297,7 @@ public class DefaultDeviceStateServiceTest {
service.onDeviceDisconnect(tenantId, deviceId, lastDisconnectTime);
// THEN
then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher<AttributesSaveRequest>) request ->
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
request.getTenantId().equals(TenantId.SYS_TENANT_ID) && request.getEntityId().equals(deviceId) &&
request.getScope().equals(AttributeScope.SERVER_SCOPE) &&
request.getEntries().get(0).getKey().equals(LAST_DISCONNECT_TIME) &&
@ -421,13 +420,13 @@ public class DefaultDeviceStateServiceTest {
service.onDeviceInactivity(tenantId, deviceId, lastInactivityTime);
// THEN
then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher<AttributesSaveRequest>) request ->
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
request.getTenantId().equals(TenantId.SYS_TENANT_ID) && request.getEntityId().equals(deviceId) &&
request.getScope().equals(AttributeScope.SERVER_SCOPE) &&
request.getEntries().get(0).getKey().equals(INACTIVITY_ALARM_TIME) &&
request.getEntries().get(0).getValue().equals(lastInactivityTime)
));
then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher<AttributesSaveRequest>) request ->
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
request.getTenantId().equals(TenantId.SYS_TENANT_ID) && request.getEntityId().equals(deviceId) &&
request.getScope().equals(AttributeScope.SERVER_SCOPE) &&
request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) &&
@ -465,12 +464,12 @@ public class DefaultDeviceStateServiceTest {
service.updateInactivityStateIfExpired(System.currentTimeMillis(), deviceId, deviceStateData);
// THEN
then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher<AttributesSaveRequest>) request ->
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
request.getTenantId().equals(TenantId.SYS_TENANT_ID) && request.getEntityId().equals(deviceId) &&
request.getScope().equals(AttributeScope.SERVER_SCOPE) &&
request.getEntries().get(0).getKey().equals(INACTIVITY_ALARM_TIME)
));
then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher<AttributesSaveRequest>) request ->
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
request.getTenantId().equals(TenantId.SYS_TENANT_ID) && request.getEntityId().equals(deviceId) &&
request.getScope().equals(AttributeScope.SERVER_SCOPE) &&
request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) &&
@ -627,7 +626,7 @@ public class DefaultDeviceStateServiceTest {
long newTimeout = System.currentTimeMillis() - deviceState.getLastActivityTime() + increase;
service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newTimeout);
verify(telemetrySubscriptionService, never()).save(argThat((ArgumentMatcher<AttributesSaveRequest>) request ->
verify(telemetrySubscriptionService, never()).saveAttributes(argThat(request ->
request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE)
));
Thread.sleep(defaultTimeout + increase);
@ -668,7 +667,7 @@ public class DefaultDeviceStateServiceTest {
long newTimeout = 1;
Thread.sleep(newTimeout);
verify(telemetrySubscriptionService, never()).save(argThat((ArgumentMatcher<AttributesSaveRequest>) request ->
verify(telemetrySubscriptionService, never()).saveAttributes(argThat(request ->
request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE)
));
}
@ -730,13 +729,13 @@ public class DefaultDeviceStateServiceTest {
long newTimeout = 1;
service.onDeviceInactivityTimeoutUpdate(tenantId, deviceId, newTimeout);
verify(telemetrySubscriptionService, never()).save(argThat((ArgumentMatcher<AttributesSaveRequest>) request ->
verify(telemetrySubscriptionService, never()).saveAttributes(argThat(request ->
request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE)
));
}
private void activityVerify(boolean isActive) {
verify(telemetrySubscriptionService).save(argThat((ArgumentMatcher<AttributesSaveRequest>) request ->
verify(telemetrySubscriptionService).saveAttributes(argThat(request ->
request.getEntityId().equals(deviceId) &&
request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) &&
request.getEntries().get(0).getValue().equals(isActive)
@ -786,7 +785,7 @@ public class DefaultDeviceStateServiceTest {
// THEN
assertThat(deviceState.isActive()).isEqualTo(true);
assertThat(deviceState.getLastActivityTime()).isEqualTo(lastReportedActivity);
then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher<AttributesSaveRequest>) request ->
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
request.getEntityId().equals(deviceId) &&
request.getEntries().get(0).getKey().equals(LAST_ACTIVITY_TIME) &&
request.getEntries().get(0).getValue().equals(lastReportedActivity)
@ -794,7 +793,7 @@ public class DefaultDeviceStateServiceTest {
assertThat(deviceState.getLastInactivityAlarmTime()).isEqualTo(expectedInactivityAlarmTime);
if (shouldSetInactivityAlarmTimeToZero) {
then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher<AttributesSaveRequest>) request ->
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
request.getEntityId().equals(deviceId) &&
request.getEntries().get(0).getKey().equals(INACTIVITY_ALARM_TIME) &&
request.getEntries().get(0).getValue().equals(0L)
@ -802,7 +801,7 @@ public class DefaultDeviceStateServiceTest {
}
if (shouldUpdateActivityStateToActive) {
then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher<AttributesSaveRequest>) request ->
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
request.getEntityId().equals(deviceId) &&
request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) &&
request.getEntries().get(0).getValue().equals(true)
@ -886,7 +885,7 @@ public class DefaultDeviceStateServiceTest {
assertThat(deviceState.getInactivityTimeout()).isEqualTo(newInactivityTimeout);
assertThat(deviceState.isActive()).isEqualTo(expectedActivityState);
if (activityState && !expectedActivityState) {
then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher<AttributesSaveRequest>) request ->
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) &&
request.getEntries().get(0).getValue().equals(false)
));
@ -984,7 +983,7 @@ public class DefaultDeviceStateServiceTest {
assertThat(state.getLastInactivityAlarmTime()).isEqualTo(expectedLastInactivityAlarmTime);
if (shouldUpdateActivityStateToInactive) {
then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher<AttributesSaveRequest>) request ->
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) &&
request.getEntries().get(0).getValue().equals(false)
));
@ -1002,7 +1001,7 @@ public class DefaultDeviceStateServiceTest {
assertThat(actualNotification.getDeviceId()).isEqualTo(deviceId);
assertThat(actualNotification.isActive()).isFalse();
then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher<AttributesSaveRequest>) request ->
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
request.getTenantId().equals(TenantId.SYS_TENANT_ID) && request.getEntityId().equals(deviceId) &&
request.getScope().equals(AttributeScope.SERVER_SCOPE) &&
request.getEntries().get(0).getKey().equals(INACTIVITY_ALARM_TIME) &&
@ -1133,7 +1132,7 @@ public class DefaultDeviceStateServiceTest {
// THEN
await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(service.deviceStates.get(deviceId).getState().isActive()).isEqualTo(false);
then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher<AttributesSaveRequest>) request ->
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) &&
request.getEntries().get(0).getValue().equals(false)
));
@ -1164,7 +1163,7 @@ public class DefaultDeviceStateServiceTest {
// THEN
ArgumentCaptor<AttributesSaveRequest> attributeRequestCaptor = ArgumentCaptor.forClass(AttributesSaveRequest.class);
then(telemetrySubscriptionService).should(times(2)).save(attributeRequestCaptor.capture());
then(telemetrySubscriptionService).should(times(2)).saveAttributes(attributeRequestCaptor.capture());
await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(service.deviceStates.get(deviceId).getState().isActive()).isEqualTo(true);
@ -1231,7 +1230,7 @@ public class DefaultDeviceStateServiceTest {
// THEN
await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> {
assertThat(service.deviceStates.get(deviceId).getState().isActive()).isEqualTo(true);
then(telemetrySubscriptionService).should().save(argThat((ArgumentMatcher<AttributesSaveRequest>) request ->
then(telemetrySubscriptionService).should().saveAttributes(argThat(request ->
request.getEntityId().equals(deviceId) && request.getEntries().get(0).getKey().equals(ACTIVITY_STATE) &&
request.getEntries().get(0).getValue().equals(true)
));

6
common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java

@ -37,16 +37,10 @@ public interface AttributesService {
ListenableFuture<List<AttributeKvEntry>> findAll(TenantId tenantId, EntityId entityId, AttributeScope scope);
@Deprecated(since = "3.7.0")
ListenableFuture<List<Long>> save(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes);
ListenableFuture<List<Long>> save(TenantId tenantId, EntityId entityId, AttributeScope scope, List<AttributeKvEntry> attributes);
ListenableFuture<Long> save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute);
@Deprecated(since = "3.7.0")
ListenableFuture<List<String>> removeAll(TenantId tenantId, EntityId entityId, String scope, List<String> attributeKeys);
ListenableFuture<List<String>> removeAll(TenantId tenantId, EntityId entityId, AttributeScope scope, List<String> attributeKeys);
List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId);

14
dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java

@ -101,14 +101,6 @@ public class BaseAttributesService implements AttributesService {
return attributesDao.save(tenantId, entityId, scope, attribute);
}
@Override
public ListenableFuture<List<Long>> save(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
validate(entityId, scope);
AttributeUtils.validate(attributes, valueNoXssValidation);
List<ListenableFuture<Long>> saveFutures = attributes.stream().map(attribute -> attributesDao.save(tenantId, entityId, AttributeScope.valueOf(scope), attribute)).collect(Collectors.toList());
return Futures.allAsList(saveFutures);
}
@Override
public ListenableFuture<List<Long>> save(TenantId tenantId, EntityId entityId, AttributeScope scope, List<AttributeKvEntry> attributes) {
validate(entityId, scope);
@ -117,12 +109,6 @@ public class BaseAttributesService implements AttributesService {
return Futures.allAsList(saveFutures);
}
@Override
public ListenableFuture<List<String>> removeAll(TenantId tenantId, EntityId entityId, String scope, List<String> attributeKeys) {
validate(entityId, scope);
return Futures.allAsList(attributesDao.removeAll(tenantId, entityId, AttributeScope.valueOf(scope), attributeKeys));
}
@Override
public ListenableFuture<List<String>> removeAll(TenantId tenantId, EntityId entityId, AttributeScope scope, List<String> attributeKeys) {
validate(entityId, scope);

10
dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java

@ -222,11 +222,6 @@ public class CachedAttributesService implements AttributesService {
return doSave(tenantId, entityId, scope, attribute);
}
@Override
public ListenableFuture<List<Long>> save(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
return save(tenantId, entityId, AttributeScope.valueOf(scope), attributes);
}
@Override
public ListenableFuture<List<Long>> save(TenantId tenantId, EntityId entityId, AttributeScope scope, List<AttributeKvEntry> attributes) {
validate(entityId, scope);
@ -255,11 +250,6 @@ public class CachedAttributesService implements AttributesService {
log.trace("[{}][{}][{}] after cache put.", entityId, scope, key);
}
@Override
public ListenableFuture<List<String>> removeAll(TenantId tenantId, EntityId entityId, String scope, List<String> attributeKeys) {
return removeAll(tenantId, entityId, AttributeScope.valueOf(scope), attributeKeys);
}
@Override
public ListenableFuture<List<String>> removeAll(TenantId tenantId, EntityId entityId, AttributeScope scope, List<String> attributeKeys) {
validate(entityId, scope);

117
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesDeleteRequest.java

@ -0,0 +1,117 @@
/**
* Copyright © 2016-2024 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.api;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.SettableFuture;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import java.util.List;
@Getter
@ToString
@AllArgsConstructor(access = AccessLevel.PRIVATE)
public class AttributesDeleteRequest {
private final TenantId tenantId;
private final EntityId entityId;
private final AttributeScope scope;
private final List<String> keys;
private final boolean notifyDevice;
private final FutureCallback<Void> callback;
public static Builder builder() {
return new Builder();
}
public static class Builder {
private TenantId tenantId;
private EntityId entityId;
private AttributeScope scope;
private List<String> keys;
private boolean notifyDevice;
private FutureCallback<Void> callback;
Builder() {}
public Builder tenantId(TenantId tenantId) {
this.tenantId = tenantId;
return this;
}
public Builder entityId(EntityId entityId) {
this.entityId = entityId;
return this;
}
public Builder scope(AttributeScope scope) {
this.scope = scope;
return this;
}
@Deprecated
public Builder scope(String scope) {
try {
this.scope = AttributeScope.valueOf(scope);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Invalid attribute scope '" + scope + "'");
}
return this;
}
public Builder keys(List<String> keys) {
this.keys = keys;
return this;
}
public Builder notifyDevice(boolean notifyDevice) {
this.notifyDevice = notifyDevice;
return this;
}
public Builder callback(FutureCallback<Void> callback) {
this.callback = callback;
return this;
}
public Builder future(SettableFuture<Void> future) {
return callback(new FutureCallback<>() {
@Override
public void onSuccess(Void result) {
future.set(result);
}
@Override
public void onFailure(Throwable t) {
future.setException(t);
}
});
}
public AttributesDeleteRequest build() {
return new AttributesDeleteRequest(tenantId, entityId, scope, keys, notifyDevice, callback);
}
}
}

12
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java

@ -16,11 +16,9 @@
package org.thingsboard.rule.engine.api;
import com.google.common.util.concurrent.FutureCallback;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import java.util.Collection;
import java.util.List;
@ -30,15 +28,11 @@ import java.util.List;
*/
public interface RuleEngineTelemetryService {
void save(TimeseriesSaveRequest request);
void saveTimeseries(TimeseriesSaveRequest request);
void save(AttributesSaveRequest request);
void saveAttributes(AttributesSaveRequest request);
void saveLatestAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
void deleteAndNotify(TenantId tenantId, EntityId entityId, AttributeScope scope, List<String> keys, FutureCallback<Void> callback);
void deleteAndNotify(TenantId tenantId, EntityId entityId, AttributeScope scope, List<String> keys, boolean notifyDevice, FutureCallback<Void> callback);
void deleteAttributes(AttributesDeleteRequest request);
void deleteLatest(TenantId tenantId, EntityId entityId, List<String> keys, FutureCallback<Void> callback);

10
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesSaveRequest.java

@ -39,6 +39,7 @@ public class TimeseriesSaveRequest {
private final List<TsKvEntry> entries;
private final long ttl;
private final boolean saveLatest;
private final boolean onlyLatest;
private final FutureCallback<Void> callback;
public static Builder builder() {
@ -54,6 +55,7 @@ public class TimeseriesSaveRequest {
private long ttl;
private FutureCallback<Void> callback;
private boolean saveLatest = true;
private boolean onlyLatest;
Builder() {}
@ -95,6 +97,12 @@ public class TimeseriesSaveRequest {
return this;
}
public Builder onlyLatest(boolean onlyLatest) {
this.onlyLatest = onlyLatest;
this.saveLatest = true;
return this;
}
public Builder callback(FutureCallback<Void> callback) {
this.callback = callback;
return this;
@ -115,7 +123,7 @@ public class TimeseriesSaveRequest {
}
public TimeseriesSaveRequest build() {
return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, saveLatest, callback);
return new TimeseriesSaveRequest(tenantId, customerId, entityId, entries, ttl, saveLatest, onlyLatest, callback);
}
}

12
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNode.java

@ -23,6 +23,7 @@ import com.google.gson.JsonPrimitive;
import jakarta.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
import org.thingsboard.rule.engine.api.RuleNode;
@ -106,14 +107,19 @@ public class TbCopyAttributesToEntityViewNode implements TbNode {
List<String> filteredAttributes =
attributes.stream().filter(attr -> attributeContainsInEntityView(scope, attr, entityView)).collect(Collectors.toList());
if (!filteredAttributes.isEmpty()) {
ctx.getTelemetryService().deleteAndNotify(ctx.getTenantId(), entityView.getId(), scope, filteredAttributes,
getFutureCallback(ctx, msg, entityView));
ctx.getTelemetryService().deleteAttributes(AttributesDeleteRequest.builder()
.tenantId(ctx.getTenantId())
.entityId(entityView.getId())
.scope(scope)
.keys(filteredAttributes)
.callback(getFutureCallback(ctx, msg, entityView))
.build());
}
} else {
Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(JsonParser.parseString(msg.getData()));
List<AttributeKvEntry> filteredAttributes =
attributes.stream().filter(attr -> attributeContainsInEntityView(scope, attr.getKey(), entityView)).collect(Collectors.toList());
ctx.getTelemetryService().save(AttributesSaveRequest.builder()
ctx.getTelemetryService().saveAttributes(AttributesSaveRequest.builder()
.tenantId(ctx.getTenantId())
.entityId(entityView.getId())
.scope(scope)

4
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/math/TbMathNode.java

@ -145,7 +145,7 @@ public class TbMathNode implements TbNode {
private ListenableFuture<Void> saveTimeSeries(TbContext ctx, TbMsg msg, double result, TbMathResult mathResultDef) {
final BasicTsKvEntry basicTsKvEntry = new BasicTsKvEntry(System.currentTimeMillis(), new DoubleDataEntry(mathResultDef.getKey(), result));
SettableFuture<Void> future = SettableFuture.create();
ctx.getTelemetryService().save(TimeseriesSaveRequest.builder()
ctx.getTelemetryService().saveTimeseries(TimeseriesSaveRequest.builder()
.tenantId(ctx.getTenantId())
.entityId(msg.getOriginator())
.entry(basicTsKvEntry)
@ -165,7 +165,7 @@ public class TbMathNode implements TbNode {
kvEntry = new DoubleDataEntry(mathResultDef.getKey(), value);
}
SettableFuture<Void> future = SettableFuture.create();
ctx.getTelemetryService().save(AttributesSaveRequest.builder()
ctx.getTelemetryService().saveAttributes(AttributesSaveRequest.builder()
.tenantId(ctx.getTenantId())
.entityId(msg.getOriginator())
.scope(attributeScope)

2
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java

@ -119,7 +119,7 @@ public class TbMsgAttributesNode implements TbNode {
FutureCallback<Void> callback = sendAttributesUpdateNotification ?
new AttributesUpdateNodeCallback(ctx, msg, scope.name(), attributes) :
new TelemetryNodeCallback(ctx, msg);
ctx.getTelemetryService().save(AttributesSaveRequest.builder()
ctx.getTelemetryService().saveAttributes(AttributesSaveRequest.builder()
.tenantId(ctx.getTenantId())
.entityId(msg.getOriginator())
.scope(scope)

19
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNode.java

@ -16,6 +16,7 @@
package org.thingsboard.rule.engine.telemetry;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
@ -70,16 +71,16 @@ public class TbMsgDeleteAttributesNode implements TbNode {
ctx.tellSuccess(msg);
} else {
AttributeScope scope = getScope(msg.getMetaData().getValue(SCOPE));
ctx.getTelemetryService().deleteAndNotify(
ctx.getTenantId(),
msg.getOriginator(),
scope,
keysToDelete,
checkNotifyDevice(msg.getMetaData().getValue(NOTIFY_DEVICE_METADATA_KEY), scope),
config.isSendAttributesDeletedNotification() ?
ctx.getTelemetryService().deleteAttributes(AttributesDeleteRequest.builder()
.tenantId(ctx.getTenantId())
.entityId(msg.getOriginator())
.scope(scope)
.keys(keysToDelete)
.notifyDevice(checkNotifyDevice(msg.getMetaData().getValue(NOTIFY_DEVICE_METADATA_KEY), scope))
.callback(config.isSendAttributesDeletedNotification() ?
new AttributesDeleteNodeCallback(ctx, msg, scope.name(), keysToDelete) :
new TelemetryNodeCallback(ctx, msg)
);
new TelemetryNodeCallback(ctx, msg))
.build());
}
}

2
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java

@ -105,7 +105,7 @@ public class TbMsgTimeseriesNode implements TbNode {
if (ttl == 0L) {
ttl = tenantProfileDefaultStorageTtl;
}
ctx.getTelemetryService().save(TimeseriesSaveRequest.builder()
ctx.getTelemetryService().saveTimeseries(TimeseriesSaveRequest.builder()
.tenantId(ctx.getTenantId())
.customerId(msg.getCustomerId())
.entityId(msg.getOriginator())

30
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbCopyAttributesToEntityViewNodeTest.java

@ -15,7 +15,6 @@
*/
package org.thingsboard.rule.engine.action;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -24,9 +23,9 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.ThrowingConsumer;
import org.mockito.junit.jupiter.MockitoExtension;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
@ -56,7 +55,6 @@ import java.util.UUID;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.assertArg;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
@ -118,7 +116,7 @@ public class TbCopyAttributesToEntityViewNodeTest {
AttributesSaveRequest request = invocation.getArgument(0);
request.getCallback().onSuccess(null);
return null;
}).when(telemetryServiceMock).save(any(AttributesSaveRequest.class));
}).when(telemetryServiceMock).saveAttributes(any(AttributesSaveRequest.class));
TbMsg newMsg = TbMsg.newMsg(msg, msg.getQueueName(), msg.getRuleChainId(), msg.getRuleNodeId());
// TODO: use newMsg() with any(TbMsgType.class), replace in other tests as well.
doAnswer(invocation -> newMsg).when(ctxMock).newMsg(any(), any(String.class), any(), any(), any(), any());
@ -126,7 +124,7 @@ public class TbCopyAttributesToEntityViewNodeTest {
node.onMsg(ctxMock, msg);
verify(entityViewServiceMock).findEntityViewsByTenantIdAndEntityIdAsync(eq(TENANT_ID), eq(DEVICE_ID));
verify(telemetryServiceMock).save(assertArg((ThrowingConsumer<AttributesSaveRequest>) request -> {
verify(telemetryServiceMock).saveAttributes(assertArg(request -> {
assertThat(request.getTenantId()).isEqualTo(TENANT_ID);
assertThat(request.getEntityId()).isEqualTo(ENTITY_VIEW_ID);
assertThat(request.getScope()).isEqualTo(AttributeScope.CLIENT_SCOPE);
@ -151,21 +149,23 @@ public class TbCopyAttributesToEntityViewNodeTest {
mockEntityViewLookup(entityView);
when(ctxMock.getTelemetryService()).thenReturn(telemetryServiceMock);
doAnswer(invocation -> {
FutureCallback<Void> callback = invocation.getArgument(4);
callback.onSuccess(null);
AttributesDeleteRequest request = invocation.getArgument(0);
request.getCallback().onSuccess(null);
return null;
}).when(telemetryServiceMock).deleteAndNotify(any(), any(), any(AttributeScope.class), anyList(), any(FutureCallback.class));
}).when(telemetryServiceMock).deleteAttributes(any());
TbMsg newMsg = TbMsg.newMsg(msg, msg.getQueueName(), msg.getRuleChainId(), msg.getRuleNodeId());
doAnswer(invocation -> newMsg).when(ctxMock).newMsg(any(), any(String.class), any(), any(), any(), any());
node.onMsg(ctxMock, msg);
verify(entityViewServiceMock).findEntityViewsByTenantIdAndEntityIdAsync(eq(TENANT_ID), eq(DEVICE_ID));
ArgumentCaptor<List<String>> filteredAttributesCaptor = ArgumentCaptor.forClass(List.class);
verify(telemetryServiceMock).deleteAndNotify(eq(TENANT_ID), eq(ENTITY_VIEW_ID), eq(AttributeScope.SERVER_SCOPE), filteredAttributesCaptor.capture(), any(FutureCallback.class));
List<String> filteredAttributesCaptorValue = filteredAttributesCaptor.getValue();
assertThat(filteredAttributesCaptorValue.size()).isEqualTo(1);
assertThat(filteredAttributesCaptorValue.get(0)).isEqualTo("serverAttribute1");
verify(telemetryServiceMock).deleteAttributes(assertArg(request -> {
assertThat(request.getTenantId()).isEqualTo(TENANT_ID);
assertThat(request.getEntityId()).isEqualTo(ENTITY_VIEW_ID);
assertThat(request.getScope()).isEqualTo(AttributeScope.SERVER_SCOPE);
assertThat(request.getKeys().size()).isEqualTo(1);
assertThat(request.getKeys().get(0)).isEqualTo("serverAttribute1");
}));
verify(ctxMock).ack(eq(msg));
verify(ctxMock).enqueueForTellNext(eq(newMsg), eq(TbNodeConnectionType.SUCCESS));
verifyNoMoreInteractions(ctxMock, entityViewServiceMock, telemetryServiceMock);
@ -202,14 +202,14 @@ public class TbCopyAttributesToEntityViewNodeTest {
AttributesSaveRequest request = invocation.getArgument(0);
request.getCallback().onSuccess(null);
return null;
}).when(telemetryServiceMock).save(any(AttributesSaveRequest.class));
}).when(telemetryServiceMock).saveAttributes(any(AttributesSaveRequest.class));
TbMsg newMsg = TbMsg.newMsg(msg, msg.getQueueName(), msg.getRuleChainId(), msg.getRuleNodeId());
doAnswer(invocation -> newMsg).when(ctxMock).newMsg(any(), any(String.class), any(), any(), any(), any());
node.onMsg(ctxMock, msg);
verify(entityViewServiceMock).findEntityViewsByTenantIdAndEntityIdAsync(eq(TENANT_ID), eq(DEVICE_ID));
verify(telemetryServiceMock).save(assertArg((ThrowingConsumer<AttributesSaveRequest>) request -> {
verify(telemetryServiceMock).saveAttributes(assertArg(request -> {
assertThat(request.getTenantId()).isEqualTo(TENANT_ID);
assertThat(request.getEntityId()).isEqualTo(ENTITY_VIEW_ID);
assertThat(request.getScope()).isEqualTo(AttributeScope.CLIENT_SCOPE);

12
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/math/TbMathNodeTest.java

@ -441,13 +441,13 @@ public class TbMathNodeTest {
AttributesSaveRequest request = invocation.getArgument(0);
request.getCallback().onSuccess(null);
return null;
}).when(telemetryService).save(any(AttributesSaveRequest.class));
}).when(telemetryService).saveAttributes(any(AttributesSaveRequest.class));
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture());
verify(telemetryService, times(1)).save(assertArg((ThrowingConsumer<AttributesSaveRequest>) request -> {
verify(telemetryService, times(1)).saveAttributes(assertArg(request -> {
assertThat(request.getEntries()).singleElement().extracting(KvEntry::getValue).isInstanceOf(Double.class);
}));
@ -471,13 +471,13 @@ public class TbMathNodeTest {
TimeseriesSaveRequest request = invocation.getArgument(0);
request.getCallback().onSuccess(null);
return null;
}).when(telemetryService).save(any(TimeseriesSaveRequest.class));
}).when(telemetryService).saveTimeseries(any(TimeseriesSaveRequest.class));
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture());
verify(telemetryService, times(1)).save(assertArg((ThrowingConsumer<TimeseriesSaveRequest>) request -> {
verify(telemetryService, times(1)).saveTimeseries(assertArg(request -> {
assertThat(request.getEntries()).size().isOne();
assertThat(request.isSaveLatest()).isTrue();
}));
@ -502,13 +502,13 @@ public class TbMathNodeTest {
TimeseriesSaveRequest request = invocation.getArgument(0);
request.getCallback().onSuccess(null);
return null;
}).when(telemetryService).save(any(TimeseriesSaveRequest.class));
}).when(telemetryService).saveTimeseries(any(TimeseriesSaveRequest.class));
node.onMsg(ctx, msg);
ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class);
verify(ctx, timeout(TIMEOUT)).tellSuccess(msgCaptor.capture());
verify(telemetryService, times(1)).save(assertArg((ThrowingConsumer<TimeseriesSaveRequest>) request -> {
verify(telemetryService, times(1)).saveTimeseries(assertArg(request -> {
assertThat(request.getEntries()).size().isOne();
assertThat(request.isSaveLatest()).isTrue();
}));

4
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNodeTest.java

@ -22,10 +22,8 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.ThrowingConsumer;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest;
import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
@ -171,7 +169,7 @@ class TbMsgAttributesNodeTest extends AbstractRuleNodeUpgradeTest {
node.saveAttr(testAttrList, ctxMock, testTbMsg, AttributeScope.SHARED_SCOPE, false);
verify(telemetryServiceMock, times(1)).save(assertArg((ThrowingConsumer<AttributesSaveRequest>) request -> {
verify(telemetryServiceMock, times(1)).saveAttributes(assertArg(request -> {
assertThat(request.getTenantId()).isEqualTo(tenantId);
assertThat(request.getEntityId()).isEqualTo(deviceId);
assertThat(request.getScope()).isEqualTo(AttributeScope.SHARED_SCOPE);

16
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNodeTest.java

@ -21,11 +21,11 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
import org.thingsboard.rule.engine.api.RuleEngineTelemetryService;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.msg.TbMsgType;
@ -41,10 +41,9 @@ import java.util.function.Consumer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.assertArg;
import static org.mockito.BDDMockito.willAnswer;
import static org.mockito.BDDMockito.willReturn;
import static org.mockito.Mockito.mock;
@ -78,11 +77,10 @@ public class TbMsgDeleteAttributesNodeTest {
willReturn(telemetryService).given(ctx).getTelemetryService();
willAnswer(invocation -> {
TelemetryNodeCallback callBack = invocation.getArgument(5);
callBack.onSuccess(null);
AttributesDeleteRequest request = invocation.getArgument(0);
request.getCallback().onSuccess(null);
return null;
}).given(telemetryService).deleteAndNotify(
any(), any(), any(AttributeScope.class), anyList(), anyBoolean(), any());
}).given(telemetryService).deleteAttributes(any());
}
@AfterEach
@ -153,6 +151,8 @@ public class TbMsgDeleteAttributesNodeTest {
}
verify(ctx, times(1)).tellSuccess(newMsgCaptor.capture());
verify(ctx, never()).tellFailure(any(), any());
verify(telemetryService, times(1)).deleteAndNotify(any(), any(), any(AttributeScope.class), anyList(), eq(notifyDevice || notifyDeviceMetadata), any());
verify(telemetryService, times(1)).deleteAttributes(assertArg(request -> {
assertThat(request.isNotifyDevice()).isEqualTo(notifyDevice || notifyDeviceMetadata);
}));
}
}

10
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNodeTest.java

@ -130,12 +130,12 @@ public class TbMsgTimeseriesNodeTest {
TimeseriesSaveRequest request = invocation.getArgument(0);
request.getCallback().onSuccess(null);
return null;
}).when(telemetryServiceMock).save(any(TimeseriesSaveRequest.class));
}).when(telemetryServiceMock).saveTimeseries(any(TimeseriesSaveRequest.class));
node.onMsg(ctxMock, msg);
List<TsKvEntry> expectedList = getTsKvEntriesListWithTs(data, System.currentTimeMillis());
verify(telemetryServiceMock).save(assertArg((ThrowingConsumer<TimeseriesSaveRequest>) request -> {
verify(telemetryServiceMock).saveTimeseries(assertArg(request -> {
assertThat(request.getTenantId()).isEqualTo(TENANT_ID);
assertThat(request.getCustomerId()).isNull();
assertThat(request.getEntityId()).isEqualTo(DEVICE_ID);
@ -171,12 +171,12 @@ public class TbMsgTimeseriesNodeTest {
TimeseriesSaveRequest request = invocation.getArgument(0);
request.getCallback().onSuccess(null);
return null;
}).when(telemetryServiceMock).save(any(TimeseriesSaveRequest.class));
}).when(telemetryServiceMock).saveTimeseries(any(TimeseriesSaveRequest.class));
node.onMsg(ctxMock, msg);
List<TsKvEntry> expectedList = getTsKvEntriesListWithTs(data, ts);
verify(telemetryServiceMock).save(assertArg((ThrowingConsumer<TimeseriesSaveRequest>) request -> {
verify(telemetryServiceMock).saveTimeseries(assertArg(request -> {
assertThat(request.getTenantId()).isEqualTo(TENANT_ID);
assertThat(request.getCustomerId()).isNull();
assertThat(request.getEntityId()).isEqualTo(DEVICE_ID);
@ -209,7 +209,7 @@ public class TbMsgTimeseriesNodeTest {
TbMsg msg = TbMsg.newMsg(TbMsgType.POST_TELEMETRY_REQUEST, DEVICE_ID, metadata, data);
node.onMsg(ctxMock, msg);
verify(telemetryServiceMock).save(assertArg((ThrowingConsumer<TimeseriesSaveRequest>) request -> {
verify(telemetryServiceMock).saveTimeseries(assertArg(request -> {
assertThat(request.getTenantId()).isEqualTo(TENANT_ID);
assertThat(request.getCustomerId()).isNull();
assertThat(request.getEntityId()).isEqualTo(DEVICE_ID);

Loading…
Cancel
Save