From c8490080a1007f3548da8e7ab80965d56783211a Mon Sep 17 00:00:00 2001 From: dshvaika Date: Fri, 1 Aug 2025 13:12:44 +0300 Subject: [PATCH] added basic logic to update state periodically --- .../CalculatedFieldEntityActor.java | 5 +- ...CalculatedFieldEntityMessageProcessor.java | 55 ++++++--- .../CalculatedFieldManagerActor.java | 3 + ...alculatedFieldManagerMessageProcessor.java | 76 ++++++++++++ ...latedFieldScheduledCheckForUpdatesMsg.java | 35 ++++++ ...tityCalculatedFieldCheckForUpdatesMsg.java | 37 ++++++ ...faultCalculatedFieldProcessingService.java | 5 +- .../ctx/state/BaseCalculatedFieldState.java | 14 --- .../cf/ctx/state/CalculatedFieldCtx.java | 48 +++++--- .../cf/ctx/state/CalculatedFieldState.java | 14 ++- .../cf/ctx/state/GeofencingArgumentEntry.java | 62 ++++++---- .../state/GeofencingCalculatedFieldState.java | 108 +++--------------- .../cf/ctx/state/GeofencingZoneState.java | 94 +++++++++++++++ .../server/utils/CalculatedFieldUtils.java | 67 ++++++++++- application/src/main/resources/logback.xml | 1 + .../server/cluster/TbClusterService.java | 3 +- .../BaseCalculatedFieldConfiguration.java | 5 + .../CalculatedFieldConfiguration.java | 12 ++ ...eofencingCalculatedFieldConfiguration.java | 6 + .../server/common/msg/MsgType.java | 5 +- common/proto/src/main/proto/queue.proto | 22 ++++ .../script/api/tbel/TbelCfArg.java | 3 +- .../api/tbel/TbelCfTsGeofencingArg.java | 21 +++- .../common/util/geo/PerimeterDefinition.java | 1 + 24 files changed, 531 insertions(+), 171 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldScheduledCheckForUpdatesMsg.java create mode 100644 application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityCalculatedFieldCheckForUpdatesMsg.java create mode 100644 application/src/main/java/org/thingsboard/server/service/cf/ctx/state/GeofencingZoneState.java rename application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldInitService.java => common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfTsGeofencingArg.java (62%) diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java index 2959bfc8eb..4812ed6652 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java @@ -51,7 +51,7 @@ public class CalculatedFieldEntityActor extends AbstractCalculatedFieldActor { @Override public void destroy(TbActorStopReason stopReason, Throwable cause) throws TbActorException { log.debug("[{}] Stopping CF entity actor.", processor.tenantId); - processor.stop(); + processor.stop(false); } @Override @@ -75,6 +75,9 @@ public class CalculatedFieldEntityActor extends AbstractCalculatedFieldActor { case CF_LINKED_TELEMETRY_MSG: processor.process((EntityCalculatedFieldLinkedTelemetryMsg) msg); break; + case CF_ENTITY_CHECK_FOR_UPDATES_MSG: + processor.process((EntityCalculatedFieldCheckForUpdatesMsg) msg); + break; default: return false; } diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java index 75582ef4ba..68c0471cb7 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java @@ -59,7 +59,9 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -91,16 +93,18 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM this.ctx = ctx; } - public void stop() { - log.info("[{}][{}] Stopping entity actor.", tenantId, entityId); + public void stop(boolean partitionChanged) { + log.info(partitionChanged ? + "[{}][{}] Stopping entity actor due to change partition event." : + "[{}][{}] Stopping entity actor.", + tenantId, entityId); states.clear(); ctx.stop(ctx.getSelf()); } public void process(CalculatedFieldPartitionChangeMsg msg) { if (!systemContext.getPartitionService().resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId).isMyPartition()) { - log.info("[{}] Stopping entity actor due to change partition event.", entityId); - ctx.stop(ctx.getSelf()); + stop(true); } } @@ -224,6 +228,25 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } } + public void process(EntityCalculatedFieldCheckForUpdatesMsg msg) throws CalculatedFieldException { + CalculatedFieldCtx cfCtx = msg.getCfCtx(); + CalculatedFieldId cfId = cfCtx.getCfId(); + log.debug("[{}] [{}] Processing CF dynamic sources refresh msg.", entityId, cfId); + try { + var state = updateStateFromDb(cfCtx); + if (state.isSizeOk()) { + processStateIfReady(cfCtx, Collections.singletonList(cfId), state, null, null, msg.getCallback()); + } else { + throw new RuntimeException(cfCtx.getSizeExceedsLimitMessage()); + } + } catch (Exception e) { + if (e instanceof CalculatedFieldException cfe) { + throw cfe; + } + throw CalculatedFieldException.builder().ctx(cfCtx).eventEntity(entityId).cause(e).build(); + } + } + private void processTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List cfIdList, MultipleTbCallback callback) throws CalculatedFieldException { processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto)); } @@ -270,16 +293,19 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM CalculatedFieldState state = states.get(ctx.getCfId()); if (state != null) { return state; - } else { - ListenableFuture stateFuture = systemContext.getCalculatedFieldProcessingService().fetchStateFromDb(ctx, entityId); - // Ugly but necessary. We do not expect to often fetch data from DB. Only once per pair lifetime. - // This call happens while processing the CF pack from the queue consumer. So the timeout should be relatively low. - // Alternatively, we can fetch the state outside the actor system and push separate command to create this actor, - // but this will significantly complicate the code. - state = stateFuture.get(1, TimeUnit.MINUTES); - state.checkStateSize(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), ctx.getMaxStateSize()); - states.put(ctx.getCfId(), state); } + return updateStateFromDb(ctx); + } + + private CalculatedFieldState updateStateFromDb(CalculatedFieldCtx ctx) throws InterruptedException, ExecutionException, TimeoutException { + ListenableFuture stateFuture = cfService.fetchStateFromDb(ctx, entityId); + // Ugly but necessary. We do not expect to often fetch data from DB. Only once per pair lifetime. + // This call happens while processing the CF pack from the queue consumer. So the timeout should be relatively low. + // Alternatively, we can fetch the state outside the actor system and push separate command to create this actor, + // but this will significantly complicate the code. + CalculatedFieldState state = stateFuture.get(1, TimeUnit.MINUTES); + state.checkStateSize(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), ctx.getMaxStateSize()); + states.put(ctx.getCfId(), state); return state; } @@ -297,12 +323,11 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM } else { TbCallback effectiveCallback = calculationResults.size() > 1 ? new MultipleTbCallback(calculationResults.size(), callback) : callback; - for (CalculatedFieldResult calculationResult : calculationResults) { if (calculationResult.isEmpty()) { effectiveCallback.onSuccess(); } else { - cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback); + cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, effectiveCallback); } } } diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java index 9f59a80e67..5adca78fa9 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java @@ -91,6 +91,9 @@ public class CalculatedFieldManagerActor extends AbstractCalculatedFieldActor { case CF_LINKED_TELEMETRY_MSG: processor.onLinkedTelemetryMsg((CalculatedFieldLinkedTelemetryMsg) msg); break; + case CF_SCHEDULED_CHECK_FOR_UPDATES_MSG: + processor.onScheduledCheckForUpdatesMsg((CalculatedFieldScheduledCheckForUpdatesMsg) msg); + break; default: return false; } diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java index ba1ca71bda..03de43d08b 100644 --- a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java @@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.ProfileEntityIdInfo; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.CalculatedFieldLink; +import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration; import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.DeviceId; @@ -59,7 +60,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import static org.thingsboard.server.utils.CalculatedFieldUtils.fromProto; @@ -72,6 +76,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware private final Map calculatedFields = new HashMap<>(); private final Map> entityIdCalculatedFields = new HashMap<>(); private final Map> entityIdCalculatedFieldLinks = new HashMap<>(); + private final Map> checkForCalculatedFieldUpdateTasks = new ConcurrentHashMap<>(); private final CalculatedFieldProcessingService cfExecService; private final CalculatedFieldStateService cfStateService; @@ -110,6 +115,8 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware calculatedFields.clear(); entityIdCalculatedFields.clear(); entityIdCalculatedFieldLinks.clear(); + checkForCalculatedFieldUpdateTasks.values().forEach(future -> future.cancel(true)); + checkForCalculatedFieldUpdateTasks.clear(); ctx.stop(ctx.getSelf()); } @@ -117,6 +124,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware log.debug("[{}] Processing CF actor init message.", msg.getTenantId().getId()); initEntityProfileCache(); initCalculatedFields(); + // TODO: implement cache for 1:1 relations to use in the CFs that based on a relation queries? msg.getCallback().onSuccess(); } @@ -139,6 +147,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware // We use copy on write lists to safely pass the reference to another actor for the iteration. // Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead) entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cfCtx); + scheduleCalculatedFieldUpdateMsgIfNeeded(cfCtx); msg.getCallback().onSuccess(); } @@ -324,6 +333,11 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware } calculatedFields.put(newCf.getId(), newCfCtx); List oldCfList = entityIdCalculatedFields.get(newCf.getEntityId()); + + if (newCfCtx.hasSchedulingConfigChanges(oldCfCtx)) { + cancelCfUpdateTaskIfExists(cfId, false); + } + List newCfList = new CopyOnWriteArrayList<>(); boolean found = false; for (CalculatedFieldCtx oldCtx : oldCfList) { @@ -364,6 +378,8 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware entityIdCalculatedFields.get(cfCtx.getEntityId()).remove(cfCtx); deleteLinks(cfCtx); + cancelCfUpdateTaskIfExists(cfId, true); + EntityId entityId = cfCtx.getEntityId(); EntityType entityType = cfCtx.getEntityId().getEntityType(); if (isProfileEntity(entityType)) { @@ -387,6 +403,15 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware } } + private void cancelCfUpdateTaskIfExists(CalculatedFieldId cfId, boolean cfDeleted) { + var existingTask = checkForCalculatedFieldUpdateTasks.remove(cfId); + if (existingTask != null) { + existingTask.cancel(false); + String reason = cfDeleted ? "removal" : "update"; + log.debug("[{}][{}] Cancelled check for update task for CF due to: " + reason + "!", tenantId, cfId); + } + } + public void onTelemetryMsg(CalculatedFieldTelemetryMsg msg) { EntityId entityId = msg.getEntityId(); log.debug("Received telemetry msg from entity [{}]", entityId); @@ -498,16 +523,66 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware initCfForEntity(id, cfCtx, forceStateReinit, multiCallback); } }); + scheduleCalculatedFieldUpdateMsgIfNeeded(cfCtx); } else { callback.onSuccess(); } } else { if (isMyPartition(entityId, callback)) { initCfForEntity(entityId, cfCtx, forceStateReinit, callback); + scheduleCalculatedFieldUpdateMsgIfNeeded(cfCtx); + } + } + } + + private void scheduleCalculatedFieldUpdateMsgIfNeeded(CalculatedFieldCtx cfCtx) { + CalculatedField cf = cfCtx.getCalculatedField(); + CalculatedFieldConfiguration cfConfig = cf.getConfiguration(); + if (!cfConfig.isDynamicRefreshEnabled()) { + return; + } + if (checkForCalculatedFieldUpdateTasks.containsKey(cf.getId())) { + log.debug("[{}][{}] Check for update msg for CF is already scheduled!", tenantId, cf.getId()); + return; + } + long refreshDynamicSourceInterval = TimeUnit.SECONDS.toMillis(cfConfig.getRefreshIntervalSec()); + var scheduledMsg = new CalculatedFieldScheduledCheckForUpdatesMsg(tenantId, cfCtx); + + ScheduledFuture scheduledFuture = systemContext + .schedulePeriodicMsgWithDelay(ctx, scheduledMsg, refreshDynamicSourceInterval, refreshDynamicSourceInterval); + checkForCalculatedFieldUpdateTasks.put(cf.getId(), scheduledFuture); + log.debug("[{}][{}] Scheduled check for update msg for CF!", tenantId, cf.getId()); + } + + public void onScheduledCheckForUpdatesMsg(CalculatedFieldScheduledCheckForUpdatesMsg msg) { + CalculatedFieldCtx cfCtx = msg.getCfCtx(); + EntityId entityId = cfCtx.getEntityId(); + log.debug("[{}] [{}] Processing CF scheduled update msg.", cfCtx.getCfId(), entityId); + EntityType entityType = entityId.getEntityType(); + if (isProfileEntity(entityType)) { + var entityIds = entityProfileCache.getEntityIdsByProfileId(entityId); + if (!entityIds.isEmpty()) { + var multiCallback = new MultipleTbCallback(entityIds.size(), msg.getCallback()); + entityIds.forEach(id -> { + if (isMyPartition(id, multiCallback)) { + updateCfWithDynamicSourceForEntity(id, cfCtx, multiCallback); + } + }); + } else { + msg.getCallback().onSuccess(); + } + } else { + if (isMyPartition(entityId, msg.getCallback())) { + updateCfWithDynamicSourceForEntity(entityId, cfCtx, msg.getCallback()); } } } + private void updateCfWithDynamicSourceForEntity(EntityId entityId, CalculatedFieldCtx cfCtx, TbCallback callback) { + log.debug("Pushing entity dynamic source refresh CF msg to specific actor [{}]", entityId); + getOrCreateActor(entityId).tell(new EntityCalculatedFieldCheckForUpdatesMsg(tenantId, cfCtx, callback)); + } + private void deleteCfForEntity(EntityId entityId, CalculatedFieldId cfId, TbCallback callback) { log.debug("Pushing delete CF msg to specific actor [{}]", entityId); getOrCreateActor(entityId).tell(new CalculatedFieldEntityDeleteMsg(tenantId, cfId, callback)); @@ -571,6 +646,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware log.error("Failed to process calculated field record: {}", cf, e); } }); + // TODO: why we need to do this loop if we do this inside the onFieldInitMsg? calculatedFields.values().forEach(cf -> { entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cf); }); diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldScheduledCheckForUpdatesMsg.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldScheduledCheckForUpdatesMsg.java new file mode 100644 index 0000000000..f53d2e7572 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldScheduledCheckForUpdatesMsg.java @@ -0,0 +1,35 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.calculatedField; + +import lombok.Data; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; +import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; +import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; + +@Data +public class CalculatedFieldScheduledCheckForUpdatesMsg implements ToCalculatedFieldSystemMsg { + + private final TenantId tenantId; + private final CalculatedFieldCtx cfCtx; + + @Override + public MsgType getMsgType() { + return MsgType.CF_SCHEDULED_CHECK_FOR_UPDATES_MSG; + } + +} diff --git a/application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityCalculatedFieldCheckForUpdatesMsg.java b/application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityCalculatedFieldCheckForUpdatesMsg.java new file mode 100644 index 0000000000..908680c068 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityCalculatedFieldCheckForUpdatesMsg.java @@ -0,0 +1,37 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.actors.calculatedField; + +import lombok.Data; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.msg.MsgType; +import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg; +import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; + +@Data +public class EntityCalculatedFieldCheckForUpdatesMsg implements ToCalculatedFieldSystemMsg { + + private final TenantId tenantId; + private final CalculatedFieldCtx cfCtx; + private final TbCallback callback; + + @Override + public MsgType getMsgType() { + return MsgType.CF_ENTITY_CHECK_FOR_UPDATES_MSG; + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java index 9d75692718..424df50009 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java @@ -140,7 +140,7 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP case SAVE_ZONES_ARGUMENT_KEY, RESTRICTED_ZONES_ARGUMENT_KEY -> { var resolvedEntityIdsFuture = resolveGeofencingEntityIds(ctx.getTenantId(), entityId, entry); argFutures.put(entry.getKey(), Futures.transformAsync(resolvedEntityIdsFuture, resolvedEntityIds -> - fetchGeofencingKvEntry(ctx.getTenantId(), resolvedEntityIds, entry.getValue()), MoreExecutors.directExecutor())); + fetchGeofencingKvEntry(ctx.getTenantId(), resolvedEntityIds, entry.getValue()), calculatedFieldCallbackExecutor)); } } } @@ -288,7 +288,7 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP case RELATION_QUERY -> { var relationQueryDynamicSourceConfiguration = (RelationQueryDynamicSourceConfiguration) value.getRefDynamicSourceConfiguration(); yield Futures.transform(relationService.findByQuery(tenantId, relationQueryDynamicSourceConfiguration.toEntityRelationsQuery(entityId)), - relationQueryDynamicSourceConfiguration::resolveEntityIds, MoreExecutors.directExecutor()); + relationQueryDynamicSourceConfiguration::resolveEntityIds, calculatedFieldCallbackExecutor); } }; } @@ -298,7 +298,6 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP if (argument.getRefEntityKey().getType() != ArgumentType.ATTRIBUTE) { throw new IllegalStateException("Unsupported argument key type: " + argument.getRefEntityKey().getType()); } - List>> kvFutures = geofencingEntities.stream() .map(entityId -> { var attributesFuture = attributesService.find( diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java index e21d56b6d2..eb87d375c5 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java @@ -25,8 +25,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import static org.thingsboard.server.utils.CalculatedFieldUtils.toSingleValueArgumentProto; - @Data @AllArgsConstructor public abstract class BaseCalculatedFieldState implements CalculatedFieldState { @@ -95,18 +93,6 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState { } } - @Override - public void checkArgumentSize(String name, ArgumentEntry entry, CalculatedFieldCtx ctx) { - if (entry instanceof TsRollingArgumentEntry) { - return; - } - if (entry instanceof SingleValueArgumentEntry singleValueArgumentEntry) { - if (ctx.getMaxSingleValueArgumentSize() > 0 && toSingleValueArgumentProto(name, singleValueArgumentEntry).getSerializedSize() > ctx.getMaxSingleValueArgumentSize()) { - throw new IllegalArgumentException("Single value size exceeds the maximum allowed limit. The argument will not be used for calculation."); - } - } - } - protected abstract void validateNewEntry(ArgumentEntry newEntry); private void updateLastUpdateTimestamp(ArgumentEntry entry) { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java index 89f76bd482..8124d78d3a 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java @@ -109,25 +109,29 @@ public class CalculatedFieldCtx { } public void init() { - if (CalculatedFieldType.SCRIPT.equals(cfType)) { - try { - this.calculatedFieldScriptEngine = initEngine(tenantId, expression, tbelInvokeService); - initialized = true; - } catch (Exception e) { - throw new RuntimeException("Failed to init calculated field ctx. Invalid expression syntax.", e); + switch (cfType) { + case SCRIPT -> { + try { + this.calculatedFieldScriptEngine = initEngine(tenantId, expression, tbelInvokeService); + initialized = true; + } catch (Exception e) { + throw new RuntimeException("Failed to init calculated field ctx. Invalid expression syntax.", e); + } } - } else { - if (isValidExpression(expression)) { - this.customExpression = ThreadLocal.withInitial(() -> - new ExpressionBuilder(expression) - .functions(userDefinedFunctions) - .implicitMultiplication(true) - .variables(this.arguments.keySet()) - .build() - ); - initialized = true; - } else { - throw new RuntimeException("Failed to init calculated field ctx. Invalid expression syntax."); + case GEOFENCING -> initialized = true; + default -> { + if (isValidExpression(expression)) { + this.customExpression = ThreadLocal.withInitial(() -> + new ExpressionBuilder(expression) + .functions(userDefinedFunctions) + .implicitMultiplication(true) + .variables(this.arguments.keySet()) + .build() + ); + initialized = true; + } else { + throw new RuntimeException("Failed to init calculated field ctx. Invalid expression syntax."); + } } } } @@ -308,6 +312,14 @@ public class CalculatedFieldCtx { return typeChanged || argumentsChanged; } + public boolean hasSchedulingConfigChanges(CalculatedFieldCtx other) { + CalculatedFieldConfiguration thisConfig = calculatedField.getConfiguration(); + CalculatedFieldConfiguration otherConfig = other.calculatedField.getConfiguration(); + boolean refreshTriggerChanged = thisConfig.isDynamicRefreshEnabled() != otherConfig.isDynamicRefreshEnabled(); + boolean refreshIntervalChanged = thisConfig.getRefreshIntervalSec() != otherConfig.getRefreshIntervalSec(); + return refreshTriggerChanged || refreshIntervalChanged; + } + public String getSizeExceedsLimitMessage() { return "Failed to init CF state. State size exceeds limit of " + (maxStateSize / 1024) + "Kb!"; } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java index dc98ed836c..77e630baaa 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java @@ -26,6 +26,8 @@ import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; import java.util.List; import java.util.Map; +import static org.thingsboard.server.utils.CalculatedFieldUtils.toSingleValueArgumentProto; + @JsonTypeInfo( use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, @@ -63,6 +65,16 @@ public interface CalculatedFieldState { void checkStateSize(CalculatedFieldEntityCtxId ctxId, long maxStateSize); - void checkArgumentSize(String name, ArgumentEntry entry, CalculatedFieldCtx ctx); + default void checkArgumentSize(String name, ArgumentEntry entry, CalculatedFieldCtx ctx) { + // TODO: Do we need to restrict the size of Geofencing arguments? Number of zones? + if (entry instanceof TsRollingArgumentEntry || entry instanceof GeofencingArgumentEntry) { + return; + } + if (entry instanceof SingleValueArgumentEntry singleValueArgumentEntry) { + if (ctx.getMaxSingleValueArgumentSize() > 0 && toSingleValueArgumentProto(name, singleValueArgumentEntry).getSerializedSize() > ctx.getMaxSingleValueArgumentSize()) { + throw new IllegalArgumentException("Single value size exceeds the maximum allowed limit. The argument will not be used for calculation."); + } + } + } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/GeofencingArgumentEntry.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/GeofencingArgumentEntry.java index 51f5d4fd4f..cf77d5da7d 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/GeofencingArgumentEntry.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/GeofencingArgumentEntry.java @@ -16,9 +16,9 @@ package org.thingsboard.server.service.cf.ctx.state; import lombok.Data; -import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.common.util.geo.PerimeterDefinition; +import lombok.extern.slf4j.Slf4j; import org.thingsboard.script.api.tbel.TbelCfArg; +import org.thingsboard.script.api.tbel.TbelCfTsGeofencingArg; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.kv.KvEntry; @@ -26,15 +26,18 @@ import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; -// TODO: implement @Data +@Slf4j public class GeofencingArgumentEntry implements ArgumentEntry { - private Map geofencingIdToPerimeter; + private Map zoneStates; private boolean forceResetPrevious; + public GeofencingArgumentEntry() { + } + public GeofencingArgumentEntry(Map entityIdKvEntryMap) { - this.geofencingIdToPerimeter = toPerimetersMap(entityIdKvEntryMap); + this.zoneStates = toZones(entityIdKvEntryMap); } @Override @@ -44,7 +47,7 @@ public class GeofencingArgumentEntry implements ArgumentEntry { @Override public Object getValue() { - return geofencingIdToPerimeter; + return zoneStates; } @Override @@ -52,34 +55,49 @@ public class GeofencingArgumentEntry implements ArgumentEntry { if (!(entry instanceof GeofencingArgumentEntry geofencingArgumentEntry)) { throw new IllegalArgumentException("Unsupported argument entry type for geofencing argument entry: " + entry.getType()); } - if (Objects.equals(this.geofencingIdToPerimeter, geofencingArgumentEntry.getGeofencingIdToPerimeter())) { - return false; // No change + boolean updated = false; + for (var zoneEntry : geofencingArgumentEntry.getZoneStates().entrySet()) { + if (updateZone(zoneEntry)) { + updated = true; + } } - this.geofencingIdToPerimeter = geofencingArgumentEntry.getGeofencingIdToPerimeter(); - return true; + return updated; } @Override public boolean isEmpty() { - return geofencingIdToPerimeter == null || geofencingIdToPerimeter.isEmpty(); + return zoneStates == null || zoneStates.isEmpty(); } @Override public TbelCfArg toTbelCfArg() { - return null; + return new TbelCfTsGeofencingArg(); } - private Map toPerimetersMap(Map entityIdKvEntryMap) { + private Map toZones(Map entityIdKvEntryMap) { return entityIdKvEntryMap.entrySet().stream().map(entry -> { - if (entry.getValue().getJsonValue().isEmpty()) { - return null; - } - String rawPerimeterValue = entry.getValue().getJsonValue().get(); - PerimeterDefinition perimeter = JacksonUtil.fromString(rawPerimeterValue, PerimeterDefinition.class); - return Map.entry(entry.getKey(), perimeter); - }) - .filter(Objects::nonNull) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + try { + if (entry.getValue().getJsonValue().isEmpty()) { + return null; + } + return Map.entry(entry.getKey(), new GeofencingZoneState(entry.getKey(), entry.getValue())); + } catch (Exception e) { + log.error("Failed to parse geofencing zone perimeter for entity id: {}", entry.getKey(), e); + return null; + } + }).filter(Objects::nonNull).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + private boolean updateZone(Map.Entry zoneEntry) { + EntityId zoneId = zoneEntry.getKey(); + GeofencingZoneState newZoneState = zoneEntry.getValue(); + + GeofencingZoneState existingZoneState = zoneStates.get(zoneId); + if (existingZoneState == null) { + zoneStates.put(zoneId, newZoneState); + return true; + } + return existingZoneState.update(newZoneState); } } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/GeofencingCalculatedFieldState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/GeofencingCalculatedFieldState.java index 11853e7823..787f47d82c 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/GeofencingCalculatedFieldState.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/GeofencingCalculatedFieldState.java @@ -21,19 +21,15 @@ import com.google.common.util.concurrent.ListenableFuture; import lombok.Data; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.geo.Coordinates; -import org.thingsboard.common.util.geo.PerimeterDefinition; -import org.thingsboard.rule.engine.geo.EntityGeofencingState; -import org.thingsboard.rule.engine.util.GpsGeofencingEvents; import org.thingsboard.server.common.data.cf.CalculatedFieldType; -import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.service.cf.CalculatedFieldResult; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; +import org.thingsboard.server.utils.CalculatedFieldUtils; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; @Data public class GeofencingCalculatedFieldState implements CalculatedFieldState { @@ -45,10 +41,11 @@ public class GeofencingCalculatedFieldState implements CalculatedFieldState { private List requiredArguments; private Map arguments; + + protected boolean sizeExceedsLimit; + private long latestTimestamp = -1; - private Map saveZoneStates; - private Map restrictedZoneStates; public GeofencingCalculatedFieldState() { this(List.of(ENTITY_ID_LATITUDE_ARGUMENT_KEY, ENTITY_ID_LONGITUDE_ARGUMENT_KEY, SAVE_ZONES_ARGUMENT_KEY, RESTRICTED_ZONES_ARGUMENT_KEY)); @@ -57,8 +54,6 @@ public class GeofencingCalculatedFieldState implements CalculatedFieldState { public GeofencingCalculatedFieldState(List argNames) { this.requiredArguments = argNames; this.arguments = new HashMap<>(); - this.saveZoneStates = new HashMap<>(); - this.restrictedZoneStates = new HashMap<>(); } @Override @@ -68,7 +63,6 @@ public class GeofencingCalculatedFieldState implements CalculatedFieldState { @Override public boolean updateState(CalculatedFieldCtx ctx, Map argumentValues) { - // TODO: Do I need to check argument for null? if (arguments == null) { arguments = new HashMap<>(); } @@ -79,17 +73,12 @@ public class GeofencingCalculatedFieldState implements CalculatedFieldState { String key = entry.getKey(); ArgumentEntry newEntry = entry.getValue(); - // TODO: Do I need to check argument size? - // checkArgumentSize(key, newEntry, ctx); + checkArgumentSize(key, newEntry, ctx); ArgumentEntry existingEntry = arguments.get(key); boolean entryUpdated; - // TODO: What is force reset previos? - // if (existingEntry == null || newEntry.isForceResetPrevious()) { - - // fresh start of state. No entry exists yet. - if (existingEntry == null) { + if (existingEntry == null || newEntry.isForceResetPrevious()) { switch (key) { case ENTITY_ID_LATITUDE_ARGUMENT_KEY: case ENTITY_ID_LONGITUDE_ARGUMENT_KEY: @@ -111,25 +100,8 @@ public class GeofencingCalculatedFieldState implements CalculatedFieldState { throw new IllegalArgumentException("Unsupported argument: " + key); } } else { - entryUpdated = switch (key) { - case ENTITY_ID_LATITUDE_ARGUMENT_KEY, - ENTITY_ID_LONGITUDE_ARGUMENT_KEY -> existingEntry.updateEntry(newEntry); - case SAVE_ZONES_ARGUMENT_KEY, - RESTRICTED_ZONES_ARGUMENT_KEY -> { - // TODO: ensure zone cleanup working correctly. - boolean updated = existingEntry.updateEntry(newEntry); - if (updated) { - Map currentStates = - key.equals(SAVE_ZONES_ARGUMENT_KEY) ? saveZoneStates : restrictedZoneStates; - Set newZoneIds = ((GeofencingArgumentEntry) newEntry).getGeofencingIdToPerimeter().keySet(); - currentStates.keySet().removeIf(existingZoneId -> !newZoneIds.contains(existingZoneId)); - } - yield updated; - } - default -> throw new IllegalStateException("Unsupported argument: " + key); - }; + entryUpdated = existingEntry.updateEntry(newEntry); } - if (entryUpdated) { stateUpdated = true; updateLastUpdateTimestamp(newEntry); @@ -141,8 +113,8 @@ public class GeofencingCalculatedFieldState implements CalculatedFieldState { @Override public ListenableFuture> performCalculation(CalculatedFieldCtx ctx) { - List savedZonesStatesResults = updateSavedGeofencingZonesState(ctx); - List restrictedZonesStatesResults = updateRestrictedGeofencingZonesState(ctx); + List savedZonesStatesResults = updateGeofencingZonesState(ctx, false); + List restrictedZonesStatesResults = updateGeofencingZonesState(ctx, true); List allZoneStatesResults = new ArrayList<>(savedZonesStatesResults.size() + restrictedZonesStatesResults.size()); @@ -158,22 +130,12 @@ public class GeofencingCalculatedFieldState implements CalculatedFieldState { arguments.values().stream().noneMatch(ArgumentEntry::isEmpty); } - // TODO: implement - @Override - public boolean isSizeExceedsLimit() { - return false; - } - - // TODO: implement @Override public void checkStateSize(CalculatedFieldEntityCtxId ctxId, long maxStateSize) { - - } - - // TODO: implement - @Override - public void checkArgumentSize(String name, ArgumentEntry entry, CalculatedFieldCtx ctx) { - + if (!sizeExceedsLimit && maxStateSize > 0 && CalculatedFieldUtils.toProto(ctxId, this).getSerializedSize() > maxStateSize) { + arguments.clear(); + sizeExceedsLimit = true; + } } private void updateLastUpdateTimestamp(ArgumentEntry entry) { @@ -184,59 +146,27 @@ public class GeofencingCalculatedFieldState implements CalculatedFieldState { this.latestTimestamp = Math.max(this.latestTimestamp, newTs); } - private List updateSavedGeofencingZonesState(CalculatedFieldCtx ctx) { - return updateGeofencingZonesState(ctx, saveZoneStates, false); - } - - private List updateRestrictedGeofencingZonesState(CalculatedFieldCtx ctx) { - return updateGeofencingZonesState(ctx, restrictedZoneStates, true); - } - // TODO: Ensure all cases are covered based on rule node logic. - private List updateGeofencingZonesState(CalculatedFieldCtx ctx, Map zoneStates, boolean restricted) { + private List updateGeofencingZonesState(CalculatedFieldCtx ctx, boolean restricted) { var results = new ArrayList(); - long stateSwitchTime = System.currentTimeMillis(); double latitude = (double) arguments.get(ENTITY_ID_LATITUDE_ARGUMENT_KEY).getValue(); double longitude = (double) arguments.get(ENTITY_ID_LONGITUDE_ARGUMENT_KEY).getValue(); - Coordinates entityCoordinates = new Coordinates(latitude, longitude); + Coordinates entityCoordinates = new Coordinates(latitude, longitude); String zoneKey = restricted ? RESTRICTED_ZONES_ARGUMENT_KEY : SAVE_ZONES_ARGUMENT_KEY; GeofencingArgumentEntry zonesEntry = (GeofencingArgumentEntry) arguments.get(zoneKey); - for (Map.Entry entry : zonesEntry.getGeofencingIdToPerimeter().entrySet()) { - EntityId zoneId = entry.getKey(); - PerimeterDefinition perimeter = entry.getValue(); - - boolean inside = perimeter.checkMatches(entityCoordinates); - - // Always present or created - EntityGeofencingState state = zoneStates.computeIfAbsent( - zoneId, id -> new EntityGeofencingState(false, 0L, false) - ); - - String event; - if (state.getStateSwitchTime() == 0L || state.isInside() != inside) { - // First state or transition (entered/left) - state.setInside(inside); - state.setStateSwitchTime(stateSwitchTime); - state.setStayed(false); - - event = inside ? GpsGeofencingEvents.ENTERED : GpsGeofencingEvents.LEFT; - } else { - // No transition - event = inside ? GpsGeofencingEvents.INSIDE : GpsGeofencingEvents.OUTSIDE; - } - + for (var zoneEntry : zonesEntry.getZoneStates().entrySet()) { + GeofencingZoneState state = zoneEntry.getValue(); + String event = state.evaluate(entityCoordinates, stateSwitchTime); ObjectNode stateNode = JacksonUtil.newObjectNode(); stateNode.put("entityId", ctx.getEntityId().toString()); - stateNode.put("zoneId", zoneId.getId().toString()); + stateNode.put("zoneId", state.getZoneId().toString()); stateNode.put("restricted", restricted); stateNode.put("event", event); - results.add(new CalculatedFieldResult(ctx.getOutput().getType(), ctx.getOutput().getScope(), stateNode)); } - return results; } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/GeofencingZoneState.java b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/GeofencingZoneState.java new file mode 100644 index 0000000000..abee7cabb6 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/cf/ctx/state/GeofencingZoneState.java @@ -0,0 +1,94 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.cf.ctx.state; + +import lombok.Data; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.common.util.geo.Coordinates; +import org.thingsboard.common.util.geo.PerimeterDefinition; +import org.thingsboard.rule.engine.geo.EntityGeofencingState; +import org.thingsboard.rule.engine.util.GpsGeofencingEvents; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.EntityIdFactory; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.gen.transport.TransportProtos.GeofencingZoneProto; + +import java.util.UUID; + +@Data +public class GeofencingZoneState { + + private final EntityId zoneId; + + private long ts; + private Long version; + private PerimeterDefinition perimeterDefinition; + + private EntityGeofencingState state; + + public GeofencingZoneState(EntityId zoneId, KvEntry entry) { + this.zoneId = zoneId; + if (entry instanceof TsKvEntry tsKvEntry) { + this.ts = tsKvEntry.getTs(); + this.version = tsKvEntry.getVersion(); + } else if (entry instanceof AttributeKvEntry attributeKvEntry) { + this.ts = attributeKvEntry.getLastUpdateTs(); + this.version = attributeKvEntry.getVersion(); + } + this.perimeterDefinition = JacksonUtil.fromString(entry.getJsonValue().orElseThrow(), PerimeterDefinition.class); + } + + public GeofencingZoneState(GeofencingZoneProto proto) { + this.zoneId = EntityIdFactory.getByTypeAndUuid(proto.getZoneId().getType(), + new UUID(proto.getZoneId().getZoneIdMSB(), proto.getZoneId().getZoneIdLSB())); + this.ts = proto.getTs(); + this.version = proto.getVersion(); + this.perimeterDefinition = JacksonUtil.fromString(proto.getPerimeterDefinition(), PerimeterDefinition.class); + this.state = new EntityGeofencingState(proto.getInside(), proto.getStateSwitchTime(), proto.getStayed()); + } + + public boolean update(GeofencingZoneState newZoneState) { + if (newZoneState.getTs() <= this.ts) { + return false; + } + Long newVersion = newZoneState.getVersion(); + if (newVersion == null || this.version == null || newVersion > this.version) { + this.ts = newZoneState.getTs(); + this.version = newVersion; + this.perimeterDefinition = newZoneState.getPerimeterDefinition(); + // TODO: should we reinitialize state if zone changed? + return true; + } + return false; + } + + public String evaluate(Coordinates entityCoordinates, long currentTs) { + boolean inside = perimeterDefinition.checkMatches(entityCoordinates); + if (state == null) { + state = new EntityGeofencingState(inside, ts, false); + } + if (state.getStateSwitchTime() == 0L || state.isInside() != inside) { + state.setInside(inside); + state.setStateSwitchTime(currentTs); + state.setStayed(false); + return inside ? GpsGeofencingEvents.ENTERED : GpsGeofencingEvents.LEFT; + } + return inside ? GpsGeofencingEvents.INSIDE : GpsGeofencingEvents.OUTSIDE; + } + +} diff --git a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java index d9a6248b96..302ced0df1 100644 --- a/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java +++ b/application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java @@ -15,6 +15,8 @@ */ package org.thingsboard.server.utils; +import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.rule.engine.geo.EntityGeofencingState; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.cf.CalculatedFieldType; import org.thingsboard.server.common.data.id.CalculatedFieldId; @@ -26,21 +28,30 @@ import org.thingsboard.server.common.util.KvProtoUtil; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldIdProto; import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto; +import org.thingsboard.server.gen.transport.TransportProtos.GeofencingArgumentProto; +import org.thingsboard.server.gen.transport.TransportProtos.GeofencingZoneIdProto; +import org.thingsboard.server.gen.transport.TransportProtos.GeofencingZoneProto; import org.thingsboard.server.gen.transport.TransportProtos.SingleValueArgumentProto; import org.thingsboard.server.gen.transport.TransportProtos.TsDoubleValProto; import org.thingsboard.server.gen.transport.TransportProtos.TsRollingArgumentProto; import org.thingsboard.server.gen.transport.TransportProtos.TsValueProto; import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId; +import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState; +import org.thingsboard.server.service.cf.ctx.state.GeofencingArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.GeofencingCalculatedFieldState; +import org.thingsboard.server.service.cf.ctx.state.GeofencingZoneState; import org.thingsboard.server.service.cf.ctx.state.ScriptCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.SimpleCalculatedFieldState; import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; import org.thingsboard.server.service.cf.ctx.state.TsRollingArgumentEntry; +import java.util.Map; import java.util.Optional; import java.util.TreeMap; import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; public class CalculatedFieldUtils { @@ -80,6 +91,8 @@ public class CalculatedFieldUtils { builder.addSingleValueArguments(toSingleValueArgumentProto(argName, singleValueArgumentEntry)); } else if (argEntry instanceof TsRollingArgumentEntry rollingArgumentEntry) { builder.addRollingValueArguments(toRollingArgumentProto(argName, rollingArgumentEntry)); + } else if (argEntry instanceof GeofencingArgumentEntry geofencingArgumentEntry) { + builder.addGeofencingArguments(toGeofencingArgumentProto(argName, geofencingArgumentEntry)); } }); return builder.build(); @@ -109,6 +122,42 @@ public class CalculatedFieldUtils { return builder.build(); } + + private static GeofencingArgumentProto toGeofencingArgumentProto(String argName, GeofencingArgumentEntry geofencingArgumentEntry) { + GeofencingArgumentProto.Builder builder = GeofencingArgumentProto.newBuilder() + .setArgName(argName); + Map zoneStates = geofencingArgumentEntry.getZoneStates(); + zoneStates.forEach((entityId, zoneState) -> { + builder.addZones(toGeofencingZoneProto(entityId, zoneState)); + }); + return builder.build(); + } + + private static GeofencingZoneProto toGeofencingZoneProto(EntityId entityId, GeofencingZoneState zoneState) { + GeofencingZoneProto.Builder builder = GeofencingZoneProto.newBuilder() + .setZoneId(toGeofencingZoneIdProto(entityId)) + .setTs(zoneState.getTs()) + .setVersion(zoneState.getVersion()) + .setPerimeterDefinition(JacksonUtil.toString(zoneState.getPerimeterDefinition())); + if (zoneState.getState() != null) { + EntityGeofencingState state = zoneState.getState(); + builder.setInside(state.isInside()) + .setStayed(state.isStayed()) + .setStateSwitchTime(state.getStateSwitchTime()); + + } + return builder.build(); + } + + private static GeofencingZoneIdProto toGeofencingZoneIdProto(EntityId zoneId) { + return GeofencingZoneIdProto.newBuilder() + .setType(zoneId.getEntityType().name()) + .setZoneIdLSB(zoneId.getId().getLeastSignificantBits()) + .setZoneIdMSB(zoneId.getId().getMostSignificantBits()) + .build(); + } + + public static CalculatedFieldState fromProto(CalculatedFieldStateProto proto) { if (StringUtils.isEmpty(proto.getType())) { return null; @@ -122,8 +171,6 @@ public class CalculatedFieldUtils { case GEOFENCING -> new GeofencingCalculatedFieldState(); }; - // TODO: add logic to restore geofencing state from proto - proto.getSingleValueArgumentsList().forEach(argProto -> state.getArguments().put(argProto.getArgName(), fromSingleValueArgumentProto(argProto))); @@ -132,6 +179,11 @@ public class CalculatedFieldUtils { state.getArguments().put(argProto.getKey(), fromRollingArgumentProto(argProto))); } + if (CalculatedFieldType.GEOFENCING.equals(type)) { + proto.getGeofencingArgumentsList().forEach(argProto -> + state.getArguments().put(argProto.getArgName(), fromGeofencingArgumentProto(argProto))); + } + return state; } @@ -153,4 +205,15 @@ public class CalculatedFieldUtils { return new TsRollingArgumentEntry(tsRecords, proto.getLimit(), proto.getTimeWindow()); } + + private static ArgumentEntry fromGeofencingArgumentProto(GeofencingArgumentProto proto) { + Map zoneStates = proto.getZonesList() + .stream() + .map(GeofencingZoneState::new) + .collect(Collectors.toMap(GeofencingZoneState::getZoneId, Function.identity())); + GeofencingArgumentEntry geofencingArgumentEntry = new GeofencingArgumentEntry(); + geofencingArgumentEntry.setZoneStates(zoneStates); + return geofencingArgumentEntry; + } + } diff --git a/application/src/main/resources/logback.xml b/application/src/main/resources/logback.xml index c37be2e620..5478d65d93 100644 --- a/application/src/main/resources/logback.xml +++ b/application/src/main/resources/logback.xml @@ -56,6 +56,7 @@ + diff --git a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java index 6da6fcf8a8..1805788007 100644 --- a/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java +++ b/common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java @@ -38,7 +38,6 @@ import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest; import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; -import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.RestApiCallResponseMsgProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg; @@ -81,7 +80,7 @@ public interface TbClusterService extends TbQueueClusterService { void pushNotificationToTransport(String targetServiceId, ToTransportMsg response, TbQueueCallback callback); - void pushMsgToCalculatedFields(TenantId tenantId, EntityId entityId, TransportProtos.ToCalculatedFieldMsg msg, TbQueueCallback callback); + void pushMsgToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldMsg msg, TbQueueCallback callback); void pushMsgToCalculatedFields(TopicPartitionInfo tpi, UUID msgId, ToCalculatedFieldMsg msg, TbQueueCallback callback); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/BaseCalculatedFieldConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/BaseCalculatedFieldConfiguration.java index 8227ff4603..d3cfe3a59a 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/BaseCalculatedFieldConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/BaseCalculatedFieldConfiguration.java @@ -58,4 +58,9 @@ public abstract class BaseCalculatedFieldConfiguration implements CalculatedFiel return link; } + @Override + public boolean hasDynamicSourceArguments() { + return arguments.values().stream().anyMatch(arg -> arg.getRefDynamicSource() != null); + } + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java index b713f9030d..d090feeca7 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java @@ -59,4 +59,16 @@ public interface CalculatedFieldConfiguration { CalculatedFieldLink buildCalculatedFieldLink(TenantId tenantId, EntityId referencedEntityId, CalculatedFieldId calculatedFieldId); + @JsonIgnore + boolean hasDynamicSourceArguments(); + + @JsonIgnore + default boolean isDynamicRefreshEnabled() { + return hasDynamicSourceArguments() && getRefreshIntervalSec() > 0; + } + + default int getRefreshIntervalSec() { + return 0; + } + } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/GeofencingCalculatedFieldConfiguration.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/GeofencingCalculatedFieldConfiguration.java index 6c1450d713..db6a5fd460 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/GeofencingCalculatedFieldConfiguration.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/GeofencingCalculatedFieldConfiguration.java @@ -23,9 +23,15 @@ import org.thingsboard.server.common.data.cf.CalculatedFieldType; @EqualsAndHashCode(callSuper = true) public class GeofencingCalculatedFieldConfiguration extends BaseCalculatedFieldConfiguration implements CalculatedFieldConfiguration { + private int refreshIntervalSec; + @Override public CalculatedFieldType getType() { return CalculatedFieldType.GEOFENCING; } + public boolean isDynamicRefreshEnabled() { + return refreshIntervalSec > 0; + } + } diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java index f1c404ce16..bfcd3f3071 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java @@ -150,7 +150,10 @@ public enum MsgType { /* CF Manager Actor -> CF Entity actor */ CF_ENTITY_TELEMETRY_MSG, CF_ENTITY_INIT_CF_MSG, - CF_ENTITY_DELETE_MSG; + CF_ENTITY_DELETE_MSG, + + CF_SCHEDULED_CHECK_FOR_UPDATES_MSG, + CF_ENTITY_CHECK_FOR_UPDATES_MSG; @Getter private final boolean ignoreOnStart; diff --git a/common/proto/src/main/proto/queue.proto b/common/proto/src/main/proto/queue.proto index 2667838b60..885bad2cca 100644 --- a/common/proto/src/main/proto/queue.proto +++ b/common/proto/src/main/proto/queue.proto @@ -894,11 +894,33 @@ message TsRollingArgumentProto { repeated TsDoubleValProto tsValue = 4; } +message GeofencingZoneIdProto { + string type = 1; + int64 zoneIdMSB = 2; + int64 zoneIdLSB = 3; +} + +message GeofencingZoneProto { + GeofencingZoneIdProto zoneId = 1; + int64 ts = 2; + string perimeterDefinition = 3; + int64 version = 4; + bool inside = 5; + int64 stateSwitchTime = 6; + bool stayed = 7; +} + +message GeofencingArgumentProto { + string argName = 1; // e.g., "restrictedZones" or "saveZones" + repeated GeofencingZoneProto zones = 2; +} + message CalculatedFieldStateProto { CalculatedFieldEntityCtxIdProto id = 1; string type = 2; repeated SingleValueArgumentProto singleValueArguments = 3; repeated TsRollingArgumentProto rollingValueArguments = 4; + repeated GeofencingArgumentProto geofencingArguments = 5; } //Used to report session state to tb-Service and persist this state in the cache on the tb-Service level. diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfArg.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfArg.java index f95b08195e..73a2183564 100644 --- a/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfArg.java +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfArg.java @@ -26,7 +26,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo; ) @JsonSubTypes({ @JsonSubTypes.Type(value = TbelCfSingleValueArg.class, name = "SINGLE_VALUE"), - @JsonSubTypes.Type(value = TbelCfTsRollingArg.class, name = "TS_ROLLING") + @JsonSubTypes.Type(value = TbelCfTsRollingArg.class, name = "TS_ROLLING"), + @JsonSubTypes.Type(value = TbelCfTsGeofencingArg.class, name = "GEOFENCING_CF_ARGUMENT_VALUE"), }) public interface TbelCfArg extends TbelCfObject { diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldInitService.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfTsGeofencingArg.java similarity index 62% rename from application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldInitService.java rename to common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfTsGeofencingArg.java index 6505dae581..46ac553a76 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldInitService.java +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfTsGeofencingArg.java @@ -13,7 +13,24 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.cf; +package org.thingsboard.script.api.tbel; + +// TODO: should I add any specific logic for this? +public class TbelCfTsGeofencingArg implements TbelCfArg { + + public TbelCfTsGeofencingArg() { + + } + + @Override + public String getType() { + return "GEOFENCING_CF_ARGUMENT_VALUE"; + } + + + @Override + public long memorySize() { + return OBJ_SIZE; + } -public interface CalculatedFieldInitService { } diff --git a/common/util/src/main/java/org/thingsboard/common/util/geo/PerimeterDefinition.java b/common/util/src/main/java/org/thingsboard/common/util/geo/PerimeterDefinition.java index 68191f1a17..4cba6f9c8a 100644 --- a/common/util/src/main/java/org/thingsboard/common/util/geo/PerimeterDefinition.java +++ b/common/util/src/main/java/org/thingsboard/common/util/geo/PerimeterDefinition.java @@ -35,5 +35,6 @@ public interface PerimeterDefinition extends Serializable { @JsonIgnore PerimeterType getType(); + @JsonIgnore boolean checkMatches(Coordinates entityCoordinates); }