Browse Source

added basic logic to update state periodically

pull/13920/head
dshvaika 10 months ago
parent
commit
c8490080a1
  1. 5
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java
  2. 55
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java
  3. 3
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java
  4. 76
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java
  5. 35
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldScheduledCheckForUpdatesMsg.java
  6. 37
      application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityCalculatedFieldCheckForUpdatesMsg.java
  7. 5
      application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java
  8. 14
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java
  9. 48
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java
  10. 14
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java
  11. 62
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/GeofencingArgumentEntry.java
  12. 108
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/GeofencingCalculatedFieldState.java
  13. 94
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/GeofencingZoneState.java
  14. 67
      application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java
  15. 1
      application/src/main/resources/logback.xml
  16. 3
      common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java
  17. 5
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/BaseCalculatedFieldConfiguration.java
  18. 12
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java
  19. 6
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/GeofencingCalculatedFieldConfiguration.java
  20. 5
      common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java
  21. 22
      common/proto/src/main/proto/queue.proto
  22. 3
      common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfArg.java
  23. 21
      common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfTsGeofencingArg.java
  24. 1
      common/util/src/main/java/org/thingsboard/common/util/geo/PerimeterDefinition.java

5
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityActor.java

@ -51,7 +51,7 @@ public class CalculatedFieldEntityActor extends AbstractCalculatedFieldActor {
@Override
public void destroy(TbActorStopReason stopReason, Throwable cause) throws TbActorException {
log.debug("[{}] Stopping CF entity actor.", processor.tenantId);
processor.stop();
processor.stop(false);
}
@Override
@ -75,6 +75,9 @@ public class CalculatedFieldEntityActor extends AbstractCalculatedFieldActor {
case CF_LINKED_TELEMETRY_MSG:
processor.process((EntityCalculatedFieldLinkedTelemetryMsg) msg);
break;
case CF_ENTITY_CHECK_FOR_UPDATES_MSG:
processor.process((EntityCalculatedFieldCheckForUpdatesMsg) msg);
break;
default:
return false;
}

55
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java

@ -59,7 +59,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
@ -91,16 +93,18 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
this.ctx = ctx;
}
public void stop() {
log.info("[{}][{}] Stopping entity actor.", tenantId, entityId);
public void stop(boolean partitionChanged) {
log.info(partitionChanged ?
"[{}][{}] Stopping entity actor due to change partition event." :
"[{}][{}] Stopping entity actor.",
tenantId, entityId);
states.clear();
ctx.stop(ctx.getSelf());
}
public void process(CalculatedFieldPartitionChangeMsg msg) {
if (!systemContext.getPartitionService().resolve(ServiceType.TB_RULE_ENGINE, DataConstants.CF_QUEUE_NAME, tenantId, entityId).isMyPartition()) {
log.info("[{}] Stopping entity actor due to change partition event.", entityId);
ctx.stop(ctx.getSelf());
stop(true);
}
}
@ -224,6 +228,25 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
}
}
public void process(EntityCalculatedFieldCheckForUpdatesMsg msg) throws CalculatedFieldException {
CalculatedFieldCtx cfCtx = msg.getCfCtx();
CalculatedFieldId cfId = cfCtx.getCfId();
log.debug("[{}] [{}] Processing CF dynamic sources refresh msg.", entityId, cfId);
try {
var state = updateStateFromDb(cfCtx);
if (state.isSizeOk()) {
processStateIfReady(cfCtx, Collections.singletonList(cfId), state, null, null, msg.getCallback());
} else {
throw new RuntimeException(cfCtx.getSizeExceedsLimitMessage());
}
} catch (Exception e) {
if (e instanceof CalculatedFieldException cfe) {
throw cfe;
}
throw CalculatedFieldException.builder().ctx(cfCtx).eventEntity(entityId).cause(e).build();
}
}
private void processTelemetry(CalculatedFieldCtx ctx, CalculatedFieldTelemetryMsgProto proto, List<CalculatedFieldId> cfIdList, MultipleTbCallback callback) throws CalculatedFieldException {
processArgumentValuesUpdate(ctx, cfIdList, callback, mapToArguments(ctx, proto.getTsDataList()), toTbMsgId(proto), toTbMsgType(proto));
}
@ -270,16 +293,19 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
CalculatedFieldState state = states.get(ctx.getCfId());
if (state != null) {
return state;
} else {
ListenableFuture<CalculatedFieldState> stateFuture = systemContext.getCalculatedFieldProcessingService().fetchStateFromDb(ctx, entityId);
// Ugly but necessary. We do not expect to often fetch data from DB. Only once per <Entity, CalculatedField> pair lifetime.
// This call happens while processing the CF pack from the queue consumer. So the timeout should be relatively low.
// Alternatively, we can fetch the state outside the actor system and push separate command to create this actor,
// but this will significantly complicate the code.
state = stateFuture.get(1, TimeUnit.MINUTES);
state.checkStateSize(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), ctx.getMaxStateSize());
states.put(ctx.getCfId(), state);
}
return updateStateFromDb(ctx);
}
private CalculatedFieldState updateStateFromDb(CalculatedFieldCtx ctx) throws InterruptedException, ExecutionException, TimeoutException {
ListenableFuture<CalculatedFieldState> stateFuture = cfService.fetchStateFromDb(ctx, entityId);
// Ugly but necessary. We do not expect to often fetch data from DB. Only once per <Entity, CalculatedField> pair lifetime.
// This call happens while processing the CF pack from the queue consumer. So the timeout should be relatively low.
// Alternatively, we can fetch the state outside the actor system and push separate command to create this actor,
// but this will significantly complicate the code.
CalculatedFieldState state = stateFuture.get(1, TimeUnit.MINUTES);
state.checkStateSize(new CalculatedFieldEntityCtxId(tenantId, ctx.getCfId(), entityId), ctx.getMaxStateSize());
states.put(ctx.getCfId(), state);
return state;
}
@ -297,12 +323,11 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
} else {
TbCallback effectiveCallback = calculationResults.size() > 1 ?
new MultipleTbCallback(calculationResults.size(), callback) : callback;
for (CalculatedFieldResult calculationResult : calculationResults) {
if (calculationResult.isEmpty()) {
effectiveCallback.onSuccess();
} else {
cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, callback);
cfService.pushMsgToRuleEngine(tenantId, entityId, calculationResult, cfIdList, effectiveCallback);
}
}
}

