Browse Source

Merge pull request #12752 from irynamatveieva/cf-ts-attr-deletion

[WIP] Calculated fields: ts/attributes deletion
pull/12767/head
Andrew Shvayka 1 year ago
committed by GitHub
parent
commit
82b40dfad7
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 64
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java
  2. 5
      application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java
  3. 6
      application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java
  4. 22
      application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java
  5. 37
      application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java
  6. 4
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java
  7. 2
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java
  8. 50
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java
  9. 8
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java
  10. 8
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java
  11. 26
      application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
  12. 8
      common/proto/src/main/proto/queue.proto
  13. 21
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesDeleteRequest.java
  14. 21
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesDeleteRequest.java
  15. 2
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNode.java

64
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java

@ -24,11 +24,14 @@ import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.actors.TbActorCtx; import org.thingsboard.server.actors.TbActorCtx;
import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor; import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.cf.configuration.ArgumentType; import org.thingsboard.server.common.data.cf.configuration.ArgumentType;
import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey; import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey;
import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg; import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TbCallback;
@ -56,6 +59,7 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/** /**
@ -171,6 +175,10 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto)); processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto));
} else if (proto.getAttrDataCount() > 0) { } else if (proto.getAttrDataCount() > 0) {
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto)); processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto));
} else if (proto.getRemovedTsKeysCount() > 0) {
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArgumentsWithFetchedValue(ctx, proto.getRemovedTsKeysList()), toTbMsgId(proto), toTbMsgType(proto));
} else if (proto.getRemovedAttrKeysCount() > 0) {
processArgumentValuesUpdate(ctx, cfIds, callback, mapToArgumentsWithDefaultValue(ctx, msg.getEntityId(), proto.getScope(), proto.getRemovedAttrKeysList()), toTbMsgId(proto), toTbMsgType(proto));
} else { } else {
callback.onSuccess(CALLBACKS_PER_CF); callback.onSuccess(CALLBACKS_PER_CF);
} }
@ -189,6 +197,10 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
processTelemetry(ctx, proto, cfIdList, callback); processTelemetry(ctx, proto, cfIdList, callback);
} else if (proto.getAttrDataCount() > 0) { } else if (proto.getAttrDataCount() > 0) {
processAttributes(ctx, proto, cfIdList, callback); processAttributes(ctx, proto, cfIdList, callback);
} else if (proto.getRemovedTsKeysCount() > 0) {
processRemovedTelemetry(ctx, proto, cfIdList, callback);
} else if (proto.getRemovedAttrKeysCount() > 0) {
processRemovedAttributes(ctx, proto, cfIdList, callback);
} else { } else {
callback.onSuccess(CALLBACKS_PER_CF); callback.onSuccess(CALLBACKS_PER_CF);
} }
@ -209,6 +221,14 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto)); processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto));
} }
private void processRemovedTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) throws CalculatedFieldException {
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArgumentsWithFetchedValue(ctx, proto.getRemovedTsKeysList()), toTbMsgId(proto), toTbMsgType(proto));
}
private void processRemovedAttributes(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) throws CalculatedFieldException {
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArgumentsWithDefaultValue(ctx, proto.getScope(), proto.getRemovedAttrKeysList()), toTbMsgId(proto), toTbMsgType(proto));
}
private void processArgumentValuesUpdate(CalculatedFieldCtx ctx, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback, private void processArgumentValuesUpdate(CalculatedFieldCtx ctx, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback,
Map<String, ArgumentEntry> newArgValues, UUID tbMsgId, TbMsgType tbMsgType) throws CalculatedFieldException { Map<String, ArgumentEntry> newArgValues, UUID tbMsgId, TbMsgType tbMsgType) throws CalculatedFieldException {
if (newArgValues.isEmpty()) { if (newArgValues.isEmpty()) {
@ -310,7 +330,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
return mapToArguments(argNames, data); return mapToArguments(argNames, data);
} }
private static Map<String, ArgumentEntry> mapToArguments(Map<ReferencedEntityKey, String> argNames, List<TsKvProto> data) { private Map<String, ArgumentEntry> mapToArguments(Map<ReferencedEntityKey, String> argNames, List<TsKvProto> data) {
if (argNames.isEmpty()) { if (argNames.isEmpty()) {
return Collections.emptyMap(); return Collections.emptyMap();
} }
@ -342,7 +362,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
return mapToArguments(argNames, scope, attrDataList); return mapToArguments(argNames, scope, attrDataList);
} }
private static Map<String, ArgumentEntry> mapToArguments(Map<ReferencedEntityKey, String> argNames, AttributeScopeProto scope, List<AttributeValueProto> attrDataList) { private Map<String, ArgumentEntry> mapToArguments(Map<ReferencedEntityKey, String> argNames, AttributeScopeProto scope, List<AttributeValueProto> attrDataList) {
Map<String, ArgumentEntry> arguments = new HashMap<>(); Map<String, ArgumentEntry> arguments = new HashMap<>();
for (AttributeValueProto item : attrDataList) { for (AttributeValueProto item : attrDataList) {
ReferencedEntityKey key = new ReferencedEntityKey(item.getKey(), ArgumentType.ATTRIBUTE, AttributeScope.valueOf(scope.name())); ReferencedEntityKey key = new ReferencedEntityKey(item.getKey(), ArgumentType.ATTRIBUTE, AttributeScope.valueOf(scope.name()));
@ -354,6 +374,46 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
return arguments; return arguments;
} }
private Map<String, ArgumentEntry> mapToArgumentsWithDefaultValue(CalculatedFieldCtx ctx, EntityId entityId, AttributeScopeProto scope, List<String> removedAttrKeys) {
var argNames = ctx.getLinkedEntityArguments().get(entityId);
if (argNames.isEmpty()) {
return Collections.emptyMap();
}
return mapToArgumentsWithDefaultValue(argNames, ctx.getArguments(), scope, removedAttrKeys);
}
private Map<String, ArgumentEntry> mapToArgumentsWithDefaultValue(CalculatedFieldCtx ctx, AttributeScopeProto scope, List<String> removedAttrKeys) {
return mapToArgumentsWithDefaultValue(ctx.getMainEntityArguments(), ctx.getArguments(), scope, removedAttrKeys);
}
private Map<String, ArgumentEntry> mapToArgumentsWithDefaultValue(Map<ReferencedEntityKey, String> argNames, Map<String, Argument> configArguments, AttributeScopeProto scope, List<String> removedAttrKeys) {
Map<String, ArgumentEntry> arguments = new HashMap<>();
for (String removedKey : removedAttrKeys) {
ReferencedEntityKey key = new ReferencedEntityKey(removedKey, ArgumentType.ATTRIBUTE, AttributeScope.valueOf(scope.name()));
String argName = argNames.get(key);
if (argName != null) {
Argument argument = configArguments.get(argName);
String defaultValue = (argument != null) ? argument.getDefaultValue() : null;
arguments.put(argName, StringUtils.isNotEmpty(defaultValue)
? new SingleValueArgumentEntry(System.currentTimeMillis(), new StringDataEntry(removedKey, defaultValue), null)
: new SingleValueArgumentEntry());
}
}
return arguments;
}
private Map<String, ArgumentEntry> mapToArgumentsWithFetchedValue(CalculatedFieldCtx ctx, List<String> removedTelemetryKeys) {
Map<String, Argument> deletedArguments = ctx.getArguments().entrySet().stream()
.filter(entry -> removedTelemetryKeys.contains(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Map<String, ArgumentEntry> fetchedArgs = cfService.fetchArgsFromDb(tenantId, entityId, deletedArguments);
fetchedArgs.values().forEach(arg -> arg.setForceResetPrevious(true));
return fetchedArgs;
}
private static List<CalculatedFieldId> getCalculatedFieldIds(CalculatedFieldTelemetryMsgProto proto) { private static List<CalculatedFieldId> getCalculatedFieldIds(CalculatedFieldTelemetryMsgProto proto) {
List<CalculatedFieldId> cfIds = new LinkedList<>(); List<CalculatedFieldId> cfIds = new LinkedList<>();
for (var cfId : proto.getPreviousCalculatedFieldsList()) { for (var cfId : proto.getPreviousCalculatedFieldsList()) {

5
application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java

@ -17,20 +17,25 @@ package org.thingsboard.server.service.cf;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg; import org.thingsboard.server.actors.calculatedField.CalculatedFieldTelemetryMsg;
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
import java.util.List; import java.util.List;
import java.util.Map;
public interface CalculatedFieldProcessingService { public interface CalculatedFieldProcessingService {
ListenableFuture<CalculatedFieldState> fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId); ListenableFuture<CalculatedFieldState> fetchStateFromDb(CalculatedFieldCtx ctx, EntityId entityId);
Map<String, ArgumentEntry> fetchArgsFromDb(TenantId tenantId, EntityId entityId, Map<String, Argument> arguments);
void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult calculationResult, List<CalculatedFieldId> cfIds, TbCallback callback); void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult calculationResult, List<CalculatedFieldId> cfIds, TbCallback callback);
void pushMsgToLinks(CalculatedFieldTelemetryMsg msg, List<CalculatedFieldEntityCtxId> linkedCalculatedFields, TbCallback callback); void pushMsgToLinks(CalculatedFieldTelemetryMsg msg, List<CalculatedFieldEntityCtxId> linkedCalculatedFields, TbCallback callback);

6
application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldQueueService.java

@ -16,7 +16,9 @@
package org.thingsboard.server.service.cf; package org.thingsboard.server.service.cf;
import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.FutureCallback;
import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.data.kv.TimeseriesSaveResult; import org.thingsboard.server.common.data.kv.TimeseriesSaveResult;
@ -34,4 +36,8 @@ public interface CalculatedFieldQueueService {
void pushRequestToQueue(AttributesSaveRequest request, List<Long> result, FutureCallback<Void> callback); void pushRequestToQueue(AttributesSaveRequest request, List<Long> result, FutureCallback<Void> callback);
void pushRequestToQueue(AttributesDeleteRequest request, List<String> result, FutureCallback<Void> callback);
void pushRequestToQueue(TimeseriesDeleteRequest request, List<String> result, FutureCallback<Void> callback);
} }

22
application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java

@ -147,6 +147,28 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP
}, calculatedFieldCallbackExecutor); }, calculatedFieldCallbackExecutor);
} }
@Override
public Map<String, ArgumentEntry> fetchArgsFromDb(TenantId tenantId, EntityId entityId, Map<String, Argument> arguments) {
Map<String, ListenableFuture<ArgumentEntry>> argFutures = new HashMap<>();
for (var entry : arguments.entrySet()) {
var argEntityId = entry.getValue().getRefEntityId() != null ? entry.getValue().getRefEntityId() : entityId;
var argValueFuture = fetchKvEntry(tenantId, argEntityId, entry.getValue());
argFutures.put(entry.getKey(), argValueFuture);
}
return argFutures.entrySet().stream()
.collect(Collectors.toMap(
Entry::getKey, // Keep the key as is
entry -> {
try {
// Resolve the future to get the value
return entry.getValue().get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException("Error getting future result for key: " + entry.getKey(), e);
}
}
));
}
@Override @Override
public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult calculatedFieldResult, List<CalculatedFieldId> cfIds, TbCallback callback) { public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult calculatedFieldResult, List<CalculatedFieldId> cfIds, TbCallback callback) {
try { try {

37
application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldQueueService.java

@ -21,7 +21,9 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.thingsboard.rule.engine.api.AttributesDeleteRequest;
import org.thingsboard.rule.engine.api.AttributesSaveRequest; import org.thingsboard.rule.engine.api.AttributesSaveRequest;
import org.thingsboard.rule.engine.api.TimeseriesDeleteRequest;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest; import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityType;
@ -48,6 +50,7 @@ import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.profile.TbAssetProfileCache; import org.thingsboard.server.service.profile.TbAssetProfileCache;
import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import java.util.ArrayList;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -104,6 +107,23 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
() -> toCalculatedFieldTelemetryMsgProto(request, result), callback); () -> toCalculatedFieldTelemetryMsgProto(request, result), callback);
} }
@Override
public void pushRequestToQueue(AttributesDeleteRequest request, List<String> result, FutureCallback<Void> callback) {
var tenantId = request.getTenantId();
var entityId = request.getEntityId();
checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matchesKeys(result, request.getScope()), cf -> cf.linkMatchesAttrKeys(entityId, result, request.getScope()),
() -> toCalculatedFieldTelemetryMsgProto(request, result), callback);
}
@Override
public void pushRequestToQueue(TimeseriesDeleteRequest request, List<String> result, FutureCallback<Void> callback) {
var tenantId = request.getTenantId();
var entityId = request.getEntityId();
checkEntityAndPushToQueue(tenantId, entityId, cf -> cf.matchesKeys(result), cf -> cf.linkMatchesTsKeys(entityId, result),
() -> toCalculatedFieldTelemetryMsgProto(request, result), callback);
}
private void checkEntityAndPushToQueue(TenantId tenantId, EntityId entityId, private void checkEntityAndPushToQueue(TenantId tenantId, EntityId entityId,
Predicate<CalculatedFieldCtx> mainEntityFilter, Predicate<CalculatedFieldCtx> linkedEntityFilter, Predicate<CalculatedFieldCtx> mainEntityFilter, Predicate<CalculatedFieldCtx> linkedEntityFilter,
Supplier<ToCalculatedFieldMsg> msg, FutureCallback<Void> callback) { Supplier<ToCalculatedFieldMsg> msg, FutureCallback<Void> callback) {
@ -174,6 +194,23 @@ public class DefaultCalculatedFieldQueueService implements CalculatedFieldQueueS
return msg.build(); return msg.build();
} }
private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(AttributesDeleteRequest request, List<String> removedKeys) {
CalculatedFieldTelemetryMsgProto telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), new ArrayList<>(), request.getTbMsgId(), request.getTbMsgType())
.setScope(AttributeScopeProto.valueOf(request.getScope().name()))
.addAllRemovedAttrKeys(removedKeys).build();
return ToCalculatedFieldMsg.newBuilder()
.setTelemetryMsg(telemetryMsg)
.build();
}
private ToCalculatedFieldMsg toCalculatedFieldTelemetryMsgProto(TimeseriesDeleteRequest request, List<String> removedKeys) {
CalculatedFieldTelemetryMsgProto telemetryMsg = buildTelemetryMsgProto(request.getTenantId(), request.getEntityId(), new ArrayList<>(), request.getTbMsgId(), request.getTbMsgType())
.addAllRemovedTsKeys(removedKeys).build();
return ToCalculatedFieldMsg.newBuilder()
.setTelemetryMsg(telemetryMsg)
.build();
}
private CalculatedFieldTelemetryMsgProto.Builder buildTelemetryMsgProto(TenantId tenantId, EntityId entityId, List<CalculatedFieldId> calculatedFieldIds, UUID tbMsgId, TbMsgType tbMsgType) { private CalculatedFieldTelemetryMsgProto.Builder buildTelemetryMsgProto(TenantId tenantId, EntityId entityId, List<CalculatedFieldId> calculatedFieldIds, UUID tbMsgId, TbMsgType tbMsgType) {
CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = CalculatedFieldTelemetryMsgProto.newBuilder(); CalculatedFieldTelemetryMsgProto.Builder telemetryMsg = CalculatedFieldTelemetryMsgProto.newBuilder();

4
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java

@ -54,4 +54,8 @@ public interface ArgumentEntry {
TbelCfArg toTbelCfArg(); TbelCfArg toTbelCfArg();
boolean isForceResetPrevious();
void setForceResetPrevious(boolean forceResetPrevious);
} }

2
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java

@ -55,7 +55,7 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
ArgumentEntry newEntry = entry.getValue(); ArgumentEntry newEntry = entry.getValue();
ArgumentEntry existingEntry = arguments.get(key); ArgumentEntry existingEntry = arguments.get(key);
if (existingEntry == null) { if (existingEntry == null || newEntry.isForceResetPrevious()) {
validateNewEntry(newEntry); validateNewEntry(newEntry);
arguments.put(key, newEntry); arguments.put(key, newEntry);
stateUpdated = true; stateUpdated = true;

50
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java

@ -169,7 +169,7 @@ public class CalculatedFieldCtx {
return map != null && matchesTimeSeries(map, values); return map != null && matchesTimeSeries(map, values);
} }
private static boolean matchesAttributes(Map<ReferencedEntityKey, String> argMap, List<AttributeKvEntry> values, AttributeScope scope) { private boolean matchesAttributes(Map<ReferencedEntityKey, String> argMap, List<AttributeKvEntry> values, AttributeScope scope) {
for (AttributeKvEntry attrKv : values) { for (AttributeKvEntry attrKv : values) {
ReferencedEntityKey attrKey = new ReferencedEntityKey(attrKv.getKey(), ArgumentType.ATTRIBUTE, scope); ReferencedEntityKey attrKey = new ReferencedEntityKey(attrKv.getKey(), ArgumentType.ATTRIBUTE, scope);
if (argMap.containsKey(attrKey)) { if (argMap.containsKey(attrKey)) {
@ -193,18 +193,64 @@ public class CalculatedFieldCtx {
return false; return false;
} }
public boolean matchesKeys(List<String> keys, AttributeScope scope) {
return matchesAttributesKeys(mainEntityArguments, keys, scope);
}
public boolean matchesKeys(List<String> keys) {
return matchesTimeSeriesKeys(mainEntityArguments, keys);
}
private boolean matchesAttributesKeys(Map<ReferencedEntityKey, String> argMap, List<String> keys, AttributeScope scope) {
for (String key : keys) {
ReferencedEntityKey attrKey = new ReferencedEntityKey(key, ArgumentType.ATTRIBUTE, scope);
if (argMap.containsKey(attrKey)) {
return true;
}
}
return false;
}
private boolean matchesTimeSeriesKeys(Map<ReferencedEntityKey, String> argMap, List<String> keys) {
for (String key : keys) {
ReferencedEntityKey latestKey = new ReferencedEntityKey(key, ArgumentType.TS_LATEST, null);
if (argMap.containsKey(latestKey)) {
return true;
}
ReferencedEntityKey rollingKey = new ReferencedEntityKey(key, ArgumentType.TS_ROLLING, null);
if (argMap.containsKey(rollingKey)) {
return true;
}
}
return false;
}
public boolean linkMatchesAttrKeys(EntityId entityId, List<String> keys, AttributeScope scope) {
var map = linkedEntityArguments.get(entityId);
return map != null && matchesAttributesKeys(map, keys, scope);
}
public boolean linkMatchesTsKeys(EntityId entityId, List<String> keys) {
var map = linkedEntityArguments.get(entityId);
return map != null && matchesTimeSeriesKeys(map, keys);
}
public boolean linkMatches(EntityId entityId, CalculatedFieldTelemetryMsgProto proto) { public boolean linkMatches(EntityId entityId, CalculatedFieldTelemetryMsgProto proto) {
if (!proto.getTsDataList().isEmpty()) { if (!proto.getTsDataList().isEmpty()) {
List<TsKvEntry> updatedTelemetry = proto.getTsDataList().stream() List<TsKvEntry> updatedTelemetry = proto.getTsDataList().stream()
.map(ProtoUtils::fromProto) .map(ProtoUtils::fromProto)
.toList(); .toList();
return linkMatches(entityId, updatedTelemetry); return linkMatches(entityId, updatedTelemetry);
} else { } else if (!proto.getAttrDataList().isEmpty()) {
AttributeScope scope = AttributeScope.valueOf(proto.getScope().name()); AttributeScope scope = AttributeScope.valueOf(proto.getScope().name());
List<AttributeKvEntry> updatedTelemetry = proto.getAttrDataList().stream() List<AttributeKvEntry> updatedTelemetry = proto.getAttrDataList().stream()
.map(ProtoUtils::fromProto) .map(ProtoUtils::fromProto)
.toList(); .toList();
return linkMatches(entityId, updatedTelemetry, scope); return linkMatches(entityId, updatedTelemetry, scope);
} else if (!proto.getRemovedTsKeysList().isEmpty()) {
return linkMatchesTsKeys(entityId, proto.getRemovedTsKeysList());
} else {
return linkMatchesAttrKeys(entityId, proto.getRemovedAttrKeysList(), AttributeScope.valueOf(proto.getScope().name()));
} }
} }

8
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java

@ -38,6 +38,8 @@ public class SingleValueArgumentEntry implements ArgumentEntry {
private BasicKvEntry kvEntryValue; private BasicKvEntry kvEntryValue;
private Long version; private Long version;
private boolean forceResetPrevious;
public SingleValueArgumentEntry(TsKvProto entry) { public SingleValueArgumentEntry(TsKvProto entry) {
this.ts = entry.getTs(); this.ts = entry.getTs();
this.version = entry.getVersion(); this.version = entry.getVersion();
@ -61,6 +63,12 @@ public class SingleValueArgumentEntry implements ArgumentEntry {
this.kvEntryValue = ProtoUtils.basicKvEntryFromKvEntry(entry); this.kvEntryValue = ProtoUtils.basicKvEntryFromKvEntry(entry);
} }
public SingleValueArgumentEntry(long ts, BasicKvEntry kvEntryValue, Long version) {
this.ts = ts;
this.kvEntryValue = kvEntryValue;
this.version = version;
}
@Override @Override
public ArgumentEntryType getType() { public ArgumentEntryType getType() {
return ArgumentEntryType.SINGLE_VALUE; return ArgumentEntryType.SINGLE_VALUE;

8
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/TsRollingArgumentEntry.java

@ -41,6 +41,8 @@ public class TsRollingArgumentEntry implements ArgumentEntry {
private Long timeWindow; private Long timeWindow;
private TreeMap<Long, Double> tsRecords = new TreeMap<>(); private TreeMap<Long, Double> tsRecords = new TreeMap<>();
private boolean forceResetPrevious;
public TsRollingArgumentEntry(List<TsKvEntry> kvEntries, int limit, long timeWindow) { public TsRollingArgumentEntry(List<TsKvEntry> kvEntries, int limit, long timeWindow) {
this.limit = limit; this.limit = limit;
this.timeWindow = timeWindow; this.timeWindow = timeWindow;
@ -59,6 +61,12 @@ public class TsRollingArgumentEntry implements ArgumentEntry {
this.timeWindow = timeWindow; this.timeWindow = timeWindow;
} }
public TsRollingArgumentEntry(Integer limit, Long timeWindow, TreeMap<Long, Double> tsRecords) {
this.limit = limit;
this.timeWindow = timeWindow;
this.tsRecords = tsRecords;
}
@Override @Override
public ArgumentEntryType getType() { public ArgumentEntryType getType() {
return ArgumentEntryType.TS_ROLLING; return ArgumentEntryType.TS_ROLLING;

26
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java

@ -192,7 +192,9 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
@Override @Override
public void deleteAttributesInternal(AttributesDeleteRequest request) { public void deleteAttributesInternal(AttributesDeleteRequest request) {
ListenableFuture<List<String>> deleteFuture = attrService.removeAll(request.getTenantId(), request.getEntityId(), request.getScope(), request.getKeys()); ListenableFuture<List<String>> deleteFuture = attrService.removeAll(request.getTenantId(), request.getEntityId(), request.getScope(), request.getKeys());
addMainCallback(deleteFuture, request.getCallback()); DonAsynchron.withCallback(deleteFuture, result -> {
calculatedFieldQueueService.pushRequestToQueue(request, result, request.getCallback());
}, safeCallback(request.getCallback()), tsCallBackExecutor);
addWsCallback(deleteFuture, success -> onAttributesDelete(request.getTenantId(), request.getEntityId(), request.getScope().name(), request.getKeys(), request.isNotifyDevice())); addWsCallback(deleteFuture, success -> onAttributesDelete(request.getTenantId(), request.getEntityId(), request.getScope().name(), request.getKeys(), request.isNotifyDevice()));
} }
@ -212,10 +214,14 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
deleteFuture = tsService.remove(request.getTenantId(), request.getEntityId(), request.getDeleteHistoryQueries()); deleteFuture = tsService.remove(request.getTenantId(), request.getEntityId(), request.getDeleteHistoryQueries());
addWsCallback(deleteFuture, result -> onTimeSeriesDelete(request.getTenantId(), request.getEntityId(), request.getKeys(), result)); addWsCallback(deleteFuture, result -> onTimeSeriesDelete(request.getTenantId(), request.getEntityId(), request.getKeys(), result));
} }
addMainCallback(deleteFuture, __ -> request.getCallback().onSuccess(request.getKeys()), request.getCallback()::onFailure); DonAsynchron.withCallback(deleteFuture, result -> {
calculatedFieldQueueService.pushRequestToQueue(request, request.getKeys(), getCalculatedFieldCallback(request.getCallback(), request.getKeys()));
}, safeCallback(getCalculatedFieldCallback(request.getCallback(), request.getKeys())), tsCallBackExecutor);
} else { } else {
ListenableFuture<List<String>> deleteFuture = tsService.removeAllLatest(request.getTenantId(), request.getEntityId()); ListenableFuture<List<String>> deleteFuture = tsService.removeAllLatest(request.getTenantId(), request.getEntityId());
addMainCallback(deleteFuture, request.getCallback()::onSuccess, request.getCallback()::onFailure); DonAsynchron.withCallback(deleteFuture, result -> {
calculatedFieldQueueService.pushRequestToQueue(request, request.getKeys(), getCalculatedFieldCallback(request.getCallback(), result));
}, safeCallback(getCalculatedFieldCallback(request.getCallback(), request.getKeys())), tsCallBackExecutor);
} }
} }
@ -344,4 +350,18 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
}; };
} }
private FutureCallback<Void> getCalculatedFieldCallback(FutureCallback<List<String>> originalCallback, List<String> keys) {
return new FutureCallback<Void>() {
@Override
public void onSuccess(Void unused) {
originalCallback.onSuccess(keys);
}
@Override
public void onFailure(Throwable t) {
originalCallback.onFailure(t);
}
};
}
} }

8
common/proto/src/main/proto/queue.proto

@ -783,9 +783,11 @@ message CalculatedFieldTelemetryMsgProto {
repeated TsKvProto tsData = 9; repeated TsKvProto tsData = 9;
AttributeScopeProto scope = 10; AttributeScopeProto scope = 10;
repeated AttributeValueProto attrData = 11; repeated AttributeValueProto attrData = 11;
int64 tbMsgIdMSB = 12; repeated string removedTsKeys = 12;
int64 tbMsgIdLSB = 13; repeated string removedAttrKeys = 13;
string tbMsgType = 14; int64 tbMsgIdMSB = 14;
int64 tbMsgIdLSB = 15;
string tbMsgType = 16;
} }
message CalculatedFieldLinkedTelemetryMsgProto { message CalculatedFieldLinkedTelemetryMsgProto {

21
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/AttributesDeleteRequest.java

@ -24,8 +24,10 @@ import lombok.ToString;
import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.msg.TbMsgType;
import java.util.List; import java.util.List;
import java.util.UUID;
@Getter @Getter
@ToString @ToString
@ -37,6 +39,8 @@ public class AttributesDeleteRequest {
private final AttributeScope scope; private final AttributeScope scope;
private final List<String> keys; private final List<String> keys;
private final boolean notifyDevice; private final boolean notifyDevice;
private final UUID tbMsgId;
private final TbMsgType tbMsgType;
private final FutureCallback<Void> callback; private final FutureCallback<Void> callback;
public static Builder builder() { public static Builder builder() {
@ -50,9 +54,12 @@ public class AttributesDeleteRequest {
private AttributeScope scope; private AttributeScope scope;
private List<String> keys; private List<String> keys;
private boolean notifyDevice; private boolean notifyDevice;
private UUID tbMsgId;
private TbMsgType tbMsgType;
private FutureCallback<Void> callback; private FutureCallback<Void> callback;
Builder() {} Builder() {
}
public Builder tenantId(TenantId tenantId) { public Builder tenantId(TenantId tenantId) {
this.tenantId = tenantId; this.tenantId = tenantId;
@ -89,6 +96,16 @@ public class AttributesDeleteRequest {
return this; return this;
} }
public Builder tbMsgId(UUID tbMsgId) {
this.tbMsgId = tbMsgId;
return this;
}
public Builder tbMsgType(TbMsgType tbMsgType) {
this.tbMsgType = tbMsgType;
return this;
}
public Builder callback(FutureCallback<Void> callback) { public Builder callback(FutureCallback<Void> callback) {
this.callback = callback; this.callback = callback;
return this; return this;
@ -109,7 +126,7 @@ public class AttributesDeleteRequest {
} }
public AttributesDeleteRequest build() { public AttributesDeleteRequest build() {
return new AttributesDeleteRequest(tenantId, entityId, scope, keys, notifyDevice, callback); return new AttributesDeleteRequest(tenantId, entityId, scope, keys, notifyDevice, tbMsgId, tbMsgType, callback);
} }
} }

21
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TimeseriesDeleteRequest.java

@ -23,8 +23,10 @@ import lombok.ToString;
import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.msg.TbMsgType;
import java.util.List; import java.util.List;
import java.util.UUID;
@Getter @Getter
@ToString @ToString
@ -35,6 +37,8 @@ public class TimeseriesDeleteRequest {
private final EntityId entityId; private final EntityId entityId;
private final List<String> keys; private final List<String> keys;
private final List<DeleteTsKvQuery> deleteHistoryQueries; private final List<DeleteTsKvQuery> deleteHistoryQueries;
private final UUID tbMsgId;
private final TbMsgType tbMsgType;
private final FutureCallback<List<String>> callback; private final FutureCallback<List<String>> callback;
public static Builder builder() { public static Builder builder() {
@ -47,9 +51,12 @@ public class TimeseriesDeleteRequest {
private EntityId entityId; private EntityId entityId;
private List<String> keys; private List<String> keys;
private List<DeleteTsKvQuery> deleteHistoryQueries; private List<DeleteTsKvQuery> deleteHistoryQueries;
private UUID tbMsgId;
private TbMsgType tbMsgType;
private FutureCallback<List<String>> callback; private FutureCallback<List<String>> callback;
Builder() {} Builder() {
}
public Builder tenantId(TenantId tenantId) { public Builder tenantId(TenantId tenantId) {
this.tenantId = tenantId; this.tenantId = tenantId;
@ -71,13 +78,23 @@ public class TimeseriesDeleteRequest {
return this; return this;
} }
public Builder tbMsgId(UUID tbMsgId) {
this.tbMsgId = tbMsgId;
return this;
}
public Builder tbMsgType(TbMsgType tbMsgType) {
this.tbMsgType = tbMsgType;
return this;
}
public Builder callback(FutureCallback<List<String>> callback) { public Builder callback(FutureCallback<List<String>> callback) {
this.callback = callback; this.callback = callback;
return this; return this;
} }
public TimeseriesDeleteRequest build() { public TimeseriesDeleteRequest build() {
return new TimeseriesDeleteRequest(tenantId, entityId, keys, deleteHistoryQueries, callback); return new TimeseriesDeleteRequest(tenantId, entityId, keys, deleteHistoryQueries, tbMsgId, tbMsgType, callback);
} }
} }

2
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgDeleteAttributesNode.java

@ -76,6 +76,8 @@ public class TbMsgDeleteAttributesNode implements TbNode {
.scope(scope) .scope(scope)
.keys(keysToDelete) .keys(keysToDelete)
.notifyDevice(checkNotifyDevice(msg.getMetaData().getValue(NOTIFY_DEVICE_METADATA_KEY), scope)) .notifyDevice(checkNotifyDevice(msg.getMetaData().getValue(NOTIFY_DEVICE_METADATA_KEY), scope))
.tbMsgId(msg.getId())
.tbMsgType(msg.getInternalType())
.callback(config.isSendAttributesDeletedNotification() ? .callback(config.isSendAttributesDeletedNotification() ?
new AttributesDeleteNodeCallback(ctx, msg, scope.name(), keysToDelete) : new AttributesDeleteNodeCallback(ctx, msg, scope.name(), keysToDelete) :
new TelemetryNodeCallback(ctx, msg)) new TelemetryNodeCallback(ctx, msg))

Loading…
Cancel
Save