diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java index 92c48a5fed..561dc7122a 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbClusterService.java @@ -551,11 +551,15 @@ public class DefaultTbClusterService implements TbClusterService { } private void processEdgeNotification(EdgeId edgeId, ToEdgeNotificationMsg toEdgeNotificationMsg) { - var serviceIdOpt = Optional.ofNullable(edgeIdServiceIdCache.get(edgeId)); - serviceIdOpt.ifPresentOrElse( - serviceId -> pushMsgToEdgeNotification(toEdgeNotificationMsg, serviceId.get()), - () -> broadcastEdgeNotification(edgeId, toEdgeNotificationMsg) - ); + if (edgesEnabled) { + var serviceIdOpt = Optional.ofNullable(edgeIdServiceIdCache.get(edgeId)); + serviceIdOpt.ifPresentOrElse( + serviceId -> pushMsgToEdgeNotification(toEdgeNotificationMsg, serviceId.get()), + () -> broadcastEdgeNotification(edgeId, toEdgeNotificationMsg) + ); + } else { + log.trace("Edges disabled. Ignoring edge notification {} for edgeId: {}", toEdgeNotificationMsg, edgeId); + } } private void pushMsgToEdgeNotification(ToEdgeNotificationMsg toEdgeNotificationMsg, String serviceId) { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbEdgeConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbEdgeConsumerService.java index fdaa2103e2..2e4c15f8e8 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbEdgeConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbEdgeConsumerService.java @@ -51,6 +51,7 @@ import org.thingsboard.server.queue.provider.TbCoreQueueFactory; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.edge.EdgeContextComponent; import org.thingsboard.server.queue.common.consumer.MainQueueConsumerManager; +import org.thingsboard.server.service.edge.rpc.EdgeRpcService; import org.thingsboard.server.service.queue.processing.AbstractConsumerService; import org.thingsboard.server.service.queue.processing.IdMsgPair; @@ -195,36 +196,42 @@ public class DefaultTbEdgeConsumerService extends AbstractConsumerService msg, TbCallback callback) { ToEdgeNotificationMsg toEdgeNotificationMsg = msg.getValue(); try { + EdgeRpcService edgeRpcService = edgeCtx.getEdgeRpcService(); + if (edgeRpcService == null) { + log.debug("No EdgeRpcService available (edge functionality disabled), ignoring msg: {}", toEdgeNotificationMsg); + callback.onSuccess(); + return; + } if (toEdgeNotificationMsg.hasEdgeHighPriority()) { EdgeSessionMsg edgeSessionMsg = ProtoUtils.fromProto(toEdgeNotificationMsg.getEdgeHighPriority()); - edgeCtx.getEdgeRpcService().onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg); + edgeRpcService.onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg); callback.onSuccess(); } else if (toEdgeNotificationMsg.hasEdgeEventUpdate()) { EdgeSessionMsg edgeSessionMsg = ProtoUtils.fromProto(toEdgeNotificationMsg.getEdgeEventUpdate()); - edgeCtx.getEdgeRpcService().onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg); + edgeRpcService.onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg); callback.onSuccess(); } else if (toEdgeNotificationMsg.hasToEdgeSyncRequest()) { EdgeSessionMsg edgeSessionMsg = ProtoUtils.fromProto(toEdgeNotificationMsg.getToEdgeSyncRequest()); - edgeCtx.getEdgeRpcService().onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg); + edgeRpcService.onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg); callback.onSuccess(); } else if (toEdgeNotificationMsg.hasFromEdgeSyncResponse()) { EdgeSessionMsg edgeSessionMsg = ProtoUtils.fromProto(toEdgeNotificationMsg.getFromEdgeSyncResponse()); - edgeCtx.getEdgeRpcService().onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg); + edgeRpcService.onToEdgeSessionMsg(edgeSessionMsg.getTenantId(), edgeSessionMsg); callback.onSuccess(); } else if (toEdgeNotificationMsg.hasComponentLifecycle()) { ComponentLifecycleMsg componentLifecycle = ProtoUtils.fromProto(toEdgeNotificationMsg.getComponentLifecycle()); TenantId tenantId = componentLifecycle.getTenantId(); EdgeId edgeId = new EdgeId(componentLifecycle.getEntityId().getId()); if (ComponentLifecycleEvent.DELETED.equals(componentLifecycle.getEvent())) { - edgeCtx.getEdgeRpcService().deleteEdge(tenantId, edgeId); + edgeRpcService.deleteEdge(tenantId, edgeId); } else if (ComponentLifecycleEvent.UPDATED.equals(componentLifecycle.getEvent())) { Edge edge = edgeCtx.getEdgeService().findEdgeById(tenantId, edgeId); - edgeCtx.getEdgeRpcService().updateEdge(tenantId, edge); + edgeRpcService.updateEdge(tenantId, edge); } callback.onSuccess(); } } catch (Exception e) { - log.error("Error processing edge notification message", e); + log.error("Error processing edge notification message {}", toEdgeNotificationMsg, e); callback.onFailure(e); }