From 49328d270bf7e7236f7235ddf933ec4a926bc68a Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Tue, 25 Feb 2025 17:13:05 +0200 Subject: [PATCH 1/3] wip ts/attributes deletion --- ...CalculatedFieldEntityMessageProcessor.java | 75 +++++++++++++++++-- .../cf/CalculatedFieldQueueService.java | 6 ++ .../DefaultCalculatedFieldQueueService.java | 37 +++++++++ .../cf/ctx/state/CalculatedFieldCtx.java | 48 +++++++++++- .../DefaultTelemetrySubscriptionService.java | 23 +++++- common/proto/src/main/proto/queue.proto | 8 +- .../engine/api/AttributesDeleteRequest.java | 21 +++++- .../engine/api/TimeseriesDeleteRequest.java | 21 +++++- .../telemetry/TbMsgDeleteAttributesNode.java | 2 + 9 files changed, 223 insertions(+), 18 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 2da53757ff..a53e77d241 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -24,11 +24,13 @@ import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.cf.configuration.ArgumentType; import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; import org.thingsboard.server.common.msg.queue.TbCallback; @@ -164,7 +166,11 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM if (cfIds.contains(ctx.getCfId())) { callback.onSuccess(CALLBACKS_PER_CF); } else { - if (proto.getTsDataCount() > 0) { + if (proto.getRemovedTsKeysCount() > 0) { + processArgumentValuesUpdate(ctx, cfIds, callback, mapDeletedAttributesToDefault(ctx, msg.getEntityId(), proto.getScope(), proto.getRemovedTsKeysList()), toTbMsgId(proto), toTbMsgType(proto)); + } else if (proto.getRemovedAttrKeysCount() > 0) { + processArgumentValuesUpdate(ctx, cfIds, callback, mapDeletedAttributesToDefault(ctx, msg.getEntityId(), proto.getScope(), proto.getRemovedAttrKeysList()), toTbMsgId(proto), toTbMsgType(proto)); + } else if (proto.getTsDataCount() > 0) { processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto)); } else if (proto.getAttrDataCount() > 0) { processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto)); @@ -182,7 +188,11 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM if (cfIds.contains(ctx.getCfId())) { callback.onSuccess(CALLBACKS_PER_CF); } else { - if (proto.getTsDataCount() > 0) { + if (proto.getRemovedTsKeysCount() > 0) { + processRemovedTelemetry(ctx, proto, cfIdList, callback); + } else if (proto.getRemovedAttrKeysCount() > 0) { + processRemovedAttributes(ctx, proto, cfIdList, callback); + } else if (proto.getTsDataCount() > 0) { processTelemetry(ctx, proto, cfIdList, callback); } else if (proto.getAttrDataCount() > 0) { processAttributes(ctx, proto, cfIdList, callback); @@ -191,23 +201,43 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } } } catch (Exception e) { + if (e instanceof CalculatedFieldException) { + throw (CalculatedFieldException) e; + } throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build(); } } - @SneakyThrows - private void processTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List cfIdList, MultipleTbCallback callback) { + private void processRemovedTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List cfIdList, MultipleTbCallback callback) throws CalculatedFieldException { + // reinit cf (consider fetching only removed ts) + log.info("Force reinitialization of CF: [{}].", ctx.getCfId()); + states.remove(ctx.getCfId()); + try { + var state = getOrInitState(ctx); + if (state.isSizeOk()) { + processStateIfReady(ctx, Collections.singletonList(ctx.getCfId()), state, null, null, callback); + } else { + throw new RuntimeException(ctx.getSizeExceedsLimitMessage()); + } + } catch (Exception e) { + throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build(); + } + } + + private void processRemovedAttributes(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List cfIdList, MultipleTbCallback callback) throws CalculatedFieldException { + processArgumentValuesUpdate(ctx, cfIdList, callback, mapDeletedAttributesToDefault(ctx, proto.getScope(), proto.getRemovedAttrKeysList()), toTbMsgId(proto), toTbMsgType(proto)); + } + + private void processTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List cfIdList, MultipleTbCallback callback) throws CalculatedFieldException { processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto)); } - @SneakyThrows - private void processAttributes(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List cfIdList, MultipleTbCallback callback) { + private void processAttributes(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List cfIdList, MultipleTbCallback callback) throws CalculatedFieldException { processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto)); } - @SneakyThrows private void processArgumentValuesUpdate(CalculatedFieldCtx ctx, List cfIdList, MultipleTbCallback callback, - Map newArgValues, UUID tbMsgId, TbMsgType tbMsgType) { + Map newArgValues, UUID tbMsgId, TbMsgType tbMsgType) throws CalculatedFieldException { if (newArgValues.isEmpty()) { log.info("[{}] No new argument values to process for CF.", ctx.getCfId()); callback.onSuccess(CALLBACKS_PER_CF); @@ -348,6 +378,35 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM return arguments; } + private Map mapDeletedAttributesToDefault(CalculatedFieldCtx ctx, EntityId entityId, AttributeScopeProto scope, List removedAttrKeys) { + var argNames = ctx.getLinkedEntityArguments().get(entityId); + if (argNames.isEmpty()) { + return Collections.emptyMap(); + } + return mapToArgumentsDefaultValue(argNames, ctx.getArguments(), scope, removedAttrKeys); + } + + private Map mapDeletedAttributesToDefault(CalculatedFieldCtx ctx, AttributeScopeProto scope, List removedAttrKeys) { + return mapToArgumentsDefaultValue(ctx.getMainEntityArguments(), ctx.getArguments(), scope, removedAttrKeys); + } + + private static Map mapToArgumentsDefaultValue(Map argNames, Map configArguments, AttributeScopeProto scope, List removedAttrKeys) { + Map arguments = new HashMap<>(); + for (String removedKey : removedAttrKeys) { + ReferencedEntityKey key = new ReferencedEntityKey(removedKey, ArgumentType.ATTRIBUTE, AttributeScope.valueOf(scope.name())); + String argName = argNames.get(key); + if (argName != null) { + Argument argument = configArguments.get(argName); + String defaultValue = (argument != null) ? argument.getDefaultValue() : null; + arguments.put(argName, (defaultValue != null) + ? new SingleValueArgumentEntry(System.currentTimeMillis(), new StringDataEntry(removedKey, defaultValue), null) + : new SingleValueArgumentEntry()); + + } + } + return arguments; + } + private static List getCalculatedFieldIds(CalculatedFieldTelemetryMsgProto proto) { List cfIds = new LinkedList<>(); for (var cfId : proto.getPreviousCalculatedFieldsList()) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java index b84b54af81..1b0d1462f2 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java @@ -16,7 +16,9 @@ package org.thingsboard.server.service.cf; import com.google.common.util.concurrent.FutureCallback; +import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; +import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; @@ -34,4 +36,8 @@ public interface CalculatedFieldQueueService { void pushRequestToQueue(AttributesSaveRequest request, List result, FutureCallback callback); + void pushRequestToQueue(AttributesDeleteRequest request, List result, FutureCallback callback); + + void pushRequestToQueue(TimeseriesDeleteRequest request, List result, FutureCallback callback); + } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java index 9d1f9b6db5..13905dbcce 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java @@ -21,7 +21,9 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import org.thingsboard.rule.engine.api.AttributesDeleteRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest; +import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.EntityType; @@ -48,6 +50,7 @@ import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache; +import java.util.ArrayList; import java.util.EnumSet; import java.util.List; import java.util.Set; @@ -104,6 +107,23 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS () -> toCalculatedFieldTelemetryMsgProto(request, result), callback); } + @Override + public void pushRequestToQueue(AttributesDeleteRequest request, List result, FutureCallback callback) { + var tenantId = request.getTenantId(); + var entityId = request.getEntityId(); + checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matchesKeys(result, request.getScope()), cf -> cf.linkMatchesAttrKeys(entityId, result, request.getScope()), + () -> toCalculatedFieldTelemetryMsgProto(request, result), callback); + } + + @Override + public void pushRequestToQueue(TimeseriesDeleteRequest request, List result, FutureCallback callback) { + var tenantId = request.getTenantId(); + var entityId = request.getEntityId(); + + checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matchesKeys(result), cf -> cf.linkMatchesTsKeys(entityId, result), + () -> toCalculatedFieldTelemetryMsgProto(request, result), callback); + } + private void checkEntityAndPushToQueue(TenantId tenantId, EntityId entityId, Predicate mainEntityFilter, Predicate linkedEntityFilter, Supplier msg, FutureCallback callback) { @@ -174,6 +194,23 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS return msg.build(); } + private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesDeleteRequest request, List removedKeys) { + CalculatedFieldTelemetryMsgProto telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), new ArrayList<>(), request.getTbMsgId(), request.getTbMsgType()) + .setScope(AttributeScopeProto.valueOf(request.getScope().name())) + .addAllRemovedAttrKeys(removedKeys).build(); + return ToCalculatedFieldMsg.newBuilder() + .setTelemetryMsg(telemetryMsg) + .build(); + } + + private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(TimeseriesDeleteRequest request, List removedKeys) { + CalculatedFieldTelemetryMsgProto telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), new ArrayList<>(), request.getTbMsgId(), request.getTbMsgType()) + .addAllRemovedTsKeys(removedKeys).build(); + return ToCalculatedFieldMsg.newBuilder() + .setTelemetryMsg(telemetryMsg) + .build(); + } + private CalculatedFieldTelemetryMsgProto.Builder buildTelemetryMsgProto(TenantId tenantId, EntityId entityId, List calculatedFieldIds, UUID tbMsgId, TbMsgType tbMsgType) { CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = CalculatedFieldTelemetryMsgProto.newBuilder(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index 4e4844dbe8..e254680019 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -151,10 +151,28 @@ public class CalculatedFieldCtx { } } + public boolean matchesKeys(List keys, AttributeScope scope) { + return matchesAttributesKeys(mainEntityArguments, keys, scope); + } + + public boolean matchesKeys(List keys) { + return matchesTimeSeriesKeys(mainEntityArguments, keys); + } + public boolean matches(List values, AttributeScope scope) { return matchesAttributes(mainEntityArguments, values, scope); } + public boolean linkMatchesAttrKeys(EntityId entityId, List keys, AttributeScope scope) { + var map = linkedEntityArguments.get(entityId); + return map != null && matchesAttributesKeys(map, keys, scope); + } + + public boolean linkMatchesTsKeys(EntityId entityId, List keys) { + var map = linkedEntityArguments.get(entityId); + return map != null && matchesTimeSeriesKeys(map, keys); + } + public boolean linkMatches(EntityId entityId, List values, AttributeScope scope) { var map = linkedEntityArguments.get(entityId); return map != null && matchesAttributes(map, values, scope); @@ -179,6 +197,16 @@ public class CalculatedFieldCtx { return false; } + private boolean matchesAttributesKeys(Map argMap, List keys, AttributeScope scope) { + for (String key : keys) { + ReferencedEntityKey attrKey = new ReferencedEntityKey(key, ArgumentType.ATTRIBUTE, scope); + if (argMap.containsKey(attrKey)) { + return true; + } + } + return false; + } + private boolean matchesTimeSeries(Map argMap, List values) { for (TsKvEntry tsKv : values) { ReferencedEntityKey latestKey = new ReferencedEntityKey(tsKv.getKey(), ArgumentType.TS_LATEST, null); @@ -193,8 +221,26 @@ public class CalculatedFieldCtx { return false; } + private boolean matchesTimeSeriesKeys(Map argMap, List keys) { + for (String key : keys) { + ReferencedEntityKey latestKey = new ReferencedEntityKey(key, ArgumentType.TS_LATEST, null); + if (argMap.containsKey(latestKey)) { + return true; + } + ReferencedEntityKey rollingKey = new ReferencedEntityKey(key, ArgumentType.TS_ROLLING, null); + if (argMap.containsKey(rollingKey)) { + return true; + } + } + return false; + } + public boolean linkMatches(EntityId entityId, CalculatedFieldTelemetryMsgProto proto) { - if (!proto.getTsDataList().isEmpty()) { + if (!proto.getRemovedTsKeysList().isEmpty()) { + return linkMatchesTsKeys(entityId, proto.getRemovedTsKeysList()); + } else if (!proto.getRemovedAttrKeysList().isEmpty()) { + return linkMatchesAttrKeys(entityId, proto.getRemovedAttrKeysList(), AttributeScope.valueOf(proto.getScope().name())); + } else if (!proto.getTsDataList().isEmpty()) { List updatedTelemetry = proto.getTsDataList().stream() .map(ProtoUtils::fromProto) .toList(); diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 79222ca002..3f9b88b8d6 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -186,7 +186,9 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Override public void deleteAttributesInternal(AttributesDeleteRequest request) { ListenableFuture> deleteFuture = attrService.removeAll(request.getTenantId(), request.getEntityId(), request.getScope(), request.getKeys()); - addMainCallback(deleteFuture, request.getCallback()); + DonAsynchron.withCallback(deleteFuture, result -> { + calculatedFieldQueueService.pushRequestToQueue(request, result, request.getCallback()); + }, safeCallback(request.getCallback()), tsCallBackExecutor); addWsCallback(deleteFuture, success -> onAttributesDelete(request.getTenantId(), request.getEntityId(), request.getScope().name(), request.getKeys(), request.isNotifyDevice())); } @@ -206,7 +208,10 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer deleteFuture = tsService.remove(request.getTenantId(), request.getEntityId(), request.getDeleteHistoryQueries()); addWsCallback(deleteFuture, result -> onTimeSeriesDelete(request.getTenantId(), request.getEntityId(), request.getKeys(), result)); } - addMainCallback(deleteFuture, __ -> request.getCallback().onSuccess(request.getKeys()), request.getCallback()::onFailure); + DonAsynchron.withCallback(deleteFuture, result -> { + calculatedFieldQueueService.pushRequestToQueue(request, request.getKeys(), getCalculatedFieldCallback(request.getCallback(), request.getKeys())); + }, safeCallback(getCalculatedFieldCallback(request.getCallback(), request.getKeys())), tsCallBackExecutor); +// addMainCallback(deleteFuture, __ -> request.getCallback().onSuccess(request.getKeys()), request.getCallback()::onFailure); } else { ListenableFuture> deleteFuture = tsService.removeAllLatest(request.getTenantId(), request.getEntityId()); addMainCallback(deleteFuture, request.getCallback()::onSuccess, request.getCallback()::onFailure); @@ -338,4 +343,18 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer }; } + private FutureCallback getCalculatedFieldCallback(FutureCallback> originalCallback, List keys) { + return new FutureCallback() { + @Override + public void onSuccess(Void unused) { + originalCallback.onSuccess(keys); + } + + @Override + public void onFailure(Throwable t) { + originalCallback.onFailure(t); + } + }; + } + } diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 5fc7e7c59c..432edd2201 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -783,9 +783,11 @@ message CalculatedFieldTelemetryMsgProto { repeated TsKvProto tsData = 9; AttributeScopeProto scope = 10; repeated AttributeValueProto attrData = 11; - int64 tbMsgIdMSB = 12; - int64 tbMsgIdLSB = 13; - string tbMsgType = 14; + repeated string removedTsKeys = 12; + repeated string removedAttrKeys = 13; + int64 tbMsgIdMSB = 14; + int64 tbMsgIdLSB = 15; + string tbMsgType = 16; } message CalculatedFieldLinkedTelemetryMsgProto { diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesDeleteRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesDeleteRequest.java index 118c62e78c..e161c0f165 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesDeleteRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesDeleteRequest.java @@ -24,8 +24,10 @@ import lombok.ToString; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.msg.TbMsgType; import java.util.List; +import java.util.UUID; @Getter @ToString @@ -37,6 +39,8 @@ public class AttributesDeleteRequest { private final AttributeScope scope; private final List keys; private final boolean notifyDevice; + private final UUID tbMsgId; + private final TbMsgType tbMsgType; private final FutureCallback callback; public static Builder builder() { @@ -50,9 +54,12 @@ public class AttributesDeleteRequest { private AttributeScope scope; private List keys; private boolean notifyDevice; + private UUID tbMsgId; + private TbMsgType tbMsgType; private FutureCallback callback; - Builder() {} + Builder() { + } public Builder tenantId(TenantId tenantId) { this.tenantId = tenantId; @@ -89,6 +96,16 @@ public class AttributesDeleteRequest { return this; } + public Builder tbMsgId(UUID tbMsgId) { + this.tbMsgId = tbMsgId; + return this; + } + + public Builder tbMsgType(TbMsgType tbMsgType) { + this.tbMsgType = tbMsgType; + return this; + } + public Builder callback(FutureCallback callback) { this.callback = callback; return this; @@ -109,7 +126,7 @@ public class AttributesDeleteRequest { } public AttributesDeleteRequest build() { - return new AttributesDeleteRequest(tenantId, entityId, scope, keys, notifyDevice, callback); + return new AttributesDeleteRequest(tenantId, entityId, scope, keys, notifyDevice, tbMsgId, tbMsgType, callback); } } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesDeleteRequest.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesDeleteRequest.java index b124806fff..62fc28737c 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesDeleteRequest.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesDeleteRequest.java @@ -23,8 +23,10 @@ import lombok.ToString; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; +import org.thingsboard.server.common.data.msg.TbMsgType; import java.util.List; +import java.util.UUID; @Getter @ToString @@ -35,6 +37,8 @@ public class TimeseriesDeleteRequest { private final EntityId entityId; private final List keys; private final List deleteHistoryQueries; + private final UUID tbMsgId; + private final TbMsgType tbMsgType; private final FutureCallback> callback; public static Builder builder() { @@ -47,9 +51,12 @@ public class TimeseriesDeleteRequest { private EntityId entityId; private List keys; private List deleteHistoryQueries; + private UUID tbMsgId; + private TbMsgType tbMsgType; private FutureCallback> callback; - Builder() {} + Builder() { + } public Builder tenantId(TenantId tenantId) { this.tenantId = tenantId; @@ -71,13 +78,23 @@ public class TimeseriesDeleteRequest { return this; } + public Builder tbMsgId(UUID tbMsgId) { + this.tbMsgId = tbMsgId; + return this; + } + + public Builder tbMsgType(TbMsgType tbMsgType) { + this.tbMsgType = tbMsgType; + return this; + } + public Builder callback(FutureCallback> callback) { this.callback = callback; return this; } public TimeseriesDeleteRequest build() { - return new TimeseriesDeleteRequest(tenantId, entityId, keys, deleteHistoryQueries, callback); + return new TimeseriesDeleteRequest(tenantId, entityId, keys, deleteHistoryQueries, tbMsgId, tbMsgType, callback); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNode.java index a89a4f37d8..87a8b73c49 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNode.java @@ -76,6 +76,8 @@ public class TbMsgDeleteAttributesNode implements TbNode { .scope(scope) .keys(keysToDelete) .notifyDevice(checkNotifyDevice(msg.getMetaData().getValue(NOTIFY_DEVICE_METADATA_KEY), scope)) + .tbMsgId(msg.getId()) + .tbMsgType(msg.getInternalType()) .callback(config.isSendAttributesDeletedNotification() ? new AttributesDeleteNodeCallback(ctx, msg, scope.name(), keysToDelete) : new TelemetryNodeCallback(ctx, msg)) From dd0b73cfb3d9ac56db83c98a660741a50fd4ec3b Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Wed, 26 Feb 2025 11:38:20 +0200 Subject: [PATCH 2/3] added implementation for ts deletion --- ...CalculatedFieldEntityMessageProcessor.java | 77 ++++++++++--------- .../cf/CalculatedFieldProcessingService.java | 5 ++ ...faultCalculatedFieldProcessingService.java | 22 ++++++ .../service/cf/ctx/state/ArgumentEntry.java | 4 + .../ctx/state/BaseCalculatedFieldState.java | 2 +- .../cf/ctx/state/CalculatedFieldCtx.java | 70 ++++++++--------- .../ctx/state/SingleValueArgumentEntry.java | 8 ++ .../cf/ctx/state/TsRollingArgumentEntry.java | 8 ++ .../DefaultTelemetrySubscriptionService.java | 5 +- 9 files changed, 126 insertions(+), 75 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index a53e77d241..adc4615faa 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -24,6 +24,7 @@ import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.cf.configuration.ArgumentType; import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey; @@ -58,6 +59,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** @@ -166,14 +168,15 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM if (cfIds.contains(ctx.getCfId())) { callback.onSuccess(CALLBACKS_PER_CF); } else { - if (proto.getRemovedTsKeysCount() > 0) { - processArgumentValuesUpdate(ctx, cfIds, callback, mapDeletedAttributesToDefault(ctx, msg.getEntityId(), proto.getScope(), proto.getRemovedTsKeysList()), toTbMsgId(proto), toTbMsgType(proto)); - } else if (proto.getRemovedAttrKeysCount() > 0) { - processArgumentValuesUpdate(ctx, cfIds, callback, mapDeletedAttributesToDefault(ctx, msg.getEntityId(), proto.getScope(), proto.getRemovedAttrKeysList()), toTbMsgId(proto), toTbMsgType(proto)); - } else if (proto.getTsDataCount() > 0) { + if (proto.getTsDataCount() > 0) { processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto)); } else if (proto.getAttrDataCount() > 0) { processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto)); + } + if (proto.getRemovedTsKeysCount() > 0) { + processArgumentValuesUpdate(ctx, cfIds, callback, mapToArgumentsWithFetchedValue(ctx, proto.getRemovedTsKeysList()), toTbMsgId(proto), toTbMsgType(proto)); + } else if (proto.getRemovedAttrKeysCount() > 0) { + processArgumentValuesUpdate(ctx, cfIds, callback, mapToArgumentsWithDefaultValue(ctx, msg.getEntityId(), proto.getScope(), proto.getRemovedAttrKeysList()), toTbMsgId(proto), toTbMsgType(proto)); } else { callback.onSuccess(CALLBACKS_PER_CF); } @@ -188,14 +191,15 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM if (cfIds.contains(ctx.getCfId())) { callback.onSuccess(CALLBACKS_PER_CF); } else { + if (proto.getTsDataCount() > 0) { + processTelemetry(ctx, proto, cfIdList, callback); + } else if (proto.getAttrDataCount() > 0) { + processAttributes(ctx, proto, cfIdList, callback); + } if (proto.getRemovedTsKeysCount() > 0) { processRemovedTelemetry(ctx, proto, cfIdList, callback); } else if (proto.getRemovedAttrKeysCount() > 0) { processRemovedAttributes(ctx, proto, cfIdList, callback); - } else if (proto.getTsDataCount() > 0) { - processTelemetry(ctx, proto, cfIdList, callback); - } else if (proto.getAttrDataCount() > 0) { - processAttributes(ctx, proto, cfIdList, callback); } else { callback.onSuccess(CALLBACKS_PER_CF); } @@ -208,26 +212,6 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } } - private void processRemovedTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List cfIdList, MultipleTbCallback callback) throws CalculatedFieldException { - // reinit cf (consider fetching only removed ts) - log.info("Force reinitialization of CF: [{}].", ctx.getCfId()); - states.remove(ctx.getCfId()); - try { - var state = getOrInitState(ctx); - if (state.isSizeOk()) { - processStateIfReady(ctx, Collections.singletonList(ctx.getCfId()), state, null, null, callback); - } else { - throw new RuntimeException(ctx.getSizeExceedsLimitMessage()); - } - } catch (Exception e) { - throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build(); - } - } - - private void processRemovedAttributes(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List cfIdList, MultipleTbCallback callback) throws CalculatedFieldException { - processArgumentValuesUpdate(ctx, cfIdList, callback, mapDeletedAttributesToDefault(ctx, proto.getScope(), proto.getRemovedAttrKeysList()), toTbMsgId(proto), toTbMsgType(proto)); - } - private void processTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List cfIdList, MultipleTbCallback callback) throws CalculatedFieldException { processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto)); } @@ -236,6 +220,14 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto)); } + private void processRemovedTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List cfIdList, MultipleTbCallback callback) throws CalculatedFieldException { + processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArgumentsWithFetchedValue(ctx, proto.getRemovedTsKeysList()), toTbMsgId(proto), toTbMsgType(proto)); + } + + private void processRemovedAttributes(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List cfIdList, MultipleTbCallback callback) throws CalculatedFieldException { + processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArgumentsWithDefaultValue(ctx, proto.getScope(), proto.getRemovedAttrKeysList()), toTbMsgId(proto), toTbMsgType(proto)); + } + private void processArgumentValuesUpdate(CalculatedFieldCtx ctx, List cfIdList, MultipleTbCallback callback, Map newArgValues, UUID tbMsgId, TbMsgType tbMsgType) throws CalculatedFieldException { if (newArgValues.isEmpty()) { @@ -334,7 +326,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM return mapToArguments(argNames, data); } - private static Map mapToArguments(Map argNames, List data) { + private Map mapToArguments(Map argNames, List data) { if (argNames.isEmpty()) { return Collections.emptyMap(); } @@ -366,7 +358,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM return mapToArguments(argNames, scope, attrDataList); } - private static Map mapToArguments(Map argNames, AttributeScopeProto scope, List attrDataList) { + private Map mapToArguments(Map argNames, AttributeScopeProto scope, List attrDataList) { Map arguments = new HashMap<>(); for (AttributeValueProto item : attrDataList) { ReferencedEntityKey key = new ReferencedEntityKey(item.getKey(), ArgumentType.ATTRIBUTE, AttributeScope.valueOf(scope.name())); @@ -378,19 +370,19 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM return arguments; } - private Map mapDeletedAttributesToDefault(CalculatedFieldCtx ctx, EntityId entityId, AttributeScopeProto scope, List removedAttrKeys) { + private Map mapToArgumentsWithDefaultValue(CalculatedFieldCtx ctx, EntityId entityId, AttributeScopeProto scope, List removedAttrKeys) { var argNames = ctx.getLinkedEntityArguments().get(entityId); if (argNames.isEmpty()) { return Collections.emptyMap(); } - return mapToArgumentsDefaultValue(argNames, ctx.getArguments(), scope, removedAttrKeys); + return mapToArgumentsWithDefaultValue(argNames, ctx.getArguments(), scope, removedAttrKeys); } - private Map mapDeletedAttributesToDefault(CalculatedFieldCtx ctx, AttributeScopeProto scope, List removedAttrKeys) { - return mapToArgumentsDefaultValue(ctx.getMainEntityArguments(), ctx.getArguments(), scope, removedAttrKeys); + private Map mapToArgumentsWithDefaultValue(CalculatedFieldCtx ctx, AttributeScopeProto scope, List removedAttrKeys) { + return mapToArgumentsWithDefaultValue(ctx.getMainEntityArguments(), ctx.getArguments(), scope, removedAttrKeys); } - private static Map mapToArgumentsDefaultValue(Map argNames, Map configArguments, AttributeScopeProto scope, List removedAttrKeys) { + private Map mapToArgumentsWithDefaultValue(Map argNames, Map configArguments, AttributeScopeProto scope, List removedAttrKeys) { Map arguments = new HashMap<>(); for (String removedKey : removedAttrKeys) { ReferencedEntityKey key = new ReferencedEntityKey(removedKey, ArgumentType.ATTRIBUTE, AttributeScope.valueOf(scope.name())); @@ -398,7 +390,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM if (argName != null) { Argument argument = configArguments.get(argName); String defaultValue = (argument != null) ? argument.getDefaultValue() : null; - arguments.put(argName, (defaultValue != null) + arguments.put(argName, StringUtils.isNotEmpty(defaultValue) ? new SingleValueArgumentEntry(System.currentTimeMillis(), new StringDataEntry(removedKey, defaultValue), null) : new SingleValueArgumentEntry()); @@ -407,6 +399,17 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM return arguments; } + private Map mapToArgumentsWithFetchedValue(CalculatedFieldCtx ctx, List removedTelemetryKeys) { + Map deletedArguments = ctx.getArguments().entrySet().stream() + .filter(entry -> removedTelemetryKeys.contains(entry.getKey())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + Map fetchedArgs = cfService.fetchArgsFromDb(tenantId, entityId, deletedArguments); + + fetchedArgs.values().forEach(arg -> arg.setForceResetPrevious(true)); + return fetchedArgs; + } + private static List getCalculatedFieldIds(CalculatedFieldTelemetryMsgProto proto) { List cfIds = new LinkedList<>(); for (var cfId : proto.getPreviousCalculatedFieldsList()) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java index 24b428593a..c81690110c 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java @@ -17,20 +17,25 @@ package org.thingsboard.server.service.cf; import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg; +import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; +import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; import java.util.List; +import java.util.Map; public interface CalculatedFieldProcessingService { ListenableFuture fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId); + Map fetchArgsFromDb(TenantId tenantId, EntityId entityId, Map arguments); + void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult calculationResult, List cfIds, TbCallback callback); void pushMsgToLinks(CalculatedFieldTelemetryMsg msg, List linkedCalculatedFields, TbCallback callback); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java index 3abaea75b4..a98566b48c 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java @@ -147,6 +147,28 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP }, calculatedFieldCallbackExecutor); } + @Override + public Map fetchArgsFromDb(TenantId tenantId, EntityId entityId, Map arguments) { + Map> argFutures = new HashMap<>(); + for (var entry : arguments.entrySet()) { + var argEntityId = entry.getValue().getRefEntityId() != null ? entry.getValue().getRefEntityId() : entityId; + var argValueFuture = fetchKvEntry(tenantId, argEntityId, entry.getValue()); + argFutures.put(entry.getKey(), argValueFuture); + } + return argFutures.entrySet().stream() + .collect(Collectors.toMap( + Entry::getKey, // Keep the key as is + entry -> { + try { + // Resolve the future to get the value + return entry.getValue().get(); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException("Error getting future result for key: " + entry.getKey(), e); + } + } + )); + } + @Override public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult calculatedFieldResult, List cfIds, TbCallback callback) { try { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java index 13656d5b0a..edbcb456c6 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java @@ -54,4 +54,8 @@ public interface ArgumentEntry { TbelCfArg toTbelCfArg(); + boolean isForceResetPrevious(); + + void setForceResetPrevious(boolean forceResetPrevious); + } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java index 0f77027650..e889fdb487 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java @@ -55,7 +55,7 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState { ArgumentEntry newEntry = entry.getValue(); ArgumentEntry existingEntry = arguments.get(key); - if (existingEntry == null) { + if (existingEntry == null || newEntry.isForceResetPrevious()) { validateNewEntry(newEntry); arguments.put(key, newEntry); stateUpdated = true; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index e254680019..8cca5c3961 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -151,28 +151,10 @@ public class CalculatedFieldCtx { } } - public boolean matchesKeys(List keys, AttributeScope scope) { - return matchesAttributesKeys(mainEntityArguments, keys, scope); - } - - public boolean matchesKeys(List keys) { - return matchesTimeSeriesKeys(mainEntityArguments, keys); - } - public boolean matches(List values, AttributeScope scope) { return matchesAttributes(mainEntityArguments, values, scope); } - public boolean linkMatchesAttrKeys(EntityId entityId, List keys, AttributeScope scope) { - var map = linkedEntityArguments.get(entityId); - return map != null && matchesAttributesKeys(map, keys, scope); - } - - public boolean linkMatchesTsKeys(EntityId entityId, List keys) { - var map = linkedEntityArguments.get(entityId); - return map != null && matchesTimeSeriesKeys(map, keys); - } - public boolean linkMatches(EntityId entityId, List values, AttributeScope scope) { var map = linkedEntityArguments.get(entityId); return map != null && matchesAttributes(map, values, scope); @@ -187,7 +169,7 @@ public class CalculatedFieldCtx { return map != null && matchesTimeSeries(map, values); } - private static boolean matchesAttributes(Map argMap, List values, AttributeScope scope) { + private boolean matchesAttributes(Map argMap, List values, AttributeScope scope) { for (AttributeKvEntry attrKv : values) { ReferencedEntityKey attrKey = new ReferencedEntityKey(attrKv.getKey(), ArgumentType.ATTRIBUTE, scope); if (argMap.containsKey(attrKey)) { @@ -197,16 +179,6 @@ public class CalculatedFieldCtx { return false; } - private boolean matchesAttributesKeys(Map argMap, List keys, AttributeScope scope) { - for (String key : keys) { - ReferencedEntityKey attrKey = new ReferencedEntityKey(key, ArgumentType.ATTRIBUTE, scope); - if (argMap.containsKey(attrKey)) { - return true; - } - } - return false; - } - private boolean matchesTimeSeries(Map argMap, List values) { for (TsKvEntry tsKv : values) { ReferencedEntityKey latestKey = new ReferencedEntityKey(tsKv.getKey(), ArgumentType.TS_LATEST, null); @@ -221,6 +193,24 @@ public class CalculatedFieldCtx { return false; } + public boolean matchesKeys(List keys, AttributeScope scope) { + return matchesAttributesKeys(mainEntityArguments, keys, scope); + } + + public boolean matchesKeys(List keys) { + return matchesTimeSeriesKeys(mainEntityArguments, keys); + } + + private boolean matchesAttributesKeys(Map argMap, List keys, AttributeScope scope) { + for (String key : keys) { + ReferencedEntityKey attrKey = new ReferencedEntityKey(key, ArgumentType.ATTRIBUTE, scope); + if (argMap.containsKey(attrKey)) { + return true; + } + } + return false; + } + private boolean matchesTimeSeriesKeys(Map argMap, List keys) { for (String key : keys) { ReferencedEntityKey latestKey = new ReferencedEntityKey(key, ArgumentType.TS_LATEST, null); @@ -235,22 +225,32 @@ public class CalculatedFieldCtx { return false; } + public boolean linkMatchesAttrKeys(EntityId entityId, List keys, AttributeScope scope) { + var map = linkedEntityArguments.get(entityId); + return map != null && matchesAttributesKeys(map, keys, scope); + } + + public boolean linkMatchesTsKeys(EntityId entityId, List keys) { + var map = linkedEntityArguments.get(entityId); + return map != null && matchesTimeSeriesKeys(map, keys); + } + public boolean linkMatches(EntityId entityId, CalculatedFieldTelemetryMsgProto proto) { - if (!proto.getRemovedTsKeysList().isEmpty()) { - return linkMatchesTsKeys(entityId, proto.getRemovedTsKeysList()); - } else if (!proto.getRemovedAttrKeysList().isEmpty()) { - return linkMatchesAttrKeys(entityId, proto.getRemovedAttrKeysList(), AttributeScope.valueOf(proto.getScope().name())); - } else if (!proto.getTsDataList().isEmpty()) { + if (!proto.getTsDataList().isEmpty()) { List updatedTelemetry = proto.getTsDataList().stream() .map(ProtoUtils::fromProto) .toList(); return linkMatches(entityId, updatedTelemetry); - } else { + } else if (!proto.getAttrDataList().isEmpty()) { AttributeScope scope = AttributeScope.valueOf(proto.getScope().name()); List updatedTelemetry = proto.getAttrDataList().stream() .map(ProtoUtils::fromProto) .toList(); return linkMatches(entityId, updatedTelemetry, scope); + } else if (!proto.getRemovedTsKeysList().isEmpty()) { + return linkMatchesTsKeys(entityId, proto.getRemovedTsKeysList()); + } else { + return linkMatchesAttrKeys(entityId, proto.getRemovedAttrKeysList(), AttributeScope.valueOf(proto.getScope().name())); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java index c096233224..031629993d 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java @@ -38,6 +38,8 @@ public class SingleValueArgumentEntry implements ArgumentEntry { private BasicKvEntry kvEntryValue; private Long version; + private boolean forceResetPrevious; + public SingleValueArgumentEntry(TsKvProto entry) { this.ts = entry.getTs(); this.version = entry.getVersion(); @@ -61,6 +63,12 @@ public class SingleValueArgumentEntry implements ArgumentEntry { this.kvEntryValue = ProtoUtils.basicKvEntryFromKvEntry(entry); } + public SingleValueArgumentEntry(long ts, BasicKvEntry kvEntryValue, Long version) { + this.ts = ts; + this.kvEntryValue = kvEntryValue; + this.version = version; + } + @Override public ArgumentEntryType getType() { return ArgumentEntryType.SINGLE_VALUE; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java index 866e2f5e09..b10e9f4c7a 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java @@ -42,6 +42,8 @@ public class TsRollingArgumentEntry implements ArgumentEntry { private Long timeWindow; private TreeMap tsRecords = new TreeMap<>(); + private boolean forceResetPrevious; + public TsRollingArgumentEntry(List kvEntries, int limit, long timeWindow) { this.limit = limit; this.timeWindow = timeWindow; @@ -60,6 +62,12 @@ public class TsRollingArgumentEntry implements ArgumentEntry { this.timeWindow = timeWindow; } + public TsRollingArgumentEntry(Integer limit, Long timeWindow, TreeMap tsRecords) { + this.limit = limit; + this.timeWindow = timeWindow; + this.tsRecords = tsRecords; + } + @Override public ArgumentEntryType getType() { return ArgumentEntryType.TS_ROLLING; diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 3f9b88b8d6..9f5b1cfea1 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -211,10 +211,11 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer DonAsynchron.withCallback(deleteFuture, result -> { calculatedFieldQueueService.pushRequestToQueue(request, request.getKeys(), getCalculatedFieldCallback(request.getCallback(), request.getKeys())); }, safeCallback(getCalculatedFieldCallback(request.getCallback(), request.getKeys())), tsCallBackExecutor); -// addMainCallback(deleteFuture, __ -> request.getCallback().onSuccess(request.getKeys()), request.getCallback()::onFailure); } else { ListenableFuture> deleteFuture = tsService.removeAllLatest(request.getTenantId(), request.getEntityId()); - addMainCallback(deleteFuture, request.getCallback()::onSuccess, request.getCallback()::onFailure); + DonAsynchron.withCallback(deleteFuture, result -> { + calculatedFieldQueueService.pushRequestToQueue(request, request.getKeys(), getCalculatedFieldCallback(request.getCallback(), result)); + }, safeCallback(getCalculatedFieldCallback(request.getCallback(), request.getKeys())), tsCallBackExecutor); } } From e5555c9d60ba86555c3f02339191ea9d3bfbf564 Mon Sep 17 00:00:00 2001 From: IrynaMatveieva Date: Wed, 26 Feb 2025 16:11:25 +0200 Subject: [PATCH 3/3] minor fixes --- .../calculatedField/CalculatedFieldEntityMessageProcessor.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 1a0f2ddd9a..f7fc204c0f 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -197,8 +197,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM processTelemetry(ctx, proto, cfIdList, callback); } else if (proto.getAttrDataCount() > 0) { processAttributes(ctx, proto, cfIdList, callback); - } - if (proto.getRemovedTsKeysCount() > 0) { + } else if (proto.getRemovedTsKeysCount() > 0) { processRemovedTelemetry(ctx, proto, cfIdList, callback); } else if (proto.getRemovedAttrKeysCount() > 0) { processRemovedAttributes(ctx, proto, cfIdList, callback);