From e1534721c5330beb1f7dba8a54554ddcfb8a5535 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 3 Nov 2022 16:46:10 +0200 Subject: [PATCH 1/4] findRelationsRecursively refactoring - removed blocking operations --- .../dao/relation/BaseRelationService.java | 61 ++++++++++--------- 1 file changed, 32 insertions(+), 29 deletions(-) diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java index 6f6a942162..c751f7334b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java @@ -52,6 +52,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; +import java.util.stream.Collectors; import static org.thingsboard.server.dao.service.Validator.validateId; @@ -512,40 +513,42 @@ public class BaseRelationService implements RelationService { private ListenableFuture> findRelationsRecursively(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, RelationTypeGroup relationTypeGroup, int lvl, boolean fetchLastLevelOnly, - final ConcurrentHashMap uniqueMap) throws Exception { + final ConcurrentHashMap uniqueMap) { if (lvl == 0) { return Futures.immediateFuture(Collections.emptySet()); } - lvl--; - //TODO: try to remove this blocking operation - Set children = new HashSet<>(findRelations(tenantId, rootId, direction, relationTypeGroup).get()); - Set childrenIds = new HashSet<>(); - for (EntityRelation childRelation : children) { - log.trace("Found Relation: {}", childRelation); - EntityId childId; - if (direction == EntitySearchDirection.FROM) { - childId = childRelation.getTo(); - } else { - childId = childRelation.getFrom(); - } - if (uniqueMap.putIfAbsent(childId, Boolean.TRUE) == null) { - log.trace("Adding Relation: {}", childId); - if (childrenIds.add(childId)) { - log.trace("Added Relation: {}", childId); + final int currentLvl = --lvl; + final Set children = new HashSet<>(); + ListenableFuture> rootRelationsFuture = findRelations(tenantId, rootId, direction, relationTypeGroup); + ListenableFuture> childrenIdsFuture = Futures.transform(rootRelationsFuture, relations -> { + children.addAll(relations); + Set childrenIds = new HashSet<>(); + for (EntityRelation childRelation : children) { + log.trace("Found Relation: {}", childRelation); + EntityId childId = direction == EntitySearchDirection.FROM ? childRelation.getTo() : childRelation.getFrom(); + if (uniqueMap.putIfAbsent(childId, Boolean.TRUE) == null) { + log.trace("Adding Relation: {}", childId); + if (childrenIds.add(childId)) { + log.trace("Added Relation: {}", childId); + } } } - } - List>> futures = new ArrayList<>(); - for (EntityId entityId : childrenIds) { - futures.add(findRelationsRecursively(tenantId, entityId, direction, relationTypeGroup, lvl, fetchLastLevelOnly, uniqueMap)); - } - //TODO: try to remove this blocking operation - List> relations = Futures.successfulAsList(futures).get(); - if (fetchLastLevelOnly && lvl > 0) { - children.clear(); - } - relations.forEach(r -> r.forEach(children::add)); - return Futures.immediateFuture(children); + return childrenIds; + }, MoreExecutors.directExecutor()); + + ListenableFuture>> recursiveFutures = Futures.transformAsync(childrenIdsFuture, childrenIds -> + Futures.successfulAsList(childrenIds.stream() + .map(entityId -> findRelationsRecursively(tenantId, entityId, direction, relationTypeGroup, currentLvl, fetchLastLevelOnly, uniqueMap)) + .collect(Collectors.toList())), MoreExecutors.directExecutor()); + + ListenableFuture> relationsFuture = Futures.transform(recursiveFutures, recursiveRelations -> { + if (fetchLastLevelOnly && currentLvl > 0) { + children.clear(); + } + recursiveRelations.forEach(children::addAll); + return children; + }, MoreExecutors.directExecutor()); + return relationsFuture; } private ListenableFuture> findRelations(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, RelationTypeGroup relationTypeGroup) { From 32f01438b1cd02d8b90c71177abc9542056ea53f Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Thu, 3 Nov 2022 17:43:59 +0200 Subject: [PATCH 2/4] Test for recursive relation depth --- .../dao/service/BaseRelationServiceTest.java | 49 ++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/BaseRelationServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/BaseRelationServiceTest.java index 75ac95d470..ca85c88337 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/BaseRelationServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/BaseRelationServiceTest.java @@ -80,7 +80,7 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest { Assert.assertTrue(relationService.deleteRelationAsync(SYSTEM_TENANT_ID, relationA).get()); - Assert.assertFalse(relationService.checkRelation(SYSTEM_TENANT_ID, parentId, childId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.COMMON)); + Assert.assertFalse(relationService.checkRelation(SYSTEM_TENANT_ID, parentId, childId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.COMMON)); Assert.assertTrue(relationService.checkRelation(SYSTEM_TENANT_ID, childId, subChildId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.COMMON)); @@ -288,6 +288,53 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest { Assert.assertTrue(relations.contains(relationBC)); } + @Test + public void testRecursiveRelationDepth() throws ExecutionException, InterruptedException { + int maxLevel = 100; + AssetId root = new AssetId(Uuids.timeBased()); + AssetId left = new AssetId(Uuids.timeBased()); + AssetId right = new AssetId(Uuids.timeBased()); + + List expected = new ArrayList<>(); + + EntityRelation relationAB = new EntityRelation(root, left, EntityRelation.CONTAINS_TYPE); + EntityRelation relationBC = new EntityRelation(root, right, EntityRelation.CONTAINS_TYPE); + saveRelation(relationAB); + expected.add(relationAB); + + saveRelation(relationBC); + expected.add(relationBC); + + for (int i = 0; i < maxLevel; i++) { + var newLeft = new AssetId(Uuids.timeBased()); + var newRight = new AssetId(Uuids.timeBased()); + EntityRelation relationLeft = new EntityRelation(left, newLeft, EntityRelation.CONTAINS_TYPE); + EntityRelation relationRight = new EntityRelation(right, newRight, EntityRelation.CONTAINS_TYPE); + saveRelation(relationLeft); + expected.add(relationLeft); + saveRelation(relationRight); + expected.add(relationRight); + left = newLeft; + right = newRight; + } + + + EntityRelationsQuery query = new EntityRelationsQuery(); + query.setParameters(new RelationsSearchParameters(root, EntitySearchDirection.FROM, -1, false)); + query.setFilters(Collections.singletonList(new RelationEntityTypeFilter(EntityRelation.CONTAINS_TYPE, Collections.singletonList(EntityType.ASSET)))); + List relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get(); + Assert.assertEquals(expected.size(), relations.size()); + for(EntityRelation r : expected){ + Assert.assertTrue(relations.contains(r)); + } + + //Test from cache + relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get(); + Assert.assertEquals(expected.size(), relations.size()); + for(EntityRelation r : expected){ + Assert.assertTrue(relations.contains(r)); + } + } @Test(expected = DataValidationException.class) public void testSaveRelationWithEmptyFrom() throws ExecutionException, InterruptedException { From 582c65b71e27c56472f2c6e0906b61c0a7b2f3de Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Thu, 3 Nov 2022 18:48:18 +0200 Subject: [PATCH 3/4] Experiments with queue vs futures --- .../server/dao/relation/RelationService.java | 2 + .../dao/relation/BaseRelationService.java | 122 +++++++++++++++++- .../dao/service/BaseRelationServiceTest.java | 52 +++++++- 3 files changed, 169 insertions(+), 7 deletions(-) diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/relation/RelationService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/relation/RelationService.java index e7df5eea93..35ca4f9ad1 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/relation/RelationService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/relation/RelationService.java @@ -79,6 +79,8 @@ public interface RelationService { ListenableFuture> findByQuery(TenantId tenantId, EntityRelationsQuery query); + ListenableFuture> findByQuery2(TenantId tenantId, EntityRelationsQuery query); + ListenableFuture> findInfoByQuery(TenantId tenantId, EntityRelationsQuery query); void removeRelations(TenantId tenantId, EntityId entityId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java index c751f7334b..39dd207527 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -20,6 +20,9 @@ import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import lombok.Getter; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Lazy; @@ -49,8 +52,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -407,6 +413,39 @@ public class BaseRelationService implements RelationService { } } + @Override + public ListenableFuture> findByQuery2(TenantId tenantId, EntityRelationsQuery query) { + //boolean fetchLastLevelOnly = true; + log.trace("Executing findByQuery [{}]", query); + RelationsSearchParameters params = query.getParameters(); + final List filters = query.getFilters(); + if (filters == null || filters.isEmpty()) { + log.debug("Filters are not set [{}]", query); + } + + int maxLvl = params.getMaxLevel() > 0 ? params.getMaxLevel() : Integer.MAX_VALUE; + + try { + ListenableFuture> relationSet = findRelationsRecursively2(tenantId, params.getEntityId(), params.getDirection(), params.getRelationTypeGroup(), maxLvl, params.isFetchLastLevelOnly(), new ConcurrentHashMap<>()); + return Futures.transform(relationSet, input -> { + List relations = new ArrayList<>(); + if (filters == null || filters.isEmpty()) { + relations.addAll(input); + return relations; + } + for (EntityRelation relation : input) { + if (matchFilters(filters, relation, params.getDirection())) { + relations.add(relation); + } + } + return relations; + }, MoreExecutors.directExecutor()); + } catch (Exception e) { + log.warn("Failed to query relations: [{}]", query, e); + throw new RuntimeException(e); + } + } + @Override public ListenableFuture> findInfoByQuery(TenantId tenantId, EntityRelationsQuery query) { log.trace("Executing findInfoByQuery [{}]", query); @@ -511,15 +550,75 @@ public class BaseRelationService implements RelationService { } } + @RequiredArgsConstructor + private static class RelationQueueCtx { + final SettableFuture> future = SettableFuture.create(); + final Set result = ConcurrentHashMap.newKeySet(); + final Queue tasks = new ConcurrentLinkedQueue<>(); + + final TenantId tenantId; + final EntitySearchDirection direction; + final RelationTypeGroup relationTypeGroup; + final boolean fetchLastLevelOnly; + final int lvl; + final ConcurrentHashMap uniqueMap; + + } + + @RequiredArgsConstructor + private static class RelationTask { + private final int currentLvl; + private final EntityId root; + } + + private void processQueue(RelationQueueCtx ctx) { + RelationTask task = ctx.tasks.poll(); + while (task != null) { + List relations = findRelations(ctx.tenantId, task.root, ctx.direction, ctx.relationTypeGroup); + Set childrenIds = new HashSet<>(); + for (EntityRelation childRelation : relations) { + log.trace("Found Relation: {}", childRelation); + EntityId childId = ctx.direction == EntitySearchDirection.FROM ? childRelation.getTo() : childRelation.getFrom(); + if (ctx.uniqueMap.putIfAbsent(childId, Boolean.TRUE) == null) { + log.trace("Adding Relation: {}", childId); + if (childrenIds.add(childId)) { + log.trace("Added Relation: {}", childId); + } + } + } + for (EntityId child : childrenIds) { + ctx.tasks.add(new RelationTask(task.currentLvl - 1, child)); + if (!ctx.fetchLastLevelOnly || task.currentLvl == 0) { + ctx.result.addAll(relations); + } + } + task = ctx.tasks.poll(); + } + ctx.future.set(ctx.result); + } + private ListenableFuture> findRelationsRecursively(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, RelationTypeGroup relationTypeGroup, int lvl, boolean fetchLastLevelOnly, final ConcurrentHashMap uniqueMap) { if (lvl == 0) { return Futures.immediateFuture(Collections.emptySet()); } + var relationQueueCtx = new RelationQueueCtx(tenantId, direction, relationTypeGroup, fetchLastLevelOnly, lvl, uniqueMap); + relationQueueCtx.tasks.add(new RelationTask(lvl, rootId)); + executor.submit(() -> processQueue(relationQueueCtx)); + return relationQueueCtx.future; + } + + + private ListenableFuture> findRelationsRecursively2(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, + RelationTypeGroup relationTypeGroup, int lvl, boolean fetchLastLevelOnly, + final ConcurrentHashMap uniqueMap) { + if (lvl == 0) { + return Futures.immediateFuture(Collections.emptySet()); + } final int currentLvl = --lvl; final Set children = new HashSet<>(); - ListenableFuture> rootRelationsFuture = findRelations(tenantId, rootId, direction, relationTypeGroup); + ListenableFuture> rootRelationsFuture = findRelations2(tenantId, rootId, direction, relationTypeGroup); ListenableFuture> childrenIdsFuture = Futures.transform(rootRelationsFuture, relations -> { children.addAll(relations); Set childrenIds = new HashSet<>(); @@ -539,7 +638,7 @@ public class BaseRelationService implements RelationService { ListenableFuture>> recursiveFutures = Futures.transformAsync(childrenIdsFuture, childrenIds -> Futures.successfulAsList(childrenIds.stream() .map(entityId -> findRelationsRecursively(tenantId, entityId, direction, relationTypeGroup, currentLvl, fetchLastLevelOnly, uniqueMap)) - .collect(Collectors.toList())), MoreExecutors.directExecutor()); + .collect(Collectors.toList())), executor); ListenableFuture> relationsFuture = Futures.transform(recursiveFutures, recursiveRelations -> { if (fetchLastLevelOnly && currentLvl > 0) { @@ -547,11 +646,24 @@ public class BaseRelationService implements RelationService { } recursiveRelations.forEach(children::addAll); return children; - }, MoreExecutors.directExecutor()); + }, executor); return relationsFuture; } - private ListenableFuture> findRelations(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, RelationTypeGroup relationTypeGroup) { + private List findRelations(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, RelationTypeGroup relationTypeGroup) { + List relations; + if (relationTypeGroup == null) { + relationTypeGroup = RelationTypeGroup.COMMON; + } + if (direction == EntitySearchDirection.FROM) { + relations = findByFrom(tenantId, rootId, relationTypeGroup); + } else { + relations = findByTo(tenantId, rootId, relationTypeGroup); + } + return relations; + } + + private ListenableFuture> findRelations2(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, RelationTypeGroup relationTypeGroup) { ListenableFuture> relations; if (relationTypeGroup == null) { relationTypeGroup = RelationTypeGroup.COMMON; diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/BaseRelationServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/BaseRelationServiceTest.java index ca85c88337..a5b27bed32 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/BaseRelationServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/BaseRelationServiceTest.java @@ -289,8 +289,8 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest { } @Test - public void testRecursiveRelationDepth() throws ExecutionException, InterruptedException { - int maxLevel = 100; + public void testRecursiveRelationDepth3() throws ExecutionException, InterruptedException { + int maxLevel = 1000; AssetId root = new AssetId(Uuids.timeBased()); AssetId left = new AssetId(Uuids.timeBased()); AssetId right = new AssetId(Uuids.timeBased()); @@ -336,6 +336,54 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest { } } + @Test + public void testRecursiveRelationDepth2() throws ExecutionException, InterruptedException { + int maxLevel = 1000; + AssetId root = new AssetId(Uuids.timeBased()); + AssetId left = new AssetId(Uuids.timeBased()); + AssetId right = new AssetId(Uuids.timeBased()); + + List expected = new ArrayList<>(); + + EntityRelation relationAB = new EntityRelation(root, left, EntityRelation.CONTAINS_TYPE); + EntityRelation relationBC = new EntityRelation(root, right, EntityRelation.CONTAINS_TYPE); + saveRelation(relationAB); + expected.add(relationAB); + + saveRelation(relationBC); + expected.add(relationBC); + + for (int i = 0; i < maxLevel; i++) { + var newLeft = new AssetId(Uuids.timeBased()); + var newRight = new AssetId(Uuids.timeBased()); + EntityRelation relationLeft = new EntityRelation(left, newLeft, EntityRelation.CONTAINS_TYPE); + EntityRelation relationRight = new EntityRelation(right, newRight, EntityRelation.CONTAINS_TYPE); + saveRelation(relationLeft); + expected.add(relationLeft); + saveRelation(relationRight); + expected.add(relationRight); + left = newLeft; + right = newRight; + } + + + EntityRelationsQuery query = new EntityRelationsQuery(); + query.setParameters(new RelationsSearchParameters(root, EntitySearchDirection.FROM, -1, false)); + query.setFilters(Collections.singletonList(new RelationEntityTypeFilter(EntityRelation.CONTAINS_TYPE, Collections.singletonList(EntityType.ASSET)))); + List relations = relationService.findByQuery2(SYSTEM_TENANT_ID, query).get(); + Assert.assertEquals(expected.size(), relations.size()); + for(EntityRelation r : expected){ + Assert.assertTrue(relations.contains(r)); + } + + //Test from cache + relations = relationService.findByQuery2(SYSTEM_TENANT_ID, query).get(); + Assert.assertEquals(expected.size(), relations.size()); + for(EntityRelation r : expected){ + Assert.assertTrue(relations.contains(r)); + } + } + @Test(expected = DataValidationException.class) public void testSaveRelationWithEmptyFrom() throws ExecutionException, InterruptedException { EntityRelation relation = new EntityRelation(); From 7d73c40885bdef4c392b24187abe1722df5b17ef Mon Sep 17 00:00:00 2001 From: Andrii Shvaika Date: Fri, 4 Nov 2022 13:30:01 +0200 Subject: [PATCH 4/4] Major improvement to the relation query --- .../src/main/resources/thingsboard.yml | 4 +- .../server/dao/relation/RelationService.java | 2 - .../common/util/ThingsBoardExecutors.java | 5 + .../dao/relation/BaseRelationService.java | 161 +++----- .../JpaRelationQueryExecutorService.java | 33 ++ .../dao/service/BaseRelationServiceTest.java | 344 +++++++++++++++--- 6 files changed, 388 insertions(+), 161 deletions(-) create mode 100644 dao/src/main/java/org/thingsboard/server/dao/sql/relation/JpaRelationQueryExecutorService.java diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 66a603a1a1..c42205d5df 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -310,7 +310,9 @@ sql: ttl: "${SQL_TTL_AUDIT_LOGS_SECS:0}" # Disabled by default. Accuracy of the cleanup depends on the sql.audit_logs.partition_size checking_interval_ms: "${SQL_TTL_AUDIT_LOGS_CHECKING_INTERVAL_MS:86400000}" # Default value - 1 day relations: - max_level: "${SQL_RELATIONS_MAX_LEVEL:50}" # //This value has to be reasonable small to prevent infinite recursion as early as possible + max_level: "${SQL_RELATIONS_MAX_LEVEL:50}" # This value has to be reasonable small to prevent infinite recursion as early as possible + pool_size: "${SQL_RELATIONS_POOL_SIZE:4}" # This value has to be reasonable small to prevent relation query blocking all other DB calls + query_timeout: "${SQL_RELATIONS_QUERY_TIMEOUT_SEC:20}" # This value has to be reasonable small to prevent relation query blocking all other DB calls # Actor system parameters actors: diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/relation/RelationService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/relation/RelationService.java index 35ca4f9ad1..e7df5eea93 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/relation/RelationService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/relation/RelationService.java @@ -79,8 +79,6 @@ public interface RelationService { ListenableFuture> findByQuery(TenantId tenantId, EntityRelationsQuery query); - ListenableFuture> findByQuery2(TenantId tenantId, EntityRelationsQuery query); - ListenableFuture> findInfoByQuery(TenantId tenantId, EntityRelationsQuery query); void removeRelations(TenantId tenantId, EntityId entityId); diff --git a/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardExecutors.java b/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardExecutors.java index 55bbb38700..51cf24398a 100644 --- a/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardExecutors.java +++ b/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardExecutors.java @@ -15,6 +15,10 @@ */ package org.thingsboard.common.util; +import com.google.common.util.concurrent.MoreExecutors; + +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; @@ -47,4 +51,5 @@ public class ThingsBoardExecutors { public static ExecutorService newWorkStealingPool(int parallelism, Class clazz) { return newWorkStealingPool(parallelism, clazz.getSimpleName()); } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java index 39dd207527..ceda7eb52c 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -21,9 +21,9 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; -import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Lazy; import org.springframework.dao.ConcurrencyFailureException; @@ -31,6 +31,7 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.event.TransactionalEventListener; import org.springframework.transaction.support.TransactionSynchronizationManager; +import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.cache.TbTransactionalCache; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.id.EntityId; @@ -47,18 +48,23 @@ import org.thingsboard.server.dao.entity.EntityService; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.service.ConstraintValidator; import org.thingsboard.server.dao.sql.JpaExecutorService; +import org.thingsboard.server.dao.sql.relation.JpaRelationQueryExecutorService; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; import java.util.ArrayList; import java.util.Collections; -import java.util.HashSet; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; -import java.util.stream.Collectors; import static org.thingsboard.server.dao.service.Validator.validateId; @@ -74,15 +80,34 @@ public class BaseRelationService implements RelationService { private final TbTransactionalCache cache; private final ApplicationEventPublisher eventPublisher; private final JpaExecutorService executor; + private final JpaRelationQueryExecutorService relationsExecutor; + protected ScheduledExecutorService timeoutExecutorService; + + @Value("${sql.relations.query_timeout:20}") + private Integer relationQueryTimeout; public BaseRelationService(RelationDao relationDao, @Lazy EntityService entityService, TbTransactionalCache cache, - ApplicationEventPublisher eventPublisher, JpaExecutorService executor) { + ApplicationEventPublisher eventPublisher, JpaExecutorService executor, + JpaRelationQueryExecutorService relationsExecutor) { this.relationDao = relationDao; this.entityService = entityService; this.cache = cache; this.eventPublisher = eventPublisher; this.executor = executor; + this.relationsExecutor = relationsExecutor; + } + + @PostConstruct + public void init() { + timeoutExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("relations-query-timeout")); + } + + @PreDestroy + public void destroy() { + if (timeoutExecutorService != null) { + timeoutExecutorService.shutdownNow(); + } } @TransactionalEventListener(classes = EntityRelationEvent.class) @@ -382,7 +407,6 @@ public class BaseRelationService implements RelationService { @Override public ListenableFuture> findByQuery(TenantId tenantId, EntityRelationsQuery query) { - //boolean fetchLastLevelOnly = true; log.trace("Executing findByQuery [{}]", query); RelationsSearchParameters params = query.getParameters(); final List filters = query.getFilters(); @@ -393,40 +417,8 @@ public class BaseRelationService implements RelationService { int maxLvl = params.getMaxLevel() > 0 ? params.getMaxLevel() : Integer.MAX_VALUE; try { - ListenableFuture> relationSet = findRelationsRecursively(tenantId, params.getEntityId(), params.getDirection(), params.getRelationTypeGroup(), maxLvl, params.isFetchLastLevelOnly(), new ConcurrentHashMap<>()); - return Futures.transform(relationSet, input -> { - List relations = new ArrayList<>(); - if (filters == null || filters.isEmpty()) { - relations.addAll(input); - return relations; - } - for (EntityRelation relation : input) { - if (matchFilters(filters, relation, params.getDirection())) { - relations.add(relation); - } - } - return relations; - }, MoreExecutors.directExecutor()); - } catch (Exception e) { - log.warn("Failed to query relations: [{}]", query, e); - throw new RuntimeException(e); - } - } - - @Override - public ListenableFuture> findByQuery2(TenantId tenantId, EntityRelationsQuery query) { - //boolean fetchLastLevelOnly = true; - log.trace("Executing findByQuery [{}]", query); - RelationsSearchParameters params = query.getParameters(); - final List filters = query.getFilters(); - if (filters == null || filters.isEmpty()) { - log.debug("Filters are not set [{}]", query); - } - - int maxLvl = params.getMaxLevel() > 0 ? params.getMaxLevel() : Integer.MAX_VALUE; - - try { - ListenableFuture> relationSet = findRelationsRecursively2(tenantId, params.getEntityId(), params.getDirection(), params.getRelationTypeGroup(), maxLvl, params.isFetchLastLevelOnly(), new ConcurrentHashMap<>()); + ListenableFuture> relationSet = findRelationsRecursively(tenantId, params.getEntityId(), params.getDirection(), + params.getRelationTypeGroup(), maxLvl, params.isFetchLastLevelOnly(), new ConcurrentHashMap<>()); return Futures.transform(relationSet, input -> { List relations = new ArrayList<>(); if (filters == null || filters.isEmpty()) { @@ -560,7 +552,7 @@ public class BaseRelationService implements RelationService { final EntitySearchDirection direction; final RelationTypeGroup relationTypeGroup; final boolean fetchLastLevelOnly; - final int lvl; + final int maxLvl; final ConcurrentHashMap uniqueMap; } @@ -569,29 +561,43 @@ public class BaseRelationService implements RelationService { private static class RelationTask { private final int currentLvl; private final EntityId root; + private final List prevRelations; } private void processQueue(RelationQueueCtx ctx) { RelationTask task = ctx.tasks.poll(); while (task != null) { List relations = findRelations(ctx.tenantId, task.root, ctx.direction, ctx.relationTypeGroup); - Set childrenIds = new HashSet<>(); + Map> newChildrenRelations = new HashMap<>(); for (EntityRelation childRelation : relations) { log.trace("Found Relation: {}", childRelation); EntityId childId = ctx.direction == EntitySearchDirection.FROM ? childRelation.getTo() : childRelation.getFrom(); if (ctx.uniqueMap.putIfAbsent(childId, Boolean.TRUE) == null) { log.trace("Adding Relation: {}", childId); - if (childrenIds.add(childId)) { - log.trace("Added Relation: {}", childId); + newChildrenRelations.put(childId, new ArrayList<>()); + } + if (ctx.fetchLastLevelOnly) { + var list = newChildrenRelations.get(childId); + if (list != null) { + list.add(childRelation); } } } - for (EntityId child : childrenIds) { - ctx.tasks.add(new RelationTask(task.currentLvl - 1, child)); - if (!ctx.fetchLastLevelOnly || task.currentLvl == 0) { + if (ctx.fetchLastLevelOnly) { + if (relations.isEmpty()) { + ctx.result.addAll(task.prevRelations); + } else if (task.currentLvl == ctx.maxLvl) { ctx.result.addAll(relations); } + } else { + ctx.result.addAll(relations); } + var finalTask = task; + newChildrenRelations.forEach((child, childRelations) -> { + var newLvl = finalTask.currentLvl + 1; + if (newLvl <= ctx.maxLvl) + ctx.tasks.add(new RelationTask(newLvl, child, childRelations)); + }); task = ctx.tasks.poll(); } ctx.future.set(ctx.result); @@ -604,52 +610,12 @@ public class BaseRelationService implements RelationService { return Futures.immediateFuture(Collections.emptySet()); } var relationQueueCtx = new RelationQueueCtx(tenantId, direction, relationTypeGroup, fetchLastLevelOnly, lvl, uniqueMap); - relationQueueCtx.tasks.add(new RelationTask(lvl, rootId)); - executor.submit(() -> processQueue(relationQueueCtx)); - return relationQueueCtx.future; + relationQueueCtx.tasks.add(new RelationTask(1, rootId, Collections.emptyList())); + relationsExecutor.submit(() -> processQueue(relationQueueCtx)); + return Futures.withTimeout(relationQueueCtx.future, relationQueryTimeout, TimeUnit.SECONDS, timeoutExecutorService); } - private ListenableFuture> findRelationsRecursively2(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, - RelationTypeGroup relationTypeGroup, int lvl, boolean fetchLastLevelOnly, - final ConcurrentHashMap uniqueMap) { - if (lvl == 0) { - return Futures.immediateFuture(Collections.emptySet()); - } - final int currentLvl = --lvl; - final Set children = new HashSet<>(); - ListenableFuture> rootRelationsFuture = findRelations2(tenantId, rootId, direction, relationTypeGroup); - ListenableFuture> childrenIdsFuture = Futures.transform(rootRelationsFuture, relations -> { - children.addAll(relations); - Set childrenIds = new HashSet<>(); - for (EntityRelation childRelation : children) { - log.trace("Found Relation: {}", childRelation); - EntityId childId = direction == EntitySearchDirection.FROM ? childRelation.getTo() : childRelation.getFrom(); - if (uniqueMap.putIfAbsent(childId, Boolean.TRUE) == null) { - log.trace("Adding Relation: {}", childId); - if (childrenIds.add(childId)) { - log.trace("Added Relation: {}", childId); - } - } - } - return childrenIds; - }, MoreExecutors.directExecutor()); - - ListenableFuture>> recursiveFutures = Futures.transformAsync(childrenIdsFuture, childrenIds -> - Futures.successfulAsList(childrenIds.stream() - .map(entityId -> findRelationsRecursively(tenantId, entityId, direction, relationTypeGroup, currentLvl, fetchLastLevelOnly, uniqueMap)) - .collect(Collectors.toList())), executor); - - ListenableFuture> relationsFuture = Futures.transform(recursiveFutures, recursiveRelations -> { - if (fetchLastLevelOnly && currentLvl > 0) { - children.clear(); - } - recursiveRelations.forEach(children::addAll); - return children; - }, executor); - return relationsFuture; - } - private List findRelations(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, RelationTypeGroup relationTypeGroup) { List relations; if (relationTypeGroup == null) { @@ -663,19 +629,6 @@ public class BaseRelationService implements RelationService { return relations; } - private ListenableFuture> findRelations2(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, RelationTypeGroup relationTypeGroup) { - ListenableFuture> relations; - if (relationTypeGroup == null) { - relationTypeGroup = RelationTypeGroup.COMMON; - } - if (direction == EntitySearchDirection.FROM) { - relations = findByFromAsync(tenantId, rootId, relationTypeGroup); - } else { - relations = findByToAsync(tenantId, rootId, relationTypeGroup); - } - return relations; - } - private void publishEvictEvent(EntityRelationEvent event) { if (TransactionSynchronizationManager.isActualTransactionActive()) { eventPublisher.publishEvent(event); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/relation/JpaRelationQueryExecutorService.java b/dao/src/main/java/org/thingsboard/server/dao/sql/relation/JpaRelationQueryExecutorService.java new file mode 100644 index 0000000000..be6caad8d9 --- /dev/null +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/relation/JpaRelationQueryExecutorService.java @@ -0,0 +1,33 @@ +/** + * Copyright © 2016-2022 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.dao.sql.relation; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; +import org.thingsboard.common.util.AbstractListeningExecutor; + +@Component +public class JpaRelationQueryExecutorService extends AbstractListeningExecutor { + + @Value("${sql.relations.pool_size:4}") + private int poolSize; + + @Override + protected int getThreadPollSize() { + return poolSize; + } + +} diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/BaseRelationServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/BaseRelationServiceTest.java index a5b27bed32..e294a3f35b 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/BaseRelationServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/BaseRelationServiceTest.java @@ -35,6 +35,7 @@ import org.thingsboard.server.dao.exception.DataValidationException; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutionException; @@ -171,8 +172,8 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest { Assert.assertEquals(0, relations.size()); } - private Boolean saveRelation(EntityRelation relationA1) throws ExecutionException, InterruptedException { - return relationService.saveRelationAsync(SYSTEM_TENANT_ID, relationA1).get(); + private Boolean saveRelation(EntityRelation relationA1) { + return relationService.saveRelation(SYSTEM_TENANT_ID, relationA1); } @Test @@ -194,9 +195,6 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest { saveRelation(relationB1); saveRelation(relationB2); - // Data propagation to views is async - Thread.sleep(3000); - List relations = relationService.findByTo(SYSTEM_TENANT_ID, childA, RelationTypeGroup.COMMON); Assert.assertEquals(2, relations.size()); for (EntityRelation relation : relations) { @@ -289,7 +287,7 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest { } @Test - public void testRecursiveRelationDepth3() throws ExecutionException, InterruptedException { + public void testRecursiveRelationDepth() throws ExecutionException, InterruptedException { int maxLevel = 1000; AssetId root = new AssetId(Uuids.timeBased()); AssetId left = new AssetId(Uuids.timeBased()); @@ -336,54 +334,6 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest { } } - @Test - public void testRecursiveRelationDepth2() throws ExecutionException, InterruptedException { - int maxLevel = 1000; - AssetId root = new AssetId(Uuids.timeBased()); - AssetId left = new AssetId(Uuids.timeBased()); - AssetId right = new AssetId(Uuids.timeBased()); - - List expected = new ArrayList<>(); - - EntityRelation relationAB = new EntityRelation(root, left, EntityRelation.CONTAINS_TYPE); - EntityRelation relationBC = new EntityRelation(root, right, EntityRelation.CONTAINS_TYPE); - saveRelation(relationAB); - expected.add(relationAB); - - saveRelation(relationBC); - expected.add(relationBC); - - for (int i = 0; i < maxLevel; i++) { - var newLeft = new AssetId(Uuids.timeBased()); - var newRight = new AssetId(Uuids.timeBased()); - EntityRelation relationLeft = new EntityRelation(left, newLeft, EntityRelation.CONTAINS_TYPE); - EntityRelation relationRight = new EntityRelation(right, newRight, EntityRelation.CONTAINS_TYPE); - saveRelation(relationLeft); - expected.add(relationLeft); - saveRelation(relationRight); - expected.add(relationRight); - left = newLeft; - right = newRight; - } - - - EntityRelationsQuery query = new EntityRelationsQuery(); - query.setParameters(new RelationsSearchParameters(root, EntitySearchDirection.FROM, -1, false)); - query.setFilters(Collections.singletonList(new RelationEntityTypeFilter(EntityRelation.CONTAINS_TYPE, Collections.singletonList(EntityType.ASSET)))); - List relations = relationService.findByQuery2(SYSTEM_TENANT_ID, query).get(); - Assert.assertEquals(expected.size(), relations.size()); - for(EntityRelation r : expected){ - Assert.assertTrue(relations.contains(r)); - } - - //Test from cache - relations = relationService.findByQuery2(SYSTEM_TENANT_ID, query).get(); - Assert.assertEquals(expected.size(), relations.size()); - for(EntityRelation r : expected){ - Assert.assertTrue(relations.contains(r)); - } - } - @Test(expected = DataValidationException.class) public void testSaveRelationWithEmptyFrom() throws ExecutionException, InterruptedException { EntityRelation relation = new EntityRelation(); @@ -407,4 +357,290 @@ public abstract class BaseRelationServiceTest extends AbstractServiceTest { relation.setTo(new AssetId(Uuids.timeBased())); Assert.assertTrue(saveRelation(relation)); } + + @Test + public void testFindByQueryFetchLastOnlyTreeLike() throws Exception { + // A -> B + // A -> C + // C -> D + // C -> E + + AssetId assetA = new AssetId(Uuids.timeBased()); + AssetId assetB = new AssetId(Uuids.timeBased()); + AssetId assetC = new AssetId(Uuids.timeBased()); + AssetId assetD = new AssetId(Uuids.timeBased()); + AssetId assetE = new AssetId(Uuids.timeBased()); + + EntityRelation relationA = new EntityRelation(assetA, assetB, EntityRelation.CONTAINS_TYPE); + EntityRelation relationB = new EntityRelation(assetA, assetC, EntityRelation.CONTAINS_TYPE); + EntityRelation relationC = new EntityRelation(assetC, assetD, EntityRelation.CONTAINS_TYPE); + EntityRelation relationD = new EntityRelation(assetC, assetE, EntityRelation.CONTAINS_TYPE); + + saveRelation(relationA); + saveRelation(relationB); + saveRelation(relationC); + saveRelation(relationD); + + EntityRelationsQuery query = new EntityRelationsQuery(); + query.setParameters(new RelationsSearchParameters(assetA, EntitySearchDirection.FROM, -1, true)); + query.setFilters(Collections.singletonList(new RelationEntityTypeFilter(EntityRelation.CONTAINS_TYPE, Collections.singletonList(EntityType.ASSET)))); + List relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get(); + Assert.assertEquals(3, relations.size()); + Assert.assertTrue(relations.contains(relationA)); + Assert.assertTrue(relations.contains(relationC)); + Assert.assertTrue(relations.contains(relationD)); + Assert.assertFalse(relations.contains(relationB)); + + //Test from cache + relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get(); + Assert.assertTrue(relations.contains(relationA)); + Assert.assertTrue(relations.contains(relationC)); + Assert.assertTrue(relations.contains(relationD)); + Assert.assertFalse(relations.contains(relationB)); + } + + @Test + public void testFindByQueryFetchLastOnlySingleLinked() throws Exception { + // A -> B -> C -> D + + AssetId assetA = new AssetId(Uuids.timeBased()); + AssetId assetB = new AssetId(Uuids.timeBased()); + AssetId assetC = new AssetId(Uuids.timeBased()); + AssetId assetD = new AssetId(Uuids.timeBased()); + + EntityRelation relationA = new EntityRelation(assetA, assetB, EntityRelation.CONTAINS_TYPE); + EntityRelation relationB = new EntityRelation(assetB, assetC, EntityRelation.CONTAINS_TYPE); + EntityRelation relationC = new EntityRelation(assetC, assetD, EntityRelation.CONTAINS_TYPE); + + saveRelation(relationA); + saveRelation(relationB); + saveRelation(relationC); + + EntityRelationsQuery query = new EntityRelationsQuery(); + query.setParameters(new RelationsSearchParameters(assetA, EntitySearchDirection.FROM, -1, true)); + query.setFilters(Collections.singletonList(new RelationEntityTypeFilter(EntityRelation.CONTAINS_TYPE, Collections.singletonList(EntityType.ASSET)))); + List relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get(); + Assert.assertEquals(1, relations.size()); + Assert.assertTrue(relations.contains(relationC)); + Assert.assertFalse(relations.contains(relationA)); + Assert.assertFalse(relations.contains(relationB)); + + //Test from cache + relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get(); + Assert.assertTrue(relations.contains(relationC)); + Assert.assertFalse(relations.contains(relationA)); + Assert.assertFalse(relations.contains(relationB)); + } + + @Test + public void testFindByQueryFetchLastOnlyTreeLikeWithMaxLvl() throws Exception { + // A -> B A + // A -> C B + // C -> D C + // C -> E D + // D -> F E + // D -> G F + + AssetId assetA = new AssetId(Uuids.timeBased()); + AssetId assetB = new AssetId(Uuids.timeBased()); + AssetId assetC = new AssetId(Uuids.timeBased()); + AssetId assetD = new AssetId(Uuids.timeBased()); + AssetId assetE = new AssetId(Uuids.timeBased()); + AssetId assetF = new AssetId(Uuids.timeBased()); + AssetId assetG = new AssetId(Uuids.timeBased()); + + EntityRelation relationA = new EntityRelation(assetA, assetB, EntityRelation.CONTAINS_TYPE); + EntityRelation relationB = new EntityRelation(assetA, assetC, EntityRelation.CONTAINS_TYPE); + EntityRelation relationC = new EntityRelation(assetC, assetD, EntityRelation.CONTAINS_TYPE); + EntityRelation relationD = new EntityRelation(assetC, assetE, EntityRelation.CONTAINS_TYPE); + EntityRelation relationE = new EntityRelation(assetD, assetF, EntityRelation.CONTAINS_TYPE); + EntityRelation relationF = new EntityRelation(assetD, assetG, EntityRelation.CONTAINS_TYPE); + + saveRelation(relationA); + saveRelation(relationB); + saveRelation(relationC); + saveRelation(relationD); + saveRelation(relationE); + saveRelation(relationF); + + EntityRelationsQuery query = new EntityRelationsQuery(); + query.setParameters(new RelationsSearchParameters(assetA, EntitySearchDirection.FROM, 2, true)); + query.setFilters(Collections.singletonList(new RelationEntityTypeFilter(EntityRelation.CONTAINS_TYPE, Collections.singletonList(EntityType.ASSET)))); + List relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get(); + Assert.assertEquals(3, relations.size()); + Assert.assertTrue(relations.contains(relationA)); + Assert.assertTrue(relations.contains(relationC)); + Assert.assertTrue(relations.contains(relationD)); + Assert.assertFalse(relations.contains(relationB)); + Assert.assertFalse(relations.contains(relationE)); + Assert.assertFalse(relations.contains(relationF)); + + //Test from cache + relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get(); + Assert.assertTrue(relations.contains(relationA)); + Assert.assertTrue(relations.contains(relationC)); + Assert.assertTrue(relations.contains(relationD)); + Assert.assertFalse(relations.contains(relationB)); + Assert.assertFalse(relations.contains(relationE)); + Assert.assertFalse(relations.contains(relationF)); + } + + @Test + public void testFindByQueryTreeLikeWithMaxLvl() throws Exception { + // A -> B A + // A -> C B + // C -> D C + // C -> E D + // D -> F E + // D -> G F + + AssetId assetA = new AssetId(Uuids.timeBased()); + AssetId assetB = new AssetId(Uuids.timeBased()); + AssetId assetC = new AssetId(Uuids.timeBased()); + AssetId assetD = new AssetId(Uuids.timeBased()); + AssetId assetE = new AssetId(Uuids.timeBased()); + AssetId assetF = new AssetId(Uuids.timeBased()); + AssetId assetG = new AssetId(Uuids.timeBased()); + + EntityRelation relationA = new EntityRelation(assetA, assetB, EntityRelation.CONTAINS_TYPE); + EntityRelation relationB = new EntityRelation(assetA, assetC, EntityRelation.CONTAINS_TYPE); + EntityRelation relationC = new EntityRelation(assetC, assetD, EntityRelation.CONTAINS_TYPE); + EntityRelation relationD = new EntityRelation(assetC, assetE, EntityRelation.CONTAINS_TYPE); + EntityRelation relationE = new EntityRelation(assetD, assetF, EntityRelation.CONTAINS_TYPE); + EntityRelation relationF = new EntityRelation(assetD, assetG, EntityRelation.CONTAINS_TYPE); + + saveRelation(relationA); + saveRelation(relationB); + saveRelation(relationC); + saveRelation(relationD); + saveRelation(relationE); + saveRelation(relationF); + + EntityRelationsQuery query = new EntityRelationsQuery(); + query.setParameters(new RelationsSearchParameters(assetA, EntitySearchDirection.FROM, 2, false)); + query.setFilters(Collections.singletonList(new RelationEntityTypeFilter(EntityRelation.CONTAINS_TYPE, Collections.singletonList(EntityType.ASSET)))); + List relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get(); + Assert.assertEquals(4, relations.size()); + Assert.assertTrue(relations.contains(relationA)); + Assert.assertTrue(relations.contains(relationB)); + Assert.assertTrue(relations.contains(relationC)); + Assert.assertTrue(relations.contains(relationD)); + Assert.assertFalse(relations.contains(relationE)); + Assert.assertFalse(relations.contains(relationF)); + + //Test from cache + relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get(); + Assert.assertTrue(relations.contains(relationA)); + Assert.assertTrue(relations.contains(relationB)); + Assert.assertTrue(relations.contains(relationC)); + Assert.assertTrue(relations.contains(relationD)); + Assert.assertFalse(relations.contains(relationE)); + Assert.assertFalse(relations.contains(relationF)); + } + + @Test + public void testFindByQueryTreeLikeWithUnlimLvl() throws Exception { + // A -> B A + // A -> C B + // C -> D C + // C -> E D + // D -> F E + // D -> G F + + AssetId assetA = new AssetId(Uuids.timeBased()); + AssetId assetB = new AssetId(Uuids.timeBased()); + AssetId assetC = new AssetId(Uuids.timeBased()); + AssetId assetD = new AssetId(Uuids.timeBased()); + AssetId assetE = new AssetId(Uuids.timeBased()); + AssetId assetF = new AssetId(Uuids.timeBased()); + AssetId assetG = new AssetId(Uuids.timeBased()); + + EntityRelation relationA = new EntityRelation(assetA, assetB, EntityRelation.CONTAINS_TYPE); + EntityRelation relationB = new EntityRelation(assetA, assetC, EntityRelation.CONTAINS_TYPE); + EntityRelation relationC = new EntityRelation(assetC, assetD, EntityRelation.CONTAINS_TYPE); + EntityRelation relationD = new EntityRelation(assetC, assetE, EntityRelation.CONTAINS_TYPE); + EntityRelation relationE = new EntityRelation(assetD, assetF, EntityRelation.CONTAINS_TYPE); + EntityRelation relationF = new EntityRelation(assetD, assetG, EntityRelation.CONTAINS_TYPE); + + saveRelation(relationA); + saveRelation(relationB); + saveRelation(relationC); + saveRelation(relationD); + saveRelation(relationE); + saveRelation(relationF); + + EntityRelationsQuery query = new EntityRelationsQuery(); + query.setParameters(new RelationsSearchParameters(assetA, EntitySearchDirection.FROM, -1, false)); + query.setFilters(Collections.singletonList(new RelationEntityTypeFilter(EntityRelation.CONTAINS_TYPE, Collections.singletonList(EntityType.ASSET)))); + List relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get(); + Assert.assertEquals(6, relations.size()); + Assert.assertTrue(relations.contains(relationA)); + Assert.assertTrue(relations.contains(relationB)); + Assert.assertTrue(relations.contains(relationC)); + Assert.assertTrue(relations.contains(relationD)); + Assert.assertTrue(relations.contains(relationE)); + Assert.assertTrue(relations.contains(relationF)); + + //Test from cache + relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get(); + Assert.assertTrue(relations.contains(relationA)); + Assert.assertTrue(relations.contains(relationB)); + Assert.assertTrue(relations.contains(relationC)); + Assert.assertTrue(relations.contains(relationD)); + Assert.assertTrue(relations.contains(relationE)); + Assert.assertTrue(relations.contains(relationF)); + } + + @Test + public void testFindByQueryLargeHierarchyFetchAllWithUnlimLvl() throws Exception { + AssetId rootAsset = new AssetId(Uuids.timeBased()); + final int hierarchyLvl = 10; + List expectedRelations = new LinkedList<>(); + + createAssetRelationsRecursively(rootAsset, hierarchyLvl, expectedRelations, false); + + EntityRelationsQuery query = new EntityRelationsQuery(); + query.setParameters(new RelationsSearchParameters(rootAsset, EntitySearchDirection.FROM, -1, false)); + query.setFilters(Collections.singletonList(new RelationEntityTypeFilter(EntityRelation.CONTAINS_TYPE, Collections.singletonList(EntityType.ASSET)))); + List relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get(); + Assert.assertEquals(expectedRelations.size(), relations.size()); + Assert.assertTrue(relations.containsAll(expectedRelations)); + } + + @Test + public void testFindByQueryLargeHierarchyFetchLastOnlyWithUnlimLvl() throws Exception { + AssetId rootAsset = new AssetId(Uuids.timeBased()); + final int hierarchyLvl = 10; + List expectedRelations = new LinkedList<>(); + + createAssetRelationsRecursively(rootAsset, hierarchyLvl, expectedRelations, true); + + EntityRelationsQuery query = new EntityRelationsQuery(); + query.setParameters(new RelationsSearchParameters(rootAsset, EntitySearchDirection.FROM, -1, true)); + query.setFilters(Collections.singletonList(new RelationEntityTypeFilter(EntityRelation.CONTAINS_TYPE, Collections.singletonList(EntityType.ASSET)))); + List relations = relationService.findByQuery(SYSTEM_TENANT_ID, query).get(); + Assert.assertEquals(expectedRelations.size(), relations.size()); + Assert.assertTrue(relations.containsAll(expectedRelations)); + } + + private void createAssetRelationsRecursively(AssetId rootAsset, int lvl, List entityRelations, boolean lastLvlOnly) throws Exception { + if (lvl == 0) return; + + AssetId firstAsset = new AssetId(Uuids.timeBased()); + AssetId secondAsset = new AssetId(Uuids.timeBased()); + + EntityRelation firstRelation = new EntityRelation(rootAsset, firstAsset, EntityRelation.CONTAINS_TYPE); + EntityRelation secondRelation = new EntityRelation(rootAsset, secondAsset, EntityRelation.CONTAINS_TYPE); + + saveRelation(firstRelation); + saveRelation(secondRelation); + + if (!lastLvlOnly || lvl == 1) { + entityRelations.add(firstRelation); + entityRelations.add(secondRelation); + } + + createAssetRelationsRecursively(firstAsset, lvl - 1, entityRelations, lastLvlOnly); + createAssetRelationsRecursively(secondAsset, lvl - 1, entityRelations, lastLvlOnly); + } }