diff --git a/application/pom.xml b/application/pom.xml
index d56fbe3708..6ce77646f8 100644
--- a/application/pom.xml
+++ b/application/pom.xml
@@ -373,6 +373,10 @@
com.google.firebase
firebase-admin
+
+ org.rocksdb
+ rocksdbjni
+
diff --git a/application/src/main/data/upgrade/basic/schema_update.sql b/application/src/main/data/upgrade/basic/schema_update.sql
index 406a288505..ddfac0d768 100644
--- a/application/src/main/data/upgrade/basic/schema_update.sql
+++ b/application/src/main/data/upgrade/basic/schema_update.sql
@@ -31,9 +31,10 @@ DO $$
|| jsonb_build_object(
'processingSettings', jsonb_build_object(
'type', 'ADVANCED',
- 'timeseries', jsonb_build_object('type', 'ON_EVERY_MESSAGE'),
- 'latest', jsonb_build_object('type', 'SKIP'),
- 'webSockets', jsonb_build_object('type', 'ON_EVERY_MESSAGE')
+ 'timeseries', jsonb_build_object('type', 'ON_EVERY_MESSAGE'),
+ 'latest', jsonb_build_object('type', 'SKIP'),
+ 'webSockets', jsonb_build_object('type', 'ON_EVERY_MESSAGE'),
+ 'calculatedFields', jsonb_build_object('type', 'ON_EVERY_MESSAGE')
)
)
)::text,
@@ -61,3 +62,5 @@ DO $$
$$;
-- UPDATE SAVE TIME SERIES NODES END
+
+ALTER TABLE api_usage_state ADD COLUMN IF NOT EXISTS version BIGINT DEFAULT 1;
\ No newline at end of file
diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
index eb76473386..50a777840c 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
@@ -41,13 +41,18 @@ import org.thingsboard.script.api.js.JsInvokeService;
import org.thingsboard.script.api.tbel.TbelInvokeService;
import org.thingsboard.server.actors.service.ActorService;
import org.thingsboard.server.actors.tenant.DebugTbRateLimits;
+import org.thingsboard.server.cache.limits.RateLimitService;
import org.thingsboard.server.cluster.TbClusterService;
+import org.thingsboard.server.common.data.event.CalculatedFieldDebugEvent;
import org.thingsboard.server.common.data.event.ErrorEvent;
import org.thingsboard.server.common.data.event.LifecycleEvent;
import org.thingsboard.server.common.data.event.RuleChainDebugEvent;
import org.thingsboard.server.common.data.event.RuleNodeDebugEvent;
+import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.limit.LimitedApi;
+import org.thingsboard.server.common.data.msg.TbMsgType;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsg;
@@ -62,6 +67,7 @@ import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.audit.AuditLogService;
import org.thingsboard.server.dao.cassandra.CassandraCluster;
+import org.thingsboard.server.dao.cf.CalculatedFieldService;
import org.thingsboard.server.dao.customer.CustomerService;
import org.thingsboard.server.dao.dashboard.DashboardService;
import org.thingsboard.server.dao.device.ClaimDevicesService;
@@ -94,6 +100,7 @@ import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
import org.thingsboard.server.dao.tenant.TenantProfileService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
+import org.thingsboard.server.dao.usagerecord.ApiLimitService;
import org.thingsboard.server.dao.user.UserService;
import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.dao.widget.WidgetsBundleService;
@@ -101,6 +108,10 @@ import org.thingsboard.server.queue.discovery.DiscoveryService;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.service.apiusage.TbApiUsageStateService;
+import org.thingsboard.server.service.cf.CalculatedFieldProcessingService;
+import org.thingsboard.server.service.cf.CalculatedFieldStateService;
+import org.thingsboard.server.service.cf.cache.CalculatedFieldEntityProfileCache;
+import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
import org.thingsboard.server.service.component.ComponentDiscoveryService;
import org.thingsboard.server.service.edge.rpc.EdgeRpcService;
import org.thingsboard.server.service.entitiy.entityview.TbEntityViewService;
@@ -121,13 +132,17 @@ import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.telemetry.AlarmSubscriptionService;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import org.thingsboard.server.service.transport.TbCoreToTransportService;
+import org.thingsboard.server.utils.DebugModeRateLimitsConfig;
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
@Slf4j
@Component
@@ -156,6 +171,18 @@ public class ActorSystemContext {
}
};
+ private static final FutureCallback CALCULATED_FIELD_DEBUG_EVENT_ERROR_CALLBACK = new FutureCallback<>() {
+ @Override
+ public void onSuccess(@Nullable Void event) {
+
+ }
+
+ @Override
+ public void onFailure(Throwable th) {
+ log.error("Could not save debug Event for Calculated Field", th);
+ }
+ };
+
private final ConcurrentMap debugPerTenantLimits = new ConcurrentHashMap<>();
public ConcurrentMap getDebugPerTenantLimits() {
@@ -289,6 +316,7 @@ public class ActorSystemContext {
@Getter
private TbEntityViewService tbEntityViewService;
+ @Lazy
@Autowired
@Getter
private TelemetrySubscriptionService tsSubService;
@@ -394,6 +422,10 @@ public class ActorSystemContext {
@Getter
private SlackService slackService;
+ @Autowired
+ @Getter
+ private CalculatedFieldService calculatedFieldService;
+
@Lazy
@Autowired(required = false)
@Getter
@@ -416,6 +448,21 @@ public class ActorSystemContext {
@Getter
private TbCoreToTransportService tbCoreToTransportService;
+ @Lazy
+ @Autowired(required = false)
+ @Getter
+ private ApiLimitService apiLimitService;
+
+ @Lazy
+ @Autowired(required = false)
+ @Getter
+ private RateLimitService rateLimitService;
+
+ @Lazy
+ @Autowired(required = false)
+ @Getter
+ private DebugModeRateLimitsConfig debugModeRateLimitsConfig;
+
/**
* The following Service will be null if we operate in tb-core mode
*/
@@ -487,6 +534,21 @@ public class ActorSystemContext {
@Getter
private EntityService entityService;
+ @Lazy
+ @Autowired(required = false)
+ @Getter
+ private CalculatedFieldProcessingService calculatedFieldProcessingService;
+
+ @Lazy
+ @Autowired(required = false)
+ @Getter
+ private CalculatedFieldStateService calculatedFieldStateService;
+
+ @Lazy
+ @Autowired(required = false)
+ @Getter
+ private CalculatedFieldEntityProfileCache calculatedFieldEntityProfileCache;
+
@Value("${actors.session.max_concurrent_sessions_per_device:1}")
@Getter
private long maxConcurrentSessionsPerDevice;
@@ -558,14 +620,6 @@ public class ActorSystemContext {
@Getter
private long sessionReportTimeout;
- @Value("${actors.rule.chain.debug_mode_rate_limits_per_tenant.enabled:true}")
- @Getter
- private boolean debugPerTenantEnabled;
-
- @Value("${actors.rule.chain.debug_mode_rate_limits_per_tenant.configuration:50000:3600}")
- @Getter
- private String debugPerTenantLimitsConfiguration;
-
@Value("${actors.rpc.submit_strategy:BURST}")
@Getter
private String rpcSubmitStrategy;
@@ -719,9 +773,9 @@ public class ActorSystemContext {
}
private boolean checkLimits(TenantId tenantId, TbMsg tbMsg, Throwable error) {
- if (debugPerTenantEnabled) {
+ if (debugModeRateLimitsConfig.isRuleChainDebugPerTenantLimitsEnabled()) {
DebugTbRateLimits debugTbRateLimits = debugPerTenantLimits.computeIfAbsent(tenantId, id ->
- new DebugTbRateLimits(new TbRateLimits(debugPerTenantLimitsConfiguration), false));
+ new DebugTbRateLimits(new TbRateLimits(debugModeRateLimitsConfig.getRuleChainDebugPerTenantLimitsConfiguration()), false));
if (!debugTbRateLimits.getTbRateLimits().tryConsume()) {
if (!debugTbRateLimits.isRuleChainEventSaved()) {
@@ -751,6 +805,51 @@ public class ActorSystemContext {
Futures.addCallback(future, RULE_CHAIN_DEBUG_EVENT_ERROR_CALLBACK, MoreExecutors.directExecutor());
}
+ public void persistCalculatedFieldDebugEvent(TenantId tenantId, CalculatedFieldId calculatedFieldId, EntityId entityId, Map arguments, UUID tbMsgId, TbMsgType tbMsgType, String result, String errorMessage) {
+ if (checkLimits(tenantId)) {
+ try {
+ CalculatedFieldDebugEvent.CalculatedFieldDebugEventBuilder eventBuilder = CalculatedFieldDebugEvent.builder()
+ .tenantId(tenantId)
+ .entityId(calculatedFieldId.getId())
+ .serviceId(getServiceId())
+ .calculatedFieldId(calculatedFieldId)
+ .eventEntity(entityId);
+ if (tbMsgId != null) {
+ eventBuilder.msgId(tbMsgId);
+ }
+ if (tbMsgType != null) {
+ eventBuilder.msgType(tbMsgType.name());
+ }
+ if (arguments != null) {
+ eventBuilder.arguments(JacksonUtil.toString(
+ arguments.entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().toTbelCfArg()))
+ ));
+ }
+ if (result != null) {
+ eventBuilder.result(result);
+ }
+ if (errorMessage != null) {
+ eventBuilder.error(errorMessage);
+ }
+
+ ListenableFuture future = eventService.saveAsync(eventBuilder.build());
+ Futures.addCallback(future, CALCULATED_FIELD_DEBUG_EVENT_ERROR_CALLBACK, MoreExecutors.directExecutor());
+ } catch (IllegalArgumentException ex) {
+ log.warn("Failed to persist calculated field debug message", ex);
+ }
+ }
+ }
+
+ private boolean checkLimits(TenantId tenantId) {
+ if (debugModeRateLimitsConfig.isCalculatedFieldDebugPerTenantLimitsEnabled() &&
+ !rateLimitService.checkRateLimit(LimitedApi.CALCULATED_FIELD_DEBUG_EVENTS, (Object) tenantId, debugModeRateLimitsConfig.getCalculatedFieldDebugPerTenantLimitsConfiguration())) {
+ log.trace("[{}] Calculated field debug event limits exceeded!", tenantId);
+ return false;
+ }
+ return true;
+ }
+
public static Exception toException(Throwable error) {
return Exception.class.isInstance(error) ? (Exception) error : new Exception(error);
}
diff --git a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
index dc6a3bcf5e..50fa7e2f8d 100644
--- a/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
+++ b/application/src/main/java/org/thingsboard/server/actors/app/AppActor.java
@@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.page.PageDataIterable;
import org.thingsboard.server.common.data.plugin.ComponentLifecycleEvent;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
import org.thingsboard.server.common.msg.aware.TenantAwareMsg;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg;
@@ -87,6 +88,7 @@ public class AppActor extends ContextAwareActor {
case APP_INIT_MSG:
break;
case PARTITION_CHANGE_MSG:
+ case CF_PARTITIONS_CHANGE_MSG:
ctx.broadcastToChildren(msg, true);
break;
case COMPONENT_LIFE_CYCLE_MSG:
@@ -111,6 +113,18 @@ public class AppActor extends ContextAwareActor {
case SESSION_TIMEOUT_MSG:
ctx.broadcastToChildrenByType(msg, EntityType.TENANT);
break;
+ case CF_INIT_MSG:
+ case CF_LINK_INIT_MSG:
+ case CF_STATE_RESTORE_MSG:
+ case CF_ENTITY_LIFECYCLE_MSG:
+ //TODO: use priority from the message body. For example, messages about CF lifecycle are important and Device lifecycle are not.
+ // same for the Linked telemetry.
+ onToCalculatedFieldSystemActorMsg((ToCalculatedFieldSystemMsg) msg, true);
+ break;
+ case CF_TELEMETRY_MSG:
+ case CF_LINKED_TELEMETRY_MSG:
+ onToCalculatedFieldSystemActorMsg((ToCalculatedFieldSystemMsg) msg, false);
+ break;
default:
return false;
}
@@ -175,6 +189,19 @@ public class AppActor extends ContextAwareActor {
}
}
+ private void onToCalculatedFieldSystemActorMsg(ToCalculatedFieldSystemMsg msg, boolean priority) {
+ getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(tenantActor -> {
+ if (priority) {
+ tenantActor.tellWithHighPriority(msg);
+ } else {
+ tenantActor.tell(msg);
+ }
+ }, () -> {
+ msg.getCallback().onSuccess();
+ });
+ }
+
+
private void onToDeviceActorMsg(TenantAwareMsg msg, boolean priority) {
getOrCreateTenantActor(msg.getTenantId()).ifPresentOrElse(tenantActor -> {
if (priority) {
diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/AbstractCalculatedFieldActor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/AbstractCalculatedFieldActor.java
new file mode 100644
index 0000000000..6cf34599de
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/AbstractCalculatedFieldActor.java
@@ -0,0 +1,70 @@
+/**
+ * Copyright © 2016-2025 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.calculatedField;
+
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.common.util.DebugModeUtil;
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.service.ContextAwareActor;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.TbActorMsg;
+import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
+
+@Slf4j
+public abstract class AbstractCalculatedFieldActor extends ContextAwareActor {
+
+ protected final TenantId tenantId;
+
+ public AbstractCalculatedFieldActor(ActorSystemContext systemContext, TenantId tenantId) {
+ super(systemContext);
+ this.tenantId = tenantId;
+ }
+
+ @Override
+ protected boolean doProcess(TbActorMsg msg) {
+ if (msg instanceof ToCalculatedFieldSystemMsg cfm) {
+ Exception cause;
+ try {
+ return doProcessCfMsg(cfm);
+ } catch (CalculatedFieldException cfe) {
+ if (DebugModeUtil.isDebugFailuresAvailable(cfe.getCtx().getCalculatedField())) {
+ String message;
+ if (cfe.getErrorMessage() != null) {
+ message = cfe.getErrorMessage();
+ } else if (cfe.getCause() != null) {
+ message = cfe.getCause().getMessage();
+ } else {
+ message = "N/A";
+ }
+ systemContext.persistCalculatedFieldDebugEvent(tenantId, cfe.getCtx().getCfId(), cfe.getEventEntity(), cfe.getArguments(), cfe.getMsgId(), cfe.getMsgType(), null, message);
+ }
+ cause = cfe.getCause();
+ } catch (Exception e) {
+ logProcessingException(e);
+ cause = e;
+ }
+ cfm.getCallback().onFailure(cause);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ abstract void logProcessingException(Exception e);
+
+ abstract boolean doProcessCfMsg(ToCalculatedFieldSystemMsg msg) throws CalculatedFieldException;
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java
new file mode 100644
index 0000000000..350a5776cf
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java
@@ -0,0 +1,81 @@
+/**
+ * Copyright © 2016-2025 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.calculatedField;
+
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.TbActorCtx;
+import org.thingsboard.server.actors.TbActorException;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
+import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
+
+@Slf4j
+public class CalculatedFieldEntityActor extends AbstractCalculatedFieldActor {
+
+ private final CalculatedFieldEntityMessageProcessor processor;
+
+ CalculatedFieldEntityActor(ActorSystemContext systemContext, TenantId tenantId, EntityId entityId) {
+ super(systemContext, tenantId);
+ this.processor = new CalculatedFieldEntityMessageProcessor(systemContext, tenantId, entityId);
+ }
+
+ @Override
+ public void init(TbActorCtx ctx) throws TbActorException {
+ super.init(ctx);
+ log.debug("[{}][{}] Starting CF entity actor.", processor.tenantId, processor.entityId);
+ try {
+ processor.init(ctx);
+ log.debug("[{}][{}] CF entity actor started.", processor.tenantId, processor.entityId);
+ } catch (Exception e) {
+ log.warn("[{}][{}] Unknown failure", processor.tenantId, processor.entityId, e);
+ throw new TbActorException("Failed to initialize CF entity actor", e);
+ }
+ }
+
+ @Override
+ protected boolean doProcessCfMsg(ToCalculatedFieldSystemMsg msg) throws CalculatedFieldException {
+ switch (msg.getMsgType()) {
+ case CF_PARTITIONS_CHANGE_MSG:
+ processor.process((CalculatedFieldPartitionChangeMsg) msg);
+ break;
+ case CF_STATE_RESTORE_MSG:
+ processor.process((CalculatedFieldStateRestoreMsg) msg);
+ break;
+ case CF_ENTITY_INIT_CF_MSG:
+ processor.process((EntityInitCalculatedFieldMsg) msg);
+ break;
+ case CF_ENTITY_DELETE_MSG:
+ processor.process((CalculatedFieldEntityDeleteMsg) msg);
+ break;
+ case CF_ENTITY_TELEMETRY_MSG:
+ processor.process((EntityCalculatedFieldTelemetryMsg) msg);
+ break;
+ case CF_LINKED_TELEMETRY_MSG:
+ processor.process((EntityCalculatedFieldLinkedTelemetryMsg) msg);
+ break;
+ default:
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ void logProcessingException(Exception e) {
+ log.warn("[{}][{}] Processing failure", tenantId, processor.entityId, e);
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActorCreator.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActorCreator.java
new file mode 100644
index 0000000000..6dc2f26050
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActorCreator.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright © 2016-2025 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.calculatedField;
+
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.TbActor;
+import org.thingsboard.server.actors.TbActorId;
+import org.thingsboard.server.actors.TbCalculatedFieldEntityActorId;
+import org.thingsboard.server.actors.TbEntityActorId;
+import org.thingsboard.server.actors.device.DeviceActor;
+import org.thingsboard.server.actors.service.ContextBasedCreator;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.TenantId;
+
+public class CalculatedFieldEntityActorCreator extends ContextBasedCreator {
+
+ private final TenantId tenantId;
+ private final EntityId entityId;
+
+ public CalculatedFieldEntityActorCreator(ActorSystemContext context, TenantId tenantId, EntityId entityId) {
+ super(context);
+ this.tenantId = tenantId;
+ this.entityId = entityId;
+ }
+
+ @Override
+ public TbActorId createActorId() {
+ return new TbCalculatedFieldEntityActorId(entityId);
+ }
+
+ @Override
+ public TbActor createActor() {
+ return new CalculatedFieldEntityActor(context, tenantId, entityId);
+ }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityDeleteMsg.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityDeleteMsg.java
new file mode 100644
index 0000000000..3ca6e8596a
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityDeleteMsg.java
@@ -0,0 +1,44 @@
+/**
+ * Copyright © 2016-2025 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.calculatedField;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
+import org.thingsboard.server.common.msg.queue.TbCallback;
+
+@Data
+public class CalculatedFieldEntityDeleteMsg implements ToCalculatedFieldSystemMsg {
+
+ private final TenantId tenantId;
+ private final EntityId entityId;
+ private final TbCallback callback;
+
+ public CalculatedFieldEntityDeleteMsg(TenantId tenantId,
+ EntityId entityId,
+ TbCallback callback) {
+ this.tenantId = tenantId;
+ this.entityId = entityId;
+ this.callback = callback;
+ }
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.CF_ENTITY_DELETE_MSG;
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java
new file mode 100644
index 0000000000..f7fc204c0f
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java
@@ -0,0 +1,439 @@
+/**
+ * Copyright © 2016-2025 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.calculatedField;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.common.util.DebugModeUtil;
+import org.thingsboard.common.util.JacksonUtil;
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.TbActorCtx;
+import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
+import org.thingsboard.server.common.data.AttributeScope;
+import org.thingsboard.server.common.data.StringUtils;
+import org.thingsboard.server.common.data.cf.configuration.Argument;
+import org.thingsboard.server.common.data.cf.configuration.ArgumentType;
+import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey;
+import org.thingsboard.server.common.data.id.CalculatedFieldId;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.kv.StringDataEntry;
+import org.thingsboard.server.common.data.msg.TbMsgType;
+import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
+import org.thingsboard.server.common.msg.queue.TbCallback;
+import org.thingsboard.server.gen.transport.TransportProtos.AttributeScopeProto;
+import org.thingsboard.server.gen.transport.TransportProtos.AttributeValueProto;
+import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
+import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
+import org.thingsboard.server.service.cf.CalculatedFieldProcessingService;
+import org.thingsboard.server.service.cf.CalculatedFieldResult;
+import org.thingsboard.server.service.cf.CalculatedFieldStateService;
+import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
+import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
+import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
+import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
+import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+/**
+ * @author Andrew Shvayka
+ */
+@Slf4j
+public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareMsgProcessor {
+ // (1 for result persistence + 1 for the state persistence )
+ public static final int CALLBACKS_PER_CF = 2;
+
+ final TenantId tenantId;
+ final EntityId entityId;
+ final CalculatedFieldProcessingService cfService;
+ final CalculatedFieldStateService cfStateService;
+ final int partition;
+
+ TbActorCtx ctx;
+ Map states = new HashMap<>();
+
+ CalculatedFieldEntityMessageProcessor(ActorSystemContext systemContext, TenantId tenantId, EntityId entityId) {
+ super(systemContext);
+ this.tenantId = tenantId;
+ this.entityId = entityId;
+ this.cfService = systemContext.getCalculatedFieldProcessingService();
+ this.cfStateService = systemContext.getCalculatedFieldStateService();
+ this.partition = systemContext.getCalculatedFieldEntityProfileCache().getEntityIdPartition(tenantId, entityId);
+ }
+
+ void init(TbActorCtx ctx) {
+ this.ctx = ctx;
+ }
+
+ public void process(CalculatedFieldPartitionChangeMsg msg) {
+ if (!msg.getPartitions()[partition]) {
+ log.info("[{}][{}] Stopping entity actor due to change partition event.", partition, entityId);
+ ctx.stop(ctx.getSelf());
+ }
+ }
+
+ public void process(CalculatedFieldStateRestoreMsg msg) {
+ CalculatedFieldId cfId = msg.getId().cfId();
+ log.info("[{}] [{}] Processing CF state restore msg.", msg.getId().entityId(), cfId);
+ if (msg.getState() != null) {
+ states.put(cfId, msg.getState());
+ } else {
+ states.remove(cfId);
+ }
+ }
+
+ public void process(EntityInitCalculatedFieldMsg msg) throws CalculatedFieldException {
+ log.info("[{}] Processing entity init CF msg.", msg.getCtx().getCfId());
+ var ctx = msg.getCtx();
+ if (msg.isForceReinit()) {
+ log.info("Force reinitialization of CF: [{}].", ctx.getCfId());
+ states.remove(ctx.getCfId());
+ }
+ try {
+ var state = getOrInitState(ctx);
+ if (state.isSizeOk()) {
+ processStateIfReady(ctx, Collections.singletonList(ctx.getCfId()), state, null, null, msg.getCallback());
+ } else {
+ throw new RuntimeException(ctx.getSizeExceedsLimitMessage());
+ }
+ } catch (Exception e) {
+ if (e instanceof CalculatedFieldException cfe) {
+ throw cfe;
+ }
+ throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build();
+ }
+ }
+
+ public void process(CalculatedFieldEntityDeleteMsg msg) {
+ log.info("[{}] Processing CF entity delete msg.", msg.getEntityId());
+ if (this.entityId.equals(msg.getEntityId())) {
+ MultipleTbCallback multipleTbCallback = new MultipleTbCallback(states.size(), msg.getCallback());
+ states.forEach((cfId, state) -> cfStateService.removeState(new CalculatedFieldEntityCtxId(tenantId, cfId, entityId), multipleTbCallback));
+ ctx.stop(ctx.getSelf());
+ } else {
+ var cfId = new CalculatedFieldId(msg.getEntityId().getId());
+ var state = states.remove(cfId);
+ if (state != null) {
+ cfStateService.removeState(new CalculatedFieldEntityCtxId(tenantId, cfId, entityId), msg.getCallback());
+ }
+ }
+ }
+
+ public void process(EntityCalculatedFieldTelemetryMsg msg) throws CalculatedFieldException {
+ log.info("[{}] Processing CF telemetry msg.", msg.getEntityId());
+ var proto = msg.getProto();
+ var numberOfCallbacks = CALLBACKS_PER_CF * (msg.getEntityIdFields().size() + msg.getProfileIdFields().size());
+ MultipleTbCallback callback = new MultipleTbCallback(numberOfCallbacks, msg.getCallback());
+ List cfIdList = getCalculatedFieldIds(proto);
+ Set cfIdSet = new HashSet<>(cfIdList);
+ for (var ctx : msg.getEntityIdFields()) {
+ process(ctx, proto, cfIdSet, cfIdList, callback);
+ }
+ for (var ctx : msg.getProfileIdFields()) {
+ process(ctx, proto, cfIdSet, cfIdList, callback);
+ }
+ }
+
+ public void process(EntityCalculatedFieldLinkedTelemetryMsg msg) throws CalculatedFieldException {
+ log.info("[{}] Processing CF link telemetry msg.", msg.getEntityId());
+ var proto = msg.getProto();
+ var ctx = msg.getCtx();
+ var callback = new MultipleTbCallback(CALLBACKS_PER_CF, msg.getCallback());
+ try {
+ List cfIds = getCalculatedFieldIds(proto);
+ if (cfIds.contains(ctx.getCfId())) {
+ callback.onSuccess(CALLBACKS_PER_CF);
+ } else {
+ if (proto.getTsDataCount() > 0) {
+ processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto));
+ } else if (proto.getAttrDataCount() > 0) {
+ processArgumentValuesUpdate(ctx, cfIds, callback, mapToArguments(ctx, msg.getEntityId(), proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto));
+ } else if (proto.getRemovedTsKeysCount() > 0) {
+ processArgumentValuesUpdate(ctx, cfIds, callback, mapToArgumentsWithFetchedValue(ctx, proto.getRemovedTsKeysList()), toTbMsgId(proto), toTbMsgType(proto));
+ } else if (proto.getRemovedAttrKeysCount() > 0) {
+ processArgumentValuesUpdate(ctx, cfIds, callback, mapToArgumentsWithDefaultValue(ctx, msg.getEntityId(), proto.getScope(), proto.getRemovedAttrKeysList()), toTbMsgId(proto), toTbMsgType(proto));
+ } else {
+ callback.onSuccess(CALLBACKS_PER_CF);
+ }
+ }
+ } catch (Exception e) {
+ throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build();
+ }
+ }
+
+ private void process(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, Collection cfIds, List cfIdList, MultipleTbCallback callback) throws CalculatedFieldException {
+ try {
+ if (cfIds.contains(ctx.getCfId())) {
+ callback.onSuccess(CALLBACKS_PER_CF);
+ } else {
+ if (proto.getTsDataCount() > 0) {
+ processTelemetry(ctx, proto, cfIdList, callback);
+ } else if (proto.getAttrDataCount() > 0) {
+ processAttributes(ctx, proto, cfIdList, callback);
+ } else if (proto.getRemovedTsKeysCount() > 0) {
+ processRemovedTelemetry(ctx, proto, cfIdList, callback);
+ } else if (proto.getRemovedAttrKeysCount() > 0) {
+ processRemovedAttributes(ctx, proto, cfIdList, callback);
+ } else {
+ callback.onSuccess(CALLBACKS_PER_CF);
+ }
+ }
+ } catch (Exception e) {
+ if (e instanceof CalculatedFieldException cfe) {
+ throw cfe;
+ }
+ throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build();
+ }
+ }
+
+ private void processTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List cfIdList, MultipleTbCallback callback) throws CalculatedFieldException {
+ processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto));
+ }
+
+ private void processAttributes(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List cfIdList, MultipleTbCallback callback) throws CalculatedFieldException {
+ processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getScope(), proto.getAttrDataList()), toTbMsgId(proto), toTbMsgType(proto));
+ }
+
+ private void processRemovedTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List cfIdList, MultipleTbCallback callback) throws CalculatedFieldException {
+ processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArgumentsWithFetchedValue(ctx, proto.getRemovedTsKeysList()), toTbMsgId(proto), toTbMsgType(proto));
+ }
+
+ private void processRemovedAttributes(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List cfIdList, MultipleTbCallback callback) throws CalculatedFieldException {
+ processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArgumentsWithDefaultValue(ctx, proto.getScope(), proto.getRemovedAttrKeysList()), toTbMsgId(proto), toTbMsgType(proto));
+ }
+
+ private void processArgumentValuesUpdate(CalculatedFieldCtx ctx, List cfIdList, MultipleTbCallback callback,
+ Map newArgValues, UUID tbMsgId, TbMsgType tbMsgType) throws CalculatedFieldException {
+ if (newArgValues.isEmpty()) {
+ log.info("[{}] No new argument values to process for CF.", ctx.getCfId());
+ callback.onSuccess(CALLBACKS_PER_CF);
+ }
+ CalculatedFieldState state = states.get(ctx.getCfId());
+ boolean justRestored = false;
+ if (state == null) {
+ state = getOrInitState(ctx);
+ justRestored = true;
+ }
+ if (state.isSizeOk()) {
+ if (state.updateState(newArgValues) || justRestored) {
+ cfIdList = new ArrayList<>(cfIdList);
+ cfIdList.add(ctx.getCfId());
+ processStateIfReady(ctx, cfIdList, state, tbMsgId, tbMsgType, callback);
+ } else {
+ callback.onSuccess(CALLBACKS_PER_CF);
+ }
+ } else {
+ throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).errorMessage(ctx.getSizeExceedsLimitMessage()).build();
+ }
+ }
+
+ @SneakyThrows
+ private CalculatedFieldState getOrInitState(CalculatedFieldCtx ctx) {
+ CalculatedFieldState state = states.get(ctx.getCfId());
+ if (state != null) {
+ return state;
+ } else {
+ ListenableFuture stateFuture = systemContext.getCalculatedFieldProcessingService().fetchStateFromDb(ctx, entityId);
+ // Ugly but necessary. We do not expect to often fetch data from DB. Only once per pair lifetime.
+ // This call happens while processing the CF pack from the queue consumer. So the timeout should be relatively low.
+ // Alternatively, we can fetch the state outside the actor system and push separate command to create this actor,
+ // but this will significantly complicate the code.
+ state = stateFuture.get(1, TimeUnit.MINUTES);
+ state.checkStateSize(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), ctx.getMaxStateSize());
+ states.put(ctx.getCfId(), state);
+ }
+ return state;
+ }
+
+ private void processStateIfReady(CalculatedFieldCtx ctx, List cfIdList, CalculatedFieldState state, UUID tbMsgId, TbMsgType tbMsgType, TbCallback callback) throws CalculatedFieldException {
+ CalculatedFieldEntityCtxId ctxId = new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId);
+ boolean stateSizeOk;
+ if (ctx.isInitialized() && state.isReady()) {
+ try {
+ CalculatedFieldResult calculationResult = state.performCalculation(ctx).get(5, TimeUnit.SECONDS);
+ state.checkStateSize(ctxId, ctx.getMaxStateSize());
+ stateSizeOk = state.isSizeOk();
+ if (stateSizeOk) {
+ cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback);
+ if (DebugModeUtil.isDebugAllAvailable(ctx.getCalculatedField())) {
+ systemContext.persistCalculatedFieldDebugEvent(tenantId, ctx.getCfId(), entityId, state.getArguments(), tbMsgId, tbMsgType, JacksonUtil.writeValueAsString(calculationResult.getResult()), null);
+ }
+ }
+ } catch (Exception e) {
+ throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).msgId(tbMsgId).msgType(tbMsgType).arguments(state.getArguments()).cause(e).build();
+ }
+ } else {
+ state.checkStateSize(ctxId, ctx.getMaxStateSize());
+ stateSizeOk = state.isSizeOk();
+ if (stateSizeOk) {
+ callback.onSuccess(); // State was updated but no calculation performed;
+ }
+ }
+ if (stateSizeOk) {
+ cfStateService.persistState(ctxId, state, callback);
+ } else {
+ removeStateAndRaiseSizeException(ctxId, CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).errorMessage(ctx.getSizeExceedsLimitMessage()).build(), callback);
+ }
+ }
+
+ private void removeStateAndRaiseSizeException(CalculatedFieldEntityCtxId ctxId, CalculatedFieldException ex, TbCallback callback) {
+ // We remove the state, but remember that it is over-sized in a local map.
+ cfStateService.removeState(ctxId, new TbCallback() {
+ @Override
+ public void onSuccess() {
+ callback.onFailure(ex);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ callback.onFailure(ex);
+ }
+ });
+ }
+
+ private Map mapToArguments(CalculatedFieldCtx ctx, List data) {
+ return mapToArguments(ctx.getMainEntityArguments(), data);
+ }
+
+ private Map mapToArguments(CalculatedFieldCtx ctx, EntityId entityId, List data) {
+ var argNames = ctx.getLinkedEntityArguments().get(entityId);
+ if (argNames.isEmpty()) {
+ return Collections.emptyMap();
+ }
+ return mapToArguments(argNames, data);
+ }
+
+ private Map mapToArguments(Map argNames, List data) {
+ if (argNames.isEmpty()) {
+ return Collections.emptyMap();
+ }
+ Map arguments = new HashMap<>();
+ for (TsKvProto item : data) {
+ ReferencedEntityKey key = new ReferencedEntityKey(item.getKv().getKey(), ArgumentType.TS_LATEST, null);
+ String argName = argNames.get(key);
+ if (argName != null) {
+ arguments.put(argName, new SingleValueArgumentEntry(item));
+ }
+ key = new ReferencedEntityKey(item.getKv().getKey(), ArgumentType.TS_ROLLING, null);
+ argName = argNames.get(key);
+ if (argName != null) {
+ arguments.put(argName, new SingleValueArgumentEntry(item));
+ }
+ }
+ return arguments;
+ }
+
+ private Map mapToArguments(CalculatedFieldCtx ctx, AttributeScopeProto scope, List attrDataList) {
+ return mapToArguments(ctx.getMainEntityArguments(), scope, attrDataList);
+ }
+
+ private Map mapToArguments(CalculatedFieldCtx ctx, EntityId entityId, AttributeScopeProto scope, List attrDataList) {
+ var argNames = ctx.getLinkedEntityArguments().get(entityId);
+ if (argNames.isEmpty()) {
+ return Collections.emptyMap();
+ }
+ return mapToArguments(argNames, scope, attrDataList);
+ }
+
+ private Map mapToArguments(Map argNames, AttributeScopeProto scope, List attrDataList) {
+ Map arguments = new HashMap<>();
+ for (AttributeValueProto item : attrDataList) {
+ ReferencedEntityKey key = new ReferencedEntityKey(item.getKey(), ArgumentType.ATTRIBUTE, AttributeScope.valueOf(scope.name()));
+ String argName = argNames.get(key);
+ if (argName != null) {
+ arguments.put(argName, new SingleValueArgumentEntry(item));
+ }
+ }
+ return arguments;
+ }
+
+ private Map mapToArgumentsWithDefaultValue(CalculatedFieldCtx ctx, EntityId entityId, AttributeScopeProto scope, List removedAttrKeys) {
+ var argNames = ctx.getLinkedEntityArguments().get(entityId);
+ if (argNames.isEmpty()) {
+ return Collections.emptyMap();
+ }
+ return mapToArgumentsWithDefaultValue(argNames, ctx.getArguments(), scope, removedAttrKeys);
+ }
+
+ private Map mapToArgumentsWithDefaultValue(CalculatedFieldCtx ctx, AttributeScopeProto scope, List removedAttrKeys) {
+ return mapToArgumentsWithDefaultValue(ctx.getMainEntityArguments(), ctx.getArguments(), scope, removedAttrKeys);
+ }
+
+ private Map mapToArgumentsWithDefaultValue(Map argNames, Map configArguments, AttributeScopeProto scope, List removedAttrKeys) {
+ Map arguments = new HashMap<>();
+ for (String removedKey : removedAttrKeys) {
+ ReferencedEntityKey key = new ReferencedEntityKey(removedKey, ArgumentType.ATTRIBUTE, AttributeScope.valueOf(scope.name()));
+ String argName = argNames.get(key);
+ if (argName != null) {
+ Argument argument = configArguments.get(argName);
+ String defaultValue = (argument != null) ? argument.getDefaultValue() : null;
+ arguments.put(argName, StringUtils.isNotEmpty(defaultValue)
+ ? new SingleValueArgumentEntry(System.currentTimeMillis(), new StringDataEntry(removedKey, defaultValue), null)
+ : new SingleValueArgumentEntry());
+
+ }
+ }
+ return arguments;
+ }
+
+ private Map mapToArgumentsWithFetchedValue(CalculatedFieldCtx ctx, List removedTelemetryKeys) {
+ Map deletedArguments = ctx.getArguments().entrySet().stream()
+ .filter(entry -> removedTelemetryKeys.contains(entry.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+ Map fetchedArgs = cfService.fetchArgsFromDb(tenantId, entityId, deletedArguments);
+
+ fetchedArgs.values().forEach(arg -> arg.setForceResetPrevious(true));
+ return fetchedArgs;
+ }
+
+ private static List getCalculatedFieldIds(CalculatedFieldTelemetryMsgProto proto) {
+ List cfIds = new LinkedList<>();
+ for (var cfId : proto.getPreviousCalculatedFieldsList()) {
+ cfIds.add(new CalculatedFieldId(new UUID(cfId.getCalculatedFieldIdMSB(), cfId.getCalculatedFieldIdLSB())));
+ }
+ return cfIds;
+ }
+
+ private UUID toTbMsgId(CalculatedFieldTelemetryMsgProto proto) {
+ if (proto.getTbMsgIdMSB() != 0 && proto.getTbMsgIdLSB() != 0) {
+ return new UUID(proto.getTbMsgIdMSB(), proto.getTbMsgIdLSB());
+ }
+ return null;
+ }
+
+ private TbMsgType toTbMsgType(CalculatedFieldTelemetryMsgProto proto) {
+ if (!proto.getTbMsgType().isEmpty()) {
+ return TbMsgType.valueOf(proto.getTbMsgType());
+ }
+ return null;
+ }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldException.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldException.java
new file mode 100644
index 0000000000..70c8dfbfd2
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldException.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright © 2016-2025 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.calculatedField;
+
+import lombok.Builder;
+import lombok.Getter;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.msg.TbMsgType;
+import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
+import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
+
+import java.util.Map;
+import java.util.UUID;
+
+@Getter
+@Builder
+public class CalculatedFieldException extends Exception {
+
+ private final CalculatedFieldCtx ctx;
+ private final EntityId eventEntity;
+ private final UUID msgId;
+ private final TbMsgType msgType;
+ private Map arguments;
+ private String errorMessage;
+ private Exception cause;
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldLinkedTelemetryMsg.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldLinkedTelemetryMsg.java
new file mode 100644
index 0000000000..3e0fba2627
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldLinkedTelemetryMsg.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright © 2016-2025 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.calculatedField;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
+import org.thingsboard.server.common.msg.queue.TbCallback;
+import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldLinkedTelemetryMsgProto;
+import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
+
+@Data
+public class CalculatedFieldLinkedTelemetryMsg implements ToCalculatedFieldSystemMsg {
+
+ private final TenantId tenantId;
+ private final EntityId entityId;
+ private final CalculatedFieldLinkedTelemetryMsgProto proto;
+ private final TbCallback callback;
+
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.CF_LINKED_TELEMETRY_MSG;
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java
new file mode 100644
index 0000000000..a5c935e83f
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java
@@ -0,0 +1,90 @@
+/**
+ * Copyright © 2016-2025 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.calculatedField;
+
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.TbActorCtx;
+import org.thingsboard.server.actors.TbActorException;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
+import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg;
+import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg;
+import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg;
+import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
+
+/**
+ * Created by ashvayka on 15.03.18.
+ */
+@Slf4j
+public class CalculatedFieldManagerActor extends AbstractCalculatedFieldActor {
+
+ private final CalculatedFieldManagerMessageProcessor processor;
+
+ public CalculatedFieldManagerActor(ActorSystemContext systemContext, TenantId tenantId) {
+ super(systemContext, tenantId);
+ this.processor = new CalculatedFieldManagerMessageProcessor(systemContext, tenantId);
+ }
+
+ @Override
+ public void init(TbActorCtx ctx) throws TbActorException {
+ super.init(ctx);
+ log.debug("[{}] Starting CF manager actor.", processor.tenantId);
+ try {
+ processor.init(ctx);
+ log.debug("[{}] CF manager actor started.", processor.tenantId);
+ } catch (Exception e) {
+ log.warn("[{}] Unknown failure", processor.tenantId, e);
+ throw new TbActorException("Failed to initialize manager actor", e);
+ }
+ }
+
+ @Override
+ protected boolean doProcessCfMsg(ToCalculatedFieldSystemMsg msg) throws CalculatedFieldException {
+ switch (msg.getMsgType()) {
+ case CF_PARTITIONS_CHANGE_MSG:
+ processor.onPartitionChange((CalculatedFieldPartitionChangeMsg) msg);
+ break;
+ case CF_INIT_MSG:
+ processor.onFieldInitMsg((CalculatedFieldInitMsg) msg);
+ break;
+ case CF_LINK_INIT_MSG:
+ processor.onLinkInitMsg((CalculatedFieldLinkInitMsg) msg);
+ break;
+ case CF_STATE_RESTORE_MSG:
+ processor.onStateRestoreMsg((CalculatedFieldStateRestoreMsg) msg);
+ break;
+ case CF_ENTITY_LIFECYCLE_MSG:
+ processor.onEntityLifecycleMsg((CalculatedFieldEntityLifecycleMsg) msg);
+ break;
+ case CF_TELEMETRY_MSG:
+ processor.onTelemetryMsg((CalculatedFieldTelemetryMsg) msg);
+ break;
+ case CF_LINKED_TELEMETRY_MSG:
+ processor.onLinkedTelemetryMsg((CalculatedFieldLinkedTelemetryMsg) msg);
+ break;
+ default:
+ return false;
+ }
+ return true;
+ }
+
+ @Override
+ void logProcessingException(Exception e) {
+ log.warn("[{}] Processing failure", tenantId, e);
+ }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActorCreator.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActorCreator.java
new file mode 100644
index 0000000000..99bf3cdbe9
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActorCreator.java
@@ -0,0 +1,46 @@
+/**
+ * Copyright © 2016-2025 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.calculatedField;
+
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.TbActor;
+import org.thingsboard.server.actors.TbActorId;
+import org.thingsboard.server.actors.TbEntityActorId;
+import org.thingsboard.server.actors.TbStringActorId;
+import org.thingsboard.server.actors.service.ContextBasedCreator;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.TenantId;
+
+public class CalculatedFieldManagerActorCreator extends ContextBasedCreator {
+
+ private final TenantId tenantId;
+
+ public CalculatedFieldManagerActorCreator(ActorSystemContext context, TenantId tenantId) {
+ super(context);
+ this.tenantId = tenantId;
+ }
+
+ @Override
+ public TbActorId createActorId() {
+ return new TbStringActorId("CFM|" + tenantId);
+ }
+
+ @Override
+ public TbActor createActor() {
+ return new CalculatedFieldManagerActor(context, tenantId);
+ }
+
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java
new file mode 100644
index 0000000000..d26d89626e
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java
@@ -0,0 +1,471 @@
+/**
+ * Copyright © 2016-2025 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.calculatedField;
+
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.common.util.DebugModeUtil;
+import org.thingsboard.server.actors.ActorSystemContext;
+import org.thingsboard.server.actors.TbActorCtx;
+import org.thingsboard.server.actors.TbActorRef;
+import org.thingsboard.server.actors.TbCalculatedFieldEntityActorId;
+import org.thingsboard.server.actors.service.DefaultActorService;
+import org.thingsboard.server.actors.shared.AbstractContextAwareMsgProcessor;
+import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.cf.CalculatedField;
+import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
+import org.thingsboard.server.common.data.id.AssetId;
+import org.thingsboard.server.common.data.id.CalculatedFieldId;
+import org.thingsboard.server.common.data.id.DeviceId;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.cf.CalculatedFieldEntityLifecycleMsg;
+import org.thingsboard.server.common.msg.cf.CalculatedFieldInitMsg;
+import org.thingsboard.server.common.msg.cf.CalculatedFieldLinkInitMsg;
+import org.thingsboard.server.common.msg.cf.CalculatedFieldPartitionChangeMsg;
+import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
+import org.thingsboard.server.common.msg.queue.TbCallback;
+import org.thingsboard.server.dao.cf.CalculatedFieldService;
+import org.thingsboard.server.service.cf.CalculatedFieldProcessingService;
+import org.thingsboard.server.service.cf.CalculatedFieldStateService;
+import org.thingsboard.server.service.cf.cache.CalculatedFieldEntityProfileCache;
+import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
+import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
+import org.thingsboard.server.service.profile.TbAssetProfileCache;
+import org.thingsboard.server.service.profile.TbDeviceProfileCache;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import static org.thingsboard.server.utils.CalculatedFieldUtils.fromProto;
+
+
+/**
+ * @author Andrew Shvayka
+ */
+@Slf4j
+public class CalculatedFieldManagerMessageProcessor extends AbstractContextAwareMsgProcessor {
+
+ private final Map calculatedFields = new HashMap<>();
+ private final Map> entityIdCalculatedFields = new HashMap<>();
+ private final Map> entityIdCalculatedFieldLinks = new HashMap<>();
+
+ private final CalculatedFieldProcessingService cfExecService;
+ private final CalculatedFieldStateService cfStateService;
+ private final CalculatedFieldEntityProfileCache cfEntityCache;
+ private final CalculatedFieldService cfDaoService;
+ private final TbAssetProfileCache assetProfileCache;
+ private final TbDeviceProfileCache deviceProfileCache;
+ protected final TenantId tenantId;
+
+ protected TbActorCtx ctx;
+
+ CalculatedFieldManagerMessageProcessor(ActorSystemContext systemContext, TenantId tenantId) {
+ super(systemContext);
+ this.cfEntityCache = systemContext.getCalculatedFieldEntityProfileCache();
+ this.cfExecService = systemContext.getCalculatedFieldProcessingService();
+ this.cfStateService = systemContext.getCalculatedFieldStateService();
+ this.cfDaoService = systemContext.getCalculatedFieldService();
+ this.assetProfileCache = systemContext.getAssetProfileCache();
+ this.deviceProfileCache = systemContext.getDeviceProfileCache();
+ this.tenantId = tenantId;
+ }
+
+ void init(TbActorCtx ctx) {
+ this.ctx = ctx;
+ }
+
+ public void onFieldInitMsg(CalculatedFieldInitMsg msg) throws CalculatedFieldException {
+ log.info("[{}] Processing CF init message.", msg.getCf().getId());
+ var cf = msg.getCf();
+ var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService());
+ try {
+ cfCtx.init();
+ } catch (Exception e) {
+ throw CalculatedFieldException.builder().ctx(cfCtx).eventEntity(cf.getEntityId()).cause(e).errorMessage("Failed to initialize CF context").build();
+ }
+ calculatedFields.put(cf.getId(), cfCtx);
+ // We use copy on write lists to safely pass the reference to another actor for the iteration.
+ // Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
+ entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cfCtx);
+ msg.getCallback().onSuccess();
+ }
+
+ public void onLinkInitMsg(CalculatedFieldLinkInitMsg msg) {
+ log.info("[{}] Processing CF link init message for entity [{}].", msg.getLink().getCalculatedFieldId(), msg.getLink().getEntityId());
+ var link = msg.getLink();
+ // We use copy on write lists to safely pass the reference to another actor for the iteration.
+ // Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
+ entityIdCalculatedFieldLinks.computeIfAbsent(link.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(link);
+ msg.getCallback().onSuccess();
+ }
+
+ public void onStateRestoreMsg(CalculatedFieldStateRestoreMsg msg) {
+ var cfId = msg.getId().cfId();
+ var calculatedField = calculatedFields.get(cfId);
+
+ if (calculatedField != null) {
+ msg.getState().setRequiredArguments(calculatedField.getArgNames());
+ log.info("Pushing CF state restore msg to specific actor [{}]", msg.getId().entityId());
+ getOrCreateActor(msg.getId().entityId()).tell(msg);
+ } else {
+ cfStateService.removeState(msg.getId(), msg.getCallback());
+ }
+ }
+
+ public void onEntityLifecycleMsg(CalculatedFieldEntityLifecycleMsg msg) throws CalculatedFieldException {
+ log.info("Processing entity lifecycle event: [{}] for entity: [{}]", msg.getData().getEvent(), msg.getData().getEntityId());
+ var entityType = msg.getData().getEntityId().getEntityType();
+ var event = msg.getData().getEvent();
+ switch (entityType) {
+ case CALCULATED_FIELD: {
+ switch (event) {
+ case CREATED:
+ onCfCreated(msg.getData(), msg.getCallback());
+ break;
+ case UPDATED:
+ onCfUpdated(msg.getData(), msg.getCallback());
+ break;
+ case DELETED:
+ onCfDeleted(msg.getData(), msg.getCallback());
+ break;
+ default:
+ msg.getCallback().onSuccess();
+ break;
+ }
+ break;
+ }
+ case DEVICE:
+ case ASSET: {
+ switch (event) {
+ case CREATED:
+ onEntityCreated(msg.getData(), msg.getCallback());
+ break;
+ case UPDATED:
+ onEntityUpdated(msg.getData(), msg.getCallback());
+ break;
+ case DELETED:
+ onEntityDeleted(msg.getData(), msg.getCallback());
+ break;
+ default:
+ msg.getCallback().onSuccess();
+ break;
+ }
+ break;
+ }
+ default: {
+ msg.getCallback().onSuccess();
+ }
+ }
+ }
+
+ private void onEntityCreated(ComponentLifecycleMsg msg, TbCallback callback) {
+ EntityId entityId = msg.getEntityId();
+ EntityId profileId = getProfileId(tenantId, entityId);
+ cfEntityCache.add(tenantId, profileId, entityId);
+ var entityIdFields = getCalculatedFieldsByEntityId(entityId);
+ var profileIdFields = getCalculatedFieldsByEntityId(profileId);
+ var fieldsCount = entityIdFields.size() + profileIdFields.size();
+ if (fieldsCount > 0) {
+ MultipleTbCallback multiCallback = new MultipleTbCallback(fieldsCount, callback);
+ entityIdFields.forEach(ctx -> initCfForEntity(entityId, ctx, true, multiCallback));
+ profileIdFields.forEach(ctx -> initCfForEntity(entityId, ctx, true, multiCallback));
+ } else {
+ callback.onSuccess();
+ }
+ }
+
+ private void onEntityUpdated(ComponentLifecycleMsg msg, TbCallback callback) {
+ if (msg.getOldProfileId() != null && msg.getOldProfileId() != msg.getProfileId()) {
+ cfEntityCache.update(tenantId, msg.getOldProfileId(), msg.getProfileId(), msg.getEntityId());
+ var oldProfileCfs = getCalculatedFieldsByEntityId(msg.getOldProfileId());
+ var newProfileCfs = getCalculatedFieldsByEntityId(msg.getProfileId());
+ var fieldsCount = oldProfileCfs.size() + newProfileCfs.size();
+ if (fieldsCount > 0) {
+ MultipleTbCallback multiCallback = new MultipleTbCallback(fieldsCount, callback);
+ var entityId = msg.getEntityId();
+ oldProfileCfs.forEach(ctx -> deleteCfForEntity(entityId, ctx.getCfId(), callback));
+ newProfileCfs.forEach(ctx -> initCfForEntity(entityId, ctx, true, multiCallback));
+ } else {
+ callback.onSuccess();
+ }
+ }
+ }
+
+ private void onEntityDeleted(ComponentLifecycleMsg msg, TbCallback callback) {
+ cfEntityCache.evict(tenantId, msg.getEntityId());
+ log.info("Pushing entity lifecycle msg to specific actor [{}]", msg.getEntityId());
+ getOrCreateActor(msg.getEntityId()).tell(new CalculatedFieldEntityDeleteMsg(tenantId, msg.getEntityId(), callback));
+ }
+
+ private void onCfCreated(ComponentLifecycleMsg msg, TbCallback callback) throws CalculatedFieldException {
+ var cfId = new CalculatedFieldId(msg.getEntityId().getId());
+ if (calculatedFields.containsKey(cfId)) {
+ log.warn("[{}] CF was already initialized [{}]", tenantId, cfId);
+ callback.onSuccess();
+ } else {
+ var cf = cfDaoService.findById(msg.getTenantId(), cfId);
+ if (cf == null) {
+ log.warn("[{}] Failed to lookup CF by id [{}]", tenantId, cfId);
+ callback.onSuccess();
+ } else {
+ var cfCtx = new CalculatedFieldCtx(cf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService());
+ try {
+ cfCtx.init();
+ } catch (Exception e) {
+ throw CalculatedFieldException.builder().ctx(cfCtx).eventEntity(cf.getEntityId()).cause(e).errorMessage("Failed to initialize CF context").build();
+ }
+ calculatedFields.put(cf.getId(), cfCtx);
+ // We use copy on write lists to safely pass the reference to another actor for the iteration.
+ // Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
+ entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cfCtx);
+ addLinks(cf);
+ initCf(cfCtx, callback, false);
+ }
+ }
+ }
+
+ private void onCfUpdated(ComponentLifecycleMsg msg, TbCallback callback) throws CalculatedFieldException {
+ var cfId = new CalculatedFieldId(msg.getEntityId().getId());
+ var oldCfCtx = calculatedFields.get(cfId);
+ if (oldCfCtx == null) {
+ onCfCreated(msg, callback);
+ } else {
+ var newCf = cfDaoService.findById(msg.getTenantId(), cfId);
+ if (newCf == null) {
+ log.warn("[{}] Failed to lookup CF by id [{}]", tenantId, cfId);
+ callback.onSuccess();
+ } else {
+ var newCfCtx = new CalculatedFieldCtx(newCf, systemContext.getTbelInvokeService(), systemContext.getApiLimitService());
+ try {
+ newCfCtx.init();
+ } catch (Exception e) {
+ throw CalculatedFieldException.builder().ctx(newCfCtx).eventEntity(newCfCtx.getEntityId()).cause(e).errorMessage("Failed to initialize CF context").build();
+ }
+ calculatedFields.put(newCf.getId(), newCfCtx);
+ List oldCfList = entityIdCalculatedFields.get(newCf.getEntityId());
+ List newCfList = new CopyOnWriteArrayList<>();
+ boolean found = false;
+ for (CalculatedFieldCtx oldCtx : oldCfList) {
+ if (oldCtx.getCfId().equals(newCf.getId())) {
+ newCfList.add(newCfCtx);
+ found = true;
+ } else {
+ newCfList.add(oldCtx);
+ }
+ }
+ if (!found) {
+ newCfList.add(newCfCtx);
+ }
+ entityIdCalculatedFields.put(newCf.getEntityId(), newCfList);
+
+ deleteLinks(oldCfCtx);
+ addLinks(newCf);
+
+ // We use copy on write lists to safely pass the reference to another actor for the iteration.
+ // Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
+ var stateChanges = newCfCtx.hasStateChanges(oldCfCtx);
+ if (stateChanges || newCfCtx.hasOtherSignificantChanges(oldCfCtx)) {
+ initCf(newCfCtx, callback, stateChanges);
+ } else {
+ callback.onSuccess();
+ }
+ }
+ }
+ }
+
+ private void onCfDeleted(ComponentLifecycleMsg msg, TbCallback callback) {
+ var cfId = new CalculatedFieldId(msg.getEntityId().getId());
+ var cfCtx = calculatedFields.remove(cfId);
+ if (cfCtx == null) {
+ log.warn("[{}] CF was already deleted [{}]", tenantId, cfId);
+ callback.onSuccess();
+ } else {
+ entityIdCalculatedFields.get(cfCtx.getEntityId()).remove(cfCtx);
+ deleteLinks(cfCtx);
+
+ EntityId entityId = cfCtx.getEntityId();
+ EntityType entityType = cfCtx.getEntityId().getEntityType();
+ if (isProfileEntity(entityType)) {
+ var entityIds = cfEntityCache.getMyEntityIdsByProfileId(tenantId, entityId);
+ if (!entityIds.isEmpty()) {
+ //TODO: no need to do this if we cache all created actors and know which one belong to us;
+ var multiCallback = new MultipleTbCallback(entityIds.size(), callback);
+ entityIds.forEach(id -> deleteCfForEntity(entityId, cfId, multiCallback));
+ } else {
+ callback.onSuccess();
+ }
+ } else {
+ deleteCfForEntity(entityId, cfId, callback);
+ }
+ }
+ }
+
+ public void onTelemetryMsg(CalculatedFieldTelemetryMsg msg) {
+ EntityId entityId = msg.getEntityId();
+ log.info("Received telemetry msg from entity [{}]", entityId);
+ // 2 = 1 for CF processing + 1 for links processing
+ MultipleTbCallback callback = new MultipleTbCallback(2, msg.getCallback());
+ // process all cfs related to entity, or it's profile;
+ var entityIdFields = getCalculatedFieldsByEntityId(entityId);
+ var profileIdFields = getCalculatedFieldsByEntityId(getProfileId(tenantId, entityId));
+ if (!entityIdFields.isEmpty() || !profileIdFields.isEmpty()) {
+ log.info("Pushing telemetry msg to specific actor [{}]", entityId);
+ getOrCreateActor(entityId).tell(new EntityCalculatedFieldTelemetryMsg(msg, entityIdFields, profileIdFields, callback));
+ } else {
+ callback.onSuccess();
+ }
+ // process all links (if any);
+ List linkedCalculatedFields = filterCalculatedFieldLinks(msg);
+ var linksSize = linkedCalculatedFields.size();
+ if (linksSize > 0) {
+ cfExecService.pushMsgToLinks(msg, linkedCalculatedFields, callback);
+ } else {
+ callback.onSuccess();
+ }
+ }
+
+ public void onLinkedTelemetryMsg(CalculatedFieldLinkedTelemetryMsg msg) {
+ EntityId sourceEntityId = msg.getEntityId();
+ log.info("Received linked telemetry msg from entity [{}]", sourceEntityId);
+ var proto = msg.getProto();
+ var linksList = proto.getLinksList();
+ for (var linkProto : linksList) {
+ var link = fromProto(linkProto);
+ var targetEntityId = link.entityId();
+ var targetEntityType = targetEntityId.getEntityType();
+ var cf = calculatedFields.get(link.cfId());
+ if (EntityType.DEVICE_PROFILE.equals(targetEntityType) || EntityType.ASSET_PROFILE.equals(targetEntityType)) {
+ // iterate over all entities that belong to profile and push the message for corresponding CF
+ var entityIds = cfEntityCache.getMyEntityIdsByProfileId(tenantId, targetEntityId);
+ if (!entityIds.isEmpty()) {
+ MultipleTbCallback callback = new MultipleTbCallback(entityIds.size(), msg.getCallback());
+ var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg(tenantId, sourceEntityId, proto.getMsg(), cf, callback);
+ entityIds.forEach(entityId -> {
+ log.info("Pushing linked telemetry msg to specific actor [{}]", entityId);
+ getOrCreateActor(entityId).tell(newMsg);
+ });
+ } else {
+ msg.getCallback().onSuccess();
+ }
+ } else {
+ log.info("Pushing linked telemetry msg to specific actor [{}]", targetEntityId);
+ var newMsg = new EntityCalculatedFieldLinkedTelemetryMsg(tenantId, sourceEntityId, proto.getMsg(), cf, msg.getCallback());
+ getOrCreateActor(targetEntityId).tell(newMsg);
+ }
+ }
+ }
+
+ private List filterCalculatedFieldLinks(CalculatedFieldTelemetryMsg msg) {
+ EntityId entityId = msg.getEntityId();
+ var proto = msg.getProto();
+ List result = new ArrayList<>();
+ for (var link : getCalculatedFieldLinksByEntityId(entityId)) {
+ CalculatedFieldCtx ctx = calculatedFields.get(link.getCalculatedFieldId());
+ if (ctx.linkMatches(entityId, proto)) {
+ result.add(ctx.toCalculatedFieldEntityCtxId());
+ }
+ }
+ return result;
+ }
+
+ private List getCalculatedFieldsByEntityId(EntityId entityId) {
+ if (entityId == null) {
+ return Collections.emptyList();
+ }
+ var result = entityIdCalculatedFields.get(entityId);
+ if (result == null) {
+ result = Collections.emptyList();
+ }
+ return result;
+ }
+
+ private List getCalculatedFieldLinksByEntityId(EntityId entityId) {
+ if (entityId == null) {
+ return Collections.emptyList();
+ }
+ var result = entityIdCalculatedFieldLinks.get(entityId);
+ if (result == null) {
+ result = Collections.emptyList();
+ }
+ return result;
+ }
+
+ private void initCf(CalculatedFieldCtx cfCtx, TbCallback callback, boolean forceStateReinit) {
+ EntityId entityId = cfCtx.getEntityId();
+ EntityType entityType = cfCtx.getEntityId().getEntityType();
+ if (isProfileEntity(entityType)) {
+ var entityIds = cfEntityCache.getMyEntityIdsByProfileId(tenantId, entityId);
+ if (!entityIds.isEmpty()) {
+ var multiCallback = new MultipleTbCallback(entityIds.size(), callback);
+ entityIds.forEach(id -> initCfForEntity(id, cfCtx, forceStateReinit, multiCallback));
+ } else {
+ callback.onSuccess();
+ }
+ } else {
+ initCfForEntity(entityId, cfCtx, forceStateReinit, callback);
+ }
+ }
+
+ private void deleteCfForEntity(EntityId entityId, CalculatedFieldId cfId, TbCallback callback) {
+ log.info("Pushing delete CF msg to specific actor [{}]", entityId);
+ getOrCreateActor(entityId).tell(new CalculatedFieldEntityDeleteMsg(tenantId, cfId, callback));
+ }
+
+ private void initCfForEntity(EntityId entityId, CalculatedFieldCtx cfCtx, boolean forceStateReinit, TbCallback callback) {
+ log.info("Pushing entity init CF msg to specific actor [{}]", entityId);
+ getOrCreateActor(entityId).tell(new EntityInitCalculatedFieldMsg(tenantId, cfCtx, callback, forceStateReinit));
+ }
+
+ private static boolean isProfileEntity(EntityType entityType) {
+ return EntityType.DEVICE_PROFILE.equals(entityType) || EntityType.ASSET_PROFILE.equals(entityType);
+ }
+
+ private EntityId getProfileId(TenantId tenantId, EntityId entityId) {
+ return switch (entityId.getEntityType()) {
+ case ASSET -> assetProfileCache.get(tenantId, (AssetId) entityId).getId();
+ case DEVICE -> deviceProfileCache.get(tenantId, (DeviceId) entityId).getId();
+ default -> null;
+ };
+ }
+
+ private TbActorRef getOrCreateActor(EntityId entityId) {
+ return ctx.getOrCreateChildActor(new TbCalculatedFieldEntityActorId(entityId),
+ () -> DefaultActorService.CF_ENTITY_DISPATCHER_NAME,
+ () -> new CalculatedFieldEntityActorCreator(systemContext, tenantId, entityId),
+ () -> true);
+ }
+
+ private void addLinks(CalculatedField newCf) {
+ var newLinks = newCf.getConfiguration().buildCalculatedFieldLinks(tenantId, newCf.getEntityId(), newCf.getId());
+ newLinks.forEach(link -> entityIdCalculatedFieldLinks.computeIfAbsent(link.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(link));
+ }
+
+ private void deleteLinks(CalculatedFieldCtx cfCtx) {
+ var oldCf = cfCtx.getCalculatedField();
+ var oldLinks = oldCf.getConfiguration().buildCalculatedFieldLinks(tenantId, oldCf.getEntityId(), oldCf.getId());
+ oldLinks.forEach(link -> entityIdCalculatedFieldLinks.computeIfAbsent(link.getEntityId(), id -> new CopyOnWriteArrayList<>()).remove(link));
+ }
+
+ public void onPartitionChange(CalculatedFieldPartitionChangeMsg msg) {
+ ctx.broadcastToChildren(msg, true);
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldStateRestoreMsg.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldStateRestoreMsg.java
new file mode 100644
index 0000000000..19be7c02fa
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldStateRestoreMsg.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright © 2016-2025 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.calculatedField;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
+import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
+import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
+
+@Data
+public class CalculatedFieldStateRestoreMsg implements ToCalculatedFieldSystemMsg {
+
+ private final CalculatedFieldEntityCtxId id;
+ private final CalculatedFieldState state;
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.CF_STATE_RESTORE_MSG;
+ }
+
+ @Override
+ public TenantId getTenantId() {
+ return id.tenantId();
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldTelemetryMsg.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldTelemetryMsg.java
new file mode 100644
index 0000000000..68cd149cdf
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldTelemetryMsg.java
@@ -0,0 +1,39 @@
+/**
+ * Copyright © 2016-2025 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.calculatedField;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
+import org.thingsboard.server.common.msg.queue.TbCallback;
+import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
+
+@Data
+public class CalculatedFieldTelemetryMsg implements ToCalculatedFieldSystemMsg {
+
+ private final TenantId tenantId;
+ private final EntityId entityId;
+ private final CalculatedFieldTelemetryMsgProto proto;
+ private final TbCallback callback;
+
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.CF_TELEMETRY_MSG;
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityCalculatedFieldLinkedTelemetryMsg.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityCalculatedFieldLinkedTelemetryMsg.java
new file mode 100644
index 0000000000..b83aeae416
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityCalculatedFieldLinkedTelemetryMsg.java
@@ -0,0 +1,42 @@
+/**
+ * Copyright © 2016-2025 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.calculatedField;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
+import org.thingsboard.server.common.msg.queue.TbCallback;
+import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
+import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
+
+import java.util.List;
+
+@Data
+public class EntityCalculatedFieldLinkedTelemetryMsg implements ToCalculatedFieldSystemMsg {
+
+ private final TenantId tenantId;
+ private final EntityId entityId;
+ private final CalculatedFieldTelemetryMsgProto proto;
+ private final CalculatedFieldCtx ctx;
+ private final TbCallback callback;
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.CF_LINKED_TELEMETRY_MSG;
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityCalculatedFieldTelemetryMsg.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityCalculatedFieldTelemetryMsg.java
new file mode 100644
index 0000000000..8ded4b6028
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityCalculatedFieldTelemetryMsg.java
@@ -0,0 +1,56 @@
+/**
+ * Copyright © 2016-2025 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.calculatedField;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
+import org.thingsboard.server.common.msg.queue.TbCallback;
+import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
+import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
+
+import java.util.List;
+
+@Data
+public class EntityCalculatedFieldTelemetryMsg implements ToCalculatedFieldSystemMsg {
+
+ private final TenantId tenantId;
+ private final EntityId entityId;
+ private final CalculatedFieldTelemetryMsgProto proto;
+ // Both lists are effectively immutable in CalculatedFieldManagerMessageProcessor and must stay so.
+ private final List entityIdFields;
+ private final List profileIdFields;
+ private final TbCallback callback;
+
+ public EntityCalculatedFieldTelemetryMsg(CalculatedFieldTelemetryMsg msg,
+ List entityIdFields,
+ List profileIdFields,
+ TbCallback callback) {
+ this.tenantId = msg.getTenantId();
+ this.entityId = msg.getEntityId();
+ this.proto = msg.getProto();
+ this.entityIdFields = entityIdFields;
+ this.profileIdFields = profileIdFields;
+ this.callback = callback;
+ }
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.CF_ENTITY_TELEMETRY_MSG;
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityInitCalculatedFieldMsg.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityInitCalculatedFieldMsg.java
new file mode 100644
index 0000000000..1e8990ff8d
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityInitCalculatedFieldMsg.java
@@ -0,0 +1,41 @@
+/**
+ * Copyright © 2016-2025 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.calculatedField;
+
+import lombok.Data;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.msg.MsgType;
+import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
+import org.thingsboard.server.common.msg.queue.TbCallback;
+import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldTelemetryMsgProto;
+import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
+
+import java.util.List;
+
+@Data
+public class EntityInitCalculatedFieldMsg implements ToCalculatedFieldSystemMsg {
+
+ private final TenantId tenantId;
+ private final CalculatedFieldCtx ctx;
+ private final TbCallback callback;
+ private final boolean forceReinit;
+
+ @Override
+ public MsgType getMsgType() {
+ return MsgType.CF_ENTITY_INIT_CF_MSG;
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/MultipleTbCallback.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/MultipleTbCallback.java
new file mode 100644
index 0000000000..d1f4c9092e
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/MultipleTbCallback.java
@@ -0,0 +1,56 @@
+/**
+ * Copyright © 2016-2025 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.actors.calculatedField;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.thingsboard.server.common.msg.queue.TbCallback;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Slf4j
+public class MultipleTbCallback implements TbCallback {
+ @Getter
+ private final UUID id;
+ private final AtomicInteger counter;
+ private final TbCallback callback;
+
+ public MultipleTbCallback(int count, TbCallback callback) {
+ id = UUID.randomUUID();
+ this.counter = new AtomicInteger(count);
+ this.callback = callback;
+ }
+
+ @Override
+ public void onSuccess() {
+ onSuccess(1);
+ }
+
+ public void onSuccess(int number) {
+ log.trace("[{}][{}] onSuccess({})", id, callback.getId(), number);
+ if (counter.addAndGet(-number) <= 0) {
+ log.trace("[{}][{}] Done.", id, callback.getId());
+ callback.onSuccess();
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ log.warn("[{}][{}] onFailure.", id, callback.getId());
+ callback.onFailure(t);
+ }
+}
diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
index 8bd3bcc81d..d162b6a09a 100644
--- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
+++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
@@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import io.netty.channel.EventLoopGroup;
import lombok.extern.slf4j.Slf4j;
import org.bouncycastle.util.Arrays;
+import org.thingsboard.common.util.DebugModeUtil;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ListeningExecutor;
import org.thingsboard.rule.engine.api.MailService;
@@ -64,7 +65,6 @@ import org.thingsboard.server.common.data.msg.TbNodeConnectionType;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rule.RuleNode;
-import org.thingsboard.common.util.DebugModeUtil;
import org.thingsboard.server.common.data.rule.RuleNodeState;
import org.thingsboard.server.common.data.script.ScriptLanguage;
import org.thingsboard.server.common.msg.TbActorMsg;
@@ -79,6 +79,7 @@ import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.audit.AuditLogService;
import org.thingsboard.server.dao.cassandra.CassandraCluster;
+import org.thingsboard.server.dao.cf.CalculatedFieldService;
import org.thingsboard.server.dao.customer.CustomerService;
import org.thingsboard.server.dao.dashboard.DashboardService;
import org.thingsboard.server.dao.device.DeviceCredentialsService;
@@ -178,7 +179,7 @@ public class DefaultTbContext implements TbContext {
.resetRuleNodeId()
.build();
tbMsg.pushToStack(nodeCtx.getSelf().getRuleChainId(), nodeCtx.getSelf().getId());
- TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator());
+ TopicPartitionInfo tpi = resolvePartition(msg);
doEnqueue(tpi, tbMsg, new SimpleTbQueueCallback(md -> ack(msg), t -> tellFailure(msg, t)));
}
@@ -195,8 +196,7 @@ public class DefaultTbContext implements TbContext {
@Override
public void enqueue(TbMsg tbMsg, Runnable onSuccess, Consumer onFailure) {
- TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getQueueName(), getTenantId(), tbMsg.getOriginator());
- enqueue(tpi, tbMsg, onFailure, onSuccess);
+ enqueue(tbMsg, tbMsg.getQueueName(), onSuccess, onFailure);
}
@Override
@@ -897,6 +897,11 @@ public class DefaultTbContext implements TbContext {
return mainCtx.getSlackService();
}
+ @Override
+ public CalculatedFieldService getCalculatedFieldService() {
+ return mainCtx.getCalculatedFieldService();
+ }
+
@Override
public boolean isExternalNodeForceAck() {
return mainCtx.isExternalNodeForceAck();
diff --git a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
index 6c8f253138..c2131d9e74 100644
--- a/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
+++ b/application/src/main/java/org/thingsboard/server/actors/service/DefaultActorService.java
@@ -49,6 +49,8 @@ public class DefaultActorService extends TbApplicationEventListener deletedDevices;
+ private TbActorRef cfActor;
private TenantActor(ActorSystemContext systemContext, TenantId tenantId) {
super(systemContext, tenantId);
@@ -95,6 +98,11 @@ public class TenantActor extends RuleChainManagerActor {
} else {
log.info("[{}] Skip init of the rule chains due to API limits", tenantId);
}
+ //TODO: IM - extend API usage to have CF Exec Enabled? Not in 4.0;
+ cfActor = ctx.getOrCreateChildActor(new TbStringActorId("CFM|" + tenantId),
+ () -> DefaultActorService.CF_MANAGER_DISPATCHER_NAME,
+ () -> new CalculatedFieldManagerActorCreator(systemContext, tenantId),
+ () -> true);
} catch (Exception e) {
log.info("Failed to check ApiUsage \"ReExecEnabled\"!!!", e);
cantFindTenant = true;
@@ -159,12 +167,31 @@ public class TenantActor extends RuleChainManagerActor {
case RULE_CHAIN_TO_RULE_CHAIN_MSG:
onRuleChainMsg((RuleChainAwareMsg) msg);
break;
+ case CF_INIT_MSG:
+ case CF_LINK_INIT_MSG:
+ case CF_STATE_RESTORE_MSG:
+ case CF_PARTITIONS_CHANGE_MSG:
+ case CF_ENTITY_LIFECYCLE_MSG:
+ onToCalculatedFieldSystemActorMsg((ToCalculatedFieldSystemMsg) msg, true);
+ break;
+ case CF_TELEMETRY_MSG:
+ case CF_LINKED_TELEMETRY_MSG:
+ onToCalculatedFieldSystemActorMsg((ToCalculatedFieldSystemMsg) msg, false);
+ break;
default:
return false;
}
return true;
}
+ private void onToCalculatedFieldSystemActorMsg(ToCalculatedFieldSystemMsg msg, boolean priority) {
+ if (priority) {
+ cfActor.tellWithHighPriority(msg);
+ } else {
+ cfActor.tell(msg);
+ }
+ }
+
private boolean isMyPartition(EntityId entityId) {
return systemContext.resolve(ServiceType.TB_CORE, tenantId, entityId).isMyPartition();
}
diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java
index cf9d6444a2..73e278389a 100644
--- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java
+++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java
@@ -70,6 +70,7 @@ import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.asset.AssetInfo;
import org.thingsboard.server.common.data.asset.AssetProfile;
import org.thingsboard.server.common.data.audit.ActionType;
+import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.domain.Domain;
import org.thingsboard.server.common.data.edge.Edge;
import org.thingsboard.server.common.data.edge.EdgeInfo;
@@ -80,6 +81,7 @@ import org.thingsboard.server.common.data.id.AlarmCommentId;
import org.thingsboard.server.common.data.id.AlarmId;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.AssetProfileId;
+import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.DashboardId;
import org.thingsboard.server.common.data.id.DeviceId;
@@ -132,6 +134,7 @@ import org.thingsboard.server.dao.asset.AssetProfileService;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.audit.AuditLogService;
+import org.thingsboard.server.dao.cf.CalculatedFieldService;
import org.thingsboard.server.dao.customer.CustomerService;
import org.thingsboard.server.dao.dashboard.DashboardService;
import org.thingsboard.server.dao.device.ClaimDevicesService;
@@ -367,6 +370,9 @@ public abstract class BaseController {
@Autowired
protected NotificationTargetService notificationTargetService;
+ @Autowired
+ protected CalculatedFieldService calculatedFieldService;
+
@Value("${server.log_controller_error_stack_trace}")
@Getter
private boolean logControllerErrorStackTrace;
@@ -672,6 +678,9 @@ public abstract class BaseController {
case MOBILE_APP_BUNDLE:
checkMobileAppBundleId(new MobileAppBundleId(entityId.getId()), operation);
return;
+ case CALCULATED_FIELD:
+ checkCalculatedFieldId(new CalculatedFieldId(entityId.getId()), operation);
+ return;
default:
checkEntityId(entityId, entitiesService::findEntityByTenantIdAndId, operation);
}
@@ -955,6 +964,10 @@ public abstract class BaseController {
}
}
+ protected CalculatedField checkCalculatedFieldId(CalculatedFieldId calculatedFieldId, Operation operation) throws ThingsboardException {
+ return checkEntityId(calculatedFieldId, calculatedFieldService::findById, operation);
+ }
+
protected HomeDashboardInfo getHomeDashboardInfo(SecurityUser securityUser, JsonNode additionalInfo) {
HomeDashboardInfo homeDashboardInfo = extractHomeDashboardInfoFromAdditionalInfo(additionalInfo);
if (homeDashboardInfo == null) {
@@ -982,7 +995,8 @@ public abstract class BaseController {
}
return new HomeDashboardInfo(dashboardId, hideDashboardToolbar);
}
- } catch (Exception ignored) {}
+ } catch (Exception ignored) {
+ }
return null;
}
diff --git a/application/src/main/java/org/thingsboard/server/controller/CalculatedFieldController.java b/application/src/main/java/org/thingsboard/server/controller/CalculatedFieldController.java
new file mode 100644
index 0000000000..fe85cf3f87
--- /dev/null
+++ b/application/src/main/java/org/thingsboard/server/controller/CalculatedFieldController.java
@@ -0,0 +1,269 @@
+/**
+ * Copyright © 2016-2025 The Thingsboard Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.thingsboard.server.controller;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.swagger.v3.oas.annotations.Parameter;
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.HttpStatus;
+import org.springframework.security.access.prepost.PreAuthorize;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.ResponseStatus;
+import org.springframework.web.bind.annotation.RestController;
+import org.thingsboard.common.util.JacksonUtil;
+import org.thingsboard.script.api.tbel.TbelCfArg;
+import org.thingsboard.script.api.tbel.TbelInvokeService;
+import org.thingsboard.server.common.data.EntityType;
+import org.thingsboard.server.common.data.EventInfo;
+import org.thingsboard.server.common.data.HasTenantId;
+import org.thingsboard.server.common.data.cf.CalculatedField;
+import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
+import org.thingsboard.server.common.data.event.EventType;
+import org.thingsboard.server.common.data.exception.ThingsboardException;
+import org.thingsboard.server.common.data.id.CalculatedFieldId;
+import org.thingsboard.server.common.data.id.EntityId;
+import org.thingsboard.server.common.data.id.EntityIdFactory;
+import org.thingsboard.server.common.data.id.HasId;
+import org.thingsboard.server.common.data.id.TenantId;
+import org.thingsboard.server.common.data.page.PageData;
+import org.thingsboard.server.common.data.page.PageLink;
+import org.thingsboard.server.config.annotations.ApiOperation;
+import org.thingsboard.server.dao.event.EventService;
+import org.thingsboard.server.queue.util.TbCoreComponent;
+import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldScriptEngine;
+import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldTbelScriptEngine;
+import org.thingsboard.server.service.entitiy.cf.TbCalculatedFieldService;
+import org.thingsboard.server.service.security.model.SecurityUser;
+import org.thingsboard.server.service.security.permission.Operation;
+import org.thingsboard.server.service.security.permission.Resource;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+
+import static org.thingsboard.server.controller.ControllerConstants.CF_TEXT_SEARCH_DESCRIPTION;
+import static org.thingsboard.server.controller.ControllerConstants.ENTITY_ID_PARAM_DESCRIPTION;
+import static org.thingsboard.server.controller.ControllerConstants.ENTITY_TYPE_PARAM_DESCRIPTION;
+import static org.thingsboard.server.controller.ControllerConstants.MARKDOWN_CODE_BLOCK_END;
+import static org.thingsboard.server.controller.ControllerConstants.MARKDOWN_CODE_BLOCK_START;
+import static org.thingsboard.server.controller.ControllerConstants.PAGE_NUMBER_DESCRIPTION;
+import static org.thingsboard.server.controller.ControllerConstants.PAGE_SIZE_DESCRIPTION;
+import static org.thingsboard.server.controller.ControllerConstants.SORT_ORDER_DESCRIPTION;
+import static org.thingsboard.server.controller.ControllerConstants.SORT_PROPERTY_DESCRIPTION;
+import static org.thingsboard.server.controller.ControllerConstants.TENANT_AUTHORITY_PARAGRAPH;
+import static org.thingsboard.server.controller.ControllerConstants.TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH;
+import static org.thingsboard.server.controller.ControllerConstants.UUID_WIKI_LINK;
+
+@RestController
+@TbCoreComponent
+@RequestMapping("/api")
+@RequiredArgsConstructor
+@Slf4j
+public class CalculatedFieldController extends BaseController {
+
+ private final TbCalculatedFieldService tbCalculatedFieldService;
+ private final EventService eventService;
+ private final TbelInvokeService tbelInvokeService;
+
+ public static final String CALCULATED_FIELD_ID = "calculatedFieldId";
+
+ public static final int TIMEOUT = 20;
+
+ private static final String TEST_SCRIPT_EXPRESSION = "Execute the Script expression and return the result. The format of request: \n\n"
+ + MARKDOWN_CODE_BLOCK_START
+ + "{\n" +
+ " \"expression\": \"var temp = 0; foreach(element: temperature.values) {temp += element.value;} var avgTemperature = temp / temperature.values.size(); var adjustedTemperature = avgTemperature + 0.1 * humidity.value; return {\\\"adjustedTemperature\\\": adjustedTemperature};\",\n" +
+ " \"arguments\": {\n" +
+ " \"temperature\": {\n" +
+ " \"type\": \"TS_ROLLING\",\n" +
+ " \"timeWindow\": {\n" +
+ " \"startTs\": 1739775630002,\n" +
+ " \"endTs\": 65432211,\n" +
+ " \"limit\": 5\n" +
+ " },\n" +
+ " \"values\": [\n" +
+ " { \"ts\": 1739775639851, \"value\": 23 },\n" +
+ " { \"ts\": 1739775664561, \"value\": 43 },\n" +
+ " { \"ts\": 1739775713079, \"value\": 15 },\n" +
+ " { \"ts\": 1739775999522, \"value\": 34 },\n" +
+ " { \"ts\": 1739776228452, \"value\": 22 }\n" +
+ " ]\n" +
+ " },\n" +
+ " \"humidity\": { \"type\": \"SINGLE_VALUE\", \"ts\": 1739776478057, \"value\": 23 }\n" +
+ " }\n" +
+ "}"
+ + MARKDOWN_CODE_BLOCK_END
+ + "\n\n Expected result JSON contains \"output\" and \"error\".";
+
+ @ApiOperation(value = "Create Or Update Calculated Field (saveCalculatedField)",
+ notes = "Creates or Updates the Calculated Field. When creating calculated field, platform generates Calculated Field Id as " + UUID_WIKI_LINK +
+ "The newly created Calculated Field Id will be present in the response. " +
+ "Specify existing Calculated Field Id to update the calculated field. " +
+ "Referencing non-existing Calculated Field Id will cause 'Not Found' error. " +
+ "Remove 'id', 'tenantId' from the request body example (below) to create new Calculated Field entity. "
+ + TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH)
+ @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
+ @RequestMapping(value = "/calculatedField", method = RequestMethod.POST)
+ @ResponseBody
+ public CalculatedField saveCalculatedField(@io.swagger.v3.oas.annotations.parameters.RequestBody(description = "A JSON value representing the calculated field.")
+ @RequestBody CalculatedField calculatedField) throws Exception {
+ calculatedField.setTenantId(getTenantId());
+ checkEntity(calculatedField.getId(), calculatedField, Resource.CALCULATED_FIELD);
+ checkEntityId(calculatedField.getEntityId(), Operation.WRITE_CALCULATED_FIELD);
+ checkReferencedEntities(calculatedField.getConfiguration(), getCurrentUser());
+ return tbCalculatedFieldService.save(calculatedField, getCurrentUser());
+ }
+
+ @ApiOperation(value = "Get Calculated Field (getCalculatedFieldById)",
+ notes = "Fetch the Calculated Field object based on the provided Calculated Field Id."
+ )
+ @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
+ @RequestMapping(value = "/calculatedField/{calculatedFieldId}", method = RequestMethod.GET)
+ @ResponseBody
+ public CalculatedField getCalculatedFieldById(@Parameter @PathVariable(CALCULATED_FIELD_ID) String strCalculatedFieldId) throws ThingsboardException {
+ checkParameter(CALCULATED_FIELD_ID, strCalculatedFieldId);
+ CalculatedFieldId calculatedFieldId = new CalculatedFieldId(toUUID(strCalculatedFieldId));
+ CalculatedField calculatedField = tbCalculatedFieldService.findById(calculatedFieldId, getCurrentUser());
+ checkNotNull(calculatedField);
+ checkEntityId(calculatedField.getEntityId(), Operation.READ_CALCULATED_FIELD);
+ return calculatedField;
+ }
+
+ @ApiOperation(value = "Get Calculated Fields by Entity Id (getCalculatedFieldsByEntityId)",
+ notes = "Fetch the Calculated Fields based on the provided Entity Id."
+ )
+ @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
+ @RequestMapping(value = "/{entityType}/{entityId}/calculatedFields", params = {"pageSize", "page"}, method = RequestMethod.GET)
+ @ResponseBody
+ public PageData getCalculatedFieldsByEntityId(
+ @Parameter(description = ENTITY_TYPE_PARAM_DESCRIPTION, required = true, schema = @Schema(defaultValue = "DEVICE")) @PathVariable("entityType") String entityType,
+ @Parameter(description = ENTITY_ID_PARAM_DESCRIPTION, required = true) @PathVariable("entityId") String entityIdStr,
+ @Parameter(description = PAGE_SIZE_DESCRIPTION, required = true) @RequestParam int pageSize,
+ @Parameter(description = PAGE_NUMBER_DESCRIPTION, required = true) @RequestParam int page,
+ @Parameter(description = CF_TEXT_SEARCH_DESCRIPTION) @RequestParam(required = false) String textSearch,
+ @Parameter(description = SORT_PROPERTY_DESCRIPTION, schema = @Schema(allowableValues = {"createdTime", "name"})) @RequestParam(required = false) String sortProperty,
+ @Parameter(description = SORT_ORDER_DESCRIPTION, schema = @Schema(allowableValues = {"ASC", "DESC"})) @RequestParam(required = false) String sortOrder) throws ThingsboardException {
+ PageLink pageLink = createPageLink(pageSize, page, textSearch, sortProperty, sortOrder);
+ checkParameter("entityId", entityIdStr);
+ EntityId entityId = EntityIdFactory.getByTypeAndUuid(entityType, entityIdStr);
+ checkEntityId(entityId, Operation.READ_CALCULATED_FIELD);
+ return checkNotNull(tbCalculatedFieldService.findAllByTenantIdAndEntityId(entityId, getCurrentUser(), pageLink));
+ }
+
+ @ApiOperation(value = "Delete Calculated Field (deleteCalculatedField)",
+ notes = "Deletes the calculated field. Referencing non-existing Calculated Field Id will cause an error." + TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH)
+ @PreAuthorize("hasAuthority('TENANT_ADMIN')")
+ @RequestMapping(value = "/calculatedField/{calculatedFieldId}", method = RequestMethod.DELETE)
+ @ResponseStatus(value = HttpStatus.OK)
+ public void deleteCalculatedField(@PathVariable(CALCULATED_FIELD_ID) String strCalculatedFieldId) throws Exception {
+ checkParameter(CALCULATED_FIELD_ID, strCalculatedFieldId);
+ CalculatedFieldId calculatedFieldId = new CalculatedFieldId(toUUID(strCalculatedFieldId));
+ CalculatedField calculatedField = checkCalculatedFieldId(calculatedFieldId, Operation.DELETE);
+ checkEntityId(calculatedField.getEntityId(), Operation.WRITE_CALCULATED_FIELD);
+ tbCalculatedFieldService.delete(calculatedField, getCurrentUser());
+ }
+
+ @ApiOperation(value = "Get latest calculated field debug event (getLatestCalculatedFieldDebugEvent)",
+ notes = "Gets latest calculated field debug event for specified calculated field id. " +
+ "Referencing non-existing calculated field id will cause an error. " + TENANT_AUTHORITY_PARAGRAPH)
+ @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')")
+ @RequestMapping(value = "/calculatedField/{calculatedFieldId}/debug", method = RequestMethod.GET)
+ @ResponseBody
+ public JsonNode getLatestCalculatedFieldDebugEvent(@Parameter @PathVariable(CALCULATED_FIELD_ID) String strCalculatedFieldId) throws ThingsboardException {
+ checkParameter(CALCULATED_FIELD_ID, strCalculatedFieldId);
+ CalculatedFieldId calculatedFieldId = new CalculatedFieldId(toUUID(strCalculatedFieldId));
+ CalculatedField calculatedField = checkCalculatedFieldId(calculatedFieldId, Operation.READ);
+ checkEntityId(calculatedField.getEntityId(), Operation.READ_CALCULATED_FIELD);
+ TenantId tenantId = getCurrentUser().getTenantId();
+ return Optional.ofNullable(eventService.findLatestEvents(tenantId, calculatedFieldId, EventType.DEBUG_CALCULATED_FIELD, 1))
+ .flatMap(events -> events.stream().map(EventInfo::getBody).findFirst())
+ .orElse(null);
+ }
+
+ @ApiOperation(value = "Test Script expression",
+ notes = TEST_SCRIPT_EXPRESSION + TENANT_AUTHORITY_PARAGRAPH)
+ @PreAuthorize("hasAuthority('TENANT_ADMIN')")
+ @RequestMapping(value = "/calculatedField/testScript", method = RequestMethod.POST)
+ @ResponseBody
+ public JsonNode testScript(
+ @io.swagger.v3.oas.annotations.parameters.RequestBody(description = "Test calculated field TBEL expression.")
+ @RequestBody JsonNode inputParams) {
+ String expression = inputParams.get("expression").asText();
+ Map arguments = Objects.requireNonNullElse(
+ JacksonUtil.convertValue(inputParams.get("arguments"), new TypeReference