diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 3ed11c7727..f7fc204c0f 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -24,11 +24,14 @@ import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.cf.configuration.ArgumentType; import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; import org.thingsboard.server.common.msg.queue.TbCallback; @@ -56,6 +59,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** @@ -171,6 +175,10 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto)); } else if (proto.getAttrDataCount() > 0) { processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto)); + } else if (proto.getRemovedTsKeysCount() > 0) { + processArgumentValuesUpdate(ctx, cfIds, callback, mapToArgumentsWithFetchedValue(ctx, proto.getRemovedTsKeysList()), toTbMsgId(proto), toTbMsgType(proto)); + } else if (proto.getRemovedAttrKeysCount() > 0) { + processArgumentValuesUpdate(ctx, cfIds, callback, mapToArgumentsWithDefaultValue(ctx, msg.getEntityId(), proto.getScope(), proto.getRemovedAttrKeysList()), toTbMsgId(proto), toTbMsgType(proto)); } else { callback.onSuccess(CALLBACKS_PER_CF); } @@ -189,6 +197,10 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM processTelemetry(ctx, proto, cfIdList, callback); } else if (proto.getAttrDataCount() > 0) { processAttributes(ctx, proto, cfIdList, callback); + } else if (proto.getRemovedTsKeysCount() > 0) { + processRemovedTelemetry(ctx, proto, cfIdList, callback); + } else if (proto.getRemovedAttrKeysCount() > 0) { + processRemovedAttributes(ctx, proto, cfIdList, callback); } else { callback.onSuccess(CALLBACKS_PER_CF); } @@ -209,6 +221,14 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto)); } + private void processRemovedTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List cfIdList, MultipleTbCallback callback) throws CalculatedFieldException { + processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArgumentsWithFetchedValue(ctx, proto.getRemovedTsKeysList()), toTbMsgId(proto), toTbMsgType(proto)); + } + + private void processRemovedAttributes(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List cfIdList, MultipleTbCallback callback) throws CalculatedFieldException { + processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArgumentsWithDefaultValue(ctx, proto.getScope(), proto.getRemovedAttrKeysList()), toTbMsgId(proto), toTbMsgType(proto)); + } + private void processArgumentValuesUpdate(CalculatedFieldCtx ctx, List cfIdList, MultipleTbCallback callback, Map newArgValues, UUID tbMsgId, TbMsgType tbMsgType) throws CalculatedFieldException { if (newArgValues.isEmpty()) { @@ -310,7 +330,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM return mapToArguments(argNames, data); } - private static Map mapToArguments(Map argNames, List data) { + private Map mapToArguments(Map argNames, List data) { if (argNames.isEmpty()) { return Collections.emptyMap(); } @@ -342,7 +362,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM return mapToArguments(argNames, scope, attrDataList); } - private static Map mapToArguments(Map argNames, AttributeScopeProto scope, List attrDataList) { + private Map mapToArguments(Map argNames, AttributeScopeProto scope, List attrDataList) { Map arguments = new HashMap<>(); for (AttributeValueProto item : attrDataList) { ReferencedEntityKey key = new ReferencedEntityKey(item.getKey(), ArgumentType.ATTRIBUTE, AttributeScope.valueOf(scope.name())); @@ -354,6 +374,46 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM return arguments; } + private Map mapToArgumentsWithDefaultValue(CalculatedFieldCtx ctx, EntityId entityId, AttributeScopeProto scope, List removedAttrKeys) { + var argNames = ctx.getLinkedEntityArguments().get(entityId); + if (argNames.isEmpty()) { + return Collections.emptyMap(); + } + return mapToArgumentsWithDefaultValue(argNames, ctx.getArguments(), scope, removedAttrKeys); + } + + private Map mapToArgumentsWithDefaultValue(CalculatedFieldCtx ctx, AttributeScopeProto scope, List removedAttrKeys) { + return mapToArgumentsWithDefaultValue(ctx.getMainEntityArguments(), ctx.getArguments(), scope, removedAttrKeys); + } + + private Map mapToArgumentsWithDefaultValue(Map argNames, Map configArguments, AttributeScopeProto scope, List removedAttrKeys) { + Map arguments = new HashMap<>(); + for (String removedKey : removedAttrKeys) { + ReferencedEntityKey key = new ReferencedEntityKey(removedKey, ArgumentType.ATTRIBUTE, AttributeScope.valueOf(scope.name())); + String argName = argNames.get(key); + if (argName != null) { + Argument argument = configArguments.get(argName); + String defaultValue = (argument != null) ? argument.getDefaultValue() : null; + arguments.put(argName, StringUtils.isNotEmpty(defaultValue) + ? new SingleValueArgumentEntry(System.currentTimeMillis(), new StringDataEntry(removedKey, defaultValue), null) + : new SingleValueArgumentEntry()); + + } + } + return arguments; + } + + private Map mapToArgumentsWithFetchedValue(CalculatedFieldCtx ctx, List removedTelemetryKeys) { + Map deletedArguments = ctx.getArguments().entrySet().stream() + .filter(entry -> removedTelemetryKeys.contains(entry.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Map fetchedArgs = cfService.fetchArgsFromDb(tenantId, entityId, deletedArguments); + + fetchedArgs.values().forEach(arg -> arg.setForceResetPrevious(true)); + return fetchedArgs; + } + private static List getCalculatedFieldIds(CalculatedFieldTelemetryMsgProto proto) { List cfIds = new LinkedList<>(); for (var cfId : proto.getPreviousCalculatedFieldsList()) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java index 996cb4a9d5..847caccaff 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java @@ -17,20 +17,25 @@ package org.thingsboard.server.service.cf; import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg; +import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; +import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; import java.util.List; +import java.util.Map; public interface CalculatedFieldProcessingService { ListenableFuture fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId); + Map fetchArgsFromDb(TenantId tenantId, EntityId entityId, Map arguments); + void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult calculationResult, List cfIds, TbCallback callback); void pushMsgToLinks(CalculatedFieldTelemetryMsg msg, List linkedCalculatedFields, TbCallback callback); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java index fdb42ce1b0..b688993a8d 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java @@ -16,7 +16,9 @@ package org.thingsboard.server.service.cf; import com.google.common.util.concurrent.FutureCallback; +import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; +import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; @@ -34,4 +36,8 @@ public interface CalculatedFieldQueueService { void pushRequestToQueue(AttributesSaveRequest request, List result, FutureCallback callback); + void pushRequestToQueue(AttributesDeleteRequest request, List result, FutureCallback callback); + + void pushRequestToQueue(TimeseriesDeleteRequest request, List result, FutureCallback callback); + } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java index 217e09fc31..5ca4993d3f 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java @@ -147,6 +147,28 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP }, calculatedFieldCallbackExecutor); } + @Override + public Map fetchArgsFromDb(TenantId tenantId, EntityId entityId, Map arguments) { + Map> argFutures = new HashMap<>(); + for (var entry : arguments.entrySet()) { + var argEntityId = entry.getValue().getRefEntityId() != null ? entry.getValue().getRefEntityId() : entityId; + var argValueFuture = fetchKvEntry(tenantId, argEntityId, entry.getValue()); + argFutures.put(entry.getKey(), argValueFuture); + } + return argFutures.entrySet().stream() + .collect(Collectors.toMap( + Entry::getKey, // Keep the key as is + entry -> { + try { + // Resolve the future to get the value + return entry.getValue().get(); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException("Error getting future result for key: " + entry.getKey(), e); + } + } + )); + } + @Override public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult calculatedFieldResult, List cfIds, TbCallback callback) { try { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java index 81e82c26a8..7421dfe187 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java @@ -21,7 +21,9 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; +import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.EntityType; @@ -48,6 +50,7 @@ import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; +import java.util.ArrayList; import java.util.EnumSet; import java.util.List; import java.util.Set; @@ -104,6 +107,23 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS () -> toCalculatedFieldTelemetryMsgProto(request, result), callback); } + @Override + public void pushRequestToQueue(AttributesDeleteRequest request, List result, FutureCallback callback) { + var tenantId = request.getTenantId(); + var entityId = request.getEntityId(); + checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matchesKeys(result, request.getScope()), cf -> cf.linkMatchesAttrKeys(entityId, result, request.getScope()), + () -> toCalculatedFieldTelemetryMsgProto(request, result), callback); + } + + @Override + public void pushRequestToQueue(TimeseriesDeleteRequest request, List result, FutureCallback callback) { + var tenantId = request.getTenantId(); + var entityId = request.getEntityId(); + + checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matchesKeys(result), cf -> cf.linkMatchesTsKeys(entityId, result), + () -> toCalculatedFieldTelemetryMsgProto(request, result), callback); + } + private void checkEntityAndPushToQueue(TenantId tenantId, EntityId entityId, Predicate mainEntityFilter, Predicate linkedEntityFilter, Supplier msg, FutureCallback callback) { @@ -174,6 +194,23 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS return msg.build(); } + private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesDeleteRequest request, List removedKeys) { + CalculatedFieldTelemetryMsgProto telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), new ArrayList<>(), request.getTbMsgId(), request.getTbMsgType()) + .setScope(AttributeScopeProto.valueOf(request.getScope().name())) + .addAllRemovedAttrKeys(removedKeys).build(); + return ToCalculatedFieldMsg.newBuilder() + .setTelemetryMsg(telemetryMsg) + .build(); + } + + private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(TimeseriesDeleteRequest request, List removedKeys) { + CalculatedFieldTelemetryMsgProto telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), new ArrayList<>(), request.getTbMsgId(), request.getTbMsgType()) + .addAllRemovedTsKeys(removedKeys).build(); + return ToCalculatedFieldMsg.newBuilder() + .setTelemetryMsg(telemetryMsg) + .build(); + } + private CalculatedFieldTelemetryMsgProto.Builder buildTelemetryMsgProto(TenantId tenantId, EntityId entityId, List calculatedFieldIds, UUID tbMsgId, TbMsgType tbMsgType) { CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = CalculatedFieldTelemetryMsgProto.newBuilder(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java index 0b2ccb347f..6f827065af 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java @@ -54,4 +54,8 @@ public interface ArgumentEntry { TbelCfArg toTbelCfArg(); + boolean isForceResetPrevious(); + + void setForceResetPrevious(boolean forceResetPrevious); + } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java index f325a66c83..d59a58f296 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java @@ -55,7 +55,7 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState { ArgumentEntry newEntry = entry.getValue(); ArgumentEntry existingEntry = arguments.get(key); - if (existingEntry == null) { + if (existingEntry == null || newEntry.isForceResetPrevious()) { validateNewEntry(newEntry); arguments.put(key, newEntry); stateUpdated = true; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index e42466b30e..d3b7020c25 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -169,7 +169,7 @@ public class CalculatedFieldCtx { return map != null && matchesTimeSeries(map, values); } - private static boolean matchesAttributes(Map argMap, List values, AttributeScope scope) { + private boolean matchesAttributes(Map argMap, List values, AttributeScope scope) { for (AttributeKvEntry attrKv : values) { ReferencedEntityKey attrKey = new ReferencedEntityKey(attrKv.getKey(), ArgumentType.ATTRIBUTE, scope); if (argMap.containsKey(attrKey)) { @@ -193,18 +193,64 @@ public class CalculatedFieldCtx { return false; } + public boolean matchesKeys(List keys, AttributeScope scope) { + return matchesAttributesKeys(mainEntityArguments, keys, scope); + } + + public boolean matchesKeys(List keys) { + return matchesTimeSeriesKeys(mainEntityArguments, keys); + } + + private boolean matchesAttributesKeys(Map argMap, List keys, AttributeScope scope) { + for (String key : keys) { + ReferencedEntityKey attrKey = new ReferencedEntityKey(key, ArgumentType.ATTRIBUTE, scope); + if (argMap.containsKey(attrKey)) { + return true; + } + } + return false; + } + + private boolean matchesTimeSeriesKeys(Map argMap, List keys) { + for (String key : keys) { + ReferencedEntityKey latestKey = new ReferencedEntityKey(key, ArgumentType.TS_LATEST, null); + if (argMap.containsKey(latestKey)) { + return true; + } + ReferencedEntityKey rollingKey = new ReferencedEntityKey(key, ArgumentType.TS_ROLLING, null); + if (argMap.containsKey(rollingKey)) { + return true; + } + } + return false; + } + + public boolean linkMatchesAttrKeys(EntityId entityId, List keys, AttributeScope scope) { + var map = linkedEntityArguments.get(entityId); + return map != null && matchesAttributesKeys(map, keys, scope); + } + + public boolean linkMatchesTsKeys(EntityId entityId, List keys) { + var map = linkedEntityArguments.get(entityId); + return map != null && matchesTimeSeriesKeys(map, keys); + } + public boolean linkMatches(EntityId entityId, CalculatedFieldTelemetryMsgProto proto) { if (!proto.getTsDataList().isEmpty()) { List updatedTelemetry = proto.getTsDataList().stream() .map(ProtoUtils::fromProto) .toList(); return linkMatches(entityId, updatedTelemetry); - } else { + } else if (!proto.getAttrDataList().isEmpty()) { AttributeScope scope = AttributeScope.valueOf(proto.getScope().name()); List updatedTelemetry = proto.getAttrDataList().stream() .map(ProtoUtils::fromProto) .toList(); return linkMatches(entityId, updatedTelemetry, scope); + } else if (!proto.getRemovedTsKeysList().isEmpty()) { + return linkMatchesTsKeys(entityId, proto.getRemovedTsKeysList()); + } else { + return linkMatchesAttrKeys(entityId, proto.getRemovedAttrKeysList(), AttributeScope.valueOf(proto.getScope().name())); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java index 55620f78ca..2e8bb63f5f 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java @@ -38,6 +38,8 @@ public class SingleValueArgumentEntry implements ArgumentEntry { private BasicKvEntry kvEntryValue; private Long version; + private boolean forceResetPrevious; + public SingleValueArgumentEntry(TsKvProto entry) { this.ts = entry.getTs(); this.version = entry.getVersion(); @@ -61,6 +63,12 @@ public class SingleValueArgumentEntry implements ArgumentEntry { this.kvEntryValue = ProtoUtils.basicKvEntryFromKvEntry(entry); } + public SingleValueArgumentEntry(long ts, BasicKvEntry kvEntryValue, Long version) { + this.ts = ts; + this.kvEntryValue = kvEntryValue; + this.version = version; + } + @Override public ArgumentEntryType getType() { return ArgumentEntryType.SINGLE_VALUE; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java index 3e4990ea8a..1114d993a3 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java @@ -41,6 +41,8 @@ public class TsRollingArgumentEntry implements ArgumentEntry { private Long timeWindow; private TreeMap tsRecords = new TreeMap<>(); + private boolean forceResetPrevious; + public TsRollingArgumentEntry(List kvEntries, int limit, long timeWindow) { this.limit = limit; this.timeWindow = timeWindow; @@ -59,6 +61,12 @@ public class TsRollingArgumentEntry implements ArgumentEntry { this.timeWindow = timeWindow; } + public TsRollingArgumentEntry(Integer limit, Long timeWindow, TreeMap tsRecords) { + this.limit = limit; + this.timeWindow = timeWindow; + this.tsRecords = tsRecords; + } + @Override public ArgumentEntryType getType() { return ArgumentEntryType.TS_ROLLING; diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 62be1c73e4..2bdbd917ab 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -192,7 +192,9 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Override public void deleteAttributesInternal(AttributesDeleteRequest request) { ListenableFuture> deleteFuture = attrService.removeAll(request.getTenantId(), request.getEntityId(), request.getScope(), request.getKeys()); - addMainCallback(deleteFuture, request.getCallback()); + DonAsynchron.withCallback(deleteFuture, result -> { + calculatedFieldQueueService.pushRequestToQueue(request, result, request.getCallback()); + }, safeCallback(request.getCallback()), tsCallBackExecutor); addWsCallback(deleteFuture, success -> onAttributesDelete(request.getTenantId(), request.getEntityId(), request.getScope().name(), request.getKeys(), request.isNotifyDevice())); } @@ -212,10 +214,14 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer deleteFuture = tsService.remove(request.getTenantId(), request.getEntityId(), request.getDeleteHistoryQueries()); addWsCallback(deleteFuture, result -> onTimeSeriesDelete(request.getTenantId(), request.getEntityId(), request.getKeys(), result)); } - addMainCallback(deleteFuture, __ -> request.getCallback().onSuccess(request.getKeys()), request.getCallback()::onFailure); + DonAsynchron.withCallback(deleteFuture, result -> { + calculatedFieldQueueService.pushRequestToQueue(request, request.getKeys(), getCalculatedFieldCallback(request.getCallback(), request.getKeys())); + }, safeCallback(getCalculatedFieldCallback(request.getCallback(), request.getKeys())), tsCallBackExecutor); } else { ListenableFuture> deleteFuture = tsService.removeAllLatest(request.getTenantId(), request.getEntityId()); - addMainCallback(deleteFuture, request.getCallback()::onSuccess, request.getCallback()::onFailure); + DonAsynchron.withCallback(deleteFuture, result -> { + calculatedFieldQueueService.pushRequestToQueue(request, request.getKeys(), getCalculatedFieldCallback(request.getCallback(), result)); + }, safeCallback(getCalculatedFieldCallback(request.getCallback(), request.getKeys())), tsCallBackExecutor); } } @@ -344,4 +350,18 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer }; } + private FutureCallback getCalculatedFieldCallback(FutureCallback> originalCallback, List keys) { + return new FutureCallback() { + @Override + public void onSuccess(Void unused) { + originalCallback.onSuccess(keys); + } + + @Override + public void onFailure(Throwable t) { + originalCallback.onFailure(t); + } + }; + } + } diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 025520da3b..fe412dbc0c 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -783,9 +783,11 @@ message CalculatedFieldTelemetryMsgProto { repeated TsKvProto tsData = 9; AttributeScopeProto scope = 10; repeated AttributeValueProto attrData = 11; - int64 tbMsgIdMSB = 12; - int64 tbMsgIdLSB = 13; - string tbMsgType = 14; + repeated string removedTsKeys = 12; + repeated string removedAttrKeys = 13; + int64 tbMsgIdMSB = 14; + int64 tbMsgIdLSB = 15; + string tbMsgType = 16; } message CalculatedFieldLinkedTelemetryMsgProto { diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesDeleteRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesDeleteRequest.java index f1142443ea..77de99aae0 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesDeleteRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesDeleteRequest.java @@ -24,8 +24,10 @@ import lombok.ToString; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.msg.TbMsgType; import java.util.List; +import java.util.UUID; @Getter @ToString @@ -37,6 +39,8 @@ public class AttributesDeleteRequest { private final AttributeScope scope; private final List keys; private final boolean notifyDevice; + private final UUID tbMsgId; + private final TbMsgType tbMsgType; private final FutureCallback callback; public static Builder builder() { @@ -50,9 +54,12 @@ public class AttributesDeleteRequest { private AttributeScope scope; private List keys; private boolean notifyDevice; + private UUID tbMsgId; + private TbMsgType tbMsgType; private FutureCallback callback; - Builder() {} + Builder() { + } public Builder tenantId(TenantId tenantId) { this.tenantId = tenantId; @@ -89,6 +96,16 @@ public class AttributesDeleteRequest { return this; } + public Builder tbMsgId(UUID tbMsgId) { + this.tbMsgId = tbMsgId; + return this; + } + + public Builder tbMsgType(TbMsgType tbMsgType) { + this.tbMsgType = tbMsgType; + return this; + } + public Builder callback(FutureCallback callback) { this.callback = callback; return this; @@ -109,7 +126,7 @@ public class AttributesDeleteRequest { } public AttributesDeleteRequest build() { - return new AttributesDeleteRequest(tenantId, entityId, scope, keys, notifyDevice, callback); + return new AttributesDeleteRequest(tenantId, entityId, scope, keys, notifyDevice, tbMsgId, tbMsgType, callback); } } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesDeleteRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesDeleteRequest.java index 01cad78b98..73d18268de 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesDeleteRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesDeleteRequest.java @@ -23,8 +23,10 @@ import lombok.ToString; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; +import org.thingsboard.server.common.data.msg.TbMsgType; import java.util.List; +import java.util.UUID; @Getter @ToString @@ -35,6 +37,8 @@ public class TimeseriesDeleteRequest { private final EntityId entityId; private final List keys; private final List deleteHistoryQueries; + private final UUID tbMsgId; + private final TbMsgType tbMsgType; private final FutureCallback> callback; public static Builder builder() { @@ -47,9 +51,12 @@ public class TimeseriesDeleteRequest { private EntityId entityId; private List keys; private List deleteHistoryQueries; + private UUID tbMsgId; + private TbMsgType tbMsgType; private FutureCallback> callback; - Builder() {} + Builder() { + } public Builder tenantId(TenantId tenantId) { this.tenantId = tenantId; @@ -71,13 +78,23 @@ public class TimeseriesDeleteRequest { return this; } + public Builder tbMsgId(UUID tbMsgId) { + this.tbMsgId = tbMsgId; + return this; + } + + public Builder tbMsgType(TbMsgType tbMsgType) { + this.tbMsgType = tbMsgType; + return this; + } + public Builder callback(FutureCallback> callback) { this.callback = callback; return this; } public TimeseriesDeleteRequest build() { - return new TimeseriesDeleteRequest(tenantId, entityId, keys, deleteHistoryQueries, callback); + return new TimeseriesDeleteRequest(tenantId, entityId, keys, deleteHistoryQueries, tbMsgId, tbMsgType, callback); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNode.java index a8896b5f6b..585181423d 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNode.java @@ -76,6 +76,8 @@ public class TbMsgDeleteAttributesNode implements TbNode { .scope(scope) .keys(keysToDelete) .notifyDevice(checkNotifyDevice(msg.getMetaData().getValue(NOTIFY_DEVICE_METADATA_KEY), scope)) + .tbMsgId(msg.getId()) + .tbMsgType(msg.getInternalType()) .callback(config.isSendAttributesDeletedNotification() ? new AttributesDeleteNodeCallback(ctx, msg, scope.name(), keysToDelete) : new TelemetryNodeCallback(ctx, msg))