From 3fc18988be0cb633b0da8ef08272333a91c8a237 Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Thu, 11 Jun 2020 20:10:47 +0300 Subject: [PATCH] Added update relation msg functionality --- .../server/controller/UserController.java | 90 +++++++++ .../service/edge/EdgeContextComponent.java | 10 +- .../service/edge/rpc/EdgeGrpcSession.java | 4 +- .../RelationUpdateMsgConstructor.java | 2 +- .../edge/rpc/init/DefaultSyncEdgeService.java | 182 +++++++++++------- .../edge/rpc/init/SyncEdgeService.java | 5 +- .../server/dao/user/UserService.java | 11 ++ .../common/data/edge/EdgeQueueEntityType.java | 2 +- .../server/dao/asset/BaseAssetService.java | 2 - .../dao/entity/AbstractEntityService.java | 3 + .../server/dao/sql/user/JpaUserDao.java | 28 +++ .../server/dao/user/CassandraUserDao.java | 27 +++ .../thingsboard/server/dao/user/UserDao.java | 12 ++ .../server/dao/user/UserServiceImpl.java | 66 +++++++ .../dao/service/BaseRuleChainServiceTest.java | 1 + 15 files changed, 365 insertions(+), 80 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/UserController.java b/application/src/main/java/org/thingsboard/server/controller/UserController.java index b3d66adc53..13409e0d07 100644 --- a/application/src/main/java/org/thingsboard/server/controller/UserController.java +++ b/application/src/main/java/org/thingsboard/server/controller/UserController.java @@ -35,13 +35,17 @@ import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.audit.ActionType; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.exception.ThingsboardErrorCode; import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.page.TextPageData; import org.thingsboard.server.common.data.page.TextPageLink; +import org.thingsboard.server.common.data.page.TimePageData; +import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.security.Authority; import org.thingsboard.server.common.data.security.UserCredentials; import org.thingsboard.server.queue.util.TbCoreComponent; @@ -56,6 +60,8 @@ import org.thingsboard.server.utils.MiscUtils; import javax.servlet.http.HttpServletRequest; +import static org.thingsboard.server.controller.EdgeController.EDGE_ID; + @RestController @TbCoreComponent @RequestMapping("/api") @@ -300,4 +306,88 @@ public class UserController extends BaseController { throw handleException(e); } } + + @PreAuthorize("hasAuthority('TENANT_ADMIN')") + @RequestMapping(value = "/edge/{edgeId}/user/{userId}", method = RequestMethod.POST) + @ResponseBody + public User assignUserToEdge(@PathVariable(EDGE_ID) String strEdgeId, + @PathVariable(USER_ID) String strUserId) throws ThingsboardException { + checkParameter(EDGE_ID, strEdgeId); + checkParameter(USER_ID, strUserId); + try { + EdgeId edgeId = new EdgeId(toUUID(strEdgeId)); + Edge edge = checkEdgeId(edgeId, Operation.READ); + + UserId userId = new UserId(toUUID(strUserId)); + checkUserId(userId, Operation.ASSIGN_TO_EDGE); + + User savedUser = checkNotNull(userService.assignUserToEdge(getTenantId(), userId, edgeId)); + + logEntityAction(userId, savedUser, + savedUser.getCustomerId(), + ActionType.ASSIGNED_TO_EDGE, null, strUserId, strEdgeId, edge.getName()); + + return savedUser; + } catch (Exception e) { + + logEntityAction(emptyId(EntityType.USER), null, + null, + ActionType.ASSIGNED_TO_EDGE, e, strUserId, strEdgeId); + + throw handleException(e); + } + } + + @PreAuthorize("hasAuthority('TENANT_ADMIN')") + @RequestMapping(value = "/edge/{edgeId}/user/{userId}", method = RequestMethod.DELETE) + @ResponseBody + public User unassignUserFromEdge(@PathVariable(EDGE_ID) String strEdgeId, + @PathVariable(USER_ID) String strUserId) throws ThingsboardException { + checkParameter(EDGE_ID, strEdgeId); + checkParameter(USER_ID, strUserId); + try { + EdgeId edgeId = new EdgeId(toUUID(strEdgeId)); + Edge edge = checkEdgeId(edgeId, Operation.READ); + + UserId userId = new UserId(toUUID(strUserId)); + User user = checkUserId(userId, Operation.UNASSIGN_FROM_EDGE); + + User savedUser = checkNotNull(userService.unassignUserFromEdge(getTenantId(), userId, edgeId)); + + logEntityAction(userId, savedUser, + savedUser.getCustomerId(), + ActionType.UNASSIGNED_FROM_EDGE, null, strUserId, edge.getId().toString(), edge.getName()); + + return savedUser; + } catch (Exception e) { + + logEntityAction(emptyId(EntityType.USER), null, + null, + ActionType.UNASSIGNED_FROM_EDGE, e, strUserId); + + throw handleException(e); + } + } + + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN')") + @RequestMapping(value = "/edge/{edgeId}/users", params = {"limit"}, method = RequestMethod.GET) + @ResponseBody + public TimePageData getEdgeUsers( + @PathVariable(EDGE_ID) String strEdgeId, + @RequestParam int limit, + @RequestParam(required = false) Long startTime, + @RequestParam(required = false) Long endTime, + @RequestParam(required = false, defaultValue = "false") boolean ascOrder, + @RequestParam(required = false) String offset) throws ThingsboardException { + checkParameter(EDGE_ID, strEdgeId); + try { + TenantId tenantId = getCurrentUser().getTenantId(); + EdgeId edgeId = new EdgeId(toUUID(strEdgeId)); + checkEdgeId(edgeId, Operation.READ); + TimePageLink pageLink = createPageLink(limit, startTime, endTime, ascOrder, offset); + return checkNotNull(userService.findUsersByTenantIdAndEdgeId(tenantId, edgeId, pageLink).get()); + } catch (Exception e) { + throw handleException(e); + } + } } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java index 6959a522bf..db1b2411eb 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,6 +16,7 @@ package org.thingsboard.server.service.edge; import lombok.Data; +import lombok.Getter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; @@ -37,9 +38,10 @@ import org.thingsboard.server.service.edge.rpc.constructor.DashboardUpdateMsgCon import org.thingsboard.server.service.edge.rpc.constructor.DeviceUpdateMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.EntityViewUpdateMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.RelationUpdateMsgConstructor; +import org.thingsboard.server.service.edge.rpc.constructor.RuleChainUpdateMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.UserUpdateMsgConstructor; import org.thingsboard.server.service.edge.rpc.init.SyncEdgeService; -import org.thingsboard.server.service.edge.rpc.constructor.RuleChainUpdateMsgConstructor; +import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.queue.TbClusterService; import org.thingsboard.server.service.state.DeviceStateService; @@ -138,4 +140,8 @@ public class EdgeContextComponent { @Lazy @Autowired private EdgeEventStorageSettings edgeEventStorageSettings; + + @Autowired + @Getter + private DbCallbackExecutorService dbCallbackExecutor; } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index d029b5ff20..a095675fb5 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -141,7 +141,7 @@ public final class EdgeGrpcSession implements Closeable { outputStream.onError(new RuntimeException(responseMsg.getErrorMsg())); } if (ConnectResponseCode.ACCEPTED == responseMsg.getResponseCode()) { - ctx.getSyncEdgeService().sync(edge, outputStream); + ctx.getSyncEdgeService().sync(ctx, edge, outputStream); } } if (connected) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/RelationUpdateMsgConstructor.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/RelationUpdateMsgConstructor.java index 1c0af319f3..e09b9f2e8b 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/RelationUpdateMsgConstructor.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/constructor/RelationUpdateMsgConstructor.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java index d51474046e..510fa6978a 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -17,7 +17,6 @@ package org.thingsboard.server.service.edge.rpc.init; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -26,7 +25,7 @@ import org.thingsboard.server.common.data.Dashboard; import org.thingsboard.server.common.data.DashboardInfo; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.EntityView; -import org.thingsboard.server.common.data.Tenant; +import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.EntityId; @@ -45,6 +44,7 @@ import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.relation.RelationService; import org.thingsboard.server.dao.rule.RuleChainService; +import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.gen.edge.AssetUpdateMsg; import org.thingsboard.server.gen.edge.DashboardUpdateMsg; import org.thingsboard.server.gen.edge.DeviceUpdateMsg; @@ -56,12 +56,15 @@ import org.thingsboard.server.gen.edge.RuleChainMetadataRequestMsg; import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg; import org.thingsboard.server.gen.edge.RuleChainUpdateMsg; import org.thingsboard.server.gen.edge.UpdateMsgType; +import org.thingsboard.server.gen.edge.UserUpdateMsg; +import org.thingsboard.server.service.edge.EdgeContextComponent; import org.thingsboard.server.service.edge.rpc.constructor.AssetUpdateMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.DashboardUpdateMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.DeviceUpdateMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.EntityViewUpdateMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.RelationUpdateMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.RuleChainUpdateMsgConstructor; +import org.thingsboard.server.service.edge.rpc.constructor.UserUpdateMsgConstructor; import java.util.ArrayList; import java.util.HashSet; @@ -92,10 +95,10 @@ public class DefaultSyncEdgeService implements SyncEdgeService { private DashboardService dashboardService; @Autowired - private RuleChainUpdateMsgConstructor ruleChainUpdateMsgConstructor; + private UserService userService; @Autowired - private RelationUpdateMsgConstructor relationUpdateMsgConstructor; + private RuleChainUpdateMsgConstructor ruleChainUpdateMsgConstructor; @Autowired private DeviceUpdateMsgConstructor deviceUpdateMsgConstructor; @@ -109,68 +112,56 @@ public class DefaultSyncEdgeService implements SyncEdgeService { @Autowired private DashboardUpdateMsgConstructor dashboardUpdateMsgConstructor; + @Autowired + private UserUpdateMsgConstructor userUpdateMsgConstructor; + + @Autowired + private RelationUpdateMsgConstructor relationUpdateMsgConstructor; + @Override - public void sync(Edge edge, StreamObserver outputStream) { + public void sync(EdgeContextComponent ctx, Edge edge, StreamObserver outputStream) { Set pushedEntityIds = new HashSet<>(); syncRuleChains(edge, pushedEntityIds, outputStream); syncDevices(edge, pushedEntityIds, outputStream); syncAssets(edge, pushedEntityIds, outputStream); syncEntityViews(edge, pushedEntityIds, outputStream); syncDashboards(edge, pushedEntityIds, outputStream); - syncRelations(edge, pushedEntityIds, outputStream); + syncUsers(ctx, edge, pushedEntityIds, outputStream); + syncRelations(ctx, edge, pushedEntityIds, outputStream); } - private void syncRelations(Edge edge, Set pushedEntityIds, StreamObserver outputStream) { - if (!pushedEntityIds.isEmpty()) { - List>> futures = new ArrayList<>(); - for (EntityId entityId : pushedEntityIds) { - futures.add(syncRelations(edge, entityId, EntitySearchDirection.FROM)); - futures.add(syncRelations(edge, entityId, EntitySearchDirection.TO)); - } - ListenableFuture>> relationsListFuture = Futures.allAsList(futures); - Futures.transform(relationsListFuture, relationsList -> { - try { - Set uniqueEntityRelations = new HashSet<>(); - if (!relationsList.isEmpty()) { - for (List entityRelations : relationsList) { - if (!entityRelations.isEmpty()) { - uniqueEntityRelations.addAll(entityRelations); - } - } - } - if (!uniqueEntityRelations.isEmpty()) { - log.trace("[{}] [{}] relation(s) are going to be pushed to edge.", edge.getId(), uniqueEntityRelations.size()); - for (EntityRelation relation : uniqueEntityRelations) { - try { - RelationUpdateMsg relationUpdateMsg = - relationUpdateMsgConstructor.constructRelationUpdatedMsg( - UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, - relation); - EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setRelationUpdateMsg(relationUpdateMsg) - .build(); - outputStream.onNext(ResponseMsg.newBuilder() - .setEntityUpdateMsg(entityUpdateMsg) - .build()); - } catch (Exception e) { - log.error("Exception during loading relation [{}] to edge on init!", relation, e); - } - } + private void syncRuleChains(Edge edge, Set pushedEntityIds, StreamObserver outputStream) { + try { + TimePageLink pageLink = new TimePageLink(100); + TimePageData pageData; + do { + pageData = ruleChainService.findRuleChainsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink).get(); + if (!pageData.getData().isEmpty()) { + log.trace("[{}] [{}] rule chains(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); + for (RuleChain ruleChain : pageData.getData()) { + RuleChainUpdateMsg ruleChainUpdateMsg = + ruleChainUpdateMsgConstructor.constructRuleChainUpdatedMsg( + edge.getRootRuleChainId(), + UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, + ruleChain); + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() + .setRuleChainUpdateMsg(ruleChainUpdateMsg) + .build(); + outputStream.onNext(ResponseMsg.newBuilder() + .setEntityUpdateMsg(entityUpdateMsg) + .build()); + pushedEntityIds.add(ruleChain.getId()); } - } catch (Exception e) { - log.error("Exception during loading relation(s) to edge on init!", e); } - return null; - }, MoreExecutors.directExecutor()); + if (pageData.hasNext()) { + pageLink = pageData.getNextPageLink(); + } + } while (pageData.hasNext()); + } catch (Exception e) { + log.error("Exception during loading edge rule chain(s) on sync!", e); } } - private ListenableFuture> syncRelations(Edge edge, EntityId entityId, EntitySearchDirection direction) { - EntityRelationsQuery query = new EntityRelationsQuery(); - query.setParameters(new RelationsSearchParameters(entityId, direction, -1, false)); - return relationService.findByQuery(edge.getTenantId(), query); - } - private void syncDevices(Edge edge, Set pushedEntityIds, StreamObserver outputStream) { try { TimePageLink pageLink = new TimePageLink(100); @@ -198,7 +189,7 @@ public class DefaultSyncEdgeService implements SyncEdgeService { } } while (pageData.hasNext()); } catch (Exception e) { - log.error("Exception during loading edge device(s) on init!", e); + log.error("Exception during loading edge device(s) on sync!", e); } } @@ -229,7 +220,7 @@ public class DefaultSyncEdgeService implements SyncEdgeService { } } while (pageData.hasNext()); } catch (Exception e) { - log.error("Exception during loading edge asset(s) on init!", e); + log.error("Exception during loading edge asset(s) on sync!", e); } } @@ -260,7 +251,7 @@ public class DefaultSyncEdgeService implements SyncEdgeService { } } while (pageData.hasNext()); } catch (Exception e) { - log.error("Exception during loading edge entity view(s) on init!", e); + log.error("Exception during loading edge entity view(s) on sync!", e); } } @@ -292,31 +283,30 @@ public class DefaultSyncEdgeService implements SyncEdgeService { } } while (pageData.hasNext()); } catch (Exception e) { - log.error("Exception during loading edge dashboard(s) on init!", e); + log.error("Exception during loading edge dashboard(s) on sync!", e); } } - private void syncRuleChains(Edge edge, Set pushedEntityIds, StreamObserver outputStream) { + private void syncUsers(EdgeContextComponent ctx, Edge edge, Set pushedEntityIds, StreamObserver outputStream) { try { TimePageLink pageLink = new TimePageLink(100); - TimePageData pageData; + TimePageData pageData; do { - pageData = ruleChainService.findRuleChainsByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink).get(); + pageData = userService.findUsersByTenantIdAndEdgeId(edge.getTenantId(), edge.getId(), pageLink).get(); if (!pageData.getData().isEmpty()) { - log.trace("[{}] [{}] rule chains(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); - for (RuleChain ruleChain : pageData.getData()) { - RuleChainUpdateMsg ruleChainUpdateMsg = - ruleChainUpdateMsgConstructor.constructRuleChainUpdatedMsg( - edge.getRootRuleChainId(), - UpdateMsgType.ENTITY_UPDATED_RPC_MESSAGE, - ruleChain); + log.trace("[{}] [{}] user(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); + for (User user : pageData.getData()) { + UserUpdateMsg userUpdateMsg = + userUpdateMsgConstructor.constructUserUpdatedMsg( + UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, + user); EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() - .setRuleChainUpdateMsg(ruleChainUpdateMsg) + .setUserUpdateMsg(userUpdateMsg) .build(); outputStream.onNext(ResponseMsg.newBuilder() .setEntityUpdateMsg(entityUpdateMsg) .build()); - pushedEntityIds.add(ruleChain.getId()); + pushedEntityIds.add(user.getId()); } } if (pageData.hasNext()) { @@ -324,10 +314,62 @@ public class DefaultSyncEdgeService implements SyncEdgeService { } } while (pageData.hasNext()); } catch (Exception e) { - log.error("Exception during loading edge rule chain(s) on init!", e); + log.error("Exception during loading edge user(s) on sync!", e); + } + } + + private void syncRelations(EdgeContextComponent ctx, Edge edge, Set pushedEntityIds, StreamObserver outputStream) { + if (!pushedEntityIds.isEmpty()) { + List>> futures = new ArrayList<>(); + for (EntityId entityId : pushedEntityIds) { + futures.add(syncRelations(edge, entityId, EntitySearchDirection.FROM)); + futures.add(syncRelations(edge, entityId, EntitySearchDirection.TO)); + } + ListenableFuture>> relationsListFuture = Futures.allAsList(futures); + Futures.transform(relationsListFuture, relationsList -> { + try { + Set uniqueEntityRelations = new HashSet<>(); + if (!relationsList.isEmpty()) { + for (List entityRelations : relationsList) { + if (!entityRelations.isEmpty()) { + uniqueEntityRelations.addAll(entityRelations); + } + } + } + if (!uniqueEntityRelations.isEmpty()) { + log.trace("[{}] [{}] relation(s) are going to be pushed to edge.", edge.getId(), uniqueEntityRelations.size()); + for (EntityRelation relation : uniqueEntityRelations) { + try { + RelationUpdateMsg relationUpdateMsg = + relationUpdateMsgConstructor.constructRelationUpdatedMsg( + UpdateMsgType.ENTITY_CREATED_RPC_MESSAGE, + relation); + EntityUpdateMsg entityUpdateMsg = EntityUpdateMsg.newBuilder() + .setRelationUpdateMsg(relationUpdateMsg) + .build(); + outputStream.onNext(ResponseMsg.newBuilder() + .setEntityUpdateMsg(entityUpdateMsg) + .build()); + } catch (Exception e) { + log.error("Exception during loading relation [{}] to edge on sync!", relation, e); + } + } + } + } catch (Exception e) { + log.error("Exception during loading relation(s) to edge on sync!", e); + } + return null; + }, ctx.getDbCallbackExecutor()); } } + private ListenableFuture> syncRelations(Edge edge, EntityId entityId, EntitySearchDirection direction) { + EntityRelationsQuery query = new EntityRelationsQuery(); + query.setParameters(new RelationsSearchParameters(entityId, direction, -1, false)); + return relationService.findByQuery(edge.getTenantId(), query); + } + + @Override public void syncRuleChainMetadata(Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg, StreamObserver outputStream) { if (ruleChainMetadataRequestMsg.getRuleChainIdMSB() != 0 && ruleChainMetadataRequestMsg.getRuleChainIdLSB() != 0) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/SyncEdgeService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/SyncEdgeService.java index c83a9ec3b0..e408eedd65 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/SyncEdgeService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/SyncEdgeService.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,10 +19,11 @@ import io.grpc.stub.StreamObserver; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.gen.edge.ResponseMsg; import org.thingsboard.server.gen.edge.RuleChainMetadataRequestMsg; +import org.thingsboard.server.service.edge.EdgeContextComponent; public interface SyncEdgeService { - void sync(Edge edge, StreamObserver outputStream); + void sync(EdgeContextComponent ctx, Edge edge, StreamObserver outputStream); void syncRuleChainMetadata(Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg, StreamObserver outputStream); } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/user/UserService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/user/UserService.java index f0e2a0e7a1..32e22a84e8 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/user/UserService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/user/UserService.java @@ -17,12 +17,17 @@ package org.thingsboard.server.dao.user; import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.id.AssetId; import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserCredentialsId; import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.page.TextPageData; import org.thingsboard.server.common.data.page.TextPageLink; +import org.thingsboard.server.common.data.page.TimePageData; +import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.common.data.security.UserCredentials; public interface UserService { @@ -66,4 +71,10 @@ public interface UserService { void onUserLoginSuccessful(TenantId tenantId, UserId userId); int onUserLoginIncorrectCredentials(TenantId tenantId, UserId userId); + + User assignUserToEdge(TenantId tenantId, UserId userId, EdgeId edgeId); + + User unassignUserFromEdge(TenantId tenantId, UserId userId, EdgeId edgeId); + + ListenableFuture> findUsersByTenantIdAndEdgeId(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeQueueEntityType.java b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeQueueEntityType.java index 7ba316a529..ca268380f4 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeQueueEntityType.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/edge/EdgeQueueEntityType.java @@ -5,7 +5,7 @@ * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, diff --git a/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java b/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java index 8c0f4cf644..62f68e962c 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/asset/BaseAssetService.java @@ -80,10 +80,8 @@ import static org.thingsboard.server.dao.service.Validator.validateString; public class BaseAssetService extends AbstractEntityService implements AssetService { public static final String INCORRECT_TENANT_ID = "Incorrect tenantId "; - public static final String INCORRECT_PAGE_LINK = "Incorrect page link "; public static final String INCORRECT_CUSTOMER_ID = "Incorrect customerId "; public static final String INCORRECT_ASSET_ID = "Incorrect assetId "; - public static final String INCORRECT_EDGE_ID = "Incorrect edgeId "; @Autowired private AssetDao assetDao; diff --git a/dao/src/main/java/org/thingsboard/server/dao/entity/AbstractEntityService.java b/dao/src/main/java/org/thingsboard/server/dao/entity/AbstractEntityService.java index 17fa7bcb3e..523df59db5 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/entity/AbstractEntityService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/entity/AbstractEntityService.java @@ -31,6 +31,9 @@ import java.util.concurrent.ExecutionException; @Slf4j public abstract class AbstractEntityService { + public static final String INCORRECT_EDGE_ID = "Incorrect edgeId "; + public static final String INCORRECT_PAGE_LINK = "Incorrect page link "; + @Autowired protected RelationService relationService; diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/user/JpaUserDao.java b/dao/src/main/java/org/thingsboard/server/dao/sql/user/JpaUserDao.java index f30f6ba6d1..5240fe2dc4 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/user/JpaUserDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/user/JpaUserDao.java @@ -15,20 +15,31 @@ */ package org.thingsboard.server.dao.sql.user; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.PageRequest; import org.springframework.data.repository.CrudRepository; import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.TextPageLink; +import org.thingsboard.server.common.data.page.TimePageLink; +import org.thingsboard.server.common.data.relation.EntityRelation; +import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.security.Authority; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.model.sql.UserEntity; +import org.thingsboard.server.dao.relation.RelationDao; import org.thingsboard.server.dao.sql.JpaAbstractSearchTextDao; import org.thingsboard.server.dao.user.UserDao; import org.thingsboard.server.dao.util.SqlDao; +import java.util.ArrayList; import java.util.List; import java.util.Objects; import java.util.UUID; @@ -41,11 +52,15 @@ import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID_STR; */ @Component @SqlDao +@Slf4j public class JpaUserDao extends JpaAbstractSearchTextDao implements UserDao { @Autowired private UserRepository userRepository; + @Autowired + private RelationDao relationDao; + @Override protected Class getEntityClass() { return UserEntity.class; @@ -87,4 +102,17 @@ public class JpaUserDao extends JpaAbstractSearchTextDao imple PageRequest.of(0, pageLink.getLimit()))); } + + @Override + public ListenableFuture> findUsersByTenantIdAndEdgeId(UUID tenantId, UUID edgeId, TimePageLink pageLink) { + log.debug("Try to find users by tenantId [{}], edgeId [{}] and pageLink [{}]", tenantId, edgeId, pageLink); + ListenableFuture> relations = relationDao.findRelations(new TenantId(tenantId), new EdgeId(edgeId), EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE, EntityType.USER, pageLink); + return Futures.transformAsync(relations, input -> { + List> userFutures = new ArrayList<>(input.size()); + for (EntityRelation relation : input) { + userFutures.add(findByIdAsync(new TenantId(tenantId), relation.getTo().getId())); + } + return Futures.successfulAsList(userFutures); + }, MoreExecutors.directExecutor()); + } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/user/CassandraUserDao.java b/dao/src/main/java/org/thingsboard/server/dao/user/CassandraUserDao.java index 2b4546676a..2f134e9839 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/user/CassandraUserDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/user/CassandraUserDao.java @@ -16,18 +16,30 @@ package org.thingsboard.server.dao.user; import com.datastax.driver.core.querybuilder.Select.Where; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.TextPageLink; +import org.thingsboard.server.common.data.page.TimePageLink; +import org.thingsboard.server.common.data.relation.EntityRelation; +import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.security.Authority; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.model.nosql.UserEntity; import org.thingsboard.server.dao.nosql.CassandraAbstractSearchTextDao; +import org.thingsboard.server.dao.relation.RelationDao; import org.thingsboard.server.dao.util.NoSqlDao; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.UUID; @@ -40,6 +52,9 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.select; @NoSqlDao public class CassandraUserDao extends CassandraAbstractSearchTextDao implements UserDao { + @Autowired + private RelationDao relationDao; + @Override protected Class getColumnFamilyClass() { return UserEntity.class; @@ -86,4 +101,16 @@ public class CassandraUserDao extends CassandraAbstractSearchTextDao> findUsersByTenantIdAndEdgeId(UUID tenantId, UUID edgeId, TimePageLink pageLink) { + log.debug("Try to find users by tenantId [{}], edgeId [{}] and pageLink [{}]", tenantId, edgeId, pageLink); + ListenableFuture> relations = relationDao.findRelations(new TenantId(tenantId), new EdgeId(edgeId), EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE, EntityType.USER, pageLink); + return Futures.transformAsync(relations, input -> { + List> userFutures = new ArrayList<>(input.size()); + for (EntityRelation relation : input) { + userFutures.add(findByIdAsync(new TenantId(tenantId), relation.getTo().getId())); + } + return Futures.successfulAsList(userFutures); + }, MoreExecutors.directExecutor()); + } } diff --git a/dao/src/main/java/org/thingsboard/server/dao/user/UserDao.java b/dao/src/main/java/org/thingsboard/server/dao/user/UserDao.java index 4e7b222c66..5de17132ba 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/user/UserDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/user/UserDao.java @@ -15,9 +15,11 @@ */ package org.thingsboard.server.dao.user; +import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.User; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.page.TextPageLink; +import org.thingsboard.server.common.data.page.TimePageLink; import org.thingsboard.server.dao.Dao; import java.util.List; @@ -59,5 +61,15 @@ public interface UserDao extends Dao { * @return the list of user entities */ List findCustomerUsers(UUID tenantId, UUID customerId, TextPageLink pageLink); + + /** + * Find users by tenantId, edgeId and page link. + * + * @param tenantId the tenantId + * @param edgeId the edgeId + * @param pageLink the page link + * @return the list of user objects + */ + ListenableFuture> findUsersByTenantIdAndEdgeId(UUID tenantId, UUID edgeId, TimePageLink pageLink); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/user/UserServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/user/UserServiceImpl.java index 914ba5d3ec..cd7b7ef860 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/user/UserServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/user/UserServiceImpl.java @@ -18,7 +18,10 @@ package org.thingsboard.server.dao.user; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Function; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; @@ -28,15 +31,22 @@ import org.springframework.stereotype.Service; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserCredentialsId; import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.page.TextPageData; import org.thingsboard.server.common.data.page.TextPageLink; +import org.thingsboard.server.common.data.page.TimePageData; +import org.thingsboard.server.common.data.page.TimePageLink; +import org.thingsboard.server.common.data.relation.EntityRelation; +import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.security.Authority; import org.thingsboard.server.common.data.security.UserCredentials; import org.thingsboard.server.dao.customer.CustomerDao; +import org.thingsboard.server.dao.edge.EdgeService; import org.thingsboard.server.dao.entity.AbstractEntityService; import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.exception.IncorrectParameterException; @@ -45,9 +55,11 @@ import org.thingsboard.server.dao.service.DataValidator; import org.thingsboard.server.dao.service.PaginatedRemover; import org.thingsboard.server.dao.tenant.TenantDao; +import javax.annotation.Nullable; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import static org.thingsboard.server.dao.service.Validator.validateId; import static org.thingsboard.server.dao.service.Validator.validatePageLink; @@ -85,6 +97,9 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic @Autowired private CustomerDao customerDao; + @Autowired + private EdgeService edgeService; + @Override public User findUserByEmail(TenantId tenantId, String email) { log.trace("Executing findUserByEmail [{}]", email); @@ -312,6 +327,57 @@ public class UserServiceImpl extends AbstractEntityService implements UserServic return failedLoginAttempts; } + @Override + public User assignUserToEdge(TenantId tenantId, UserId userId, EdgeId edgeId) { + User user = findUserById(tenantId, userId); + Edge edge = edgeService.findEdgeById(tenantId, edgeId); + if (edge == null) { + throw new DataValidationException("Can't assign user to non-existent edge!"); + } + if (!edge.getTenantId().getId().equals(user.getTenantId().getId())) { + throw new DataValidationException("Can't assign user to edge from different tenant!"); + } + try { + createRelation(tenantId, new EntityRelation(edgeId, userId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE)); + } catch (ExecutionException | InterruptedException e) { + log.warn("[{}] Failed to create user relation. Edge Id: [{}]", userId, edgeId); + throw new RuntimeException(e); + } + return user; + } + + @Override + public User unassignUserFromEdge(TenantId tenantId, UserId userId, EdgeId edgeId) { + User user = findUserById(tenantId, userId); + Edge edge = edgeService.findEdgeById(tenantId, edgeId); + if (edge == null) { + throw new DataValidationException("Can't unassign user from non-existent edge!"); + } + try { + deleteRelation(tenantId, new EntityRelation(edgeId, userId, EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE)); + } catch (ExecutionException | InterruptedException e) { + log.warn("[{}] Failed to delete user relation. Edge Id: [{}]", userId, edgeId); + throw new RuntimeException(e); + } + return user; + } + + @Override + public ListenableFuture> findUsersByTenantIdAndEdgeId(TenantId tenantId, EdgeId edgeId, TimePageLink pageLink) { + log.trace("Executing findUsersByTenantIdAndEdgeId, tenantId [{}], edgeId [{}], pageLink [{}]", tenantId, edgeId, pageLink); + validateId(tenantId, INCORRECT_TENANT_ID + tenantId); + validateId(edgeId, INCORRECT_EDGE_ID + edgeId); + validatePageLink(pageLink, INCORRECT_PAGE_LINK + pageLink); + ListenableFuture> users = userDao.findUsersByTenantIdAndEdgeId(tenantId.getId(), edgeId.getId(), pageLink); + return Futures.transform(users, new Function, TimePageData>() { + @Nullable + @Override + public TimePageData apply(@Nullable List users) { + return new TimePageData<>(users, pageLink); + } + }, MoreExecutors.directExecutor()); + } + private int increaseFailedLoginAttempts(User user) { JsonNode additionalInfo = user.getAdditionalInfo(); if (!(additionalInfo instanceof ObjectNode)) { diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/BaseRuleChainServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/BaseRuleChainServiceTest.java index f37017068e..b7ecc1f784 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/BaseRuleChainServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/BaseRuleChainServiceTest.java @@ -331,6 +331,7 @@ public abstract class BaseRuleChainServiceTest extends AbstractServiceTest { result = ruleChainService.findDefaultEdgeRuleChainsByTenantId(tenantId).get(); Assert.assertEquals(1, result.size()); } + private RuleChainId saveRuleChainAndSetDefaultEdge(String name) { RuleChain edgeRuleChain = new RuleChain(); edgeRuleChain.setTenantId(tenantId);