Browse Source

propagation cf init commit

pull/14107/head
dshvaika 8 months ago
parent
commit
ce3b89046b
  1. 14
      application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java
  2. 34
      application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java
  3. 41
      application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java
  4. 49
      application/src/main/java/org/thingsboard/server/service/cf/PropagationCalculatedFieldResult.java
  5. 8
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java
  6. 2
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntryType.java
  7. 39
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java
  8. 4
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingArgumentEntry.java
  9. 4
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingCalculatedFieldState.java
  10. 72
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationArgumentEntry.java
  11. 59
      application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationCalculatedFieldState.java
  12. 2
      application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java
  13. 25
      application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java
  14. 3
      common/data/src/main/java/org/thingsboard/server/common/data/cf/CalculatedFieldType.java
  15. 10
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/BaseCalculatedFieldConfiguration.java
  16. 3
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java
  17. 66
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/PropagationCalculatedFieldConfiguration.java
  18. 1
      common/proto/src/main/proto/queue.proto
  19. 3
      common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfArg.java
  20. 4
      common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfGeofencingArg.java
  21. 42
      common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfPropagationArg.java

14
application/src/main/java/org/thingsboard/server/actors/calculatedField/CalculatedFieldEntityMessageProcessor.java

@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.cf.configuration.ArgumentType;
import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey;
@ -319,13 +320,15 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
if (state == null) {
state = createState(ctx);
justRestored = true;
} else if (ctx.shouldFetchDynamicArgumentsFromDb(state)) {
} else if (ctx.shouldFetchRelationQueryDynamicArgumentsFromDb(state)) {
log.debug("[{}][{}] Going to update dynamic arguments for CF.", entityId, ctx.getCfId());
try {
Map<String, ArgumentEntry> dynamicArgsFromDb = cfService.fetchDynamicArgsFromDb(ctx, entityId);
dynamicArgsFromDb.forEach(newArgValues::putIfAbsent);
var geofencingState = (GeofencingCalculatedFieldState) state;
geofencingState.setLastDynamicArgumentsRefreshTs(System.currentTimeMillis());
if (ctx.getCfType() == CalculatedFieldType.GEOFENCING) {
var geofencingState = (GeofencingCalculatedFieldState) state;
geofencingState.updateLastDynamicArgumentsRefreshTs();
}
} catch (Exception e) {
throw CalculatedFieldException.builder().ctx(ctx).eventEntity(entityId).cause(e).build();
}
@ -353,6 +356,11 @@ public class CalculatedFieldEntityMessageProcessor extends AbstractContextAwareM
private void initState(CalculatedFieldState state, CalculatedFieldCtx ctx) {
state.init(ctx);
if (ctx.getCfType() == CalculatedFieldType.GEOFENCING && ctx.isRelationQueryDynamicArguments()) {
GeofencingCalculatedFieldState geofencingState = (GeofencingCalculatedFieldState) state;
geofencingState.updateLastDynamicArgumentsRefreshTs();
}
Map<String, ArgumentEntry> arguments = fetchArguments(ctx);
state.update(arguments, ctx);

34
application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java

@ -52,6 +52,8 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.cf.CalculatedFieldType.PROPAGATION;
import static org.thingsboard.server.common.data.cf.configuration.PropagationCalculatedFieldConfiguration.PROPAGATION_CONFIG_ARGUMENT;
import static org.thingsboard.server.common.data.cf.configuration.geofencing.EntityCoordinates.ENTITY_ID_LATITUDE_ARGUMENT_KEY;
import static org.thingsboard.server.common.data.cf.configuration.geofencing.EntityCoordinates.ENTITY_ID_LONGITUDE_ARGUMENT_KEY;
import static org.thingsboard.server.utils.CalculatedFieldArgumentUtils.createDefaultAttributeEntry;
@ -88,21 +90,26 @@ public abstract class AbstractCalculatedFieldProcessingService {
protected ListenableFuture<Map<String, ArgumentEntry>> fetchArguments(CalculatedFieldCtx ctx, EntityId entityId, long ts) {
Map<String, ListenableFuture<ArgumentEntry>> argFutures = switch (ctx.getCalculatedField().getType()) {
case GEOFENCING -> fetchGeofencingCalculatedFieldArguments(ctx, entityId, false, ts);
case SIMPLE, SCRIPT, ALARM -> {
Map<String, ListenableFuture<ArgumentEntry>> futures = new HashMap<>();
for (var entry : ctx.getArguments().entrySet()) {
var argEntityId = resolveEntityId(ctx.getTenantId(), entityId, entry.getValue());
var argValueFuture = fetchArgumentValue(ctx.getTenantId(), argEntityId, entry.getValue(), ts);
futures.put(entry.getKey(), argValueFuture);
}
yield futures;
}
case SIMPLE, SCRIPT, ALARM, PROPAGATION -> getBaseCalculatedFieldArguments(ctx, entityId, ts);
};
if (ctx.getCfType() == PROPAGATION) {
argFutures.put(PROPAGATION_CONFIG_ARGUMENT, fetchPropagationCalculatedFieldArgument(ctx, entityId));
}
return Futures.whenAllComplete(argFutures.values())
.call(() -> resolveArgumentFutures(argFutures),
MoreExecutors.directExecutor());
}
private Map<String, ListenableFuture<ArgumentEntry>> getBaseCalculatedFieldArguments(CalculatedFieldCtx ctx, EntityId entityId, long ts) {
Map<String, ListenableFuture<ArgumentEntry>> futures = new HashMap<>();
for (var entry : ctx.getArguments().entrySet()) {
var argEntityId = resolveEntityId(ctx.getTenantId(), entityId, entry.getValue());
var argValueFuture = fetchArgumentValue(ctx.getTenantId(), argEntityId, entry.getValue(), ts);
futures.put(entry.getKey(), argValueFuture);
}
return futures;
}
protected EntityId resolveEntityId(TenantId tenantId, EntityId entityId, Argument argument) {
if (argument.getRefEntityId() != null) {
return argument.getRefEntityId();
@ -130,6 +137,11 @@ public abstract class AbstractCalculatedFieldProcessingService {
));
}
protected ListenableFuture<ArgumentEntry> fetchPropagationCalculatedFieldArgument(CalculatedFieldCtx ctx, EntityId entityId) {
ListenableFuture<List<EntityId>> propagationEntityIds = fromDynamicSource(ctx.getTenantId(), entityId, ctx.getPropagationArgument());
return Futures.transform(propagationEntityIds, ArgumentEntry::createPropagationArgument, MoreExecutors.directExecutor());
}
protected Map<String, ListenableFuture<ArgumentEntry>> fetchGeofencingCalculatedFieldArguments(CalculatedFieldCtx ctx, EntityId entityId, boolean dynamicArgumentsOnly, long startTs) {
Map<String, ListenableFuture<ArgumentEntry>> argFutures = new HashMap<>();
Set<Map.Entry<String, Argument>> entries = ctx.getArguments().entrySet();
@ -160,6 +172,10 @@ public abstract class AbstractCalculatedFieldProcessingService {
if (!value.hasDynamicSource()) {
return Futures.immediateFuture(List.of(entityId));
}
return fromDynamicSource(tenantId, entityId, value);
}
private ListenableFuture<List<EntityId>> fromDynamicSource(TenantId tenantId, EntityId entityId, Argument value) {
var refDynamicSourceConfiguration = value.getRefDynamicSourceConfiguration();
return switch (refDynamicSourceConfiguration.getType()) {
case CURRENT_OWNER -> Futures.immediateFuture(List.of(resolveOwnerArgument(tenantId, entityId)));

41
application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java

@ -23,7 +23,6 @@ import org.thingsboard.server.actors.calculatedField.MultipleTbCallback;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
@ -50,11 +49,13 @@ import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import static org.thingsboard.server.common.data.cf.configuration.PropagationCalculatedFieldConfiguration.PROPAGATION_CONFIG_ARGUMENT;
import static org.thingsboard.server.utils.CalculatedFieldUtils.toProto;
@TbRuleEngineComponent
@ -89,11 +90,11 @@ public class DefaultCalculatedFieldProcessingService extends AbstractCalculatedF
@Override
public Map<String, ArgumentEntry> fetchDynamicArgsFromDb(CalculatedFieldCtx ctx, EntityId entityId) {
// only scheduledSupported CF instances supports dynamic arguments scheduled updates
if (!ctx.getCalculatedField().getType().equals(CalculatedFieldType.GEOFENCING)) {
return Map.of();
}
return resolveArgumentFutures(fetchGeofencingCalculatedFieldArguments(ctx, entityId, true, System.currentTimeMillis()));
return switch (ctx.getCfType()) {
case GEOFENCING -> resolveArgumentFutures(fetchGeofencingCalculatedFieldArguments(ctx, entityId, true, System.currentTimeMillis()));
case PROPAGATION -> resolveArgumentFutures(Map.of(PROPAGATION_CONFIG_ARGUMENT, fetchPropagationCalculatedFieldArgument(ctx, entityId)));
default -> Collections.emptyMap();
};
}
@Override
@ -112,13 +113,35 @@ public class DefaultCalculatedFieldProcessingService extends AbstractCalculatedF
@Override
public void pushMsgToRuleEngine(TenantId tenantId, EntityId entityId, CalculatedFieldResult result, List<CalculatedFieldId> cfIds, TbCallback callback) {
try {
if (!(result instanceof PropagationCalculatedFieldResult propagationCalculatedFieldResult)) {
TbMsg msg = result.toTbMsg(entityId, cfIds);
sendMsgToRuleEngine(tenantId, entityId, callback, msg);
return;
}
List<EntityId> propagationEntityIds = propagationCalculatedFieldResult.getPropagationEntityIds();
if (propagationEntityIds.isEmpty()) {
callback.onSuccess();
}
if (propagationEntityIds.size() == 1) {
EntityId propagationEntityId = propagationEntityIds.get(0);
TbMsg msg = result.toTbMsg(propagationEntityId, cfIds);
sendMsgToRuleEngine(tenantId, propagationEntityId, callback, msg);
return;
}
MultipleTbCallback multipleTbCallback = new MultipleTbCallback(propagationEntityIds.size(), callback);
for (var propagationEntityId : propagationEntityIds) {
TbMsg msg = result.toTbMsg(propagationEntityId, cfIds);
sendMsgToRuleEngine(tenantId, propagationEntityId, multipleTbCallback, msg);
}
}
private void sendMsgToRuleEngine(TenantId tenantId, EntityId entityId, TbCallback callback, TbMsg msg) {
try {
clusterService.pushMsgToRuleEngine(tenantId, entityId, msg, new TbQueueCallback() {
@Override
public void onSuccess(TbQueueMsgMetadata metadata) {
callback.onSuccess();
log.trace("[{}][{}] Pushed message to rule engine: {} ", tenantId, entityId, msg);
callback.onSuccess();
}
@Override
@ -127,7 +150,7 @@ public class DefaultCalculatedFieldProcessingService extends AbstractCalculatedF
}
});
} catch (Exception e) {
log.warn("[{}][{}] Failed to push message to rule engine. CalculatedFieldResult: {}", tenantId, entityId, result, e);
log.warn("[{}][{}] Failed to push message to rule engine: {}", tenantId, entityId, msg, e);
callback.onFailure(e);
}
}

49
application/src/main/java/org/thingsboard/server/service/cf/PropagationCalculatedFieldResult.java

@ -0,0 +1,49 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cf;
import lombok.Builder;
import lombok.Data;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.util.CollectionsUtil;
import org.thingsboard.server.common.msg.TbMsg;
import java.util.List;
@Data
@Builder
public final class PropagationCalculatedFieldResult implements CalculatedFieldResult {
private final List<EntityId> propagationEntityIds;
private final TelemetryCalculatedFieldResult result;
@Override
public TbMsg toTbMsg(EntityId entityId, List<CalculatedFieldId> cfIds) {
return result.toTbMsg(entityId, cfIds);
}
@Override
public String stringValue() {
return result.stringValue();
}
@Override
public boolean isEmpty() {
return CollectionsUtil.isEmpty(propagationEntityIds) || result.isEmpty();
}
}

8
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntry.java

@ -23,6 +23,7 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationArgumentEntry;
import java.util.List;
import java.util.Map;
@ -35,7 +36,8 @@ import java.util.Map;
@JsonSubTypes({
@JsonSubTypes.Type(value = SingleValueArgumentEntry.class, name = "SINGLE_VALUE"),
@JsonSubTypes.Type(value = TsRollingArgumentEntry.class, name = "TS_ROLLING"),
@JsonSubTypes.Type(value = GeofencingArgumentEntry.class, name = "GEOFENCING")
@JsonSubTypes.Type(value = GeofencingArgumentEntry.class, name = "GEOFENCING"),
@JsonSubTypes.Type(value = PropagationArgumentEntry.class, name = "PROPAGATION")
})
public interface ArgumentEntry {
@ -66,4 +68,8 @@ public interface ArgumentEntry {
return new GeofencingArgumentEntry(entityIdkvEntryMap);
}
static ArgumentEntry createPropagationArgument(List<EntityId> entityIds) {
return new PropagationArgumentEntry(entityIds);
}
}

2
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/ArgumentEntryType.java

@ -16,5 +16,5 @@
package org.thingsboard.server.service.cf.ctx.state;
public enum ArgumentEntryType {
SINGLE_VALUE, TS_ROLLING, GEOFENCING
SINGLE_VALUE, TS_ROLLING, GEOFENCING, PROPAGATION
}

39
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/CalculatedFieldCtx.java

@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.cf.configuration.ArgumentType;
import org.thingsboard.server.common.data.cf.configuration.ArgumentsBasedCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.ExpressionBasedCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.Output;
import org.thingsboard.server.common.data.cf.configuration.PropagationCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey;
import org.thingsboard.server.common.data.cf.configuration.ScheduledUpdateSupportedCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedFieldConfiguration;
@ -101,6 +102,8 @@ public class CalculatedFieldCtx {
private long scheduledUpdateIntervalMillis;
private Argument propagationArgument;
public CalculatedFieldCtx(CalculatedField calculatedField,
ActorSystemContext systemContext) {
this.calculatedField = calculatedField;
@ -154,6 +157,10 @@ public class CalculatedFieldCtx {
}
});
}
if (calculatedField.getConfiguration() instanceof PropagationCalculatedFieldConfiguration propagationConfig) {
propagationArgument = propagationConfig.toPropagationArgument();
relationQueryDynamicArguments = true;
}
}
if (calculatedField.getConfiguration() instanceof ScheduledUpdateSupportedCalculatedFieldConfiguration scheduledConfig) {
this.scheduledUpdateIntervalMillis = scheduledConfig.isScheduledUpdateEnabled() ? TimeUnit.SECONDS.toMillis(scheduledConfig.getScheduledUpdateInterval()) : -1L;
@ -170,7 +177,7 @@ public class CalculatedFieldCtx {
public void init() {
switch (cfType) {
case SCRIPT -> {
case SCRIPT, PROPAGATION -> {
try {
initTbelExpression(expression);
initialized = true;
@ -512,21 +519,29 @@ public class CalculatedFieldCtx {
return false;
}
public boolean hasRelationQueryDynamicArguments() {
return relationQueryDynamicArguments && scheduledUpdateIntervalMillis != -1;
private boolean isScheduledUpdateEnabled() {
return scheduledUpdateIntervalMillis != -1;
}
public boolean shouldFetchDynamicArgumentsFromDb(CalculatedFieldState state) {
if (!hasRelationQueryDynamicArguments()) {
public boolean shouldFetchRelationQueryDynamicArgumentsFromDb(CalculatedFieldState state) {
if (!relationQueryDynamicArguments) {
return false;
}
if (!(state instanceof GeofencingCalculatedFieldState geofencingState)) {
return false;
}
if (geofencingState.getLastDynamicArgumentsRefreshTs() == -1L) {
return true;
}
return geofencingState.getLastDynamicArgumentsRefreshTs() < System.currentTimeMillis() - scheduledUpdateIntervalMillis;
return switch (cfType) {
case PROPAGATION -> true;
case GEOFENCING -> {
if (!isScheduledUpdateEnabled()) {
yield false;
}
var geofencingState = (GeofencingCalculatedFieldState) state;
if (geofencingState.getLastDynamicArgumentsRefreshTs() == -1L) {
yield true;
}
yield geofencingState.getLastDynamicArgumentsRefreshTs() <
System.currentTimeMillis() - scheduledUpdateIntervalMillis;
}
default -> false;
};
}
public void stop() {

4
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingArgumentEntry.java

@ -18,7 +18,7 @@ package org.thingsboard.server.service.cf.ctx.state.geofencing;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.script.api.tbel.TbelCfArg;
import org.thingsboard.script.api.tbel.TbelCfTsGeofencingArg;
import org.thingsboard.script.api.tbel.TbelCfGeofencingArg;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.util.ProtoUtils;
@ -83,7 +83,7 @@ public class GeofencingArgumentEntry implements ArgumentEntry {
@Override
public TbelCfArg toTbelCfArg() {
return new TbelCfTsGeofencingArg(zoneStates);
return new TbelCfGeofencingArg(zoneStates);
}
private Map<EntityId, GeofencingZoneState> toZones(Map<EntityId, KvEntry> entityIdKvEntryMap) {

4
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/geofencing/GeofencingCalculatedFieldState.java

@ -146,6 +146,10 @@ public class GeofencingCalculatedFieldState extends BaseCalculatedFieldState {
lastDynamicArgumentsRefreshTs = -1;
}
public void updateLastDynamicArgumentsRefreshTs() {
lastDynamicArgumentsRefreshTs = System.currentTimeMillis();
}
private Map<String, GeofencingArgumentEntry> getGeofencingArguments() {
return arguments.entrySet()
.stream()

72
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationArgumentEntry.java

@ -0,0 +1,72 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cf.ctx.state.propagation;
import lombok.Data;
import org.thingsboard.script.api.tbel.TbelCfArg;
import org.thingsboard.script.api.tbel.TbelCfPropagationArg;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.util.CollectionsUtil;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntryType;
import java.util.List;
@Data
public class PropagationArgumentEntry implements ArgumentEntry {
private List<EntityId> propagationEntityIds;
private boolean forceResetPrevious;
public PropagationArgumentEntry(List<EntityId> propagationEntityIds) {
this.propagationEntityIds = propagationEntityIds;
}
@Override
public ArgumentEntryType getType() {
return ArgumentEntryType.PROPAGATION;
}
@Override
public Object getValue() {
return propagationEntityIds;
}
@Override
public boolean updateEntry(ArgumentEntry entry) {
if (!(entry instanceof PropagationArgumentEntry propagationArgumentEntry)) {
throw new IllegalArgumentException("Unsupported argument entry type for propagation argument entry: " + entry.getType());
}
if (propagationArgumentEntry.isEmpty()) {
propagationEntityIds.clear();
} else {
propagationEntityIds = propagationArgumentEntry.getPropagationEntityIds();
}
return true;
}
@Override
public boolean isEmpty() {
return CollectionsUtil.isEmpty(propagationEntityIds);
}
@Override
public TbelCfArg toTbelCfArg() {
return new TbelCfPropagationArg(propagationEntityIds);
}
}

59
application/src/main/java/org/thingsboard/server/service/cf/ctx/state/propagation/PropagationCalculatedFieldState.java

@ -0,0 +1,59 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.cf.ctx.state.propagation;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.service.cf.CalculatedFieldResult;
import org.thingsboard.server.service.cf.PropagationCalculatedFieldResult;
import org.thingsboard.server.service.cf.TelemetryCalculatedFieldResult;
import org.thingsboard.server.service.cf.ctx.state.ArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.cf.ctx.state.ScriptCalculatedFieldState;
import java.util.Map;
import static org.thingsboard.server.common.data.cf.configuration.PropagationCalculatedFieldConfiguration.PROPAGATION_CONFIG_ARGUMENT;
public class PropagationCalculatedFieldState extends ScriptCalculatedFieldState {
public PropagationCalculatedFieldState(EntityId entityId) {
super(entityId);
}
@Override
public CalculatedFieldType getType() {
return CalculatedFieldType.PROPAGATION;
}
@Override
public ListenableFuture<CalculatedFieldResult> performCalculation(Map<String, ArgumentEntry> updatedArgs, CalculatedFieldCtx ctx) {
ArgumentEntry argumentEntry = arguments.get(PROPAGATION_CONFIG_ARGUMENT);
if (!(argumentEntry instanceof PropagationArgumentEntry propagationArgumentEntry) || propagationArgumentEntry.isEmpty()) {
return Futures.immediateFuture(PropagationCalculatedFieldResult.builder().build());
}
return Futures.transform(super.performCalculation(updatedArgs, ctx), telemetryCfResult ->
PropagationCalculatedFieldResult.builder()
.propagationEntityIds(propagationArgumentEntry.getPropagationEntityIds())
.result((TelemetryCalculatedFieldResult) telemetryCfResult)
.build(),
MoreExecutors.directExecutor());
}
}

2
application/src/main/java/org/thingsboard/server/utils/CalculatedFieldArgumentUtils.java

@ -36,6 +36,7 @@ import org.thingsboard.server.service.cf.ctx.state.SimpleCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.alarm.AlarmCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationCalculatedFieldState;
import java.util.Optional;
@ -79,6 +80,7 @@ public class CalculatedFieldArgumentUtils {
case SCRIPT -> new ScriptCalculatedFieldState(entityId);
case GEOFENCING -> new GeofencingCalculatedFieldState(entityId);
case ALARM -> new AlarmCalculatedFieldState(entityId);
case PROPAGATION -> new PropagationCalculatedFieldState(entityId);
};
}

25
application/src/main/java/org/thingsboard/server/utils/CalculatedFieldUtils.java

@ -32,6 +32,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.AlarmStateProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldEntityCtxIdProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldIdProto;
import org.thingsboard.server.gen.transport.TransportProtos.CalculatedFieldStateProto;
import org.thingsboard.server.gen.transport.TransportProtos.EntityIdProto;
import org.thingsboard.server.gen.transport.TransportProtos.GeofencingArgumentProto;
import org.thingsboard.server.gen.transport.TransportProtos.GeofencingZoneProto;
import org.thingsboard.server.gen.transport.TransportProtos.SingleValueArgumentProto;
@ -50,7 +51,10 @@ import org.thingsboard.server.service.cf.ctx.state.alarm.AlarmRuleState;
import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingCalculatedFieldState;
import org.thingsboard.server.service.cf.ctx.state.geofencing.GeofencingZoneState;
import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationArgumentEntry;
import org.thingsboard.server.service.cf.ctx.state.propagation.PropagationCalculatedFieldState;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
@ -58,6 +62,8 @@ import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.cf.configuration.PropagationCalculatedFieldConfiguration.PROPAGATION_CONFIG_ARGUMENT;
public class CalculatedFieldUtils {
public static CalculatedFieldIdProto toProto(CalculatedFieldId cfId) {
@ -92,12 +98,11 @@ public class CalculatedFieldUtils {
.setType(state.getType().name());
state.getArguments().forEach((argName, argEntry) -> {
if (argEntry instanceof SingleValueArgumentEntry singleValueArgumentEntry) {
builder.addSingleValueArguments(toSingleValueArgumentProto(argName, singleValueArgumentEntry));
} else if (argEntry instanceof TsRollingArgumentEntry rollingArgumentEntry) {
builder.addRollingValueArguments(toRollingArgumentProto(argName, rollingArgumentEntry));
} else if (argEntry instanceof GeofencingArgumentEntry geofencingArgumentEntry) {
builder.addGeofencingArguments(toGeofencingArgumentProto(argName, geofencingArgumentEntry));
switch (argEntry.getType()) {
case SINGLE_VALUE -> builder.addSingleValueArguments(toSingleValueArgumentProto(argName, (SingleValueArgumentEntry) argEntry));
case TS_ROLLING -> builder.addRollingValueArguments(toRollingArgumentProto(argName, (TsRollingArgumentEntry) argEntry));
case GEOFENCING -> builder.addGeofencingArguments(toGeofencingArgumentProto(argName, (GeofencingArgumentEntry) argEntry));
case PROPAGATION -> builder.addAllPropagationEntityIds(toPropagationEntityIdsProto((PropagationArgumentEntry) argEntry));
}
});
if (state instanceof AlarmCalculatedFieldState alarmState) {
@ -112,6 +117,10 @@ public class CalculatedFieldUtils {
return builder.build();
}
private static List<EntityIdProto> toPropagationEntityIdsProto(PropagationArgumentEntry argEntry) {
return argEntry.getPropagationEntityIds().stream().map(ProtoUtils::toProto).collect(Collectors.toList());
}
private static AlarmRuleStateProto toAlarmRuleStateProto(AlarmRuleState ruleState) {
return AlarmRuleStateProto.newBuilder()
.setSeverity(Optional.ofNullable(ruleState.getSeverity()).map(Enum::name).orElse(""))
@ -178,11 +187,15 @@ public class CalculatedFieldUtils {
case SCRIPT -> new ScriptCalculatedFieldState(id.entityId());
case GEOFENCING -> new GeofencingCalculatedFieldState(id.entityId());
case ALARM -> new AlarmCalculatedFieldState(id.entityId());
case PROPAGATION -> new PropagationCalculatedFieldState(id.entityId());
};
proto.getSingleValueArgumentsList().forEach(argProto ->
state.getArguments().put(argProto.getArgName(), fromSingleValueArgumentProto(argProto)));
List<EntityId> propagationEntityIds = proto.getPropagationEntityIdsList().stream().map(ProtoUtils::fromProto).toList();
state.getArguments().put(PROPAGATION_CONFIG_ARGUMENT, new PropagationArgumentEntry(propagationEntityIds));
switch (type) {
case SCRIPT -> {
proto.getRollingValueArgumentsList().forEach(argProto ->

3
common/data/src/main/java/org/thingsboard/server/common/data/cf/CalculatedFieldType.java

@ -19,5 +19,6 @@ public enum CalculatedFieldType {
SIMPLE,
SCRIPT,
GEOFENCING,
ALARM
ALARM,
PROPAGATION
}

10
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/BaseCalculatedFieldConfiguration.java

@ -28,12 +28,16 @@ public abstract class BaseCalculatedFieldConfiguration implements ExpressionBase
@Override
public void validate() {
baseCalculatedFieldRestriction();
if (arguments.values().stream().anyMatch(Argument::hasRelationQuerySource)) {
throw new IllegalArgumentException("Calculated field with type: '" + getType() + "' doesn't support relation query configuration!");
}
}
protected void baseCalculatedFieldRestriction() {
if (arguments.containsKey("ctx")) {
throw new IllegalArgumentException("Argument name 'ctx' is reserved and cannot be used.");
}
if (arguments.values().stream().anyMatch(Argument::hasRelationQuerySource)) {
throw new IllegalArgumentException("Calculated field with type: '" + getType() + "' doesn't support relation query source configuration!");
}
}
}

3
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/CalculatedFieldConfiguration.java

@ -40,7 +40,8 @@ import java.util.stream.Collectors;
@Type(value = SimpleCalculatedFieldConfiguration.class, name = "SIMPLE"),
@Type(value = ScriptCalculatedFieldConfiguration.class, name = "SCRIPT"),
@Type(value = GeofencingCalculatedFieldConfiguration.class, name = "GEOFENCING"),
@Type(value = AlarmCalculatedFieldConfiguration.class, name = "ALARM")
@Type(value = AlarmCalculatedFieldConfiguration.class, name = "ALARM"),
@Type(value = PropagationCalculatedFieldConfiguration.class, name = "PROPAGATION")
})
@JsonIgnoreProperties(ignoreUnknown = true)
public interface CalculatedFieldConfiguration {

66
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/PropagationCalculatedFieldConfiguration.java

@ -0,0 +1,66 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.cf.configuration;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.cf.CalculatedFieldType;
import org.thingsboard.server.common.data.relation.EntitySearchDirection;
import org.thingsboard.server.common.data.relation.RelationPathLevel;
import java.util.List;
@Data
@EqualsAndHashCode(callSuper = true)
public class PropagationCalculatedFieldConfiguration extends BaseCalculatedFieldConfiguration {
public static final String PROPAGATION_CONFIG_ARGUMENT = "propagationCtx";
private EntitySearchDirection direction;
private String relationType;
@Override
public CalculatedFieldType getType() {
return CalculatedFieldType.PROPAGATION;
}
@Override
public void validate() {
baseCalculatedFieldRestriction();
propagationRestriction();
if (direction == null) {
throw new IllegalArgumentException("Propagation calculated field direction must be specified!");
}
if (StringUtils.isBlank(relationType)) {
throw new IllegalArgumentException("Propagation calculated field relation type must be specified!");
}
}
public Argument toPropagationArgument() {
var refDynamicSourceConfiguration = new RelationPathQueryDynamicSourceConfiguration();
refDynamicSourceConfiguration.setLevels(List.of(new RelationPathLevel(direction, relationType)));
var propagationArgument = new Argument();
propagationArgument.setRefDynamicSourceConfiguration(refDynamicSourceConfiguration);
return propagationArgument;
}
private void propagationRestriction() {
if (arguments.entrySet().stream().anyMatch(entry -> entry.getKey().equals(PROPAGATION_CONFIG_ARGUMENT))) {
throw new IllegalArgumentException("Argument name '" + PROPAGATION_CONFIG_ARGUMENT + "' is reserved and cannot be used.");
}
}
}

1
common/proto/src/main/proto/queue.proto

@ -922,6 +922,7 @@ message CalculatedFieldStateProto {
repeated TsRollingArgumentProto rollingValueArguments = 4;
repeated GeofencingArgumentProto geofencingArguments = 5;
AlarmStateProto alarmState = 6;
repeated EntityIdProto propagationEntityIds = 7;
}
//Used to report session state to tb-Service and persist this state in the cache on the tb-Service level.

3
common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfArg.java

@ -27,7 +27,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
@JsonSubTypes({
@JsonSubTypes.Type(value = TbelCfSingleValueArg.class, name = "SINGLE_VALUE"),
@JsonSubTypes.Type(value = TbelCfTsRollingArg.class, name = "TS_ROLLING"),
@JsonSubTypes.Type(value = TbelCfTsGeofencingArg.class, name = "GEOFENCING_CF_ARGUMENT_VALUE"),
@JsonSubTypes.Type(value = TbelCfGeofencingArg.class, name = "GEOFENCING_CF_ARGUMENT_VALUE"),
@JsonSubTypes.Type(value = TbelCfPropagationArg.class, name = "PROPAGATION_CF_ARGUMENT_VALUE"),
})
public interface TbelCfArg extends TbelCfObject {

4
common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfTsGeofencingArg.java → common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfGeofencingArg.java

@ -20,12 +20,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
@Data
public class TbelCfTsGeofencingArg implements TbelCfArg {
public class TbelCfGeofencingArg implements TbelCfArg {
private final Object value;
@JsonCreator
public TbelCfTsGeofencingArg(@JsonProperty("value") Object value) {
public TbelCfGeofencingArg(@JsonProperty("value") Object value) {
this.value = value;
}

42
common/script/script-api/src/main/java/org/thingsboard/script/api/tbel/TbelCfPropagationArg.java

@ -0,0 +1,42 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.script.api.tbel;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;
@Data
public class TbelCfPropagationArg implements TbelCfArg {
private final Object value;
@JsonCreator
public TbelCfPropagationArg(@JsonProperty("value") Object value) {
this.value = value;
}
@Override
public String getType() {
return "PROPAGATION_CF_ARGUMENT_VALUE";
}
@Override
public long memorySize() {
return OBJ_SIZE;
}
}
Loading…
Cancel
Save