Browse Source

Rule Node States

pull/3551/head
Andrii Shvaika 6 years ago
parent
commit
080a5f1587
  1. 2
      application/src/main/data/json/tenant/device_profile/rule_chain_template.json
  2. 5
      application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java
  3. 29
      application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java
  4. 33
      common/dao-api/src/main/java/org/thingsboard/server/dao/rule/RuleNodeStateService.java
  5. 35
      common/data/src/main/java/org/thingsboard/server/common/data/id/RuleNodeStateId.java
  6. 42
      common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNodeState.java
  7. 9
      dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java
  8. 79
      dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeStateEntity.java
  9. 94
      dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleNodeStateService.java
  10. 34
      dao/src/main/java/org/thingsboard/server/dao/rule/RuleNodeStateDao.java
  11. 59
      dao/src/main/java/org/thingsboard/server/dao/sql/rule/JpaRuleNodeStateDao.java
  12. 36
      dao/src/main/java/org/thingsboard/server/dao/sql/rule/RuleNodeStateRepository.java
  13. 11
      dao/src/main/resources/sql/schema-entities-hsql.sql
  14. 2
      dao/src/main/resources/sql/schema-entities-idx.sql
  15. 11
      dao/src/main/resources/sql/schema-entities.sql
  16. 1
      dao/src/test/resources/sql/hsql/drop-all-tables.sql
  17. 1
      dao/src/test/resources/sql/psql/drop-all-tables.sql
  18. 1
      dao/src/test/resources/sql/timescale/drop-all-tables.sql
  19. 9
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java
  20. 47
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmRuleState.java
  21. 51
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceProfileAlarmState.java
  22. 107
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java
  23. 22
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java
  24. 56
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeConfiguration.java
  25. 30
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/state/PersistedAlarmRuleState.java
  26. 29
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/state/PersistedAlarmState.java
  27. 27
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/state/PersistedDeviceState.java

2
application/src/main/data/json/tenant/device_profile/rule_chain_template.json

@ -94,7 +94,7 @@
"name": "Device Profile Node",
"debugMode": false,
"configuration": {
"version": 0
"persistAlarmRulesState": false
}
}
],

5
application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java

