diff --git a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgMetaData.java b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgMetaData.java index 10b198ec28..f815dd6192 100644 --- a/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgMetaData.java +++ b/common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgMetaData.java @@ -20,6 +20,7 @@ import lombok.Data; import lombok.NoArgsConstructor; import java.io.Serializable; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -31,6 +32,8 @@ import java.util.concurrent.ConcurrentHashMap; @NoArgsConstructor public final class TbMsgMetaData implements Serializable { + public static final TbMsgMetaData EMPTY = new TbMsgMetaData(Collections.emptyMap()); + private final Map data = new ConcurrentHashMap<>(); public TbMsgMetaData(Map data) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmRuleState.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmRuleState.java new file mode 100644 index 0000000000..3f20c3794d --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmRuleState.java @@ -0,0 +1,252 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.profile; + +import lombok.Data; +import org.thingsboard.server.common.data.alarm.AlarmSeverity; +import org.thingsboard.server.common.data.device.profile.AlarmCondition; +import org.thingsboard.server.common.data.device.profile.AlarmRule; +import org.thingsboard.server.common.data.query.BooleanFilterPredicate; +import org.thingsboard.server.common.data.query.ComplexFilterPredicate; +import org.thingsboard.server.common.data.query.KeyFilter; +import org.thingsboard.server.common.data.query.KeyFilterPredicate; +import org.thingsboard.server.common.data.query.NumericFilterPredicate; +import org.thingsboard.server.common.data.query.StringFilterPredicate; + +@Data +public class AlarmRuleState { + + private final AlarmSeverity severity; + private final AlarmRule alarmRule; + private final long requiredDurationInMs; + private long lastEventTs; + private long duration; + + public AlarmRuleState(AlarmSeverity severity, AlarmRule alarmRule) { + this.severity = severity; + this.alarmRule = alarmRule; + if (alarmRule.getCondition().getDurationValue() > 0) { + requiredDurationInMs = alarmRule.getCondition().getDurationUnit().toMillis(alarmRule.getCondition().getDurationValue()); + } else { + requiredDurationInMs = 0; + } + } + + public boolean eval(DeviceDataSnapshot data) { + if (requiredDurationInMs > 0) { + boolean eval = eval(alarmRule.getCondition(), data); + if (eval) { + if (lastEventTs > 0) { + if (data.getTs() > lastEventTs) { + duration += data.getTs() - lastEventTs; + lastEventTs = data.getTs(); + } + } else { + lastEventTs = data.getTs(); + duration = 0; + } + return duration > requiredDurationInMs; + } else { + lastEventTs = 0; + duration = 0; + return false; + } + } else { + return eval(alarmRule.getCondition(), data); + } + } + + public boolean eval(long ts) { + if (requiredDurationInMs > 0 && lastEventTs > 0 && ts > lastEventTs) { + duration += ts - lastEventTs; + return duration > requiredDurationInMs; + } else { + return false; + } + } + + private boolean eval(AlarmCondition condition, DeviceDataSnapshot data) { + boolean eval = true; + for (KeyFilter keyFilter : condition.getCondition()) { + EntityKeyValue value = data.getValue(keyFilter.getKey()); + if (value == null) { + return false; + } + eval = eval && eval(value, keyFilter.getPredicate()); + } + //TODO: use condition duration; + return eval; + } + + private boolean eval(EntityKeyValue value, KeyFilterPredicate predicate) { + switch (predicate.getType()) { + case STRING: + return evalStrPredicate(value, (StringFilterPredicate) predicate); + case NUMERIC: + return evalNumPredicate(value, (NumericFilterPredicate) predicate); + case COMPLEX: + return evalComplexPredicate(value, (ComplexFilterPredicate) predicate); + case BOOLEAN: + return evalBoolPredicate(value, (BooleanFilterPredicate) predicate); + default: + return false; + } + } + + private boolean evalComplexPredicate(EntityKeyValue ekv, ComplexFilterPredicate predicate) { + switch (predicate.getOperation()) { + case OR: + for (KeyFilterPredicate kfp : predicate.getPredicates()) { + if (eval(ekv, kfp)) { + return true; + } + } + return false; + case AND: + for (KeyFilterPredicate kfp : predicate.getPredicates()) { + if (!eval(ekv, kfp)) { + return false; + } + } + return true; + default: + throw new RuntimeException("Operation not supported: " + predicate.getOperation()); + } + } + + + private boolean evalBoolPredicate(EntityKeyValue ekv, BooleanFilterPredicate predicate) { + Boolean value; + switch (ekv.getDataType()) { + case LONG: + value = ekv.getLngValue() > 0; + break; + case DOUBLE: + value = ekv.getDblValue() > 0; + break; + case BOOLEAN: + value = ekv.getBoolValue(); + break; + case STRING: + try { + value = Boolean.parseBoolean(ekv.getStrValue()); + break; + } catch (RuntimeException e) { + return false; + } + case JSON: + try { + value = Boolean.parseBoolean(ekv.getJsonValue()); + break; + } catch (RuntimeException e) { + return false; + } + default: + return false; + } + if (value == null) { + return false; + } + switch (predicate.getOperation()) { + case EQUAL: + return value.equals(predicate.getValue().getDefaultValue()); + case NOT_EQUAL: + return !value.equals(predicate.getValue().getDefaultValue()); + default: + throw new RuntimeException("Operation not supported: " + predicate.getOperation()); + } + } + + private boolean evalNumPredicate(EntityKeyValue ekv, NumericFilterPredicate predicate) { + Double value; + switch (ekv.getDataType()) { + case LONG: + value = ekv.getLngValue().doubleValue(); + break; + case DOUBLE: + value = ekv.getDblValue(); + break; + case BOOLEAN: + value = ekv.getBoolValue() ? 1.0 : 0.0; + break; + case STRING: + try { + value = Double.parseDouble(ekv.getStrValue()); + break; + } catch (RuntimeException e) { + return false; + } + case JSON: + try { + value = Double.parseDouble(ekv.getJsonValue()); + break; + } catch (RuntimeException e) { + return false; + } + default: + return false; + } + if (value == null) { + return false; + } + + Double predicateValue = predicate.getValue().getDefaultValue(); + switch (predicate.getOperation()) { + case NOT_EQUAL: + return !value.equals(predicateValue); + case EQUAL: + return value.equals(predicateValue); + case GREATER: + return value > predicateValue; + case GREATER_OR_EQUAL: + return value >= predicateValue; + case LESS: + return value < predicateValue; + case LESS_OR_EQUAL: + return value <= predicateValue; + default: + throw new RuntimeException("Operation not supported: " + predicate.getOperation()); + } + } + + private boolean evalStrPredicate(EntityKeyValue ekv, StringFilterPredicate predicate) { + String val; + String predicateValue; + if (predicate.isIgnoreCase()) { + val = ekv.getStrValue().toLowerCase(); + predicateValue = predicate.getValue().getDefaultValue().toLowerCase(); + } else { + val = ekv.getStrValue(); + predicateValue = predicate.getValue().getDefaultValue(); + } + switch (predicate.getOperation()) { + case CONTAINS: + return val.contains(predicateValue); + case EQUAL: + return val.equals(predicateValue); + case STARTS_WITH: + return val.startsWith(predicateValue); + case ENDS_WITH: + return val.endsWith(predicateValue); + case NOT_EQUAL: + return !val.equals(predicateValue); + case NOT_CONTAINS: + return !val.contains(predicateValue); + default: + throw new RuntimeException("Operation not supported: " + predicate.getOperation()); + } + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceDataSnapshot.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceDataSnapshot.java index 34f31cfb1d..f1b1067095 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceDataSnapshot.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceDataSnapshot.java @@ -18,6 +18,7 @@ package org.thingsboard.rule.engine.profile; import lombok.Getter; import lombok.Setter; import org.thingsboard.server.common.data.query.EntityKey; +import org.thingsboard.server.common.data.query.EntityKeyType; import java.util.Map; import java.util.Set; @@ -26,24 +27,79 @@ import java.util.concurrent.ConcurrentHashMap; public class DeviceDataSnapshot { private volatile boolean ready; - @Getter @Setter + @Getter + @Setter private long ts; + private final Set keys; private final Map values = new ConcurrentHashMap<>(); - public DeviceDataSnapshot(Set entityKeySet) { - entityKeySet.forEach(key -> values.put(key, new EntityKeyValue())); - this.ready = false; + public DeviceDataSnapshot(Set entityKeysToFetch) { + this.keys = entityKeysToFetch; + } + + void removeValue(EntityKey key) { + switch (key.getType()) { + case ATTRIBUTE: + values.remove(key); + values.remove(getAttrKey(key, EntityKeyType.CLIENT_ATTRIBUTE)); + values.remove(getAttrKey(key, EntityKeyType.SHARED_ATTRIBUTE)); + values.remove(getAttrKey(key, EntityKeyType.SERVER_ATTRIBUTE)); + break; + case CLIENT_ATTRIBUTE: + case SHARED_ATTRIBUTE: + case SERVER_ATTRIBUTE: + values.remove(key); + values.remove(getAttrKey(key, EntityKeyType.ATTRIBUTE)); + break; + default: + values.remove(key); + } } void putValue(EntityKey key, EntityKeyValue value) { - values.put(key, value); + switch (key.getType()) { + case ATTRIBUTE: + putIfKeyExists(key, value); + putIfKeyExists(getAttrKey(key, EntityKeyType.CLIENT_ATTRIBUTE), value); + putIfKeyExists(getAttrKey(key, EntityKeyType.SHARED_ATTRIBUTE), value); + putIfKeyExists(getAttrKey(key, EntityKeyType.SERVER_ATTRIBUTE), value); + break; + case CLIENT_ATTRIBUTE: + case SHARED_ATTRIBUTE: + case SERVER_ATTRIBUTE: + putIfKeyExists(key, value); + putIfKeyExists(getAttrKey(key, EntityKeyType.ATTRIBUTE), value); + break; + default: + putIfKeyExists(key, value); + } + } + + private void putIfKeyExists(EntityKey key, EntityKeyValue value) { + if (keys.contains(key)) { + values.put(key, value); + } } EntityKeyValue getValue(EntityKey key) { - return values.get(key); + if (EntityKeyType.ATTRIBUTE.equals(key.getType())) { + EntityKeyValue value = values.get(key); + if (value == null) { + value = values.get(getAttrKey(key, EntityKeyType.CLIENT_ATTRIBUTE)); + if (value == null) { + value = values.get(getAttrKey(key, EntityKeyType.SHARED_ATTRIBUTE)); + if (value == null) { + value = values.get(getAttrKey(key, EntityKeyType.SERVER_ATTRIBUTE)); + } + } + } + return value; + } else { + return values.get(key); + } } - boolean isReady() { - return ready; + private EntityKey getAttrKey(EntityKey key, EntityKeyType clientAttribute) { + return new EntityKey(clientAttribute, key.getKey()); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceProfileAlarmState.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceProfileAlarmState.java index 83bfd2b3d3..da3e1b45a2 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceProfileAlarmState.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceProfileAlarmState.java @@ -23,34 +23,29 @@ import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.AlarmSeverity; import org.thingsboard.server.common.data.alarm.AlarmStatus; -import org.thingsboard.server.common.data.device.profile.AlarmCondition; -import org.thingsboard.server.common.data.device.profile.AlarmRule; import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm; import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.query.BooleanFilterPredicate; -import org.thingsboard.server.common.data.query.ComplexFilterPredicate; -import org.thingsboard.server.common.data.query.KeyFilter; -import org.thingsboard.server.common.data.query.KeyFilterPredicate; -import org.thingsboard.server.common.data.query.NumericFilterPredicate; -import org.thingsboard.server.common.data.query.StringFilterPredicate; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; -import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.util.mapping.JacksonUtil; +import java.util.ArrayList; import java.util.Comparator; -import java.util.Map; -import java.util.TreeMap; +import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.function.BiFunction; @Data class DeviceProfileAlarmState { private final EntityId originator; private DeviceProfileAlarm alarmDefinition; - private volatile Map createRulesSortedBySeverityDesc; + private volatile List createRulesSortedBySeverityDesc; + private volatile AlarmRuleState clearState; private volatile Alarm currentAlarm; private volatile boolean initialFetchDone; + private volatile TbMsgMetaData lastMsgMetaData; + private volatile String lastMsgQueueName; public DeviceProfileAlarmState(EntityId originator, DeviceProfileAlarm alarmDefinition) { this.originator = originator; @@ -58,38 +53,50 @@ class DeviceProfileAlarmState { } public void process(TbContext ctx, TbMsg msg, DeviceDataSnapshot data) throws ExecutionException, InterruptedException { - if (!initialFetchDone) { - Alarm alarm = ctx.getAlarmService().findLatestByOriginatorAndType(ctx.getTenantId(), originator, alarmDefinition.getAlarmType()).get(); - if (alarm != null && !alarm.getStatus().isCleared()) { - currentAlarm = alarm; - } - initialFetchDone = true; - } + initCurrentAlarm(ctx); + lastMsgMetaData = msg.getMetaData(); + lastMsgQueueName = msg.getQueueName(); + createOrClearAlarms(ctx, data, AlarmRuleState::eval); + } + + public void process(TbContext ctx, long ts) throws ExecutionException, InterruptedException { + initCurrentAlarm(ctx); + createOrClearAlarms(ctx, ts, AlarmRuleState::eval); + } + public void createOrClearAlarms(TbContext ctx, T data, BiFunction evalFunction) { AlarmSeverity resultSeverity = null; - for (Map.Entry kv : createRulesSortedBySeverityDesc.entrySet()) { - AlarmRule alarmRule = kv.getValue(); - if (eval(alarmRule.getCondition(), data)) { - resultSeverity = kv.getKey(); + for (AlarmRuleState state : createRulesSortedBySeverityDesc) { + if (evalFunction.apply(state, data)) { + resultSeverity = state.getSeverity(); break; } } if (resultSeverity != null) { - pushMsg(ctx, calculateAlarmResult(ctx, resultSeverity), msg); + pushMsg(ctx, calculateAlarmResult(ctx, resultSeverity)); } else if (currentAlarm != null) { - AlarmRule clearRule = alarmDefinition.getClearRule(); - if (eval(clearRule.getCondition(), data)) { + if (evalFunction.apply(clearState, data)) { ctx.getAlarmService().clearAlarm(ctx.getTenantId(), currentAlarm.getId(), JacksonUtil.OBJECT_MAPPER.createObjectNode(), System.currentTimeMillis()); - pushMsg(ctx, new TbAlarmResult(false, false, true, currentAlarm), msg); + pushMsg(ctx, new TbAlarmResult(false, false, true, currentAlarm)); currentAlarm = null; } } } - public void pushMsg(TbContext ctx, TbAlarmResult alarmResult, TbMsg originalMsg) { + public void initCurrentAlarm(TbContext ctx) throws InterruptedException, ExecutionException { + if (!initialFetchDone) { + Alarm alarm = ctx.getAlarmService().findLatestByOriginatorAndType(ctx.getTenantId(), originator, alarmDefinition.getAlarmType()).get(); + if (alarm != null && !alarm.getStatus().isCleared()) { + currentAlarm = alarm; + } + initialFetchDone = true; + } + } + + public void pushMsg(TbContext ctx, TbAlarmResult alarmResult) { JsonNode jsonNodes = JacksonUtil.valueToTree(alarmResult.getAlarm()); String data = jsonNodes.toString(); - TbMsgMetaData metaData = originalMsg.getMetaData().copy(); + TbMsgMetaData metaData = lastMsgMetaData.copy(); String relationType; if (alarmResult.isCreated()) { relationType = "Alarm Created"; @@ -105,14 +112,18 @@ class DeviceProfileAlarmState { relationType = "Alarm Cleared"; metaData.putValue(DataConstants.IS_CLEARED_ALARM, Boolean.TRUE.toString()); } - TbMsg newMsg = ctx.newMsg(originalMsg.getQueueName(), "ALARM", originalMsg.getOriginator(), metaData, data); + TbMsg newMsg = ctx.newMsg(lastMsgQueueName, "ALARM", originator, metaData, data); ctx.tellNext(newMsg, relationType); } public void updateState(DeviceProfileAlarm alarm) { this.alarmDefinition = alarm; - this.createRulesSortedBySeverityDesc = new TreeMap<>(Comparator.comparingInt(AlarmSeverity::ordinal)); - this.createRulesSortedBySeverityDesc.putAll(alarmDefinition.getCreateRules()); + this.createRulesSortedBySeverityDesc = new ArrayList<>(); + alarmDefinition.getCreateRules().forEach((severity, rule) -> { + createRulesSortedBySeverityDesc.add(new AlarmRuleState(severity, rule)); + }); + createRulesSortedBySeverityDesc.sort(Comparator.comparingInt(state -> state.getSeverity().ordinal())); + clearState = new AlarmRuleState(null, alarmDefinition.getClearRule()); } private TbAlarmResult calculateAlarmResult(TbContext ctx, AlarmSeverity severity) { @@ -147,175 +158,5 @@ class DeviceProfileAlarmState { } } - private boolean eval(AlarmCondition condition, DeviceDataSnapshot data) { - boolean eval = true; - for (KeyFilter keyFilter : condition.getCondition()) { - EntityKeyValue value = data.getValue(keyFilter.getKey()); - if (value == null) { - return false; - } - eval = eval && eval(value, keyFilter.getPredicate()); - } - //TODO: use condition duration; - return eval; - } - - private boolean eval(EntityKeyValue value, KeyFilterPredicate predicate) { - switch (predicate.getType()) { - case STRING: - return evalStrPredicate(value, (StringFilterPredicate) predicate); - case NUMERIC: - return evalNumPredicate(value, (NumericFilterPredicate) predicate); - case COMPLEX: - return evalComplexPredicate(value, (ComplexFilterPredicate) predicate); - case BOOLEAN: - return evalBoolPredicate(value, (BooleanFilterPredicate) predicate); - default: - return false; - } - } - - private boolean evalComplexPredicate(EntityKeyValue ekv, ComplexFilterPredicate predicate) { - switch (predicate.getOperation()) { - case OR: - for (KeyFilterPredicate kfp : predicate.getPredicates()) { - if (eval(ekv, kfp)) { - return true; - } - } - return false; - case AND: - for (KeyFilterPredicate kfp : predicate.getPredicates()) { - if (!eval(ekv, kfp)) { - return false; - } - } - return true; - default: - throw new RuntimeException("Operation not supported: " + predicate.getOperation()); - } - } - - private boolean evalBoolPredicate(EntityKeyValue ekv, BooleanFilterPredicate predicate) { - Boolean value; - switch (ekv.getDataType()) { - case LONG: - value = ekv.getLngValue() > 0; - break; - case DOUBLE: - value = ekv.getDblValue() > 0; - break; - case BOOLEAN: - value = ekv.getBoolValue(); - break; - case STRING: - try { - value = Boolean.parseBoolean(ekv.getStrValue()); - break; - } catch (RuntimeException e) { - return false; - } - case JSON: - try { - value = Boolean.parseBoolean(ekv.getJsonValue()); - break; - } catch (RuntimeException e) { - return false; - } - default: - return false; - } - if (value == null) { - return false; - } - switch (predicate.getOperation()) { - case EQUAL: - return value.equals(predicate.getValue().getDefaultValue()); - case NOT_EQUAL: - return !value.equals(predicate.getValue().getDefaultValue()); - default: - throw new RuntimeException("Operation not supported: " + predicate.getOperation()); - } - } - - private boolean evalNumPredicate(EntityKeyValue ekv, NumericFilterPredicate predicate) { - Double value; - switch (ekv.getDataType()) { - case LONG: - value = ekv.getLngValue().doubleValue(); - break; - case DOUBLE: - value = ekv.getDblValue(); - break; - case BOOLEAN: - value = ekv.getBoolValue() ? 1.0 : 0.0; - break; - case STRING: - try { - value = Double.parseDouble(ekv.getStrValue()); - break; - } catch (RuntimeException e) { - return false; - } - case JSON: - try { - value = Double.parseDouble(ekv.getJsonValue()); - break; - } catch (RuntimeException e) { - return false; - } - default: - return false; - } - if (value == null) { - return false; - } - - Double predicateValue = predicate.getValue().getDefaultValue(); - switch (predicate.getOperation()) { - case NOT_EQUAL: - return !value.equals(predicateValue); - case EQUAL: - return value.equals(predicateValue); - case GREATER: - return value > predicateValue; - case GREATER_OR_EQUAL: - return value >= predicateValue; - case LESS: - return value < predicateValue; - case LESS_OR_EQUAL: - return value <= predicateValue; - default: - throw new RuntimeException("Operation not supported: " + predicate.getOperation()); - } - } - - private boolean evalStrPredicate(EntityKeyValue ekv, StringFilterPredicate predicate) { - String val; - String predicateValue; - if (predicate.isIgnoreCase()) { - val = ekv.getStrValue().toLowerCase(); - predicateValue = predicate.getValue().getDefaultValue().toLowerCase(); - } else { - val = ekv.getStrValue(); - predicateValue = predicate.getValue().getDefaultValue(); - } - switch (predicate.getOperation()) { - case CONTAINS: - return val.contains(predicateValue); - case EQUAL: - return val.equals(predicateValue); - case STARTS_WITH: - return val.startsWith(predicateValue); - case ENDS_WITH: - return val.endsWith(predicateValue); - case NOT_EQUAL: - return !val.equals(predicateValue); - case NOT_CONTAINS: - return !val.contains(predicateValue); - default: - throw new RuntimeException("Operation not supported: " + predicate.getOperation()); - } - } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java index 3a9c08f1e8..ecb2e5905c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java @@ -16,6 +16,7 @@ package org.thingsboard.rule.engine.profile; import com.google.gson.JsonParser; +import org.springframework.util.StringUtils; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode; import org.thingsboard.server.common.data.DataConstants; @@ -35,6 +36,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.dao.sql.query.EntityKeyMapping; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -77,18 +79,73 @@ class DeviceState { } } + public void harvestAlarms(TbContext ctx, long ts) throws ExecutionException, InterruptedException { + for (DeviceProfileAlarmState state : alarmStates.values()) { + state.process(ctx, ts); + } + } + public void process(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { if (latestValues == null) { latestValues = fetchLatestValues(ctx, deviceId); } if (msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) { processTelemetry(ctx, msg); + } else if (msg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name())) { + processAttributesUpdateRequest(ctx, msg); + } else if (msg.getType().equals(DataConstants.ATTRIBUTES_UPDATED)) { + processAttributesUpdateNotification(ctx, msg); + } else if (msg.getType().equals(DataConstants.ATTRIBUTES_DELETED)) { + processAttributesDeleteNotification(ctx, msg); } else { ctx.tellSuccess(msg); } } - private void processTelemetry(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { + private void processAttributesUpdateNotification(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { + Set attributes = JsonConverter.convertToAttributes(new JsonParser().parse(msg.getData())); + String scope = msg.getMetaData().getValue("scope"); + if (StringUtils.isEmpty(scope)) { + scope = DataConstants.CLIENT_SCOPE; + } + processAttributesUpdate(ctx, msg, attributes, scope); + } + + private void processAttributesDeleteNotification(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { + List keys = new ArrayList<>(); + new JsonParser().parse(msg.getData()).getAsJsonObject().get("attributes").getAsJsonArray().forEach(e -> keys.add(e.getAsString())); + String scope = msg.getMetaData().getValue("scope"); + if (StringUtils.isEmpty(scope)) { + scope = DataConstants.CLIENT_SCOPE; + } + if (!keys.isEmpty()) { + EntityKeyType keyType = getKeyTypeFromScope(scope); + keys.forEach(key -> latestValues.removeValue(new EntityKey(keyType, key))); + for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) { + DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(), a -> new DeviceProfileAlarmState(deviceId, alarm)); + alarmState.process(ctx, msg, latestValues); + } + } + ctx.tellSuccess(msg); + } + + protected void processAttributesUpdateRequest(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { + Set attributes = JsonConverter.convertToAttributes(new JsonParser().parse(msg.getData())); + processAttributesUpdate(ctx, msg, attributes, DataConstants.CLIENT_SCOPE); + } + + private void processAttributesUpdate(TbContext ctx, TbMsg msg, Set attributes, String scope) throws ExecutionException, InterruptedException { + if (!attributes.isEmpty()) { + latestValues = merge(latestValues, attributes, scope); + for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) { + DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(), a -> new DeviceProfileAlarmState(deviceId, alarm)); + alarmState.process(ctx, msg, latestValues); + } + } + ctx.tellSuccess(msg); + } + + protected void processTelemetry(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { Map> tsKvMap = JsonConverter.convertToSortedTelemetry(new JsonParser().parse(msg.getData()), TbMsgTimeseriesNode.getTs(msg)); for (Map.Entry> entry : tsKvMap.entrySet()) { Long ts = entry.getKey(); @@ -110,6 +167,28 @@ class DeviceState { return latestValues; } + private DeviceDataSnapshot merge(DeviceDataSnapshot latestValues, Set attributes, String scope) { + long ts = latestValues.getTs(); + for (AttributeKvEntry entry : attributes) { + ts = Math.max(ts, entry.getLastUpdateTs()); + latestValues.putValue(new EntityKey(getKeyTypeFromScope(scope), entry.getKey()), toEntityValue(entry)); + } + latestValues.setTs(ts); + return latestValues; + } + + private static EntityKeyType getKeyTypeFromScope(String scope) { + switch (scope) { + case DataConstants.CLIENT_SCOPE: + return EntityKeyType.CLIENT_ATTRIBUTE; + case DataConstants.SHARED_SCOPE: + return EntityKeyType.SHARED_ATTRIBUTE; + case DataConstants.SERVER_SCOPE: + return EntityKeyType.SERVER_ATTRIBUTE; + } + return EntityKeyType.ATTRIBUTE; + } + private DeviceDataSnapshot fetchLatestValues(TbContext ctx, EntityId originator) throws ExecutionException, InterruptedException { Set entityKeysToFetch = deviceProfile.getEntityKeys(); DeviceDataSnapshot result = new DeviceDataSnapshot(entityKeysToFetch); @@ -224,5 +303,4 @@ class DeviceState { public DeviceProfileId getProfileId() { return deviceProfile.getProfileId(); } - } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java index 4d0a714f25..d3022f177c 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java @@ -23,16 +23,21 @@ import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.plugin.ComponentType; import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.dao.util.mapping.JacksonUtil; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; @Slf4j @RuleNode( @@ -47,53 +52,77 @@ import java.util.concurrent.ExecutionException; configDirective = "tbNodeEmptyConfig" ) public class TbDeviceProfileNode implements TbNode { + private static final String PERIODIC_MSG_TYPE = "TbDeviceProfilePeriodicMsg"; private RuleEngineDeviceProfileCache cache; - private Map deviceStates = new ConcurrentHashMap<>(); + private final Map deviceStates = new ConcurrentHashMap<>(); @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { cache = ctx.getDeviceProfileCache(); + scheduleAlarmHarvesting(ctx); + //TODO: check that I am in root rule chain. + // If Yes - Init for all device profiles that do not have default rule chain id in device profile. + // If No - find device profiles with this rule chain id. } /** - * TODO: - * 1. Duration in the alarm conditions; - * 3. Update of the Device attributes (client, server and shared); - * 4. Dynamic values evaluation; + * 2. Dynamic values evaluation; */ - @Override public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { EntityType originatorType = msg.getOriginator().getEntityType(); - if (EntityType.DEVICE.equals(originatorType)) { - DeviceId deviceId = new DeviceId(msg.getOriginator().getId()); - if (msg.getType().equals("ENTITY_UPDATED")) { - //TODO: handle if device profile id has changed. - } else { - DeviceState deviceState = getOrCreateDeviceState(ctx, deviceId); - if (deviceState != null) { - deviceState.process(ctx, msg); + if (msg.getType().equals(PERIODIC_MSG_TYPE)) { + scheduleAlarmHarvesting(ctx); + harvestAlarms(ctx, System.currentTimeMillis()); + } else { + if (EntityType.DEVICE.equals(originatorType)) { + DeviceId deviceId = new DeviceId(msg.getOriginator().getId()); + if (msg.getType().equals(DataConstants.ENTITY_UPDATED)) { + invalidateDeviceProfileCache(deviceId, msg.getData()); + } else if (msg.getType().equals(DataConstants.ENTITY_DELETED)) { + deviceStates.remove(deviceId); } else { - ctx.tellFailure(msg, new IllegalStateException("Device profile for device [" + deviceId + "] not found!")); + DeviceState deviceState = getOrCreateDeviceState(ctx, deviceId); + if (deviceState != null) { + deviceState.process(ctx, msg); + } else { + ctx.tellFailure(msg, new IllegalStateException("Device profile for device [" + deviceId + "] not found!")); + } } - } - } else if (EntityType.DEVICE_PROFILE.equals(originatorType)) { - if (msg.getType().equals("ENTITY_UPDATED")) { - DeviceProfile deviceProfile = JacksonUtil.fromString(msg.getData(), DeviceProfile.class); - for (DeviceState state : deviceStates.values()) { - if (deviceProfile.getId().equals(state.getProfileId())) { - state.updateProfile(ctx, deviceProfile); + } else if (EntityType.DEVICE_PROFILE.equals(originatorType)) { + if (msg.getType().equals("ENTITY_UPDATED")) { + DeviceProfile deviceProfile = JacksonUtil.fromString(msg.getData(), DeviceProfile.class); + for (DeviceState state : deviceStates.values()) { + if (deviceProfile.getId().equals(state.getProfileId())) { + state.updateProfile(ctx, deviceProfile); + } } } + ctx.tellSuccess(msg); + } else { + ctx.tellSuccess(msg); + } + } + } + + public void invalidateDeviceProfileCache(DeviceId deviceId, String deviceJson) { + DeviceState deviceState = deviceStates.get(deviceId); + if (deviceState != null) { + DeviceProfileId currentProfileId = deviceState.getProfileId(); + Device device = JacksonUtil.fromString(deviceJson, Device.class); + if (!currentProfileId.equals(device.getDeviceProfileId())) { + deviceStates.remove(deviceId); } - ctx.tellSuccess(msg); - } else { - ctx.tellSuccess(msg); } } - private DeviceState getOrCreateDeviceState(TbContext ctx, DeviceId deviceId) { + @Override + public void destroy() { + deviceStates.clear(); + } + + protected DeviceState getOrCreateDeviceState(TbContext ctx, DeviceId deviceId) { DeviceState deviceState = deviceStates.get(deviceId); if (deviceState == null) { DeviceProfile deviceProfile = cache.get(ctx.getTenantId(), deviceId); @@ -105,9 +134,15 @@ public class TbDeviceProfileNode implements TbNode { return deviceState; } - @Override - public void destroy() { + protected void scheduleAlarmHarvesting(TbContext ctx) { + TbMsg periodicCheck = TbMsg.newMsg(PERIODIC_MSG_TYPE, ctx.getTenantId(), TbMsgMetaData.EMPTY, "{}"); + ctx.tellSelf(periodicCheck, TimeUnit.MINUTES.toMillis(1)); + } + protected void harvestAlarms(TbContext ctx, long ts) throws ExecutionException, InterruptedException { + for (DeviceState state : deviceStates.values()) { + state.harvestAlarms(ctx, ts); + } } } diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java index ba2c4a9f3f..8562b52bb9 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java @@ -124,6 +124,7 @@ public class TbDeviceProfileNodeTest { DeviceProfile deviceProfile = new DeviceProfile(); DeviceProfileData deviceProfileData = new DeviceProfileData(); + KeyFilter highTempFilter = new KeyFilter(); highTempFilter.setKey(new EntityKey(EntityKeyType.TIME_SERIES, "temperature")); highTempFilter.setValueType(EntityKeyValueType.NUMERIC); @@ -139,6 +140,20 @@ public class TbDeviceProfileNodeTest { dpa.setId("highTemperatureAlarmID"); dpa.setAlarmType("highTemperatureAlarm"); dpa.setCreateRules(Collections.singletonMap(AlarmSeverity.CRITICAL, alarmRule)); + + KeyFilter lowTempFilter = new KeyFilter(); + lowTempFilter.setKey(new EntityKey(EntityKeyType.TIME_SERIES, "temperature")); + lowTempFilter.setValueType(EntityKeyValueType.NUMERIC); + NumericFilterPredicate lowTemperaturePredicate = new NumericFilterPredicate(); + lowTemperaturePredicate.setOperation(NumericFilterPredicate.NumericOperation.LESS); + lowTemperaturePredicate.setValue(new FilterPredicateValue<>(10.0)); + lowTempFilter.setPredicate(lowTemperaturePredicate); + AlarmRule clearRule = new AlarmRule(); + AlarmCondition clearCondition = new AlarmCondition(); + clearCondition.setCondition(Collections.singletonList(lowTempFilter)); + clearRule.setCondition(clearCondition); + dpa.setClearRule(clearRule); + deviceProfileData.setAlarms(Collections.singletonList(dpa)); deviceProfile.setProfileData(deviceProfileData);