Browse Source

Improve performance of relation and rule chain service

pull/6758/head
Andrii Shvaika 4 years ago
parent
commit
15c1e215d4
  1. 2
      application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java
  2. 13
      application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java
  3. 2
      common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java
  4. 50
      dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java
  5. 8
      dao/src/main/java/org/thingsboard/server/dao/relation/RelationDao.java
  6. 29
      dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java
  7. 3
      dao/src/main/java/org/thingsboard/server/dao/rule/RuleNodeDao.java
  8. 48
      dao/src/main/java/org/thingsboard/server/dao/sql/relation/JpaRelationDao.java
  9. 11
      dao/src/main/java/org/thingsboard/server/dao/sql/relation/RelationRepository.java
  10. 9
      dao/src/main/java/org/thingsboard/server/dao/sql/rule/JpaRuleNodeDao.java
  11. 2
      dao/src/main/java/org/thingsboard/server/dao/sql/rule/RuleNodeRepository.java

2
application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java

@ -160,7 +160,7 @@ public abstract class AbstractConsumerService<N extends com.google.protobuf.Gene
TbActorMsg actorMsg = actorMsgOpt.get();
if (actorMsg instanceof ComponentLifecycleMsg) {
ComponentLifecycleMsg componentLifecycleMsg = (ComponentLifecycleMsg) actorMsg;
log.info("[{}][{}][{}] Received Lifecycle event: {}", componentLifecycleMsg.getTenantId(), componentLifecycleMsg.getEntityId().getEntityType(),
log.debug("[{}][{}][{}] Received Lifecycle event: {}", componentLifecycleMsg.getTenantId(), componentLifecycleMsg.getEntityId().getEntityType(),
componentLifecycleMsg.getEntityId(), componentLifecycleMsg.getEvent());
if (EntityType.TENANT_PROFILE.equals(componentLifecycleMsg.getEntityId().getEntityType())) {
TenantProfileId tenantProfileId = new TenantProfileId(componentLifecycleMsg.getEntityId().getId());

13
application/src/main/java/org/thingsboard/server/service/sync/ie/importing/impl/BaseEntityImportService.java

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -23,6 +23,8 @@ import org.checkerframework.checker.nullness.qual.Nullable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StopWatch;
import org.thingsboard.common.util.TbStopWatch;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.ExportableEntity;
@ -84,6 +86,7 @@ public abstract class BaseEntityImportService<I extends EntityId, E extends Expo
@Transactional(rollbackFor = Exception.class)
@Override
public EntityImportResult<E> importEntity(EntitiesImportCtx ctx, D exportData) throws ThingsboardException {
// TbStopWatch sw = TbStopWatch.create("find");
EntityImportResult<E> importResult = new EntityImportResult<>();
IdProvider idProvider = new IdProvider(ctx, importResult);
@ -100,16 +103,22 @@ public abstract class BaseEntityImportService<I extends EntityId, E extends Expo
entity.setCreatedTime(existingEntity.getCreatedTime());
}
// sw.startNew("prepareAndSave");
E savedEntity = prepareAndSave(ctx, entity, exportData, idProvider);
importResult.setSavedEntity(savedEntity);
importResult.setOldEntity(existingEntity);
importResult.setEntityType(getEntityType());
// sw.startNew("afterSaved");
processAfterSaved(ctx, importResult, exportData, idProvider);
ctx.putInternalId(exportData.getExternalId(), savedEntity.getId());
// sw.stop();
// for (var task : sw.getTaskInfo()) {
// log.info("[{}][{}] Executed: {} in {}ms", exportData.getEntityType(), exportData.getEntity().getId(), task.getTaskName(), task.getTimeMillis());
// }
// log.info("[{}][{}] Total time: {}ms", exportData.getEntityType(), exportData.getEntity().getId(), sw.getTotalTimeMillis());
return importResult;
}

2
common/transport/transport-api/src/main/java/org/thingsboard/server/common/transport/service/DefaultTransportService.java

@ -901,7 +901,7 @@ public class DefaultTransportService implements TransportService {
if (EntityType.DEVICE_PROFILE.equals(entityType)) {
DeviceProfile deviceProfile = deviceProfileCache.put(msg.getData());
if (deviceProfile != null) {
log.info("On device profile update: {}", deviceProfile);
log.debug("On device profile update: {}", deviceProfile);
onProfileUpdate(deviceProfile);
}
} else if (EntityType.TENANT_PROFILE.equals(entityType)) {

50
dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -25,8 +25,6 @@ import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.annotation.Lazy;
import org.springframework.dao.ConcurrencyFailureException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionalEventListener;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.thingsboard.server.cache.TbTransactionalCache;
@ -41,14 +39,12 @@ import org.thingsboard.server.common.data.relation.RelationEntityTypeFilter;
import org.thingsboard.server.common.data.relation.RelationTypeGroup;
import org.thingsboard.server.common.data.relation.RelationsSearchParameters;
import org.thingsboard.server.common.data.rule.RuleChainType;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.entity.EntityService;
import org.thingsboard.server.dao.exception.DataValidationException;
import org.thingsboard.server.dao.service.ConstraintValidator;
import org.thingsboard.server.dao.sql.JpaExecutorService;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@ -188,26 +184,28 @@ public class BaseRelationService implements RelationService {
public void deleteEntityRelations(TenantId tenantId, EntityId entityId) {
log.trace("Executing deleteEntityRelations [{}]", entityId);
validate(entityId);
List<EntityRelation> inboundRelations = new ArrayList<>();
for (RelationTypeGroup typeGroup : RelationTypeGroup.values()) {
inboundRelations.addAll(relationDao.findAllByTo(tenantId, entityId, typeGroup));
}
List<EntityRelation> outboundRelations = new ArrayList<>();
for (RelationTypeGroup typeGroup : RelationTypeGroup.values()) {
outboundRelations.addAll(relationDao.findAllByFrom(tenantId, entityId, typeGroup));
}
List<EntityRelation> inboundRelations = new ArrayList<>(relationDao.findAllByTo(tenantId, entityId));
List<EntityRelation> outboundRelations = new ArrayList<>(relationDao.findAllByFrom(tenantId, entityId));
for (EntityRelation relation : inboundRelations) {
delete(tenantId, relation, true);
}
if (!inboundRelations.isEmpty()) {
try {
relationDao.deleteInboundRelations(tenantId, entityId);
} catch (ConcurrencyFailureException e) {
log.debug("Concurrency exception while deleting relations [{}]", inboundRelations, e);
}
for (EntityRelation relation : outboundRelations) {
delete(tenantId, relation, false);
for (EntityRelation relation : inboundRelations) {
eventPublisher.publishEvent(EntityRelationEvent.from(relation));
}
}
relationDao.deleteOutboundRelations(tenantId, entityId);
if (!outboundRelations.isEmpty()) {
relationDao.deleteOutboundRelations(tenantId, entityId);
for (EntityRelation relation : outboundRelations) {
eventPublisher.publishEvent(EntityRelationEvent.from(relation));
}
}
}
@Override
@ -269,18 +267,6 @@ public class BaseRelationService implements RelationService {
}
}
boolean delete(TenantId tenantId, EntityRelation relation, boolean deleteFromDb) {
eventPublisher.publishEvent(EntityRelationEvent.from(relation));
if (deleteFromDb) {
try {
return relationDao.deleteRelation(tenantId, relation);
} catch (ConcurrencyFailureException e) {
log.debug("Concurrency exception while deleting relations [{}]", relation, e);
}
}
return false;
}
@Override
public List<EntityRelation> findByFrom(TenantId tenantId, EntityId from, RelationTypeGroup typeGroup) {
validate(from);

8
dao/src/main/java/org/thingsboard/server/dao/relation/RelationDao.java

@ -32,10 +32,14 @@ public interface RelationDao {
List<EntityRelation> findAllByFrom(TenantId tenantId, EntityId from, RelationTypeGroup typeGroup);
List<EntityRelation> findAllByFrom(TenantId tenantId, EntityId from);
List<EntityRelation> findAllByFromAndType(TenantId tenantId, EntityId from, String relationType, RelationTypeGroup typeGroup);
List<EntityRelation> findAllByTo(TenantId tenantId, EntityId to, RelationTypeGroup typeGroup);
List<EntityRelation> findAllByTo(TenantId tenantId, EntityId to);
List<EntityRelation> findAllByToAndType(TenantId tenantId, EntityId to, String relationType, RelationTypeGroup typeGroup);
ListenableFuture<Boolean> checkRelation(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup);
@ -56,7 +60,9 @@ public interface RelationDao {
ListenableFuture<Boolean> deleteRelationAsync(TenantId tenantId, EntityId from, EntityId to, String relationType, RelationTypeGroup typeGroup);
boolean deleteOutboundRelations(TenantId tenantId, EntityId entity);
void deleteOutboundRelations(TenantId tenantId, EntityId entity);
void deleteInboundRelations(TenantId tenantId, EntityId entity);
ListenableFuture<Boolean> deleteOutboundRelationsAsync(TenantId tenantId, EntityId entity);

29
dao/src/main/java/org/thingsboard/server/dao/rule/BaseRuleChainService.java

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -141,6 +141,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
List<RuleNode> nodes = ruleChainMetaData.getNodes();
List<RuleNode> toAddOrUpdate = new ArrayList<>();
List<RuleNode> toDelete = new ArrayList<>();
List<EntityRelation> relations = new ArrayList<>();
Map<RuleNodeId, Integer> ruleNodeIndexMap = new HashMap<>();
if (nodes != null) {
@ -171,15 +172,15 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
for (RuleNode node : toAddOrUpdate) {
node.setRuleChainId(ruleChain.getId());
RuleNode savedNode = ruleNodeDao.save(tenantId, node);
createRelation(tenantId, new EntityRelation(ruleChainMetaData.getRuleChainId(), savedNode.getId(),
relations.add(new EntityRelation(ruleChainMetaData.getRuleChainId(), savedNode.getId(),
EntityRelation.CONTAINS_TYPE, RelationTypeGroup.RULE_CHAIN));
int index = nodes.indexOf(node);
nodes.set(index, savedNode);
ruleNodeIndexMap.put(savedNode.getId(), index);
}
}
for (RuleNode node : toDelete) {
deleteRuleNode(tenantId, node.getId());
if (!toDelete.isEmpty()) {
deleteRuleNodes(tenantId, toDelete);
}
RuleNodeId firstRuleNodeId = null;
if (nodes != null) {
@ -196,7 +197,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
EntityId from = nodes.get(nodeConnection.getFromIndex()).getId();
EntityId to = nodes.get(nodeConnection.getToIndex()).getId();
String type = nodeConnection.getType();
createRelation(tenantId, new EntityRelation(from, to, type, RelationTypeGroup.RULE_NODE));
relations.add(new EntityRelation(from, to, type, RelationTypeGroup.RULE_NODE));
}
}
if (ruleChainMetaData.getRuleChainConnections() != null) {
@ -222,7 +223,7 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
sourceRuleChainToRuleNode.setTo(targetNode.getId());
sourceRuleChainToRuleNode.setType(EntityRelation.CONTAINS_TYPE);
sourceRuleChainToRuleNode.setTypeGroup(RelationTypeGroup.RULE_CHAIN);
relationService.saveRelation(tenantId, sourceRuleChainToRuleNode);
relations.add(sourceRuleChainToRuleNode);
EntityRelation sourceRuleNodeToTargetRuleNode = new EntityRelation();
EntityId from = nodes.get(nodeToRuleChainConnection.getFromIndex()).getId();
@ -230,11 +231,15 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
sourceRuleNodeToTargetRuleNode.setTo(targetNode.getId());
sourceRuleNodeToTargetRuleNode.setType(nodeToRuleChainConnection.getType());
sourceRuleNodeToTargetRuleNode.setTypeGroup(RelationTypeGroup.RULE_NODE);
relationService.saveRelation(tenantId, sourceRuleNodeToTargetRuleNode);
}
relations.add(sourceRuleNodeToTargetRuleNode);
}
}
}
if (!relations.isEmpty()) {
relationService.saveRelations(tenantId, relations);
}
return RuleChainUpdateResult.successful(updatedRuleNodes);
}
@ -710,6 +715,14 @@ public class BaseRuleChainService extends AbstractEntityService implements RuleC
deleteRuleNodes(tenantId, ruleChainId);
}
private void deleteRuleNodes(TenantId tenantId, List<RuleNode> ruleNodes) {
List<RuleNodeId> ruleNodeIds = ruleNodes.stream().map(RuleNode::getId).collect(Collectors.toList());
for (var node : ruleNodes) {
deleteEntityRelations(tenantId, node.getId());
}
ruleNodeDao.deleteByIdIn(ruleNodeIds);
}
@Override
public void deleteRuleNodes(TenantId tenantId, RuleChainId ruleChainId) {
List<EntityRelation> nodeRelations = getRuleChainToNodeRelations(tenantId, ruleChainId);

3
dao/src/main/java/org/thingsboard/server/dao/rule/RuleNodeDao.java

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.dao.rule;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
@ -31,4 +32,6 @@ public interface RuleNodeDao extends Dao<RuleNode> {
List<RuleNode> findRuleNodesByTenantIdAndType(TenantId tenantId, String type, String search);
PageData<RuleNode> findAllRuleNodesByType(String type, PageLink pageLink);
void deleteByIdIn(List<RuleNodeId> ruleNodeIds);
}

48
dao/src/main/java/org/thingsboard/server/dao/sql/relation/JpaRelationDao.java

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -33,6 +33,8 @@ import org.thingsboard.server.dao.model.sql.RelationEntity;
import org.thingsboard.server.dao.relation.RelationDao;
import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
@ -44,6 +46,12 @@ import java.util.stream.Collectors;
@Component
public class JpaRelationDao extends JpaAbstractDaoListeningExecutorService implements RelationDao {
private static final List<String> ALL_TYPE_GROUP_NAMES = new ArrayList<>();
static {
Arrays.stream(RelationTypeGroup.values()).map(RelationTypeGroup::name).forEach(ALL_TYPE_GROUP_NAMES::add);
}
@Autowired
private RelationRepository relationRepository;
@ -59,6 +67,15 @@ public class JpaRelationDao extends JpaAbstractDaoListeningExecutorService imple
typeGroup.name()));
}
@Override
public List<EntityRelation> findAllByFrom(TenantId tenantId, EntityId from) {
return DaoUtil.convertDataList(
relationRepository.findAllByFromIdAndFromTypeAndRelationTypeGroupIn(
from.getId(),
from.getEntityType().name(),
ALL_TYPE_GROUP_NAMES));
}
@Override
public List<EntityRelation> findAllByFromAndType(TenantId tenantId, EntityId from, String relationType, RelationTypeGroup typeGroup) {
return DaoUtil.convertDataList(
@ -78,6 +95,15 @@ public class JpaRelationDao extends JpaAbstractDaoListeningExecutorService imple
typeGroup.name()));
}
@Override
public List<EntityRelation> findAllByTo(TenantId tenantId, EntityId to) {
return DaoUtil.convertDataList(
relationRepository.findAllByToIdAndToTypeAndRelationTypeGroupIn(
to.getId(),
to.getEntityType().name(),
ALL_TYPE_GROUP_NAMES));
}
@Override
public List<EntityRelation> findAllByToAndType(TenantId tenantId, EntityId to, String relationType, RelationTypeGroup typeGroup) {
return DaoUtil.convertDataList(
@ -164,19 +190,21 @@ public class JpaRelationDao extends JpaAbstractDaoListeningExecutorService imple
}
@Override
public boolean deleteOutboundRelations(TenantId tenantId, EntityId entity) {
boolean relationExistsBeforeDelete = false;
public void deleteOutboundRelations(TenantId tenantId, EntityId entity) {
try {
relationExistsBeforeDelete = relationRepository
.findAllByFromIdAndFromType(entity.getId(), entity.getEntityType().name())
.size() > 0;
if (relationExistsBeforeDelete) {
relationRepository.deleteByFromIdAndFromType(entity.getId(), entity.getEntityType().name());
}
relationRepository.deleteByFromIdAndFromType(entity.getId(), entity.getEntityType().name());
} catch (ConcurrencyFailureException e) {
log.debug("Concurrency exception while deleting relations [{}]", entity, e);
}
}
@Override
public void deleteInboundRelations(TenantId tenantId, EntityId entity) {
try {
relationRepository.deleteByToIdAndToTypeAndRelationTypeGroupIn(entity.getId(), entity.getEntityType().name(), ALL_TYPE_GROUP_NAMES);
} catch (ConcurrencyFailureException e) {
log.debug("Concurrency exception while deleting relations [{}]", entity, e);
}
return relationExistsBeforeDelete;
}
@Override

11
dao/src/main/java/org/thingsboard/server/dao/sql/relation/RelationRepository.java

@ -35,6 +35,10 @@ public interface RelationRepository
String fromType,
String relationTypeGroup);
List<RelationEntity> findAllByFromIdAndFromTypeAndRelationTypeGroupIn(UUID fromId,
String fromType,
List<String> relationTypeGroups);
List<RelationEntity> findAllByFromIdAndFromTypeAndRelationTypeAndRelationTypeGroup(UUID fromId,
String fromType,
String relationType,
@ -44,6 +48,10 @@ public interface RelationRepository
String toType,
String relationTypeGroup);
List<RelationEntity> findAllByToIdAndToTypeAndRelationTypeGroupIn(UUID toId,
String toType,
List<String> relationTypeGroups);
List<RelationEntity> findAllByToIdAndToTypeAndRelationTypeAndRelationTypeGroup(UUID toId,
String toType,
String relationType,
@ -66,4 +74,7 @@ public interface RelationRepository
@Transactional
void deleteByFromIdAndFromType(UUID fromId, String fromType);
@Transactional
void deleteByToIdAndToTypeAndRelationTypeGroupIn(UUID fromId, String fromType, List<String> relationTypeGroups);
}

9
dao/src/main/java/org/thingsboard/server/dao/sql/rule/JpaRuleNodeDao.java

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -20,6 +20,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Component;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.RuleNodeId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.page.PageLink;
@ -32,6 +33,7 @@ import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;
@Slf4j
@Component
@ -64,6 +66,11 @@ public class JpaRuleNodeDao extends JpaAbstractSearchTextDao<RuleNodeEntity, Rul
DaoUtil.toPageable(pageLink)));
}
@Override
public void deleteByIdIn(List<RuleNodeId> ruleNodeIds) {
ruleNodeRepository.deleteAllById(ruleNodeIds.stream().map(RuleNodeId::getId).collect(Collectors.toList()));
}
@Override
public EntityType getEntityType() {
return EntityType.RULE_NODE;

2
dao/src/main/java/org/thingsboard/server/dao/sql/rule/RuleNodeRepository.java

@ -39,4 +39,6 @@ public interface RuleNodeRepository extends JpaRepository<RuleNodeEntity, UUID>
@Param("searchText") String searchText,
Pageable pageable);
void deleteByIdIn(List<UUID> ids);
}

Loading…
Cancel
Save