diff --git a/application/src/main/data/upgrade/basic/schema_update.sql b/application/src/main/data/upgrade/basic/schema_update.sql index cc4fbaad14..565ac50afd 100644 --- a/application/src/main/data/upgrade/basic/schema_update.sql +++ b/application/src/main/data/upgrade/basic/schema_update.sql @@ -80,7 +80,11 @@ SET configuration = jsonb_set( || jsonb_build_object( 'strategy', jsonb_build_object( - 'type', 'PUSH_TO_RULE_ENGINE' + 'type', + CASE (configuration::jsonb -> 'output' ->> 'type') + WHEN 'TIME_SERIES' THEN 'RULE_CHAIN_TIME_SERIES' + WHEN 'ATTRIBUTES' THEN 'RULE_CHAIN_ATTRIBUTES' + END ) ), false diff --git a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java index ba3f2d4cc5..5ecbeeb496 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java @@ -36,10 +36,10 @@ import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.common.data.cf.CalculatedField; import org.thingsboard.server.common.data.cf.configuration.Argument; import org.thingsboard.server.common.data.cf.configuration.ArgumentType; -import org.thingsboard.server.common.data.cf.configuration.AttributeSkipRuleEngineOutputStrategy; +import org.thingsboard.server.common.data.cf.configuration.AttributeImmediateOutputStrategy; import org.thingsboard.server.common.data.cf.configuration.OutputType; import org.thingsboard.server.common.data.cf.configuration.RelationPathQueryDynamicSourceConfiguration; -import org.thingsboard.server.common.data.cf.configuration.TimeSeriesSkipRuleEngineOutputStrategy; +import org.thingsboard.server.common.data.cf.configuration.TimeSeriesImmediateOutputStrategy; import org.thingsboard.server.common.data.cf.configuration.aggregation.RelatedEntitiesAggregationCalculatedFieldConfiguration; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; @@ -66,7 +66,6 @@ import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx; import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -75,10 +74,11 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; +import static org.thingsboard.rule.engine.util.TelemetryUtil.filterChangedAttr; +import static org.thingsboard.rule.engine.util.TelemetryUtil.toTsKvEntryList; import static org.thingsboard.server.common.data.cf.CalculatedFieldType.PROPAGATION; import static org.thingsboard.server.common.data.cf.configuration.PropagationCalculatedFieldConfiguration.PROPAGATION_CONFIG_ARGUMENT; import static org.thingsboard.server.common.data.cf.configuration.geofencing.EntityCoordinates.ENTITY_ID_LATITUDE_ARGUMENT_KEY; @@ -378,60 +378,60 @@ public abstract class AbstractCalculatedFieldProcessingService { } private void saveAttributes(TenantId tenantId, EntityId entityId, TelemetryCalculatedFieldResult cfResult, List cfIds, SettableFuture future) { - if (!(cfResult.getOutputStrategy() instanceof AttributeSkipRuleEngineOutputStrategy outputStrategy)) { + if (!(cfResult.getOutputStrategy() instanceof AttributeImmediateOutputStrategy outputStrategy)) { + future.setException(new IllegalArgumentException("Expected AttributeImmediateOutputStrategy")); return; } JsonElement jsonResult = JsonParser.parseString(Objects.requireNonNull(cfResult.stringValue())); AttributesSaveRequest.Strategy strategy = new Strategy(outputStrategy.isSaveAttribute(), outputStrategy.isSendWsUpdate(), outputStrategy.isProcessCfs()); - List attributeKvEntries = JsonConverter.convertToAttributes(jsonResult); + List newAttributes = JsonConverter.convertToAttributes(jsonResult); if (!outputStrategy.isUpdateAttributesOnlyOnValueChange()) { - tsSubService.saveAttributesInternal(AttributesSaveRequest.builder() - .tenantId(tenantId) - .entityId(entityId) - .entries(attributeKvEntries) - .strategy(strategy) - .previousCalculatedFieldIds(cfIds) - .future(future) - .build() - ); + saveAttributesInternal(tenantId, entityId, cfResult, cfIds, newAttributes, strategy, future); return; } - List keys = attributeKvEntries.stream().map(KvEntry::getKey).collect(Collectors.toList()); - + List keys = newAttributes.stream().map(KvEntry::getKey).collect(Collectors.toList()); ListenableFuture> findFuture = attributesService.find(tenantId, entityId, cfResult.getScope(), keys); DonAsynchron.withCallback(findFuture, existingAttributes -> { - List attributesChanged = filterChangedAttr(existingAttributes, attributeKvEntries); - tsSubService.saveAttributesInternal(AttributesSaveRequest.builder() - .tenantId(tenantId) - .entityId(entityId) - .entries(attributesChanged) - .strategy(strategy) - .previousCalculatedFieldIds(cfIds) - .future(future) - .build() - ); + List changed = filterChangedAttr(existingAttributes, newAttributes); + saveAttributesInternal(tenantId, entityId, cfResult, cfIds, changed, strategy, future); }, future::setException, MoreExecutors.directExecutor()); } + private void saveAttributesInternal(TenantId tenantId, EntityId entityId, + TelemetryCalculatedFieldResult cfResult, + List cfIds, + List entries, + AttributesSaveRequest.Strategy strategy, + SettableFuture future) { + tsSubService.saveAttributesInternal(AttributesSaveRequest.builder() + .tenantId(tenantId) + .entityId(entityId) + .scope(cfResult.getScope()) + .entries(entries) + .strategy(strategy) + .previousCalculatedFieldIds(cfIds) + .future(future) + .build()); + } + private void saveTimeSeries(TenantId tenantId, EntityId entityId, TelemetryCalculatedFieldResult cfResult, List cfIds, long ts, SettableFuture future) { - if (!(cfResult.getOutputStrategy() instanceof TimeSeriesSkipRuleEngineOutputStrategy outputStrategy)) { + if (!(cfResult.getOutputStrategy() instanceof TimeSeriesImmediateOutputStrategy outputStrategy)) { + future.setException(new IllegalArgumentException("Expected TimeSeriesImmediateOutputStrategy")); return; } JsonElement jsonResult = JsonParser.parseString(Objects.requireNonNull(cfResult.stringValue())); Map> tsKvMap = JsonConverter.convertToTelemetry(jsonResult, ts); - List tsEntries = new ArrayList<>(); - for (Map.Entry> tsKvEntry : tsKvMap.entrySet()) { - for (KvEntry kvEntry : tsKvEntry.getValue()) { - tsEntries.add(new BasicTsKvEntry(tsKvEntry.getKey(), kvEntry)); - } + if (tsKvMap.isEmpty()) { + future.setFuture(Futures.immediateFuture(null)); } + List tsEntries = toTsKvEntryList(tsKvMap); TimeseriesSaveRequest.Strategy strategy = new TimeseriesSaveRequest.Strategy(outputStrategy.isSaveTimeSeries(), outputStrategy.isSaveLatest(), outputStrategy.isSendWsUpdate(), outputStrategy.isProcessCfs()); tsSubService.saveTimeseriesInternal(TimeseriesSaveRequest.builder() .tenantId(tenantId) @@ -444,22 +444,4 @@ public abstract class AbstractCalculatedFieldProcessingService { .build()); } - private List filterChangedAttr(List existingAttributes, List newAttributes) { - if (existingAttributes == null || existingAttributes.isEmpty()) { - return newAttributes; - } - - Map currentAttrMap = existingAttributes.stream() - .collect(Collectors.toMap(AttributeKvEntry::getKey, Function.identity(), (existing, replacement) -> existing)); - - return newAttributes.stream() - .filter(item -> { - AttributeKvEntry cacheAttr = currentAttrMap.get(item.getKey()); - return cacheAttr == null - || !Objects.equals(item.getValue(), cacheAttr.getValue()) //JSON and String can be equals by value, but different by type - || !Objects.equals(item.getDataType(), cacheAttr.getDataType()); - }) - .collect(Collectors.toList()); - } - } diff --git a/application/src/main/java/org/thingsboard/server/service/cf/AlarmCalculatedFieldResult.java b/application/src/main/java/org/thingsboard/server/service/cf/AlarmCalculatedFieldResult.java index 3191b84193..498a215e17 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/AlarmCalculatedFieldResult.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/AlarmCalculatedFieldResult.java @@ -21,8 +21,6 @@ import lombok.RequiredArgsConstructor; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.rule.engine.action.TbAlarmResult; import org.thingsboard.server.common.data.DataConstants; -import org.thingsboard.server.common.data.cf.configuration.OutputStrategy; -import org.thingsboard.server.common.data.cf.configuration.PushToRuleEngineOutputStrategy; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.msg.TbMsgType; @@ -38,11 +36,6 @@ public class AlarmCalculatedFieldResult implements CalculatedFieldResult { private final TbAlarmResult alarmResult; - @Override - public OutputStrategy getOutputStrategy() { - return new PushToRuleEngineOutputStrategy(); - } - @Override public TbMsg toTbMsg(EntityId entityId, List cfIds) { TbMsgType msgType; diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java index 5473f3f4a9..0475cb1b66 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java @@ -37,7 +37,7 @@ public interface CalculatedFieldProcessingService { Map fetchArgsFromDb(TenantId tenantId, EntityId entityId, Map arguments); - void saveToDB(TenantId tenantId, EntityId entityId, CalculatedFieldResult result, List cfIds, TbCallback callback); + void processImmediately(TenantId tenantId, EntityId entityId, CalculatedFieldResult result, List cfIds, TbCallback callback); void processResult(TenantId tenantId, EntityId entityId, CalculatedFieldResult result, List cfIds, TbCallback callback); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldResult.java b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldResult.java index a9c2c532ee..c62d5dc6d5 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldResult.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldResult.java @@ -15,7 +15,6 @@ */ package org.thingsboard.server.service.cf; -import org.thingsboard.server.common.data.cf.configuration.OutputStrategy; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.msg.TbMsg; @@ -24,8 +23,6 @@ import java.util.List; public interface CalculatedFieldResult { - OutputStrategy getOutputStrategy(); - TbMsg toTbMsg(EntityId entityId, List cfIds); String stringValue(); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java index a6058c7dcb..09fdf62ddf 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java @@ -116,14 +116,20 @@ public class DefaultCalculatedFieldProcessingService extends AbstractCalculatedF @Override public void processResult(TenantId tenantId, EntityId entityId, CalculatedFieldResult result, List cfIds, TbCallback callback) { - switch (result.getOutputStrategy().getType()) { - case SKIP_RULE_ENGINE -> saveToDB(tenantId, entityId, result, cfIds, callback); - case PUSH_TO_RULE_ENGINE -> pushMsgToRuleEngine(tenantId, entityId, result, cfIds, callback); + if (result instanceof AlarmCalculatedFieldResult) { + sendMsgToRuleEngine(tenantId, entityId, callback, result.toTbMsg(entityId, cfIds)); + return; + } + TelemetryCalculatedFieldResult telemetryResult = result instanceof TelemetryCalculatedFieldResult telemetryRes + ? telemetryRes : ((PropagationCalculatedFieldResult) result).getResult(); + switch (telemetryResult.getOutputStrategy().getStrategyType()) { + case IMMEDIATE -> processImmediately(tenantId, entityId, result, cfIds, callback); + case RULE_CHAIN -> pushMsgToRuleEngine(tenantId, entityId, result, cfIds, callback); } } @Override - public void saveToDB(TenantId tenantId, EntityId entityId, CalculatedFieldResult result, List cfIds, TbCallback callback) { + public void processImmediately(TenantId tenantId, EntityId entityId, CalculatedFieldResult result, List cfIds, TbCallback callback) { if (result instanceof TelemetryCalculatedFieldResult telemetryResult) { saveTelemetryResult(tenantId, entityId, telemetryResult, cfIds, callback); return; @@ -131,7 +137,9 @@ public class DefaultCalculatedFieldProcessingService extends AbstractCalculatedF if (result instanceof PropagationCalculatedFieldResult propagationResult) { handlePropagationResults(propagationResult, callback, (entity, res, cb) -> saveTelemetryResult(tenantId, entityId, res, cfIds, cb)); + return; } + callback.onSuccess(); } @Override @@ -150,6 +158,7 @@ public class DefaultCalculatedFieldProcessingService extends AbstractCalculatedF List propagationEntityIds = propagationResult.getPropagationEntityIds(); if (propagationEntityIds.isEmpty()) { callback.onSuccess(); + return; } if (propagationEntityIds.size() == 1) { EntityId propagationEntityId = propagationEntityIds.get(0); diff --git a/application/src/main/java/org/thingsboard/server/service/cf/PropagationCalculatedFieldResult.java b/application/src/main/java/org/thingsboard/server/service/cf/PropagationCalculatedFieldResult.java index 38e1464fb3..780fd220a7 100644 --- a/application/src/main/java/org/thingsboard/server/service/cf/PropagationCalculatedFieldResult.java +++ b/application/src/main/java/org/thingsboard/server/service/cf/PropagationCalculatedFieldResult.java @@ -17,7 +17,6 @@ package org.thingsboard.server.service.cf; import lombok.Builder; import lombok.Data; -import org.thingsboard.server.common.data.cf.configuration.OutputStrategy; import org.thingsboard.server.common.data.id.CalculatedFieldId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.util.CollectionsUtil; @@ -32,11 +31,6 @@ public final class PropagationCalculatedFieldResult implements CalculatedFieldRe private final List propagationEntityIds; private final TelemetryCalculatedFieldResult result; - @Override - public OutputStrategy getOutputStrategy() { - return result.getOutputStrategy(); - } - @Override public TbMsg toTbMsg(EntityId entityId, List cfIds) { return result.toTbMsg(entityId, cfIds); diff --git a/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java b/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java index 6a76a2fb99..8593ea8ae5 100644 --- a/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java @@ -41,7 +41,7 @@ import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey; import org.thingsboard.server.common.data.cf.configuration.RelationPathQueryDynamicSourceConfiguration; import org.thingsboard.server.common.data.cf.configuration.ScriptCalculatedFieldConfiguration; import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedFieldConfiguration; -import org.thingsboard.server.common.data.cf.configuration.TimeSeriesSkipRuleEngineOutputStrategy; +import org.thingsboard.server.common.data.cf.configuration.TimeSeriesImmediateOutputStrategy; import org.thingsboard.server.common.data.cf.configuration.geofencing.EntityCoordinates; import org.thingsboard.server.common.data.cf.configuration.geofencing.GeofencingCalculatedFieldConfiguration; import org.thingsboard.server.common.data.cf.configuration.geofencing.ZoneGroupConfiguration; @@ -1235,7 +1235,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes output.setName("fahrenheitTemp"); output.setType(OutputType.TIME_SERIES); output.setDecimalsByDefault(1); - output.setStrategy(new TimeSeriesSkipRuleEngineOutputStrategy(1000L, true, true, true, true)); + output.setStrategy(new TimeSeriesImmediateOutputStrategy(1000L, true, true, true, true)); config.setOutput(output); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/AttributeSkipRuleEngineOutputStrategy.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/AttributeImmediateOutputStrategy.java similarity index 85% rename from common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/AttributeSkipRuleEngineOutputStrategy.java rename to common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/AttributeImmediateOutputStrategy.java index 5d30eb36f9..5ca28d0ee4 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/AttributeSkipRuleEngineOutputStrategy.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/AttributeImmediateOutputStrategy.java @@ -20,9 +20,9 @@ import lombok.Data; import lombok.NoArgsConstructor; @Data -@NoArgsConstructor @AllArgsConstructor -public class AttributeSkipRuleEngineOutputStrategy extends SkipRuleEngineOutputStrategy { +@NoArgsConstructor +public class AttributeImmediateOutputStrategy extends ImmediateOutputStrategy { private boolean updateAttributesOnlyOnValueChange; @@ -30,4 +30,8 @@ public class AttributeSkipRuleEngineOutputStrategy extends SkipRuleEngineOutputS private boolean sendWsUpdate; private boolean processCfs; + @Override + public String getType() { + return "IMMEDIATE_ATTRIBUTES"; + } } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/SkipRuleEngineOutputStrategy.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/AttributeRuleChainOutputStrategy.java similarity index 52% rename from common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/SkipRuleEngineOutputStrategy.java rename to common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/AttributeRuleChainOutputStrategy.java index dfc2873ccb..adefc06964 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/SkipRuleEngineOutputStrategy.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/AttributeRuleChainOutputStrategy.java @@ -15,23 +15,15 @@ */ package org.thingsboard.server.common.data.cf.configuration; -import com.fasterxml.jackson.annotation.JsonSubTypes; -import com.fasterxml.jackson.annotation.JsonTypeInfo; +import lombok.Data; +import lombok.NoArgsConstructor; -@JsonTypeInfo( - use = JsonTypeInfo.Id.NAME, - include = JsonTypeInfo.As.EXTERNAL_PROPERTY, - property = "type" -) -@JsonSubTypes({ - @JsonSubTypes.Type(value = AttributeSkipRuleEngineOutputStrategy.class, name = "ATTRIBUTES"), - @JsonSubTypes.Type(value = TimeSeriesSkipRuleEngineOutputStrategy.class, name = "TIME_SERIES") -}) -public abstract class SkipRuleEngineOutputStrategy implements OutputStrategy { +@Data +@NoArgsConstructor +public class AttributeRuleChainOutputStrategy extends RuleChainOutputStrategy { @Override - public OutputStrategyType getType() { - return OutputStrategyType.SKIP_RULE_ENGINE; + public String getType() { + return "RULE_CHAIN_ATTRIBUTES"; } - } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/PushToRuleEngineOutputStrategy.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/ImmediateOutputStrategy.java similarity index 80% rename from common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/PushToRuleEngineOutputStrategy.java rename to common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/ImmediateOutputStrategy.java index adeab6c35d..170fa5bb9c 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/PushToRuleEngineOutputStrategy.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/ImmediateOutputStrategy.java @@ -15,11 +15,11 @@ */ package org.thingsboard.server.common.data.cf.configuration; -public class PushToRuleEngineOutputStrategy implements OutputStrategy { +public abstract class ImmediateOutputStrategy implements OutputStrategy { @Override - public OutputStrategyType getType() { - return OutputStrategyType.PUSH_TO_RULE_ENGINE; + public OutputStrategyType getStrategyType() { + return OutputStrategyType.IMMEDIATE; } } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Output.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Output.java index 1821db2760..d2e0ac1ca9 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Output.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Output.java @@ -16,7 +16,6 @@ package org.thingsboard.server.common.data.cf.configuration; import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonTypeInfo; import lombok.Data; import org.thingsboard.server.common.data.AttributeScope; @@ -28,12 +27,6 @@ public class Output { private OutputType type; private AttributeScope scope; private Integer decimalsByDefault; - - @JsonTypeInfo( - use = JsonTypeInfo.Id.NAME, - include = JsonTypeInfo.As.EXTERNAL_PROPERTY, - property = "type" - ) private OutputStrategy strategy; } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/OutputStrategy.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/OutputStrategy.java index b4b71103e8..8f488602fe 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/OutputStrategy.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/OutputStrategy.java @@ -15,23 +15,31 @@ */ package org.thingsboard.server.common.data.cf.configuration; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonSubTypes.Type; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import com.fasterxml.jackson.annotation.JsonTypeInfo.As; @JsonTypeInfo( use = JsonTypeInfo.Id.NAME, - include = JsonTypeInfo.As.PROPERTY, + include = As.PROPERTY, property = "type" ) @JsonSubTypes({ - @JsonSubTypes.Type(value = SkipRuleEngineOutputStrategy.class, name = "SKIP_RULE_ENGINE"), - @JsonSubTypes.Type(value = PushToRuleEngineOutputStrategy.class, name = "PUSH_TO_RULE_ENGINE") + @Type(value = AttributeImmediateOutputStrategy.class, name = "IMMEDIATE_ATTRIBUTES"), + @Type(value = TimeSeriesImmediateOutputStrategy.class, name = "IMMEDIATE_TIME_SERIES"), + + @Type(value = AttributeRuleChainOutputStrategy.class, name = "RULE_CHAIN_ATTRIBUTES"), + @Type(value = TimeSeriesRuleChainOutputStrategy.class, name = "RULE_CHAIN_TIME_SERIES") + }) -@JsonIgnoreProperties(ignoreUnknown = true) public interface OutputStrategy { - OutputStrategyType getType(); + @JsonIgnore + OutputStrategyType getStrategyType(); + + @JsonIgnore + String getType(); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/OutputStrategyType.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/OutputStrategyType.java index d4eef18d61..4f5234acb5 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/OutputStrategyType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/OutputStrategyType.java @@ -17,6 +17,6 @@ package org.thingsboard.server.common.data.cf.configuration; public enum OutputStrategyType { - SKIP_RULE_ENGINE, PUSH_TO_RULE_ENGINE + IMMEDIATE, RULE_CHAIN } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/RuleChainOutputStrategy.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/RuleChainOutputStrategy.java new file mode 100644 index 0000000000..0d601d22fc --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/RuleChainOutputStrategy.java @@ -0,0 +1,25 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.cf.configuration; + +public abstract class RuleChainOutputStrategy implements OutputStrategy { + + @Override + public OutputStrategyType getStrategyType() { + return OutputStrategyType.RULE_CHAIN; + } + +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/TimeSeriesSkipRuleEngineOutputStrategy.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/TimeSeriesImmediateOutputStrategy.java similarity index 85% rename from common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/TimeSeriesSkipRuleEngineOutputStrategy.java rename to common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/TimeSeriesImmediateOutputStrategy.java index 3165fc5eac..60d5f99515 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/TimeSeriesSkipRuleEngineOutputStrategy.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/TimeSeriesImmediateOutputStrategy.java @@ -20,9 +20,9 @@ import lombok.Data; import lombok.NoArgsConstructor; @Data -@NoArgsConstructor @AllArgsConstructor -public class TimeSeriesSkipRuleEngineOutputStrategy extends SkipRuleEngineOutputStrategy { +@NoArgsConstructor +public class TimeSeriesImmediateOutputStrategy extends ImmediateOutputStrategy { private long ttl; @@ -31,4 +31,8 @@ public class TimeSeriesSkipRuleEngineOutputStrategy extends SkipRuleEngineOutput private boolean sendWsUpdate; private boolean processCfs; + @Override + public String getType() { + return "IMMEDIATE_TIME_SERIES"; + } } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/TimeSeriesRuleChainOutputStrategy.java b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/TimeSeriesRuleChainOutputStrategy.java new file mode 100644 index 0000000000..bb50cc5e8b --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/TimeSeriesRuleChainOutputStrategy.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.cf.configuration; + +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +public class TimeSeriesRuleChainOutputStrategy extends RuleChainOutputStrategy { + + @Override + public String getType() { + return "RULE_CHAIN_TIME_SERIES"; + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java index 533f7d13dd..20aa7993a1 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java @@ -41,16 +41,14 @@ import org.thingsboard.server.common.data.util.TbPair; import org.thingsboard.server.common.msg.TbMsg; import java.util.List; -import java.util.Map; -import java.util.Objects; import java.util.UUID; -import java.util.function.Function; import java.util.stream.Collectors; import static org.thingsboard.rule.engine.telemetry.settings.AttributesProcessingSettings.Advanced; import static org.thingsboard.rule.engine.telemetry.settings.AttributesProcessingSettings.Deduplicate; import static org.thingsboard.rule.engine.telemetry.settings.AttributesProcessingSettings.OnEveryMessage; import static org.thingsboard.rule.engine.telemetry.settings.AttributesProcessingSettings.WebSocketsOnly; +import static org.thingsboard.rule.engine.util.TelemetryUtil.filterChangedAttr; import static org.thingsboard.server.common.data.DataConstants.NOTIFY_DEVICE_METADATA_KEY; import static org.thingsboard.server.common.data.DataConstants.SCOPE; import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_REQUEST; @@ -216,24 +214,6 @@ public class TbMsgAttributesNode implements TbNode { .build()); } - private List filterChangedAttr(List currentAttributes, List newAttributes) { - if (currentAttributes == null || currentAttributes.isEmpty()) { - return newAttributes; - } - - Map currentAttrMap = currentAttributes.stream() - .collect(Collectors.toMap(AttributeKvEntry::getKey, Function.identity(), (existing, replacement) -> existing)); - - return newAttributes.stream() - .filter(item -> { - AttributeKvEntry cacheAttr = currentAttrMap.get(item.getKey()); - return cacheAttr == null - || !Objects.equals(item.getValue(), cacheAttr.getValue()) //JSON and String can be equals by value, but different by type - || !Objects.equals(item.getDataType(), cacheAttr.getDataType()); - }) - .collect(Collectors.toList()); - } - private boolean checkSendNotification(AttributeScope scope) { return config.isSendAttributesUpdatedNotification() && AttributeScope.CLIENT_SCOPE != scope; } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java index 13dab98c54..32f06b1e00 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java @@ -49,6 +49,7 @@ import static org.thingsboard.rule.engine.telemetry.settings.TimeseriesProcessin import static org.thingsboard.rule.engine.telemetry.settings.TimeseriesProcessingSettings.Deduplicate; import static org.thingsboard.rule.engine.telemetry.settings.TimeseriesProcessingSettings.OnEveryMessage; import static org.thingsboard.rule.engine.telemetry.settings.TimeseriesProcessingSettings.WebSocketsOnly; +import static org.thingsboard.rule.engine.util.TelemetryUtil.toTsKvEntryList; import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_REQUEST; @RuleNode( @@ -148,12 +149,7 @@ public class TbMsgTimeseriesNode implements TbNode { ctx.tellFailure(msg, new IllegalArgumentException("Msg body is empty: " + src)); return; } - List tsKvEntryList = new ArrayList<>(); - for (Map.Entry> tsKvEntry : tsKvMap.entrySet()) { - for (KvEntry kvEntry : tsKvEntry.getValue()) { - tsKvEntryList.add(new BasicTsKvEntry(tsKvEntry.getKey(), kvEntry)); - } - } + List tsKvEntryList = toTsKvEntryList(tsKvMap); String ttlValue = msg.getMetaData().getValue("TTL"); long ttl = !StringUtils.isEmpty(ttlValue) ? Long.parseLong(ttlValue) : config.getDefaultTTL(); if (ttl == 0L) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/TelemetryUtil.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/TelemetryUtil.java new file mode 100644 index 0000000000..41d6f1ce1d --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/TelemetryUtil.java @@ -0,0 +1,60 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.util; + +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; + +public class TelemetryUtil { + + public static List toTsKvEntryList(Map> tsKvMap) { + List tsKvEntryList = new ArrayList<>(); + for (Map.Entry> tsKvEntry : tsKvMap.entrySet()) { + for (KvEntry kvEntry : tsKvEntry.getValue()) { + tsKvEntryList.add(new BasicTsKvEntry(tsKvEntry.getKey(), kvEntry)); + } + } + return tsKvEntryList; + } + + public static List filterChangedAttr(List currentAttributes, List newAttributes) { + if (currentAttributes == null || currentAttributes.isEmpty()) { + return newAttributes; + } + + Map currentAttrMap = currentAttributes.stream() + .collect(Collectors.toMap(AttributeKvEntry::getKey, Function.identity(), (existing, replacement) -> existing)); + + return newAttributes.stream() + .filter(item -> { + AttributeKvEntry cacheAttr = currentAttrMap.get(item.getKey()); + return cacheAttr == null + || !Objects.equals(item.getValue(), cacheAttr.getValue()) //JSON and String can be equals by value, but different by type + || !Objects.equals(item.getDataType(), cacheAttr.getDataType()); + }) + .collect(Collectors.toList()); + } + +}