From fb5c593bb50a307b362ca4b0832e2bde8d1d42bb Mon Sep 17 00:00:00 2001 From: Volodymyr Babak Date: Mon, 16 Oct 2023 12:54:10 +0300 Subject: [PATCH] Edge events are processed in a separate threads to avoid blocking core consumer threads --- .../edge/DefaultEdgeNotificationService.java | 171 ++++++++++-------- .../src/main/resources/thingsboard.yml | 1 + 2 files changed, 92 insertions(+), 80 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java index 69852e6c05..c8e0c09d50 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/DefaultEdgeNotificationService.java @@ -22,10 +22,11 @@ import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; import org.checkerframework.checker.nullness.qual.Nullable; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.data.audit.ActionType; import org.thingsboard.server.common.data.edge.Edge; import org.thingsboard.server.common.data.edge.EdgeEventType; @@ -59,7 +60,7 @@ import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.UUID; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; @Service @TbCoreComponent @@ -128,17 +129,20 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { @Autowired protected ApplicationEventPublisher eventPublisher; - private ExecutorService dbCallBackExecutor; + @Value("${actors.system.edge_dispatcher_pool_size:4}") + private int edgeDispatcherSize; + + private ExecutorService executor; @PostConstruct public void initExecutor() { - dbCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("edge-notifications")); + executor = ThingsBoardExecutors.newWorkStealingPool(edgeDispatcherSize, "edge-notifications"); } @PreDestroy public void shutdownExecutor() { - if (dbCallBackExecutor != null) { - dbCallBackExecutor.shutdownNow(); + if (executor != null) { + executor.shutdownNow(); } } @@ -157,82 +161,89 @@ public class DefaultEdgeNotificationService implements EdgeNotificationService { public void pushNotificationToEdge(TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg, TbCallback callback) { TenantId tenantId = TenantId.fromUUID(new UUID(edgeNotificationMsg.getTenantIdMSB(), edgeNotificationMsg.getTenantIdLSB())); log.debug("[{}] Pushing notification to edge {}", tenantId, edgeNotificationMsg); - try { - EdgeEventType type = EdgeEventType.valueOf(edgeNotificationMsg.getType()); - ListenableFuture future; - switch (type) { - case EDGE: - future = edgeProcessor.processEdgeNotification(tenantId, edgeNotificationMsg); - break; - case ASSET: - future = assetProcessor.processEntityNotification(tenantId, edgeNotificationMsg); - break; - case DEVICE: - future = deviceProcessor.processEntityNotification(tenantId, edgeNotificationMsg); - break; - case ENTITY_VIEW: - future = entityViewProcessor.processEntityNotification(tenantId, edgeNotificationMsg); - break; - case DASHBOARD: - future = dashboardProcessor.processEntityNotification(tenantId, edgeNotificationMsg); - break; - case RULE_CHAIN: - future = ruleChainProcessor.processEntityNotification(tenantId, edgeNotificationMsg); - break; - case USER: - future = userProcessor.processEntityNotification(tenantId, edgeNotificationMsg); - break; - case CUSTOMER: - future = customerProcessor.processCustomerNotification(tenantId, edgeNotificationMsg); - break; - case DEVICE_PROFILE: - future = deviceProfileProcessor.processEntityNotification(tenantId, edgeNotificationMsg); - break; - case ASSET_PROFILE: - future = assetProfileProcessor.processEntityNotification(tenantId, edgeNotificationMsg); - break; - case OTA_PACKAGE: - future = otaPackageProcessor.processEntityNotification(tenantId, edgeNotificationMsg); - break; - case WIDGETS_BUNDLE: - future = widgetBundleProcessor.processEntityNotification(tenantId, edgeNotificationMsg); - break; - case WIDGET_TYPE: - future = widgetTypeProcessor.processEntityNotification(tenantId, edgeNotificationMsg); - break; - case QUEUE: - future = queueProcessor.processEntityNotification(tenantId, edgeNotificationMsg); - break; - case ALARM: - future = alarmProcessor.processAlarmNotification(tenantId, edgeNotificationMsg); - break; - case RELATION: - future = relationProcessor.processRelationNotification(tenantId, edgeNotificationMsg); - break; - case TENANT: - future = tenantEdgeProcessor.processEntityNotification(tenantId, edgeNotificationMsg); - break; - case TENANT_PROFILE: - future = tenantProfileEdgeProcessor.processEntityNotification(tenantId, edgeNotificationMsg); - break; - default: - log.warn("[{}] Edge event type [{}] is not designed to be pushed to edge", tenantId, type); - future = Futures.immediateFuture(null); - } - Futures.addCallback(future, new FutureCallback<>() { - @Override - public void onSuccess(@Nullable Void unused) { - callback.onSuccess(); + final long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); + executor.submit(() -> { + try { + if (deadline < System.nanoTime()) { + log.warn("[{}] Skipping notification message because deadline reached {}", tenantId, edgeNotificationMsg); + return; } - - @Override - public void onFailure(Throwable throwable) { - callBackFailure(tenantId, edgeNotificationMsg, callback, throwable); + EdgeEventType type = EdgeEventType.valueOf(edgeNotificationMsg.getType()); + ListenableFuture future; + switch (type) { + case EDGE: + future = edgeProcessor.processEdgeNotification(tenantId, edgeNotificationMsg); + break; + case ASSET: + future = assetProcessor.processEntityNotification(tenantId, edgeNotificationMsg); + break; + case DEVICE: + future = deviceProcessor.processEntityNotification(tenantId, edgeNotificationMsg); + break; + case ENTITY_VIEW: + future = entityViewProcessor.processEntityNotification(tenantId, edgeNotificationMsg); + break; + case DASHBOARD: + future = dashboardProcessor.processEntityNotification(tenantId, edgeNotificationMsg); + break; + case RULE_CHAIN: + future = ruleChainProcessor.processEntityNotification(tenantId, edgeNotificationMsg); + break; + case USER: + future = userProcessor.processEntityNotification(tenantId, edgeNotificationMsg); + break; + case CUSTOMER: + future = customerProcessor.processCustomerNotification(tenantId, edgeNotificationMsg); + break; + case DEVICE_PROFILE: + future = deviceProfileProcessor.processEntityNotification(tenantId, edgeNotificationMsg); + break; + case ASSET_PROFILE: + future = assetProfileProcessor.processEntityNotification(tenantId, edgeNotificationMsg); + break; + case OTA_PACKAGE: + future = otaPackageProcessor.processEntityNotification(tenantId, edgeNotificationMsg); + break; + case WIDGETS_BUNDLE: + future = widgetBundleProcessor.processEntityNotification(tenantId, edgeNotificationMsg); + break; + case WIDGET_TYPE: + future = widgetTypeProcessor.processEntityNotification(tenantId, edgeNotificationMsg); + break; + case QUEUE: + future = queueProcessor.processEntityNotification(tenantId, edgeNotificationMsg); + break; + case ALARM: + future = alarmProcessor.processAlarmNotification(tenantId, edgeNotificationMsg); + break; + case RELATION: + future = relationProcessor.processRelationNotification(tenantId, edgeNotificationMsg); + break; + case TENANT: + future = tenantEdgeProcessor.processEntityNotification(tenantId, edgeNotificationMsg); + break; + case TENANT_PROFILE: + future = tenantProfileEdgeProcessor.processEntityNotification(tenantId, edgeNotificationMsg); + break; + default: + log.warn("[{}] Edge event type [{}] is not designed to be pushed to edge", tenantId, type); + future = Futures.immediateFuture(null); } - }, dbCallBackExecutor); - } catch (Exception e) { - callBackFailure(tenantId, edgeNotificationMsg, callback, e); - } + Futures.addCallback(future, new FutureCallback<>() { + @Override + public void onSuccess(@Nullable Void unused) { + callback.onSuccess(); + } + + @Override + public void onFailure(Throwable throwable) { + callBackFailure(tenantId, edgeNotificationMsg, callback, throwable); + } + }, executor); + } catch (Exception e) { + callBackFailure(tenantId, edgeNotificationMsg, callback, e); + } + }); } private void callBackFailure(TenantId tenantId, TransportProtos.EdgeNotificationMsgProto edgeNotificationMsg, TbCallback callback, Throwable throwable) { diff --git a/application/src/main/resources/thingsboard.yml b/application/src/main/resources/thingsboard.yml index 4a3023d48f..9fbfd2fc63 100644 --- a/application/src/main/resources/thingsboard.yml +++ b/application/src/main/resources/thingsboard.yml @@ -364,6 +364,7 @@ actors: tenant_dispatcher_pool_size: "${ACTORS_SYSTEM_TENANT_DISPATCHER_POOL_SIZE:2}" device_dispatcher_pool_size: "${ACTORS_SYSTEM_DEVICE_DISPATCHER_POOL_SIZE:4}" rule_dispatcher_pool_size: "${ACTORS_SYSTEM_RULE_DISPATCHER_POOL_SIZE:8}" + edge_dispatcher_pool_size: "${ACTORS_SYSTEM_EDGE_DISPATCHER_POOL_SIZE:4}" tenant: create_components_on_init: "${ACTORS_TENANT_CREATE_COMPONENTS_ON_INIT:true}" session: