diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 434f555325..ca97dae51b 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -52,7 +52,6 @@ import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; -import org.thingsboard.server.service.cf.ctx.state.aggregation.AggSingleEntityArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.aggregation.RelatedEntitiesAggregationCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.alarm.AlarmCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingArgumentEntry; @@ -232,7 +231,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } else { if (state instanceof RelatedEntitiesAggregationCalculatedFieldState relatedEntitiesAggState) { Map fetchedArgs = cfService.fetchArgsFromDb(tenantId, msg.getRelatedEntityId(), ctx.getArguments()); - updatedArgs = relatedEntitiesAggState.updateEntityData(toAggSingleEntityArguments(msg.getRelatedEntityId(), fetchedArgs)); + updatedArgs = relatedEntitiesAggState.updateEntityData(setEntityIdToSingleEntityArguments(msg.getRelatedEntityId(), fetchedArgs)); } state.checkStateSize(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), ctx.getMaxStateSize()); @@ -544,7 +543,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM ReferencedEntityKey key = new ReferencedEntityKey(item.getKv().getKey(), ArgumentType.TS_LATEST, null); String argName = relatedEntityArgs.get(key); if (argName != null) { - arguments.put(argName, new AggSingleEntityArgumentEntry(originator, item)); + arguments.put(argName, new SingleValueArgumentEntry(originator, item)); } } } @@ -597,7 +596,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM ReferencedEntityKey key = new ReferencedEntityKey(item.getKey(), ArgumentType.ATTRIBUTE, AttributeScope.valueOf(scope.name())); String argName = relatedEntityArgs.get(key); if (argName != null) { - arguments.put(argName, new AggSingleEntityArgumentEntry(entityId, item)); + arguments.put(argName, new SingleValueArgumentEntry(entityId, item)); } } } @@ -636,7 +635,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM SingleValueArgumentEntry argumentEntry = StringUtils.isNotEmpty(defaultValue) ? new SingleValueArgumentEntry(System.currentTimeMillis(), new StringDataEntry(removedKey, defaultValue), null) : new SingleValueArgumentEntry(); - arguments.put(argName, new AggSingleEntityArgumentEntry(msgEntityId, argumentEntry)); + arguments.put(argName, new SingleValueArgumentEntry(msgEntityId, argumentEntry)); } } } @@ -668,18 +667,18 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM Map fetchedArgs = cfService.fetchArgsFromDb(tenantId, entityId, deletedArguments); if (CalculatedFieldType.RELATED_ENTITIES_AGGREGATION.equals(ctx.getCfType())) { - fetchedArgs = toAggSingleEntityArguments(entityId, fetchedArgs); + fetchedArgs = setEntityIdToSingleEntityArguments(entityId, fetchedArgs); } fetchedArgs.values().forEach(arg -> arg.setForceResetPrevious(true)); return fetchedArgs; } - private Map toAggSingleEntityArguments(EntityId relatedEntityId, Map fetchedArgs) { + private Map setEntityIdToSingleEntityArguments(EntityId relatedEntityId, Map fetchedArgs) { return fetchedArgs.entrySet().stream() .collect(Collectors.toMap( Map.Entry::getKey, - argEntry -> new AggSingleEntityArgumentEntry(relatedEntityId, argEntry.getValue()) + argEntry -> new SingleValueArgumentEntry(relatedEntityId, argEntry.getValue()) )); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java index cfd7c9af3e..d4bf0d52be 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java @@ -255,7 +255,7 @@ public abstract class AbstractCalculatedFieldProcessingService { List>> futures = aggEntities.stream() .map(entityId -> { ListenableFuture argumentEntryFut = fetchArgumentValue(tenantId, entityId, argument, startTs); - return Futures.transform(argumentEntryFut, argumentEntry -> Map.entry(entityId, ArgumentEntry.createAggSingleArgument(entityId, argumentEntry)), MoreExecutors.directExecutor()); + return Futures.transform(argumentEntryFut, argumentEntry -> Map.entry(entityId, ArgumentEntry.createSingleValueArgument(entityId, argumentEntry)), MoreExecutors.directExecutor()); }) .toList(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java index 863d6f5b50..b331c11a47 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java @@ -22,7 +22,6 @@ import org.thingsboard.script.api.tbel.TbelCfArg; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; -import org.thingsboard.server.service.cf.ctx.state.aggregation.AggSingleEntityArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.aggregation.RelatedEntitiesArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationArgumentEntry; @@ -40,8 +39,7 @@ import java.util.Map; @JsonSubTypes.Type(value = TsRollingArgumentEntry.class, name = "TS_ROLLING"), @JsonSubTypes.Type(value = GeofencingArgumentEntry.class, name = "GEOFENCING"), @JsonSubTypes.Type(value = PropagationArgumentEntry.class, name = "PROPAGATION"), - @JsonSubTypes.Type(value = RelatedEntitiesArgumentEntry.class, name = "RELATED_ENTITIES"), - @JsonSubTypes.Type(value = AggSingleEntityArgumentEntry.class, name = "AGGREGATE_LATEST_SINGLE") + @JsonSubTypes.Type(value = RelatedEntitiesArgumentEntry.class, name = "RELATED_ENTITIES") }) public interface ArgumentEntry { @@ -64,6 +62,10 @@ public interface ArgumentEntry { return new SingleValueArgumentEntry(kvEntry); } + static ArgumentEntry createSingleValueArgument(EntityId entityId, ArgumentEntry argumentEntry) { + return new SingleValueArgumentEntry(entityId, argumentEntry); + } + static ArgumentEntry createTsRollingArgument(List kvEntries, int limit, long timeWindow) { return new TsRollingArgumentEntry(kvEntries, limit, timeWindow); } @@ -80,8 +82,4 @@ public interface ArgumentEntry { return new RelatedEntitiesArgumentEntry(entityIdkvEntryMap, false); } - static ArgumentEntry createAggSingleArgument(EntityId entityId, ArgumentEntry argumentEntry) { - return new AggSingleEntityArgumentEntry(entityId, argumentEntry); - } - } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntryType.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntryType.java index 5c672cf04e..427df2bf5b 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntryType.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntryType.java @@ -16,5 +16,5 @@ package org.thingsboard.server.service.cf.ctx.state; public enum ArgumentEntryType { - SINGLE_VALUE, TS_ROLLING, GEOFENCING, PROPAGATION, RELATED_ENTITIES, AGGREGATE_LATEST_SINGLE + SINGLE_VALUE, TS_ROLLING, GEOFENCING, PROPAGATION, RELATED_ENTITIES } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java index 91c331f88e..d48ed9c268 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java @@ -18,13 +18,12 @@ package org.thingsboard.server.service.cf.ctx.state; import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.Getter; import lombok.Setter; -import org.thingsboard.script.api.tbel.TbUtils; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.actors.TbActorRef; +import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; -import org.thingsboard.server.service.cf.ctx.state.aggregation.AggSingleEntityArgumentEntry; import org.thingsboard.server.utils.CalculatedFieldUtils; import java.io.Closeable; @@ -76,7 +75,7 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState, ArgumentEntry existingEntry = arguments.get(key); boolean entryUpdated; - if (existingEntry == null || !(newEntry instanceof AggSingleEntityArgumentEntry) && newEntry.isForceResetPrevious()) { + if (existingEntry == null || !ctx.getCfType().equals(CalculatedFieldType.RELATED_ENTITIES_AGGREGATION) && newEntry.isForceResetPrevious()) { validateNewEntry(key, newEntry); arguments.put(key, newEntry); entryUpdated = true; @@ -152,14 +151,4 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState, this.latestTimestamp = Math.max(this.latestTimestamp, newTs); } - protected Object formatResult(double result, Integer decimals) { - if (decimals == null) { - return result; - } - if (decimals.equals(0)) { - return TbUtils.toInt(result); - } - return TbUtils.toFixed(result, decimals); - } - } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index 46442847a0..aab858d85d 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -634,7 +634,7 @@ public class CalculatedFieldCtx { private boolean hasRelatedEntitiesAggregationConfigurationChanges(CalculatedFieldCtx other) { if (calculatedField.getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration thisConfig && other.calculatedField.getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration otherConfig) { - return !thisConfig.getArguments().equals(otherConfig.getArguments()) || !thisConfig.getRelation().equals(otherConfig.getRelation()); + return !thisConfig.getRelation().equals(otherConfig.getRelation()); } return false; } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java index 8e202308d0..2b3ba19528 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java @@ -33,7 +33,6 @@ import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationCalcul import java.io.Closeable; import java.util.Map; -import java.util.concurrent.ExecutionException; import static org.thingsboard.server.utils.CalculatedFieldUtils.toSingleValueArgumentProto; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java index a268577d4b..ab0ed26dfe 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SimpleCalculatedFieldState.java @@ -52,7 +52,7 @@ public class SimpleCalculatedFieldState extends BaseCalculatedFieldState { double expressionResult = ctx.evaluateSimpleExpression(expression.get(), this); Output output = ctx.getOutput(); - Object result = formatResult(expressionResult, output.getDecimalsByDefault()); + Object result = TbUtils.roundResult(expressionResult, output.getDecimalsByDefault()); JsonNode outputResult = createResultJson(ctx.isUseLatestTs(), output.getName(), result); return Futures.immediateFuture(TelemetryCalculatedFieldResult.builder() diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java index e81201c961..67fb385917 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/SingleValueArgumentEntry.java @@ -20,9 +20,11 @@ import com.fasterxml.jackson.core.type.TypeReference; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +import org.springframework.lang.Nullable; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.script.api.tbel.TbelCfArg; import org.thingsboard.script.api.tbel.TbelCfSingleValueArg; +import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BasicKvEntry; import org.thingsboard.server.common.data.kv.JsonDataEntry; @@ -37,6 +39,9 @@ import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; @AllArgsConstructor public class SingleValueArgumentEntry implements ArgumentEntry { + @Nullable + protected EntityId entityId; + protected long ts; protected BasicKvEntry kvEntryValue; protected Long version; @@ -45,6 +50,11 @@ public class SingleValueArgumentEntry implements ArgumentEntry { public static final Long DEFAULT_VERSION = -1L; + public SingleValueArgumentEntry(EntityId entityId, ArgumentEntry entry) { + this(entry); + this.entityId = entityId; + } + public SingleValueArgumentEntry(ArgumentEntry entry) { if (entry instanceof SingleValueArgumentEntry singleValueArgumentEntry) { this.ts = singleValueArgumentEntry.ts; @@ -54,6 +64,11 @@ public class SingleValueArgumentEntry implements ArgumentEntry { } } + public SingleValueArgumentEntry(EntityId entityId, TsKvProto entry) { + this(entry); + this.entityId = entityId; + } + public SingleValueArgumentEntry(TsKvProto entry) { this.ts = entry.getTs(); if (entry.hasVersion()) { @@ -62,6 +77,11 @@ public class SingleValueArgumentEntry implements ArgumentEntry { this.kvEntryValue = ProtoUtils.fromProto(entry.getKv()); } + public SingleValueArgumentEntry(EntityId entityId, AttributeValueProto entry) { + this(entry); + this.entityId = entityId; + } + public SingleValueArgumentEntry(AttributeValueProto entry) { this.ts = entry.getLastUpdateTs(); if (entry.hasVersion()) { @@ -70,6 +90,11 @@ public class SingleValueArgumentEntry implements ArgumentEntry { this.kvEntryValue = ProtoUtils.basicKvEntryFromProto(entry); } + public SingleValueArgumentEntry(EntityId entityId, KvEntry entry) { + this(entry); + this.entityId = entityId; + } + public SingleValueArgumentEntry(KvEntry entry) { if (entry instanceof TsKvEntry tsKvEntry) { this.ts = tsKvEntry.getTs(); @@ -81,6 +106,11 @@ public class SingleValueArgumentEntry implements ArgumentEntry { this.kvEntryValue = ProtoUtils.basicKvEntryFromKvEntry(entry); } + public SingleValueArgumentEntry(EntityId entityId, long ts, BasicKvEntry kvEntryValue, Long version) { + this(ts, kvEntryValue, version); + this.entityId = entityId; + } + public SingleValueArgumentEntry(long ts, BasicKvEntry kvEntryValue, Long version) { this.ts = ts; this.kvEntryValue = kvEntryValue; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/AggSingleEntityArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/AggSingleEntityArgumentEntry.java deleted file mode 100644 index b935256860..0000000000 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/AggSingleEntityArgumentEntry.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Copyright © 2016-2025 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.cf.ctx.state.aggregation; - -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; -import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.kv.BasicKvEntry; -import org.thingsboard.server.common.data.kv.KvEntry; -import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto; -import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; -import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; -import org.thingsboard.server.service.cf.ctx.state.ArgumentEntryType; -import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; - -@Data -@NoArgsConstructor -@AllArgsConstructor -public class AggSingleEntityArgumentEntry extends SingleValueArgumentEntry { - - private EntityId entityId; - - public AggSingleEntityArgumentEntry(EntityId entityId, ArgumentEntry entry) { - super(entry); - this.entityId = entityId; - } - - public AggSingleEntityArgumentEntry(EntityId entityId, TsKvProto entry) { - super(entry); - this.entityId = entityId; - } - - public AggSingleEntityArgumentEntry(EntityId entityId, AttributeValueProto entry) { - super(entry); - this.entityId = entityId; - } - - public AggSingleEntityArgumentEntry(EntityId entityId, KvEntry entry) { - super(entry); - this.entityId = entityId; - } - - public AggSingleEntityArgumentEntry(EntityId entityId, long ts, BasicKvEntry kvEntryValue, Long version) { - super(ts, kvEntryValue, version); - this.entityId = entityId; - } - - @Override - public boolean updateEntry(ArgumentEntry entry) { - if (entry instanceof AggSingleEntityArgumentEntry aggSingleEntityEntry) { - if (aggSingleEntityEntry.isForceResetPrevious()) { - return applyNewEntry(aggSingleEntityEntry); - } - - if (aggSingleEntityEntry.getTs() < this.ts) { - if (!isDefaultValue()) { - return false; - } - } - - Long newVersion = aggSingleEntityEntry.getVersion(); - if (newVersion == null || this.version == null || newVersion > this.version) { - return applyNewEntry(aggSingleEntityEntry); - } - } else { - throw new IllegalArgumentException("Unsupported argument entry type for aggregation single entity argument entry: " + entry.getType()); - } - return false; - } - - private boolean applyNewEntry(AggSingleEntityArgumentEntry entry) { - this.ts = entry.getTs(); - this.version = entry.getVersion(); - this.kvEntryValue = entry.getKvEntryValue(); - this.entityId = entry.getEntityId(); - return true; - } - - @Override - public ArgumentEntryType getType() { - return ArgumentEntryType.AGGREGATE_LATEST_SINGLE; - } -} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java index c0baca836c..8e78824c7c 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesAggregationCalculatedFieldState.java @@ -37,7 +37,6 @@ import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.BaseCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.cf.ctx.state.aggregation.function.AggEntry; -import org.thingsboard.server.service.cf.ctx.state.aggregation.function.AggFunctionFactory; import java.util.HashMap; import java.util.Map; @@ -150,10 +149,10 @@ public class RelatedEntitiesAggregationCalculatedFieldState extends BaseCalculat String metricKey = entry.getKey(); AggMetric metric = entry.getValue(); - AggEntry aggMetricEntry = AggFunctionFactory.createAggFunction(metric.getFunction()); + AggEntry aggMetricEntry = AggEntry.createAggFunction(metric.getFunction()); aggregateMetric(metric, aggMetricEntry, inputs); - aggMetricEntry.result().ifPresent(result -> { - aggResult.set(metricKey, JacksonUtil.valueToTree(formatResult(result, output.getDecimalsByDefault()))); + aggMetricEntry.result(output.getDecimalsByDefault()).ifPresent(result -> { + aggResult.set(metricKey, JacksonUtil.valueToTree(result)); }); } return aggResult; @@ -188,13 +187,4 @@ public class RelatedEntitiesAggregationCalculatedFieldState extends BaseCalculat } } - private Object formatResult(Object aggregationResult, Integer decimals) { - try { - double result = Double.parseDouble(aggregationResult.toString()); - return formatResult(result, decimals); - } catch (Exception e) { - throw new IllegalArgumentException("Aggregation result cannot be parsed: " + aggregationResult, e); - } - } - } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesArgumentEntry.java index 5385808923..45b7755af4 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/RelatedEntitiesArgumentEntry.java @@ -18,10 +18,11 @@ package org.thingsboard.server.service.cf.ctx.state.aggregation; import lombok.AllArgsConstructor; import lombok.Data; import org.thingsboard.script.api.tbel.TbelCfArg; -import org.thingsboard.script.api.tbel.TbelCfLatestValuesAggregation; +import org.thingsboard.script.api.tbel.TbelCfRelatedEntitiesAggregation; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.ArgumentEntryType; +import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; import java.util.Map; @@ -48,12 +49,12 @@ public class RelatedEntitiesArgumentEntry implements ArgumentEntry { if (entry instanceof RelatedEntitiesArgumentEntry relatedEntitiesArgumentEntry) { aggInputs.putAll(relatedEntitiesArgumentEntry.aggInputs); return true; - } else if (entry instanceof AggSingleEntityArgumentEntry aggSingleEntityArgumentEntry) { - ArgumentEntry argumentEntry = aggInputs.get(aggSingleEntityArgumentEntry.getEntityId()); + } else if (entry instanceof SingleValueArgumentEntry singleValueArgumentEntry) { + ArgumentEntry argumentEntry = aggInputs.get(singleValueArgumentEntry.getEntityId()); if (argumentEntry != null) { - argumentEntry.updateEntry(aggSingleEntityArgumentEntry); + argumentEntry.updateEntry(singleValueArgumentEntry); } else { - aggInputs.put(aggSingleEntityArgumentEntry.getEntityId(), aggSingleEntityArgumentEntry); + aggInputs.put(singleValueArgumentEntry.getEntityId(), singleValueArgumentEntry); } return true; } else { @@ -68,7 +69,7 @@ public class RelatedEntitiesArgumentEntry implements ArgumentEntry { @Override public TbelCfArg toTbelCfArg() { - return new TbelCfLatestValuesAggregation(aggInputs.values()); + return new TbelCfRelatedEntitiesAggregation(aggInputs.values()); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/AggEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/AggEntry.java index 4239e94fec..c4b93fd91d 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/AggEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/AggEntry.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.cf.ctx.state.aggregation.function; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; import org.thingsboard.server.common.data.cf.configuration.aggregation.AggFunction; @@ -36,10 +37,22 @@ import java.util.Optional; }) public interface AggEntry { + @JsonIgnore AggFunction getType(); void update(Object value); - Optional result(); + Optional result(Integer precision); + + static AggEntry createAggFunction(AggFunction function) { + return switch (function) { + case MIN -> new MinAggEntry(); + case MAX -> new MaxAggEntry(); + case SUM -> new SumAggEntry(); + case AVG -> new AvgAggEntry(); + case COUNT -> new CountAggEntry(); + case COUNT_UNIQUE -> new CountUniqueAggEntry(); + }; + } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/AggFunctionFactory.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/AggFunctionFactory.java deleted file mode 100644 index 5ccc355b1f..0000000000 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/AggFunctionFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Copyright © 2016-2025 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.cf.ctx.state.aggregation.function; - -import org.thingsboard.server.common.data.cf.configuration.aggregation.AggFunction; - -public class AggFunctionFactory { - - public static AggEntry createAggFunction(AggFunction function) { - return switch (function) { - case MIN -> new MinAggEntry(); - case MAX -> new MaxAggEntry(); - case SUM -> new SumAggEntry(); - case AVG -> new AvgAggEntry(); - case COUNT -> new CountAggEntry(); - case COUNT_UNIQUE -> new CountUniqueAggEntry(); - }; - } - -} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/AvgAggEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/AvgAggEntry.java index afe6abb93e..e063ff2ea2 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/AvgAggEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/AvgAggEntry.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.cf.ctx.state.aggregation.function; +import org.thingsboard.script.api.tbel.TbUtils; import org.thingsboard.server.common.data.cf.configuration.aggregation.AggFunction; import java.math.BigDecimal; @@ -34,8 +35,9 @@ public class AvgAggEntry extends BaseAggEntry { } @Override - protected double prepareResult() { - return sum.divide(BigDecimal.valueOf(count), 10, RoundingMode.HALF_UP).doubleValue(); + protected Object prepareResult(Integer precision) { + double result = sum.divide(BigDecimal.valueOf(count), RoundingMode.HALF_UP).doubleValue(); + return TbUtils.roundResult(result, precision); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/BaseAggEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/BaseAggEntry.java index 0c12bd13c0..b320435e99 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/BaseAggEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/BaseAggEntry.java @@ -28,10 +28,10 @@ public abstract class BaseAggEntry implements AggEntry { } @Override - public Optional result() { + public Optional result(Integer precision) { if (hasResult) { hasResult = false; - return Optional.of(prepareResult()); + return Optional.of(prepareResult(precision)); } else { return Optional.empty(); } @@ -39,10 +39,13 @@ public abstract class BaseAggEntry implements AggEntry { protected abstract void doUpdate(double value); - protected abstract double prepareResult(); + protected abstract Object prepareResult(Integer precision); protected double extractDoubleValue(Object value) { try { + if (value instanceof Number) { + return ((Number) value).doubleValue(); + } return Double.parseDouble(value.toString()); } catch (Exception e) { throw new NumberFormatException("Cannot parse value " + value.toString()); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/CountAggEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/CountAggEntry.java index 469048d62d..09116985d2 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/CountAggEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/CountAggEntry.java @@ -29,7 +29,7 @@ public class CountAggEntry implements AggEntry { } @Override - public Optional result() { + public Optional result(Integer precision) { return Optional.of(count); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/CountUniqueAggEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/CountUniqueAggEntry.java index b8b0b92470..efb4a58c90 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/CountUniqueAggEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/CountUniqueAggEntry.java @@ -34,7 +34,7 @@ public class CountUniqueAggEntry implements AggEntry { } @Override - public Optional result() { + public Optional result(Integer precision) { return Optional.of(items.size()); } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/MaxAggEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/MaxAggEntry.java index 6e4235b72f..ddc47daf33 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/MaxAggEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/MaxAggEntry.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.cf.ctx.state.aggregation.function; +import org.thingsboard.script.api.tbel.TbUtils; import org.thingsboard.server.common.data.cf.configuration.aggregation.AggFunction; public class MaxAggEntry extends BaseAggEntry { @@ -29,8 +30,8 @@ public class MaxAggEntry extends BaseAggEntry { } @Override - protected double prepareResult() { - return max; + protected Object prepareResult(Integer precision) { + return TbUtils.roundResult(max, precision); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/MinAggEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/MinAggEntry.java index eeacc3a7d9..e517ad305f 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/MinAggEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/MinAggEntry.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.cf.ctx.state.aggregation.function; +import org.thingsboard.script.api.tbel.TbUtils; import org.thingsboard.server.common.data.cf.configuration.aggregation.AggFunction; public class MinAggEntry extends BaseAggEntry { @@ -29,8 +30,8 @@ public class MinAggEntry extends BaseAggEntry { } @Override - protected double prepareResult() { - return min; + protected Object prepareResult(Integer precision) { + return TbUtils.roundResult(min, precision); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/SumAggEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/SumAggEntry.java index b90817d784..fe29d27b7e 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/SumAggEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/function/SumAggEntry.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.cf.ctx.state.aggregation.function; +import org.thingsboard.script.api.tbel.TbUtils; import org.thingsboard.server.common.data.cf.configuration.aggregation.AggFunction; import java.math.BigDecimal; @@ -31,8 +32,8 @@ public class SumAggEntry extends BaseAggEntry { } @Override - protected double prepareResult() { - return sum.doubleValue(); + protected Object prepareResult(Integer precision) { + return TbUtils.roundResult(sum.doubleValue(), precision); } @Override diff --git a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java index 389fad8ff3..d75ec8a70a 100644 --- a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java +++ b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java @@ -27,7 +27,6 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.BasicKvEntry; import org.thingsboard.server.common.util.KvProtoUtil; import org.thingsboard.server.common.util.ProtoUtils; -import org.thingsboard.server.gen.transport.TransportProtos.AggSingleArgumentEntryProto; import org.thingsboard.server.gen.transport.TransportProtos.AlarmRuleStateProto; import org.thingsboard.server.gen.transport.TransportProtos.AlarmStateProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto; @@ -35,7 +34,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldIdPro import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; import org.thingsboard.server.gen.transport.TransportProtos.GeofencingArgumentProto; import org.thingsboard.server.gen.transport.TransportProtos.GeofencingZoneProto; -import org.thingsboard.server.gen.transport.TransportProtos.LatestValuesAggregationStateProto; +import org.thingsboard.server.gen.transport.TransportProtos.RelatedEntitiesAggregationStateProto; import org.thingsboard.server.gen.transport.TransportProtos.SingleValueArgumentProto; import org.thingsboard.server.gen.transport.TransportProtos.TsDoubleValProto; import org.thingsboard.server.gen.transport.TransportProtos.TsRollingArgumentProto; @@ -47,9 +46,8 @@ import org.thingsboard.server.service.cf.ctx.state.ScriptCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.SimpleCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.TsRollingArgumentEntry; -import org.thingsboard.server.service.cf.ctx.state.aggregation.RelatedEntitiesArgumentEntry; -import org.thingsboard.server.service.cf.ctx.state.aggregation.AggSingleEntityArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.aggregation.RelatedEntitiesAggregationCalculatedFieldState; +import org.thingsboard.server.service.cf.ctx.state.aggregation.RelatedEntitiesArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.alarm.AlarmCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.alarm.AlarmRuleState; import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingArgumentEntry; @@ -98,7 +96,7 @@ public class CalculatedFieldUtils { .setId(toProto(stateId)) .setType(state.getType().name()); - LatestValuesAggregationStateProto.Builder aggBuilder = LatestValuesAggregationStateProto.newBuilder(); + RelatedEntitiesAggregationStateProto.Builder aggBuilder = RelatedEntitiesAggregationStateProto.newBuilder(); state.getArguments().forEach((argName, argEntry) -> { switch (argEntry.getType()) { case SINGLE_VALUE -> builder.addSingleValueArguments(toSingleValueArgumentProto(argName, (SingleValueArgumentEntry) argEntry)); @@ -107,7 +105,7 @@ public class CalculatedFieldUtils { case RELATED_ENTITIES -> { RelatedEntitiesArgumentEntry relatedEntitiesArgumentEntry = (RelatedEntitiesArgumentEntry) argEntry; relatedEntitiesArgumentEntry.getAggInputs() - .forEach((entityId, entry) -> aggBuilder.addAggArguments(toAggSingleArgumentProto(argName, entityId, entry))); + .forEach((entityId, entry) -> aggBuilder.addAggArguments(toSingleValueArgumentProto(argName, (SingleValueArgumentEntry) entry))); } } }); @@ -122,7 +120,7 @@ public class CalculatedFieldUtils { } if (state instanceof RelatedEntitiesAggregationCalculatedFieldState aggState) { aggBuilder.setLastArgsUpdateTs(aggState.getLastArgsRefreshTs()); - builder.setLatestValuesAggregationState(aggBuilder.build()); + builder.setRelatedEntitiesAggregationState(aggBuilder.build()); } return builder.build(); } @@ -145,17 +143,6 @@ public class CalculatedFieldUtils { return ruleState; } - public static AggSingleArgumentEntryProto toAggSingleArgumentProto(String argName, EntityId entityId, ArgumentEntry argumentEntry) { - AggSingleArgumentEntryProto.Builder builder = AggSingleArgumentEntryProto.newBuilder() - .setEntityId(ProtoUtils.toProto(entityId)); - - if (argumentEntry instanceof SingleValueArgumentEntry singleValueArgumentEntry) { - builder.setValue(toSingleValueArgumentProto(argName, singleValueArgumentEntry)); - } - - return builder.build(); - } - public static SingleValueArgumentProto toSingleValueArgumentProto(String argName, SingleValueArgumentEntry entry) { SingleValueArgumentProto.Builder builder = SingleValueArgumentProto.newBuilder() .setArgName(argName); @@ -166,6 +153,10 @@ public class CalculatedFieldUtils { Optional.ofNullable(entry.getVersion()).ifPresent(builder::setVersion); + if (entry.getEntityId() != null) { + builder.setEntityId(ProtoUtils.toProto(entry.getEntityId())); + } + return builder.build(); } @@ -242,11 +233,11 @@ public class CalculatedFieldUtils { } case RELATED_ENTITIES_AGGREGATION -> { RelatedEntitiesAggregationCalculatedFieldState aggState = (RelatedEntitiesAggregationCalculatedFieldState) state; - LatestValuesAggregationStateProto aggregationStateProto = proto.getLatestValuesAggregationState(); + RelatedEntitiesAggregationStateProto aggregationStateProto = proto.getRelatedEntitiesAggregationState(); Map> arguments = new HashMap<>(); aggregationStateProto.getAggArgumentsList().forEach(argProto -> { - AggSingleEntityArgumentEntry entry = fromAggSingleValueArgumentProto(argProto); - arguments.computeIfAbsent(argProto.getValue().getArgName(), name -> new HashMap<>()).put(entry.getEntityId(), entry); + SingleValueArgumentEntry entry = fromSingleValueArgumentProto(argProto); + arguments.computeIfAbsent(argProto.getArgName(), name -> new HashMap<>()).put(entry.getEntityId(), entry); }); arguments.forEach((argName, entityInputs) -> { aggState.getArguments().put(argName, new RelatedEntitiesArgumentEntry(entityInputs, false)); @@ -258,31 +249,19 @@ public class CalculatedFieldUtils { return state; } - public static AggSingleEntityArgumentEntry fromAggSingleValueArgumentProto(AggSingleArgumentEntryProto proto) { - if (!proto.hasValue()) { - return new AggSingleEntityArgumentEntry(); - } - EntityId entityId = ProtoUtils.fromProto(proto.getEntityId()); - SingleValueArgumentProto singleValueArgument = proto.getValue(); - TsValueProto tsValueProto = singleValueArgument.getValue(); - return new AggSingleEntityArgumentEntry( - entityId, - tsValueProto.getTs(), - (BasicKvEntry) KvProtoUtil.fromTsValueProto(singleValueArgument.getArgName(), tsValueProto), - singleValueArgument.getVersion() - ); - } - public static SingleValueArgumentEntry fromSingleValueArgumentProto(SingleValueArgumentProto proto) { if (!proto.hasValue()) { return new SingleValueArgumentEntry(); } TsValueProto tsValueProto = proto.getValue(); - return new SingleValueArgumentEntry( - tsValueProto.getTs(), - (BasicKvEntry) KvProtoUtil.fromTsValueProto(proto.getArgName(), tsValueProto), - proto.getVersion() - ); + BasicKvEntry kvEntry = (BasicKvEntry) KvProtoUtil.fromTsValueProto(proto.getArgName(), tsValueProto); + long ts = tsValueProto.getTs(); + long version = proto.getVersion(); + if (proto.hasEntityId()) { + EntityId entityId = ProtoUtils.fromProto(proto.getEntityId()); + return new SingleValueArgumentEntry(entityId, ts, kvEntry, version); + } + return new SingleValueArgumentEntry(ts, kvEntry, version); } public static TsRollingArgumentEntry fromRollingArgumentProto(TsRollingArgumentProto proto) { diff --git a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/AggSingleEntityArgumentEntryTest.java b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/AggSingleEntityArgumentEntryTest.java deleted file mode 100644 index 0401bb0156..0000000000 --- a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/AggSingleEntityArgumentEntryTest.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Copyright © 2016-2025 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.service.cf.ctx.state; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.thingsboard.server.common.data.id.DeviceId; -import org.thingsboard.server.common.data.kv.BasicTsKvEntry; -import org.thingsboard.server.common.data.kv.LongDataEntry; -import org.thingsboard.server.service.cf.ctx.state.aggregation.AggSingleEntityArgumentEntry; - -import java.util.UUID; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatThrownBy; - -public class AggSingleEntityArgumentEntryTest { - - private AggSingleEntityArgumentEntry entry; - - private final DeviceId device1 = new DeviceId(UUID.fromString("1984e5f4-9ff0-4187-84ae-e4438bba4c8a")); - - private final long ts = System.currentTimeMillis(); - - @BeforeEach - void setUp() { - entry = new AggSingleEntityArgumentEntry(device1, new BasicTsKvEntry(ts - 100, new LongDataEntry("key", 12L), 22L)); - } - - @Test - void testUpdateEntryWhenNotAggEntryPassed() { - assertThatThrownBy(() -> entry.updateEntry(new TsRollingArgumentEntry(5, 30000L))) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("Unsupported argument entry type for aggregation single entity argument entry: " + ArgumentEntryType.TS_ROLLING); - } - - @Test - void testUpdateEntryWhenResetPrevious() { - AggSingleEntityArgumentEntry singleEntityArgumentEntry = new AggSingleEntityArgumentEntry(device1, new BasicTsKvEntry(ts - 50, new LongDataEntry("key", 18L), 100L)); - singleEntityArgumentEntry.setForceResetPrevious(true); - - assertThat(entry.updateEntry(singleEntityArgumentEntry)).isTrue(); - assertThat(entry.getTs()).isEqualTo(singleEntityArgumentEntry.getTs()); - assertThat(entry.getKvEntryValue()).isEqualTo(singleEntityArgumentEntry.getKvEntryValue()); - assertThat(entry.getVersion()).isEqualTo(singleEntityArgumentEntry.getVersion()); - } - - - @Test - void testUpdateEntryWithTheSameTsAndVersion() { - assertThat(entry.updateEntry(new AggSingleEntityArgumentEntry(device1, new BasicTsKvEntry(ts - 100, new LongDataEntry("key", 19L), 22L)))).isFalse(); - } - - @Test - void testUpdateEntryWithTheSameTsAndDifferentVersion() { - assertThat(entry.updateEntry(new AggSingleEntityArgumentEntry(device1, new BasicTsKvEntry(ts - 100, new LongDataEntry("key", 134L), 23L)))).isTrue(); - } - - @Test - void testUpdateEntryWhenNewVersionIsNull() { - assertThat(entry.updateEntry(new AggSingleEntityArgumentEntry(device1, new BasicTsKvEntry(ts - 40, new LongDataEntry("key", 56L), null)))).isTrue(); - assertThat(entry.getValue()).isEqualTo(56L); - assertThat(entry.getVersion()).isNull(); - } - - @Test - void testUpdateEntryWhenNewVersionIsGreaterThanCurrent() { - assertThat(entry.updateEntry(new AggSingleEntityArgumentEntry(device1, new BasicTsKvEntry(ts - 40, new LongDataEntry("key", 76L), 23L)))).isTrue(); - assertThat(entry.getValue()).isEqualTo(76L); - assertThat(entry.getVersion()).isEqualTo(23); - } - - @Test - void testUpdateEntryWhenNewVersionIsLessThanCurrent() { - assertThat(entry.updateEntry(new AggSingleEntityArgumentEntry(device1, new BasicTsKvEntry(ts - 40, new LongDataEntry("key", 11L), 20L)))).isFalse(); - } - - @Test - void testUpdateEntryWhenValueWasNotChanged() { - assertThat(entry.updateEntry(new AggSingleEntityArgumentEntry(device1, new BasicTsKvEntry(ts - 40, new LongDataEntry("key", 18L), 45L)))).isTrue(); - } - - @Test - void testUpdateEntryWithOldTs() { - assertThat(entry.updateEntry(new AggSingleEntityArgumentEntry(device1, new BasicTsKvEntry(ts - 150, new LongDataEntry("key", 155L), 45L)))).isFalse(); - } - -} diff --git a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/RelatedEntitiesArgumentEntryTest.java b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/RelatedEntitiesArgumentEntryTest.java index 14d2801e0e..61b45b83c9 100644 --- a/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/RelatedEntitiesArgumentEntryTest.java +++ b/application/src/test/java/org/thingsboard/server/service/cf/ctx/state/RelatedEntitiesArgumentEntryTest.java @@ -22,7 +22,6 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.service.cf.ctx.state.aggregation.RelatedEntitiesArgumentEntry; -import org.thingsboard.server.service.cf.ctx.state.aggregation.AggSingleEntityArgumentEntry; import java.util.HashMap; import java.util.Map; @@ -43,8 +42,8 @@ public class RelatedEntitiesArgumentEntryTest { @BeforeEach void setUp() { Map aggInputs = new HashMap<>(); - aggInputs.put(device1, new AggSingleEntityArgumentEntry(device1, new BasicTsKvEntry(ts - 100, new LongDataEntry("key", 12L), 1L))); - aggInputs.put(device2, new AggSingleEntityArgumentEntry(device2, new BasicTsKvEntry(ts - 150, new LongDataEntry("key", 16L), 6L))); + aggInputs.put(device1, new SingleValueArgumentEntry(device1, new BasicTsKvEntry(ts - 100, new LongDataEntry("key", 12L), 1L))); + aggInputs.put(device2, new SingleValueArgumentEntry(device2, new BasicTsKvEntry(ts - 150, new LongDataEntry("key", 16L), 6L))); entry = new RelatedEntitiesArgumentEntry(aggInputs, false); } @@ -62,8 +61,8 @@ public class RelatedEntitiesArgumentEntryTest { DeviceId device4 = new DeviceId(UUID.randomUUID()); RelatedEntitiesArgumentEntry relatedEntitiesArgumentEntry = new RelatedEntitiesArgumentEntry(Map.of( - device3, new AggSingleEntityArgumentEntry(device3, new BasicTsKvEntry(ts - 50, new LongDataEntry("key", 16L), 13L)), - device4, new AggSingleEntityArgumentEntry(device4, new BasicTsKvEntry(ts - 60, new LongDataEntry("key", 23L), 7L)) + device3, new SingleValueArgumentEntry(device3, new BasicTsKvEntry(ts - 50, new LongDataEntry("key", 16L), 13L)), + device4, new SingleValueArgumentEntry(device4, new BasicTsKvEntry(ts - 60, new LongDataEntry("key", 23L), 7L)) ), false); assertThat(entry.updateEntry(relatedEntitiesArgumentEntry)).isTrue(); @@ -75,10 +74,10 @@ public class RelatedEntitiesArgumentEntryTest { } @Test - void testUpdateEntryWhenAggSingleEntityArgumentEntryPassedAndNoEntriesById() { + void testUpdateEntryWhenSingleValueArgumentEntryPassedAndNoEntriesById() { DeviceId device3 = new DeviceId(UUID.randomUUID()); - AggSingleEntityArgumentEntry singleEntityArgumentEntry = new AggSingleEntityArgumentEntry(device3, new BasicTsKvEntry(ts - 50, new LongDataEntry("key", 18L), 10L)); + SingleValueArgumentEntry singleEntityArgumentEntry = new SingleValueArgumentEntry(device3, new BasicTsKvEntry(ts - 50, new LongDataEntry("key", 18L), 10L)); assertThat(entry.updateEntry(singleEntityArgumentEntry)).isTrue(); @@ -88,8 +87,8 @@ public class RelatedEntitiesArgumentEntryTest { } @Test - void testUpdateEntryWhenAggSingleEntityArgumentEntryPassedAndEntryByIdExist() { - AggSingleEntityArgumentEntry singleEntityArgumentEntry = new AggSingleEntityArgumentEntry(device2, new BasicTsKvEntry(ts - 50, new LongDataEntry("key", 18L), 10L)); + void testUpdateEntryWhenSingleValueArgumentEntryPassedAndEntryByIdExist() { + SingleValueArgumentEntry singleEntityArgumentEntry = new SingleValueArgumentEntry(device2, new BasicTsKvEntry(ts - 50, new LongDataEntry("key", 18L), 10L)); assertThat(entry.updateEntry(singleEntityArgumentEntry)).isTrue(); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/AggInput.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/AggInput.java index fac988d5d9..06929de81c 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/AggInput.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/AggInput.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.common.data.cf.configuration.aggregation; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; @@ -31,6 +32,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; @JsonIgnoreProperties(ignoreUnknown = true) public interface AggInput { + @JsonIgnore String getType(); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/RelatedEntitiesAggregationCalculatedFieldConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/RelatedEntitiesAggregationCalculatedFieldConfiguration.java index 69e4ee7fdf..9d4c7bdaf6 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/RelatedEntitiesAggregationCalculatedFieldConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/aggregation/RelatedEntitiesAggregationCalculatedFieldConfiguration.java @@ -15,6 +15,9 @@ */ package org.thingsboard.server.common.data.cf.configuration.aggregation; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; import lombok.Data; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Argument; @@ -27,9 +30,12 @@ import java.util.Map; @Data public class RelatedEntitiesAggregationCalculatedFieldConfiguration implements ArgumentsBasedCalculatedFieldConfiguration { + @NotNull private RelationPathLevel relation; private Map arguments; private long deduplicationIntervalInSec; + @Valid + @NotEmpty private Map metrics; private Output output; private boolean useLatestTs; @@ -41,9 +47,6 @@ public class RelatedEntitiesAggregationCalculatedFieldConfiguration implements A @Override public void validate() { - if (relation == null) { - throw new IllegalArgumentException("Relation must be specified!"); - } relation.validate(); if (arguments.containsKey("ctx")) { throw new IllegalArgumentException("Argument name 'ctx' is reserved and cannot be used."); @@ -51,9 +54,6 @@ public class RelatedEntitiesAggregationCalculatedFieldConfiguration implements A if (arguments.values().stream().anyMatch(Argument::hasTsRollingArgument)) { throw new IllegalArgumentException("Calculated field with type: '" + getType() + "' doesn't support TS_ROLLING arguments."); } - if (metrics.isEmpty()) { - throw new IllegalArgumentException("Latest value aggregation calculated field must have at least one metric."); - } } } diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index b59e01128b..2e3a2387e7 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -888,6 +888,7 @@ message SingleValueArgumentProto { string argName = 1; TsValueProto value = 2; int64 version = 3; + EntityIdProto entityId = 4; } message TsDoubleValProto { @@ -915,14 +916,9 @@ message GeofencingArgumentProto { repeated GeofencingZoneProto zones = 2; } -message AggSingleArgumentEntryProto { - EntityIdProto entityId = 1; - SingleValueArgumentProto value = 2; -} - -message LatestValuesAggregationStateProto { +message RelatedEntitiesAggregationStateProto { int64 lastArgsUpdateTs = 1; - repeated AggSingleArgumentEntryProto aggArguments = 2; + repeated SingleValueArgumentProto aggArguments = 2; } message CalculatedFieldStateProto { @@ -932,7 +928,7 @@ message CalculatedFieldStateProto { repeated TsRollingArgumentProto rollingValueArguments = 4; repeated GeofencingArgumentProto geofencingArguments = 5; AlarmStateProto alarmState = 6; - LatestValuesAggregationStateProto latestValuesAggregationState = 7; + RelatedEntitiesAggregationStateProto relatedEntitiesAggregationState = 7; } //Used to report session state to tb-Service and persist this state in the cache on the tb-Service level. @@ -1303,7 +1299,7 @@ message ComponentLifecycleMsgProto { int64 profileIdLSB = 12; optional string info = 13; bool ownerChanged = 100; - bool relationChanged = 15; + bool relationChanged = 14; } message EdgeEventMsgProto { diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java index 072a17835d..3e677cf269 100644 --- a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbUtils.java @@ -1186,6 +1186,17 @@ public class TbUtils { return BigDecimal.valueOf(value).setScale(0, RoundingMode.HALF_UP).intValue(); } + // todo: register method + public static Object roundResult(double value, Integer precision) { + if (precision == null) { + return value; + } + if (precision.equals(0)) { + return toInt(value); + } + return toFixed(value, precision); + } + public static boolean isNaN(double value) { return Double.isNaN(value); } diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfArg.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfArg.java index 856cc4bd39..62d6d3d002 100644 --- a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfArg.java +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfArg.java @@ -29,7 +29,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; @JsonSubTypes.Type(value = TbelCfTsRollingArg.class, name = "TS_ROLLING"), @JsonSubTypes.Type(value = TbelCfGeofencingArg.class, name = "GEOFENCING_CF_ARGUMENT_VALUE"), @JsonSubTypes.Type(value = TbelCfPropagationArg.class, name = "PROPAGATION_CF_ARGUMENT_VALUE"), - @JsonSubTypes.Type(value = TbelCfLatestValuesAggregation.class, name = "LATEST_VALUES_AGGREGATION") + @JsonSubTypes.Type(value = TbelCfRelatedEntitiesAggregation.class, name = "LATEST_VALUES_AGGREGATION") }) public interface TbelCfArg extends TbelCfObject { diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfLatestValuesAggregation.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfRelatedEntitiesAggregation.java similarity index 90% rename from common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfLatestValuesAggregation.java rename to common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfRelatedEntitiesAggregation.java index 4d1b42aa94..75c17f0a0e 100644 --- a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfLatestValuesAggregation.java +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfRelatedEntitiesAggregation.java @@ -20,12 +20,12 @@ import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Data; @Data -public class TbelCfLatestValuesAggregation implements TbelCfArg { +public class TbelCfRelatedEntitiesAggregation implements TbelCfArg { private final Object value; @JsonCreator - public TbelCfLatestValuesAggregation( + public TbelCfRelatedEntitiesAggregation( @JsonProperty("value") Object value ) { this.value = value;