3
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerActor.java

@ -91,6 +91,9 @@ public class CalculatedFieldManagerActor extends AbstractCalculatedFieldActor {
case CF_LINKED_TELEMETRY_MSG:
processor.onLinkedTelemetryMsg((CalculatedFieldLinkedTelemetryMsg) msg);
break;
case CF_SCHEDULED_CHECK_FOR_UPDATES_MSG:
processor.onScheduledCheckForUpdatesMsg((CalculatedFieldScheduledCheckForUpdatesMsg) msg);
break;
default:
return false;
}

76
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldManagerMessageProcessor.java

@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.ProfileEntityIdInfo;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.cf.CalculatedFieldLink;
import org.thingsboard.server.common.data.cf.configuration.CalculatedFieldConfiguration;
import org.thingsboard.server.common.data.id.AssetId;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.DeviceId;
@ -59,7 +60,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static org.thingsboard.server.utils.CalculatedFieldUtils.fromProto;
@ -72,6 +76,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
private final Map<CalculatedFieldId, CalculatedFieldCtx> calculatedFields = new HashMap<>();
private final Map<EntityId, List<CalculatedFieldCtx>> entityIdCalculatedFields = new HashMap<>();
private final Map<EntityId, List<CalculatedFieldLink>> entityIdCalculatedFieldLinks = new HashMap<>();
private final Map<CalculatedFieldId, ScheduledFuture<?>> checkForCalculatedFieldUpdateTasks = new ConcurrentHashMap<>();
private final CalculatedFieldProcessingService cfExecService;
private final CalculatedFieldStateService cfStateService;
@ -110,6 +115,8 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
calculatedFields.clear();
entityIdCalculatedFields.clear();
entityIdCalculatedFieldLinks.clear();
checkForCalculatedFieldUpdateTasks.values().forEach(future -> future.cancel(true));
checkForCalculatedFieldUpdateTasks.clear();
ctx.stop(ctx.getSelf());
}
@ -117,6 +124,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
log.debug("[{}] Processing CF actor init message.", msg.getTenantId().getId());
initEntityProfileCache();
initCalculatedFields();
// TODO: implement cache for 1:1 relations to use in the CFs that based on a relation queries?
msg.getCallback().onSuccess();
}
@ -139,6 +147,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
// We use copy on write lists to safely pass the reference to another actor for the iteration.
// Alternative approach would be to use any list but avoid modifications to the list (change the complete map value instead)
entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cfCtx);
scheduleCalculatedFieldUpdateMsgIfNeeded(cfCtx);
msg.getCallback().onSuccess();
}
@ -324,6 +333,11 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
}
calculatedFields.put(newCf.getId(), newCfCtx);
List<CalculatedFieldCtx> oldCfList = entityIdCalculatedFields.get(newCf.getEntityId());
if (newCfCtx.hasSchedulingConfigChanges(oldCfCtx)) {
cancelCfUpdateTaskIfExists(cfId, false);
}
List<CalculatedFieldCtx> newCfList = new CopyOnWriteArrayList<>();
boolean found = false;
for (CalculatedFieldCtx oldCtx : oldCfList) {
@ -364,6 +378,8 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
entityIdCalculatedFields.get(cfCtx.getEntityId()).remove(cfCtx);
deleteLinks(cfCtx);
cancelCfUpdateTaskIfExists(cfId, true);
EntityId entityId = cfCtx.getEntityId();
EntityType entityType = cfCtx.getEntityId().getEntityType();
if (isProfileEntity(entityType)) {
@ -387,6 +403,15 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
}
}
private void cancelCfUpdateTaskIfExists(CalculatedFieldId cfId, boolean cfDeleted) {
var existingTask = checkForCalculatedFieldUpdateTasks.remove(cfId);
if (existingTask != null) {
existingTask.cancel(false);
String reason = cfDeleted ? "removal" : "update";
log.debug("[{}][{}] Cancelled check for update task for CF due to: " + reason + "!", tenantId, cfId);
}
}
public void onTelemetryMsg(CalculatedFieldTelemetryMsg msg) {
EntityId entityId = msg.getEntityId();
log.debug("Received telemetry msg from entity [{}]", entityId);
@ -498,16 +523,66 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
initCfForEntity(id, cfCtx, forceStateReinit, multiCallback);
}
});
scheduleCalculatedFieldUpdateMsgIfNeeded(cfCtx);
} else {
callback.onSuccess();
}
} else {
if (isMyPartition(entityId, callback)) {
initCfForEntity(entityId, cfCtx, forceStateReinit, callback);
scheduleCalculatedFieldUpdateMsgIfNeeded(cfCtx);
}
}
}
private void scheduleCalculatedFieldUpdateMsgIfNeeded(CalculatedFieldCtx cfCtx) {
CalculatedField cf = cfCtx.getCalculatedField();
CalculatedFieldConfiguration cfConfig = cf.getConfiguration();
if (!cfConfig.isDynamicRefreshEnabled()) {
return;
}
if (checkForCalculatedFieldUpdateTasks.containsKey(cf.getId())) {
log.debug("[{}][{}] Check for update msg for CF is already scheduled!", tenantId, cf.getId());
return;
}
long refreshDynamicSourceInterval = TimeUnit.SECONDS.toMillis(cfConfig.getRefreshIntervalSec());
var scheduledMsg = new CalculatedFieldScheduledCheckForUpdatesMsg(tenantId, cfCtx);
ScheduledFuture<?> scheduledFuture = systemContext
.schedulePeriodicMsgWithDelay(ctx, scheduledMsg, refreshDynamicSourceInterval, refreshDynamicSourceInterval);
checkForCalculatedFieldUpdateTasks.put(cf.getId(), scheduledFuture);
log.debug("[{}][{}] Scheduled check for update msg for CF!", tenantId, cf.getId());
}
public void onScheduledCheckForUpdatesMsg(CalculatedFieldScheduledCheckForUpdatesMsg msg) {
CalculatedFieldCtx cfCtx = msg.getCfCtx();
EntityId entityId = cfCtx.getEntityId();
log.debug("[{}] [{}] Processing CF scheduled update msg.", cfCtx.getCfId(), entityId);
EntityType entityType = entityId.getEntityType();
if (isProfileEntity(entityType)) {
var entityIds = entityProfileCache.getEntityIdsByProfileId(entityId);
if (!entityIds.isEmpty()) {
var multiCallback = new MultipleTbCallback(entityIds.size(), msg.getCallback());
entityIds.forEach(id -> {
if (isMyPartition(id, multiCallback)) {
updateCfWithDynamicSourceForEntity(id, cfCtx, multiCallback);
}
});
} else {
msg.getCallback().onSuccess();
}
} else {
if (isMyPartition(entityId, msg.getCallback())) {
updateCfWithDynamicSourceForEntity(entityId, cfCtx, msg.getCallback());
}
}
}
private void updateCfWithDynamicSourceForEntity(EntityId entityId, CalculatedFieldCtx cfCtx, TbCallback callback) {
log.debug("Pushing entity dynamic source refresh CF msg to specific actor [{}]", entityId);
getOrCreateActor(entityId).tell(new EntityCalculatedFieldCheckForUpdatesMsg(tenantId, cfCtx, callback));
}
private void deleteCfForEntity(EntityId entityId, CalculatedFieldId cfId, TbCallback callback) {
log.debug("Pushing delete CF msg to specific actor [{}]", entityId);
getOrCreateActor(entityId).tell(new CalculatedFieldEntityDeleteMsg(tenantId, cfId, callback));
@ -571,6 +646,7 @@ public class CalculatedFieldManagerMessageProcessor extends AbstractContextAware
log.error("Failed to process calculated field record: {}", cf, e);
}
});
// TODO: why we need to do this loop if we do this inside the onFieldInitMsg?
calculatedFields.values().forEach(cf -> {
entityIdCalculatedFields.computeIfAbsent(cf.getEntityId(), id -> new CopyOnWriteArrayList<>()).add(cf);
});

