diff --git a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java index 68afbdaf99..1a74e368f5 100644 --- a/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java +++ b/application/src/main/java/org/thingsboard/server/actors/tenant/TenantActor.java @@ -245,11 +245,11 @@ public class TenantActor extends RuleChainManagerActor { EdgeId edgeId = new EdgeId(msg.getEntityId().getId()); EdgeRpcService edgeRpcService = systemContext.getEdgeRpcService(); if (msg.getEvent() == ComponentLifecycleEvent.DELETED) { - edgeRpcService.deleteEdge(edgeId); + edgeRpcService.deleteEdge(tenantId, edgeId); } else { Edge edge = systemContext.getEdgeService().findEdgeById(tenantId, edgeId); if (msg.getEvent() == ComponentLifecycleEvent.UPDATED) { - edgeRpcService.updateEdge(edge); + edgeRpcService.updateEdge(tenantId, edge); } } } else if (isRuleEngineForCurrentTenant) { @@ -277,7 +277,7 @@ public class TenantActor extends RuleChainManagerActor { private void onToEdgeSessionMsg(EdgeEventUpdateMsg msg) { log.trace("[{}] onToEdgeSessionMsg [{}]", msg.getTenantId(), msg); - systemContext.getEdgeRpcService().onEdgeEvent(msg.getEdgeId()); + systemContext.getEdgeRpcService().onEdgeEvent(tenantId, msg.getEdgeId()); } public static class ActorCreator extends ContextBasedCreator { diff --git a/application/src/main/java/org/thingsboard/server/controller/BaseController.java b/application/src/main/java/org/thingsboard/server/controller/BaseController.java index 505416f4d6..9f6379cf6c 100644 --- a/application/src/main/java/org/thingsboard/server/controller/BaseController.java +++ b/application/src/main/java/org/thingsboard/server/controller/BaseController.java @@ -43,8 +43,8 @@ import org.thingsboard.server.common.data.Firmware; import org.thingsboard.server.common.data.FirmwareInfo; import org.thingsboard.server.common.data.HasName; import org.thingsboard.server.common.data.HasTenantId; -import org.thingsboard.server.common.data.TbResourceInfo; import org.thingsboard.server.common.data.TbResource; +import org.thingsboard.server.common.data.TbResourceInfo; import org.thingsboard.server.common.data.Tenant; import org.thingsboard.server.common.data.TenantInfo; import org.thingsboard.server.common.data.TenantProfile; @@ -71,9 +71,9 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.EntityViewId; import org.thingsboard.server.common.data.id.FirmwareId; -import org.thingsboard.server.common.data.id.TbResourceId; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.RuleNodeId; +import org.thingsboard.server.common.data.id.TbResourceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.TenantProfileId; import org.thingsboard.server.common.data.id.UserId; @@ -128,10 +128,9 @@ import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.provider.TbQueueProducerProvider; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.component.ComponentDiscoveryService; -import org.thingsboard.server.service.firmware.FirmwareStateService; import org.thingsboard.server.service.edge.EdgeNotificationService; -import org.thingsboard.server.service.edge.rpc.EdgeGrpcService; -import org.thingsboard.server.service.edge.rpc.init.SyncEdgeService; +import org.thingsboard.server.service.edge.rpc.EdgeRpcService; +import org.thingsboard.server.service.firmware.FirmwareStateService; import org.thingsboard.server.service.lwm2m.LwM2MServerSecurityInfoRepository; import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.queue.TbClusterService; @@ -276,10 +275,7 @@ public abstract class BaseController { protected EdgeNotificationService edgeNotificationService; @Autowired(required = false) - protected SyncEdgeService syncEdgeService; - - @Autowired(required = false) - protected EdgeGrpcService edgeGrpcService; + protected EdgeRpcService edgeGrpcService; @Value("${server.log_controller_error_stack_trace}") @Getter diff --git a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java index 9c196c6bd2..c71f1a7a74 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EdgeController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EdgeController.java @@ -49,7 +49,6 @@ import org.thingsboard.server.dao.exception.DataValidationException; import org.thingsboard.server.dao.exception.IncorrectParameterException; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.queue.util.TbCoreComponent; -import org.thingsboard.server.service.edge.rpc.EdgeGrpcSession; import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.security.permission.Operation; import org.thingsboard.server.service.security.permission.Resource; @@ -536,9 +535,7 @@ public class EdgeController extends BaseController { edgeId = checkNotNull(edgeId); SecurityUser user = getCurrentUser(); TenantId tenantId = user.getTenantId(); - EdgeGrpcSession session = edgeGrpcService.getEdgeGrpcSessionById(tenantId, edgeId); - Edge edge = session.getEdge(); - syncEdgeService.sync(tenantId, edge); + edgeGrpcService.startSyncProcess(tenantId, edgeId); } else { throw new ThingsboardException("Edges support disabled", ThingsboardErrorCode.GENERAL); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java index 66e64c14f1..5d85ddf16f 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java @@ -326,7 +326,7 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { case ASSIGNED_TO_CUSTOMER: case UNASSIGNED_FROM_CUSTOMER: edgeIdsFuture = edgeService.findRelatedEdgeIdsByEntityId(tenantId, entityId); - Futures.addCallback(edgeIdsFuture, new FutureCallback>() { + Futures.addCallback(edgeIdsFuture, new FutureCallback<>() { @Override public void onSuccess(@Nullable List edgeIds) { if (edgeIds != null && !edgeIds.isEmpty()) { diff --git a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java index eb3852f7ac..62940f2931 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/EdgeContextComponent.java @@ -54,7 +54,7 @@ import org.thingsboard.server.service.edge.rpc.constructor.RuleChainMsgConstruct import org.thingsboard.server.service.edge.rpc.constructor.UserMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.WidgetTypeMsgConstructor; import org.thingsboard.server.service.edge.rpc.constructor.WidgetsBundleMsgConstructor; -import org.thingsboard.server.service.edge.rpc.init.SyncEdgeService; +import org.thingsboard.server.service.edge.rpc.sync.EdgeRequestsService; import org.thingsboard.server.service.edge.rpc.processor.AlarmProcessor; import org.thingsboard.server.service.edge.rpc.processor.DeviceProcessor; import org.thingsboard.server.service.edge.rpc.processor.RelationProcessor; @@ -149,7 +149,7 @@ public class EdgeContextComponent { @Lazy @Autowired - private SyncEdgeService syncEdgeService; + private EdgeRequestsService edgeRequestsService; @Lazy @Autowired diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeEventUtils.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeEventUtils.java new file mode 100644 index 0000000000..f09184504e --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeEventUtils.java @@ -0,0 +1,48 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc; + +import com.fasterxml.jackson.databind.JsonNode; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.edge.EdgeEventType; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.id.TenantId; + +public final class EdgeEventUtils { + + private EdgeEventUtils() { + } + + public static EdgeEvent constructEdgeEvent(TenantId tenantId, + EdgeId edgeId, + EdgeEventType type, + EdgeEventActionType action, + EntityId entityId, + JsonNode body) { + EdgeEvent edgeEvent = new EdgeEvent(); + edgeEvent.setTenantId(tenantId); + edgeEvent.setEdgeId(edgeId); + edgeEvent.setType(type); + edgeEvent.setAction(action); + if (entityId != null) { + edgeEvent.setEntityId(entityId.getId()); + } + edgeEvent.setBody(body); + return edgeEvent; + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 70033f10e1..59b22864bd 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -53,6 +53,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -95,6 +96,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i private ScheduledExecutorService scheduler; + private ExecutorService syncExecutorService; + @PostConstruct public void init() { log.info("Initializing Edge RPC service!"); @@ -120,6 +123,8 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i throw new RuntimeException("Failed to start Edge RPC server!"); } this.scheduler = Executors.newScheduledThreadPool(schedulerPoolSize, ThingsBoardThreadFactory.forName("edge-scheduler")); + this.syncExecutorService = Executors.newFixedThreadPool( + Runtime.getRuntime().availableProcessors(), ThingsBoardThreadFactory.forName("edge-sync")); log.info("Edge RPC service initialized!"); } @@ -139,29 +144,32 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i if (scheduler != null) { scheduler.shutdownNow(); } + if (syncExecutorService != null) { + syncExecutorService.shutdownNow(); + } } @Override public StreamObserver handleMsgs(StreamObserver outputStream) { - return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, mapper).getInputStream(); + return new EdgeGrpcSession(ctx, outputStream, this::onEdgeConnect, this::onEdgeDisconnect, mapper, syncExecutorService).getInputStream(); } @Override - public void updateEdge(Edge edge) { + public void updateEdge(TenantId tenantId, Edge edge) { EdgeGrpcSession session = sessions.get(edge.getId()); if (session != null && session.isConnected()) { - log.debug("[{}] Updating configuration for edge [{}] [{}]", edge.getTenantId(), edge.getName(), edge.getId()); + log.debug("[{}] Updating configuration for edge [{}] [{}]", tenantId, edge.getName(), edge.getId()); session.onConfigurationUpdate(edge); } else { - log.debug("[{}] Session doesn't exist for edge [{}] [{}]", edge.getTenantId(), edge.getName(), edge.getId()); + log.debug("[{}] Session doesn't exist for edge [{}] [{}]", tenantId, edge.getName(), edge.getId()); } } @Override - public void deleteEdge(EdgeId edgeId) { + public void deleteEdge(TenantId tenantId, EdgeId edgeId) { EdgeGrpcSession session = sessions.get(edgeId); if (session != null && session.isConnected()) { - log.info("Closing and removing session for edge [{}]", edgeId); + log.info("[{}] Closing and removing session for edge [{}]", tenantId, edgeId); session.close(); sessions.remove(edgeId); sessionNewEvents.remove(edgeId); @@ -170,10 +178,10 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i } @Override - public void onEdgeEvent(EdgeId edgeId) { - log.trace("[{}] onEdgeEvent", edgeId.getId()); + public void onEdgeEvent(TenantId tenantId, EdgeId edgeId) { + log.trace("[{}] onEdgeEvent [{}]", tenantId, edgeId.getId()); if (!sessionNewEvents.get(edgeId)) { - log.trace("[{}] set session new events flag to true", edgeId.getId()); + log.trace("[{}] set session new events flag to true [{}]", tenantId, edgeId.getId()); sessionNewEvents.put(edgeId, true); } } @@ -188,10 +196,11 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i scheduleEdgeEventsCheck(edgeGrpcSession); } - public EdgeGrpcSession getEdgeGrpcSessionById(TenantId tenantId, EdgeId edgeId) { + @Override + public void startSyncProcess(TenantId tenantId, EdgeId edgeId) { EdgeGrpcSession session = sessions.get(edgeId); if (session != null && session.isConnected()) { - return session; + session.startSyncProcess(); } else { log.error("[{}] Edge is not connected [{}]", tenantId, edgeId); throw new RuntimeException("Edge is not connected"); diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java index fb01926e2a..da0b1a7736 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcSession.java @@ -16,9 +16,7 @@ package org.thingsboard.server.service.edge.rpc; import com.datastax.oss.driver.api.core.uuid.Uuids; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -28,6 +26,7 @@ import io.grpc.stub.StreamObserver; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.Nullable; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.AdminSettings; import org.thingsboard.server.common.data.Customer; import org.thingsboard.server.common.data.Dashboard; @@ -61,8 +60,7 @@ import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.page.PageData; -import org.thingsboard.server.common.data.page.SortOrder; -import org.thingsboard.server.common.data.page.TimePageLink; +import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.rule.RuleChainMetaData; @@ -71,7 +69,6 @@ import org.thingsboard.server.common.data.security.UserCredentials; import org.thingsboard.server.common.data.widget.WidgetType; import org.thingsboard.server.common.data.widget.WidgetsBundle; import org.thingsboard.server.common.transport.util.JsonUtils; -import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.gen.edge.AdminSettingsUpdateMsg; import org.thingsboard.server.gen.edge.AlarmUpdateMsg; import org.thingsboard.server.gen.edge.AssetUpdateMsg; @@ -83,6 +80,7 @@ import org.thingsboard.server.gen.edge.CustomerUpdateMsg; import org.thingsboard.server.gen.edge.DashboardUpdateMsg; import org.thingsboard.server.gen.edge.DeviceCredentialsRequestMsg; import org.thingsboard.server.gen.edge.DeviceCredentialsUpdateMsg; +import org.thingsboard.server.gen.edge.DeviceProfileDevicesRequestMsg; import org.thingsboard.server.gen.edge.DeviceProfileUpdateMsg; import org.thingsboard.server.gen.edge.DeviceRpcCallMsg; import org.thingsboard.server.gen.edge.DeviceUpdateMsg; @@ -92,6 +90,7 @@ import org.thingsboard.server.gen.edge.EdgeConfiguration; import org.thingsboard.server.gen.edge.EdgeUpdateMsg; import org.thingsboard.server.gen.edge.EntityDataProto; import org.thingsboard.server.gen.edge.EntityViewUpdateMsg; +import org.thingsboard.server.gen.edge.EntityViewsRequestMsg; import org.thingsboard.server.gen.edge.RelationRequestMsg; import org.thingsboard.server.gen.edge.RelationUpdateMsg; import org.thingsboard.server.gen.edge.RequestMsg; @@ -100,14 +99,26 @@ import org.thingsboard.server.gen.edge.ResponseMsg; import org.thingsboard.server.gen.edge.RuleChainMetadataRequestMsg; import org.thingsboard.server.gen.edge.RuleChainMetadataUpdateMsg; import org.thingsboard.server.gen.edge.RuleChainUpdateMsg; +import org.thingsboard.server.gen.edge.SyncCompletedMsg; import org.thingsboard.server.gen.edge.UpdateMsgType; import org.thingsboard.server.gen.edge.UplinkMsg; import org.thingsboard.server.gen.edge.UplinkResponseMsg; import org.thingsboard.server.gen.edge.UserCredentialsRequestMsg; import org.thingsboard.server.gen.edge.UserCredentialsUpdateMsg; +import org.thingsboard.server.gen.edge.WidgetBundleTypesRequestMsg; import org.thingsboard.server.gen.edge.WidgetTypeUpdateMsg; import org.thingsboard.server.gen.edge.WidgetsBundleUpdateMsg; import org.thingsboard.server.service.edge.EdgeContextComponent; +import org.thingsboard.server.service.edge.rpc.fetch.AssetsEdgeEventFetcher; +import org.thingsboard.server.service.edge.rpc.fetch.CustomerUsersEdgeEventFetcher; +import org.thingsboard.server.service.edge.rpc.fetch.DashboardsEdgeEventFetcher; +import org.thingsboard.server.service.edge.rpc.fetch.DeviceProfilesEdgeEventFetcher; +import org.thingsboard.server.service.edge.rpc.fetch.EdgeEventFetcher; +import org.thingsboard.server.service.edge.rpc.fetch.GeneralEdgeEventFetcher; +import org.thingsboard.server.service.edge.rpc.fetch.RuleChainsEdgeEventFetcher; +import org.thingsboard.server.service.edge.rpc.fetch.SystemWidgetsBundlesEdgeEventFetcher; +import org.thingsboard.server.service.edge.rpc.fetch.TenantAdminUsersEdgeEventFetcher; +import org.thingsboard.server.service.edge.rpc.fetch.TenantWidgetsBundlesEdgeEventFetcher; import java.io.Closeable; import java.util.ArrayList; @@ -117,6 +128,7 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; @@ -140,22 +152,26 @@ public final class EdgeGrpcSession implements Closeable { private StreamObserver inputStream; private StreamObserver outputStream; private boolean connected; + private boolean syncCompleted; + + private ExecutorService syncExecutorService; private CountDownLatch latch; EdgeGrpcSession(EdgeContextComponent ctx, StreamObserver outputStream, BiConsumer sessionOpenListener, - Consumer sessionCloseListener, ObjectMapper mapper) { + Consumer sessionCloseListener, ObjectMapper mapper, ExecutorService syncExecutorService) { this.sessionId = UUID.randomUUID(); this.ctx = ctx; this.outputStream = outputStream; this.sessionOpenListener = sessionOpenListener; this.sessionCloseListener = sessionCloseListener; this.mapper = mapper; + this.syncExecutorService = syncExecutorService; initInputStream(); } private void initInputStream() { - this.inputStream = new StreamObserver() { + this.inputStream = new StreamObserver<>() { @Override public void onNext(RequestMsg requestMsg) { if (!connected && requestMsg.getMsgType().equals(RequestMsgType.CONNECT_RPC_MESSAGE)) { @@ -170,7 +186,10 @@ public final class EdgeGrpcSession implements Closeable { } } if (connected && requestMsg.getMsgType().equals(RequestMsgType.SYNC_REQUEST_RPC_MESSAGE)) { - ctx.getSyncEdgeService().sync(edge.getTenantId(), edge); + if (requestMsg.getSyncRequestMsg().getSyncRequired()) { + startSyncProcess(); + } + syncCompleted = true; } if (connected) { if (requestMsg.getMsgType().equals(RequestMsgType.UPLINK_RPC_MESSAGE) && requestMsg.hasUplinkMsg()) { @@ -198,18 +217,55 @@ public final class EdgeGrpcSession implements Closeable { if (edge != null) { try { sessionCloseListener.accept(edge.getId()); - } catch (Exception ignored) {} + } catch (Exception ignored) { + } } try { outputStream.onCompleted(); - } catch (Exception ignored) {} + } catch (Exception ignored) { + } } }; } + public void startSyncProcess() { + log.trace("[{}][{}] Staring edge sync process", edge.getTenantId(), edge.getId()); + syncExecutorService.submit(() -> { + try { + startProcessingEdgeEvents(new TenantWidgetsBundlesEdgeEventFetcher(ctx.getWidgetsBundleService())); + startProcessingEdgeEvents(new SystemWidgetsBundlesEdgeEventFetcher(ctx.getWidgetsBundleService())); + startProcessingEdgeEvents(new DeviceProfilesEdgeEventFetcher(ctx.getDeviceProfileService())); + startProcessingEdgeEvents(new RuleChainsEdgeEventFetcher(ctx.getRuleChainService())); + + startProcessingEdgeEvents(new TenantAdminUsersEdgeEventFetcher(ctx.getUserService())); + if (edge.getCustomerId() != null && !EntityId.NULL_UUID.equals(edge.getCustomerId().getId())) { + EdgeEvent customerEdgeEvent = EdgeEventUtils.constructEdgeEvent(edge.getTenantId(), edge.getId(), + EdgeEventType.CUSTOMER, EdgeEventActionType.ADDED, edge.getCustomerId(), null); + DownlinkMsg customerDownlinkMsg = convertToDownlinkMsg(customerEdgeEvent); + sendSingleDownlinkMsg(customerDownlinkMsg); + + startProcessingEdgeEvents(new CustomerUsersEdgeEventFetcher(ctx.getUserService(), edge.getCustomerId())); + } + + // TODO: voba - implement this functionality + // syncAdminSettings(edge); + + startProcessingEdgeEvents(new AssetsEdgeEventFetcher(ctx.getAssetService())); + startProcessingEdgeEvents(new DashboardsEdgeEventFetcher(ctx.getDashboardService())); + + DownlinkMsg syncCompleteDownlinkMsg = DownlinkMsg.newBuilder() + .setSyncCompletedMsg(SyncCompletedMsg.newBuilder().build()) + .build(); + sendSingleDownlinkMsg(syncCompleteDownlinkMsg); + } catch (Exception e) { + log.error("[{}][{}] Exception during sync process", edge.getTenantId(), edge.getId(), e); + } + }); + } + private void onUplinkMsg(UplinkMsg uplinkMsg) { ListenableFuture> future = processUplinkMsg(uplinkMsg); - Futures.addCallback(future, new FutureCallback>() { + Futures.addCallback(future, new FutureCallback<>() { @Override public void onSuccess(@Nullable List result) { UplinkResponseMsg uplinkResponseMsg = UplinkResponseMsg.newBuilder().setSuccess(true).build(); @@ -271,101 +327,136 @@ public final class EdgeGrpcSession implements Closeable { void processEdgeEvents() throws ExecutionException, InterruptedException { log.trace("[{}] processHandleMessages started", this.sessionId); - if (isConnected()) { + if (isConnected() && isSyncCompleted()) { Long queueStartTs = getQueueStartTs().get(); - TimePageLink pageLink = new TimePageLink( + GeneralEdgeEventFetcher fetcher = new GeneralEdgeEventFetcher( ctx.getEdgeEventStorageSettings().getMaxReadRecordsCount(), - 0, - null, - new SortOrder("createdTime", SortOrder.Direction.ASC), queueStartTs, - null); - PageData pageData; - UUID ifOffset = null; - boolean success = true; - do { - pageData = ctx.getEdgeEventService().findEdgeEvents(edge.getTenantId(), edge.getId(), pageLink, true); - if (isConnected() && !pageData.getData().isEmpty()) { - log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size()); - List downlinkMsgsPack = convertToDownlinkMsgsPack(pageData.getData()); - log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, downlinkMsgsPack.size()); - - latch = new CountDownLatch(downlinkMsgsPack.size()); - for (DownlinkMsg downlinkMsg : downlinkMsgsPack) { - sendDownlinkMsg(ResponseMsg.newBuilder() - .setDownlinkMsg(downlinkMsg) - .build()); - } + ctx.getEdgeEventService()); + UUID ifOffset = startProcessingEdgeEvents(fetcher); + if (ifOffset != null) { + Long newStartTs = Uuids.unixTimestamp(ifOffset); + updateQueueStartTs(newStartTs); + } + } + log.trace("[{}] processHandleMessages finished", this.sessionId); + } - ifOffset = pageData.getData().get(pageData.getData().size() - 1).getUuidId(); + private UUID startProcessingEdgeEvents(EdgeEventFetcher fetcher) throws InterruptedException { + PageLink pageLink = fetcher.getPageLink(); + PageData pageData; + UUID ifOffset = null; + boolean success = true; + do { + pageData = fetcher.fetchEdgeEvents(edge.getTenantId(), edge.getId(), pageLink); + if (isConnected() && !pageData.getData().isEmpty()) { + log.trace("[{}] [{}] event(s) are going to be processed.", this.sessionId, pageData.getData().size()); - success = latch.await(10, TimeUnit.SECONDS); - if (!success) { - log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, downlinkMsgsPack); - } + success = processEdgeEventsPack(pageData.getData()); + + ifOffset = pageData.getData().get(pageData.getData().size() - 1).getUuidId(); + } + if (isConnected() && (!success || pageData.hasNext())) { + try { + Thread.sleep(ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches()); + } catch (InterruptedException e) { + log.error("[{}] Error during sleep between batches", this.sessionId, e); } - if (isConnected() && (!success || pageData.hasNext())) { - try { - Thread.sleep(ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches()); - } catch (InterruptedException e) { - log.error("[{}] Error during sleep between batches", this.sessionId, e); - } - if (success) { - pageLink = pageLink.nextPageLink(); - } + if (success) { + pageLink = pageLink.nextPageLink(); } - } while (isConnected() && (!success || pageData.hasNext())); + } + } while (isConnected() && (!success || pageData.hasNext())); + return ifOffset; + } - if (ifOffset != null) { - Long newStartTs = Uuids.unixTimestamp(ifOffset); - updateQueueStartTs(newStartTs); + private boolean processEdgeEventsPack(List edgeEvents) throws InterruptedException { + List downlinkMsgsPack = convertToDownlinkMsgsPack(edgeEvents); + + log.trace("[{}] [{}] downlink msg(s) are going to be send.", this.sessionId, downlinkMsgsPack.size()); + + latch = new CountDownLatch(downlinkMsgsPack.size()); + for (DownlinkMsg downlinkMsg : downlinkMsgsPack) { + sendDownlinkMsg(ResponseMsg.newBuilder() + .setDownlinkMsg(downlinkMsg) + .build()); + } + + boolean success = latch.await(10, TimeUnit.SECONDS); + if (!success) { + log.warn("[{}] Failed to deliver the batch: {}", this.sessionId, downlinkMsgsPack); + } + return success; + } + + private void sendSingleDownlinkMsg(DownlinkMsg downlinkMsg) throws InterruptedException { + boolean success; + do { + latch = new CountDownLatch(1); + sendDownlinkMsg(ResponseMsg.newBuilder() + .setDownlinkMsg(downlinkMsg) + .build()); + success = latch.await(10, TimeUnit.SECONDS); + if (!success) { + log.warn("[{}] Failed to deliver single downlink msg!", this.sessionId); + } + if (isConnected() && !success) { + try { + Thread.sleep(ctx.getEdgeEventStorageSettings().getSleepIntervalBetweenBatches()); + } catch (InterruptedException e) { + log.error("[{}] Error during sleep between next send of single downlink msg", this.sessionId, e); + } } + } while (isConnected() && !success); + } + + private DownlinkMsg convertToDownlinkMsg(EdgeEvent edgeEvent) { + log.trace("[{}] converting edge event to downlink msg [{}]", this.sessionId, edgeEvent); + DownlinkMsg downlinkMsg = null; + try { + switch (edgeEvent.getAction()) { + case UPDATED: + case ADDED: + case DELETED: + case ASSIGNED_TO_EDGE: + case UNASSIGNED_FROM_EDGE: + case ALARM_ACK: + case ALARM_CLEAR: + case CREDENTIALS_UPDATED: + case RELATION_ADD_OR_UPDATE: + case RELATION_DELETED: + case ASSIGNED_TO_CUSTOMER: + case UNASSIGNED_FROM_CUSTOMER: + downlinkMsg = processEntityMessage(edgeEvent, edgeEvent.getAction()); + break; + case ATTRIBUTES_UPDATED: + case POST_ATTRIBUTES: + case ATTRIBUTES_DELETED: + case TIMESERIES_UPDATED: + downlinkMsg = processTelemetryMessage(edgeEvent); + break; + case CREDENTIALS_REQUEST: + downlinkMsg = processCredentialsRequestMessage(edgeEvent); + break; + case ENTITY_MERGE_REQUEST: + downlinkMsg = processEntityMergeRequestMessage(edgeEvent); + break; + case RPC_CALL: + downlinkMsg = processRpcCallMsg(edgeEvent); + break; + } + } catch (Exception e) { + log.error("Exception during converting edge event to downlink msg", e); } - log.trace("[{}] processHandleMessages finished", this.sessionId); + return downlinkMsg; } private List convertToDownlinkMsgsPack(List edgeEvents) { List result = new ArrayList<>(); for (EdgeEvent edgeEvent : edgeEvents) { - log.trace("[{}] Processing edge event [{}]", this.sessionId, edgeEvent); - try { - DownlinkMsg downlinkMsg = null; - switch (edgeEvent.getAction()) { - case UPDATED: - case ADDED: - case DELETED: - case ASSIGNED_TO_EDGE: - case UNASSIGNED_FROM_EDGE: - case ALARM_ACK: - case ALARM_CLEAR: - case CREDENTIALS_UPDATED: - case RELATION_ADD_OR_UPDATE: - case RELATION_DELETED: - case ASSIGNED_TO_CUSTOMER: - case UNASSIGNED_FROM_CUSTOMER: - downlinkMsg = processEntityMessage(edgeEvent, edgeEvent.getAction()); - break; - case ATTRIBUTES_UPDATED: - case POST_ATTRIBUTES: - case ATTRIBUTES_DELETED: - case TIMESERIES_UPDATED: - downlinkMsg = processTelemetryMessage(edgeEvent); - break; - case CREDENTIALS_REQUEST: - downlinkMsg = processCredentialsRequestMessage(edgeEvent); - break; - case ENTITY_MERGE_REQUEST: - downlinkMsg = processEntityMergeRequestMessage(edgeEvent); - break; - case RPC_CALL: - downlinkMsg = processRpcCallMsg(edgeEvent); - break; - } - if (downlinkMsg != null) { - result.add(downlinkMsg); - } - } catch (Exception e) { - log.error("Exception during processing records from queue", e); + DownlinkMsg downlinkMsg = convertToDownlinkMsg(edgeEvent); + if (downlinkMsg != null) { + result.add(downlinkMsg); } } return result; @@ -945,27 +1036,27 @@ public final class EdgeGrpcSession implements Closeable { } if (uplinkMsg.getRuleChainMetadataRequestMsgCount() > 0) { for (RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg : uplinkMsg.getRuleChainMetadataRequestMsgList()) { - result.add(ctx.getSyncEdgeService().processRuleChainMetadataRequestMsg(edge.getTenantId(), edge, ruleChainMetadataRequestMsg)); + result.add(ctx.getEdgeRequestsService().processRuleChainMetadataRequestMsg(edge.getTenantId(), edge, ruleChainMetadataRequestMsg)); } } if (uplinkMsg.getAttributesRequestMsgCount() > 0) { for (AttributesRequestMsg attributesRequestMsg : uplinkMsg.getAttributesRequestMsgList()) { - result.add(ctx.getSyncEdgeService().processAttributesRequestMsg(edge.getTenantId(), edge, attributesRequestMsg)); + result.add(ctx.getEdgeRequestsService().processAttributesRequestMsg(edge.getTenantId(), edge, attributesRequestMsg)); } } if (uplinkMsg.getRelationRequestMsgCount() > 0) { for (RelationRequestMsg relationRequestMsg : uplinkMsg.getRelationRequestMsgList()) { - result.add(ctx.getSyncEdgeService().processRelationRequestMsg(edge.getTenantId(), edge, relationRequestMsg)); + result.add(ctx.getEdgeRequestsService().processRelationRequestMsg(edge.getTenantId(), edge, relationRequestMsg)); } } if (uplinkMsg.getUserCredentialsRequestMsgCount() > 0) { for (UserCredentialsRequestMsg userCredentialsRequestMsg : uplinkMsg.getUserCredentialsRequestMsgList()) { - result.add(ctx.getSyncEdgeService().processUserCredentialsRequestMsg(edge.getTenantId(), edge, userCredentialsRequestMsg)); + result.add(ctx.getEdgeRequestsService().processUserCredentialsRequestMsg(edge.getTenantId(), edge, userCredentialsRequestMsg)); } } if (uplinkMsg.getDeviceCredentialsRequestMsgCount() > 0) { for (DeviceCredentialsRequestMsg deviceCredentialsRequestMsg : uplinkMsg.getDeviceCredentialsRequestMsgList()) { - result.add(ctx.getSyncEdgeService().processDeviceCredentialsRequestMsg(edge.getTenantId(), edge, deviceCredentialsRequestMsg)); + result.add(ctx.getEdgeRequestsService().processDeviceCredentialsRequestMsg(edge.getTenantId(), edge, deviceCredentialsRequestMsg)); } } if (uplinkMsg.getDeviceRpcCallMsgCount() > 0) { @@ -973,6 +1064,21 @@ public final class EdgeGrpcSession implements Closeable { result.add(ctx.getDeviceProcessor().processDeviceRpcCallResponseMsg(edge.getTenantId(), deviceRpcCallMsg)); } } + if (uplinkMsg.getDeviceProfileDevicesRequestMsgCount() > 0) { + for (DeviceProfileDevicesRequestMsg deviceProfileDevicesRequestMsg : uplinkMsg.getDeviceProfileDevicesRequestMsgList()) { + result.add(ctx.getEdgeRequestsService().processDeviceProfileDevicesRequestMsg(edge.getTenantId(), edge, deviceProfileDevicesRequestMsg)); + } + } + if (uplinkMsg.getWidgetBundleTypesRequestMsgCount() > 0) { + for (WidgetBundleTypesRequestMsg widgetBundleTypesRequestMsg : uplinkMsg.getWidgetBundleTypesRequestMsgList()) { + result.add(ctx.getEdgeRequestsService().processWidgetBundleTypesRequestMsg(edge.getTenantId(), edge, widgetBundleTypesRequestMsg)); + } + } + if (uplinkMsg.getEntityViewsRequestMsgCount() > 0) { + for (EntityViewsRequestMsg entityViewRequestMsg : uplinkMsg.getEntityViewsRequestMsgList()) { + result.add(ctx.getEdgeRequestsService().processEntityViewsRequestMsg(edge.getTenantId(), edge, entityViewRequestMsg)); + } + } } catch (Exception e) { log.error("[{}] Can't process uplink msg [{}]", this.sessionId, uplinkMsg, e); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java index ddea009849..fdd2adadd7 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeRpcService.java @@ -17,12 +17,15 @@ package org.thingsboard.server.service.edge.rpc; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.TenantId; public interface EdgeRpcService { - void updateEdge(Edge edge); + void updateEdge(TenantId tenantId, Edge edge); - void deleteEdge(EdgeId edgeId); + void deleteEdge(TenantId tenantId, EdgeId edgeId); - void onEdgeEvent(EdgeId edgeId); + void onEdgeEvent(TenantId tenantId, EdgeId edgeId); + + void startSyncProcess(TenantId tenantId, EdgeId edgeId); } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AdminSettingsEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AdminSettingsEdgeEventFetcher.java new file mode 100644 index 0000000000..b232333d5d --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AdminSettingsEdgeEventFetcher.java @@ -0,0 +1,109 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.fetch; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; + +@AllArgsConstructor +@Slf4j +public class AdminSettingsEdgeEventFetcher implements EdgeEventFetcher { + + @Override + public PageLink getPageLink() { + return new PageLink(DEFAULT_LIMIT); + } + + @Override + public PageData fetchEdgeEvents(TenantId tenantId, EdgeId edgeId, PageLink pageLink) { + return null; + } + + +// +// private void syncAdminSettings(TenantId tenantId, Edge edge) { +// log.trace("[{}] syncAdminSettings [{}]", tenantId, edge.getName()); +// try { +// AdminSettings systemMailSettings = adminSettingsService.findAdminSettingsByKey(TenantId.SYS_TENANT_ID, "mail"); +// saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(systemMailSettings)); +// AdminSettings tenantMailSettings = convertToTenantAdminSettings(systemMailSettings.getKey(), (ObjectNode) systemMailSettings.getJsonValue()); +// saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(tenantMailSettings)); +// AdminSettings systemMailTemplates = loadMailTemplates(); +// saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(systemMailTemplates)); +// AdminSettings tenantMailTemplates = convertToTenantAdminSettings(systemMailTemplates.getKey(), (ObjectNode) systemMailTemplates.getJsonValue()); +// saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(tenantMailTemplates)); +// } catch (Exception e) { +// log.error("Can't load admin settings", e); +// } +// } +// +// private AdminSettings loadMailTemplates() throws Exception { +// Map mailTemplates = new HashMap<>(); +// Pattern startPattern = Pattern.compile("
"); +// Pattern endPattern = Pattern.compile("
"); +// File[] files = new DefaultResourceLoader().getResource("classpath:/templates/").getFile().listFiles(); +// for (File file : files) { +// Map mailTemplate = new HashMap<>(); +// String name = validateName(file.getName()); +// String stringTemplate = FileUtils.readFileToString(file, StandardCharsets.UTF_8); +// Matcher start = startPattern.matcher(stringTemplate); +// Matcher end = endPattern.matcher(stringTemplate); +// if (start.find() && end.find()) { +// String body = StringUtils.substringBetween(stringTemplate, start.group(), end.group()).replaceAll("\t", ""); +// String subject = StringUtils.substringBetween(body, "

", "

"); +// mailTemplate.put("subject", subject); +// mailTemplate.put("body", body); +// mailTemplates.put(name, mailTemplate); +// } else { +// log.error("Can't load mail template from file {}", file.getName()); +// } +// } +// AdminSettings adminSettings = new AdminSettings(); +// adminSettings.setId(new AdminSettingsId(Uuids.timeBased())); +// adminSettings.setKey("mailTemplates"); +// adminSettings.setJsonValue(mapper.convertValue(mailTemplates, JsonNode.class)); +// return adminSettings; +// } +// +// private String validateName(String name) throws Exception { +// StringBuilder nameBuilder = new StringBuilder(); +// name = name.replace(".vm", ""); +// String[] nameParts = name.split("\\."); +// if (nameParts.length >= 1) { +// nameBuilder.append(nameParts[0]); +// for (int i = 1; i < nameParts.length; i++) { +// String word = WordUtils.capitalize(nameParts[i]); +// nameBuilder.append(word); +// } +// return nameBuilder.toString(); +// } else { +// throw new Exception("Error during filename validation"); +// } +// } +// +// private AdminSettings convertToTenantAdminSettings(String key, ObjectNode jsonValue) { +// AdminSettings tenantMailSettings = new AdminSettings(); +// jsonValue.put("useSystemMailSettings", true); +// tenantMailSettings.setJsonValue(jsonValue); +// tenantMailSettings.setKey(key); +// return tenantMailSettings; +// } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AssetsEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AssetsEdgeEventFetcher.java new file mode 100644 index 0000000000..e2eb2e8d61 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/AssetsEdgeEventFetcher.java @@ -0,0 +1,58 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.fetch; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.asset.Asset; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.edge.EdgeEventType; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.dao.asset.AssetService; +import org.thingsboard.server.service.edge.rpc.EdgeEventUtils; + +import java.util.ArrayList; +import java.util.List; + +@AllArgsConstructor +@Slf4j +public class AssetsEdgeEventFetcher implements EdgeEventFetcher { + + private final AssetService assetService; + + @Override + public PageLink getPageLink() { + return new PageLink(DEFAULT_LIMIT); + } + + @Override + public PageData fetchEdgeEvents(TenantId tenantId, EdgeId edgeId, PageLink pageLink) { + log.trace("[{}] start fetching edge events [{}]", tenantId, edgeId); + PageData pageData = assetService.findAssetsByTenantIdAndEdgeId(tenantId, edgeId, pageLink); + List result = new ArrayList<>(); + if (!pageData.getData().isEmpty()) { + for (Asset asset : pageData.getData()) { + result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edgeId, EdgeEventType.ASSET, + EdgeEventActionType.ADDED, asset.getId(), null)); + } + } + return new PageData<>(result, pageData.getTotalPages(), pageData.getTotalElements(), pageData.hasNext()); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseUsersEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseUsersEdgeEventFetcher.java new file mode 100644 index 0000000000..883f451636 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseUsersEdgeEventFetcher.java @@ -0,0 +1,58 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.fetch; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.edge.EdgeEventType; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.widget.WidgetsBundle; +import org.thingsboard.server.dao.user.UserService; +import org.thingsboard.server.service.edge.rpc.EdgeEventUtils; + +import java.util.ArrayList; +import java.util.List; + +@Slf4j +public abstract class BaseUsersEdgeEventFetcher implements EdgeEventFetcher { + + @Override + public PageLink getPageLink() { + return new PageLink(DEFAULT_LIMIT); + } + + @Override + public PageData fetchEdgeEvents(TenantId tenantId, EdgeId edgeId, PageLink pageLink) { + log.trace("[{}] start fetching edge events [{}]", tenantId, edgeId); + PageData pageData = findUsers(tenantId, pageLink); + List result = new ArrayList<>(); + if (!pageData.getData().isEmpty()) { + for (User user : pageData.getData()) { + result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edgeId, EdgeEventType.USER, + EdgeEventActionType.ADDED, user.getId(), null)); + } + } + return new PageData<>(result, pageData.getTotalPages(), pageData.getTotalElements(), pageData.hasNext()); + } + + protected abstract PageData findUsers(TenantId tenantId, PageLink pageLink); +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseWidgetsBundlesEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseWidgetsBundlesEdgeEventFetcher.java new file mode 100644 index 0000000000..53c6e579d3 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/BaseWidgetsBundlesEdgeEventFetcher.java @@ -0,0 +1,55 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.fetch; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.edge.EdgeEventType; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.widget.WidgetsBundle; +import org.thingsboard.server.service.edge.rpc.EdgeEventUtils; + +import java.util.ArrayList; +import java.util.List; + +@Slf4j +public abstract class BaseWidgetsBundlesEdgeEventFetcher implements EdgeEventFetcher { + + @Override + public PageLink getPageLink() { + return new PageLink(DEFAULT_LIMIT); + } + + @Override + public PageData fetchEdgeEvents(TenantId tenantId, EdgeId edgeId, PageLink pageLink) { + log.trace("[{}] start fetching edge events [{}]", tenantId, edgeId); + PageData pageData = findWidgetsBundles(tenantId, pageLink); + List result = new ArrayList<>(); + if (!pageData.getData().isEmpty()) { + for (WidgetsBundle widgetsBundle : pageData.getData()) { + result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edgeId, EdgeEventType.WIDGETS_BUNDLE, + EdgeEventActionType.ADDED, widgetsBundle.getId(), null)); + } + } + return new PageData<>(result, pageData.getTotalPages(), pageData.getTotalElements(), pageData.hasNext()); + } + + protected abstract PageData findWidgetsBundles(TenantId tenantId, PageLink pageLink); +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CustomerUsersEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CustomerUsersEdgeEventFetcher.java new file mode 100644 index 0000000000..6775b85844 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/CustomerUsersEdgeEventFetcher.java @@ -0,0 +1,36 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.fetch; + +import lombok.AllArgsConstructor; +import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.id.CustomerId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.dao.user.UserService; + +@AllArgsConstructor +public class CustomerUsersEdgeEventFetcher extends BaseUsersEdgeEventFetcher { + + private final UserService userService; + private final CustomerId customerId; + + @Override + protected PageData findUsers(TenantId tenantId, PageLink pageLink) { + return userService.findCustomerUsers(tenantId, customerId, pageLink); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/DashboardsEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/DashboardsEdgeEventFetcher.java new file mode 100644 index 0000000000..515a356850 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/DashboardsEdgeEventFetcher.java @@ -0,0 +1,58 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.fetch; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.DashboardInfo; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.edge.EdgeEventType; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.dao.dashboard.DashboardService; +import org.thingsboard.server.service.edge.rpc.EdgeEventUtils; + +import java.util.ArrayList; +import java.util.List; + +@AllArgsConstructor +@Slf4j +public class DashboardsEdgeEventFetcher implements EdgeEventFetcher { + + private final DashboardService dashboardService; + + @Override + public PageLink getPageLink() { + return new PageLink(DEFAULT_LIMIT); + } + + @Override + public PageData fetchEdgeEvents(TenantId tenantId, EdgeId edgeId, PageLink pageLink) { + log.trace("[{}] start fetching edge events [{}]", tenantId, edgeId); + PageData pageData = dashboardService.findDashboardsByTenantIdAndEdgeId(tenantId, edgeId, pageLink); + List result = new ArrayList<>(); + if (!pageData.getData().isEmpty()) { + for (DashboardInfo dashboardInfo : pageData.getData()) { + result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edgeId, EdgeEventType.DASHBOARD, + EdgeEventActionType.ADDED, dashboardInfo.getId(), null)); + } + } + return new PageData<>(result, pageData.getTotalPages(), pageData.getTotalElements(), pageData.hasNext()); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/DeviceProfilesEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/DeviceProfilesEdgeEventFetcher.java new file mode 100644 index 0000000000..fb0af06b75 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/DeviceProfilesEdgeEventFetcher.java @@ -0,0 +1,58 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.fetch; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.DeviceProfile; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.edge.EdgeEventType; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.dao.device.DeviceProfileService; +import org.thingsboard.server.service.edge.rpc.EdgeEventUtils; + +import java.util.ArrayList; +import java.util.List; + +@AllArgsConstructor +@Slf4j +public class DeviceProfilesEdgeEventFetcher implements EdgeEventFetcher { + + private final DeviceProfileService deviceProfileService; + + @Override + public PageLink getPageLink() { + return new PageLink(DEFAULT_LIMIT); + } + + @Override + public PageData fetchEdgeEvents(TenantId tenantId, EdgeId edgeId, PageLink pageLink) { + log.trace("[{}] start fetching edge events [{}]", tenantId, edgeId); + PageData pageData = deviceProfileService.findDeviceProfiles(tenantId, pageLink); + List result = new ArrayList<>(); + if (!pageData.getData().isEmpty()) { + for (DeviceProfile deviceProfile : pageData.getData()) { + result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edgeId, EdgeEventType.DEVICE_PROFILE, + EdgeEventActionType.ADDED, deviceProfile.getId(), null)); + } + } + return new PageData<>(result, pageData.getTotalPages(), pageData.getTotalElements(), pageData.hasNext()); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/EdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/EdgeEventFetcher.java new file mode 100644 index 0000000000..e283b2c043 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/EdgeEventFetcher.java @@ -0,0 +1,31 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.fetch; + +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; + +public interface EdgeEventFetcher { + + final int DEFAULT_LIMIT = 100; + + PageLink getPageLink(); + + PageData fetchEdgeEvents(TenantId tenantId, EdgeId edgeId, PageLink pageLink); +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java new file mode 100644 index 0000000000..a87a4ebd68 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/GeneralEdgeEventFetcher.java @@ -0,0 +1,50 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.fetch; + +import lombok.AllArgsConstructor; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.page.SortOrder; +import org.thingsboard.server.common.data.page.TimePageLink; +import org.thingsboard.server.dao.edge.EdgeEventService; + +@AllArgsConstructor +public class GeneralEdgeEventFetcher implements EdgeEventFetcher { + + private final int maxReadRecordsCount; + private final Long queueStartTs; + private final EdgeEventService edgeEventService; + + @Override + public PageLink getPageLink() { + return new TimePageLink( + maxReadRecordsCount, + 0, + null, + new SortOrder("createdTime", SortOrder.Direction.ASC), + queueStartTs, + null); + } + + @Override + public PageData fetchEdgeEvents(TenantId tenantId, EdgeId edgeId, PageLink pageLink) { + return edgeEventService.findEdgeEvents(tenantId, edgeId, (TimePageLink) pageLink, true); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/RuleChainsEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/RuleChainsEdgeEventFetcher.java new file mode 100644 index 0000000000..2aca092856 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/RuleChainsEdgeEventFetcher.java @@ -0,0 +1,58 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.fetch; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.edge.EdgeEvent; +import org.thingsboard.server.common.data.edge.EdgeEventActionType; +import org.thingsboard.server.common.data.edge.EdgeEventType; +import org.thingsboard.server.common.data.id.EdgeId; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.rule.RuleChain; +import org.thingsboard.server.dao.rule.RuleChainService; +import org.thingsboard.server.service.edge.rpc.EdgeEventUtils; + +import java.util.ArrayList; +import java.util.List; + +@Slf4j +@AllArgsConstructor +public class RuleChainsEdgeEventFetcher implements EdgeEventFetcher { + + private final RuleChainService ruleChainService; + + @Override + public PageLink getPageLink() { + return new PageLink(DEFAULT_LIMIT); + } + + @Override + public PageData fetchEdgeEvents(TenantId tenantId, EdgeId edgeId, PageLink pageLink) { + log.trace("[{}] start fetching edge events [{}]", tenantId, edgeId); + PageData pageData = ruleChainService.findRuleChainsByTenantIdAndEdgeId(tenantId, edgeId, pageLink); + List result = new ArrayList<>(); + if (!pageData.getData().isEmpty()) { + for (RuleChain ruleChain : pageData.getData()) { + result.add(EdgeEventUtils.constructEdgeEvent(tenantId, edgeId, EdgeEventType.RULE_CHAIN, + EdgeEventActionType.ADDED, ruleChain.getId(), null)); + } + } + return new PageData<>(result, pageData.getTotalPages(), pageData.getTotalElements(), pageData.hasNext()); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/SystemWidgetsBundlesEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/SystemWidgetsBundlesEdgeEventFetcher.java new file mode 100644 index 0000000000..e32e003d4e --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/SystemWidgetsBundlesEdgeEventFetcher.java @@ -0,0 +1,36 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.fetch; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.widget.WidgetsBundle; +import org.thingsboard.server.dao.widget.WidgetsBundleService; + +@Slf4j +@AllArgsConstructor +public class SystemWidgetsBundlesEdgeEventFetcher extends BaseWidgetsBundlesEdgeEventFetcher { + + private final WidgetsBundleService widgetsBundleService; + + @Override + protected PageData findWidgetsBundles(TenantId tenantId, PageLink pageLink) { + return widgetsBundleService.findSystemWidgetsBundlesByPageLink(tenantId, pageLink); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantAdminUsersEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantAdminUsersEdgeEventFetcher.java new file mode 100644 index 0000000000..018104ed31 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantAdminUsersEdgeEventFetcher.java @@ -0,0 +1,34 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.fetch; + +import lombok.AllArgsConstructor; +import org.thingsboard.server.common.data.User; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.dao.user.UserService; + +@AllArgsConstructor +public class TenantAdminUsersEdgeEventFetcher extends BaseUsersEdgeEventFetcher { + + private final UserService userService; + + @Override + protected PageData findUsers(TenantId tenantId, PageLink pageLink) { + return userService.findTenantAdmins(tenantId, pageLink); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantWidgetsBundlesEdgeEventFetcher.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantWidgetsBundlesEdgeEventFetcher.java new file mode 100644 index 0000000000..6dee9853de --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/fetch/TenantWidgetsBundlesEdgeEventFetcher.java @@ -0,0 +1,36 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.edge.rpc.fetch; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.page.PageData; +import org.thingsboard.server.common.data.page.PageLink; +import org.thingsboard.server.common.data.widget.WidgetsBundle; +import org.thingsboard.server.dao.widget.WidgetsBundleService; + +@Slf4j +@AllArgsConstructor +public class TenantWidgetsBundlesEdgeEventFetcher extends BaseWidgetsBundlesEdgeEventFetcher implements EdgeEventFetcher { + + private final WidgetsBundleService widgetsBundleService; + + @Override + protected PageData findWidgetsBundles(TenantId tenantId, PageLink pageLink) { + return widgetsBundleService.findAllTenantWidgetsBundlesByTenantIdAndPageLink(tenantId, pageLink); + } +} diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java similarity index 52% rename from application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java rename to application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java index 0b9afa4ccf..bccc6cedd0 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/DefaultSyncEdgeService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/DefaultEdgeRequestsService.java @@ -13,9 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.edge.rpc.init; +package org.thingsboard.server.service.edge.rpc.sync; -import com.datastax.oss.driver.api.core.uuid.Uuids; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -24,34 +23,27 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.text.WordUtils; import org.checkerframework.checker.nullness.qual.Nullable; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.core.io.DefaultResourceLoader; import org.springframework.stereotype.Service; -import org.thingsboard.server.common.data.AdminSettings; -import org.thingsboard.server.common.data.DashboardInfo; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.EdgeUtils; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; -import org.thingsboard.server.common.data.User; -import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEvent; import org.thingsboard.server.common.data.edge.EdgeEventActionType; import org.thingsboard.server.common.data.edge.EdgeEventType; -import org.thingsboard.server.common.data.id.AdminSettingsId; import org.thingsboard.server.common.data.id.DeviceId; +import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EdgeId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; import org.thingsboard.server.common.data.id.RuleChainId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.id.UserId; +import org.thingsboard.server.common.data.id.WidgetsBundleId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.DataType; import org.thingsboard.server.common.data.page.PageData; @@ -59,44 +51,39 @@ import org.thingsboard.server.common.data.page.PageLink; import org.thingsboard.server.common.data.relation.EntityRelation; import org.thingsboard.server.common.data.relation.EntityRelationsQuery; import org.thingsboard.server.common.data.relation.EntitySearchDirection; +import org.thingsboard.server.common.data.relation.RelationTypeGroup; import org.thingsboard.server.common.data.relation.RelationsSearchParameters; -import org.thingsboard.server.common.data.rule.RuleChain; import org.thingsboard.server.common.data.widget.WidgetType; import org.thingsboard.server.common.data.widget.WidgetsBundle; -import org.thingsboard.server.dao.asset.AssetService; import org.thingsboard.server.dao.attributes.AttributesService; -import org.thingsboard.server.dao.dashboard.DashboardService; import org.thingsboard.server.dao.device.DeviceProfileService; import org.thingsboard.server.dao.device.DeviceService; import org.thingsboard.server.dao.edge.EdgeEventService; import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.relation.RelationService; -import org.thingsboard.server.dao.rule.RuleChainService; -import org.thingsboard.server.dao.settings.AdminSettingsService; -import org.thingsboard.server.dao.user.UserService; import org.thingsboard.server.dao.widget.WidgetTypeService; import org.thingsboard.server.dao.widget.WidgetsBundleService; import org.thingsboard.server.gen.edge.AttributesRequestMsg; import org.thingsboard.server.gen.edge.DeviceCredentialsRequestMsg; +import org.thingsboard.server.gen.edge.DeviceProfileDevicesRequestMsg; +import org.thingsboard.server.gen.edge.EntityViewsRequestMsg; import org.thingsboard.server.gen.edge.RelationRequestMsg; import org.thingsboard.server.gen.edge.RuleChainMetadataRequestMsg; import org.thingsboard.server.gen.edge.UserCredentialsRequestMsg; +import org.thingsboard.server.gen.edge.WidgetBundleTypesRequestMsg; +import org.thingsboard.server.service.edge.rpc.EdgeEventUtils; import org.thingsboard.server.service.executors.DbCallbackExecutorService; import org.thingsboard.server.service.queue.TbClusterService; -import java.io.File; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.regex.Matcher; -import java.util.regex.Pattern; @Service @Slf4j -public class DefaultSyncEdgeService implements SyncEdgeService { +public class DefaultEdgeRequestsService implements EdgeRequestsService { private static final ObjectMapper mapper = new ObjectMapper(); @@ -108,29 +95,17 @@ public class DefaultSyncEdgeService implements SyncEdgeService { @Autowired private AttributesService attributesService; - @Autowired - private RuleChainService ruleChainService; - @Autowired private RelationService relationService; @Autowired private DeviceService deviceService; - @Autowired - private DeviceProfileService deviceProfileService; - - @Autowired - private AssetService assetService; - @Autowired private EntityViewService entityViewService; @Autowired - private DashboardService dashboardService; - - @Autowired - private UserService userService; + private DeviceProfileService deviceProfileService; @Autowired private WidgetsBundleService widgetsBundleService; @@ -138,309 +113,12 @@ public class DefaultSyncEdgeService implements SyncEdgeService { @Autowired private WidgetTypeService widgetTypeService; - @Autowired - private AdminSettingsService adminSettingsService; - @Autowired private DbCallbackExecutorService dbCallbackExecutorService; @Autowired private TbClusterService tbClusterService; - @Override - public void sync(TenantId tenantId, Edge edge) { - log.trace("[{}][{}] Staring edge sync process", tenantId, edge.getId()); - try { - syncWidgetsBundles(tenantId, edge); - // TODO: voba - implement this functionality - // syncAdminSettings(edge); - syncDeviceProfiles(tenantId, edge); - syncRuleChains(tenantId, edge); - syncUsers(tenantId, edge); - syncAssets(tenantId, edge); - syncEntityViews(tenantId, edge); - syncDashboards(tenantId, edge); - syncWidgetsTypes(tenantId, edge); - syncDevices(tenantId, edge); - } catch (Exception e) { - log.error("[{}][{}] Exception during sync process", tenantId, edge.getId(), e); - } - } - - private void syncRuleChains(TenantId tenantId, Edge edge) { - log.trace("[{}] syncRuleChains [{}]", tenantId, edge.getName()); - try { - PageLink pageLink = new PageLink(DEFAULT_LIMIT); - PageData pageData; - do { - pageData = ruleChainService.findRuleChainsByTenantIdAndEdgeId(tenantId, edge.getId(), pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - log.trace("[{}] [{}] rule chains(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); - for (RuleChain ruleChain : pageData.getData()) { - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.RULE_CHAIN, EdgeEventActionType.ADDED, ruleChain.getId(), null); - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } - } - } while (pageData != null && pageData.hasNext()); - } catch (Exception e) { - log.error("Exception during loading edge rule chain(s) on sync!", e); - } - } - - private void syncDevices(TenantId tenantId, Edge edge) { - log.trace("[{}] syncDevices [{}]", tenantId, edge.getName()); - try { - PageLink pageLink = new PageLink(DEFAULT_LIMIT); - PageData pageData; - do { - pageData = deviceService.findDevicesByTenantIdAndEdgeId(tenantId, edge.getId(), pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - log.trace("[{}] [{}] device(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); - for (Device device : pageData.getData()) { - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ADDED, device.getId(), null); - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } - } - } while (pageData != null && pageData.hasNext()); - } catch (Exception e) { - log.error("Exception during loading edge device(s) on sync!", e); - } - } - - private void syncDeviceProfiles(TenantId tenantId, Edge edge) { - log.trace("[{}] syncDeviceProfiles [{}]", tenantId, edge.getName()); - try { - PageLink pageLink = new PageLink(DEFAULT_LIMIT); - PageData pageData; - do { - pageData = deviceProfileService.findDeviceProfiles(tenantId, pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - log.trace("[{}] [{}] user(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); - for (DeviceProfile deviceProfile : pageData.getData()) { - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE_PROFILE, EdgeEventActionType.ADDED, deviceProfile.getId(), null); - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } - } - } while (pageData != null && pageData.hasNext()); - } catch (Exception e) { - log.error("Exception during loading device profile(s) on sync!", e); - } - } - - private void syncAssets(TenantId tenantId, Edge edge) { - log.trace("[{}] syncAssets [{}]", tenantId, edge.getName()); - try { - PageLink pageLink = new PageLink(DEFAULT_LIMIT); - PageData pageData; - do { - pageData = assetService.findAssetsByTenantIdAndEdgeId(tenantId, edge.getId(), pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - log.trace("[{}] [{}] asset(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); - for (Asset asset : pageData.getData()) { - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ASSET, EdgeEventActionType.ADDED, asset.getId(), null); - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } - } - } while (pageData != null && pageData.hasNext()); - } catch (Exception e) { - log.error("Exception during loading edge asset(s) on sync!", e); - } - } - - private void syncEntityViews(TenantId tenantId, Edge edge) { - log.trace("[{}] syncEntityViews [{}]", tenantId, edge.getName()); - try { - PageLink pageLink = new PageLink(DEFAULT_LIMIT); - PageData pageData; - do { - pageData = entityViewService.findEntityViewsByTenantIdAndEdgeId(tenantId, edge.getId(), pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - log.trace("[{}] [{}] entity view(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); - for (EntityView entityView : pageData.getData()) { - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ENTITY_VIEW, EdgeEventActionType.ADDED, entityView.getId(), null); - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } - } - } while (pageData != null && pageData.hasNext()); - } catch (Exception e) { - log.error("Exception during loading edge entity view(s) on sync!", e); - } - } - - private void syncDashboards(TenantId tenantId, Edge edge) { - log.trace("[{}] syncDashboards [{}]", tenantId, edge.getName()); - try { - PageLink pageLink = new PageLink(DEFAULT_LIMIT); - PageData pageData; - do { - pageData = dashboardService.findDashboardsByTenantIdAndEdgeId(tenantId, edge.getId(), pageLink); - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - log.trace("[{}] [{}] dashboard(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); - for (DashboardInfo dashboardInfo : pageData.getData()) { - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DASHBOARD, EdgeEventActionType.ADDED, dashboardInfo.getId(), null); - } - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } - } - } while (pageData != null && pageData.hasNext()); - } catch (Exception e) { - log.error("Exception during loading edge dashboard(s) on sync!", e); - } - } - - private void syncUsers(TenantId tenantId, Edge edge) { - log.trace("[{}] syncUsers [{}]", tenantId, edge.getName()); - try { - PageLink pageLink = new PageLink(DEFAULT_LIMIT); - PageData pageData; - do { - pageData = userService.findTenantAdmins(tenantId, pageLink); - pushUsersToEdge(tenantId, pageData, edge); - if (pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } - } while (pageData.hasNext()); - syncCustomerUsers(tenantId, edge); - } catch (Exception e) { - log.error("Exception during loading edge user(s) on sync!", e); - } - } - - private void syncCustomerUsers(TenantId tenantId, Edge edge) { - if (edge.getCustomerId() != null && !EntityId.NULL_UUID.equals(edge.getCustomerId().getId())) { - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.CUSTOMER, EdgeEventActionType.ADDED, edge.getCustomerId(), null); - PageLink pageLink = new PageLink(DEFAULT_LIMIT); - PageData pageData; - do { - pageData = userService.findCustomerUsers(tenantId, edge.getCustomerId(), pageLink); - pushUsersToEdge(tenantId, pageData, edge); - if (pageData != null && pageData.hasNext()) { - pageLink = pageLink.nextPageLink(); - } - } while (pageData != null && pageData.hasNext()); - } - } - - private void pushUsersToEdge(TenantId tenantId, PageData pageData, Edge edge) { - if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { - log.trace("[{}] [{}] user(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); - for (User user : pageData.getData()) { - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER, EdgeEventActionType.ADDED, user.getId(), null); - } - } - } - - private void syncWidgetsBundles(TenantId tenantId, Edge edge) { - log.trace("[{}] syncWidgetsBundles [{}]", tenantId, edge.getName()); - List widgetsBundlesToPush = new ArrayList<>(); - widgetsBundlesToPush.addAll(widgetsBundleService.findAllTenantWidgetsBundlesByTenantId(tenantId)); - widgetsBundlesToPush.addAll(widgetsBundleService.findSystemWidgetsBundles(tenantId)); - try { - for (WidgetsBundle widgetsBundle : widgetsBundlesToPush) { - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.WIDGETS_BUNDLE, EdgeEventActionType.ADDED, widgetsBundle.getId(), null); - } - } catch (Exception e) { - log.error("Exception during loading widgets bundle(s) on sync!", e); - } - } - - private void syncWidgetsTypes(TenantId tenantId, Edge edge) { - log.trace("[{}] syncWidgetsTypes [{}]", tenantId, edge.getName()); - List widgetsBundlesToPush = new ArrayList<>(); - widgetsBundlesToPush.addAll(widgetsBundleService.findAllTenantWidgetsBundlesByTenantId(tenantId)); - widgetsBundlesToPush.addAll(widgetsBundleService.findSystemWidgetsBundles(tenantId)); - try { - for (WidgetsBundle widgetsBundle : widgetsBundlesToPush) { - List widgetTypesToPush = - widgetTypeService.findWidgetTypesByTenantIdAndBundleAlias(widgetsBundle.getTenantId(), widgetsBundle.getAlias()); - for (WidgetType widgetType : widgetTypesToPush) { - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.WIDGET_TYPE, EdgeEventActionType.ADDED, widgetType.getId(), null); - } - } - } catch (Exception e) { - log.error("Exception during loading widgets type(s) on sync!", e); - } - } - - private void syncAdminSettings(TenantId tenantId, Edge edge) { - log.trace("[{}] syncAdminSettings [{}]", tenantId, edge.getName()); - try { - AdminSettings systemMailSettings = adminSettingsService.findAdminSettingsByKey(TenantId.SYS_TENANT_ID, "mail"); - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(systemMailSettings)); - AdminSettings tenantMailSettings = convertToTenantAdminSettings(systemMailSettings.getKey(), (ObjectNode) systemMailSettings.getJsonValue()); - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(tenantMailSettings)); - AdminSettings systemMailTemplates = loadMailTemplates(); - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(systemMailTemplates)); - AdminSettings tenantMailTemplates = convertToTenantAdminSettings(systemMailTemplates.getKey(), (ObjectNode) systemMailTemplates.getJsonValue()); - saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ADMIN_SETTINGS, EdgeEventActionType.UPDATED, null, mapper.valueToTree(tenantMailTemplates)); - } catch (Exception e) { - log.error("Can't load admin settings", e); - } - } - - private AdminSettings loadMailTemplates() throws Exception { - Map mailTemplates = new HashMap<>(); - Pattern startPattern = Pattern.compile("
"); - Pattern endPattern = Pattern.compile("
"); - File[] files = new DefaultResourceLoader().getResource("classpath:/templates/").getFile().listFiles(); - for (File file : files) { - Map mailTemplate = new HashMap<>(); - String name = validateName(file.getName()); - String stringTemplate = FileUtils.readFileToString(file, StandardCharsets.UTF_8); - Matcher start = startPattern.matcher(stringTemplate); - Matcher end = endPattern.matcher(stringTemplate); - if (start.find() && end.find()) { - String body = StringUtils.substringBetween(stringTemplate, start.group(), end.group()).replaceAll("\t", ""); - String subject = StringUtils.substringBetween(body, "

", "

"); - mailTemplate.put("subject", subject); - mailTemplate.put("body", body); - mailTemplates.put(name, mailTemplate); - } else { - log.error("Can't load mail template from file {}", file.getName()); - } - } - AdminSettings adminSettings = new AdminSettings(); - adminSettings.setId(new AdminSettingsId(Uuids.timeBased())); - adminSettings.setKey("mailTemplates"); - adminSettings.setJsonValue(mapper.convertValue(mailTemplates, JsonNode.class)); - return adminSettings; - } - - private String validateName(String name) throws Exception { - StringBuilder nameBuilder = new StringBuilder(); - name = name.replace(".vm", ""); - String[] nameParts = name.split("\\."); - if (nameParts.length >= 1) { - nameBuilder.append(nameParts[0]); - for (int i = 1; i < nameParts.length; i++) { - String word = WordUtils.capitalize(nameParts[i]); - nameBuilder.append(word); - } - return nameBuilder.toString(); - } else { - throw new Exception("Error during filename validation"); - } - } - - private AdminSettings convertToTenantAdminSettings(String key, ObjectNode jsonValue) { - AdminSettings tenantMailSettings = new AdminSettings(); - jsonValue.put("useSystemMailSettings", true); - tenantMailSettings.setJsonValue(jsonValue); - tenantMailSettings.setKey(key); - return tenantMailSettings; - } - @Override public ListenableFuture processRuleChainMetadataRequestMsg(TenantId tenantId, Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg) { log.trace("[{}] processRuleChainMetadataRequestMsg [{}][{}]", tenantId, edge.getName(), ruleChainMetadataRequestMsg); @@ -448,7 +126,8 @@ public class DefaultSyncEdgeService implements SyncEdgeService { if (ruleChainMetadataRequestMsg.getRuleChainIdMSB() != 0 && ruleChainMetadataRequestMsg.getRuleChainIdLSB() != 0) { RuleChainId ruleChainId = new RuleChainId(new UUID(ruleChainMetadataRequestMsg.getRuleChainIdMSB(), ruleChainMetadataRequestMsg.getRuleChainIdLSB())); - ListenableFuture future = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.RULE_CHAIN_METADATA, EdgeEventActionType.ADDED, ruleChainId, null); + ListenableFuture future = saveEdgeEvent(tenantId, edge.getId(), + EdgeEventType.RULE_CHAIN_METADATA, EdgeEventActionType.ADDED, ruleChainId, null); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(@Nullable EdgeEvent result) { @@ -584,7 +263,8 @@ public class DefaultSyncEdgeService implements SyncEdgeService { return futureToSet; } - private ListenableFuture> findRelationByQuery(TenantId tenantId, Edge edge, EntityId entityId, EntitySearchDirection direction) { + private ListenableFuture> findRelationByQuery(TenantId tenantId, Edge edge, + EntityId entityId, EntitySearchDirection direction) { EntityRelationsQuery query = new EntityRelationsQuery(); query.setParameters(new RelationsSearchParameters(entityId, direction, -1, false)); return relationService.findByQuery(tenantId, query); @@ -596,7 +276,8 @@ public class DefaultSyncEdgeService implements SyncEdgeService { SettableFuture futureToSet = SettableFuture.create(); if (deviceCredentialsRequestMsg.getDeviceIdMSB() != 0 && deviceCredentialsRequestMsg.getDeviceIdLSB() != 0) { DeviceId deviceId = new DeviceId(new UUID(deviceCredentialsRequestMsg.getDeviceIdMSB(), deviceCredentialsRequestMsg.getDeviceIdLSB())); - ListenableFuture future = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.CREDENTIALS_UPDATED, deviceId, null); + ListenableFuture future = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, + EdgeEventActionType.CREDENTIALS_UPDATED, deviceId, null); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(@Nullable EdgeEvent result) { @@ -619,8 +300,9 @@ public class DefaultSyncEdgeService implements SyncEdgeService { SettableFuture futureToSet = SettableFuture.create(); if (userCredentialsRequestMsg.getUserIdMSB() != 0 && userCredentialsRequestMsg.getUserIdLSB() != 0) { UserId userId = new UserId(new UUID(userCredentialsRequestMsg.getUserIdMSB(), userCredentialsRequestMsg.getUserIdLSB())); - ListenableFuture future = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER, EdgeEventActionType.CREDENTIALS_UPDATED, userId, null); - Futures.addCallback(future, new FutureCallback() { + ListenableFuture future = saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.USER, + EdgeEventActionType.CREDENTIALS_UPDATED, userId, null); + Futures.addCallback(future, new FutureCallback<>() { @Override public void onSuccess(@Nullable EdgeEvent result) { futureToSet.set(null); @@ -636,6 +318,139 @@ public class DefaultSyncEdgeService implements SyncEdgeService { return futureToSet; } + @Override + public ListenableFuture processDeviceProfileDevicesRequestMsg(TenantId tenantId, Edge edge, DeviceProfileDevicesRequestMsg deviceProfileDevicesRequestMsg) { + log.trace("[{}] processDeviceProfileDevicesRequestMsg [{}][{}]", tenantId, edge.getName(), deviceProfileDevicesRequestMsg); + SettableFuture futureToSet = SettableFuture.create(); + if (deviceProfileDevicesRequestMsg.getDeviceProfileIdMSB() != 0 && deviceProfileDevicesRequestMsg.getDeviceProfileIdLSB() != 0) { + DeviceProfileId deviceProfileId = new DeviceProfileId(new UUID(deviceProfileDevicesRequestMsg.getDeviceProfileIdMSB(), deviceProfileDevicesRequestMsg.getDeviceProfileIdLSB())); + DeviceProfile deviceProfileById = deviceProfileService.findDeviceProfileById(tenantId, deviceProfileId); + List> futures; + if (deviceProfileById != null) { + futures = syncDevices(tenantId, edge, deviceProfileById.getName()); + } else { + futures = new ArrayList<>(); + } + Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() { + @Override + public void onSuccess(@Nullable List result) { + futureToSet.set(null); + } + + @Override + public void onFailure(Throwable t) { + log.error("Can't sync devices by device profile [{}]", deviceProfileDevicesRequestMsg, t); + futureToSet.setException(t); + } + }, dbCallbackExecutorService); + } + return futureToSet; + } + + private List> syncDevices(TenantId tenantId, Edge edge, String deviceType) { + List> futures = new ArrayList<>(); + log.trace("[{}] syncDevices [{}][{}]", tenantId, edge.getName(), deviceType); + try { + PageLink pageLink = new PageLink(DEFAULT_LIMIT); + PageData pageData; + do { + pageData = deviceService.findDevicesByTenantIdAndEdgeIdAndType(tenantId, edge.getId(), deviceType, pageLink); + if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) { + log.trace("[{}] [{}] device(s) are going to be pushed to edge.", edge.getId(), pageData.getData().size()); + for (Device device : pageData.getData()) { + futures.add(saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.DEVICE, EdgeEventActionType.ADDED, device.getId(), null)); + } + if (pageData.hasNext()) { + pageLink = pageLink.nextPageLink(); + } + } + } while (pageData != null && pageData.hasNext()); + } catch (Exception e) { + log.error("Exception during loading edge device(s) on sync!", e); + } + return futures; + } + + @Override + public ListenableFuture processWidgetBundleTypesRequestMsg(TenantId tenantId, Edge edge, + WidgetBundleTypesRequestMsg widgetBundleTypesRequestMsg) { + log.trace("[{}] processWidgetBundleTypesRequestMsg [{}][{}]", tenantId, edge.getName(), widgetBundleTypesRequestMsg); + SettableFuture futureToSet = SettableFuture.create(); + if (widgetBundleTypesRequestMsg.getWidgetBundleIdMSB() != 0 && widgetBundleTypesRequestMsg.getWidgetBundleIdLSB() != 0) { + WidgetsBundleId widgetsBundleId = new WidgetsBundleId(new UUID(widgetBundleTypesRequestMsg.getWidgetBundleIdMSB(), widgetBundleTypesRequestMsg.getWidgetBundleIdLSB())); + WidgetsBundle widgetsBundleById = widgetsBundleService.findWidgetsBundleById(tenantId, widgetsBundleId); + List> futures = new ArrayList<>(); + if (widgetsBundleById != null) { + List widgetTypesToPush = + widgetTypeService.findWidgetTypesByTenantIdAndBundleAlias(widgetsBundleById.getTenantId(), widgetsBundleById.getAlias()); + + for (WidgetType widgetType : widgetTypesToPush) { + futures.add(saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.WIDGET_TYPE, EdgeEventActionType.ADDED, widgetType.getId(), null)); + } + } + Futures.addCallback(Futures.allAsList(futures), new FutureCallback<>() { + @Override + public void onSuccess(@Nullable List result) { + futureToSet.set(null); + } + + @Override + public void onFailure(Throwable t) { + log.error("Can't sync widget types by widget bundle [{}]", widgetBundleTypesRequestMsg, t); + futureToSet.setException(t); + } + }, dbCallbackExecutorService); + } + return futureToSet; + } + + @Override + public ListenableFuture processEntityViewsRequestMsg(TenantId tenantId, Edge edge, EntityViewsRequestMsg entityViewsRequestMsg) { + log.trace("[{}] processEntityViewsRequestMsg [{}][{}]", tenantId, edge.getName(), entityViewsRequestMsg); + EntityId entityId = EntityIdFactory.getByTypeAndUuid( + EntityType.valueOf(entityViewsRequestMsg.getEntityType()), + new UUID(entityViewsRequestMsg.getEntityIdMSB(), entityViewsRequestMsg.getEntityIdLSB())); + SettableFuture futureToSet = SettableFuture.create(); + Futures.addCallback(entityViewService.findEntityViewsByTenantIdAndEntityIdAsync(tenantId, entityId), new FutureCallback<>() { + @Override + public void onSuccess(@Nullable List entityViews) { + try { + if (entityViews != null && !entityViews.isEmpty()) { + for (EntityView entityView : entityViews) { + Futures.addCallback(relationService.checkRelation(tenantId, edge.getId(), entityView.getId(), + EntityRelation.CONTAINS_TYPE, RelationTypeGroup.EDGE), new FutureCallback<>() { + @Override + public void onSuccess(@Nullable Boolean result) { + if (Boolean.TRUE.equals(result)) { + saveEdgeEvent(tenantId, edge.getId(), EdgeEventType.ENTITY_VIEW, + EdgeEventActionType.ADDED, entityView.getId(), null); + } + } + + @Override + public void onFailure(Throwable t) { + log.error("Exception during loading relation [{}] to edge on sync!", t, t); + futureToSet.setException(t); + } + }, dbCallbackExecutorService); + } + } + futureToSet.set(null); + } catch (Exception e) { + log.error("Exception during loading relation(s) to edge on sync!", e); + futureToSet.setException(e); + } + } + + @Override + public void onFailure(Throwable t) { + log.error("[{}] Can't find entity views by entity id [{}]", tenantId, entityId, t); + futureToSet.setException(t); + } + }, dbCallbackExecutorService); + return futureToSet; + } + private ListenableFuture saveEdgeEvent(TenantId tenantId, EdgeId edgeId, EdgeEventType type, @@ -645,17 +460,10 @@ public class DefaultSyncEdgeService implements SyncEdgeService { log.trace("Pushing edge event to edge queue. tenantId [{}], edgeId [{}], type [{}], action[{}], entityId [{}], body [{}]", tenantId, edgeId, type, action, entityId, body); - EdgeEvent edgeEvent = new EdgeEvent(); - edgeEvent.setTenantId(tenantId); - edgeEvent.setEdgeId(edgeId); - edgeEvent.setType(type); - edgeEvent.setAction(action); - if (entityId != null) { - edgeEvent.setEntityId(entityId.getId()); - } - edgeEvent.setBody(body); + EdgeEvent edgeEvent = EdgeEventUtils.constructEdgeEvent(tenantId, edgeId, type, action, entityId, body); + ListenableFuture future = edgeEventService.saveAsync(edgeEvent); - Futures.addCallback(future, new FutureCallback() { + Futures.addCallback(future, new FutureCallback<>() { @Override public void onSuccess(@Nullable EdgeEvent result) { tbClusterService.onEdgeEventUpdate(tenantId, edgeId); @@ -668,4 +476,5 @@ public class DefaultSyncEdgeService implements SyncEdgeService { }, dbCallbackExecutorService); return future; } + } diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/SyncEdgeService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/EdgeRequestsService.java similarity index 71% rename from application/src/main/java/org/thingsboard/server/service/edge/rpc/init/SyncEdgeService.java rename to application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/EdgeRequestsService.java index e95809d019..6e8caf6383 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/init/SyncEdgeService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/sync/EdgeRequestsService.java @@ -13,20 +13,21 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.thingsboard.server.service.edge.rpc.init; +package org.thingsboard.server.service.edge.rpc.sync; import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.gen.edge.AttributesRequestMsg; import org.thingsboard.server.gen.edge.DeviceCredentialsRequestMsg; +import org.thingsboard.server.gen.edge.DeviceProfileDevicesRequestMsg; +import org.thingsboard.server.gen.edge.EntityViewsRequestMsg; import org.thingsboard.server.gen.edge.RelationRequestMsg; import org.thingsboard.server.gen.edge.RuleChainMetadataRequestMsg; import org.thingsboard.server.gen.edge.UserCredentialsRequestMsg; +import org.thingsboard.server.gen.edge.WidgetBundleTypesRequestMsg; -public interface SyncEdgeService { - - void sync(TenantId tenantId, Edge edge); +public interface EdgeRequestsService { ListenableFuture processRuleChainMetadataRequestMsg(TenantId tenantId, Edge edge, RuleChainMetadataRequestMsg ruleChainMetadataRequestMsg); @@ -37,4 +38,10 @@ public interface SyncEdgeService { ListenableFuture processDeviceCredentialsRequestMsg(TenantId tenantId, Edge edge, DeviceCredentialsRequestMsg deviceCredentialsRequestMsg); ListenableFuture processUserCredentialsRequestMsg(TenantId tenantId, Edge edge, UserCredentialsRequestMsg userCredentialsRequestMsg); + + ListenableFuture processDeviceProfileDevicesRequestMsg(TenantId tenantId, Edge edge, DeviceProfileDevicesRequestMsg deviceProfileDevicesRequestMsg); + + ListenableFuture processWidgetBundleTypesRequestMsg(TenantId tenantId, Edge edge, WidgetBundleTypesRequestMsg widgetBundleTypesRequestMsg); + + ListenableFuture processEntityViewsRequestMsg(TenantId tenantId, Edge edge, EntityViewsRequestMsg entityViewsRequestMsg); } diff --git a/application/src/main/resources/logback.xml b/application/src/main/resources/logback.xml index b2f19748b6..f34d90876c 100644 --- a/application/src/main/resources/logback.xml +++ b/application/src/main/resources/logback.xml @@ -27,6 +27,7 @@ + diff --git a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java index 92548bb621..120b6007cf 100644 --- a/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java +++ b/application/src/test/java/org/thingsboard/server/edge/BaseEdgeTest.java @@ -191,7 +191,7 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { @Test public void test() throws Exception { testReceivedInitialData(); - int expectedDownlinkSize = 10; + int expectedDownlinkSize = 9; Assert.assertEquals(expectedDownlinkSize, edgeImitator.getDownlinkMsgs().size()); testDevices(); @@ -308,7 +308,8 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { private void testReceivedInitialData() throws Exception { log.info("Checking received data"); - Assert.assertTrue(edgeImitator.waitForMessages()); + boolean condition = edgeImitator.waitForMessages(); + Assert.assertTrue(condition); EdgeConfiguration configuration = edgeImitator.getConfiguration(); Assert.assertNotNull(configuration); @@ -1523,6 +1524,10 @@ abstract public class BaseEdgeTest extends AbstractControllerTest { private void installation() throws Exception { edge = doPost("/api/edge", constructEdge("Test Edge", "test"), Edge.class); + Device savedDeviceDefault = saveDevice("Edge Device Default", "Default"); + doPost("/api/edge/" + edge.getId().getId().toString() + + "/device/" + savedDeviceDefault.getId().getId().toString(), Device.class); + DeviceProfile deviceProfile = this.createDeviceProfile(CUSTOM_DEVICE_PROFILE_NAME, null); extendDeviceProfileData(deviceProfile); doPost("/api/deviceProfile", deviceProfile, DeviceProfile.class); diff --git a/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java b/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java index 6b4234f151..3717f16d06 100644 --- a/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java +++ b/application/src/test/java/org/thingsboard/server/edge/imitator/EdgeImitator.java @@ -109,7 +109,7 @@ public class EdgeImitator { this::onDownlink, this::onClose); - edgeRpcClient.sendSyncRequestMsg(); + edgeRpcClient.sendSyncRequestMsg(false); } public void disconnect() throws InterruptedException { diff --git a/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java b/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java index c5099069ee..aeb4c38ee4 100644 --- a/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java +++ b/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeGrpcClient.java @@ -34,6 +34,7 @@ import org.thingsboard.server.gen.edge.EdgeRpcServiceGrpc; import org.thingsboard.server.gen.edge.RequestMsg; import org.thingsboard.server.gen.edge.RequestMsgType; import org.thingsboard.server.gen.edge.ResponseMsg; +import org.thingsboard.server.gen.edge.SyncRequestMsg; import org.thingsboard.server.gen.edge.UplinkMsg; import org.thingsboard.server.gen.edge.UplinkResponseMsg; @@ -102,7 +103,7 @@ public class EdgeGrpcClient implements EdgeRpcClient { Consumer onEdgeUpdate, Consumer onDownlink, Consumer onError) { - return new StreamObserver() { + return new StreamObserver<>() { @Override public void onNext(ResponseMsg responseMsg) { if (responseMsg.hasConnectResponseMsg()) { @@ -195,11 +196,13 @@ public class EdgeGrpcClient implements EdgeRpcClient { } @Override - public void sendSyncRequestMsg() { + public void sendSyncRequestMsg(boolean syncRequired) { try { uplinkMsgLock.lock(); + SyncRequestMsg syncRequestMsg = SyncRequestMsg.newBuilder().setSyncRequired(syncRequired).build(); this.inputStream.onNext(RequestMsg.newBuilder() .setMsgType(RequestMsgType.SYNC_REQUEST_RPC_MESSAGE) + .setSyncRequestMsg(syncRequestMsg) .build()); } finally { uplinkMsgLock.unlock(); diff --git a/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeRpcClient.java b/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeRpcClient.java index b3928819a1..83ff20828b 100644 --- a/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeRpcClient.java +++ b/common/edge-api/src/main/java/org/thingsboard/edge/rpc/EdgeRpcClient.java @@ -34,7 +34,7 @@ public interface EdgeRpcClient { void disconnect(boolean onError) throws InterruptedException; - void sendSyncRequestMsg(); + void sendSyncRequestMsg(boolean syncRequired); void sendUplinkMsg(UplinkMsg uplinkMsg); diff --git a/common/edge-api/src/main/proto/edge.proto b/common/edge-api/src/main/proto/edge.proto index 9b50b628fd..a630e64066 100644 --- a/common/edge-api/src/main/proto/edge.proto +++ b/common/edge-api/src/main/proto/edge.proto @@ -38,6 +38,7 @@ message RequestMsg { ConnectRequestMsg connectRequestMsg = 2; UplinkMsg uplinkMsg = 3; DownlinkResponseMsg downlinkResponseMsg = 4; + SyncRequestMsg syncRequestMsg = 5; } message ResponseMsg { @@ -74,6 +75,13 @@ message ConnectResponseMsg { EdgeConfiguration configuration = 3; } +message SyncRequestMsg { + bool syncRequired = 1; +} + +message SyncCompletedMsg { +} + message EdgeConfiguration { int64 edgeIdMSB = 1; int64 edgeIdLSB = 2; @@ -358,6 +366,22 @@ message DeviceCredentialsRequestMsg { int64 deviceIdLSB = 2; } +message DeviceProfileDevicesRequestMsg { + int64 deviceProfileIdMSB = 1; + int64 deviceProfileIdLSB = 2; +} + +message WidgetBundleTypesRequestMsg { + int64 widgetBundleIdMSB = 1; + int64 widgetBundleIdLSB = 2; +} + +message EntityViewsRequestMsg { + int64 entityIdMSB = 1; + int64 entityIdLSB = 2; + string entityType = 3; +} + message DeviceRpcCallMsg { int64 deviceIdMSB = 1; int64 deviceIdLSB = 2; @@ -402,6 +426,9 @@ message UplinkMsg { repeated UserCredentialsRequestMsg userCredentialsRequestMsg = 10; repeated DeviceCredentialsRequestMsg deviceCredentialsRequestMsg = 11; repeated DeviceRpcCallMsg deviceRpcCallMsg = 12; + repeated DeviceProfileDevicesRequestMsg deviceProfileDevicesRequestMsg = 13; + repeated WidgetBundleTypesRequestMsg widgetBundleTypesRequestMsg = 14; + repeated EntityViewsRequestMsg entityViewsRequestMsg = 15; } message UplinkResponseMsg { @@ -416,24 +443,25 @@ message DownlinkResponseMsg { message DownlinkMsg { int32 downlinkMsgId = 1; - repeated EntityDataProto entityData = 2; - repeated DeviceCredentialsRequestMsg deviceCredentialsRequestMsg = 3; - repeated DeviceUpdateMsg deviceUpdateMsg = 4; - repeated DeviceProfileUpdateMsg deviceProfileUpdateMsg = 5; - repeated DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg = 6; - repeated RuleChainUpdateMsg ruleChainUpdateMsg = 7; - repeated RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = 8; - repeated DashboardUpdateMsg dashboardUpdateMsg = 9; - repeated AssetUpdateMsg assetUpdateMsg = 10; - repeated EntityViewUpdateMsg entityViewUpdateMsg = 11; - repeated AlarmUpdateMsg alarmUpdateMsg = 12; - repeated UserUpdateMsg userUpdateMsg = 13; - repeated UserCredentialsUpdateMsg userCredentialsUpdateMsg = 14; - repeated CustomerUpdateMsg customerUpdateMsg = 15; - repeated RelationUpdateMsg relationUpdateMsg = 16; - repeated WidgetsBundleUpdateMsg widgetsBundleUpdateMsg = 17; - repeated WidgetTypeUpdateMsg widgetTypeUpdateMsg = 18; - repeated AdminSettingsUpdateMsg adminSettingsUpdateMsg = 19; - repeated DeviceRpcCallMsg deviceRpcCallMsg = 20; + SyncCompletedMsg syncCompletedMsg = 2; + repeated EntityDataProto entityData = 3; + repeated DeviceCredentialsRequestMsg deviceCredentialsRequestMsg = 4; + repeated DeviceUpdateMsg deviceUpdateMsg = 5; + repeated DeviceProfileUpdateMsg deviceProfileUpdateMsg = 6; + repeated DeviceCredentialsUpdateMsg deviceCredentialsUpdateMsg = 7; + repeated RuleChainUpdateMsg ruleChainUpdateMsg = 8; + repeated RuleChainMetadataUpdateMsg ruleChainMetadataUpdateMsg = 9; + repeated DashboardUpdateMsg dashboardUpdateMsg = 10; + repeated AssetUpdateMsg assetUpdateMsg = 11; + repeated EntityViewUpdateMsg entityViewUpdateMsg = 12; + repeated AlarmUpdateMsg alarmUpdateMsg = 13; + repeated UserUpdateMsg userUpdateMsg = 14; + repeated UserCredentialsUpdateMsg userCredentialsUpdateMsg = 15; + repeated CustomerUpdateMsg customerUpdateMsg = 16; + repeated RelationUpdateMsg relationUpdateMsg = 17; + repeated WidgetsBundleUpdateMsg widgetsBundleUpdateMsg = 18; + repeated WidgetTypeUpdateMsg widgetTypeUpdateMsg = 19; + repeated AdminSettingsUpdateMsg adminSettingsUpdateMsg = 20; + repeated DeviceRpcCallMsg deviceRpcCallMsg = 21; } diff --git a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java index 2ff7d94f43..d9e70bb68b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java +++ b/dao/src/main/java/org/thingsboard/server/dao/edge/EdgeServiceImpl.java @@ -558,6 +558,7 @@ public class EdgeServiceImpl extends AbstractEntityService implements EdgeServic if (userById.getCustomerId() == null || userById.getCustomerId().isNullUid()) { pageData = findEdgesByTenantId(tenantId, pageLink); } else { +// pageData = findEdgesByTenantIdAndCustomerId(tenantId, userById.getCustomerId(), pageLink); pageData = findEdgesByTenantIdAndCustomerId(tenantId, new CustomerId(entityId.getId()), pageLink); } if (pageData != null && pageData.getData() != null && !pageData.getData().isEmpty()) {