Browse Source

Implemented main functionality

pull/3551/head
Andrii Shvaika 5 years ago
parent
commit
85643636ab
  1. 3
      common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgMetaData.java
  2. 252
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmRuleState.java
  3. 72
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceDataSnapshot.java
  4. 245
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceProfileAlarmState.java
  5. 82
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java
  6. 91
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java
  7. 15
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java

3
common/message/src/main/java/org/thingsboard/server/common/msg/TbMsgMetaData.java

@ -20,6 +20,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -31,6 +32,8 @@ import java.util.concurrent.ConcurrentHashMap;
@NoArgsConstructor
public final class TbMsgMetaData implements Serializable {
public static final TbMsgMetaData EMPTY = new TbMsgMetaData(Collections.emptyMap());
private final Map<String, String> data = new ConcurrentHashMap<>();
public TbMsgMetaData(Map<String, String> data) {

252
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmRuleState.java

@ -0,0 +1,252 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.profile;
import lombok.Data;
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
import org.thingsboard.server.common.data.device.profile.AlarmCondition;
import org.thingsboard.server.common.data.device.profile.AlarmRule;
import org.thingsboard.server.common.data.query.BooleanFilterPredicate;
import org.thingsboard.server.common.data.query.ComplexFilterPredicate;
import org.thingsboard.server.common.data.query.KeyFilter;
import org.thingsboard.server.common.data.query.KeyFilterPredicate;
import org.thingsboard.server.common.data.query.NumericFilterPredicate;
import org.thingsboard.server.common.data.query.StringFilterPredicate;
@Data
public class AlarmRuleState {
private final AlarmSeverity severity;
private final AlarmRule alarmRule;
private final long requiredDurationInMs;
private long lastEventTs;
private long duration;
public AlarmRuleState(AlarmSeverity severity, AlarmRule alarmRule) {
this.severity = severity;
this.alarmRule = alarmRule;
if (alarmRule.getCondition().getDurationValue() > 0) {
requiredDurationInMs = alarmRule.getCondition().getDurationUnit().toMillis(alarmRule.getCondition().getDurationValue());
} else {
requiredDurationInMs = 0;
}
}
public boolean eval(DeviceDataSnapshot data) {
if (requiredDurationInMs > 0) {
boolean eval = eval(alarmRule.getCondition(), data);
if (eval) {
if (lastEventTs > 0) {
if (data.getTs() > lastEventTs) {
duration += data.getTs() - lastEventTs;
lastEventTs = data.getTs();
}
} else {
lastEventTs = data.getTs();
duration = 0;
}
return duration > requiredDurationInMs;
} else {
lastEventTs = 0;
duration = 0;
return false;
}
} else {
return eval(alarmRule.getCondition(), data);
}
}
public boolean eval(long ts) {
if (requiredDurationInMs > 0 && lastEventTs > 0 && ts > lastEventTs) {
duration += ts - lastEventTs;
return duration > requiredDurationInMs;
} else {
return false;
}
}
private boolean eval(AlarmCondition condition, DeviceDataSnapshot data) {
boolean eval = true;
for (KeyFilter keyFilter : condition.getCondition()) {
EntityKeyValue value = data.getValue(keyFilter.getKey());
if (value == null) {
return false;
}
eval = eval && eval(value, keyFilter.getPredicate());
}
//TODO: use condition duration;
return eval;
}
private boolean eval(EntityKeyValue value, KeyFilterPredicate predicate) {
switch (predicate.getType()) {
case STRING:
return evalStrPredicate(value, (StringFilterPredicate) predicate);
case NUMERIC:
return evalNumPredicate(value, (NumericFilterPredicate) predicate);
case COMPLEX:
return evalComplexPredicate(value, (ComplexFilterPredicate) predicate);
case BOOLEAN:
return evalBoolPredicate(value, (BooleanFilterPredicate) predicate);
default:
return false;
}
}
private boolean evalComplexPredicate(EntityKeyValue ekv, ComplexFilterPredicate predicate) {
switch (predicate.getOperation()) {
case OR:
for (KeyFilterPredicate kfp : predicate.getPredicates()) {
if (eval(ekv, kfp)) {
return true;
}
}
return false;
case AND:
for (KeyFilterPredicate kfp : predicate.getPredicates()) {
if (!eval(ekv, kfp)) {
return false;
}
}
return true;
default:
throw new RuntimeException("Operation not supported: " + predicate.getOperation());
}
}
private boolean evalBoolPredicate(EntityKeyValue ekv, BooleanFilterPredicate predicate) {
Boolean value;
switch (ekv.getDataType()) {
case LONG:
value = ekv.getLngValue() > 0;
break;
case DOUBLE:
value = ekv.getDblValue() > 0;
break;
case BOOLEAN:
value = ekv.getBoolValue();
break;
case STRING:
try {
value = Boolean.parseBoolean(ekv.getStrValue());
break;
} catch (RuntimeException e) {
return false;
}
case JSON:
try {
value = Boolean.parseBoolean(ekv.getJsonValue());
break;
} catch (RuntimeException e) {
return false;
}
default:
return false;
}
if (value == null) {
return false;
}
switch (predicate.getOperation()) {
case EQUAL:
return value.equals(predicate.getValue().getDefaultValue());
case NOT_EQUAL:
return !value.equals(predicate.getValue().getDefaultValue());
default:
throw new RuntimeException("Operation not supported: " + predicate.getOperation());
}
}
private boolean evalNumPredicate(EntityKeyValue ekv, NumericFilterPredicate predicate) {
Double value;
switch (ekv.getDataType()) {
case LONG:
value = ekv.getLngValue().doubleValue();
break;
case DOUBLE:
value = ekv.getDblValue();
break;
case BOOLEAN:
value = ekv.getBoolValue() ? 1.0 : 0.0;
break;
case STRING:
try {
value = Double.parseDouble(ekv.getStrValue());
break;
} catch (RuntimeException e) {
return false;
}
case JSON:
try {
value = Double.parseDouble(ekv.getJsonValue());
break;
} catch (RuntimeException e) {
return false;
}
default:
return false;
}
if (value == null) {
return false;
}
Double predicateValue = predicate.getValue().getDefaultValue();
switch (predicate.getOperation()) {
case NOT_EQUAL:
return !value.equals(predicateValue);
case EQUAL:
return value.equals(predicateValue);
case GREATER:
return value > predicateValue;
case GREATER_OR_EQUAL:
return value >= predicateValue;
case LESS:
return value < predicateValue;
case LESS_OR_EQUAL:
return value <= predicateValue;
default:
throw new RuntimeException("Operation not supported: " + predicate.getOperation());
}
}
private boolean evalStrPredicate(EntityKeyValue ekv, StringFilterPredicate predicate) {
String val;
String predicateValue;
if (predicate.isIgnoreCase()) {
val = ekv.getStrValue().toLowerCase();
predicateValue = predicate.getValue().getDefaultValue().toLowerCase();
} else {
val = ekv.getStrValue();
predicateValue = predicate.getValue().getDefaultValue();
}
switch (predicate.getOperation()) {
case CONTAINS:
return val.contains(predicateValue);
case EQUAL:
return val.equals(predicateValue);
case STARTS_WITH:
return val.startsWith(predicateValue);
case ENDS_WITH:
return val.endsWith(predicateValue);
case NOT_EQUAL:
return !val.equals(predicateValue);
case NOT_CONTAINS:
return !val.contains(predicateValue);
default:
throw new RuntimeException("Operation not supported: " + predicate.getOperation());
}
}
}

72
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceDataSnapshot.java

@ -18,6 +18,7 @@ package org.thingsboard.rule.engine.profile;
import lombok.Getter;
import lombok.Setter;
import org.thingsboard.server.common.data.query.EntityKey;
import org.thingsboard.server.common.data.query.EntityKeyType;
import java.util.Map;
import java.util.Set;
@ -26,24 +27,79 @@ import java.util.concurrent.ConcurrentHashMap;
public class DeviceDataSnapshot {
private volatile boolean ready;
@Getter @Setter
@Getter
@Setter
private long ts;
private final Set<EntityKey> keys;
private final Map<EntityKey, EntityKeyValue> values = new ConcurrentHashMap<>();
public DeviceDataSnapshot(Set<EntityKey> entityKeySet) {
entityKeySet.forEach(key -> values.put(key, new EntityKeyValue()));
this.ready = false;
public DeviceDataSnapshot(Set<EntityKey> entityKeysToFetch) {
this.keys = entityKeysToFetch;
}
void removeValue(EntityKey key) {
switch (key.getType()) {
case ATTRIBUTE:
values.remove(key);
values.remove(getAttrKey(key, EntityKeyType.CLIENT_ATTRIBUTE));
values.remove(getAttrKey(key, EntityKeyType.SHARED_ATTRIBUTE));
values.remove(getAttrKey(key, EntityKeyType.SERVER_ATTRIBUTE));
break;
case CLIENT_ATTRIBUTE:
case SHARED_ATTRIBUTE:
case SERVER_ATTRIBUTE:
values.remove(key);
values.remove(getAttrKey(key, EntityKeyType.ATTRIBUTE));
break;
default:
values.remove(key);
}
}
void putValue(EntityKey key, EntityKeyValue value) {
values.put(key, value);
switch (key.getType()) {
case ATTRIBUTE:
putIfKeyExists(key, value);
putIfKeyExists(getAttrKey(key, EntityKeyType.CLIENT_ATTRIBUTE), value);
putIfKeyExists(getAttrKey(key, EntityKeyType.SHARED_ATTRIBUTE), value);
putIfKeyExists(getAttrKey(key, EntityKeyType.SERVER_ATTRIBUTE), value);
break;
case CLIENT_ATTRIBUTE:
case SHARED_ATTRIBUTE:
case SERVER_ATTRIBUTE:
putIfKeyExists(key, value);
putIfKeyExists(getAttrKey(key, EntityKeyType.ATTRIBUTE), value);
break;
default:
putIfKeyExists(key, value);
}
}
private void putIfKeyExists(EntityKey key, EntityKeyValue value) {
if (keys.contains(key)) {
values.put(key, value);
}
}
EntityKeyValue getValue(EntityKey key) {
return values.get(key);
if (EntityKeyType.ATTRIBUTE.equals(key.getType())) {
EntityKeyValue value = values.get(key);
if (value == null) {
value = values.get(getAttrKey(key, EntityKeyType.CLIENT_ATTRIBUTE));
if (value == null) {
value = values.get(getAttrKey(key, EntityKeyType.SHARED_ATTRIBUTE));
if (value == null) {
value = values.get(getAttrKey(key, EntityKeyType.SERVER_ATTRIBUTE));
}
}
}
return value;
} else {
return values.get(key);
}
}
boolean isReady() {
return ready;
private EntityKey getAttrKey(EntityKey key, EntityKeyType clientAttribute) {
return new EntityKey(clientAttribute, key.getKey());
}
}

245
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceProfileAlarmState.java

@ -23,34 +23,29 @@ import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
import org.thingsboard.server.common.data.alarm.AlarmStatus;
import org.thingsboard.server.common.data.device.profile.AlarmCondition;
import org.thingsboard.server.common.data.device.profile.AlarmRule;
import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.query.BooleanFilterPredicate;
import org.thingsboard.server.common.data.query.ComplexFilterPredicate;
import org.thingsboard.server.common.data.query.KeyFilter;
import org.thingsboard.server.common.data.query.KeyFilterPredicate;
import org.thingsboard.server.common.data.query.NumericFilterPredicate;
import org.thingsboard.server.common.data.query.StringFilterPredicate;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.util.mapping.JacksonUtil;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.function.BiFunction;
@Data
class DeviceProfileAlarmState {
private final EntityId originator;
private DeviceProfileAlarm alarmDefinition;
private volatile Map<AlarmSeverity, AlarmRule> createRulesSortedBySeverityDesc;
private volatile List<AlarmRuleState> createRulesSortedBySeverityDesc;
private volatile AlarmRuleState clearState;
private volatile Alarm currentAlarm;
private volatile boolean initialFetchDone;
private volatile TbMsgMetaData lastMsgMetaData;
private volatile String lastMsgQueueName;
public DeviceProfileAlarmState(EntityId originator, DeviceProfileAlarm alarmDefinition) {
this.originator = originator;
@ -58,38 +53,50 @@ class DeviceProfileAlarmState {
}
public void process(TbContext ctx, TbMsg msg, DeviceDataSnapshot data) throws ExecutionException, InterruptedException {
if (!initialFetchDone) {
Alarm alarm = ctx.getAlarmService().findLatestByOriginatorAndType(ctx.getTenantId(), originator, alarmDefinition.getAlarmType()).get();
if (alarm != null && !alarm.getStatus().isCleared()) {
currentAlarm = alarm;
}
initialFetchDone = true;
}
initCurrentAlarm(ctx);
lastMsgMetaData = msg.getMetaData();
lastMsgQueueName = msg.getQueueName();
createOrClearAlarms(ctx, data, AlarmRuleState::eval);
}
public void process(TbContext ctx, long ts) throws ExecutionException, InterruptedException {
initCurrentAlarm(ctx);
createOrClearAlarms(ctx, ts, AlarmRuleState::eval);
}
public <T> void createOrClearAlarms(TbContext ctx, T data, BiFunction<AlarmRuleState, T, Boolean> evalFunction) {
AlarmSeverity resultSeverity = null;
for (Map.Entry<AlarmSeverity, AlarmRule> kv : createRulesSortedBySeverityDesc.entrySet()) {
AlarmRule alarmRule = kv.getValue();
if (eval(alarmRule.getCondition(), data)) {
resultSeverity = kv.getKey();
for (AlarmRuleState state : createRulesSortedBySeverityDesc) {
if (evalFunction.apply(state, data)) {
resultSeverity = state.getSeverity();
break;
}
}
if (resultSeverity != null) {
pushMsg(ctx, calculateAlarmResult(ctx, resultSeverity), msg);
pushMsg(ctx, calculateAlarmResult(ctx, resultSeverity));
} else if (currentAlarm != null) {
AlarmRule clearRule = alarmDefinition.getClearRule();
if (eval(clearRule.getCondition(), data)) {
if (evalFunction.apply(clearState, data)) {
ctx.getAlarmService().clearAlarm(ctx.getTenantId(), currentAlarm.getId(), JacksonUtil.OBJECT_MAPPER.createObjectNode(), System.currentTimeMillis());
pushMsg(ctx, new TbAlarmResult(false, false, true, currentAlarm), msg);
pushMsg(ctx, new TbAlarmResult(false, false, true, currentAlarm));
currentAlarm = null;
}
}
}
public void pushMsg(TbContext ctx, TbAlarmResult alarmResult, TbMsg originalMsg) {
public void initCurrentAlarm(TbContext ctx) throws InterruptedException, ExecutionException {
if (!initialFetchDone) {
Alarm alarm = ctx.getAlarmService().findLatestByOriginatorAndType(ctx.getTenantId(), originator, alarmDefinition.getAlarmType()).get();
if (alarm != null && !alarm.getStatus().isCleared()) {
currentAlarm = alarm;
}
initialFetchDone = true;
}
}
public void pushMsg(TbContext ctx, TbAlarmResult alarmResult) {
JsonNode jsonNodes = JacksonUtil.valueToTree(alarmResult.getAlarm());
String data = jsonNodes.toString();
TbMsgMetaData metaData = originalMsg.getMetaData().copy();
TbMsgMetaData metaData = lastMsgMetaData.copy();
String relationType;
if (alarmResult.isCreated()) {
relationType = "Alarm Created";
@ -105,14 +112,18 @@ class DeviceProfileAlarmState {
relationType = "Alarm Cleared";
metaData.putValue(DataConstants.IS_CLEARED_ALARM, Boolean.TRUE.toString());
}
TbMsg newMsg = ctx.newMsg(originalMsg.getQueueName(), "ALARM", originalMsg.getOriginator(), metaData, data);
TbMsg newMsg = ctx.newMsg(lastMsgQueueName, "ALARM", originator, metaData, data);
ctx.tellNext(newMsg, relationType);
}
public void updateState(DeviceProfileAlarm alarm) {
this.alarmDefinition = alarm;
this.createRulesSortedBySeverityDesc = new TreeMap<>(Comparator.comparingInt(AlarmSeverity::ordinal));
this.createRulesSortedBySeverityDesc.putAll(alarmDefinition.getCreateRules());
this.createRulesSortedBySeverityDesc = new ArrayList<>();
alarmDefinition.getCreateRules().forEach((severity, rule) -> {
createRulesSortedBySeverityDesc.add(new AlarmRuleState(severity, rule));
});
createRulesSortedBySeverityDesc.sort(Comparator.comparingInt(state -> state.getSeverity().ordinal()));
clearState = new AlarmRuleState(null, alarmDefinition.getClearRule());
}
private TbAlarmResult calculateAlarmResult(TbContext ctx, AlarmSeverity severity) {
@ -147,175 +158,5 @@ class DeviceProfileAlarmState {
}
}
private boolean eval(AlarmCondition condition, DeviceDataSnapshot data) {
boolean eval = true;
for (KeyFilter keyFilter : condition.getCondition()) {
EntityKeyValue value = data.getValue(keyFilter.getKey());
if (value == null) {
return false;
}
eval = eval && eval(value, keyFilter.getPredicate());
}
//TODO: use condition duration;
return eval;
}
private boolean eval(EntityKeyValue value, KeyFilterPredicate predicate) {
switch (predicate.getType()) {
case STRING:
return evalStrPredicate(value, (StringFilterPredicate) predicate);
case NUMERIC:
return evalNumPredicate(value, (NumericFilterPredicate) predicate);
case COMPLEX:
return evalComplexPredicate(value, (ComplexFilterPredicate) predicate);
case BOOLEAN:
return evalBoolPredicate(value, (BooleanFilterPredicate) predicate);
default:
return false;
}
}
private boolean evalComplexPredicate(EntityKeyValue ekv, ComplexFilterPredicate predicate) {
switch (predicate.getOperation()) {
case OR:
for (KeyFilterPredicate kfp : predicate.getPredicates()) {
if (eval(ekv, kfp)) {
return true;
}
}
return false;
case AND:
for (KeyFilterPredicate kfp : predicate.getPredicates()) {
if (!eval(ekv, kfp)) {
return false;
}
}
return true;
default:
throw new RuntimeException("Operation not supported: " + predicate.getOperation());
}
}
private boolean evalBoolPredicate(EntityKeyValue ekv, BooleanFilterPredicate predicate) {
Boolean value;
switch (ekv.getDataType()) {
case LONG:
value = ekv.getLngValue() > 0;
break;
case DOUBLE:
value = ekv.getDblValue() > 0;
break;
case BOOLEAN:
value = ekv.getBoolValue();
break;
case STRING:
try {
value = Boolean.parseBoolean(ekv.getStrValue());
break;
} catch (RuntimeException e) {
return false;
}
case JSON:
try {
value = Boolean.parseBoolean(ekv.getJsonValue());
break;
} catch (RuntimeException e) {
return false;
}
default:
return false;
}
if (value == null) {
return false;
}
switch (predicate.getOperation()) {
case EQUAL:
return value.equals(predicate.getValue().getDefaultValue());
case NOT_EQUAL:
return !value.equals(predicate.getValue().getDefaultValue());
default:
throw new RuntimeException("Operation not supported: " + predicate.getOperation());
}
}
private boolean evalNumPredicate(EntityKeyValue ekv, NumericFilterPredicate predicate) {
Double value;
switch (ekv.getDataType()) {
case LONG:
value = ekv.getLngValue().doubleValue();
break;
case DOUBLE:
value = ekv.getDblValue();
break;
case BOOLEAN:
value = ekv.getBoolValue() ? 1.0 : 0.0;
break;
case STRING:
try {
value = Double.parseDouble(ekv.getStrValue());
break;
} catch (RuntimeException e) {
return false;
}
case JSON:
try {
value = Double.parseDouble(ekv.getJsonValue());
break;
} catch (RuntimeException e) {
return false;
}
default:
return false;
}
if (value == null) {
return false;
}
Double predicateValue = predicate.getValue().getDefaultValue();
switch (predicate.getOperation()) {
case NOT_EQUAL:
return !value.equals(predicateValue);
case EQUAL:
return value.equals(predicateValue);
case GREATER:
return value > predicateValue;
case GREATER_OR_EQUAL:
return value >= predicateValue;
case LESS:
return value < predicateValue;
case LESS_OR_EQUAL:
return value <= predicateValue;
default:
throw new RuntimeException("Operation not supported: " + predicate.getOperation());
}
}
private boolean evalStrPredicate(EntityKeyValue ekv, StringFilterPredicate predicate) {
String val;
String predicateValue;
if (predicate.isIgnoreCase()) {
val = ekv.getStrValue().toLowerCase();
predicateValue = predicate.getValue().getDefaultValue().toLowerCase();
} else {
val = ekv.getStrValue();
predicateValue = predicate.getValue().getDefaultValue();
}
switch (predicate.getOperation()) {
case CONTAINS:
return val.contains(predicateValue);
case EQUAL:
return val.equals(predicateValue);
case STARTS_WITH:
return val.startsWith(predicateValue);
case ENDS_WITH:
return val.endsWith(predicateValue);
case NOT_EQUAL:
return !val.equals(predicateValue);
case NOT_CONTAINS:
return !val.contains(predicateValue);
default:
throw new RuntimeException("Operation not supported: " + predicate.getOperation());
}
}
}

82
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java

@ -16,6 +16,7 @@
package org.thingsboard.rule.engine.profile;
import com.google.gson.JsonParser;
import org.springframework.util.StringUtils;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode;
import org.thingsboard.server.common.data.DataConstants;
@ -35,6 +36,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.dao.sql.query.EntityKeyMapping;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -77,18 +79,73 @@ class DeviceState {
}
}
public void harvestAlarms(TbContext ctx, long ts) throws ExecutionException, InterruptedException {
for (DeviceProfileAlarmState state : alarmStates.values()) {
state.process(ctx, ts);
}
}
public void process(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
if (latestValues == null) {
latestValues = fetchLatestValues(ctx, deviceId);
}
if (msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) {
processTelemetry(ctx, msg);
} else if (msg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name())) {
processAttributesUpdateRequest(ctx, msg);
} else if (msg.getType().equals(DataConstants.ATTRIBUTES_UPDATED)) {
processAttributesUpdateNotification(ctx, msg);
} else if (msg.getType().equals(DataConstants.ATTRIBUTES_DELETED)) {
processAttributesDeleteNotification(ctx, msg);
} else {
ctx.tellSuccess(msg);
}
}
private void processTelemetry(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
private void processAttributesUpdateNotification(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(msg.getData()));
String scope = msg.getMetaData().getValue("scope");
if (StringUtils.isEmpty(scope)) {
scope = DataConstants.CLIENT_SCOPE;
}
processAttributesUpdate(ctx, msg, attributes, scope);
}
private void processAttributesDeleteNotification(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
List<String> keys = new ArrayList<>();
new JsonParser().parse(msg.getData()).getAsJsonObject().get("attributes").getAsJsonArray().forEach(e -> keys.add(e.getAsString()));
String scope = msg.getMetaData().getValue("scope");
if (StringUtils.isEmpty(scope)) {
scope = DataConstants.CLIENT_SCOPE;
}
if (!keys.isEmpty()) {
EntityKeyType keyType = getKeyTypeFromScope(scope);
keys.forEach(key -> latestValues.removeValue(new EntityKey(keyType, key)));
for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(), a -> new DeviceProfileAlarmState(deviceId, alarm));
alarmState.process(ctx, msg, latestValues);
}
}
ctx.tellSuccess(msg);
}
protected void processAttributesUpdateRequest(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(msg.getData()));
processAttributesUpdate(ctx, msg, attributes, DataConstants.CLIENT_SCOPE);
}
private void processAttributesUpdate(TbContext ctx, TbMsg msg, Set<AttributeKvEntry> attributes, String scope) throws ExecutionException, InterruptedException {
if (!attributes.isEmpty()) {
latestValues = merge(latestValues, attributes, scope);
for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(), a -> new DeviceProfileAlarmState(deviceId, alarm));
alarmState.process(ctx, msg, latestValues);
}
}
ctx.tellSuccess(msg);
}
protected void processTelemetry(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToSortedTelemetry(new JsonParser().parse(msg.getData()), TbMsgTimeseriesNode.getTs(msg));
for (Map.Entry<Long, List<KvEntry>> entry : tsKvMap.entrySet()) {
Long ts = entry.getKey();
@ -110,6 +167,28 @@ class DeviceState {
return latestValues;
}
private DeviceDataSnapshot merge(DeviceDataSnapshot latestValues, Set<AttributeKvEntry> attributes, String scope) {
long ts = latestValues.getTs();
for (AttributeKvEntry entry : attributes) {
ts = Math.max(ts, entry.getLastUpdateTs());
latestValues.putValue(new EntityKey(getKeyTypeFromScope(scope), entry.getKey()), toEntityValue(entry));
}
latestValues.setTs(ts);
return latestValues;
}
private static EntityKeyType getKeyTypeFromScope(String scope) {
switch (scope) {
case DataConstants.CLIENT_SCOPE:
return EntityKeyType.CLIENT_ATTRIBUTE;
case DataConstants.SHARED_SCOPE:
return EntityKeyType.SHARED_ATTRIBUTE;
case DataConstants.SERVER_SCOPE:
return EntityKeyType.SERVER_ATTRIBUTE;
}
return EntityKeyType.ATTRIBUTE;
}
private DeviceDataSnapshot fetchLatestValues(TbContext ctx, EntityId originator) throws ExecutionException, InterruptedException {
Set<EntityKey> entityKeysToFetch = deviceProfile.getEntityKeys();
DeviceDataSnapshot result = new DeviceDataSnapshot(entityKeysToFetch);
@ -224,5 +303,4 @@ class DeviceState {
public DeviceProfileId getProfileId() {
return deviceProfile.getProfileId();
}
}

91
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java

@ -23,16 +23,21 @@ import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.util.mapping.JacksonUtil;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@Slf4j
@RuleNode(
@ -47,53 +52,77 @@ import java.util.concurrent.ExecutionException;
configDirective = "tbNodeEmptyConfig"
)
public class TbDeviceProfileNode implements TbNode {
private static final String PERIODIC_MSG_TYPE = "TbDeviceProfilePeriodicMsg";
private RuleEngineDeviceProfileCache cache;
private Map<DeviceId, DeviceState> deviceStates = new ConcurrentHashMap<>();
private final Map<DeviceId, DeviceState> deviceStates = new ConcurrentHashMap<>();
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
cache = ctx.getDeviceProfileCache();
scheduleAlarmHarvesting(ctx);
//TODO: check that I am in root rule chain.
// If Yes - Init for all device profiles that do not have default rule chain id in device profile.
// If No - find device profiles with this rule chain id.
}
/**
* TODO:
* 1. Duration in the alarm conditions;
* 3. Update of the Device attributes (client, server and shared);
* 4. Dynamic values evaluation;
* 2. Dynamic values evaluation;
*/
@Override
public void onMsg(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
EntityType originatorType = msg.getOriginator().getEntityType();
if (EntityType.DEVICE.equals(originatorType)) {
DeviceId deviceId = new DeviceId(msg.getOriginator().getId());
if (msg.getType().equals("ENTITY_UPDATED")) {
//TODO: handle if device profile id has changed.
} else {
DeviceState deviceState = getOrCreateDeviceState(ctx, deviceId);
if (deviceState != null) {
deviceState.process(ctx, msg);
if (msg.getType().equals(PERIODIC_MSG_TYPE)) {
scheduleAlarmHarvesting(ctx);
harvestAlarms(ctx, System.currentTimeMillis());
} else {
if (EntityType.DEVICE.equals(originatorType)) {
DeviceId deviceId = new DeviceId(msg.getOriginator().getId());
if (msg.getType().equals(DataConstants.ENTITY_UPDATED)) {
invalidateDeviceProfileCache(deviceId, msg.getData());
} else if (msg.getType().equals(DataConstants.ENTITY_DELETED)) {
deviceStates.remove(deviceId);
} else {
ctx.tellFailure(msg, new IllegalStateException("Device profile for device [" + deviceId + "] not found!"));
DeviceState deviceState = getOrCreateDeviceState(ctx, deviceId);
if (deviceState != null) {
deviceState.process(ctx, msg);
} else {
ctx.tellFailure(msg, new IllegalStateException("Device profile for device [" + deviceId + "] not found!"));
}
}
}
} else if (EntityType.DEVICE_PROFILE.equals(originatorType)) {
if (msg.getType().equals("ENTITY_UPDATED")) {
DeviceProfile deviceProfile = JacksonUtil.fromString(msg.getData(), DeviceProfile.class);
for (DeviceState state : deviceStates.values()) {
if (deviceProfile.getId().equals(state.getProfileId())) {
state.updateProfile(ctx, deviceProfile);
} else if (EntityType.DEVICE_PROFILE.equals(originatorType)) {
if (msg.getType().equals("ENTITY_UPDATED")) {
DeviceProfile deviceProfile = JacksonUtil.fromString(msg.getData(), DeviceProfile.class);
for (DeviceState state : deviceStates.values()) {
if (deviceProfile.getId().equals(state.getProfileId())) {
state.updateProfile(ctx, deviceProfile);
}
}
}
ctx.tellSuccess(msg);
} else {
ctx.tellSuccess(msg);
}
}
}
public void invalidateDeviceProfileCache(DeviceId deviceId, String deviceJson) {
DeviceState deviceState = deviceStates.get(deviceId);
if (deviceState != null) {
DeviceProfileId currentProfileId = deviceState.getProfileId();
Device device = JacksonUtil.fromString(deviceJson, Device.class);
if (!currentProfileId.equals(device.getDeviceProfileId())) {
deviceStates.remove(deviceId);
}
ctx.tellSuccess(msg);
} else {
ctx.tellSuccess(msg);
}
}
private DeviceState getOrCreateDeviceState(TbContext ctx, DeviceId deviceId) {
@Override
public void destroy() {
deviceStates.clear();
}
protected DeviceState getOrCreateDeviceState(TbContext ctx, DeviceId deviceId) {
DeviceState deviceState = deviceStates.get(deviceId);
if (deviceState == null) {
DeviceProfile deviceProfile = cache.get(ctx.getTenantId(), deviceId);
@ -105,9 +134,15 @@ public class TbDeviceProfileNode implements TbNode {
return deviceState;
}
@Override
public void destroy() {
protected void scheduleAlarmHarvesting(TbContext ctx) {
TbMsg periodicCheck = TbMsg.newMsg(PERIODIC_MSG_TYPE, ctx.getTenantId(), TbMsgMetaData.EMPTY, "{}");
ctx.tellSelf(periodicCheck, TimeUnit.MINUTES.toMillis(1));
}
protected void harvestAlarms(TbContext ctx, long ts) throws ExecutionException, InterruptedException {
for (DeviceState state : deviceStates.values()) {
state.harvestAlarms(ctx, ts);
}
}
}

15
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java

@ -124,6 +124,7 @@ public class TbDeviceProfileNodeTest {
DeviceProfile deviceProfile = new DeviceProfile();
DeviceProfileData deviceProfileData = new DeviceProfileData();
KeyFilter highTempFilter = new KeyFilter();
highTempFilter.setKey(new EntityKey(EntityKeyType.TIME_SERIES, "temperature"));
highTempFilter.setValueType(EntityKeyValueType.NUMERIC);
@ -139,6 +140,20 @@ public class TbDeviceProfileNodeTest {
dpa.setId("highTemperatureAlarmID");
dpa.setAlarmType("highTemperatureAlarm");
dpa.setCreateRules(Collections.singletonMap(AlarmSeverity.CRITICAL, alarmRule));
KeyFilter lowTempFilter = new KeyFilter();
lowTempFilter.setKey(new EntityKey(EntityKeyType.TIME_SERIES, "temperature"));
lowTempFilter.setValueType(EntityKeyValueType.NUMERIC);
NumericFilterPredicate lowTemperaturePredicate = new NumericFilterPredicate();
lowTemperaturePredicate.setOperation(NumericFilterPredicate.NumericOperation.LESS);
lowTemperaturePredicate.setValue(new FilterPredicateValue<>(10.0));
lowTempFilter.setPredicate(lowTemperaturePredicate);
AlarmRule clearRule = new AlarmRule();
AlarmCondition clearCondition = new AlarmCondition();
clearCondition.setCondition(Collections.singletonList(lowTempFilter));
clearRule.setCondition(clearCondition);
dpa.setClearRule(clearRule);
deviceProfileData.setAlarms(Collections.singletonList(dpa));
deviceProfile.setProfileData(deviceProfileData);

Loading…
Cancel
Save