diff --git a/application/src/main/data/json/tenant/device_profile/rule_chain_template.json b/application/src/main/data/json/tenant/device_profile/rule_chain_template.json index a17e1cc6f0..3d076ff812 100644 --- a/application/src/main/data/json/tenant/device_profile/rule_chain_template.json +++ b/application/src/main/data/json/tenant/device_profile/rule_chain_template.json @@ -94,7 +94,7 @@ "name": "Device Profile Node", "debugMode": false, "configuration": { - "version": 0 + "persistAlarmRulesState": false } } ], diff --git a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java index c45e9f8406..8f6a290af4 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ActorSystemContext.java @@ -58,6 +58,7 @@ import org.thingsboard.server.dao.event.EventService; import org.thingsboard.server.dao.nosql.CassandraBufferedRateExecutor; import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.dao.rule.RuleChainService; +import org.thingsboard.server.dao.rule.RuleNodeStateService; import org.thingsboard.server.dao.tenant.TenantProfileService; import org.thingsboard.server.dao.tenant.TenantService; import org.thingsboard.server.dao.timeseries.TimeseriesService; @@ -158,6 +159,10 @@ public class ActorSystemContext { @Getter private RuleChainService ruleChainService; + @Autowired + @Getter + private RuleNodeStateService ruleNodeStateService; + @Autowired private PartitionService partitionService; diff --git a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java index cdbf322bf2..a51f500f82 100644 --- a/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java +++ b/application/src/main/java/org/thingsboard/server/actors/ruleChain/DefaultTbContext.java @@ -40,13 +40,15 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.rule.RuleNode; +import org.thingsboard.server.common.data.rule.RuleNodeState; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; -import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.cassandra.CassandraCluster; @@ -68,7 +70,6 @@ import org.thingsboard.server.service.script.RuleNodeJsScriptEngine; import java.util.Collections; import java.util.Set; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; /** @@ -430,6 +431,30 @@ class DefaultTbContext implements TbContext { return mainCtx.getRedisTemplate(); } + @Override + public PageData findRuleNodeStates(PageLink pageLink) { + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Fetch Rule Node States.", getTenantId(), getSelfId()); + } + return mainCtx.getRuleNodeStateService().findByRuleNodeId(getTenantId(), getSelfId(), pageLink); + } + + @Override + public RuleNodeState findRuleNodeStateForEntity(EntityId entityId) { + if (log.isDebugEnabled()) { + log.debug("[{}][{}][{}] Fetch Rule Node State for entity.", getTenantId(), getSelfId(), entityId); + } + return mainCtx.getRuleNodeStateService().findByRuleNodeIdAndEntityId(getTenantId(), getSelfId(), entityId); + } + + @Override + public RuleNodeState saveRuleNodeState(RuleNodeState state) { + if (log.isDebugEnabled()) { + log.debug("[{}][{}][{}] Persist Rule Node State for entity: {}", getTenantId(), getSelfId(), state.getEntityId(), state.getStateData()); + } + state.setRuleNodeId(getSelfId()); + return mainCtx.getRuleNodeStateService().save(getTenantId(), state); + } private TbMsgMetaData getActionMetaData(RuleNodeId ruleNodeId) { TbMsgMetaData metaData = new TbMsgMetaData(); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/rule/RuleNodeStateService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/rule/RuleNodeStateService.java new file mode 100644 index 0000000000..07138a1a11 --- /dev/null +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/rule/RuleNodeStateService.java @@ -0,0 +1,33 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.rule; + +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.RuleNodeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.rule.RuleNodeState; + +public interface RuleNodeStateService { + + PageData findByRuleNodeId(TenantId tenantId, RuleNodeId ruleNodeId, PageLink pageLink); + + RuleNodeState findByRuleNodeIdAndEntityId(TenantId tenantId, RuleNodeId ruleNodeId, EntityId entityId); + + RuleNodeState save(TenantId tenantId, RuleNodeState ruleNodeState); + +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/id/RuleNodeStateId.java b/common/data/src/main/java/org/thingsboard/server/common/data/id/RuleNodeStateId.java new file mode 100644 index 0000000000..7bdf411353 --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/id/RuleNodeStateId.java @@ -0,0 +1,35 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.id; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.UUID; + +public class RuleNodeStateId extends UUIDBased { + + private static final long serialVersionUID = 1L; + + @JsonCreator + public RuleNodeStateId(@JsonProperty("id") UUID id) { + super(id); + } + + public static RuleNodeStateId fromString(String eventId) { + return new RuleNodeStateId(UUID.fromString(eventId)); + } +} diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNodeState.java b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNodeState.java new file mode 100644 index 0000000000..a3432760be --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/rule/RuleNodeState.java @@ -0,0 +1,42 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.rule; + +import lombok.Data; +import org.thingsboard.server.common.data.BaseData; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.RuleNodeId; +import org.thingsboard.server.common.data.id.RuleNodeStateId; + +@Data +public class RuleNodeState extends BaseData { + + private RuleNodeId ruleNodeId; + private EntityId entityId; + private String stateData; + + public RuleNodeState() { + super(); + } + + public RuleNodeState(RuleNodeStateId id) { + super(id); + } + + public RuleNodeState(RuleNodeState event) { + super(event); + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java index b62dac2d44..fb8a0194b6 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/ModelConstants.java @@ -380,6 +380,15 @@ public class ModelConstants { public static final String RULE_NODE_NAME_PROPERTY = "name"; public static final String RULE_NODE_CONFIGURATION_PROPERTY = "configuration"; + /** + * Rule node state constants. + */ + public static final String RULE_NODE_STATE_TABLE_NAME = "rule_node_state"; + public static final String RULE_NODE_STATE_NODE_ID_PROPERTY = "rule_node_id"; + public static final String RULE_NODE_STATE_ENTITY_TYPE_PROPERTY = "entity_type"; + public static final String RULE_NODE_STATE_ENTITY_ID_PROPERTY = "entity_id"; + public static final String RULE_NODE_STATE_DATA_PROPERTY = "state_data"; + /** * Cassandra attributes and timeseries constants. */ diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeStateEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeStateEntity.java new file mode 100644 index 0000000000..a416034fab --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sql/RuleNodeStateEntity.java @@ -0,0 +1,79 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.model.sql; + +import com.fasterxml.jackson.databind.JsonNode; +import lombok.Data; +import lombok.EqualsAndHashCode; +import org.hibernate.annotations.Type; +import org.hibernate.annotations.TypeDef; +import org.thingsboard.server.common.data.id.EntityIdFactory; +import org.thingsboard.server.common.data.id.RuleNodeId; +import org.thingsboard.server.common.data.id.RuleNodeStateId; +import org.thingsboard.server.common.data.rule.RuleNodeState; +import org.thingsboard.server.dao.DaoUtil; +import org.thingsboard.server.dao.model.BaseSqlEntity; +import org.thingsboard.server.dao.model.ModelConstants; +import org.thingsboard.server.dao.util.mapping.JsonStringType; + +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.Table; +import java.util.UUID; + +@Data +@EqualsAndHashCode(callSuper = true) +@Entity +@TypeDef(name = "json", typeClass = JsonStringType.class) +@Table(name = ModelConstants.RULE_NODE_STATE_TABLE_NAME) +public class RuleNodeStateEntity extends BaseSqlEntity { + + @Column(name = ModelConstants.RULE_NODE_STATE_NODE_ID_PROPERTY) + private UUID ruleNodeId; + + @Column(name = ModelConstants.RULE_NODE_STATE_ENTITY_TYPE_PROPERTY) + private String entityType; + + @Column(name = ModelConstants.RULE_NODE_STATE_ENTITY_ID_PROPERTY) + private UUID entityId; + + @Column(name = ModelConstants.RULE_NODE_STATE_DATA_PROPERTY) + private String stateData; + + public RuleNodeStateEntity() { + } + + public RuleNodeStateEntity(RuleNodeState ruleNodeState) { + if (ruleNodeState.getId() != null) { + this.setUuid(ruleNodeState.getUuidId()); + } + this.setCreatedTime(ruleNodeState.getCreatedTime()); + this.ruleNodeId = DaoUtil.getId(ruleNodeState.getRuleNodeId()); + this.entityId = ruleNodeState.getEntityId().getId(); + this.entityType = ruleNodeState.getEntityId().getEntityType().name(); + this.stateData = ruleNodeState.getStateData(); + } + + @Override + public RuleNodeState toData() { + RuleNodeState ruleNode = new RuleNodeState(new RuleNodeStateId(this.getUuid())); + ruleNode.setCreatedTime(createdTime); + ruleNode.setRuleNodeId(new RuleNodeId(ruleNodeId)); + ruleNode.setEntityId(EntityIdFactory.getByTypeAndUuid(entityType, entityId)); + ruleNode.setStateData(stateData); + return ruleNode; + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleNodeStateService.java b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleNodeStateService.java new file mode 100644 index 0000000000..a0b83f0333 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleNodeStateService.java @@ -0,0 +1,94 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.rule; + +import lombok.extern.slf4j.Slf4j; +import org.hibernate.exception.ConstraintViolationException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.RuleNodeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.rule.RuleNodeState; +import org.thingsboard.server.dao.entity.AbstractEntityService; +import org.thingsboard.server.dao.exception.DataValidationException; + +@Service +@Slf4j +public class BaseRuleNodeStateService extends AbstractEntityService implements RuleNodeStateService { + + @Autowired + private RuleNodeStateDao ruleNodeStateDao; + + @Override + public PageData findByRuleNodeId(TenantId tenantId, RuleNodeId ruleNodeId, PageLink pageLink) { + if (tenantId == null) { + throw new DataValidationException("Tenant id should be specified!."); + } + if (ruleNodeId == null) { + throw new DataValidationException("RuleNode id should be specified!."); + } + return ruleNodeStateDao.findByRuleNodeId(ruleNodeId.getId(), pageLink); + } + + @Override + public RuleNodeState findByRuleNodeIdAndEntityId(TenantId tenantId, RuleNodeId ruleNodeId, EntityId entityId) { + if (tenantId == null) { + throw new DataValidationException("Tenant id should be specified!."); + } + if (ruleNodeId == null) { + throw new DataValidationException("RuleNode id should be specified!."); + } + if (entityId == null) { + throw new DataValidationException("Entity id should be specified!."); + } + return ruleNodeStateDao.findByRuleNodeIdAndEntityId(ruleNodeId.getId(), entityId.getId()); + } + + @Override + public RuleNodeState save(TenantId tenantId, RuleNodeState ruleNodeState) { + if (tenantId == null) { + throw new DataValidationException("Tenant id should be specified!."); + } + return saveOrUpdate(tenantId, ruleNodeState, false); + } + + public RuleNodeState saveOrUpdate(TenantId tenantId, RuleNodeState ruleNodeState, boolean update) { + try { + if (update) { + RuleNodeState old = ruleNodeStateDao.findByRuleNodeIdAndEntityId(ruleNodeState.getRuleNodeId().getId(), ruleNodeState.getEntityId().getId()); + if (old != null && !old.getId().equals(ruleNodeState.getId())) { + ruleNodeState.setId(old.getId()); + ruleNodeState.setCreatedTime(old.getCreatedTime()); + } + } + return ruleNodeStateDao.save(tenantId, ruleNodeState); + } catch (Exception t) { + ConstraintViolationException e = extractConstraintViolationException(t).orElse(null); + if (e != null && e.getConstraintName() != null && e.getConstraintName().equalsIgnoreCase("rule_node_state_unq_key")) { + if (!update) { + return saveOrUpdate(tenantId, ruleNodeState, true); + } else { + throw new DataValidationException("Rule node state for such rule node id and entity id already exists!"); + } + } else { + throw t; + } + } + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/rule/RuleNodeStateDao.java b/dao/src/main/java/org/thingsboard/server/dao/rule/RuleNodeStateDao.java new file mode 100644 index 0000000000..b12c448aa5 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/rule/RuleNodeStateDao.java @@ -0,0 +1,34 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.rule; + +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.rule.RuleNodeState; +import org.thingsboard.server.dao.Dao; + +import java.util.UUID; + +/** + * Created by igor on 3/12/18. + */ +public interface RuleNodeStateDao extends Dao { + + PageData findByRuleNodeId(UUID ruleNodeId, PageLink pageLink); + + RuleNodeState findByRuleNodeIdAndEntityId(UUID ruleNodeId, UUID entityId); +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/rule/JpaRuleNodeStateDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/rule/JpaRuleNodeStateDao.java new file mode 100644 index 0000000000..51e487b456 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/rule/JpaRuleNodeStateDao.java @@ -0,0 +1,59 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sql.rule; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.repository.CrudRepository; +import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.rule.RuleNodeState; +import org.thingsboard.server.dao.DaoUtil; +import org.thingsboard.server.dao.model.sql.RuleNodeStateEntity; +import org.thingsboard.server.dao.rule.RuleNodeStateDao; +import org.thingsboard.server.dao.sql.JpaAbstractDao; + +import java.util.UUID; + +@Slf4j +@Component +public class JpaRuleNodeStateDao extends JpaAbstractDao implements RuleNodeStateDao { + + @Autowired + private RuleNodeStateRepository ruleNodeStateRepository; + + @Override + protected Class getEntityClass() { + return RuleNodeStateEntity.class; + } + + @Override + protected CrudRepository getCrudRepository() { + return ruleNodeStateRepository; + } + + @Override + public PageData findByRuleNodeId(UUID ruleNodeId, PageLink pageLink) { + return DaoUtil.toPageData(ruleNodeStateRepository.findByRuleNodeId(ruleNodeId, DaoUtil.toPageable(pageLink))); + } + + @Override + public RuleNodeState findByRuleNodeIdAndEntityId(UUID ruleNodeId, UUID entityId) { + return DaoUtil.getData(ruleNodeStateRepository.findByRuleNodeIdAndEntityId(ruleNodeId, entityId)); + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/rule/RuleNodeStateRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/rule/RuleNodeStateRepository.java new file mode 100644 index 0000000000..403dcd1377 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/rule/RuleNodeStateRepository.java @@ -0,0 +1,36 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sql.rule; + +import org.springframework.data.domain.Page; +import org.springframework.data.domain.Pageable; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.PagingAndSortingRepository; +import org.springframework.data.repository.query.Param; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.dao.model.sql.EventEntity; +import org.thingsboard.server.dao.model.sql.RuleNodeStateEntity; + +import java.util.UUID; + +public interface RuleNodeStateRepository extends PagingAndSortingRepository { + + @Query("SELECT e FROM RuleNodeStateEntity e WHERE e.ruleNodeId = :ruleNodeId") + Page findByRuleNodeId(@Param("ruleNodeId") UUID ruleNodeId, Pageable pageable); + + @Query("SELECT e FROM RuleNodeStateEntity e WHERE e.ruleNodeId = :ruleNodeId and e.entityId = :entityId") + RuleNodeStateEntity findByRuleNodeIdAndEntityId(@Param("ruleNodeId") UUID ruleNodeId, @Param("entityId") UUID entityId); +} diff --git a/dao/src/main/resources/sql/schema-entities-hsql.sql b/dao/src/main/resources/sql/schema-entities-hsql.sql index 8406fb11b6..99fc915d24 100644 --- a/dao/src/main/resources/sql/schema-entities-hsql.sql +++ b/dao/src/main/resources/sql/schema-entities-hsql.sql @@ -146,6 +146,17 @@ CREATE TABLE IF NOT EXISTS rule_node ( search_text varchar(255) ); +CREATE TABLE IF NOT EXISTS rule_node_state ( + id uuid NOT NULL CONSTRAINT rule_node_state_pkey PRIMARY KEY, + created_time bigint NOT NULL, + rule_node_id uuid NOT NULL, + entity_type varchar(32) NOT NULL, + entity_id uuid NOT NULL, + state_data varchar(16384) NOT NULL, + CONSTRAINT rule_node_state_unq_key UNIQUE (rule_node_id, entity_id), + CONSTRAINT fk_rule_node_state_node_id FOREIGN KEY (rule_node_id) REFERENCES rule_node(id) ON DELETE CASCADE +); + CREATE TABLE IF NOT EXISTS device_profile ( id uuid NOT NULL CONSTRAINT device_profile_pkey PRIMARY KEY, created_time bigint NOT NULL, diff --git a/dao/src/main/resources/sql/schema-entities-idx.sql b/dao/src/main/resources/sql/schema-entities-idx.sql index 2d3b2c4d75..97134a8e19 100644 --- a/dao/src/main/resources/sql/schema-entities-idx.sql +++ b/dao/src/main/resources/sql/schema-entities-idx.sql @@ -40,4 +40,4 @@ CREATE INDEX IF NOT EXISTS idx_asset_customer_id_and_type ON asset(tenant_id, cu CREATE INDEX IF NOT EXISTS idx_asset_type ON asset(tenant_id, type); -CREATE INDEX IF NOT EXISTS idx_attribute_kv_by_key_and_last_update_ts ON attribute_kv(entity_id, attribute_key, last_update_ts desc); \ No newline at end of file +CREATE INDEX IF NOT EXISTS idx_attribute_kv_by_key_and_last_update_ts ON attribute_kv(entity_id, attribute_key, last_update_ts desc); diff --git a/dao/src/main/resources/sql/schema-entities.sql b/dao/src/main/resources/sql/schema-entities.sql index 281d8ffdc9..a4ff653aa6 100644 --- a/dao/src/main/resources/sql/schema-entities.sql +++ b/dao/src/main/resources/sql/schema-entities.sql @@ -164,6 +164,17 @@ CREATE TABLE IF NOT EXISTS rule_node ( search_text varchar(255) ); +CREATE TABLE IF NOT EXISTS rule_node_state ( + id uuid NOT NULL CONSTRAINT rule_node_state_pkey PRIMARY KEY, + created_time bigint NOT NULL, + rule_node_id uuid NOT NULL, + entity_type varchar(32) NOT NULL, + entity_id uuid NOT NULL, + state_data varchar(16384) NOT NULL, + CONSTRAINT rule_node_state_unq_key UNIQUE (rule_node_id, entity_id), + CONSTRAINT fk_rule_node_state_node_id FOREIGN KEY (rule_node_id) REFERENCES rule_node(id) ON DELETE CASCADE +); + CREATE TABLE IF NOT EXISTS device_profile ( id uuid NOT NULL CONSTRAINT device_profile_pkey PRIMARY KEY, created_time bigint NOT NULL, diff --git a/dao/src/test/resources/sql/hsql/drop-all-tables.sql b/dao/src/test/resources/sql/hsql/drop-all-tables.sql index 9205515209..c8cc908125 100644 --- a/dao/src/test/resources/sql/hsql/drop-all-tables.sql +++ b/dao/src/test/resources/sql/hsql/drop-all-tables.sql @@ -21,6 +21,7 @@ DROP TABLE IF EXISTS widgets_bundle; DROP TABLE IF EXISTS entity_view; DROP TABLE IF EXISTS device_profile; DROP TABLE IF EXISTS tenant_profile; +DROP TABLE IF EXISTS rule_node_state; DROP TABLE IF EXISTS rule_node; DROP TABLE IF EXISTS rule_chain; DROP FUNCTION IF EXISTS to_uuid; diff --git a/dao/src/test/resources/sql/psql/drop-all-tables.sql b/dao/src/test/resources/sql/psql/drop-all-tables.sql index 0d74d7fce5..899a66f510 100644 --- a/dao/src/test/resources/sql/psql/drop-all-tables.sql +++ b/dao/src/test/resources/sql/psql/drop-all-tables.sql @@ -21,6 +21,7 @@ DROP TABLE IF EXISTS widgets_bundle; DROP TABLE IF EXISTS entity_view; DROP TABLE IF EXISTS device_profile; DROP TABLE IF EXISTS tenant_profile; +DROP TABLE IF EXISTS rule_node_state; DROP TABLE IF EXISTS rule_node; DROP TABLE IF EXISTS rule_chain; DROP TABLE IF EXISTS tb_schema_settings; diff --git a/dao/src/test/resources/sql/timescale/drop-all-tables.sql b/dao/src/test/resources/sql/timescale/drop-all-tables.sql index 14b7e6a733..4270a2a192 100644 --- a/dao/src/test/resources/sql/timescale/drop-all-tables.sql +++ b/dao/src/test/resources/sql/timescale/drop-all-tables.sql @@ -18,6 +18,7 @@ DROP TABLE IF EXISTS ts_kv_dictionary; DROP TABLE IF EXISTS user_credentials; DROP TABLE IF EXISTS widget_type; DROP TABLE IF EXISTS widgets_bundle; +DROP TABLE IF EXISTS rule_node_state; DROP TABLE IF EXISTS rule_node; DROP TABLE IF EXISTS rule_chain; DROP TABLE IF EXISTS entity_view; diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java index 6ad8bc719a..fa3c1f67a1 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/TbContext.java @@ -25,9 +25,11 @@ import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.RuleNodeId; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.rule.RuleNodeState; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; -import org.thingsboard.server.dao.alarm.AlarmService; import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.cassandra.CassandraCluster; @@ -214,4 +216,9 @@ public interface TbContext { @Deprecated RedisTemplate getRedisTemplate(); + PageData findRuleNodeStates(PageLink pageLink); + + RuleNodeState findRuleNodeStateForEntity(EntityId entityId); + + RuleNodeState saveRuleNodeState(RuleNodeState state); } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmRuleState.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmRuleState.java index 3f20c3794d..0f1349791b 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmRuleState.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/AlarmRuleState.java @@ -16,6 +16,7 @@ package org.thingsboard.rule.engine.profile; import lombok.Data; +import org.thingsboard.rule.engine.profile.state.PersistedAlarmRuleState; import org.thingsboard.server.common.data.alarm.AlarmSeverity; import org.thingsboard.server.common.data.device.profile.AlarmCondition; import org.thingsboard.server.common.data.device.profile.AlarmRule; @@ -32,12 +33,17 @@ public class AlarmRuleState { private final AlarmSeverity severity; private final AlarmRule alarmRule; private final long requiredDurationInMs; - private long lastEventTs; - private long duration; + private PersistedAlarmRuleState state; + private boolean updateFlag; - public AlarmRuleState(AlarmSeverity severity, AlarmRule alarmRule) { + public AlarmRuleState(AlarmSeverity severity, AlarmRule alarmRule, PersistedAlarmRuleState state) { this.severity = severity; this.alarmRule = alarmRule; + if (state != null) { + this.state = state; + } else { + this.state = new PersistedAlarmRuleState(0L, 0L); + } if (alarmRule.getCondition().getDurationValue() > 0) { requiredDurationInMs = alarmRule.getCondition().getDurationUnit().toMillis(alarmRule.getCondition().getDurationValue()); } else { @@ -45,23 +51,35 @@ public class AlarmRuleState { } } + public boolean checkUpdate() { + if (updateFlag) { + updateFlag = false; + return true; + } else { + return false; + } + } + public boolean eval(DeviceDataSnapshot data) { if (requiredDurationInMs > 0) { boolean eval = eval(alarmRule.getCondition(), data); if (eval) { - if (lastEventTs > 0) { - if (data.getTs() > lastEventTs) { - duration += data.getTs() - lastEventTs; - lastEventTs = data.getTs(); + if (state.getLastEventTs() > 0) { + if (data.getTs() > state.getLastEventTs()) { + state.setDuration(state.getDuration() + (data.getTs() - state.getLastEventTs())); + state.setLastEventTs(data.getTs()); + updateFlag = true; } } else { - lastEventTs = data.getTs(); - duration = 0; + state.setLastEventTs(data.getTs()); + state.setDuration(0L); + updateFlag = true; } - return duration > requiredDurationInMs; + return state.getDuration() > requiredDurationInMs; } else { - lastEventTs = 0; - duration = 0; + state.setLastEventTs(0L); + state.setDuration(0L); + updateFlag = true; return false; } } else { @@ -70,8 +88,8 @@ public class AlarmRuleState { } public boolean eval(long ts) { - if (requiredDurationInMs > 0 && lastEventTs > 0 && ts > lastEventTs) { - duration += ts - lastEventTs; + if (requiredDurationInMs > 0 && state.getLastEventTs() > 0 && ts > state.getLastEventTs()) { + long duration = state.getDuration() + (ts - state.getLastEventTs()); return duration > requiredDurationInMs; } else { return false; @@ -87,7 +105,6 @@ public class AlarmRuleState { } eval = eval && eval(value, keyFilter.getPredicate()); } - //TODO: use condition duration; return eval; } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceProfileAlarmState.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceProfileAlarmState.java index da3e1b45a2..0c16b038e5 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceProfileAlarmState.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceProfileAlarmState.java @@ -19,6 +19,8 @@ import com.fasterxml.jackson.databind.JsonNode; import lombok.Data; import org.thingsboard.rule.engine.action.TbAlarmResult; import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.profile.state.PersistedAlarmRuleState; +import org.thingsboard.rule.engine.profile.state.PersistedAlarmState; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.alarm.AlarmSeverity; @@ -27,6 +29,7 @@ import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.common.msg.queue.ServiceQueue; import org.thingsboard.server.dao.util.mapping.JacksonUtil; import java.util.ArrayList; @@ -47,40 +50,47 @@ class DeviceProfileAlarmState { private volatile TbMsgMetaData lastMsgMetaData; private volatile String lastMsgQueueName; - public DeviceProfileAlarmState(EntityId originator, DeviceProfileAlarm alarmDefinition) { + public DeviceProfileAlarmState(EntityId originator, DeviceProfileAlarm alarmDefinition, PersistedAlarmState alarmState) { this.originator = originator; - this.updateState(alarmDefinition); + this.updateState(alarmDefinition, alarmState); + } - public void process(TbContext ctx, TbMsg msg, DeviceDataSnapshot data) throws ExecutionException, InterruptedException { + public boolean process(TbContext ctx, TbMsg msg, DeviceDataSnapshot data) throws ExecutionException, InterruptedException { initCurrentAlarm(ctx); lastMsgMetaData = msg.getMetaData(); lastMsgQueueName = msg.getQueueName(); - createOrClearAlarms(ctx, data, AlarmRuleState::eval); + return createOrClearAlarms(ctx, data, AlarmRuleState::eval); } - public void process(TbContext ctx, long ts) throws ExecutionException, InterruptedException { + public boolean process(TbContext ctx, long ts) throws ExecutionException, InterruptedException { initCurrentAlarm(ctx); - createOrClearAlarms(ctx, ts, AlarmRuleState::eval); + return createOrClearAlarms(ctx, ts, AlarmRuleState::eval); } - public void createOrClearAlarms(TbContext ctx, T data, BiFunction evalFunction) { + public boolean createOrClearAlarms(TbContext ctx, T data, BiFunction evalFunction) { + boolean stateUpdate = false; AlarmSeverity resultSeverity = null; for (AlarmRuleState state : createRulesSortedBySeverityDesc) { - if (evalFunction.apply(state, data)) { + boolean evalResult = evalFunction.apply(state, data); + stateUpdate |= state.checkUpdate(); + if (evalResult) { resultSeverity = state.getSeverity(); break; } } if (resultSeverity != null) { pushMsg(ctx, calculateAlarmResult(ctx, resultSeverity)); - } else if (currentAlarm != null) { - if (evalFunction.apply(clearState, data)) { + } else if (currentAlarm != null && clearState != null) { + Boolean evalResult = evalFunction.apply(clearState, data); + if (evalResult) { + stateUpdate |= clearState.checkUpdate(); ctx.getAlarmService().clearAlarm(ctx.getTenantId(), currentAlarm.getId(), JacksonUtil.OBJECT_MAPPER.createObjectNode(), System.currentTimeMillis()); pushMsg(ctx, new TbAlarmResult(false, false, true, currentAlarm)); currentAlarm = null; } } + return stateUpdate; } public void initCurrentAlarm(TbContext ctx) throws InterruptedException, ExecutionException { @@ -96,7 +106,7 @@ class DeviceProfileAlarmState { public void pushMsg(TbContext ctx, TbAlarmResult alarmResult) { JsonNode jsonNodes = JacksonUtil.valueToTree(alarmResult.getAlarm()); String data = jsonNodes.toString(); - TbMsgMetaData metaData = lastMsgMetaData.copy(); + TbMsgMetaData metaData = lastMsgMetaData != null ? lastMsgMetaData.copy() : new TbMsgMetaData(); String relationType; if (alarmResult.isCreated()) { relationType = "Alarm Created"; @@ -112,18 +122,29 @@ class DeviceProfileAlarmState { relationType = "Alarm Cleared"; metaData.putValue(DataConstants.IS_CLEARED_ALARM, Boolean.TRUE.toString()); } - TbMsg newMsg = ctx.newMsg(lastMsgQueueName, "ALARM", originator, metaData, data); + TbMsg newMsg = ctx.newMsg(lastMsgQueueName != null ? lastMsgQueueName : ServiceQueue.MAIN, "ALARM", originator, metaData, data); ctx.tellNext(newMsg, relationType); } - public void updateState(DeviceProfileAlarm alarm) { + public void updateState(DeviceProfileAlarm alarm, PersistedAlarmState alarmState) { this.alarmDefinition = alarm; this.createRulesSortedBySeverityDesc = new ArrayList<>(); alarmDefinition.getCreateRules().forEach((severity, rule) -> { - createRulesSortedBySeverityDesc.add(new AlarmRuleState(severity, rule)); + PersistedAlarmRuleState ruleState = null; + if (alarmState != null) { + ruleState = alarmState.getCreateRuleStates().get(severity); + if (ruleState == null) { + ruleState = new PersistedAlarmRuleState(); + alarmState.getCreateRuleStates().put(severity, ruleState); + } + } + createRulesSortedBySeverityDesc.add(new AlarmRuleState(severity, rule, ruleState)); }); createRulesSortedBySeverityDesc.sort(Comparator.comparingInt(state -> state.getSeverity().ordinal())); - clearState = new AlarmRuleState(null, alarmDefinition.getClearRule()); + PersistedAlarmRuleState ruleState = alarmState == null ? null : alarmState.getClearRuleState(); + if (alarmDefinition.getClearRule() != null) { + clearState = new AlarmRuleState(null, alarmDefinition.getClearRule(), ruleState); + } } private TbAlarmResult calculateAlarmResult(TbContext ctx, AlarmSeverity severity) { diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java index ecb2e5905c..6ac3e2e14a 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java @@ -18,6 +18,8 @@ package org.thingsboard.rule.engine.profile; import com.google.gson.JsonParser; import org.springframework.util.StringUtils; import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.profile.state.PersistedAlarmState; +import org.thingsboard.rule.engine.profile.state.PersistedDeviceState; import org.thingsboard.rule.engine.telemetry.TbMsgTimeseriesNode; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; @@ -26,17 +28,21 @@ import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.RuleNodeStateId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.query.EntityKey; import org.thingsboard.server.common.data.query.EntityKeyType; +import org.thingsboard.server.common.data.rule.RuleNodeState; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.session.SessionMsgType; import org.thingsboard.server.common.transport.adaptor.JsonConverter; import org.thingsboard.server.dao.sql.query.EntityKeyMapping; +import org.thingsboard.server.dao.util.mapping.JacksonUtil; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -48,14 +54,36 @@ import java.util.stream.Collectors; class DeviceState { + private final boolean persistState; private final DeviceId deviceId; + private RuleNodeState state; private DeviceProfileState deviceProfile; + private PersistedDeviceState pds; private DeviceDataSnapshot latestValues; private final ConcurrentMap alarmStates = new ConcurrentHashMap<>(); - public DeviceState(DeviceId deviceId, DeviceProfileState deviceProfile) { + public DeviceState(TbContext ctx, TbDeviceProfileNodeConfiguration config, DeviceId deviceId, DeviceProfileState deviceProfile) { + this.persistState = config.isPersistAlarmRulesState(); this.deviceId = deviceId; this.deviceProfile = deviceProfile; + if (config.isPersistAlarmRulesState()) { + state = ctx.findRuleNodeStateForEntity(deviceId); + if (state != null) { + pds = JacksonUtil.fromString(state.getStateData(), PersistedDeviceState.class); + } else { + state = new RuleNodeState(); + state.setRuleNodeId(ctx.getSelfId()); + state.setEntityId(deviceId); + pds = new PersistedDeviceState(); + pds.setAlarmStates(new HashMap<>()); + } + } + if (pds != null) { + for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) { + alarmStates.computeIfAbsent(alarm.getId(), + a -> new DeviceProfileAlarmState(deviceId, alarm, getOrInitPersistedAlarmState(alarm))); + } + } } public void updateProfile(TbContext ctx, DeviceProfile deviceProfile) throws ExecutionException, InterruptedException { @@ -72,9 +100,9 @@ class DeviceState { alarmStates.keySet().removeIf(id -> !newAlarmStateIds.contains(id)); for (DeviceProfileAlarm alarm : this.deviceProfile.getAlarmSettings()) { if (alarmStates.containsKey(alarm.getId())) { - alarmStates.get(alarm.getId()).updateState(alarm); + alarmStates.get(alarm.getId()).updateState(alarm, getOrInitPersistedAlarmState(alarm)); } else { - alarmStates.putIfAbsent(alarm.getId(), new DeviceProfileAlarmState(deviceId, alarm)); + alarmStates.putIfAbsent(alarm.getId(), new DeviceProfileAlarmState(deviceId, alarm, getOrInitPersistedAlarmState(alarm))); } } } @@ -89,29 +117,35 @@ class DeviceState { if (latestValues == null) { latestValues = fetchLatestValues(ctx, deviceId); } + boolean stateChanged = false; if (msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) { - processTelemetry(ctx, msg); + stateChanged = processTelemetry(ctx, msg); } else if (msg.getType().equals(SessionMsgType.POST_ATTRIBUTES_REQUEST.name())) { - processAttributesUpdateRequest(ctx, msg); + stateChanged = processAttributesUpdateRequest(ctx, msg); } else if (msg.getType().equals(DataConstants.ATTRIBUTES_UPDATED)) { - processAttributesUpdateNotification(ctx, msg); + stateChanged = processAttributesUpdateNotification(ctx, msg); } else if (msg.getType().equals(DataConstants.ATTRIBUTES_DELETED)) { - processAttributesDeleteNotification(ctx, msg); + stateChanged = processAttributesDeleteNotification(ctx, msg); } else { ctx.tellSuccess(msg); } + if (persistState && stateChanged) { + state.setStateData(JacksonUtil.toString(pds)); + state = ctx.saveRuleNodeState(state); + } } - private void processAttributesUpdateNotification(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { + private boolean processAttributesUpdateNotification(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { Set attributes = JsonConverter.convertToAttributes(new JsonParser().parse(msg.getData())); String scope = msg.getMetaData().getValue("scope"); if (StringUtils.isEmpty(scope)) { scope = DataConstants.CLIENT_SCOPE; } - processAttributesUpdate(ctx, msg, attributes, scope); + return processAttributesUpdate(ctx, msg, attributes, scope); } - private void processAttributesDeleteNotification(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { + private boolean processAttributesDeleteNotification(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { + boolean stateChanged = false; List keys = new ArrayList<>(); new JsonParser().parse(msg.getData()).getAsJsonObject().get("attributes").getAsJsonArray().forEach(e -> keys.add(e.getAsString())); String scope = msg.getMetaData().getValue("scope"); @@ -122,59 +156,65 @@ class DeviceState { EntityKeyType keyType = getKeyTypeFromScope(scope); keys.forEach(key -> latestValues.removeValue(new EntityKey(keyType, key))); for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) { - DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(), a -> new DeviceProfileAlarmState(deviceId, alarm)); - alarmState.process(ctx, msg, latestValues); + DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(), + a -> new DeviceProfileAlarmState(deviceId, alarm, getOrInitPersistedAlarmState(alarm))); + stateChanged |= alarmState.process(ctx, msg, latestValues); } } ctx.tellSuccess(msg); + return stateChanged; } - protected void processAttributesUpdateRequest(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { + protected boolean processAttributesUpdateRequest(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { Set attributes = JsonConverter.convertToAttributes(new JsonParser().parse(msg.getData())); - processAttributesUpdate(ctx, msg, attributes, DataConstants.CLIENT_SCOPE); + return processAttributesUpdate(ctx, msg, attributes, DataConstants.CLIENT_SCOPE); } - private void processAttributesUpdate(TbContext ctx, TbMsg msg, Set attributes, String scope) throws ExecutionException, InterruptedException { + private boolean processAttributesUpdate(TbContext ctx, TbMsg msg, Set attributes, String scope) throws ExecutionException, InterruptedException { + boolean stateChanged = false; if (!attributes.isEmpty()) { - latestValues = merge(latestValues, attributes, scope); + merge(latestValues, attributes, scope); for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) { - DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(), a -> new DeviceProfileAlarmState(deviceId, alarm)); - alarmState.process(ctx, msg, latestValues); + DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(), + a -> new DeviceProfileAlarmState(deviceId, alarm, getOrInitPersistedAlarmState(alarm))); + stateChanged |= alarmState.process(ctx, msg, latestValues); } } ctx.tellSuccess(msg); + return stateChanged; } - protected void processTelemetry(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { + protected boolean processTelemetry(TbContext ctx, TbMsg msg) throws ExecutionException, InterruptedException { + boolean stateChanged = false; Map> tsKvMap = JsonConverter.convertToSortedTelemetry(new JsonParser().parse(msg.getData()), TbMsgTimeseriesNode.getTs(msg)); for (Map.Entry> entry : tsKvMap.entrySet()) { Long ts = entry.getKey(); List data = entry.getValue(); - latestValues = merge(latestValues, ts, data); + merge(latestValues, ts, data); for (DeviceProfileAlarm alarm : deviceProfile.getAlarmSettings()) { - DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(), a -> new DeviceProfileAlarmState(deviceId, alarm)); - alarmState.process(ctx, msg, latestValues); + DeviceProfileAlarmState alarmState = alarmStates.computeIfAbsent(alarm.getId(), + a -> new DeviceProfileAlarmState(deviceId, alarm, getOrInitPersistedAlarmState(alarm))); + stateChanged |= alarmState.process(ctx, msg, latestValues); } } ctx.tellSuccess(msg); + return stateChanged; } - private DeviceDataSnapshot merge(DeviceDataSnapshot latestValues, Long ts, List data) { + private void merge(DeviceDataSnapshot latestValues, Long ts, List data) { latestValues.setTs(ts); for (KvEntry entry : data) { latestValues.putValue(new EntityKey(EntityKeyType.TIME_SERIES, entry.getKey()), toEntityValue(entry)); } - return latestValues; } - private DeviceDataSnapshot merge(DeviceDataSnapshot latestValues, Set attributes, String scope) { + private void merge(DeviceDataSnapshot latestValues, Set attributes, String scope) { long ts = latestValues.getTs(); for (AttributeKvEntry entry : attributes) { ts = Math.max(ts, entry.getLastUpdateTs()); latestValues.putValue(new EntityKey(getKeyTypeFromScope(scope), entry.getKey()), toEntityValue(entry)); } latestValues.setTs(ts); - return latestValues; } private static EntityKeyType getKeyTypeFromScope(String scope) { @@ -303,4 +343,19 @@ class DeviceState { public DeviceProfileId getProfileId() { return deviceProfile.getProfileId(); } + + private PersistedAlarmState getOrInitPersistedAlarmState(DeviceProfileAlarm alarm) { + if (pds != null) { + PersistedAlarmState alarmState = pds.getAlarmStates().get(alarm.getId()); + if (alarmState == null) { + alarmState = new PersistedAlarmState(); + alarmState.setCreateRuleStates(new HashMap<>()); + pds.getAlarmStates().put(alarm.getId(), alarmState); + } + return alarmState; + } else { + return null; + } + } + } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java index d3022f177c..ce01b3c103 100644 --- a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNode.java @@ -16,13 +16,14 @@ package org.thingsboard.rule.engine.profile; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.rule.engine.api.EmptyNodeConfiguration; import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache; import org.thingsboard.rule.engine.api.RuleNode; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.rule.engine.api.util.TbNodeUtils; +import org.thingsboard.rule.engine.profile.state.PersistedDeviceState; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; @@ -30,11 +31,14 @@ import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.data.rule.RuleNodeState; import org.thingsboard.server.common.msg.TbMsg; import org.thingsboard.server.common.msg.TbMsgMetaData; import org.thingsboard.server.dao.util.mapping.JacksonUtil; +import java.util.HashMap; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -45,25 +49,23 @@ import java.util.concurrent.TimeUnit; name = "device profile", customRelations = true, relationTypes = {"Alarm Created", "Alarm Updated", "Alarm Severity Updated", "Alarm Cleared", "Success", "Failure"}, - configClazz = EmptyNodeConfiguration.class, + configClazz = TbDeviceProfileNodeConfiguration.class, nodeDescription = "Process device messages based on device profile settings", - nodeDetails = "Create and clear alarms based on alarm rules defined in device profile. Generates ", - uiResources = {"static/rulenode/rulenode-core-config.js"}, - configDirective = "tbNodeEmptyConfig" + nodeDetails = "Create and clear alarms based on alarm rules defined in device profile. Generates " ) public class TbDeviceProfileNode implements TbNode { private static final String PERIODIC_MSG_TYPE = "TbDeviceProfilePeriodicMsg"; + private TbDeviceProfileNodeConfiguration config; private RuleEngineDeviceProfileCache cache; private final Map deviceStates = new ConcurrentHashMap<>(); @Override public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { - cache = ctx.getDeviceProfileCache(); + this.config = TbNodeUtils.convert(configuration, TbDeviceProfileNodeConfiguration.class); + this.cache = ctx.getDeviceProfileCache(); scheduleAlarmHarvesting(ctx); - //TODO: check that I am in root rule chain. - // If Yes - Init for all device profiles that do not have default rule chain id in device profile. - // If No - find device profiles with this rule chain id. + //TODO: launch a process of fetching the alarm rule states from the database; } /** @@ -127,7 +129,7 @@ public class TbDeviceProfileNode implements TbNode { if (deviceState == null) { DeviceProfile deviceProfile = cache.get(ctx.getTenantId(), deviceId); if (deviceProfile != null) { - deviceState = new DeviceState(deviceId, new DeviceProfileState(deviceProfile)); + deviceState = new DeviceState(ctx, config, deviceId, new DeviceProfileState(deviceProfile)); deviceStates.put(deviceId, deviceState); } } diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeConfiguration.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeConfiguration.java new file mode 100644 index 0000000000..0b32893c90 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeConfiguration.java @@ -0,0 +1,56 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.profile; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.codehaus.jackson.annotate.JsonIgnoreProperties; +import org.thingsboard.rule.engine.api.EmptyNodeConfiguration; +import org.thingsboard.rule.engine.api.NodeConfiguration; +import org.thingsboard.rule.engine.api.RuleEngineDeviceProfileCache; +import org.thingsboard.rule.engine.api.RuleNode; +import org.thingsboard.rule.engine.api.TbContext; +import org.thingsboard.rule.engine.api.TbNode; +import org.thingsboard.rule.engine.api.TbNodeConfiguration; +import org.thingsboard.rule.engine.api.TbNodeException; +import org.thingsboard.server.common.data.DataConstants; +import org.thingsboard.server.common.data.Device; +import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.EntityType; +import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.DeviceProfileId; +import org.thingsboard.server.common.data.plugin.ComponentType; +import org.thingsboard.server.common.msg.TbMsg; +import org.thingsboard.server.common.msg.TbMsgMetaData; +import org.thingsboard.server.dao.util.mapping.JacksonUtil; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class TbDeviceProfileNodeConfiguration implements NodeConfiguration { + + private boolean persistAlarmRulesState; + private boolean fetchAlarmRulesStateOnStart; + + @Override + public TbDeviceProfileNodeConfiguration defaultConfiguration() { + return new TbDeviceProfileNodeConfiguration(); + } +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/state/PersistedAlarmRuleState.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/state/PersistedAlarmRuleState.java new file mode 100644 index 0000000000..c097d91f38 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/state/PersistedAlarmRuleState.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.profile.state; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class PersistedAlarmRuleState { + + private long lastEventTs; + private long duration; + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/state/PersistedAlarmState.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/state/PersistedAlarmState.java new file mode 100644 index 0000000000..b8a657c4bb --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/state/PersistedAlarmState.java @@ -0,0 +1,29 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.profile.state; + +import lombok.Data; +import org.thingsboard.server.common.data.alarm.AlarmSeverity; + +import java.util.Map; + +@Data +public class PersistedAlarmState { + + private Map createRuleStates; + private PersistedAlarmRuleState clearRuleState; + +} diff --git a/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/state/PersistedDeviceState.java b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/state/PersistedDeviceState.java new file mode 100644 index 0000000000..c0acfc0768 --- /dev/null +++ b/rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/state/PersistedDeviceState.java @@ -0,0 +1,27 @@ +/** + * Copyright © 2016-2020 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.rule.engine.profile.state; + +import lombok.Data; + +import java.util.Map; + +@Data +public class PersistedDeviceState { + + Map alarmStates; + +}