Browse Source

restructured output strategies

pull/14225/head
IrynaMatveieva 7 months ago
parent
commit
31ed28a6fe
  1. 6
      application/src/main/data/upgrade/basic/schema_update.sql
  2. 84
      application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java
  3. 7
      application/src/main/java/org/thingsboard/server/service/cf/AlarmCalculatedFieldResult.java
  4. 2
      application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java
  5. 3
      application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldResult.java
  6. 17
      application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java
  7. 6
      application/src/main/java/org/thingsboard/server/service/cf/PropagationCalculatedFieldResult.java
  8. 4
      application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java
  9. 8
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/AttributeImmediateOutputStrategy.java
  10. 22
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/AttributeRuleChainOutputStrategy.java
  11. 6
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/ImmediateOutputStrategy.java
  12. 7
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Output.java
  13. 22
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/OutputStrategy.java
  14. 2
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/OutputStrategyType.java
  15. 25
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/RuleChainOutputStrategy.java
  16. 8
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/TimeSeriesImmediateOutputStrategy.java
  17. 29
      common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/TimeSeriesRuleChainOutputStrategy.java
  18. 22
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java
  19. 8
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java
  20. 60
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/TelemetryUtil.java

6
application/src/main/data/upgrade/basic/schema_update.sql

@ -80,7 +80,11 @@ SET configuration = jsonb_set(
|| jsonb_build_object(
'strategy',
jsonb_build_object(
'type', 'PUSH_TO_RULE_ENGINE'
'type',
CASE (configuration::jsonb -> 'output' ->> 'type')
WHEN 'TIME_SERIES' THEN 'RULE_CHAIN_TIME_SERIES'
WHEN 'ATTRIBUTES' THEN 'RULE_CHAIN_ATTRIBUTES'
END
)
),
false

84
application/src/main/java/org/thingsboard/server/service/cf/AbstractCalculatedFieldProcessingService.java

@ -36,10 +36,10 @@ import org.thingsboard.server.common.adaptor.JsonConverter;
import org.thingsboard.server.common.data.cf.CalculatedField;
import org.thingsboard.server.common.data.cf.configuration.Argument;
import org.thingsboard.server.common.data.cf.configuration.ArgumentType;
import org.thingsboard.server.common.data.cf.configuration.AttributeSkipRuleEngineOutputStrategy;
import org.thingsboard.server.common.data.cf.configuration.AttributeImmediateOutputStrategy;
import org.thingsboard.server.common.data.cf.configuration.OutputType;
import org.thingsboard.server.common.data.cf.configuration.RelationPathQueryDynamicSourceConfiguration;
import org.thingsboard.server.common.data.cf.configuration.TimeSeriesSkipRuleEngineOutputStrategy;
import org.thingsboard.server.common.data.cf.configuration.TimeSeriesImmediateOutputStrategy;
import org.thingsboard.server.common.data.cf.configuration.aggregation.RelatedEntitiesAggregationCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
@ -66,7 +66,6 @@ import org.thingsboard.server.service.cf.ctx.state.CalculatedFieldCtx;
import org.thingsboard.server.service.cf.ctx.state.SingleValueArgumentEntry;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -75,10 +74,11 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.thingsboard.rule.engine.util.TelemetryUtil.filterChangedAttr;
import static org.thingsboard.rule.engine.util.TelemetryUtil.toTsKvEntryList;
import static org.thingsboard.server.common.data.cf.CalculatedFieldType.PROPAGATION;
import static org.thingsboard.server.common.data.cf.configuration.PropagationCalculatedFieldConfiguration.PROPAGATION_CONFIG_ARGUMENT;
import static org.thingsboard.server.common.data.cf.configuration.geofencing.EntityCoordinates.ENTITY_ID_LATITUDE_ARGUMENT_KEY;
@ -378,60 +378,60 @@ public abstract class AbstractCalculatedFieldProcessingService {
}
private void saveAttributes(TenantId tenantId, EntityId entityId, TelemetryCalculatedFieldResult cfResult, List<CalculatedFieldId> cfIds, SettableFuture<Void> future) {
if (!(cfResult.getOutputStrategy() instanceof AttributeSkipRuleEngineOutputStrategy outputStrategy)) {
if (!(cfResult.getOutputStrategy() instanceof AttributeImmediateOutputStrategy outputStrategy)) {
future.setException(new IllegalArgumentException("Expected AttributeImmediateOutputStrategy"));
return;
}
JsonElement jsonResult = JsonParser.parseString(Objects.requireNonNull(cfResult.stringValue()));
AttributesSaveRequest.Strategy strategy = new Strategy(outputStrategy.isSaveAttribute(), outputStrategy.isSendWsUpdate(), outputStrategy.isProcessCfs());
List<AttributeKvEntry> attributeKvEntries = JsonConverter.convertToAttributes(jsonResult);
List<AttributeKvEntry> newAttributes = JsonConverter.convertToAttributes(jsonResult);
if (!outputStrategy.isUpdateAttributesOnlyOnValueChange()) {
tsSubService.saveAttributesInternal(AttributesSaveRequest.builder()
.tenantId(tenantId)
.entityId(entityId)
.entries(attributeKvEntries)
.strategy(strategy)
.previousCalculatedFieldIds(cfIds)
.future(future)
.build()
);
saveAttributesInternal(tenantId, entityId, cfResult, cfIds, newAttributes, strategy, future);
return;
}
List<String> keys = attributeKvEntries.stream().map(KvEntry::getKey).collect(Collectors.toList());
List<String> keys = newAttributes.stream().map(KvEntry::getKey).collect(Collectors.toList());
ListenableFuture<List<AttributeKvEntry>> findFuture = attributesService.find(tenantId, entityId, cfResult.getScope(), keys);
DonAsynchron.withCallback(findFuture,
existingAttributes -> {
List<AttributeKvEntry> attributesChanged = filterChangedAttr(existingAttributes, attributeKvEntries);
tsSubService.saveAttributesInternal(AttributesSaveRequest.builder()
.tenantId(tenantId)
.entityId(entityId)
.entries(attributesChanged)
.strategy(strategy)
.previousCalculatedFieldIds(cfIds)
.future(future)
.build()
);
List<AttributeKvEntry> changed = filterChangedAttr(existingAttributes, newAttributes);
saveAttributesInternal(tenantId, entityId, cfResult, cfIds, changed, strategy, future);
},
future::setException,
MoreExecutors.directExecutor());
}
private void saveAttributesInternal(TenantId tenantId, EntityId entityId,
TelemetryCalculatedFieldResult cfResult,
List<CalculatedFieldId> cfIds,
List<AttributeKvEntry> entries,
AttributesSaveRequest.Strategy strategy,
SettableFuture<Void> future) {
tsSubService.saveAttributesInternal(AttributesSaveRequest.builder()
.tenantId(tenantId)
.entityId(entityId)
.scope(cfResult.getScope())
.entries(entries)
.strategy(strategy)
.previousCalculatedFieldIds(cfIds)
.future(future)
.build());
}
private void saveTimeSeries(TenantId tenantId, EntityId entityId, TelemetryCalculatedFieldResult cfResult, List<CalculatedFieldId> cfIds, long ts, SettableFuture<Void> future) {
if (!(cfResult.getOutputStrategy() instanceof TimeSeriesSkipRuleEngineOutputStrategy outputStrategy)) {
if (!(cfResult.getOutputStrategy() instanceof TimeSeriesImmediateOutputStrategy outputStrategy)) {
future.setException(new IllegalArgumentException("Expected TimeSeriesImmediateOutputStrategy"));
return;
}
JsonElement jsonResult = JsonParser.parseString(Objects.requireNonNull(cfResult.stringValue()));
Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToTelemetry(jsonResult, ts);
List<TsKvEntry> tsEntries = new ArrayList<>();
for (Map.Entry<Long, List<KvEntry>> tsKvEntry : tsKvMap.entrySet()) {
for (KvEntry kvEntry : tsKvEntry.getValue()) {
tsEntries.add(new BasicTsKvEntry(tsKvEntry.getKey(), kvEntry));
}
if (tsKvMap.isEmpty()) {
future.setFuture(Futures.immediateFuture(null));
}
List<TsKvEntry> tsEntries = toTsKvEntryList(tsKvMap);
TimeseriesSaveRequest.Strategy strategy = new TimeseriesSaveRequest.Strategy(outputStrategy.isSaveTimeSeries(), outputStrategy.isSaveLatest(), outputStrategy.isSendWsUpdate(), outputStrategy.isProcessCfs());
tsSubService.saveTimeseriesInternal(TimeseriesSaveRequest.builder()
.tenantId(tenantId)
@ -444,22 +444,4 @@ public abstract class AbstractCalculatedFieldProcessingService {
.build());
}
private List<AttributeKvEntry> filterChangedAttr(List<AttributeKvEntry> existingAttributes, List<AttributeKvEntry> newAttributes) {
if (existingAttributes == null || existingAttributes.isEmpty()) {
return newAttributes;
}
Map<String, AttributeKvEntry> currentAttrMap = existingAttributes.stream()
.collect(Collectors.toMap(AttributeKvEntry::getKey, Function.identity(), (existing, replacement) -> existing));
return newAttributes.stream()
.filter(item -> {
AttributeKvEntry cacheAttr = currentAttrMap.get(item.getKey());
return cacheAttr == null
|| !Objects.equals(item.getValue(), cacheAttr.getValue()) //JSON and String can be equals by value, but different by type
|| !Objects.equals(item.getDataType(), cacheAttr.getDataType());
})
.collect(Collectors.toList());
}
}

7
application/src/main/java/org/thingsboard/server/service/cf/AlarmCalculatedFieldResult.java

@ -21,8 +21,6 @@ import lombok.RequiredArgsConstructor;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.action.TbAlarmResult;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.cf.configuration.OutputStrategy;
import org.thingsboard.server.common.data.cf.configuration.PushToRuleEngineOutputStrategy;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.msg.TbMsgType;
@ -38,11 +36,6 @@ public class AlarmCalculatedFieldResult implements CalculatedFieldResult {
private final TbAlarmResult alarmResult;
@Override
public OutputStrategy getOutputStrategy() {
return new PushToRuleEngineOutputStrategy();
}
@Override
public TbMsg toTbMsg(EntityId entityId, List<CalculatedFieldId> cfIds) {
TbMsgType msgType;

2
application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldProcessingService.java

@ -37,7 +37,7 @@ public interface CalculatedFieldProcessingService {
Map<String, ArgumentEntry> fetchArgsFromDb(TenantId tenantId, EntityId entityId, Map<String, Argument> arguments);
void saveToDB(TenantId tenantId, EntityId entityId, CalculatedFieldResult result, List<CalculatedFieldId> cfIds, TbCallback callback);
void processImmediately(TenantId tenantId, EntityId entityId, CalculatedFieldResult result, List<CalculatedFieldId> cfIds, TbCallback callback);
void processResult(TenantId tenantId, EntityId entityId, CalculatedFieldResult result, List<CalculatedFieldId> cfIds, TbCallback callback);

3
application/src/main/java/org/thingsboard/server/service/cf/CalculatedFieldResult.java

@ -15,7 +15,6 @@
*/
package org.thingsboard.server.service.cf;
import org.thingsboard.server.common.data.cf.configuration.OutputStrategy;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.msg.TbMsg;
@ -24,8 +23,6 @@ import java.util.List;
public interface CalculatedFieldResult {
OutputStrategy getOutputStrategy();
TbMsg toTbMsg(EntityId entityId, List<CalculatedFieldId> cfIds);
String stringValue();

17
application/src/main/java/org/thingsboard/server/service/cf/DefaultCalculatedFieldProcessingService.java

@ -116,14 +116,20 @@ public class DefaultCalculatedFieldProcessingService extends AbstractCalculatedF
@Override
public void processResult(TenantId tenantId, EntityId entityId, CalculatedFieldResult result, List<CalculatedFieldId> cfIds, TbCallback callback) {
switch (result.getOutputStrategy().getType()) {
case SKIP_RULE_ENGINE -> saveToDB(tenantId, entityId, result, cfIds, callback);
case PUSH_TO_RULE_ENGINE -> pushMsgToRuleEngine(tenantId, entityId, result, cfIds, callback);
if (result instanceof AlarmCalculatedFieldResult) {
sendMsgToRuleEngine(tenantId, entityId, callback, result.toTbMsg(entityId, cfIds));
return;
}
TelemetryCalculatedFieldResult telemetryResult = result instanceof TelemetryCalculatedFieldResult telemetryRes
? telemetryRes : ((PropagationCalculatedFieldResult) result).getResult();
switch (telemetryResult.getOutputStrategy().getStrategyType()) {
case IMMEDIATE -> processImmediately(tenantId, entityId, result, cfIds, callback);
case RULE_CHAIN -> pushMsgToRuleEngine(tenantId, entityId, result, cfIds, callback);
}
}
@Override
public void saveToDB(TenantId tenantId, EntityId entityId, CalculatedFieldResult result, List<CalculatedFieldId> cfIds, TbCallback callback) {
public void processImmediately(TenantId tenantId, EntityId entityId, CalculatedFieldResult result, List<CalculatedFieldId> cfIds, TbCallback callback) {
if (result instanceof TelemetryCalculatedFieldResult telemetryResult) {
saveTelemetryResult(tenantId, entityId, telemetryResult, cfIds, callback);
return;
@ -131,7 +137,9 @@ public class DefaultCalculatedFieldProcessingService extends AbstractCalculatedF
if (result instanceof PropagationCalculatedFieldResult propagationResult) {
handlePropagationResults(propagationResult, callback,
(entity, res, cb) -> saveTelemetryResult(tenantId, entityId, res, cfIds, cb));
return;
}
callback.onSuccess();
}
@Override
@ -150,6 +158,7 @@ public class DefaultCalculatedFieldProcessingService extends AbstractCalculatedF
List<EntityId> propagationEntityIds = propagationResult.getPropagationEntityIds();
if (propagationEntityIds.isEmpty()) {
callback.onSuccess();
return;
}
if (propagationEntityIds.size() == 1) {
EntityId propagationEntityId = propagationEntityIds.get(0);

6
application/src/main/java/org/thingsboard/server/service/cf/PropagationCalculatedFieldResult.java

@ -17,7 +17,6 @@ package org.thingsboard.server.service.cf;
import lombok.Builder;
import lombok.Data;
import org.thingsboard.server.common.data.cf.configuration.OutputStrategy;
import org.thingsboard.server.common.data.id.CalculatedFieldId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.util.CollectionsUtil;
@ -32,11 +31,6 @@ public final class PropagationCalculatedFieldResult implements CalculatedFieldRe
private final List<EntityId> propagationEntityIds;
private final TelemetryCalculatedFieldResult result;
@Override
public OutputStrategy getOutputStrategy() {
return result.getOutputStrategy();
}
@Override
public TbMsg toTbMsg(EntityId entityId, List<CalculatedFieldId> cfIds) {
return result.toTbMsg(entityId, cfIds);

4
application/src/test/java/org/thingsboard/server/cf/CalculatedFieldIntegrationTest.java

@ -41,7 +41,7 @@ import org.thingsboard.server.common.data.cf.configuration.ReferencedEntityKey;
import org.thingsboard.server.common.data.cf.configuration.RelationPathQueryDynamicSourceConfiguration;
import org.thingsboard.server.common.data.cf.configuration.ScriptCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.SimpleCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.TimeSeriesSkipRuleEngineOutputStrategy;
import org.thingsboard.server.common.data.cf.configuration.TimeSeriesImmediateOutputStrategy;
import org.thingsboard.server.common.data.cf.configuration.geofencing.EntityCoordinates;
import org.thingsboard.server.common.data.cf.configuration.geofencing.GeofencingCalculatedFieldConfiguration;
import org.thingsboard.server.common.data.cf.configuration.geofencing.ZoneGroupConfiguration;
@ -1235,7 +1235,7 @@ public class CalculatedFieldIntegrationTest extends CalculatedFieldControllerTes
output.setName("fahrenheitTemp");
output.setType(OutputType.TIME_SERIES);
output.setDecimalsByDefault(1);
output.setStrategy(new TimeSeriesSkipRuleEngineOutputStrategy(1000L, true, true, true, true));
output.setStrategy(new TimeSeriesImmediateOutputStrategy(1000L, true, true, true, true));
config.setOutput(output);

8
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/AttributeSkipRuleEngineOutputStrategy.java → common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/AttributeImmediateOutputStrategy.java

@ -20,9 +20,9 @@ import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class AttributeSkipRuleEngineOutputStrategy extends SkipRuleEngineOutputStrategy {
@NoArgsConstructor
public class AttributeImmediateOutputStrategy extends ImmediateOutputStrategy {
private boolean updateAttributesOnlyOnValueChange;
@ -30,4 +30,8 @@ public class AttributeSkipRuleEngineOutputStrategy extends SkipRuleEngineOutputS
private boolean sendWsUpdate;
private boolean processCfs;
@Override
public String getType() {
return "IMMEDIATE_ATTRIBUTES";
}
}

22
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/SkipRuleEngineOutputStrategy.java → common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/AttributeRuleChainOutputStrategy.java

@ -15,23 +15,15 @@
*/
package org.thingsboard.server.common.data.cf.configuration;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Data;
import lombok.NoArgsConstructor;
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.EXTERNAL_PROPERTY,
property = "type"
)
@JsonSubTypes({
@JsonSubTypes.Type(value = AttributeSkipRuleEngineOutputStrategy.class, name = "ATTRIBUTES"),
@JsonSubTypes.Type(value = TimeSeriesSkipRuleEngineOutputStrategy.class, name = "TIME_SERIES")
})
public abstract class SkipRuleEngineOutputStrategy implements OutputStrategy {
@Data
@NoArgsConstructor
public class AttributeRuleChainOutputStrategy extends RuleChainOutputStrategy {
@Override
public OutputStrategyType getType() {
return OutputStrategyType.SKIP_RULE_ENGINE;
public String getType() {
return "RULE_CHAIN_ATTRIBUTES";
}
}

6
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/PushToRuleEngineOutputStrategy.java → common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/ImmediateOutputStrategy.java

@ -15,11 +15,11 @@
*/
package org.thingsboard.server.common.data.cf.configuration;
public class PushToRuleEngineOutputStrategy implements OutputStrategy {
public abstract class ImmediateOutputStrategy implements OutputStrategy {
@Override
public OutputStrategyType getType() {
return OutputStrategyType.PUSH_TO_RULE_ENGINE;
public OutputStrategyType getStrategyType() {
return OutputStrategyType.IMMEDIATE;
}
}

7
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/Output.java

@ -16,7 +16,6 @@
package org.thingsboard.server.common.data.cf.configuration;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import lombok.Data;
import org.thingsboard.server.common.data.AttributeScope;
@ -28,12 +27,6 @@ public class Output {
private OutputType type;
private AttributeScope scope;
private Integer decimalsByDefault;
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.EXTERNAL_PROPERTY,
property = "type"
)
private OutputStrategy strategy;
}

22
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/OutputStrategy.java

@ -15,23 +15,31 @@
*/
package org.thingsboard.server.common.data.cf.configuration;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.As;
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
include = As.PROPERTY,
property = "type"
)
@JsonSubTypes({
@JsonSubTypes.Type(value = SkipRuleEngineOutputStrategy.class, name = "SKIP_RULE_ENGINE"),
@JsonSubTypes.Type(value = PushToRuleEngineOutputStrategy.class, name = "PUSH_TO_RULE_ENGINE")
@Type(value = AttributeImmediateOutputStrategy.class, name = "IMMEDIATE_ATTRIBUTES"),
@Type(value = TimeSeriesImmediateOutputStrategy.class, name = "IMMEDIATE_TIME_SERIES"),
@Type(value = AttributeRuleChainOutputStrategy.class, name = "RULE_CHAIN_ATTRIBUTES"),
@Type(value = TimeSeriesRuleChainOutputStrategy.class, name = "RULE_CHAIN_TIME_SERIES")
})
@JsonIgnoreProperties(ignoreUnknown = true)
public interface OutputStrategy {
OutputStrategyType getType();
@JsonIgnore
OutputStrategyType getStrategyType();
@JsonIgnore
String getType();
}

2
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/OutputStrategyType.java

@ -17,6 +17,6 @@ package org.thingsboard.server.common.data.cf.configuration;
public enum OutputStrategyType {
SKIP_RULE_ENGINE, PUSH_TO_RULE_ENGINE
IMMEDIATE, RULE_CHAIN
}

25
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/RuleChainOutputStrategy.java

@ -0,0 +1,25 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.cf.configuration;
public abstract class RuleChainOutputStrategy implements OutputStrategy {
@Override
public OutputStrategyType getStrategyType() {
return OutputStrategyType.RULE_CHAIN;
}
}

8
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/TimeSeriesSkipRuleEngineOutputStrategy.java → common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/TimeSeriesImmediateOutputStrategy.java

@ -20,9 +20,9 @@ import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TimeSeriesSkipRuleEngineOutputStrategy extends SkipRuleEngineOutputStrategy {
@NoArgsConstructor
public class TimeSeriesImmediateOutputStrategy extends ImmediateOutputStrategy {
private long ttl;
@ -31,4 +31,8 @@ public class TimeSeriesSkipRuleEngineOutputStrategy extends SkipRuleEngineOutput
private boolean sendWsUpdate;
private boolean processCfs;
@Override
public String getType() {
return "IMMEDIATE_TIME_SERIES";
}
}

29
common/data/src/main/java/org/thingsboard/server/common/data/cf/configuration/TimeSeriesRuleChainOutputStrategy.java

@ -0,0 +1,29 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.cf.configuration;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
public class TimeSeriesRuleChainOutputStrategy extends RuleChainOutputStrategy {
@Override
public String getType() {
return "RULE_CHAIN_TIME_SERIES";
}
}

22
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgAttributesNode.java

@ -41,16 +41,14 @@ import org.thingsboard.server.common.data.util.TbPair;
import org.thingsboard.server.common.msg.TbMsg;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.thingsboard.rule.engine.telemetry.settings.AttributesProcessingSettings.Advanced;
import static org.thingsboard.rule.engine.telemetry.settings.AttributesProcessingSettings.Deduplicate;
import static org.thingsboard.rule.engine.telemetry.settings.AttributesProcessingSettings.OnEveryMessage;
import static org.thingsboard.rule.engine.telemetry.settings.AttributesProcessingSettings.WebSocketsOnly;
import static org.thingsboard.rule.engine.util.TelemetryUtil.filterChangedAttr;
import static org.thingsboard.server.common.data.DataConstants.NOTIFY_DEVICE_METADATA_KEY;
import static org.thingsboard.server.common.data.DataConstants.SCOPE;
import static org.thingsboard.server.common.data.msg.TbMsgType.POST_ATTRIBUTES_REQUEST;
@ -216,24 +214,6 @@ public class TbMsgAttributesNode implements TbNode {
.build());
}
private List<AttributeKvEntry> filterChangedAttr(List<AttributeKvEntry> currentAttributes, List<AttributeKvEntry> newAttributes) {
if (currentAttributes == null || currentAttributes.isEmpty()) {
return newAttributes;
}
Map<String, AttributeKvEntry> currentAttrMap = currentAttributes.stream()
.collect(Collectors.toMap(AttributeKvEntry::getKey, Function.identity(), (existing, replacement) -> existing));
return newAttributes.stream()
.filter(item -> {
AttributeKvEntry cacheAttr = currentAttrMap.get(item.getKey());
return cacheAttr == null
|| !Objects.equals(item.getValue(), cacheAttr.getValue()) //JSON and String can be equals by value, but different by type
|| !Objects.equals(item.getDataType(), cacheAttr.getDataType());
})
.collect(Collectors.toList());
}
private boolean checkSendNotification(AttributeScope scope) {
return config.isSendAttributesUpdatedNotification() && AttributeScope.CLIENT_SCOPE != scope;
}

8
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/telemetry/TbMsgTimeseriesNode.java

@ -49,6 +49,7 @@ import static org.thingsboard.rule.engine.telemetry.settings.TimeseriesProcessin
import static org.thingsboard.rule.engine.telemetry.settings.TimeseriesProcessingSettings.Deduplicate;
import static org.thingsboard.rule.engine.telemetry.settings.TimeseriesProcessingSettings.OnEveryMessage;
import static org.thingsboard.rule.engine.telemetry.settings.TimeseriesProcessingSettings.WebSocketsOnly;
import static org.thingsboard.rule.engine.util.TelemetryUtil.toTsKvEntryList;
import static org.thingsboard.server.common.data.msg.TbMsgType.POST_TELEMETRY_REQUEST;
@RuleNode(
@ -148,12 +149,7 @@ public class TbMsgTimeseriesNode implements TbNode {
ctx.tellFailure(msg, new IllegalArgumentException("Msg body is empty: " + src));
return;
}
List<TsKvEntry> tsKvEntryList = new ArrayList<>();
for (Map.Entry<Long, List<KvEntry>> tsKvEntry : tsKvMap.entrySet()) {
for (KvEntry kvEntry : tsKvEntry.getValue()) {
tsKvEntryList.add(new BasicTsKvEntry(tsKvEntry.getKey(), kvEntry));
}
}
List<TsKvEntry> tsKvEntryList = toTsKvEntryList(tsKvMap);
String ttlValue = msg.getMetaData().getValue("TTL");
long ttl = !StringUtils.isEmpty(ttlValue) ? Long.parseLong(ttlValue) : config.getDefaultTTL();
if (ttl == 0L) {

60
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/util/TelemetryUtil.java

@ -0,0 +1,60 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.util;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
public class TelemetryUtil {
public static List<TsKvEntry> toTsKvEntryList(Map<Long, List<KvEntry>> tsKvMap) {
List<TsKvEntry> tsKvEntryList = new ArrayList<>();
for (Map.Entry<Long, List<KvEntry>> tsKvEntry : tsKvMap.entrySet()) {
for (KvEntry kvEntry : tsKvEntry.getValue()) {
tsKvEntryList.add(new BasicTsKvEntry(tsKvEntry.getKey(), kvEntry));
}
}
return tsKvEntryList;
}
public static List<AttributeKvEntry> filterChangedAttr(List<AttributeKvEntry> currentAttributes, List<AttributeKvEntry> newAttributes) {
if (currentAttributes == null || currentAttributes.isEmpty()) {
return newAttributes;
}
Map<String, AttributeKvEntry> currentAttrMap = currentAttributes.stream()
.collect(Collectors.toMap(AttributeKvEntry::getKey, Function.identity(), (existing, replacement) -> existing));
return newAttributes.stream()
.filter(item -> {
AttributeKvEntry cacheAttr = currentAttrMap.get(item.getKey());
return cacheAttr == null
|| !Objects.equals(item.getValue(), cacheAttr.getValue()) //JSON and String can be equals by value, but different by type
|| !Objects.equals(item.getDataType(), cacheAttr.getDataType());
})
.collect(Collectors.toList());
}
}
Loading…
Cancel
Save