35
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldScheduledCheckForUpdatesMsg.java

@ -0,0 +1,35 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.calculatedField;
import lombok.Data;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
@Data
public class CalculatedFieldScheduledCheckForUpdatesMsg implements ToCalculatedFieldSystemMsg {
private final TenantId tenantId;
private final CalculatedFieldCtx cfCtx;
@Override
public MsgType getMsgType() {
return MsgType.CF_SCHEDULED_CHECK_FOR_UPDATES_MSG;
}
}

37
application/src/main/java/org/thingsboard/server/actors/calculatedField/EntityCalculatedFieldCheckForUpdatesMsg.java

@ -0,0 +1,37 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.actors.calculatedField;
import lombok.Data;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.ToCalculatedFieldSystemMsg;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
@Data
public class EntityCalculatedFieldCheckForUpdatesMsg implements ToCalculatedFieldSystemMsg {
private final TenantId tenantId;
private final CalculatedFieldCtx cfCtx;
private final TbCallback callback;
@Override
public MsgType getMsgType() {
return MsgType.CF_ENTITY_CHECK_FOR_UPDATES_MSG;
}
}

5
application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java

@ -140,7 +140,7 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP
case SAVE_ZONES_ARGUMENT_KEY, RESTRICTED_ZONES_ARGUMENT_KEY -> {
var resolvedEntityIdsFuture = resolveGeofencingEntityIds(ctx.getTenantId(), entityId, entry);
argFutures.put(entry.getKey(), Futures.transformAsync(resolvedEntityIdsFuture, resolvedEntityIds ->
fetchGeofencingKvEntry(ctx.getTenantId(), resolvedEntityIds, entry.getValue()), MoreExecutors.directExecutor()));
fetchGeofencingKvEntry(ctx.getTenantId(), resolvedEntityIds, entry.getValue()), calculatedFieldCallbackExecutor));
}
}
}
@ -288,7 +288,7 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP
case RELATION_QUERY -> {
var relationQueryDynamicSourceConfiguration = (RelationQueryDynamicSourceConfiguration) value.getRefDynamicSourceConfiguration();
yield Futures.transform(relationService.findByQuery(tenantId, relationQueryDynamicSourceConfiguration.toEntityRelationsQuery(entityId)),
relationQueryDynamicSourceConfiguration::resolveEntityIds, MoreExecutors.directExecutor());
relationQueryDynamicSourceConfiguration::resolveEntityIds, calculatedFieldCallbackExecutor);
}
};
}
@ -298,7 +298,6 @@ public class DefaultCalculatedFieldProcessingService implements CalculatedFieldP
if (argument.getRefEntityKey().getType() != ArgumentType.ATTRIBUTE) {
throw new IllegalStateException("Unsupported argument key type: " + argument.getRefEntityKey().getType());
}
List<ListenableFuture<Map.Entry<EntityId, AttributeKvEntry>>> kvFutures = geofencingEntities.stream()
.map(entityId -> {
var attributesFuture = attributesService.find(

14
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/BaseCalculatedFieldState.java

@ -25,8 +25,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.thingsboard.server.utils.CalculatedFieldUtils.toSingleValueArgumentProto;
@Data
@AllArgsConstructor
public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
@ -95,18 +93,6 @@ public abstract class BaseCalculatedFieldState implements CalculatedFieldState {
}
}
@Override
public void checkArgumentSize(String name, ArgumentEntry entry, CalculatedFieldCtx ctx) {
if (entry instanceof TsRollingArgumentEntry) {
return;
}
if (entry instanceof SingleValueArgumentEntry singleValueArgumentEntry) {
if (ctx.getMaxSingleValueArgumentSize() > 0 && toSingleValueArgumentProto(name, singleValueArgumentEntry).getSerializedSize() > ctx.getMaxSingleValueArgumentSize()) {
throw new IllegalArgumentException("Single value size exceeds the maximum allowed limit. The argument will not be used for calculation.");
}
}
}
protected abstract void validateNewEntry(ArgumentEntry newEntry);
private void updateLastUpdateTimestamp(ArgumentEntry entry) {

48
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java

@ -109,25 +109,29 @@ public class CalculatedFieldCtx {
}
public void init() {
if (CalculatedFieldType.SCRIPT.equals(cfType)) {
try {
this.calculatedFieldScriptEngine = initEngine(tenantId, expression, tbelInvokeService);
initialized = true;
} catch (Exception e) {
throw new RuntimeException("Failed to init calculated field ctx. Invalid expression syntax.", e);
switch (cfType) {
case SCRIPT -> {
try {
this.calculatedFieldScriptEngine = initEngine(tenantId, expression, tbelInvokeService);
initialized = true;
} catch (Exception e) {
throw new RuntimeException("Failed to init calculated field ctx. Invalid expression syntax.", e);
}
}
} else {
if (isValidExpression(expression)) {
this.customExpression = ThreadLocal.withInitial(() ->
new ExpressionBuilder(expression)
.functions(userDefinedFunctions)
.implicitMultiplication(true)
.variables(this.arguments.keySet())
.build()
);
initialized = true;
} else {
throw new RuntimeException("Failed to init calculated field ctx. Invalid expression syntax.");
case GEOFENCING -> initialized = true;
default -> {
if (isValidExpression(expression)) {
this.customExpression = ThreadLocal.withInitial(() ->
new ExpressionBuilder(expression)
.functions(userDefinedFunctions)
.implicitMultiplication(true)
.variables(this.arguments.keySet())
.build()
);
initialized = true;
} else {
throw new RuntimeException("Failed to init calculated field ctx. Invalid expression syntax.");
}
}
}
}
@ -308,6 +312,14 @@ public class CalculatedFieldCtx {
return typeChanged || argumentsChanged;
}
public boolean hasSchedulingConfigChanges(CalculatedFieldCtx other) {
CalculatedFieldConfiguration thisConfig = calculatedField.getConfiguration();
CalculatedFieldConfiguration otherConfig = other.calculatedField.getConfiguration();
boolean refreshTriggerChanged = thisConfig.isDynamicRefreshEnabled() != otherConfig.isDynamicRefreshEnabled();
boolean refreshIntervalChanged = thisConfig.getRefreshIntervalSec() != otherConfig.getRefreshIntervalSec();
return refreshTriggerChanged || refreshIntervalChanged;
}
public String getSizeExceedsLimitMessage() {
return "Failed to init CF state. State size exceeds limit of " + (maxStateSize / 1024) + "Kb!";
}

14
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldState.java

@ -26,6 +26,8 @@ import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import java.util.List;
import java.util.Map;
import static org.thingsboard.server.utils.CalculatedFieldUtils.toSingleValueArgumentProto;
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
@ -63,6 +65,16 @@ public interface CalculatedFieldState {
void checkStateSize(CalculatedFieldEntityCtxId ctxId, long maxStateSize);
void checkArgumentSize(String name, ArgumentEntry entry, CalculatedFieldCtx ctx);
default void checkArgumentSize(String name, ArgumentEntry entry, CalculatedFieldCtx ctx) {
// TODO: Do we need to restrict the size of Geofencing arguments? Number of zones?
if (entry instanceof TsRollingArgumentEntry || entry instanceof GeofencingArgumentEntry) {
return;
}
if (entry instanceof SingleValueArgumentEntry singleValueArgumentEntry) {
if (ctx.getMaxSingleValueArgumentSize() > 0 && toSingleValueArgumentProto(name, singleValueArgumentEntry).getSerializedSize() > ctx.getMaxSingleValueArgumentSize()) {
throw new IllegalArgumentException("Single value size exceeds the maximum allowed limit. The argument will not be used for calculation.");
}
}
}
}

62
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/GeofencingArgumentEntry.java

@ -16,9 +16,9 @@
package org.thingsboard.server.service.cf.ctx.state;
import lombok.Data;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.geo.PerimeterDefinition;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.script.api.tbel.TbelCfArg;
import org.thingsboard.script.api.tbel.TbelCfTsGeofencingArg;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.KvEntry;
@ -26,15 +26,18 @@ import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
// TODO: implement
@Data
@Slf4j
public class GeofencingArgumentEntry implements ArgumentEntry {
private Map<EntityId, PerimeterDefinition> geofencingIdToPerimeter;
private Map<EntityId, GeofencingZoneState> zoneStates;
private boolean forceResetPrevious;
public GeofencingArgumentEntry() {
}
public GeofencingArgumentEntry(Map<EntityId, KvEntry> entityIdKvEntryMap) {
this.geofencingIdToPerimeter = toPerimetersMap(entityIdKvEntryMap);
this.zoneStates = toZones(entityIdKvEntryMap);
}
@Override
@ -44,7 +47,7 @@ public class GeofencingArgumentEntry implements ArgumentEntry {
@Override
public Object getValue() {
return geofencingIdToPerimeter;
return zoneStates;
}
@Override
@ -52,34 +55,49 @@ public class GeofencingArgumentEntry implements ArgumentEntry {
if (!(entry instanceof GeofencingArgumentEntry geofencingArgumentEntry)) {
throw new IllegalArgumentException("Unsupported argument entry type for geofencing argument entry: " + entry.getType());
}
if (Objects.equals(this.geofencingIdToPerimeter, geofencingArgumentEntry.getGeofencingIdToPerimeter())) {
return false; // No change
boolean updated = false;
for (var zoneEntry : geofencingArgumentEntry.getZoneStates().entrySet()) {
if (updateZone(zoneEntry)) {
updated = true;
}
}
this.geofencingIdToPerimeter = geofencingArgumentEntry.getGeofencingIdToPerimeter();
return true;
return updated;
}
@Override
public boolean isEmpty() {
return geofencingIdToPerimeter == null || geofencingIdToPerimeter.isEmpty();
return zoneStates == null || zoneStates.isEmpty();
}
@Override
public TbelCfArg toTbelCfArg() {
return null;
return new TbelCfTsGeofencingArg();
}
private Map<EntityId, PerimeterDefinition> toPerimetersMap(Map<EntityId, KvEntry> entityIdKvEntryMap) {
private Map<EntityId, GeofencingZoneState> toZones(Map<EntityId, KvEntry> entityIdKvEntryMap) {
return entityIdKvEntryMap.entrySet().stream().map(entry -> {
if (entry.getValue().getJsonValue().isEmpty()) {
return null;
}
String rawPerimeterValue = entry.getValue().getJsonValue().get();
PerimeterDefinition perimeter = JacksonUtil.fromString(rawPerimeterValue, PerimeterDefinition.class);
return Map.entry(entry.getKey(), perimeter);
})
.filter(Objects::nonNull)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
try {
if (entry.getValue().getJsonValue().isEmpty()) {
return null;
}
return Map.entry(entry.getKey(), new GeofencingZoneState(entry.getKey(), entry.getValue()));
} catch (Exception e) {
log.error("Failed to parse geofencing zone perimeter for entity id: {}", entry.getKey(), e);
return null;
}
}).filter(Objects::nonNull).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
private boolean updateZone(Map.Entry<EntityId, GeofencingZoneState> zoneEntry) {
EntityId zoneId = zoneEntry.getKey();
GeofencingZoneState newZoneState = zoneEntry.getValue();
GeofencingZoneState existingZoneState = zoneStates.get(zoneId);
if (existingZoneState == null) {
zoneStates.put(zoneId, newZoneState);
return true;
}
return existingZoneState.update(newZoneState);
}
}

108
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/GeofencingCalculatedFieldState.java

@ -21,19 +21,15 @@ import com.google.common.util.concurrent.ListenableFuture;
import lombok.Data;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.geo.Coordinates;
import org.thingsboard.common.util.geo.PerimeterDefinition;
import org.thingsboard.rule.engine.geo.EntityGeofencingState;
import org.thingsboard.rule.engine.util.GpsGeofencingEvents;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.service.cf.CalculatedFieldResult;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.utils.CalculatedFieldUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Data
public class GeofencingCalculatedFieldState implements CalculatedFieldState {
@ -45,10 +41,11 @@ public class GeofencingCalculatedFieldState implements CalculatedFieldState {
private List<String> requiredArguments;
private Map<String, ArgumentEntry> arguments;
protected boolean sizeExceedsLimit;
private long latestTimestamp = -1;
private Map<EntityId, EntityGeofencingState> saveZoneStates;
private Map<EntityId, EntityGeofencingState> restrictedZoneStates;
public GeofencingCalculatedFieldState() {
this(List.of(ENTITY_ID_LATITUDE_ARGUMENT_KEY, ENTITY_ID_LONGITUDE_ARGUMENT_KEY, SAVE_ZONES_ARGUMENT_KEY, RESTRICTED_ZONES_ARGUMENT_KEY));
@ -57,8 +54,6 @@ public class GeofencingCalculatedFieldState implements CalculatedFieldState {
public GeofencingCalculatedFieldState(List<String> argNames) {
this.requiredArguments = argNames;
this.arguments = new HashMap<>();
this.saveZoneStates = new HashMap<>();
this.restrictedZoneStates = new HashMap<>();
}
@Override
@ -68,7 +63,6 @@ public class GeofencingCalculatedFieldState implements CalculatedFieldState {
@Override
public boolean updateState(CalculatedFieldCtx ctx, Map<String, ArgumentEntry> argumentValues) {
// TODO: Do I need to check argument for null?
if (arguments == null) {
arguments = new HashMap<>();
}
@ -79,17 +73,12 @@ public class GeofencingCalculatedFieldState implements CalculatedFieldState {
String key = entry.getKey();
ArgumentEntry newEntry = entry.getValue();
// TODO: Do I need to check argument size?
// checkArgumentSize(key, newEntry, ctx);
checkArgumentSize(key, newEntry, ctx);
ArgumentEntry existingEntry = arguments.get(key);
boolean entryUpdated;
// TODO: What is force reset previos?
// if (existingEntry == null || newEntry.isForceResetPrevious()) {
// fresh start of state. No entry exists yet.
if (existingEntry == null) {
if (existingEntry == null || newEntry.isForceResetPrevious()) {
switch (key) {
case ENTITY_ID_LATITUDE_ARGUMENT_KEY:
case ENTITY_ID_LONGITUDE_ARGUMENT_KEY:
@ -111,25 +100,8 @@ public class GeofencingCalculatedFieldState implements CalculatedFieldState {
throw new IllegalArgumentException("Unsupported argument: " + key);
}
} else {
entryUpdated = switch (key) {
case ENTITY_ID_LATITUDE_ARGUMENT_KEY,
ENTITY_ID_LONGITUDE_ARGUMENT_KEY -> existingEntry.updateEntry(newEntry);
case SAVE_ZONES_ARGUMENT_KEY,
RESTRICTED_ZONES_ARGUMENT_KEY -> {
// TODO: ensure zone cleanup working correctly.
boolean updated = existingEntry.updateEntry(newEntry);
if (updated) {
Map<EntityId, EntityGeofencingState> currentStates =
key.equals(SAVE_ZONES_ARGUMENT_KEY) ? saveZoneStates : restrictedZoneStates;
Set<EntityId> newZoneIds = ((GeofencingArgumentEntry) newEntry).getGeofencingIdToPerimeter().keySet();
currentStates.keySet().removeIf(existingZoneId -> !newZoneIds.contains(existingZoneId));
}
yield updated;
}
default -> throw new IllegalStateException("Unsupported argument: " + key);
};
entryUpdated = existingEntry.updateEntry(newEntry);
}
if (entryUpdated) {
stateUpdated = true;
updateLastUpdateTimestamp(newEntry);
@ -141,8 +113,8 @@ public class GeofencingCalculatedFieldState implements CalculatedFieldState {
@Override
public ListenableFuture<List<CalculatedFieldResult>> performCalculation(CalculatedFieldCtx ctx) {
List<CalculatedFieldResult> savedZonesStatesResults = updateSavedGeofencingZonesState(ctx);
List<CalculatedFieldResult> restrictedZonesStatesResults = updateRestrictedGeofencingZonesState(ctx);
List<CalculatedFieldResult> savedZonesStatesResults = updateGeofencingZonesState(ctx, false);
List<CalculatedFieldResult> restrictedZonesStatesResults = updateGeofencingZonesState(ctx, true);
List<CalculatedFieldResult> allZoneStatesResults =
new ArrayList<>(savedZonesStatesResults.size() + restrictedZonesStatesResults.size());
@ -158,22 +130,12 @@ public class GeofencingCalculatedFieldState implements CalculatedFieldState {
arguments.values().stream().noneMatch(ArgumentEntry::isEmpty);
}
// TODO: implement
@Override
public boolean isSizeExceedsLimit() {
return false;
}
// TODO: implement
@Override
public void checkStateSize(CalculatedFieldEntityCtxId ctxId, long maxStateSize) {
}
// TODO: implement
@Override
public void checkArgumentSize(String name, ArgumentEntry entry, CalculatedFieldCtx ctx) {
if (!sizeExceedsLimit && maxStateSize > 0 && CalculatedFieldUtils.toProto(ctxId, this).getSerializedSize() > maxStateSize) {
arguments.clear();
sizeExceedsLimit = true;
}
}
private void updateLastUpdateTimestamp(ArgumentEntry entry) {
@ -184,59 +146,27 @@ public class GeofencingCalculatedFieldState implements CalculatedFieldState {
this.latestTimestamp = Math.max(this.latestTimestamp, newTs);
}
private List<CalculatedFieldResult> updateSavedGeofencingZonesState(CalculatedFieldCtx ctx) {
return updateGeofencingZonesState(ctx, saveZoneStates, false);
}
private List<CalculatedFieldResult> updateRestrictedGeofencingZonesState(CalculatedFieldCtx ctx) {
return updateGeofencingZonesState(ctx, restrictedZoneStates, true);
}
// TODO: Ensure all cases are covered based on rule node logic.
private List<CalculatedFieldResult> updateGeofencingZonesState(CalculatedFieldCtx ctx, Map<EntityId, EntityGeofencingState> zoneStates, boolean restricted) {
private List<CalculatedFieldResult> updateGeofencingZonesState(CalculatedFieldCtx ctx, boolean restricted) {
var results = new ArrayList<CalculatedFieldResult>();
long stateSwitchTime = System.currentTimeMillis();
double latitude = (double) arguments.get(ENTITY_ID_LATITUDE_ARGUMENT_KEY).getValue();
double longitude = (double) arguments.get(ENTITY_ID_LONGITUDE_ARGUMENT_KEY).getValue();
Coordinates entityCoordinates = new Coordinates(latitude, longitude);
Coordinates entityCoordinates = new Coordinates(latitude, longitude);
String zoneKey = restricted ? RESTRICTED_ZONES_ARGUMENT_KEY : SAVE_ZONES_ARGUMENT_KEY;
GeofencingArgumentEntry zonesEntry = (GeofencingArgumentEntry) arguments.get(zoneKey);
for (Map.Entry<EntityId, PerimeterDefinition> entry : zonesEntry.getGeofencingIdToPerimeter().entrySet()) {
EntityId zoneId = entry.getKey();
PerimeterDefinition perimeter = entry.getValue();
boolean inside = perimeter.checkMatches(entityCoordinates);
// Always present or created
EntityGeofencingState state = zoneStates.computeIfAbsent(
zoneId, id -> new EntityGeofencingState(false, 0L, false)
);
String event;
if (state.getStateSwitchTime() == 0L || state.isInside() != inside) {
// First state or transition (entered/left)
state.setInside(inside);
state.setStateSwitchTime(stateSwitchTime);
state.setStayed(false);
event = inside ? GpsGeofencingEvents.ENTERED : GpsGeofencingEvents.LEFT;
} else {
// No transition
event = inside ? GpsGeofencingEvents.INSIDE : GpsGeofencingEvents.OUTSIDE;
}
for (var zoneEntry : zonesEntry.getZoneStates().entrySet()) {
GeofencingZoneState state = zoneEntry.getValue();
String event = state.evaluate(entityCoordinates, stateSwitchTime);
ObjectNode stateNode = JacksonUtil.newObjectNode();
stateNode.put("entityId", ctx.getEntityId().toString());
stateNode.put("zoneId", zoneId.getId().toString());
stateNode.put("zoneId", state.getZoneId().toString());
stateNode.put("restricted", restricted);
stateNode.put("event", event);
results.add(new CalculatedFieldResult(ctx.getOutput().getType(), ctx.getOutput().getScope(), stateNode));
}
return results;
}

94
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/GeofencingZoneState.java

@ -0,0 +1,94 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cf.ctx.state;
import lombok.Data;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.geo.Coordinates;
import org.thingsboard.common.util.geo.PerimeterDefinition;
import org.thingsboard.rule.engine.geo.EntityGeofencingState;
import org.thingsboard.rule.engine.util.GpsGeofencingEvents;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.gen.transport.TransportProtos.GeofencingZoneProto;
import java.util.UUID;
@Data
public class GeofencingZoneState {
private final EntityId zoneId;
private long ts;
private Long version;
private PerimeterDefinition perimeterDefinition;
private EntityGeofencingState state;
public GeofencingZoneState(EntityId zoneId, KvEntry entry) {
this.zoneId = zoneId;
if (entry instanceof TsKvEntry tsKvEntry) {
this.ts = tsKvEntry.getTs();
this.version = tsKvEntry.getVersion();
} else if (entry instanceof AttributeKvEntry attributeKvEntry) {
this.ts = attributeKvEntry.getLastUpdateTs();
this.version = attributeKvEntry.getVersion();
}
this.perimeterDefinition = JacksonUtil.fromString(entry.getJsonValue().orElseThrow(), PerimeterDefinition.class);
}
public GeofencingZoneState(GeofencingZoneProto proto) {
this.zoneId = EntityIdFactory.getByTypeAndUuid(proto.getZoneId().getType(),
new UUID(proto.getZoneId().getZoneIdMSB(), proto.getZoneId().getZoneIdLSB()));
this.ts = proto.getTs();
this.version = proto.getVersion();
this.perimeterDefinition = JacksonUtil.fromString(proto.getPerimeterDefinition(), PerimeterDefinition.class);
this.state = new EntityGeofencingState(proto.getInside(), proto.getStateSwitchTime(), proto.getStayed());
}
public boolean update(GeofencingZoneState newZoneState) {
if (newZoneState.getTs() <= this.ts) {
return false;
}
Long newVersion = newZoneState.getVersion();
if (newVersion == null || this.version == null || newVersion > this.version) {
this.ts = newZoneState.getTs();
this.version = newVersion;
this.perimeterDefinition = newZoneState.getPerimeterDefinition();
// TODO: should we reinitialize state if zone changed?
return true;
}
return false;
}
public String evaluate(Coordinates entityCoordinates, long currentTs) {
boolean inside = perimeterDefinition.checkMatches(entityCoordinates);
if (state == null) {
state = new EntityGeofencingState(inside, ts, false);
}
if (state.getStateSwitchTime() == 0L || state.isInside() != inside) {
state.setInside(inside);
state.setStateSwitchTime(currentTs);
state.setStayed(false);
return inside ? GpsGeofencingEvents.ENTERED : GpsGeofencingEvents.LEFT;
}
return inside ? GpsGeofencingEvents.INSIDE : GpsGeofencingEvents.OUTSIDE;
}
}

67
application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java

@ -15,6 +15,8 @@
*/
package org.thingsboard.server.utils;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.geo.EntityGeofencingState;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
@ -26,21 +28,30 @@ import org.thingsboard.server.common.util.KvProtoUtil;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldIdProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
import org.thingsboard.server.gen.transport.TransportProtos.GeofencingArgumentProto;
import org.thingsboard.server.gen.transport.TransportProtos.GeofencingZoneIdProto;
import org.thingsboard.server.gen.transport.TransportProtos.GeofencingZoneProto;
import org.thingsboard.server.gen.transport.TransportProtos.SingleValueArgumentProto;
import org.thingsboard.server.gen.transport.TransportProtos.TsDoubleValProto;
import org.thingsboard.server.gen.transport.TransportProtos.TsRollingArgumentProto;
import org.thingsboard.server.gen.transport.TransportProtos.TsValueProto;
import org.thingsboard.server.service.cf.ctx.CalculatedFieldEntityCtxId;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.GeofencingArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.GeofencingCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.GeofencingZoneState;
import org.thingsboard.server.service.cf.ctx.state.ScriptCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.SimpleCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.TsRollingArgumentEntry;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
public class CalculatedFieldUtils {
@ -80,6 +91,8 @@ public class CalculatedFieldUtils {
builder.addSingleValueArguments(toSingleValueArgumentProto(argName, singleValueArgumentEntry));
} else if (argEntry instanceof TsRollingArgumentEntry rollingArgumentEntry) {
builder.addRollingValueArguments(toRollingArgumentProto(argName, rollingArgumentEntry));
} else if (argEntry instanceof GeofencingArgumentEntry geofencingArgumentEntry) {
builder.addGeofencingArguments(toGeofencingArgumentProto(argName, geofencingArgumentEntry));
}
});
return builder.build();
@ -109,6 +122,42 @@ public class CalculatedFieldUtils {
return builder.build();
}
private static GeofencingArgumentProto toGeofencingArgumentProto(String argName, GeofencingArgumentEntry geofencingArgumentEntry) {
GeofencingArgumentProto.Builder builder = GeofencingArgumentProto.newBuilder()
.setArgName(argName);
Map<EntityId, GeofencingZoneState> zoneStates = geofencingArgumentEntry.getZoneStates();
zoneStates.forEach((entityId, zoneState) -> {
builder.addZones(toGeofencingZoneProto(entityId, zoneState));
});
return builder.build();
}
private static GeofencingZoneProto toGeofencingZoneProto(EntityId entityId, GeofencingZoneState zoneState) {
GeofencingZoneProto.Builder builder = GeofencingZoneProto.newBuilder()
.setZoneId(toGeofencingZoneIdProto(entityId))
.setTs(zoneState.getTs())
.setVersion(zoneState.getVersion())
.setPerimeterDefinition(JacksonUtil.toString(zoneState.getPerimeterDefinition()));
if (zoneState.getState() != null) {
EntityGeofencingState state = zoneState.getState();
builder.setInside(state.isInside())
.setStayed(state.isStayed())
.setStateSwitchTime(state.getStateSwitchTime());
}
return builder.build();
}
private static GeofencingZoneIdProto toGeofencingZoneIdProto(EntityId zoneId) {
return GeofencingZoneIdProto.newBuilder()
.setType(zoneId.getEntityType().name())
.setZoneIdLSB(zoneId.getId().getLeastSignificantBits())
.setZoneIdMSB(zoneId.getId().getMostSignificantBits())
.build();
}
public static CalculatedFieldState fromProto(CalculatedFieldStateProto proto) {
if (StringUtils.isEmpty(proto.getType())) {
return null;
@ -122,8 +171,6 @@ public class CalculatedFieldUtils {
case GEOFENCING -> new GeofencingCalculatedFieldState();
};
// TODO: add logic to restore geofencing state from proto
proto.getSingleValueArgumentsList().forEach(argProto ->
state.getArguments().put(argProto.getArgName(), fromSingleValueArgumentProto(argProto)));
@ -132,6 +179,11 @@ public class CalculatedFieldUtils {
state.getArguments().put(argProto.getKey(), fromRollingArgumentProto(argProto)));
}
if (CalculatedFieldType.GEOFENCING.equals(type)) {
proto.getGeofencingArgumentsList().forEach(argProto ->
state.getArguments().put(argProto.getArgName(), fromGeofencingArgumentProto(argProto)));
}
return state;
}
@ -153,4 +205,15 @@ public class CalculatedFieldUtils {
return new TsRollingArgumentEntry(tsRecords, proto.getLimit(), proto.getTimeWindow());
}
private static ArgumentEntry fromGeofencingArgumentProto(GeofencingArgumentProto proto) {
Map<EntityId, GeofencingZoneState> zoneStates = proto.getZonesList()
.stream()
.map(GeofencingZoneState::new)
.collect(Collectors.toMap(GeofencingZoneState::getZoneId, Function.identity()));
GeofencingArgumentEntry geofencingArgumentEntry = new GeofencingArgumentEntry();
geofencingArgumentEntry.setZoneStates(zoneStates);
return geofencingArgumentEntry;
}
}

1
application/src/main/resources/logback.xml

@ -56,6 +56,7 @@
<!-- Device actor message processor debug -->
<!-- <logger name="org.thingsboard.server.actors.device.DeviceActorMessageProcessor" level="DEBUG" />-->
<logger name="org.thingsboard.server.actors.calculatedField" level="TRACE" />
<logger name="com.microsoft.azure.servicebus.primitives.CoreMessageReceiver" level="OFF" />
<root level="INFO">

3
common/cluster-api/src/main/java/org/thingsboard/server/cluster/TbClusterService.java

@ -38,7 +38,6 @@ import org.thingsboard.server.common.msg.edge.ToEdgeSyncRequest;
import org.thingsboard.server.common.msg.plugin.ComponentLifecycleMsg;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.RestApiCallResponseMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCalculatedFieldNotificationMsg;
@ -81,7 +80,7 @@ public interface TbClusterService extends TbQueueClusterService {
void pushNotificationToTransport(String targetServiceId, ToTransportMsg response, TbQueueCallback callback);
void pushMsgToCalculatedFields(TenantId tenantId, EntityId entityId, TransportProtos.ToCalculatedFieldMsg msg, TbQueueCallback callback);
void pushMsgToCalculatedFields(TenantId tenantId, EntityId entityId, ToCalculatedFieldMsg msg, TbQueueCallback callback);
void pushMsgToCalculatedFields(TopicPartitionInfo tpi, UUID msgId, ToCalculatedFieldMsg msg, TbQueueCallback callback);

5
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/BaseCalculatedFieldConfiguration.java

@ -58,4 +58,9 @@ public abstract class BaseCalculatedFieldConfiguration implements CalculatedFiel
return link;
}
@Override
public boolean hasDynamicSourceArguments() {
return arguments.values().stream().anyMatch(arg -> arg.getRefDynamicSource() != null);
}
}

12
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java

@ -59,4 +59,16 @@ public interface CalculatedFieldConfiguration {
CalculatedFieldLink buildCalculatedFieldLink(TenantId tenantId, EntityId referencedEntityId, CalculatedFieldId calculatedFieldId);
@JsonIgnore
boolean hasDynamicSourceArguments();
@JsonIgnore
default boolean isDynamicRefreshEnabled() {
return hasDynamicSourceArguments() && getRefreshIntervalSec() > 0;
}
default int getRefreshIntervalSec() {
return 0;
}
}

6
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/GeofencingCalculatedFieldConfiguration.java

@ -23,9 +23,15 @@ import org.thingsboard.server.common.data.cf.CalculatedFieldType;
@EqualsAndHashCode(callSuper = true)
public class GeofencingCalculatedFieldConfiguration extends BaseCalculatedFieldConfiguration implements CalculatedFieldConfiguration {
private int refreshIntervalSec;
@Override
public CalculatedFieldType getType() {
return CalculatedFieldType.GEOFENCING;
}
public boolean isDynamicRefreshEnabled() {
return refreshIntervalSec > 0;
}
}

5
common/message/src/main/java/org/thingsboard/server/common/msg/MsgType.java

@ -150,7 +150,10 @@ public enum MsgType {
/* CF Manager Actor -> CF Entity actor */
CF_ENTITY_TELEMETRY_MSG,
CF_ENTITY_INIT_CF_MSG,
CF_ENTITY_DELETE_MSG;
CF_ENTITY_DELETE_MSG,
CF_SCHEDULED_CHECK_FOR_UPDATES_MSG,
CF_ENTITY_CHECK_FOR_UPDATES_MSG;
@Getter
private final boolean ignoreOnStart;

22
common/proto/src/main/proto/queue.proto

@ -894,11 +894,33 @@ message TsRollingArgumentProto {
repeated TsDoubleValProto tsValue = 4;
}
message GeofencingZoneIdProto {
string type = 1;
int64 zoneIdMSB = 2;
int64 zoneIdLSB = 3;
}
message GeofencingZoneProto {
GeofencingZoneIdProto zoneId = 1;
int64 ts = 2;
string perimeterDefinition = 3;
int64 version = 4;
bool inside = 5;
int64 stateSwitchTime = 6;
bool stayed = 7;
}
message GeofencingArgumentProto {
string argName = 1; // e.g., "restrictedZones" or "saveZones"
repeated GeofencingZoneProto zones = 2;
}
message CalculatedFieldStateProto {
CalculatedFieldEntityCtxIdProto id = 1;
string type = 2;
repeated SingleValueArgumentProto singleValueArguments = 3;
repeated TsRollingArgumentProto rollingValueArguments = 4;
repeated GeofencingArgumentProto geofencingArguments = 5;
}
//Used to report session state to tb-Service and persist this state in the cache on the tb-Service level.

3
common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfArg.java

@ -26,7 +26,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
)
@JsonSubTypes({
@JsonSubTypes.Type(value = TbelCfSingleValueArg.class, name = "SINGLE_VALUE"),
@JsonSubTypes.Type(value = TbelCfTsRollingArg.class, name = "TS_ROLLING")
@JsonSubTypes.Type(value = TbelCfTsRollingArg.class, name = "TS_ROLLING"),
@JsonSubTypes.Type(value = TbelCfTsGeofencingArg.class, name = "GEOFENCING_CF_ARGUMENT_VALUE"),
})
public interface TbelCfArg extends TbelCfObject {

21
application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldInitService.java → common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfTsGeofencingArg.java

@ -13,7 +13,24 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cf;
package org.thingsboard.script.api.tbel;
// TODO: should I add any specific logic for this?
public class TbelCfTsGeofencingArg implements TbelCfArg {
public TbelCfTsGeofencingArg() {
}
@Override
public String getType() {
return "GEOFENCING_CF_ARGUMENT_VALUE";
}
@Override
public long memorySize() {
return OBJ_SIZE;
}
public interface CalculatedFieldInitService {
}

1
common/util/src/main/java/org/thingsboard/common/util/geo/PerimeterDefinition.java

@ -35,5 +35,6 @@ public interface PerimeterDefinition extends Serializable {
@JsonIgnore
PerimeterType getType();
@JsonIgnore
boolean checkMatches(Coordinates entityCoordinates);
}

Loading…
Cancel
Save