|
|
|
@ -18,6 +18,7 @@ package org.thingsboard.rule.engine.action; |
|
|
|
import com.google.common.util.concurrent.Futures; |
|
|
|
import com.google.common.util.concurrent.ListenableFuture; |
|
|
|
import lombok.extern.slf4j.Slf4j; |
|
|
|
import org.springframework.util.CollectionUtils; |
|
|
|
import org.thingsboard.rule.engine.api.RuleNode; |
|
|
|
import org.thingsboard.rule.engine.api.TbContext; |
|
|
|
import org.thingsboard.rule.engine.api.TbNodeConfiguration; |
|
|
|
@ -69,41 +70,31 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR |
|
|
|
|
|
|
|
@Override |
|
|
|
protected ListenableFuture<RelationContainer> doProcessEntityRelationAction(TbContext ctx, TbMsg msg, EntityContainer entity, String relationType) { |
|
|
|
ListenableFuture<Boolean> future = createIfAbsent(ctx, msg, entity, relationType); |
|
|
|
ListenableFuture<Boolean> future = createRelationIfAbsent(ctx, msg, entity, relationType); |
|
|
|
return Futures.transform(future, result -> { |
|
|
|
RelationContainer container = new RelationContainer(); |
|
|
|
if (result && config.isChangeOriginatorToRelatedEntity()) { |
|
|
|
TbMsg tbMsg = ctx.transformMsg(msg, msg.getType(), entity.getEntityId(), msg.getMetaData(), msg.getData()); |
|
|
|
container.setMsg(tbMsg); |
|
|
|
} else { |
|
|
|
container.setMsg(msg); |
|
|
|
return new RelationContainer(tbMsg, result); |
|
|
|
} |
|
|
|
container.setResult(result); |
|
|
|
return container; |
|
|
|
return new RelationContainer(msg, result); |
|
|
|
}, ctx.getDbCallbackExecutor()); |
|
|
|
} |
|
|
|
|
|
|
|
private ListenableFuture<Boolean> createIfAbsent(TbContext ctx, TbMsg msg, EntityContainer entityContainer, String relationType) { |
|
|
|
private ListenableFuture<Boolean> createRelationIfAbsent(TbContext ctx, TbMsg msg, EntityContainer entityContainer, String relationType) { |
|
|
|
SearchDirectionIds sdId = processSingleSearchDirection(msg, entityContainer); |
|
|
|
ListenableFuture<Boolean> checkRelationFuture = Futures.transformAsync(ctx.getRelationService().checkRelation(ctx.getTenantId(), sdId.getFromId(), sdId.getToId(), relationType, RelationTypeGroup.COMMON), result -> { |
|
|
|
if (!result) { |
|
|
|
if (config.isRemoveCurrentRelations()) { |
|
|
|
return processDeleteRelations(ctx, processFindRelations(ctx, msg, sdId, relationType)); |
|
|
|
} |
|
|
|
return Futures.immediateFuture(false); |
|
|
|
} |
|
|
|
return Futures.immediateFuture(true); |
|
|
|
}, ctx.getDbCallbackExecutor()); |
|
|
|
return Futures.transformAsync(deleteCurrentRelationsIfNeeded(ctx, msg, sdId, relationType), v -> |
|
|
|
checkRelationAndCreateIfAbsent(ctx, entityContainer, relationType, sdId), |
|
|
|
ctx.getDbCallbackExecutor()); |
|
|
|
} |
|
|
|
|
|
|
|
return Futures.transformAsync(checkRelationFuture, result -> { |
|
|
|
if (!result) { |
|
|
|
return processCreateRelation(ctx, entityContainer, sdId, relationType); |
|
|
|
} |
|
|
|
return Futures.immediateFuture(true); |
|
|
|
}, ctx.getDbCallbackExecutor()); |
|
|
|
private ListenableFuture<Void> deleteCurrentRelationsIfNeeded(TbContext ctx, TbMsg msg, SearchDirectionIds sdId, String relationType) { |
|
|
|
if (config.isRemoveCurrentRelations()) { |
|
|
|
return deleteOriginatorRelations(ctx, findOriginatorRelations(ctx, msg, sdId, relationType)); |
|
|
|
} |
|
|
|
return Futures.immediateFuture(null); |
|
|
|
} |
|
|
|
|
|
|
|
private ListenableFuture<List<EntityRelation>> processFindRelations(TbContext ctx, TbMsg msg, SearchDirectionIds sdId, String relationType) { |
|
|
|
private ListenableFuture<List<EntityRelation>> findOriginatorRelations(TbContext ctx, TbMsg msg, SearchDirectionIds sdId, String relationType) { |
|
|
|
if (sdId.isOriginatorDirectionFrom()) { |
|
|
|
return ctx.getRelationService().findByFromAndTypeAsync(ctx.getTenantId(), msg.getOriginator(), relationType, RelationTypeGroup.COMMON); |
|
|
|
} else { |
|
|
|
@ -111,19 +102,31 @@ public class TbCreateRelationNode extends TbAbstractRelationActionNode<TbCreateR |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private ListenableFuture<Boolean> processDeleteRelations(TbContext ctx, ListenableFuture<List<EntityRelation>> listListenableFuture) { |
|
|
|
return Futures.transformAsync(listListenableFuture, entityRelations -> { |
|
|
|
if (!entityRelations.isEmpty()) { |
|
|
|
List<ListenableFuture<Boolean>> list = new ArrayList<>(); |
|
|
|
for (EntityRelation relation : entityRelations) { |
|
|
|
private ListenableFuture<Void> deleteOriginatorRelations(TbContext ctx, ListenableFuture<List<EntityRelation>> originatorRelationsFuture) { |
|
|
|
return Futures.transformAsync(originatorRelationsFuture, originatorRelations -> { |
|
|
|
List<ListenableFuture<Boolean>> list = new ArrayList<>(); |
|
|
|
if (!CollectionUtils.isEmpty(originatorRelations)) { |
|
|
|
for (EntityRelation relation : originatorRelations) { |
|
|
|
list.add(ctx.getRelationService().deleteRelationAsync(ctx.getTenantId(), relation)); |
|
|
|
} |
|
|
|
return Futures.transform(Futures.allAsList(list), result -> false, ctx.getDbCallbackExecutor()); |
|
|
|
} |
|
|
|
return Futures.immediateFuture(false); |
|
|
|
return Futures.transform(Futures.allAsList(list), result -> null, ctx.getDbCallbackExecutor()); |
|
|
|
}, ctx.getDbCallbackExecutor()); |
|
|
|
} |
|
|
|
|
|
|
|
private ListenableFuture<Boolean> checkRelationAndCreateIfAbsent(TbContext ctx, EntityContainer entityContainer, String relationType, SearchDirectionIds sdId) { |
|
|
|
return Futures.transformAsync(checkRelation(ctx, sdId, relationType), relationPresent -> { |
|
|
|
if (relationPresent) { |
|
|
|
return Futures.immediateFuture(true); |
|
|
|
} |
|
|
|
return processCreateRelation(ctx, entityContainer, sdId, relationType); |
|
|
|
}, ctx.getDbCallbackExecutor()); |
|
|
|
} |
|
|
|
|
|
|
|
private ListenableFuture<Boolean> checkRelation(TbContext ctx, SearchDirectionIds sdId, String relationType) { |
|
|
|
return ctx.getRelationService().checkRelation(ctx.getTenantId(), sdId.getFromId(), sdId.getToId(), relationType, RelationTypeGroup.COMMON); |
|
|
|
} |
|
|
|
|
|
|
|
private ListenableFuture<Boolean> processCreateRelation(TbContext ctx, EntityContainer entityContainer, SearchDirectionIds sdId, String relationType) { |
|
|
|
switch (entityContainer.getEntityType()) { |
|
|
|
case ASSET: |
|
|
|
|