diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index f38a1972c0..bb8c3fca44 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.actors; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -54,7 +55,6 @@ import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.limit.LimitedApi; -import org.thingsboard.server.common.data.msg.TbMsgType; import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.TbMsg; @@ -119,7 +119,6 @@ import org.thingsboard.server.service.cf.CalculatedFieldProcessingService; import org.thingsboard.server.service.cf.CalculatedFieldQueueService; import org.thingsboard.server.service.cf.CalculatedFieldStateService; import org.thingsboard.server.service.cf.OwnerService; -import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; import org.thingsboard.server.service.component.ComponentDiscoveryService; import org.thingsboard.server.service.edge.rpc.EdgeRpcService; import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService; @@ -144,14 +143,12 @@ import org.thingsboard.server.utils.DebugModeRateLimitsConfig; import java.io.PrintWriter; import java.io.StringWriter; -import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; @Slf4j @Component @@ -842,7 +839,7 @@ public class ActorSystemContext { Futures.addCallback(future, RULE_CHAIN_DEBUG_EVENT_ERROR_CALLBACK, MoreExecutors.directExecutor()); } - public void persistCalculatedFieldDebugEvent(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, Map arguments, UUID tbMsgId, TbMsgType tbMsgType, String result, String errorMessage) { + public void persistCalculatedFieldDebugEvent(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, JsonNode arguments, UUID tbMsgId, String tbMsgType, String result, String errorMessage) { if (checkLimits(tenantId)) { try { CalculatedFieldDebugEvent.CalculatedFieldDebugEventBuilder eventBuilder = CalculatedFieldDebugEvent.builder() @@ -855,13 +852,10 @@ public class ActorSystemContext { eventBuilder.msgId(tbMsgId); } if (tbMsgType != null) { - eventBuilder.msgType(tbMsgType.name()); + eventBuilder.msgType(tbMsgType); } if (arguments != null) { - eventBuilder.arguments(JacksonUtil.toString( - arguments.entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().jsonValue())) - )); + eventBuilder.arguments(JacksonUtil.toString(arguments)); } if (result != null) { eventBuilder.result(result); diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 6cc1c50325..6b45552bac 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -70,6 +70,7 @@ import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.thingsboard.server.common.data.DataConstants.CF_REEVALUATION_MSG; import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.createStateByType; /** @@ -351,7 +352,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } if (state.isSizeOk()) { log.debug("[{}][{}] Reevaluating CF state", entityId, cfId); - processStateIfReady(state, null, ctx, Collections.singletonList(cfId), null, null, msg.getCallback()); + processStateIfReady(state, null, ctx, Collections.singletonList(cfId), null, CF_REEVALUATION_MSG, msg.getCallback()); } else { throw new RuntimeException(ctx.getSizeExceedsLimitMessage()); } @@ -432,7 +433,8 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM if (!updatedArgs.isEmpty() || justRestored) { cfIdList = new ArrayList<>(cfIdList); cfIdList.add(ctx.getCfId()); - processStateIfReady(state, updatedArgs, ctx, cfIdList, tbMsgId, tbMsgType, callback); + String msgType = tbMsgType == null ? null : tbMsgType.name(); + processStateIfReady(state, updatedArgs, ctx, cfIdList, tbMsgId, msgType, callback); } else { callback.onSuccess(); } @@ -474,7 +476,7 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } private void processStateIfReady(CalculatedFieldState state, Map updatedArgs, CalculatedFieldCtx ctx, - List cfIdList, UUID tbMsgId, TbMsgType tbMsgType, TbCallback callback) throws CalculatedFieldException { + List cfIdList, UUID tbMsgId, String tbMsgType, TbCallback callback) throws CalculatedFieldException { callback = new MultipleTbCallback(CALLBACKS_PER_CF, callback); log.trace("[{}][{}] Processing state if ready. Current args: {}, updated args: {}", entityId, ctx.getCfId(), state.getArguments(), updatedArgs); CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId); @@ -492,19 +494,19 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM callback.onSuccess(); } if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) { - systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, calculationResult.stringValue(), null); + systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArgumentsJson(), tbMsgId, tbMsgType, calculationResult.stringValue(), null); } } } else { if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) { String errorMsg = ctx.isInitialized() ? state.getReadinessStatus().errorMsg() : "Calculated field state is not initialized!"; - systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, null, errorMsg); + systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArgumentsJson(), tbMsgId, tbMsgType, null, errorMsg); } callback.onSuccess(); } } catch (Exception e) { log.debug("[{}][{}] Failed to process CF state", entityId, ctx.getCfId(), e); - throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).msgId(tbMsgId).msgType(tbMsgType).arguments(state.getArguments()).cause(e).build(); + throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).msgId(tbMsgId).msgType(tbMsgType).arguments(state.getArgumentsJson()).cause(e).build(); } finally { if (!stateSizeChecked) { state.checkStateSize(ctxId, ctx.getMaxStateSize()); diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldException.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldException.java index 70c8dfbfd2..0242a33145 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldException.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldException.java @@ -15,14 +15,12 @@ */ package org.thingsboard.server.actors.calculatedField; +import com.fasterxml.jackson.databind.JsonNode; import lombok.Builder; import lombok.Getter; import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.msg.TbMsgType; -import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; -import java.util.Map; import java.util.UUID; @Getter @@ -32,8 +30,8 @@ public class CalculatedFieldException extends Exception { private final CalculatedFieldCtx ctx; private final EntityId eventEntity; private final UUID msgId; - private final TbMsgType msgType; - private Map arguments; + private final String msgType; + private JsonNode arguments; private String errorMessage; private Exception cause; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java index 741c94c796..6df4240410 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.cf.ctx.state; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import lombok.Getter; import lombok.Setter; @@ -33,6 +34,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; @Getter public abstract class BaseCalculatedFieldState implements CalculatedFieldState, Closeable { @@ -185,4 +187,10 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState, return ReadinessStatus.from(emptyArguments); } + @Override + public JsonNode getArgumentsJson() { + return JacksonUtil.valueToTree(arguments.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().jsonValue()))); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index cadd34c420..62163fecc2 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -646,7 +646,9 @@ public class CalculatedFieldCtx implements Closeable { } if (calculatedField.getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration thisConfig && other.getCalculatedField().getConfiguration() instanceof RelatedEntitiesAggregationCalculatedFieldConfiguration otherConfig - && (thisConfig.getDeduplicationIntervalInSec() != otherConfig.getDeduplicationIntervalInSec() || !thisConfig.getMetrics().equals(otherConfig.getMetrics()))) { + && (thisConfig.getDeduplicationIntervalInSec() != otherConfig.getDeduplicationIntervalInSec() + || !thisConfig.getMetrics().equals(otherConfig.getMetrics()) + || thisConfig.isUseLatestTs() != otherConfig.isUseLatestTs())) { return true; } if (calculatedField.getConfiguration() instanceof EntityAggregationCalculatedFieldConfiguration thisConfig diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java index e3914cc125..8a3c618254 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java @@ -19,6 +19,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonSubTypes.Type; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.actors.TbActorRef; import org.thingsboard.server.common.data.cf.CalculatedFieldType; @@ -59,6 +60,8 @@ public interface CalculatedFieldState extends Closeable { Map getArguments(); + JsonNode getArgumentsJson(); + long getLatestTimestamp(); void setCtx(CalculatedFieldCtx ctx, TbActorRef actorCtx); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java index 6ead645322..0fadc59b8c 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/aggregation/single/EntityAggregationCalculatedFieldState.java @@ -15,12 +15,15 @@ */ package org.thingsboard.server.service.cf.ctx.state.aggregation.single; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.common.util.DebugModeUtil; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.script.api.tbel.TbUtils; +import org.thingsboard.script.api.tbel.TbelCfArg; import org.thingsboard.server.actors.TbActorRef; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Output; @@ -36,6 +39,7 @@ import org.thingsboard.server.service.cf.TelemetryCalculatedFieldResult; import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.BaseCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; +import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; import java.time.Instant; import java.time.ZoneId; @@ -58,6 +62,8 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt private long checkInterval; private Map metrics; + private final EntityAggregationDebugArgumentsTracker debugTracker = new EntityAggregationDebugArgumentsTracker(new HashMap<>()); + private CalculatedFieldProcessingService cfProcessingService; public EntityAggregationCalculatedFieldState(EntityId entityId) { @@ -91,9 +97,14 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt @Override public ListenableFuture performCalculation(Map updatedArgs, CalculatedFieldCtx ctx) throws Exception { + debugTracker.reset(); createIntervalIfNotExist(); long now = System.currentTimeMillis(); + if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) { + debugTracker.recordUpdatedArgs(updatedArgs, arguments); + } + Map> results = new HashMap<>(); List expiredIntervals = new ArrayList<>(); getIntervals().forEach((intervalEntry, argIntervalStatuses) -> { @@ -114,6 +125,12 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt .build()); } + @Override + public Map update(Map argumentValues, CalculatedFieldCtx ctx) { + createIntervalIfNotExist(); + return super.update(argumentValues, ctx); + } + private void removeExpiredIntervals(List expiredIntervals) { expiredIntervals.forEach(expiredInterval -> { arguments.values().stream() @@ -183,6 +200,9 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt expiredIntervals.add(intervalEntry); } else if (now - startTs >= intervalEntry.getIntervalDuration()) { handleActiveInterval(intervalEntry, args, results); + if (watermarkDuration == 0) { + expiredIntervals.add(intervalEntry); + } } } @@ -262,14 +282,83 @@ public class EntityAggregationCalculatedFieldState extends BaseCalculatedFieldSt resultNode.put("ts", interval.getEndTs() - 1); resultNode.set("values", metricsNode); result.add(resultNode); + + if (DebugModeUtil.isDebugFailuresAvailable(ctx.getCalculatedField())) { + debugTracker.addInterval(interval); + } } }); return result; } + @Override + public JsonNode getArgumentsJson() { + EntityAggregationDebugArguments debugArguments = debugTracker.toDebugArguments(); + return debugArguments == null ? null : JacksonUtil.valueToTree(debugArguments); + } + @Override public boolean isReady() { return true; } + record EntityAggregationDebugArgumentsTracker(Map> processedIntervals) { + + public void reset() { + processedIntervals.clear(); + } + + public void addInterval(AggIntervalEntry interval) { + processedIntervals.computeIfAbsent(interval, k -> new HashMap<>()); + } + + public void recordUpdatedArgs(Map updatedArgs, Map arguments) { + if (updatedArgs != null && !updatedArgs.isEmpty()) { + updatedArgs.forEach((argName, argEntry) -> { + ArgumentEntry argumentEntry = arguments.get(argName); + if (argumentEntry instanceof EntityAggregationArgumentEntry entityAggEntry && argEntry instanceof SingleValueArgumentEntry singleEntry) { + entityAggEntry.getAggIntervals().forEach((aggIntervalEntry, aggIntervalEntryStatus) -> { + boolean match = singleEntry.isForceResetPrevious() || aggIntervalEntry.belongsToInterval(singleEntry.getTs()); + if (match) { + recordArg(aggIntervalEntry, argName, singleEntry.toTbelCfArg()); + } + }); + } + }); + } + } + + public void recordArg(AggIntervalEntry interval, String argName, TbelCfArg value) { + processedIntervals.computeIfAbsent(interval, k -> new HashMap<>()).put(argName, value); + } + + public EntityAggregationDebugArguments toDebugArguments() { + if (processedIntervals.isEmpty()) { + return null; + } + return EntityAggregationDebugArguments.toDebugArguments(processedIntervals); + } + + } + + record EntityAggregationDebugArguments(List processedIntervals) { + + public static EntityAggregationDebugArguments toDebugArguments(Map> processedIntervals) { + List result = new ArrayList<>(); + processedIntervals.forEach((interval, args) -> { + result.add(new IntervalDebugArgument(interval.getStartTs(), interval.getEndTs(), args)); + }); + return new EntityAggregationDebugArguments(result); + } + + } + + record IntervalDebugArgument(Long intervalStartTs, Long intervalEndTs, JsonNode updatedArguments) { + + public IntervalDebugArgument(Long intervalStartTs, Long intervalEndTs, Map updatedArguments) { + this(intervalStartTs, intervalEndTs, updatedArguments == null || updatedArguments.isEmpty() ? null : JacksonUtil.valueToTree(updatedArguments)); + } + + } + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java index 8a72b26a28..7830461109 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/DataConstants.java @@ -105,6 +105,8 @@ public class DataConstants { public static final String RPC_FAILED = "RPC_FAILED"; public static final String RPC_DELETED = "RPC_DELETED"; + public static final String CF_REEVALUATION_MSG = "CF_REEVALUATION_MSG"; + public static final String DEFAULT_SECRET_KEY = ""; public static final String SECRET_KEY_FIELD_NAME = "secretKey"; public static final String DURATION_MS_FIELD_NAME = "durationMs";