@ -58,6 +58,7 @@ import org.thingsboard.server.dao.event.EventService;
import org.thingsboard.server.dao.nosql.CassandraBufferedRateExecutor;
import org.thingsboard.server.dao.relation.RelationService;
import org.thingsboard.server.dao.rule.RuleChainService;
import org.thingsboard.server.dao.rule.RuleNodeStateService;
import org.thingsboard.server.dao.tenant.TenantProfileService;
import org.thingsboard.server.dao.tenant.TenantService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
@ -158,6 +159,10 @@ public class ActorSystemContext {
@Getter
private RuleChainService ruleChainService;
@Autowired
@Getter
private RuleNodeStateService ruleNodeStateService;
@Autowired
private PartitionService partitionService;

29
application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java

@ -40,13 +40,15 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleChainId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rule.RuleNode;
import org.thingsboard.server.common.data.rule.RuleNodeState;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.cassandra.CassandraCluster;
@ -68,7 +70,6 @@ import org.thingsboard.server.service.script.RuleNodeJsScriptEngine;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
@ -430,6 +431,30 @@ class DefaultTbContext implements TbContext {
return mainCtx.getRedisTemplate();
}
@Override
public PageData<RuleNodeState> findRuleNodeStates(PageLink pageLink) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Fetch Rule Node States.", getTenantId(), getSelfId());
}
return mainCtx.getRuleNodeStateService().findByRuleNodeId(getTenantId(), getSelfId(), pageLink);
}
@Override
public RuleNodeState findRuleNodeStateForEntity(EntityId entityId) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}][{}] Fetch Rule Node State for entity.", getTenantId(), getSelfId(), entityId);
}
return mainCtx.getRuleNodeStateService().findByRuleNodeIdAndEntityId(getTenantId(), getSelfId(), entityId);
}
@Override
public RuleNodeState saveRuleNodeState(RuleNodeState state) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}][{}] Persist Rule Node State for entity: {}", getTenantId(), getSelfId(), state.getEntityId(), state.getStateData());
}
state.setRuleNodeId(getSelfId());
return mainCtx.getRuleNodeStateService().save(getTenantId(), state);
}
private TbMsgMetaData getActionMetaData(RuleNodeId ruleNodeId) {
TbMsgMetaData metaData = new TbMsgMetaData();

33
common/dao-api/src/main/java/org/thingsboard/server/dao/rule/RuleNodeStateService.java

@ -0,0 +1,33 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.rule;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rule.RuleNodeState;
public interface RuleNodeStateService {
PageData<RuleNodeState> findByRuleNodeId(TenantId tenantId, RuleNodeId ruleNodeId, PageLink pageLink);
RuleNodeState findByRuleNodeIdAndEntityId(TenantId tenantId, RuleNodeId ruleNodeId, EntityId entityId);
RuleNodeState save(TenantId tenantId, RuleNodeState ruleNodeState);
}

35
common/data/src/main/java/org/thingsboard/server/common/data/id/RuleNodeStateId.java

@ -0,0 +1,35 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.id;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.UUID;
public class RuleNodeStateId extends UUIDBased {
private static final long serialVersionUID = 1L;
@JsonCreator
public RuleNodeStateId(@JsonProperty("id") UUID id) {
super(id);
}
public static RuleNodeStateId fromString(String eventId) {
return new RuleNodeStateId(UUID.fromString(eventId));
}
}

42
common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNodeState.java

@ -0,0 +1,42 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.rule;
import lombok.Data;
import org.thingsboard.server.common.data.BaseData;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.RuleNodeStateId;
@Data
public class RuleNodeState extends BaseData<RuleNodeStateId> {
private RuleNodeId ruleNodeId;
private EntityId entityId;
private String stateData;
public RuleNodeState() {
super();
}
public RuleNodeState(RuleNodeStateId id) {
super(id);
}
public RuleNodeState(RuleNodeState event) {
super(event);
}
}

9
dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java

@ -380,6 +380,15 @@ public class ModelConstants {
public static final String RULE_NODE_NAME_PROPERTY = "name";
public static final String RULE_NODE_CONFIGURATION_PROPERTY = "configuration";
/**
* Rule node state constants.
*/
public static final String RULE_NODE_STATE_TABLE_NAME = "rule_node_state";
public static final String RULE_NODE_STATE_NODE_ID_PROPERTY = "rule_node_id";
public static final String RULE_NODE_STATE_ENTITY_TYPE_PROPERTY = "entity_type";
public static final String RULE_NODE_STATE_ENTITY_ID_PROPERTY = "entity_id";
public static final String RULE_NODE_STATE_DATA_PROPERTY = "state_data";
/**
* Cassandra attributes and timeseries constants.
*/

79
dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeStateEntity.java

@ -0,0 +1,79 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.model.sql;
import com.fasterxml.jackson.databind.JsonNode;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.hibernate.annotations.Type;
import org.hibernate.annotations.TypeDef;
import org.thingsboard.server.common.data.id.EntityIdFactory;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.RuleNodeStateId;
import org.thingsboard.server.common.data.rule.RuleNodeState;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.model.BaseSqlEntity;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.util.mapping.JsonStringType;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Table;
import java.util.UUID;
@Data
@EqualsAndHashCode(callSuper = true)
@Entity
@TypeDef(name = "json", typeClass = JsonStringType.class)
@Table(name = ModelConstants.RULE_NODE_STATE_TABLE_NAME)
public class RuleNodeStateEntity extends BaseSqlEntity<RuleNodeState> {
@Column(name = ModelConstants.RULE_NODE_STATE_NODE_ID_PROPERTY)
private UUID ruleNodeId;
@Column(name = ModelConstants.RULE_NODE_STATE_ENTITY_TYPE_PROPERTY)
private String entityType;
@Column(name = ModelConstants.RULE_NODE_STATE_ENTITY_ID_PROPERTY)
private UUID entityId;
@Column(name = ModelConstants.RULE_NODE_STATE_DATA_PROPERTY)
private String stateData;
public RuleNodeStateEntity() {
}
public RuleNodeStateEntity(RuleNodeState ruleNodeState) {
if (ruleNodeState.getId() != null) {
this.setUuid(ruleNodeState.getUuidId());
}
this.setCreatedTime(ruleNodeState.getCreatedTime());
this.ruleNodeId = DaoUtil.getId(ruleNodeState.getRuleNodeId());
this.entityId = ruleNodeState.getEntityId().getId();
this.entityType = ruleNodeState.getEntityId().getEntityType().name();
this.stateData = ruleNodeState.getStateData();
}
@Override
public RuleNodeState toData() {
RuleNodeState ruleNode = new RuleNodeState(new RuleNodeStateId(this.getUuid()));
ruleNode.setCreatedTime(createdTime);
ruleNode.setRuleNodeId(new RuleNodeId(ruleNodeId));
ruleNode.setEntityId(EntityIdFactory.getByTypeAndUuid(entityType, entityId));
ruleNode.setStateData(stateData);
return ruleNode;
}
}

94
dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleNodeStateService.java

@ -0,0 +1,94 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.rule;
import lombok.extern.slf4j.Slf4j;
import org.hibernate.exception.ConstraintViolationException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rule.RuleNodeState;
import org.thingsboard.server.dao.entity.AbstractEntityService;
import org.thingsboard.server.dao.exception.DataValidationException;
@Service
@Slf4j
public class BaseRuleNodeStateService extends AbstractEntityService implements RuleNodeStateService {
@Autowired
private RuleNodeStateDao ruleNodeStateDao;
@Override
public PageData<RuleNodeState> findByRuleNodeId(TenantId tenantId, RuleNodeId ruleNodeId, PageLink pageLink) {
if (tenantId == null) {
throw new DataValidationException("Tenant id should be specified!.");
}
if (ruleNodeId == null) {
throw new DataValidationException("RuleNode id should be specified!.");
}
return ruleNodeStateDao.findByRuleNodeId(ruleNodeId.getId(), pageLink);
}
@Override
public RuleNodeState findByRuleNodeIdAndEntityId(TenantId tenantId, RuleNodeId ruleNodeId, EntityId entityId) {
if (tenantId == null) {
throw new DataValidationException("Tenant id should be specified!.");
}
if (ruleNodeId == null) {
throw new DataValidationException("RuleNode id should be specified!.");
}
if (entityId == null) {
throw new DataValidationException("Entity id should be specified!.");
}
return ruleNodeStateDao.findByRuleNodeIdAndEntityId(ruleNodeId.getId(), entityId.getId());
}
@Override
public RuleNodeState save(TenantId tenantId, RuleNodeState ruleNodeState) {
if (tenantId == null) {
throw new DataValidationException("Tenant id should be specified!.");
}
return saveOrUpdate(tenantId, ruleNodeState, false);
}
public RuleNodeState saveOrUpdate(TenantId tenantId, RuleNodeState ruleNodeState, boolean update) {
try {
if (update) {
RuleNodeState old = ruleNodeStateDao.findByRuleNodeIdAndEntityId(ruleNodeState.getRuleNodeId().getId(), ruleNodeState.getEntityId().getId());
if (old != null && !old.getId().equals(ruleNodeState.getId())) {
ruleNodeState.setId(old.getId());
ruleNodeState.setCreatedTime(old.getCreatedTime());
}
}
return ruleNodeStateDao.save(tenantId, ruleNodeState);
} catch (Exception t) {
ConstraintViolationException e = extractConstraintViolationException(t).orElse(null);
if (e != null && e.getConstraintName() != null && e.getConstraintName().equalsIgnoreCase("rule_node_state_unq_key")) {
if (!update) {
return saveOrUpdate(tenantId, ruleNodeState, true);
} else {
throw new DataValidationException("Rule node state for such rule node id and entity id already exists!");
}
} else {
throw t;
}
}
}
}

34
dao/src/main/java/org/thingsboard/server/dao/rule/RuleNodeStateDao.java

@ -0,0 +1,34 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.rule;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rule.RuleNodeState;
import org.thingsboard.server.dao.Dao;
import java.util.UUID;
/**
* Created by igor on 3/12/18.
*/
public interface RuleNodeStateDao extends Dao<RuleNodeState> {
PageData<RuleNodeState> findByRuleNodeId(UUID ruleNodeId, PageLink pageLink);
RuleNodeState findByRuleNodeIdAndEntityId(UUID ruleNodeId, UUID entityId);
}

59
dao/src/main/java/org/thingsboard/server/dao/sql/rule/JpaRuleNodeStateDao.java

@ -0,0 +1,59 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sql.rule;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.repository.CrudRepository;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rule.RuleNodeState;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.model.sql.RuleNodeStateEntity;
import org.thingsboard.server.dao.rule.RuleNodeStateDao;
import org.thingsboard.server.dao.sql.JpaAbstractDao;
import java.util.UUID;
@Slf4j
@Component
public class JpaRuleNodeStateDao extends JpaAbstractDao<RuleNodeStateEntity, RuleNodeState> implements RuleNodeStateDao {
@Autowired
private RuleNodeStateRepository ruleNodeStateRepository;
@Override
protected Class getEntityClass() {
return RuleNodeStateEntity.class;
}
@Override
protected CrudRepository getCrudRepository() {
return ruleNodeStateRepository;
}
@Override
public PageData<RuleNodeState> findByRuleNodeId(UUID ruleNodeId, PageLink pageLink) {
return DaoUtil.toPageData(ruleNodeStateRepository.findByRuleNodeId(ruleNodeId, DaoUtil.toPageable(pageLink)));
}
@Override
public RuleNodeState findByRuleNodeIdAndEntityId(UUID ruleNodeId, UUID entityId) {
return DaoUtil.getData(ruleNodeStateRepository.findByRuleNodeIdAndEntityId(ruleNodeId, entityId));
}
}

36
dao/src/main/java/org/thingsboard/server/dao/sql/rule/RuleNodeStateRepository.java

@ -0,0 +1,36 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.dao.sql.rule;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.PagingAndSortingRepository;
import org.springframework.data.repository.query.Param;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.dao.model.sql.EventEntity;
import org.thingsboard.server.dao.model.sql.RuleNodeStateEntity;
import java.util.UUID;
public interface RuleNodeStateRepository extends PagingAndSortingRepository<RuleNodeStateEntity, UUID> {
@Query("SELECT e FROM RuleNodeStateEntity e WHERE e.ruleNodeId = :ruleNodeId")
Page<RuleNodeStateEntity> findByRuleNodeId(@Param("ruleNodeId") UUID ruleNodeId, Pageable pageable);
@Query("SELECT e FROM RuleNodeStateEntity e WHERE e.ruleNodeId = :ruleNodeId and e.entityId = :entityId")
RuleNodeStateEntity findByRuleNodeIdAndEntityId(@Param("ruleNodeId") UUID ruleNodeId, @Param("entityId") UUID entityId);
}

11
dao/src/main/resources/sql/schema-entities-hsql.sql

@ -146,6 +146,17 @@ CREATE TABLE IF NOT EXISTS rule_node (
search_text varchar(255)
);
CREATE TABLE IF NOT EXISTS rule_node_state (
id uuid NOT NULL CONSTRAINT rule_node_state_pkey PRIMARY KEY,
created_time bigint NOT NULL,
rule_node_id uuid NOT NULL,
entity_type varchar(32) NOT NULL,
entity_id uuid NOT NULL,
state_data varchar(16384) NOT NULL,
CONSTRAINT rule_node_state_unq_key UNIQUE (rule_node_id, entity_id),
CONSTRAINT fk_rule_node_state_node_id FOREIGN KEY (rule_node_id) REFERENCES rule_node(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS device_profile (
id uuid NOT NULL CONSTRAINT device_profile_pkey PRIMARY KEY,
created_time bigint NOT NULL,

2
dao/src/main/resources/sql/schema-entities-idx.sql

@ -40,4 +40,4 @@ CREATE INDEX IF NOT EXISTS idx_asset_customer_id_and_type ON asset(tenant_id, cu
CREATE INDEX IF NOT EXISTS idx_asset_type ON asset(tenant_id, type);
CREATE INDEX IF NOT EXISTS idx_attribute_kv_by_key_and_last_update_ts ON attribute_kv(entity_id, attribute_key, last_update_ts desc);
CREATE INDEX IF NOT EXISTS idx_attribute_kv_by_key_and_last_update_ts ON attribute_kv(entity_id, attribute_key, last_update_ts desc);

11
dao/src/main/resources/sql/schema-entities.sql

@ -164,6 +164,17 @@ CREATE TABLE IF NOT EXISTS rule_node (
search_text varchar(255)
);
CREATE TABLE IF NOT EXISTS rule_node_state (
id uuid NOT NULL CONSTRAINT rule_node_state_pkey PRIMARY KEY,
created_time bigint NOT NULL,
rule_node_id uuid NOT NULL,
entity_type varchar(32) NOT NULL,
entity_id uuid NOT NULL,
state_data varchar(16384) NOT NULL,
CONSTRAINT rule_node_state_unq_key UNIQUE (rule_node_id, entity_id),
CONSTRAINT fk_rule_node_state_node_id FOREIGN KEY (rule_node_id) REFERENCES rule_node(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS device_profile (
id uuid NOT NULL CONSTRAINT device_profile_pkey PRIMARY KEY,
created_time bigint NOT NULL,

1
dao/src/test/resources/sql/hsql/drop-all-tables.sql

@ -21,6 +21,7 @@ DROP TABLE IF EXISTS widgets_bundle;
DROP TABLE IF EXISTS entity_view;
DROP TABLE IF EXISTS device_profile;
DROP TABLE IF EXISTS tenant_profile;
DROP TABLE IF EXISTS rule_node_state;
DROP TABLE IF EXISTS rule_node;
DROP TABLE IF EXISTS rule_chain;
DROP FUNCTION IF EXISTS to_uuid;

1
dao/src/test/resources/sql/psql/drop-all-tables.sql

@ -21,6 +21,7 @@ DROP TABLE IF EXISTS widgets_bundle;
DROP TABLE IF EXISTS entity_view;
DROP TABLE IF EXISTS device_profile;
DROP TABLE IF EXISTS tenant_profile;
DROP TABLE IF EXISTS rule_node_state;
DROP TABLE IF EXISTS rule_node;
DROP TABLE IF EXISTS rule_chain;
DROP TABLE IF EXISTS tb_schema_settings;

1
dao/src/test/resources/sql/timescale/drop-all-tables.sql

@ -18,6 +18,7 @@ DROP TABLE IF EXISTS ts_kv_dictionary;
DROP TABLE IF EXISTS user_credentials;
DROP TABLE IF EXISTS widget_type;
DROP TABLE IF EXISTS widgets_bundle;
DROP TABLE IF EXISTS rule_node_state;
DROP TABLE IF EXISTS rule_node;
DROP TABLE IF EXISTS rule_chain;
DROP TABLE IF EXISTS entity_view;

9
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java

@ -25,9 +25,11 @@ import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
import org.thingsboard.server.common.data.rule.RuleNodeState;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.asset.AssetService;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.cassandra.CassandraCluster;
@ -214,4 +216,9 @@ public interface TbContext {
@Deprecated
RedisTemplate<String, Object> getRedisTemplate();
PageData<RuleNodeState> findRuleNodeStates(PageLink pageLink);
RuleNodeState findRuleNodeStateForEntity(EntityId entityId);
RuleNodeState saveRuleNodeState(RuleNodeState state);
}

47
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmRuleState.java

@ -16,6 +16,7 @@
package org.thingsboard.rule.engine.profile;
import lombok.Data;
import org.thingsboard.rule.engine.profile.state.PersistedAlarmRuleState;
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
import org.thingsboard.server.common.data.device.profile.AlarmCondition;
import org.thingsboard.server.common.data.device.profile.AlarmRule;
@ -32,12 +33,17 @@ public class AlarmRuleState {
private final AlarmSeverity severity;
private final AlarmRule alarmRule;
private final long requiredDurationInMs;
private long lastEventTs;
private long duration;
private PersistedAlarmRuleState state;
private boolean updateFlag;
public AlarmRuleState(AlarmSeverity severity, AlarmRule alarmRule) {
public AlarmRuleState(AlarmSeverity severity, AlarmRule alarmRule, PersistedAlarmRuleState state) {
this.severity = severity;
this.alarmRule = alarmRule;
if (state != null) {
this.state = state;
} else {
this.state = new PersistedAlarmRuleState(0L, 0L);
}
if (alarmRule.getCondition().getDurationValue() > 0) {
requiredDurationInMs = alarmRule.getCondition().getDurationUnit().toMillis(alarmRule.getCondition().getDurationValue());
} else {
@ -45,23 +51,35 @@ public class AlarmRuleState {
}
}
public boolean checkUpdate() {
if (updateFlag) {
updateFlag = false;
return true;
} else {
return false;
}
}
public boolean eval(DeviceDataSnapshot data) {
if (requiredDurationInMs > 0) {
boolean eval = eval(alarmRule.getCondition(), data);
if (eval) {
if (lastEventTs > 0) {
if (data.getTs() > lastEventTs) {
duration += data.getTs() - lastEventTs;
lastEventTs = data.getTs();
if (state.getLastEventTs() > 0) {
if (data.getTs() > state.getLastEventTs()) {
state.setDuration(state.getDuration() + (data.getTs() - state.getLastEventTs()));
state.setLastEventTs(data.getTs());
updateFlag = true;
}
} else {
lastEventTs = data.getTs();
duration = 0;
state.setLastEventTs(data.getTs());
state.setDuration(0L);
updateFlag = true;
}
return duration > requiredDurationInMs;
return state.getDuration() > requiredDurationInMs;
} else {
lastEventTs = 0;
duration = 0;
state.setLastEventTs(0L);
state.setDuration(0L);
updateFlag = true;
return false;
}
} else {
@ -70,8 +88,8 @@ public class AlarmRuleState {
}
public boolean eval(long ts) {
if (requiredDurationInMs > 0 && lastEventTs > 0 && ts > lastEventTs) {
duration += ts - lastEventTs;
if (requiredDurationInMs > 0 && state.getLastEventTs() > 0 && ts > state.getLastEventTs()) {
long duration = state.getDuration() + (ts - state.getLastEventTs());
return duration > requiredDurationInMs;
} else {
return false;
@ -87,7 +105,6 @@ public class AlarmRuleState {
}
eval = eval && eval(value, keyFilter.getPredicate());
}
//TODO: use condition duration;
return eval;
}

51
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceProfileAlarmState.java

@ -19,6 +19,8 @@ import com.fasterxml.jackson.databind.JsonNode;
import lombok.Data;
import org.thingsboard.rule.engine.action.TbAlarmResult;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.profile.state.PersistedAlarmRuleState;
import org.thingsboard.rule.engine.profile.state.PersistedAlarmState;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
@ -27,6 +29,7 @@ import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.common.msg.queue.ServiceQueue;
import org.thingsboard.server.dao.util.mapping.JacksonUtil;
import java.util.ArrayList;
@ -47,40 +50,47 @@ class DeviceProfileAlarmState {
private volatile TbMsgMetaData lastMsgMetaData;
private volatile String lastMsgQueueName;
public DeviceProfileAlarmState(EntityId originator, DeviceProfileAlarm alarmDefinition) {
public DeviceProfileAlarmState(EntityId originator, DeviceProfileAlarm alarmDefinition, PersistedAlarmState alarmState) {
this.originator = originator;
this.updateState(alarmDefinition);
this.updateState(alarmDefinition, alarmState);
}
public void process(TbContext ctx, TbMsg msg, DeviceDataSnapshot data) throws ExecutionException, InterruptedException {
public boolean process(TbContext ctx, TbMsg msg, DeviceDataSnapshot data) throws ExecutionException, InterruptedException {
initCurrentAlarm(ctx);
lastMsgMetaData = msg.getMetaData();
lastMsgQueueName = msg.getQueueName();
createOrClearAlarms(ctx, data, AlarmRuleState::eval);
return createOrClearAlarms(ctx, data, AlarmRuleState::eval);
}
public void process(TbContext ctx, long ts) throws ExecutionException, InterruptedException {
public boolean process(TbContext ctx, long ts) throws ExecutionException, InterruptedException {
initCurrentAlarm(ctx);
createOrClearAlarms(ctx, ts, AlarmRuleState::eval);
return createOrClearAlarms(ctx, ts, AlarmRuleState::eval);
}
public <T> void createOrClearAlarms(TbContext ctx, T data, BiFunction<AlarmRuleState, T, Boolean> evalFunction) {
public <T> boolean createOrClearAlarms(TbContext ctx, T data, BiFunction<AlarmRuleState, T, Boolean> evalFunction) {
boolean stateUpdate = false;
AlarmSeverity resultSeverity = null;
for (AlarmRuleState state : createRulesSortedBySeverityDesc) {
if (evalFunction.apply(state, data)) {
boolean evalResult = evalFunction.apply(state, data);
stateUpdate |= state.checkUpdate();
if (evalResult) {
resultSeverity = state.getSeverity();
break;
}
}
if (resultSeverity != null) {
pushMsg(ctx, calculateAlarmResult(ctx, resultSeverity));
} else if (currentAlarm != null) {
if (evalFunction.apply(clearState, data)) {
} else if (currentAlarm != null && clearState != null) {
Boolean evalResult = evalFunction.apply(clearState, data);
if (evalResult) {
stateUpdate |= clearState.checkUpdate();
ctx.getAlarmService().clearAlarm(ctx.getTenantId(), currentAlarm.getId(), JacksonUtil.OBJECT_MAPPER.createObjectNode(), System.currentTimeMillis());
pushMsg(ctx, new TbAlarmResult(false, false, true, currentAlarm));
currentAlarm = null;
}
}
return stateUpdate;
}
public void initCurrentAlarm(TbContext ctx) throws InterruptedException, ExecutionException {
@ -96,7 +106,7 @@ class DeviceProfileAlarmState {
public void pushMsg(TbContext ctx, TbAlarmResult alarmResult) {
JsonNode jsonNodes = JacksonUtil.valueToTree(alarmResult.getAlarm());
String data = jsonNodes.toString();
TbMsgMetaData metaData = lastMsgMetaData.copy();
TbMsgMetaData metaData = lastMsgMetaData != null ? lastMsgMetaData.copy() : new TbMsgMetaData();
String relationType;
if (alarmResult.isCreated()) {
relationType = "Alarm Created";
@ -112,18 +122,29 @@ class DeviceProfileAlarmState {
relationType = "Alarm Cleared";
metaData.putValue(DataConstants.IS_CLEARED_ALARM, Boolean.TRUE.toString());
}
TbMsg newMsg = ctx.newMsg(lastMsgQueueName, "ALARM", originator, metaData, data);
TbMsg newMsg = ctx.newMsg(lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN, "ALARM", originator, metaData, data);
ctx.tellNext(newMsg, relationType);
}
public void updateState(DeviceProfileAlarm alarm) {
public void updateState(DeviceProfileAlarm alarm, PersistedAlarmState alarmState) {
this.alarmDefinition = alarm;
this.createRulesSortedBySeverityDesc = new ArrayList<>();
alarmDefinition.getCreateRules().forEach((severity, rule) -> {
createRulesSortedBySeverityDesc.add(new AlarmRuleState(severity, rule));
PersistedAlarmRuleState ruleState = null;
if (alarmState != null) {
ruleState = alarmState.getCreateRuleStates().get(severity);
if (ruleState == null) {
ruleState = new PersistedAlarmRuleState();
alarmState.getCreateRuleStates().put(severity, ruleState);
}
}
createRulesSortedBySeverityDesc.add(new AlarmRuleState(severity, rule, ruleState));
});
createRulesSortedBySeverityDesc.sort(Comparator.comparingInt(state -> state.getSeverity().ordinal()));
clearState = new AlarmRuleState(null, alarmDefinition.getClearRule());
PersistedAlarmRuleState ruleState = alarmState == null ? null : alarmState.getClearRuleState();
if (alarmDefinition.getClearRule() != null) {
clearState = new AlarmRuleState(null, alarmDefinition.getClearRule(), ruleState);
}
}
private TbAlarmResult calculateAlarmResult(TbContext ctx, AlarmSeverity severity) {

107
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java

@ -18,6 +18,8 @@ package org.thingsboard.rule.engine.profile;
import com.google.gson.JsonParser;
import org.springframework.util.StringUtils;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.profile.state.PersistedAlarmState;
import org.thingsboard.rule.engine.profile.state.PersistedDeviceState;
import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
@ -26,17 +28,21 @@ import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.RuleNodeStateId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.query.EntityKey;
import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.rule.RuleNodeState;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.session.SessionMsgType;
import org.thingsboard.server.common.transport.adaptor.JsonConverter;
import org.thingsboard.server.dao.sql.query.EntityKeyMapping;
import org.thingsboard.server.dao.util.mapping.JacksonUtil;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@ -48,14 +54,36 @@ import java.util.stream.Collectors;
class DeviceState {
private final boolean persistState;
private final DeviceId deviceId;
private RuleNodeState state;
private DeviceProfileState deviceProfile;
private PersistedDeviceState pds;
private DeviceDataSnapshot latestValues;
private final ConcurrentMap<String, DeviceProfileAlarmState> alarmStates = new ConcurrentHashMap<>();
public DeviceState(DeviceId deviceId, DeviceProfileState deviceProfile) {
public DeviceState(TbContext ctx, TbDeviceProfileNodeConfiguration config, DeviceId deviceId, DeviceProfileState deviceProfile) {
this.persistState = config.isPersistAlarmRulesState();
this.deviceId = deviceId;
this.deviceProfile = deviceProfile;
if (config.isPersistAlarmRulesState()) {
state = ctx.findRuleNodeStateForEntity(deviceId);
if (state != null) {
pds = JacksonUtil.fromString(state.getStateData(), PersistedDeviceState.class);
} else {
state = new RuleNodeState();
state.setRuleNodeId(ctx.getSelfId());
state.setEntityId(deviceId);
pds = new PersistedDeviceState();
pds.setAlarmStates(new HashMap<>());
}
}
if (pds != null) {
for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
alarmStates.computeIfAbsent(alarm.getId(),
a -> new DeviceProfileAlarmState(deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
}
}
}
public void updateProfile(TbContext ctx, DeviceProfile deviceProfile) throws ExecutionException, InterruptedException {
@ -72,9 +100,9 @@ class DeviceState {
alarmStates.keySet().removeIf(id -> !newAlarmStateIds.contains(id));
for (DeviceProfileAlarm alarm : this.deviceProfile.getAlarmSettings()) {
if (alarmStates.containsKey(alarm.getId())) {
alarmStates.get(alarm.getId()).updateState(alarm);
alarmStates.get(alarm.getId()).updateState(alarm, getOrInitPersistedAlarmState(alarm));
} else {
alarmStates.putIfAbsent(alarm.getId(), new DeviceProfileAlarmState(deviceId, alarm));
alarmStates.putIfAbsent(alarm.getId(), new DeviceProfileAlarmState(deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
}
}
}
@ -89,29 +117,35 @@ class DeviceState {
if (latestValues == null) {
latestValues = fetchLatestValues(ctx, deviceId);
}
boolean stateChanged = false;
if (msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) {
processTelemetry(ctx, msg);
stateChanged = processTelemetry(ctx, msg);
} else if (msg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name())) {
processAttributesUpdateRequest(ctx, msg);
stateChanged = processAttributesUpdateRequest(ctx, msg);
} else if (msg.getType().equals(DataConstants.ATTRIBUTES_UPDATED)) {
processAttributesUpdateNotification(ctx, msg);
stateChanged = processAttributesUpdateNotification(ctx, msg);
} else if (msg.getType().equals(DataConstants.ATTRIBUTES_DELETED)) {
processAttributesDeleteNotification(ctx, msg);
stateChanged = processAttributesDeleteNotification(ctx, msg);
} else {
ctx.tellSuccess(msg);
}
if (persistState && stateChanged) {
state.setStateData(JacksonUtil.toString(pds));
state = ctx.saveRuleNodeState(state);
}
}
private void processAttributesUpdateNotification(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
private boolean processAttributesUpdateNotification(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(msg.getData()));
String scope = msg.getMetaData().getValue("scope");
if (StringUtils.isEmpty(scope)) {
scope = DataConstants.CLIENT_SCOPE;
}
processAttributesUpdate(ctx, msg, attributes, scope);
return processAttributesUpdate(ctx, msg, attributes, scope);
}
private void processAttributesDeleteNotification(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
private boolean processAttributesDeleteNotification(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
boolean stateChanged = false;
List<String> keys = new ArrayList<>();
new JsonParser().parse(msg.getData()).getAsJsonObject().get("attributes").getAsJsonArray().forEach(e -> keys.add(e.getAsString()));
String scope = msg.getMetaData().getValue("scope");
@ -122,59 +156,65 @@ class DeviceState {
EntityKeyType keyType = getKeyTypeFromScope(scope);
keys.forEach(key -> latestValues.removeValue(new EntityKey(keyType, key)));
for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(), a -> new DeviceProfileAlarmState(deviceId, alarm));
alarmState.process(ctx, msg, latestValues);
DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(),
a -> new DeviceProfileAlarmState(deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
stateChanged |= alarmState.process(ctx, msg, latestValues);
}
}
ctx.tellSuccess(msg);
return stateChanged;
}
protected void processAttributesUpdateRequest(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
protected boolean processAttributesUpdateRequest(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
Set<AttributeKvEntry> attributes = JsonConverter.convertToAttributes(new JsonParser().parse(msg.getData()));
processAttributesUpdate(ctx, msg, attributes, DataConstants.CLIENT_SCOPE);
return processAttributesUpdate(ctx, msg, attributes, DataConstants.CLIENT_SCOPE);
}
private void processAttributesUpdate(TbContext ctx, TbMsg msg, Set<AttributeKvEntry> attributes, String scope) throws ExecutionException, InterruptedException {
private boolean processAttributesUpdate(TbContext ctx, TbMsg msg, Set<AttributeKvEntry> attributes, String scope) throws ExecutionException, InterruptedException {
boolean stateChanged = false;
if (!attributes.isEmpty()) {
latestValues = merge(latestValues, attributes, scope);
merge(latestValues, attributes, scope);
for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(), a -> new DeviceProfileAlarmState(deviceId, alarm));
alarmState.process(ctx, msg, latestValues);
DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(),
a -> new DeviceProfileAlarmState(deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
stateChanged |= alarmState.process(ctx, msg, latestValues);
}
}
ctx.tellSuccess(msg);
return stateChanged;
}
protected void processTelemetry(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
protected boolean processTelemetry(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException {
boolean stateChanged = false;
Map<Long, List<KvEntry>> tsKvMap = JsonConverter.convertToSortedTelemetry(new JsonParser().parse(msg.getData()), TbMsgTimeseriesNode.getTs(msg));
for (Map.Entry<Long, List<KvEntry>> entry : tsKvMap.entrySet()) {
Long ts = entry.getKey();
List<KvEntry> data = entry.getValue();
latestValues = merge(latestValues, ts, data);
merge(latestValues, ts, data);
for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) {
DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(), a -> new DeviceProfileAlarmState(deviceId, alarm));
alarmState.process(ctx, msg, latestValues);
DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(),
a -> new DeviceProfileAlarmState(deviceId, alarm, getOrInitPersistedAlarmState(alarm)));
stateChanged |= alarmState.process(ctx, msg, latestValues);
}
}
ctx.tellSuccess(msg);
return stateChanged;
}
private DeviceDataSnapshot merge(DeviceDataSnapshot latestValues, Long ts, List<KvEntry> data) {
private void merge(DeviceDataSnapshot latestValues, Long ts, List<KvEntry> data) {
latestValues.setTs(ts);
for (KvEntry entry : data) {
latestValues.putValue(new EntityKey(EntityKeyType.TIME_SERIES, entry.getKey()), toEntityValue(entry));
}
return latestValues;
}
private DeviceDataSnapshot merge(DeviceDataSnapshot latestValues, Set<AttributeKvEntry> attributes, String scope) {
private void merge(DeviceDataSnapshot latestValues, Set<AttributeKvEntry> attributes, String scope) {
long ts = latestValues.getTs();
for (AttributeKvEntry entry : attributes) {
ts = Math.max(ts, entry.getLastUpdateTs());
latestValues.putValue(new EntityKey(getKeyTypeFromScope(scope), entry.getKey()), toEntityValue(entry));
}
latestValues.setTs(ts);
return latestValues;
}
private static EntityKeyType getKeyTypeFromScope(String scope) {
@ -303,4 +343,19 @@ class DeviceState {
public DeviceProfileId getProfileId() {
return deviceProfile.getProfileId();
}
private PersistedAlarmState getOrInitPersistedAlarmState(DeviceProfileAlarm alarm) {
if (pds != null) {
PersistedAlarmState alarmState = pds.getAlarmStates().get(alarm.getId());
if (alarmState == null) {
alarmState = new PersistedAlarmState();
alarmState.setCreateRuleStates(new HashMap<>());
pds.getAlarmStates().put(alarm.getId(), alarmState);
}
return alarmState;
} else {
return null;
}
}
}

22
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java

@ -16,13 +16,14 @@
package org.thingsboard.rule.engine.profile;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.rule.engine.api.util.TbNodeUtils;
import org.thingsboard.rule.engine.profile.state.PersistedDeviceState;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
@ -30,11 +31,14 @@ import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.data.rule.RuleNodeState;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.util.mapping.JacksonUtil;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@ -45,25 +49,23 @@ import java.util.concurrent.TimeUnit;
name = "device profile",
customRelations = true,
relationTypes = {"Alarm Created", "Alarm Updated", "Alarm Severity Updated", "Alarm Cleared", "Success", "Failure"},
configClazz = EmptyNodeConfiguration.class,
configClazz = TbDeviceProfileNodeConfiguration.class,
nodeDescription = "Process device messages based on device profile settings",
nodeDetails = "Create and clear alarms based on alarm rules defined in device profile. Generates ",
uiResources = {"static/rulenode/rulenode-core-config.js"},
configDirective = "tbNodeEmptyConfig"
nodeDetails = "Create and clear alarms based on alarm rules defined in device profile. Generates "
)
public class TbDeviceProfileNode implements TbNode {
private static final String PERIODIC_MSG_TYPE = "TbDeviceProfilePeriodicMsg";
private TbDeviceProfileNodeConfiguration config;
private RuleEngineDeviceProfileCache cache;
private final Map<DeviceId, DeviceState> deviceStates = new ConcurrentHashMap<>();
@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
cache = ctx.getDeviceProfileCache();
this.config = TbNodeUtils.convert(configuration, TbDeviceProfileNodeConfiguration.class);
this.cache = ctx.getDeviceProfileCache();
scheduleAlarmHarvesting(ctx);
//TODO: check that I am in root rule chain.
// If Yes - Init for all device profiles that do not have default rule chain id in device profile.
// If No - find device profiles with this rule chain id.
//TODO: launch a process of fetching the alarm rule states from the database;
}
/**
@ -127,7 +129,7 @@ public class TbDeviceProfileNode implements TbNode {
if (deviceState == null) {
DeviceProfile deviceProfile = cache.get(ctx.getTenantId(), deviceId);
if (deviceProfile != null) {
deviceState = new DeviceState(deviceId, new DeviceProfileState(deviceProfile));
deviceState = new DeviceState(ctx, config, deviceId, new DeviceProfileState(deviceProfile));
deviceStates.put(deviceId, deviceState);
}
}

56
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeConfiguration.java

@ -0,0 +1,56 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.profile;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.thingsboard.rule.engine.api.EmptyNodeConfiguration;
import org.thingsboard.rule.engine.api.NodeConfiguration;
import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
import org.thingsboard.rule.engine.api.TbNode;
import org.thingsboard.rule.engine.api.TbNodeConfiguration;
import org.thingsboard.rule.engine.api.TbNodeException;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;
import org.thingsboard.server.common.msg.TbMsgMetaData;
import org.thingsboard.server.dao.util.mapping.JacksonUtil;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class TbDeviceProfileNodeConfiguration implements NodeConfiguration<TbDeviceProfileNodeConfiguration> {
private boolean persistAlarmRulesState;
private boolean fetchAlarmRulesStateOnStart;
@Override
public TbDeviceProfileNodeConfiguration defaultConfiguration() {
return new TbDeviceProfileNodeConfiguration();
}
}

30
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/state/PersistedAlarmRuleState.java

@ -0,0 +1,30 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.profile.state;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PersistedAlarmRuleState {
private long lastEventTs;
private long duration;
}

29
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/state/PersistedAlarmState.java

@ -0,0 +1,29 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.profile.state;
import lombok.Data;
import org.thingsboard.server.common.data.alarm.AlarmSeverity;
import java.util.Map;
@Data
public class PersistedAlarmState {
private Map<AlarmSeverity, PersistedAlarmRuleState> createRuleStates;
private PersistedAlarmRuleState clearRuleState;
}

27
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/state/PersistedDeviceState.java

@ -0,0 +1,27 @@
/**
* Copyright © 2016-2020 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.rule.engine.profile.state;
import lombok.Data;
import java.util.Map;
@Data
public class PersistedDeviceState {
Map<String, PersistedAlarmState> alarmStates;
}
Loading…
Cancel
Save