From ce3b89046b6ec83e2e7291db053e3ed90d5cb8ed Mon Sep 17 00:00:00 2001 From: dshvaika Date: Fri, 3 Oct 2025 15:40:15 +0300 Subject: [PATCH] propagation cf init commit --- ...CalculatedFieldEntityMessageProcessor.java | 14 +++- ...tractCalculatedFieldProcessingService.java | 34 ++++++--- ...faultCalculatedFieldProcessingService.java | 41 ++++++++--- .../cf/PropagationCalculatedFieldResult.java | 49 +++++++++++++ .../service/cf/ctx/state/ArgumentEntry.java | 8 ++- .../cf/ctx/state/ArgumentEntryType.java | 2 +- .../cf/ctx/state/CalculatedFieldCtx.java | 39 ++++++---- .../geofencing/GeofencingArgumentEntry.java | 4 +- .../GeofencingCalculatedFieldState.java | 4 ++ .../propagation/PropagationArgumentEntry.java | 72 +++++++++++++++++++ .../PropagationCalculatedFieldState.java | 59 +++++++++++++++ .../utils/CalculatedFieldArgumentUtils.java | 2 + .../server/utils/CalculatedFieldUtils.java | 25 +++++-- .../common/data/cf/CalculatedFieldType.java | 3 +- .../BaseCalculatedFieldConfiguration.java | 10 ++- .../CalculatedFieldConfiguration.java | 3 +- ...opagationCalculatedFieldConfiguration.java | 66 +++++++++++++++++ common/proto/src/main/proto/queue.proto | 1 + .../script/api/tbel/TbelCfArg.java | 3 +- ...ncingArg.java => TbelCfGeofencingArg.java} | 4 +- .../script/api/tbel/TbelCfPropagationArg.java | 42 +++++++++++ 21 files changed, 434 insertions(+), 51 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/cf/PropagationCalculatedFieldResult.java create mode 100644 application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationArgumentEntry.java create mode 100644 application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationCalculatedFieldState.java create mode 100644 common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/PropagationCalculatedFieldConfiguration.java rename common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/{TbelCfTsGeofencingArg.java => TbelCfGeofencingArg.java} (89%) create mode 100644 common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfPropagationArg.java diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 1625becc20..78ad6678a3 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.alarm.Alarm; +import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.cf.configuration.ArgumentType; import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey; @@ -319,13 +320,15 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM if (state == null) { state = createState(ctx); justRestored = true; - } else if (ctx.shouldFetchDynamicArgumentsFromDb(state)) { + } else if (ctx.shouldFetchRelationQueryDynamicArgumentsFromDb(state)) { log.debug("[{}][{}] Going to update dynamic arguments for CF.", entityId, ctx.getCfId()); try { Map dynamicArgsFromDb = cfService.fetchDynamicArgsFromDb(ctx, entityId); dynamicArgsFromDb.forEach(newArgValues::putIfAbsent); - var geofencingState = (GeofencingCalculatedFieldState) state; - geofencingState.setLastDynamicArgumentsRefreshTs(System.currentTimeMillis()); + if (ctx.getCfType() == CalculatedFieldType.GEOFENCING) { + var geofencingState = (GeofencingCalculatedFieldState) state; + geofencingState.updateLastDynamicArgumentsRefreshTs(); + } } catch (Exception e) { throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build(); } @@ -353,6 +356,11 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM private void initState(CalculatedFieldState state, CalculatedFieldCtx ctx) { state.init(ctx); + if (ctx.getCfType() == CalculatedFieldType.GEOFENCING && ctx.isRelationQueryDynamicArguments()) { + GeofencingCalculatedFieldState geofencingState = (GeofencingCalculatedFieldState) state; + geofencingState.updateLastDynamicArgumentsRefreshTs(); + } + Map arguments = fetchArguments(ctx); state.update(arguments, ctx); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java index 4018916582..232476c15a 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java @@ -52,6 +52,8 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import static org.thingsboard.server.common.data.cf.CalculatedFieldType.PROPAGATION; +import static org.thingsboard.server.common.data.cf.configuration.PropagationCalculatedFieldConfiguration.PROPAGATION_CONFIG_ARGUMENT; import static org.thingsboard.server.common.data.cf.configuration.geofencing.EntityCoordinates.ENTITY_ID_LATITUDE_ARGUMENT_KEY; import static org.thingsboard.server.common.data.cf.configuration.geofencing.EntityCoordinates.ENTITY_ID_LONGITUDE_ARGUMENT_KEY; import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.createDefaultAttributeEntry; @@ -88,21 +90,26 @@ public abstract class AbstractCalculatedFieldProcessingService { protected ListenableFuture> fetchArguments(CalculatedFieldCtx ctx, EntityId entityId, long ts) { Map> argFutures = switch (ctx.getCalculatedField().getType()) { case GEOFENCING -> fetchGeofencingCalculatedFieldArguments(ctx, entityId, false, ts); - case SIMPLE, SCRIPT, ALARM -> { - Map> futures = new HashMap<>(); - for (var entry : ctx.getArguments().entrySet()) { - var argEntityId = resolveEntityId(ctx.getTenantId(), entityId, entry.getValue()); - var argValueFuture = fetchArgumentValue(ctx.getTenantId(), argEntityId, entry.getValue(), ts); - futures.put(entry.getKey(), argValueFuture); - } - yield futures; - } + case SIMPLE, SCRIPT, ALARM, PROPAGATION -> getBaseCalculatedFieldArguments(ctx, entityId, ts); }; + if (ctx.getCfType() == PROPAGATION) { + argFutures.put(PROPAGATION_CONFIG_ARGUMENT, fetchPropagationCalculatedFieldArgument(ctx, entityId)); + } return Futures.whenAllComplete(argFutures.values()) .call(() -> resolveArgumentFutures(argFutures), MoreExecutors.directExecutor()); } + private Map> getBaseCalculatedFieldArguments(CalculatedFieldCtx ctx, EntityId entityId, long ts) { + Map> futures = new HashMap<>(); + for (var entry : ctx.getArguments().entrySet()) { + var argEntityId = resolveEntityId(ctx.getTenantId(), entityId, entry.getValue()); + var argValueFuture = fetchArgumentValue(ctx.getTenantId(), argEntityId, entry.getValue(), ts); + futures.put(entry.getKey(), argValueFuture); + } + return futures; + } + protected EntityId resolveEntityId(TenantId tenantId, EntityId entityId, Argument argument) { if (argument.getRefEntityId() != null) { return argument.getRefEntityId(); @@ -130,6 +137,11 @@ public abstract class AbstractCalculatedFieldProcessingService { )); } + protected ListenableFuture fetchPropagationCalculatedFieldArgument(CalculatedFieldCtx ctx, EntityId entityId) { + ListenableFuture> propagationEntityIds = fromDynamicSource(ctx.getTenantId(), entityId, ctx.getPropagationArgument()); + return Futures.transform(propagationEntityIds, ArgumentEntry::createPropagationArgument, MoreExecutors.directExecutor()); + } + protected Map> fetchGeofencingCalculatedFieldArguments(CalculatedFieldCtx ctx, EntityId entityId, boolean dynamicArgumentsOnly, long startTs) { Map> argFutures = new HashMap<>(); Set> entries = ctx.getArguments().entrySet(); @@ -160,6 +172,10 @@ public abstract class AbstractCalculatedFieldProcessingService { if (!value.hasDynamicSource()) { return Futures.immediateFuture(List.of(entityId)); } + return fromDynamicSource(tenantId, entityId, value); + } + + private ListenableFuture> fromDynamicSource(TenantId tenantId, EntityId entityId, Argument value) { var refDynamicSourceConfiguration = value.getRefDynamicSourceConfiguration(); return switch (refDynamicSourceConfiguration.getType()) { case CURRENT_OWNER -> Futures.immediateFuture(List.of(resolveOwnerArgument(tenantId, entityId))); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java index 9b2964a736..52393d0ffe 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java @@ -23,7 +23,6 @@ import org.thingsboard.server.actors.calculatedField.MultipleTbCallback; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; -import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; @@ -50,11 +49,13 @@ import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import static org.thingsboard.server.common.data.cf.configuration.PropagationCalculatedFieldConfiguration.PROPAGATION_CONFIG_ARGUMENT; import static org.thingsboard.server.utils.CalculatedFieldUtils.toProto; @TbRuleEngineComponent @@ -89,11 +90,11 @@ public class DefaultCalculatedFieldProcessingService extends AbstractCalculatedF @Override public Map fetchDynamicArgsFromDb(CalculatedFieldCtx ctx, EntityId entityId) { - // only scheduledSupported CF instances supports dynamic arguments scheduled updates - if (!ctx.getCalculatedField().getType().equals(CalculatedFieldType.GEOFENCING)) { - return Map.of(); - } - return resolveArgumentFutures(fetchGeofencingCalculatedFieldArguments(ctx, entityId, true, System.currentTimeMillis())); + return switch (ctx.getCfType()) { + case GEOFENCING -> resolveArgumentFutures(fetchGeofencingCalculatedFieldArguments(ctx, entityId, true, System.currentTimeMillis())); + case PROPAGATION -> resolveArgumentFutures(Map.of(PROPAGATION_CONFIG_ARGUMENT, fetchPropagationCalculatedFieldArgument(ctx, entityId))); + default -> Collections.emptyMap(); + }; } @Override @@ -112,13 +113,35 @@ public class DefaultCalculatedFieldProcessingService extends AbstractCalculatedF @Override public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult result, List cfIds, TbCallback callback) { - try { + if (!(result instanceof PropagationCalculatedFieldResult propagationCalculatedFieldResult)) { TbMsg msg = result.toTbMsg(entityId, cfIds); + sendMsgToRuleEngine(tenantId, entityId, callback, msg); + return; + } + List propagationEntityIds = propagationCalculatedFieldResult.getPropagationEntityIds(); + if (propagationEntityIds.isEmpty()) { + callback.onSuccess(); + } + if (propagationEntityIds.size() == 1) { + EntityId propagationEntityId = propagationEntityIds.get(0); + TbMsg msg = result.toTbMsg(propagationEntityId, cfIds); + sendMsgToRuleEngine(tenantId, propagationEntityId, callback, msg); + return; + } + MultipleTbCallback multipleTbCallback = new MultipleTbCallback(propagationEntityIds.size(), callback); + for (var propagationEntityId : propagationEntityIds) { + TbMsg msg = result.toTbMsg(propagationEntityId, cfIds); + sendMsgToRuleEngine(tenantId, propagationEntityId, multipleTbCallback, msg); + } + } + + private void sendMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbCallback callback, TbMsg msg) { + try { clusterService.pushMsgToRuleEngine(tenantId, entityId, msg, new TbQueueCallback() { @Override public void onSuccess(TbQueueMsgMetadata metadata) { - callback.onSuccess(); log.trace("[{}][{}] Pushed message to rule engine: {} ", tenantId, entityId, msg); + callback.onSuccess(); } @Override @@ -127,7 +150,7 @@ public class DefaultCalculatedFieldProcessingService extends AbstractCalculatedF } }); } catch (Exception e) { - log.warn("[{}][{}] Failed to push message to rule engine. CalculatedFieldResult: {}", tenantId, entityId, result, e); + log.warn("[{}][{}] Failed to push message to rule engine: {}", tenantId, entityId, msg, e); callback.onFailure(e); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/PropagationCalculatedFieldResult.java b/application/src/main/java/org/thingsboard/server/service/cf/PropagationCalculatedFieldResult.java new file mode 100644 index 0000000000..780fd220a7 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/cf/PropagationCalculatedFieldResult.java @@ -0,0 +1,49 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.cf; + +import lombok.Builder; +import lombok.Data; +import org.thingsboard.server.common.data.id.CalculatedFieldId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.util.CollectionsUtil; +import org.thingsboard.server.common.msg.TbMsg; + +import java.util.List; + +@Data +@Builder +public final class PropagationCalculatedFieldResult implements CalculatedFieldResult { + + private final List propagationEntityIds; + private final TelemetryCalculatedFieldResult result; + + @Override + public TbMsg toTbMsg(EntityId entityId, List cfIds) { + return result.toTbMsg(entityId, cfIds); + } + + @Override + public String stringValue() { + return result.stringValue(); + } + + @Override + public boolean isEmpty() { + return CollectionsUtil.isEmpty(propagationEntityIds) || result.isEmpty(); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java index 2d43883131..5f8276bc99 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java @@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingArgumentEntry; +import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationArgumentEntry; import java.util.List; import java.util.Map; @@ -35,7 +36,8 @@ import java.util.Map; @JsonSubTypes({ @JsonSubTypes.Type(value = SingleValueArgumentEntry.class, name = "SINGLE_VALUE"), @JsonSubTypes.Type(value = TsRollingArgumentEntry.class, name = "TS_ROLLING"), - @JsonSubTypes.Type(value = GeofencingArgumentEntry.class, name = "GEOFENCING") + @JsonSubTypes.Type(value = GeofencingArgumentEntry.class, name = "GEOFENCING"), + @JsonSubTypes.Type(value = PropagationArgumentEntry.class, name = "PROPAGATION") }) public interface ArgumentEntry { @@ -66,4 +68,8 @@ public interface ArgumentEntry { return new GeofencingArgumentEntry(entityIdkvEntryMap); } + static ArgumentEntry createPropagationArgument(List entityIds) { + return new PropagationArgumentEntry(entityIds); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntryType.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntryType.java index 876bfa2a3f..2b118c9c07 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntryType.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntryType.java @@ -16,5 +16,5 @@ package org.thingsboard.server.service.cf.ctx.state; public enum ArgumentEntryType { - SINGLE_VALUE, TS_ROLLING, GEOFENCING + SINGLE_VALUE, TS_ROLLING, GEOFENCING, PROPAGATION } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index 12e4dc0a3a..ece459350f 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.cf.configuration.ArgumentType; import org.thingsboard.server.common.data.cf.configuration.ArgumentsBasedCalculatedFieldConfiguration; import org.thingsboard.server.common.data.cf.configuration.ExpressionBasedCalculatedFieldConfiguration; import org.thingsboard.server.common.data.cf.configuration.Output; +import org.thingsboard.server.common.data.cf.configuration.PropagationCalculatedFieldConfiguration; import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey; import org.thingsboard.server.common.data.cf.configuration.ScheduledUpdateSupportedCalculatedFieldConfiguration; import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedFieldConfiguration; @@ -101,6 +102,8 @@ public class CalculatedFieldCtx { private long scheduledUpdateIntervalMillis; + private Argument propagationArgument; + public CalculatedFieldCtx(CalculatedField calculatedField, ActorSystemContext systemContext) { this.calculatedField = calculatedField; @@ -154,6 +157,10 @@ public class CalculatedFieldCtx { } }); } + if (calculatedField.getConfiguration() instanceof PropagationCalculatedFieldConfiguration propagationConfig) { + propagationArgument = propagationConfig.toPropagationArgument(); + relationQueryDynamicArguments = true; + } } if (calculatedField.getConfiguration() instanceof ScheduledUpdateSupportedCalculatedFieldConfiguration scheduledConfig) { this.scheduledUpdateIntervalMillis = scheduledConfig.isScheduledUpdateEnabled() ? TimeUnit.SECONDS.toMillis(scheduledConfig.getScheduledUpdateInterval()) : -1L; @@ -170,7 +177,7 @@ public class CalculatedFieldCtx { public void init() { switch (cfType) { - case SCRIPT -> { + case SCRIPT, PROPAGATION -> { try { initTbelExpression(expression); initialized = true; @@ -512,21 +519,29 @@ public class CalculatedFieldCtx { return false; } - public boolean hasRelationQueryDynamicArguments() { - return relationQueryDynamicArguments && scheduledUpdateIntervalMillis != -1; + private boolean isScheduledUpdateEnabled() { + return scheduledUpdateIntervalMillis != -1; } - public boolean shouldFetchDynamicArgumentsFromDb(CalculatedFieldState state) { - if (!hasRelationQueryDynamicArguments()) { + public boolean shouldFetchRelationQueryDynamicArgumentsFromDb(CalculatedFieldState state) { + if (!relationQueryDynamicArguments) { return false; } - if (!(state instanceof GeofencingCalculatedFieldState geofencingState)) { - return false; - } - if (geofencingState.getLastDynamicArgumentsRefreshTs() == -1L) { - return true; - } - return geofencingState.getLastDynamicArgumentsRefreshTs() < System.currentTimeMillis() - scheduledUpdateIntervalMillis; + return switch (cfType) { + case PROPAGATION -> true; + case GEOFENCING -> { + if (!isScheduledUpdateEnabled()) { + yield false; + } + var geofencingState = (GeofencingCalculatedFieldState) state; + if (geofencingState.getLastDynamicArgumentsRefreshTs() == -1L) { + yield true; + } + yield geofencingState.getLastDynamicArgumentsRefreshTs() < + System.currentTimeMillis() - scheduledUpdateIntervalMillis; + } + default -> false; + }; } public void stop() { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingArgumentEntry.java index 53e5c19e72..bcc4d3ffcd 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingArgumentEntry.java @@ -18,7 +18,7 @@ package org.thingsboard.server.service.cf.ctx.state.geofencing; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.thingsboard.script.api.tbel.TbelCfArg; -import org.thingsboard.script.api.tbel.TbelCfTsGeofencingArg; +import org.thingsboard.script.api.tbel.TbelCfGeofencingArg; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.util.ProtoUtils; @@ -83,7 +83,7 @@ public class GeofencingArgumentEntry implements ArgumentEntry { @Override public TbelCfArg toTbelCfArg() { - return new TbelCfTsGeofencingArg(zoneStates); + return new TbelCfGeofencingArg(zoneStates); } private Map toZones(Map entityIdKvEntryMap) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingCalculatedFieldState.java index 47dce596da..f3bf8750cf 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingCalculatedFieldState.java @@ -146,6 +146,10 @@ public class GeofencingCalculatedFieldState extends BaseCalculatedFieldState { lastDynamicArgumentsRefreshTs = -1; } + public void updateLastDynamicArgumentsRefreshTs() { + lastDynamicArgumentsRefreshTs = System.currentTimeMillis(); + } + private Map getGeofencingArguments() { return arguments.entrySet() .stream() diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationArgumentEntry.java new file mode 100644 index 0000000000..c7d49a4d40 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationArgumentEntry.java @@ -0,0 +1,72 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.cf.ctx.state.propagation; + +import lombok.Data; +import org.thingsboard.script.api.tbel.TbelCfArg; +import org.thingsboard.script.api.tbel.TbelCfPropagationArg; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.util.CollectionsUtil; +import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; +import org.thingsboard.server.service.cf.ctx.state.ArgumentEntryType; + +import java.util.List; + +@Data +public class PropagationArgumentEntry implements ArgumentEntry { + + private List propagationEntityIds; + + private boolean forceResetPrevious; + + public PropagationArgumentEntry(List propagationEntityIds) { + this.propagationEntityIds = propagationEntityIds; + } + + @Override + public ArgumentEntryType getType() { + return ArgumentEntryType.PROPAGATION; + } + + @Override + public Object getValue() { + return propagationEntityIds; + } + + @Override + public boolean updateEntry(ArgumentEntry entry) { + if (!(entry instanceof PropagationArgumentEntry propagationArgumentEntry)) { + throw new IllegalArgumentException("Unsupported argument entry type for propagation argument entry: " + entry.getType()); + } + if (propagationArgumentEntry.isEmpty()) { + propagationEntityIds.clear(); + } else { + propagationEntityIds = propagationArgumentEntry.getPropagationEntityIds(); + } + return true; + } + + @Override + public boolean isEmpty() { + return CollectionsUtil.isEmpty(propagationEntityIds); + } + + @Override + public TbelCfArg toTbelCfArg() { + return new TbelCfPropagationArg(propagationEntityIds); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationCalculatedFieldState.java new file mode 100644 index 0000000000..21a4493c91 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationCalculatedFieldState.java @@ -0,0 +1,59 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.cf.ctx.state.propagation; + +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import org.thingsboard.server.common.data.cf.CalculatedFieldType; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.service.cf.CalculatedFieldResult; +import org.thingsboard.server.service.cf.PropagationCalculatedFieldResult; +import org.thingsboard.server.service.cf.TelemetryCalculatedFieldResult; +import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; +import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; +import org.thingsboard.server.service.cf.ctx.state.ScriptCalculatedFieldState; + +import java.util.Map; + +import static org.thingsboard.server.common.data.cf.configuration.PropagationCalculatedFieldConfiguration.PROPAGATION_CONFIG_ARGUMENT; + +public class PropagationCalculatedFieldState extends ScriptCalculatedFieldState { + + public PropagationCalculatedFieldState(EntityId entityId) { + super(entityId); + } + + @Override + public CalculatedFieldType getType() { + return CalculatedFieldType.PROPAGATION; + } + + @Override + public ListenableFuture performCalculation(Map updatedArgs, CalculatedFieldCtx ctx) { + ArgumentEntry argumentEntry = arguments.get(PROPAGATION_CONFIG_ARGUMENT); + if (!(argumentEntry instanceof PropagationArgumentEntry propagationArgumentEntry) || propagationArgumentEntry.isEmpty()) { + return Futures.immediateFuture(PropagationCalculatedFieldResult.builder().build()); + } + return Futures.transform(super.performCalculation(updatedArgs, ctx), telemetryCfResult -> + PropagationCalculatedFieldResult.builder() + .propagationEntityIds(propagationArgumentEntry.getPropagationEntityIds()) + .result((TelemetryCalculatedFieldResult) telemetryCfResult) + .build(), + MoreExecutors.directExecutor()); + } + +} diff --git a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java index c81d14f07b..df82488268 100644 --- a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java +++ b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java @@ -36,6 +36,7 @@ import org.thingsboard.server.service.cf.ctx.state.SimpleCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.alarm.AlarmCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingCalculatedFieldState; +import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationCalculatedFieldState; import java.util.Optional; @@ -79,6 +80,7 @@ public class CalculatedFieldArgumentUtils { case SCRIPT -> new ScriptCalculatedFieldState(entityId); case GEOFENCING -> new GeofencingCalculatedFieldState(entityId); case ALARM -> new AlarmCalculatedFieldState(entityId); + case PROPAGATION -> new PropagationCalculatedFieldState(entityId); }; } diff --git a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java index 38aeb45a20..69337fe2e6 100644 --- a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java +++ b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java @@ -32,6 +32,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.AlarmStateProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldIdProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; +import org.thingsboard.server.gen.transport.TransportProtos.EntityIdProto; import org.thingsboard.server.gen.transport.TransportProtos.GeofencingArgumentProto; import org.thingsboard.server.gen.transport.TransportProtos.GeofencingZoneProto; import org.thingsboard.server.gen.transport.TransportProtos.SingleValueArgumentProto; @@ -50,7 +51,10 @@ import org.thingsboard.server.service.cf.ctx.state.alarm.AlarmRuleState; import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingZoneState; +import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationArgumentEntry; +import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationCalculatedFieldState; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.TreeMap; @@ -58,6 +62,8 @@ import java.util.UUID; import java.util.function.Function; import java.util.stream.Collectors; +import static org.thingsboard.server.common.data.cf.configuration.PropagationCalculatedFieldConfiguration.PROPAGATION_CONFIG_ARGUMENT; + public class CalculatedFieldUtils { public static CalculatedFieldIdProto toProto(CalculatedFieldId cfId) { @@ -92,12 +98,11 @@ public class CalculatedFieldUtils { .setType(state.getType().name()); state.getArguments().forEach((argName, argEntry) -> { - if (argEntry instanceof SingleValueArgumentEntry singleValueArgumentEntry) { - builder.addSingleValueArguments(toSingleValueArgumentProto(argName, singleValueArgumentEntry)); - } else if (argEntry instanceof TsRollingArgumentEntry rollingArgumentEntry) { - builder.addRollingValueArguments(toRollingArgumentProto(argName, rollingArgumentEntry)); - } else if (argEntry instanceof GeofencingArgumentEntry geofencingArgumentEntry) { - builder.addGeofencingArguments(toGeofencingArgumentProto(argName, geofencingArgumentEntry)); + switch (argEntry.getType()) { + case SINGLE_VALUE -> builder.addSingleValueArguments(toSingleValueArgumentProto(argName, (SingleValueArgumentEntry) argEntry)); + case TS_ROLLING -> builder.addRollingValueArguments(toRollingArgumentProto(argName, (TsRollingArgumentEntry) argEntry)); + case GEOFENCING -> builder.addGeofencingArguments(toGeofencingArgumentProto(argName, (GeofencingArgumentEntry) argEntry)); + case PROPAGATION -> builder.addAllPropagationEntityIds(toPropagationEntityIdsProto((PropagationArgumentEntry) argEntry)); } }); if (state instanceof AlarmCalculatedFieldState alarmState) { @@ -112,6 +117,10 @@ public class CalculatedFieldUtils { return builder.build(); } + private static List toPropagationEntityIdsProto(PropagationArgumentEntry argEntry) { + return argEntry.getPropagationEntityIds().stream().map(ProtoUtils::toProto).collect(Collectors.toList()); + } + private static AlarmRuleStateProto toAlarmRuleStateProto(AlarmRuleState ruleState) { return AlarmRuleStateProto.newBuilder() .setSeverity(Optional.ofNullable(ruleState.getSeverity()).map(Enum::name).orElse("")) @@ -178,11 +187,15 @@ public class CalculatedFieldUtils { case SCRIPT -> new ScriptCalculatedFieldState(id.entityId()); case GEOFENCING -> new GeofencingCalculatedFieldState(id.entityId()); case ALARM -> new AlarmCalculatedFieldState(id.entityId()); + case PROPAGATION -> new PropagationCalculatedFieldState(id.entityId()); }; proto.getSingleValueArgumentsList().forEach(argProto -> state.getArguments().put(argProto.getArgName(), fromSingleValueArgumentProto(argProto))); + List propagationEntityIds = proto.getPropagationEntityIdsList().stream().map(ProtoUtils::fromProto).toList(); + state.getArguments().put(PROPAGATION_CONFIG_ARGUMENT, new PropagationArgumentEntry(propagationEntityIds)); + switch (type) { case SCRIPT -> { proto.getRollingValueArgumentsList().forEach(argProto -> diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/CalculatedFieldType.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/CalculatedFieldType.java index 3399808a35..7f38773c1e 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/CalculatedFieldType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/CalculatedFieldType.java @@ -19,5 +19,6 @@ public enum CalculatedFieldType { SIMPLE, SCRIPT, GEOFENCING, - ALARM + ALARM, + PROPAGATION } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/BaseCalculatedFieldConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/BaseCalculatedFieldConfiguration.java index b72cdad60a..6913b1ed63 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/BaseCalculatedFieldConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/BaseCalculatedFieldConfiguration.java @@ -28,12 +28,16 @@ public abstract class BaseCalculatedFieldConfiguration implements ExpressionBase @Override public void validate() { + baseCalculatedFieldRestriction(); + if (arguments.values().stream().anyMatch(Argument::hasRelationQuerySource)) { + throw new IllegalArgumentException("Calculated field with type: '" + getType() + "' doesn't support relation query configuration!"); + } + } + + protected void baseCalculatedFieldRestriction() { if (arguments.containsKey("ctx")) { throw new IllegalArgumentException("Argument name 'ctx' is reserved and cannot be used."); } - if (arguments.values().stream().anyMatch(Argument::hasRelationQuerySource)) { - throw new IllegalArgumentException("Calculated field with type: '" + getType() + "' doesn't support relation query source configuration!"); - } } } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java index d3622a2dcf..8676c6060f 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java @@ -40,7 +40,8 @@ import java.util.stream.Collectors; @Type(value = SimpleCalculatedFieldConfiguration.class, name = "SIMPLE"), @Type(value = ScriptCalculatedFieldConfiguration.class, name = "SCRIPT"), @Type(value = GeofencingCalculatedFieldConfiguration.class, name = "GEOFENCING"), - @Type(value = AlarmCalculatedFieldConfiguration.class, name = "ALARM") + @Type(value = AlarmCalculatedFieldConfiguration.class, name = "ALARM"), + @Type(value = PropagationCalculatedFieldConfiguration.class, name = "PROPAGATION") }) @JsonIgnoreProperties(ignoreUnknown = true) public interface CalculatedFieldConfiguration { diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/PropagationCalculatedFieldConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/PropagationCalculatedFieldConfiguration.java new file mode 100644 index 0000000000..7585c30438 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/PropagationCalculatedFieldConfiguration.java @@ -0,0 +1,66 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.cf.configuration; + +import lombok.Data; +import lombok.EqualsAndHashCode; +import org.thingsboard.server.common.data.StringUtils; +import org.thingsboard.server.common.data.cf.CalculatedFieldType; +import org.thingsboard.server.common.data.relation.EntitySearchDirection; +import org.thingsboard.server.common.data.relation.RelationPathLevel; + +import java.util.List; + +@Data +@EqualsAndHashCode(callSuper = true) +public class PropagationCalculatedFieldConfiguration extends BaseCalculatedFieldConfiguration { + + public static final String PROPAGATION_CONFIG_ARGUMENT = "propagationCtx"; + + private EntitySearchDirection direction; + private String relationType; + + @Override + public CalculatedFieldType getType() { + return CalculatedFieldType.PROPAGATION; + } + + @Override + public void validate() { + baseCalculatedFieldRestriction(); + propagationRestriction(); + if (direction == null) { + throw new IllegalArgumentException("Propagation calculated field direction must be specified!"); + } + if (StringUtils.isBlank(relationType)) { + throw new IllegalArgumentException("Propagation calculated field relation type must be specified!"); + } + } + + public Argument toPropagationArgument() { + var refDynamicSourceConfiguration = new RelationPathQueryDynamicSourceConfiguration(); + refDynamicSourceConfiguration.setLevels(List.of(new RelationPathLevel(direction, relationType))); + var propagationArgument = new Argument(); + propagationArgument.setRefDynamicSourceConfiguration(refDynamicSourceConfiguration); + return propagationArgument; + } + + private void propagationRestriction() { + if (arguments.entrySet().stream().anyMatch(entry -> entry.getKey().equals(PROPAGATION_CONFIG_ARGUMENT))) { + throw new IllegalArgumentException("Argument name '" + PROPAGATION_CONFIG_ARGUMENT + "' is reserved and cannot be used."); + } + } +} diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index fac1116a30..1e3e121202 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -922,6 +922,7 @@ message CalculatedFieldStateProto { repeated TsRollingArgumentProto rollingValueArguments = 4; repeated GeofencingArgumentProto geofencingArguments = 5; AlarmStateProto alarmState = 6; + repeated EntityIdProto propagationEntityIds = 7; } //Used to report session state to tb-Service and persist this state in the cache on the tb-Service level. diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfArg.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfArg.java index 73a2183564..6f83aac1b9 100644 --- a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfArg.java +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfArg.java @@ -27,7 +27,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; @JsonSubTypes({ @JsonSubTypes.Type(value = TbelCfSingleValueArg.class, name = "SINGLE_VALUE"), @JsonSubTypes.Type(value = TbelCfTsRollingArg.class, name = "TS_ROLLING"), - @JsonSubTypes.Type(value = TbelCfTsGeofencingArg.class, name = "GEOFENCING_CF_ARGUMENT_VALUE"), + @JsonSubTypes.Type(value = TbelCfGeofencingArg.class, name = "GEOFENCING_CF_ARGUMENT_VALUE"), + @JsonSubTypes.Type(value = TbelCfPropagationArg.class, name = "PROPAGATION_CF_ARGUMENT_VALUE"), }) public interface TbelCfArg extends TbelCfObject { diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfTsGeofencingArg.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfGeofencingArg.java similarity index 89% rename from common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfTsGeofencingArg.java rename to common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfGeofencingArg.java index f1e8ec16db..0fa0f4a5bf 100644 --- a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfTsGeofencingArg.java +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfGeofencingArg.java @@ -20,12 +20,12 @@ import com.fasterxml.jackson.annotation.JsonProperty; import lombok.Data; @Data -public class TbelCfTsGeofencingArg implements TbelCfArg { +public class TbelCfGeofencingArg implements TbelCfArg { private final Object value; @JsonCreator - public TbelCfTsGeofencingArg(@JsonProperty("value") Object value) { + public TbelCfGeofencingArg(@JsonProperty("value") Object value) { this.value = value; } diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfPropagationArg.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfPropagationArg.java new file mode 100644 index 0000000000..83d7e81a86 --- /dev/null +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfPropagationArg.java @@ -0,0 +1,42 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.script.api.tbel; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import lombok.Data; + +@Data +public class TbelCfPropagationArg implements TbelCfArg { + + private final Object value; + + @JsonCreator + public TbelCfPropagationArg(@JsonProperty("value") Object value) { + this.value = value; + } + + @Override + public String getType() { + return "PROPAGATION_CF_ARGUMENT_VALUE"; + } + + @Override + public long memorySize() { + return OBJ_SIZE; + } + +}