|
|
|
@ -52,6 +52,7 @@ import java.util.List; |
|
|
|
import java.util.Set; |
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
import java.util.function.BiConsumer; |
|
|
|
import java.util.stream.Collectors; |
|
|
|
|
|
|
|
import static org.thingsboard.server.dao.service.Validator.validateId; |
|
|
|
|
|
|
|
@ -512,40 +513,42 @@ public class BaseRelationService implements RelationService { |
|
|
|
|
|
|
|
private ListenableFuture<Set<EntityRelation>> findRelationsRecursively(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, |
|
|
|
RelationTypeGroup relationTypeGroup, int lvl, boolean fetchLastLevelOnly, |
|
|
|
final ConcurrentHashMap<EntityId, Boolean> uniqueMap) throws Exception { |
|
|
|
final ConcurrentHashMap<EntityId, Boolean> uniqueMap) { |
|
|
|
if (lvl == 0) { |
|
|
|
return Futures.immediateFuture(Collections.emptySet()); |
|
|
|
} |
|
|
|
lvl--; |
|
|
|
//TODO: try to remove this blocking operation
|
|
|
|
Set<EntityRelation> children = new HashSet<>(findRelations(tenantId, rootId, direction, relationTypeGroup).get()); |
|
|
|
Set<EntityId> childrenIds = new HashSet<>(); |
|
|
|
for (EntityRelation childRelation : children) { |
|
|
|
log.trace("Found Relation: {}", childRelation); |
|
|
|
EntityId childId; |
|
|
|
if (direction == EntitySearchDirection.FROM) { |
|
|
|
childId = childRelation.getTo(); |
|
|
|
} else { |
|
|
|
childId = childRelation.getFrom(); |
|
|
|
} |
|
|
|
if (uniqueMap.putIfAbsent(childId, Boolean.TRUE) == null) { |
|
|
|
log.trace("Adding Relation: {}", childId); |
|
|
|
if (childrenIds.add(childId)) { |
|
|
|
log.trace("Added Relation: {}", childId); |
|
|
|
final int currentLvl = --lvl; |
|
|
|
final Set<EntityRelation> children = new HashSet<>(); |
|
|
|
ListenableFuture<List<EntityRelation>> rootRelationsFuture = findRelations(tenantId, rootId, direction, relationTypeGroup); |
|
|
|
ListenableFuture<Set<EntityId>> childrenIdsFuture = Futures.transform(rootRelationsFuture, relations -> { |
|
|
|
children.addAll(relations); |
|
|
|
Set<EntityId> childrenIds = new HashSet<>(); |
|
|
|
for (EntityRelation childRelation : children) { |
|
|
|
log.trace("Found Relation: {}", childRelation); |
|
|
|
EntityId childId = direction == EntitySearchDirection.FROM ? childRelation.getTo() : childRelation.getFrom(); |
|
|
|
if (uniqueMap.putIfAbsent(childId, Boolean.TRUE) == null) { |
|
|
|
log.trace("Adding Relation: {}", childId); |
|
|
|
if (childrenIds.add(childId)) { |
|
|
|
log.trace("Added Relation: {}", childId); |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
List<ListenableFuture<Set<EntityRelation>>> futures = new ArrayList<>(); |
|
|
|
for (EntityId entityId : childrenIds) { |
|
|
|
futures.add(findRelationsRecursively(tenantId, entityId, direction, relationTypeGroup, lvl, fetchLastLevelOnly, uniqueMap)); |
|
|
|
} |
|
|
|
//TODO: try to remove this blocking operation
|
|
|
|
List<Set<EntityRelation>> relations = Futures.successfulAsList(futures).get(); |
|
|
|
if (fetchLastLevelOnly && lvl > 0) { |
|
|
|
children.clear(); |
|
|
|
} |
|
|
|
relations.forEach(r -> r.forEach(children::add)); |
|
|
|
return Futures.immediateFuture(children); |
|
|
|
return childrenIds; |
|
|
|
}, MoreExecutors.directExecutor()); |
|
|
|
|
|
|
|
ListenableFuture<List<Set<EntityRelation>>> recursiveFutures = Futures.transformAsync(childrenIdsFuture, childrenIds -> |
|
|
|
Futures.successfulAsList(childrenIds.stream() |
|
|
|
.map(entityId -> findRelationsRecursively(tenantId, entityId, direction, relationTypeGroup, currentLvl, fetchLastLevelOnly, uniqueMap)) |
|
|
|
.collect(Collectors.toList())), MoreExecutors.directExecutor()); |
|
|
|
|
|
|
|
ListenableFuture<Set<EntityRelation>> relationsFuture = Futures.transform(recursiveFutures, recursiveRelations -> { |
|
|
|
if (fetchLastLevelOnly && currentLvl > 0) { |
|
|
|
children.clear(); |
|
|
|
} |
|
|
|
recursiveRelations.forEach(children::addAll); |
|
|
|
return children; |
|
|
|
}, MoreExecutors.directExecutor()); |
|
|
|
return relationsFuture; |
|
|
|
} |
|
|
|
|
|
|
|
private ListenableFuture<List<EntityRelation>> findRelations(final TenantId tenantId, final EntityId rootId, final EntitySearchDirection direction, RelationTypeGroup relationTypeGroup) { |
|
|
|
|