diff --git a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java index 5d45e1c3d5..9c100b4960 100644 --- a/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/edge/rpc/EdgeGrpcService.java @@ -31,7 +31,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.cache.TbTransactionalCache; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.AttributeScope; @@ -71,7 +71,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -170,9 +169,9 @@ public class EdgeGrpcService extends EdgeRpcServiceGrpc.EdgeRpcServiceImplBase i log.error("Failed to start Edge RPC server!", e); throw new RuntimeException("Failed to start Edge RPC server!"); } - this.edgeEventProcessingExecutorService = Executors.newScheduledThreadPool(schedulerPoolSize, ThingsBoardThreadFactory.forName("edge-event-check-scheduler")); - this.sendDownlinkExecutorService = Executors.newScheduledThreadPool(sendSchedulerPoolSize, ThingsBoardThreadFactory.forName("edge-send-scheduler")); - this.executorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("edge-service")); + this.edgeEventProcessingExecutorService = ThingsBoardExecutors.newScheduledThreadPool(schedulerPoolSize, "edge-event-check-scheduler"); + this.sendDownlinkExecutorService = ThingsBoardExecutors.newScheduledThreadPool(sendSchedulerPoolSize, "edge-send-scheduler"); + this.executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("edge-service"); log.info("Edge RPC service initialized!"); } diff --git a/application/src/main/java/org/thingsboard/server/service/executors/PubSubRuleNodeExecutorProvider.java b/application/src/main/java/org/thingsboard/server/service/executors/PubSubRuleNodeExecutorProvider.java index c8e30ba471..45792323e2 100644 --- a/application/src/main/java/org/thingsboard/server/service/executors/PubSubRuleNodeExecutorProvider.java +++ b/application/src/main/java/org/thingsboard/server/service/executors/PubSubRuleNodeExecutorProvider.java @@ -21,10 +21,9 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Component; import org.thingsboard.common.util.ExecutorProvider; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.queue.util.TbRuleEngineComponent; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @Lazy @@ -46,7 +45,7 @@ public class PubSubRuleNodeExecutorProvider implements ExecutorProvider { if (threadPoolSize == null) { threadPoolSize = THREADS_PER_CPU * Runtime.getRuntime().availableProcessors(); } - executor = Executors.newScheduledThreadPool(threadPoolSize, ThingsBoardThreadFactory.forName("pubsub-rule-nodes")); + executor = ThingsBoardExecutors.newScheduledThreadPool(threadPoolSize, "pubsub-rule-nodes"); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/mail/DefaultMailService.java b/application/src/main/java/org/thingsboard/server/service/mail/DefaultMailService.java index 1603720450..586259eb06 100644 --- a/application/src/main/java/org/thingsboard/server/service/mail/DefaultMailService.java +++ b/application/src/main/java/org/thingsboard/server/service/mail/DefaultMailService.java @@ -35,7 +35,7 @@ import org.springframework.mail.javamail.JavaMailSenderImpl; import org.springframework.mail.javamail.MimeMessageHelper; import org.springframework.stereotype.Service; import org.springframework.ui.freemarker.FreeMarkerTemplateUtils; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.TbEmail; import org.thingsboard.server.cache.limits.RateLimitService; @@ -60,7 +60,6 @@ import java.io.ByteArrayInputStream; import java.util.HashMap; import java.util.Locale; import java.util.Map; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -111,7 +110,7 @@ public class DefaultMailService implements MailService { this.freemarkerConfig = freemarkerConfig; this.adminSettingsService = adminSettingsService; this.apiUsageClient = apiUsageClient; - this.timeoutScheduler = Executors.newScheduledThreadPool(1, ThingsBoardThreadFactory.forName("mail-service-watchdog")); + this.timeoutScheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("mail-service-watchdog"); } @PostConstruct diff --git a/application/src/main/java/org/thingsboard/server/service/notification/DefaultNotificationSchedulerService.java b/application/src/main/java/org/thingsboard/server/service/notification/DefaultNotificationSchedulerService.java index 98e85077a4..8e59891990 100644 --- a/application/src/main/java/org/thingsboard/server/service/notification/DefaultNotificationSchedulerService.java +++ b/application/src/main/java/org/thingsboard/server/service/notification/DefaultNotificationSchedulerService.java @@ -23,7 +23,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Service; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.rule.engine.api.NotificationCenter; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.NotificationRequestId; @@ -49,7 +49,6 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -63,7 +62,7 @@ public class DefaultNotificationSchedulerService extends AbstractPartitionBasedS private final NotificationCenter notificationCenter; private final NotificationRequestService notificationRequestService; private final NotificationExecutorService notificationExecutor; - private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("notification-scheduler")); + private final ScheduledExecutorService scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("notification-scheduler"); private final Map scheduledNotificationRequests = new ConcurrentHashMap<>(); diff --git a/application/src/main/java/org/thingsboard/server/service/partition/AbstractPartitionBasedService.java b/application/src/main/java/org/thingsboard/server/service/partition/AbstractPartitionBasedService.java index fc5ce5214c..3d4a8654d6 100644 --- a/application/src/main/java/org/thingsboard/server/service/partition/AbstractPartitionBasedService.java +++ b/application/src/main/java/org/thingsboard/server/service/partition/AbstractPartitionBasedService.java @@ -22,7 +22,7 @@ import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.thingsboard.common.util.DonAsynchron; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; @@ -40,7 +40,6 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; @Slf4j public abstract class AbstractPartitionBasedService extends TbApplicationEventListener { @@ -67,7 +66,7 @@ public abstract class AbstractPartitionBasedService extends protected void init() { // Should be always single threaded due to absence of locks. - scheduledExecutor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getSchedulerExecutorName()))); + scheduledExecutor = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newSingleThreadScheduledExecutor(getSchedulerExecutorName())); } protected ServiceType getServiceType() { diff --git a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java index 1b7e5d53a3..95fc8cb843 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/processing/AbstractConsumerService.java @@ -82,7 +82,7 @@ public abstract class AbstractConsumerService>builder() .name(getServiceType().getLabel() + " Notifications") diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java index 9cd680596c..e8cce43aa2 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbCoreDeviceRpcService.java @@ -22,7 +22,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.DataConstants; @@ -46,7 +46,6 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -86,7 +85,7 @@ public class DefaultTbCoreDeviceRpcService implements TbCoreDeviceRpcService { @PostConstruct public void initExecutor() { - scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("tb-core-rpc-scheduler")); + scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("tb-core-rpc-scheduler"); serviceId = serviceInfoProvider.getServiceId(); } diff --git a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java index 3b1b50d5d7..debc1cb987 100644 --- a/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java +++ b/application/src/main/java/org/thingsboard/server/service/rpc/DefaultTbRuleEngineRpcService.java @@ -20,7 +20,7 @@ import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcRequest; import org.thingsboard.rule.engine.api.RuleEngineDeviceRpcResponse; import org.thingsboard.server.cluster.TbClusterService; @@ -45,7 +45,6 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -83,7 +82,7 @@ public class DefaultTbRuleEngineRpcService implements TbRuleEngineDeviceRpcServi @PostConstruct public void initExecutor() { - scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("rule-engine-rpc-scheduler")); + scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("rule-engine-rpc-scheduler"); serviceId = serviceInfoProvider.getServiceId(); } diff --git a/application/src/main/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallService.java b/application/src/main/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallService.java index fea30da91c..6f4d18b94e 100644 --- a/application/src/main/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallService.java +++ b/application/src/main/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallService.java @@ -19,7 +19,7 @@ import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.msg.TbMsg; @@ -30,7 +30,6 @@ import org.thingsboard.server.gen.transport.TransportProtos; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -51,7 +50,7 @@ public class DefaultRuleEngineCallService implements RuleEngineCallService { @PostConstruct public void initExecutor() { - executor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("re-rest-callback")); + executor = ThingsBoardExecutors.newSingleThreadScheduledExecutor("re-rest-callback"); } @PreDestroy diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java index 019b6eb5e8..8af29d4044 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java @@ -31,6 +31,7 @@ import org.springframework.context.annotation.Lazy; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.web.socket.CloseStatus; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; @@ -83,7 +84,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -150,11 +150,10 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc serviceId = serviceInfoProvider.getServiceId(); wsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ws-entity-sub-callback")); tsInSqlDB = databaseTsType.equalsIgnoreCase("sql") || databaseTsType.equalsIgnoreCase("timescale"); - ThreadFactory tbThreadFactory = ThingsBoardThreadFactory.forName("ws-entity-sub-scheduler"); if (dynamicPageLinkRefreshPoolSize == 1) { - scheduler = Executors.newSingleThreadScheduledExecutor(tbThreadFactory); + scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("ws-entity-sub-scheduler"); } else { - scheduler = Executors.newScheduledThreadPool(dynamicPageLinkRefreshPoolSize, tbThreadFactory); + scheduler = ThingsBoardExecutors.newScheduledThreadPool(dynamicPageLinkRefreshPoolSize, "ws-entity-sub-scheduler"); } } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java index 4043b519da..de7a2570ea 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java @@ -130,7 +130,7 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer subscriptionUpdateExecutor = ThingsBoardExecutors.newWorkStealingPool(20, getClass()); tsCallBackExecutor = Executors.newFixedThreadPool(8, ThingsBoardThreadFactory.forName("ts-sub-callback")); //since we are using locks by TenantId serviceId = serviceInfoProvider.getServiceId(); - staleSessionCleanupExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("stale-session-cleanup")); + staleSessionCleanupExecutor = ThingsBoardExecutors.newSingleThreadScheduledExecutor("stale-session-cleanup"); staleSessionCleanupExecutor.scheduleWithFixedDelay(this::cleanupStaleSessions, 60, 60, TimeUnit.SECONDS); } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionSchedulerComponent.java b/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionSchedulerComponent.java index 680be6929d..94a92d5bbd 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionSchedulerComponent.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionSchedulerComponent.java @@ -19,10 +19,9 @@ import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.Getter; import org.springframework.stereotype.Service; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.queue.util.TbCoreComponent; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -36,7 +35,7 @@ public class SubscriptionSchedulerComponent { @PostConstruct public void initExecutor() { - scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("subscription-scheduler")); + scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("subscription-scheduler"); } @PreDestroy diff --git a/application/src/main/java/org/thingsboard/server/service/system/DefaultSystemInfoService.java b/application/src/main/java/org/thingsboard/server/service/system/DefaultSystemInfoService.java index 46e28c3632..4016352f2d 100644 --- a/application/src/main/java/org/thingsboard/server/service/system/DefaultSystemInfoService.java +++ b/application/src/main/java/org/thingsboard/server/service/system/DefaultSystemInfoService.java @@ -24,7 +24,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.rule.engine.api.MailService; import org.thingsboard.rule.engine.api.SmsService; import org.thingsboard.server.common.data.AdminSettings; @@ -55,7 +55,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -106,7 +105,7 @@ public class DefaultSystemInfoService extends TbApplicationEventListener checkUpdatesFuture = null; private final RestTemplate restClient = new RestTemplate(); diff --git a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java index 8d96d14b49..98b9668c97 100644 --- a/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java @@ -33,7 +33,6 @@ import org.springframework.stereotype.Service; import org.springframework.web.socket.CloseStatus; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardExecutors; -import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.TenantProfile; @@ -97,7 +96,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; @@ -157,7 +155,7 @@ public class DefaultWebSocketService implements WebSocketService { serviceId = serviceInfoProvider.getServiceId(); executor = ThingsBoardExecutors.newWorkStealingPool(50, getClass()); - pingExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("telemetry-web-socket-ping")); + pingExecutor = ThingsBoardExecutors.newSingleThreadScheduledExecutor("telemetry-web-socket-ping"); pingExecutor.scheduleWithFixedDelay(this::sendPing, pingTimeout / NUMBER_OF_PING_ATTEMPTS, pingTimeout / NUMBER_OF_PING_ATTEMPTS, TimeUnit.MILLISECONDS); cmdsHandlers = new EnumMap<>(WsCmdType.class); diff --git a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java index fdf2e203ee..66e3de13d1 100644 --- a/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java +++ b/application/src/test/java/org/thingsboard/server/service/queue/ruleengine/TbRuleEngineQueueConsumerManagerTest.java @@ -157,7 +157,7 @@ public class TbRuleEngineQueueConsumerManagerTest { when(producerProvider.getRuleEngineMsgProducer()).thenReturn(ruleEngineMsgProducer); consumersExecutor = Executors.newCachedThreadPool(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer")); mgmtExecutor = ThingsBoardExecutors.newWorkStealingPool(3, "tb-rule-engine-mgmt"); - scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("tb-rule-engine-consumer-scheduler")); + scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("tb-rule-engine-consumer-scheduler"); ruleEngineConsumerContext.setTopicDeletionDelayInSec(5); queue = new Queue(); diff --git a/application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java b/application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java index 282563574f..c49149bd07 100644 --- a/application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java +++ b/application/src/test/java/org/thingsboard/server/service/ruleengine/DefaultRuleEngineCallServiceTest.java @@ -23,7 +23,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; import org.testcontainers.shaded.org.awaitility.Awaitility; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.id.TenantId; @@ -37,7 +37,6 @@ import java.util.HashMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -63,7 +62,7 @@ public class DefaultRuleEngineCallServiceTest { @BeforeEach void setUp() { - executor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("re-rest-callback")); + executor = ThingsBoardExecutors.newSingleThreadScheduledExecutor("re-rest-callback"); ruleEngineCallService = new DefaultRuleEngineCallService(tbClusterServiceMock); ReflectionTestUtils.setField(ruleEngineCallService, "executor", executor); ReflectionTestUtils.setField(ruleEngineCallService, "requests", requests); diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/AbstractLwM2MIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/AbstractLwM2MIntegrationTest.java index 983cfde0e5..8423d5e40a 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/AbstractLwM2MIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/AbstractLwM2MIntegrationTest.java @@ -32,7 +32,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.mock.mockito.SpyBean; import org.springframework.test.context.TestPropertySource; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfileProvisionType; @@ -80,7 +80,6 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -197,7 +196,7 @@ public abstract class AbstractLwM2MIntegrationTest extends AbstractTransportInte } private void init() throws Exception { - executor = Executors.newScheduledThreadPool(10, ThingsBoardThreadFactory.forName("test-lwm2m-scheduled")); + executor = ThingsBoardExecutors.newScheduledThreadPool(10, "test-lwm2m-scheduled"); loginTenantAdmin(); for (String resourceName : this.resources) { TbResource lwModel = new TbResource(); diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java index f42a7c938a..649135d11a 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java @@ -24,12 +24,11 @@ import org.eclipse.leshan.core.request.argument.Arguments; import org.eclipse.leshan.core.response.ExecuteResponse; import org.eclipse.leshan.core.response.ReadResponse; import org.eclipse.leshan.core.response.WriteResponse; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import javax.security.auth.Destroyable; import java.util.Arrays; import java.util.List; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -39,7 +38,7 @@ public class FwLwM2MDevice extends BaseInstanceEnabler implements Destroyable { private static final List supportedResources = Arrays.asList(0, 1, 2, 3, 5, 6, 7, 9); - private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName() + "-test-scope")); + private final ScheduledExecutorService scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor(getClass().getSimpleName() + "-test-scope"); private final AtomicInteger state = new AtomicInteger(0); diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/SwLwM2MDevice.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/SwLwM2MDevice.java index 284afc3f51..b4318b9cab 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/SwLwM2MDevice.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/SwLwM2MDevice.java @@ -24,12 +24,11 @@ import org.eclipse.leshan.core.request.argument.Arguments; import org.eclipse.leshan.core.response.ExecuteResponse; import org.eclipse.leshan.core.response.ReadResponse; import org.eclipse.leshan.core.response.WriteResponse; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import javax.security.auth.Destroyable; import java.util.Arrays; import java.util.List; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -39,7 +38,7 @@ public class SwLwM2MDevice extends BaseInstanceEnabler implements Destroyable { private static final List supportedResources = Arrays.asList(0, 1, 2, 3, 4, 6, 7, 9); - private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName() + "-test-scope")); + private final ScheduledExecutorService scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor(getClass().getSimpleName() + "-test-scope"); private final AtomicInteger state = new AtomicInteger(0); diff --git a/common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java b/common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java index 7bb74c804c..16e881b384 100644 --- a/common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java +++ b/common/actor/src/main/java/org/thingsboard/server/actors/DefaultTbActorSystem.java @@ -18,7 +18,7 @@ package org.thingsboard.server.actors; import lombok.Data; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.msg.TbActorMsg; import java.util.Collections; @@ -27,7 +27,6 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; @@ -51,7 +50,7 @@ public class DefaultTbActorSystem implements TbActorSystem { public DefaultTbActorSystem(TbActorSystemSettings settings) { this.settings = settings; - this.scheduler = Executors.newScheduledThreadPool(settings.getSchedulerPoolSize(), ThingsBoardThreadFactory.forName("actor-system-scheduler")); + this.scheduler = ThingsBoardExecutors.newScheduledThreadPool(settings.getSchedulerPoolSize(), "actor-system-scheduler"); } @Override diff --git a/common/coap-server/src/main/java/org/thingsboard/server/coapserver/DefaultCoapServerService.java b/common/coap-server/src/main/java/org/thingsboard/server/coapserver/DefaultCoapServerService.java index 3ef97a0dee..f3d30bfea4 100644 --- a/common/coap-server/src/main/java/org/thingsboard/server/coapserver/DefaultCoapServerService.java +++ b/common/coap-server/src/main/java/org/thingsboard/server/coapserver/DefaultCoapServerService.java @@ -27,14 +27,13 @@ import org.eclipse.californium.scandium.DTLSConnector; import org.eclipse.californium.scandium.config.DtlsConnectorConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.Random; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -115,7 +114,7 @@ public class DefaultCoapServerService implements CoapServerService { CoapEndpoint dtlsCoapEndpoint = dtlsCoapEndpointBuilder.build(); server.addEndpoint(dtlsCoapEndpoint); tbDtlsCertificateVerifier = (TbCoapDtlsCertificateVerifier) dtlsConnectorConfig.getAdvancedCertificateVerifier(); - dtlsSessionsExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName())); + dtlsSessionsExecutor = ThingsBoardExecutors.newSingleThreadScheduledExecutor(getClass().getSimpleName()); dtlsSessionsExecutor.scheduleAtFixedRate(this::evictTimeoutSessions, new Random().nextInt((int) getDtlsSessionReportTimeout()), getDtlsSessionReportTimeout(), TimeUnit.MILLISECONDS); } Resource root = server.getRoot(); diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java index 630e1e4b11..1042eeb280 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/DefaultTbQueueResponseTemplate.java @@ -17,6 +17,7 @@ package org.thingsboard.server.queue.common; import lombok.Builder; import lombok.extern.slf4j.Slf4j; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.stats.MessagesStats; @@ -71,7 +72,7 @@ public class DefaultTbQueueResponseTemplate scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { - return schedulerExecutor.scheduleAtFixedRate(() -> runSafely(command), initialDelay, period, unit); + return schedulerExecutor.scheduleAtFixedRate(command, initialDelay, period, unit); } public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { - return schedulerExecutor.scheduleWithFixedDelay(() -> runSafely(command), initialDelay, delay, unit); - } - - private void runSafely(Runnable command) { - try { - command.run(); - } catch (Throwable t) { - log.error("Unexpected error occurred while executing task!", t); - } + return schedulerExecutor.scheduleWithFixedDelay(command, initialDelay, delay, unit); } } diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java index fec6185d90..01fe887165 100644 --- a/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java +++ b/common/queue/src/test/java/org/thingsboard/server/queue/discovery/ZkDiscoveryServiceTest.java @@ -28,13 +28,12 @@ import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.context.ApplicationEventPublisher; import org.springframework.test.util.ReflectionTestUtils; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.gen.transport.TransportProtos; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import static org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type.CHILD_ADDED; @@ -81,7 +80,7 @@ public class ZkDiscoveryServiceTest { @BeforeEach public void setup() { zkDiscoveryService = Mockito.spy(new ZkDiscoveryService(applicationEventPublisher, serviceInfoProvider, partitionService)); - ScheduledExecutorService zkExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("zk-discovery")); + ScheduledExecutorService zkExecutorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("zk-discovery"); when(client.getState()).thenReturn(CuratorFrameworkState.STARTED); ReflectionTestUtils.setField(zkDiscoveryService, "stopped", false); ReflectionTestUtils.setField(zkDiscoveryService, "client", client); diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/scheduler/DefaultSchedulerComponentTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/scheduler/DefaultSchedulerComponentTest.java deleted file mode 100644 index cff306c748..0000000000 --- a/common/queue/src/test/java/org/thingsboard/server/queue/scheduler/DefaultSchedulerComponentTest.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Copyright © 2016-2024 The Thingsboard Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.thingsboard.server.queue.scheduler; - -import org.awaitility.Awaitility; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; - -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import static org.assertj.core.api.Assertions.assertThat; - -class DefaultSchedulerComponentTest { - - DefaultSchedulerComponent schedulerComponent; - - @BeforeEach - void setup() { - schedulerComponent = new DefaultSchedulerComponent(); - schedulerComponent.init(); - } - - @AfterEach - void cleanup() { - schedulerComponent.destroy(); - } - - @Test - @DisplayName("scheduleAtFixedRate() should continue periodic execution even if command throws exception") - void scheduleAtFixedRateShouldNotStopPeriodicExecutionWhenCommandThrowsException() { - // GIVEN - var wasExecutedAtLeastOnce = new AtomicBoolean(false); - - Runnable exceptionThrowingCommand = () -> { - try { - throw new RuntimeException("Unexpected exception"); - } finally { - wasExecutedAtLeastOnce.set(true); - } - }; - - // WHEN - ScheduledFuture future = schedulerComponent.scheduleAtFixedRate(exceptionThrowingCommand, 0, 200, TimeUnit.MILLISECONDS); - - // THEN - Awaitility.await().alias("Wait until command is executed at least once") - .atMost(5, TimeUnit.SECONDS) - .until(wasExecutedAtLeastOnce::get); - - assertThat(future.isDone()).as("Periodic execution should not stop after unhandled exception is thrown by the command").isFalse(); - } - - @Test - @DisplayName("scheduleWithFixedDelay() should continue periodic execution even if command throws exception") - void scheduleWithFixedDelayShouldNotStopPeriodicExecutionWhenCommandThrowsException() { - // GIVEN - var wasExecutedAtLeastOnce = new AtomicBoolean(false); - - Runnable exceptionThrowingCommand = () -> { - try { - throw new RuntimeException("Unexpected exception"); - } finally { - wasExecutedAtLeastOnce.set(true); - } - }; - - // WHEN - ScheduledFuture future = schedulerComponent.scheduleWithFixedDelay(exceptionThrowingCommand, 0, 200, TimeUnit.MILLISECONDS); - - // THEN - Awaitility.await().alias("Wait until command is executed at least once") - .atMost(5, TimeUnit.SECONDS) - .until(wasExecutedAtLeastOnce::get); - - assertThat(future.isDone()).as("Periodic execution should not stop after unhandled exception is thrown by the command").isFalse(); - } - -} diff --git a/common/script/script-api/src/main/java/org/thingsboard/script/api/AbstractScriptInvokeService.java b/common/script/script-api/src/main/java/org/thingsboard/script/api/AbstractScriptInvokeService.java index 9b52be0f58..342801f9a8 100644 --- a/common/script/script-api/src/main/java/org/thingsboard/script/api/AbstractScriptInvokeService.java +++ b/common/script/script-api/src/main/java/org/thingsboard/script/api/AbstractScriptInvokeService.java @@ -21,7 +21,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.TenantId; @@ -29,7 +29,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -87,7 +86,7 @@ public abstract class AbstractScriptInvokeService implements ScriptInvokeService public void init() { if (getMaxEvalRequestsTimeout() > 0 || getMaxInvokeRequestsTimeout() > 0) { - timeoutExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("script-timeout")); + timeoutExecutorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("script-timeout"); } } diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbInMemoryRegistrationStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbInMemoryRegistrationStore.java index 138d0d6af8..7479bf23ca 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbInMemoryRegistrationStore.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbInMemoryRegistrationStore.java @@ -34,7 +34,6 @@ import org.eclipse.leshan.core.observation.ObservationIdentifier; import org.eclipse.leshan.core.observation.SingleObservation; import org.eclipse.leshan.core.peer.LwM2mIdentity; import org.eclipse.leshan.core.request.ContentFormat; -import org.eclipse.leshan.core.util.NamedThreadFactory; import org.eclipse.leshan.server.registration.Deregistration; import org.eclipse.leshan.server.registration.ExpirationListener; import org.eclipse.leshan.server.registration.Registration; @@ -42,6 +41,7 @@ import org.eclipse.leshan.server.registration.RegistrationStore; import org.eclipse.leshan.server.registration.RegistrationUpdate; import org.eclipse.leshan.server.registration.UpdatedRegistration; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig; import org.thingsboard.server.transport.lwm2m.server.LwM2mVersionedModelProvider; @@ -56,7 +56,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -99,9 +98,7 @@ public class TbInMemoryRegistrationStore implements RegistrationStore, Startable } public TbInMemoryRegistrationStore(LwM2MTransportServerConfig config, long cleanPeriodInSec, LwM2mVersionedModelProvider modelProvider) { - this(config, Executors.newScheduledThreadPool(1, - new NamedThreadFactory(String.format("TbInMemoryRegistrationStore Cleaner (%ds)", cleanPeriodInSec))), - cleanPeriodInSec, modelProvider); + this(config, ThingsBoardExecutors.newSingleThreadScheduledExecutor(String.format("TbInMemoryRegistrationStore Cleaner (%ds)", cleanPeriodInSec)), cleanPeriodInSec, modelProvider); } public TbInMemoryRegistrationStore(LwM2MTransportServerConfig config, ScheduledExecutorService schedExecutor, long cleanPeriodInSec, LwM2mVersionedModelProvider modelProvider) { diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStore.java index b52764187d..7b87b43650 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStore.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStore.java @@ -34,7 +34,6 @@ import org.eclipse.leshan.core.observation.ObservationIdentifier; import org.eclipse.leshan.core.observation.SingleObservation; import org.eclipse.leshan.core.peer.LwM2mIdentity; import org.eclipse.leshan.core.request.ContentFormat; -import org.eclipse.leshan.core.util.NamedThreadFactory; import org.eclipse.leshan.core.util.Validate; import org.eclipse.leshan.server.redis.RedisRegistrationStore; import org.eclipse.leshan.server.redis.serialization.ObservationSerDes; @@ -54,6 +53,7 @@ import org.springframework.data.redis.core.Cursor; import org.springframework.data.redis.core.ScanOptions; import org.springframework.integration.redis.util.RedisLockRegistry; import org.thingsboard.common.util.JacksonUtil; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.transport.lwm2m.config.LwM2MTransportServerConfig; import org.thingsboard.server.transport.lwm2m.server.LwM2mVersionedModelProvider; @@ -67,7 +67,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -130,9 +129,7 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab } public TbLwM2mRedisRegistrationStore(LwM2MTransportServerConfig config, RedisConnectionFactory connectionFactory, long cleanPeriodInSec, long lifetimeGracePeriodInSec, int cleanLimit, LwM2mVersionedModelProvider modelProvider) { - this(config, connectionFactory, Executors.newScheduledThreadPool(1, - new NamedThreadFactory(String.format("RedisRegistrationStore Cleaner (%ds)", cleanPeriodInSec))), - cleanPeriodInSec, lifetimeGracePeriodInSec, cleanLimit, modelProvider); + this(config, connectionFactory, ThingsBoardExecutors.newSingleThreadScheduledExecutor(String.format("RedisRegistrationStore Cleaner (%ds)", cleanPeriodInSec)), cleanPeriodInSec, lifetimeGracePeriodInSec, cleanLimit, modelProvider); } public TbLwM2mRedisRegistrationStore(LwM2MTransportServerConfig config, RedisConnectionFactory connectionFactory, ScheduledExecutorService schedExecutor, long cleanPeriodInSec, diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java index 26008e2c82..04f113b918 100644 --- a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java @@ -50,7 +50,6 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardExecutors; -import org.thingsboard.common.util.ThingsBoardThreadFactory; import org.thingsboard.server.common.adaptor.JsonConverter; import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.TbTransportService; @@ -76,7 +75,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -112,7 +110,7 @@ public class SnmpTransportService implements TbTransportService, CommandResponde @PostConstruct private void init() throws IOException { - scheduler = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(schedulerThreadPoolSize, ThingsBoardThreadFactory.forName("snmp-querying"))); + scheduler = MoreExecutors.listeningDecorator(ThingsBoardExecutors.newScheduledThreadPool(schedulerThreadPoolSize, "snmp-querying")); executor = ThingsBoardExecutors.newWorkStealingPool(responseProcessingThreadPoolSize, "snmp-response-processing"); initializeSnmp(); diff --git a/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardExecutors.java b/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardExecutors.java index fde2362091..ecba97ba8d 100644 --- a/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardExecutors.java +++ b/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardExecutors.java @@ -16,7 +16,9 @@ package org.thingsboard.common.util; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ScheduledExecutorService; public class ThingsBoardExecutors { @@ -48,4 +50,12 @@ public class ThingsBoardExecutors { return newWorkStealingPool(parallelism, clazz.getSimpleName()); } + public static ScheduledExecutorService newSingleThreadScheduledExecutor(String name) { + return Executors.unconfigurableScheduledExecutorService(new ThingsBoardScheduledThreadPoolExecutor(1, ThingsBoardThreadFactory.forName(name))); + } + + public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, String name) { + return new ThingsBoardScheduledThreadPoolExecutor(corePoolSize, ThingsBoardThreadFactory.forName(name)); + } + } diff --git a/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardScheduledThreadPoolExecutor.java b/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardScheduledThreadPoolExecutor.java index 07c975d822..7cee5543d9 100644 --- a/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardScheduledThreadPoolExecutor.java +++ b/common/util/src/main/java/org/thingsboard/common/util/ThingsBoardScheduledThreadPoolExecutor.java @@ -1,3 +1,18 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.thingsboard.common.util; import lombok.extern.slf4j.Slf4j; @@ -5,31 +20,18 @@ import lombok.extern.slf4j.Slf4j; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; -import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; @Slf4j -public final class ThingsBoardScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor { - - public ThingsBoardScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { - super(corePoolSize, threadFactory, handler); - } - - public ThingsBoardScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { - super(corePoolSize, handler); - } +final class ThingsBoardScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor { public ThingsBoardScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, threadFactory); } - public ThingsBoardScheduledThreadPoolExecutor(int corePoolSize) { - super(corePoolSize); - } - @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); @@ -42,21 +44,21 @@ public final class ThingsBoardScheduledThreadPoolExecutor extends ScheduledThrea if (wrappedThrowable instanceof CancellationException) { log.debug("Task was cancelled.", wrappedThrowable); } else { - log.error("Unexpected exception occurred during task execution!", wrappedThrowable); + log.error("Uncaught error occurred during task execution!", wrappedThrowable); } } if (directThrowable != null) { - log.error("Unexpected exception occurred during task execution!", directThrowable); + log.error("Uncaught error occurred during task execution!", directThrowable); } } private static Throwable extractThrowableWrappedInFuture(Runnable runnable) { - if (runnable instanceof Future && ((Future) runnable).isDone()) { + if (runnable instanceof Future future && future.isDone()) { try { - ((Future) runnable).get(); + future.get(); } catch (InterruptedException e) { // should not happen due to isDone() check - throw new AssertionError(e); + throw new AssertionError("InterruptedException caught after isDone() check on a future", e); } catch (CancellationException e) { return e; } catch (ExecutionException e) { @@ -71,7 +73,7 @@ public final class ThingsBoardScheduledThreadPoolExecutor extends ScheduledThrea if (command == null) { // preserve the original NPE behavior of ScheduledThreadPoolExecutor with a more helpful message throw new NullPointerException("command is null"); } - return super.scheduleAtFixedRate(new SafeRunnable(command), initialDelay, period, unit); + return super.scheduleAtFixedRate(new SafePeriodicRunnable(command), initialDelay, period, unit); } @Override @@ -79,21 +81,16 @@ public final class ThingsBoardScheduledThreadPoolExecutor extends ScheduledThrea if (command == null) { // preserve the original NPE behavior of ScheduledThreadPoolExecutor with a more helpful message throw new NullPointerException("command is null"); } - return super.scheduleWithFixedDelay(new SafeRunnable(command), initialDelay, delay, unit); + return super.scheduleWithFixedDelay(new SafePeriodicRunnable(command), initialDelay, delay, unit); } - private record SafeRunnable(Runnable runnable) implements Runnable { + private record SafePeriodicRunnable(Runnable runnable) implements Runnable { public void run() { try { runnable.run(); } catch (Exception ex) { - log.error("Unexpected exception occurred while periodically executing task!", ex); - // TODO: is calling uncaught execution handler here correct? - if (Thread.getDefaultUncaughtExceptionHandler() != null) { - log.debug("Default exception handler is set, delegating exception handling to it"); - Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), ex); - } + log.error("Uncaught exception occurred during periodic task execution!", ex); } // Intentionally, no catch block for Throwable; uncaught Throwables will be handled in afterExecute() } diff --git a/common/util/src/test/java/org/thingsboard/common/util/ThingsBoardScheduledThreadPoolExecutorTest.java b/common/util/src/test/java/org/thingsboard/common/util/ThingsBoardScheduledThreadPoolExecutorTest.java new file mode 100644 index 0000000000..a066ac8851 --- /dev/null +++ b/common/util/src/test/java/org/thingsboard/common/util/ThingsBoardScheduledThreadPoolExecutorTest.java @@ -0,0 +1,147 @@ +/** + * Copyright © 2016-2024 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.common.util; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.Assertions.assertThat; + +class ThingsBoardScheduledThreadPoolExecutorTest { + + ThingsBoardScheduledThreadPoolExecutor scheduler; + + @BeforeEach + void setup() { + scheduler = new ThingsBoardScheduledThreadPoolExecutor(1, Executors.defaultThreadFactory()); + } + + @AfterEach + void cleanup() { + scheduler.shutdownNow(); + } + + @Test + @DisplayName("scheduleAtFixedRate() should continue periodic execution even if command throws exception") + void scheduleAtFixedRateShouldNotStopPeriodicExecutionWhenCommandThrowsException() { + // GIVEN + AtomicInteger executionCounter = new AtomicInteger(0); + + Runnable exceptionThrowingCommand = () -> { + try { + throw new RuntimeException("Unexpected exception"); + } finally { + executionCounter.incrementAndGet(); + } + }; + + // WHEN + ScheduledFuture future = scheduler.scheduleAtFixedRate(exceptionThrowingCommand, 0, 100, TimeUnit.MILLISECONDS); + + // THEN + Awaitility.await().alias("Wait until command is executed at least twice") + .atMost(10, TimeUnit.SECONDS) + .failFast("Future should not be done or cancelled; task should continue running", () -> future.isDone() || future.isCancelled()) + .untilAsserted(() -> assertThat(executionCounter.get()) + .as("Task should be executed at least twice") + .isGreaterThan(2)); + } + + @Test + @DisplayName("scheduleAtFixedRate() should stop periodic execution if command throws an error") + void scheduleAtFixedRateShouldStopPeriodicExecutionWhenCommandThrowsException() { + // GIVEN + AtomicInteger executionCounter = new AtomicInteger(0); + + Runnable exceptionThrowingCommand = () -> { + try { + throw new Error("Unexpected error"); + } finally { + executionCounter.incrementAndGet(); + } + }; + + // WHEN + scheduler.scheduleAtFixedRate(exceptionThrowingCommand, 0, 100, TimeUnit.MILLISECONDS); + + // THEN + Awaitility.await().alias("Command that throws an error should execute exactly once") + .pollDelay(5, TimeUnit.SECONDS) + .atMost(10, TimeUnit.SECONDS) + .failFast("Command that throws an error should not execute more than once", () -> executionCounter.get() > 1) + .until(() -> executionCounter.get() == 1); + } + + @Test + @DisplayName("scheduleWithFixedDelay() should continue periodic execution even if command throws exception") + void scheduleWithFixedDelayShouldNotStopPeriodicExecutionWhenCommandThrowsException() { + // GIVEN + AtomicInteger executionCounter = new AtomicInteger(0); + + Runnable exceptionThrowingCommand = () -> { + try { + throw new RuntimeException("Unexpected exception"); + } finally { + executionCounter.incrementAndGet(); + } + }; + + // WHEN + ScheduledFuture future = scheduler.scheduleWithFixedDelay(exceptionThrowingCommand, 0, 100, TimeUnit.MILLISECONDS); + + // THEN + Awaitility.await().alias("Wait until command is executed at least twice") + .atMost(10, TimeUnit.SECONDS) + .failFast("Future should not be done or cancelled; task should continue running", () -> future.isDone() || future.isCancelled()) + .untilAsserted(() -> assertThat(executionCounter.get()) + .as("Task should be executed at least twice") + .isGreaterThan(2)); + } + + @Test + @DisplayName("scheduleWithFixedDelay() should stop periodic execution if command throws an error") + void scheduleWithFixedDelayShouldStopPeriodicExecutionWhenCommandThrowsException() { + // GIVEN + AtomicInteger executionCounter = new AtomicInteger(0); + + Runnable exceptionThrowingCommand = () -> { + try { + throw new Error("Unexpected error"); + } finally { + executionCounter.incrementAndGet(); + } + }; + + // WHEN + scheduler.scheduleWithFixedDelay(exceptionThrowingCommand, 0, 100, TimeUnit.MILLISECONDS); + + // THEN + Awaitility.await().alias("Command that throws an error should execute exactly once") + .pollDelay(5, TimeUnit.SECONDS) + .atMost(10, TimeUnit.SECONDS) + .failFast("Command that throws an error should not execute more than once", () -> executionCounter.get() > 1) + .until(() -> executionCounter.get() == 1); + } + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java index 0ee34f2b9c..2b46b49827 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/relation/BaseRelationService.java @@ -32,7 +32,7 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.event.TransactionalEventListener; import org.springframework.transaction.support.TransactionSynchronizationManager; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.cache.TbTransactionalCache; import org.thingsboard.server.common.data.StringUtils; import org.thingsboard.server.common.data.audit.ActionType; @@ -62,7 +62,6 @@ import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; @@ -101,7 +100,7 @@ public class BaseRelationService implements RelationService { @PostConstruct public void init() { - timeoutExecutorService = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("relations-query-timeout")); + timeoutExecutorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("relations-query-timeout"); } @PreDestroy diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/ScheduledLogExecutorComponent.java b/dao/src/main/java/org/thingsboard/server/dao/sql/ScheduledLogExecutorComponent.java index 484abd6a59..5bb3ff09bb 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/ScheduledLogExecutorComponent.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/ScheduledLogExecutorComponent.java @@ -18,9 +18,8 @@ package org.thingsboard.server.dao.sql; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import org.springframework.stereotype.Component; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -31,7 +30,7 @@ public class ScheduledLogExecutorComponent { @PostConstruct public void init() { - schedulerLogExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("sql-log")); + schedulerLogExecutor = ThingsBoardExecutors.newSingleThreadScheduledExecutor("sql-log"); } @PreDestroy diff --git a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java index 34316bc615..d1f09d98e8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java +++ b/dao/src/main/java/org/thingsboard/server/dao/util/AbstractBufferedRateExecutor.java @@ -92,7 +92,7 @@ public abstract class AbstractBufferedRateExecutor(queueLimit); this.dispatcherExecutor = Executors.newFixedThreadPool(dispatcherThreads, ThingsBoardThreadFactory.forName("nosql-" + getBufferName() + "-dispatcher")); this.callbackExecutor = ThingsBoardExecutors.newWorkStealingPool(callbackThreads, "nosql-" + getBufferName() + "-callback"); - this.timeoutExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("nosql-" + getBufferName() + "-timeout")); + this.timeoutExecutor = ThingsBoardExecutors.newSingleThreadScheduledExecutor("nosql-" + getBufferName() + "-timeout"); this.stats = new BufferedRateExecutorStats(statsFactory); String concurrencyLevelKey = StatsType.RATE_EXECUTOR.getName() + "." + CONCURRENCY_LEVEL + getBufferName(); //metric name may change with buffer name suffix this.concurrencyLevel = statsFactory.createGauge(concurrencyLevelKey, new AtomicInteger(0)); diff --git a/monitoring/src/main/java/org/thingsboard/monitoring/ThingsboardMonitoringApplication.java b/monitoring/src/main/java/org/thingsboard/monitoring/ThingsboardMonitoringApplication.java index a50a794db5..f8459f56ec 100644 --- a/monitoring/src/main/java/org/thingsboard/monitoring/ThingsboardMonitoringApplication.java +++ b/monitoring/src/main/java/org/thingsboard/monitoring/ThingsboardMonitoringApplication.java @@ -23,12 +23,11 @@ import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.EnableScheduling; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.monitoring.service.BaseMonitoringService; import java.util.List; import java.util.Map; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -51,7 +50,7 @@ public class ThingsboardMonitoringApplication { @EventListener(ApplicationReadyEvent.class) public void startMonitoring() { - ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("monitoring-executor")); + ScheduledExecutorService scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor("monitoring-executor"); scheduler.scheduleWithFixedDelay(() -> { monitoringServices.forEach(monitoringService -> { monitoringService.runChecks(); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractLwm2mClientTest.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractLwm2mClientTest.java index c0ae8e9776..34d78abfb4 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractLwm2mClientTest.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/AbstractLwm2mClientTest.java @@ -20,7 +20,7 @@ import org.apache.commons.io.IOUtils; import org.eclipse.leshan.client.object.Security; import org.eclipse.leshan.core.util.Hex; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.server.common.data.Device; import org.thingsboard.server.common.data.DeviceProfile; import org.thingsboard.server.common.data.DeviceProfileProvisionType; @@ -56,7 +56,6 @@ import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -186,7 +185,7 @@ public class AbstractLwm2mClientTest extends AbstractContainerTest{ if (executor != null) { executor.shutdown(); } - executor = Executors.newScheduledThreadPool(10, ThingsBoardThreadFactory.forName("test-scheduled-" + deviceProfileName)); + executor = ThingsBoardExecutors.newScheduledThreadPool(10, "test-scheduled-" + deviceProfileName); lwm2mDeviceProfile = getDeviceProfile(deviceProfileName); tenantId = lwm2mDeviceProfile.getTenantId(); diff --git a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/lwm2m/FwLwM2MDevice.java b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/lwm2m/FwLwM2MDevice.java index 2910a2b7da..6cf96d043a 100644 --- a/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/lwm2m/FwLwM2MDevice.java +++ b/msa/black-box-tests/src/test/java/org/thingsboard/server/msa/connectivity/lwm2m/FwLwM2MDevice.java @@ -24,12 +24,11 @@ import org.eclipse.leshan.core.request.argument.Arguments; import org.eclipse.leshan.core.response.ExecuteResponse; import org.eclipse.leshan.core.response.ReadResponse; import org.eclipse.leshan.core.response.WriteResponse; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import javax.security.auth.Destroyable; import java.util.Arrays; import java.util.List; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -39,7 +38,7 @@ public class FwLwM2MDevice extends BaseInstanceEnabler implements Destroyable { private static final List supportedResources = Arrays.asList(0, 1, 2, 3, 5, 6, 7, 9); - private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName(getClass().getSimpleName() + "-test-scope")); + private final ScheduledExecutorService scheduler = ThingsBoardExecutors.newSingleThreadScheduledExecutor(getClass().getSimpleName() + "-test-scope"); private final AtomicInteger state = new AtomicInteger(0); diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbMsgCountNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbMsgCountNodeTest.java index 8b2b786955..e1f020f5ff 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbMsgCountNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/action/TbMsgCountNodeTest.java @@ -25,7 +25,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.stubbing.Answer; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNodeConfiguration; import org.thingsboard.rule.engine.api.TbNodeException; @@ -41,7 +41,6 @@ import java.util.ArrayList; import java.util.Map; import java.util.UUID; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -64,7 +63,6 @@ public class TbMsgCountNodeTest { private final DeviceId DEVICE_ID = new DeviceId(UUID.fromString("1b21c7cc-0c9e-4ab1-b867-99451599e146")); private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("04dfbd38-10e5-47b7-925f-11e795db89e1")); - private final ThingsBoardThreadFactory factory = ThingsBoardThreadFactory.forName("msg-count-node-test"); private final TbMsg tickMsg = TbMsg.newMsg(TbMsgType.MSG_COUNT_SELF_MSG, RULE_NODE_ID, TbMsgMetaData.EMPTY, TbMsg.EMPTY_STRING); private ScheduledExecutorService executorService; @@ -78,7 +76,7 @@ public class TbMsgCountNodeTest { public void setUp() { node = new TbMsgCountNode(); config = new TbMsgCountNodeConfiguration().defaultConfiguration(); - executorService = Executors.newSingleThreadScheduledExecutor(factory); + executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("msg-count-node-test"); } @AfterEach diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java index c1a2282083..53b8e5f814 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/debug/TbMsgGeneratorNodeTest.java @@ -29,7 +29,7 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.springframework.test.util.ReflectionTestUtils; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; import org.thingsboard.rule.engine.api.ScriptEngine; import org.thingsboard.rule.engine.api.TbContext; @@ -53,7 +53,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; @@ -79,8 +78,6 @@ public class TbMsgGeneratorNodeTest extends AbstractRuleNodeUpgradeTest { private final RuleNodeId RULE_NODE_ID = new RuleNodeId(UUID.fromString("1c649392-1f53-4377-b12f-1ba172611746")); private final TenantId TENANT_ID = TenantId.fromUUID(UUID.fromString("4470dfc2-f621-42b2-b82c-b5776d424140")); - private final ThingsBoardThreadFactory factory = ThingsBoardThreadFactory.forName("msg-generator-node-test"); - private TbMsgGeneratorNode node; private TbMsgGeneratorNodeConfiguration config; private ScheduledExecutorService executorService; @@ -94,7 +91,7 @@ public class TbMsgGeneratorNodeTest extends AbstractRuleNodeUpgradeTest { public void setUp() { node = spy(new TbMsgGeneratorNode()); config = new TbMsgGeneratorNodeConfiguration().defaultConfiguration(); - executorService = Executors.newSingleThreadScheduledExecutor(factory); + executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("msg-generator-node-test"); } @AfterEach diff --git a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java index 7832bbc1dc..26fd9081a9 100644 --- a/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java +++ b/rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/transform/TbMsgDeduplicationNodeTest.java @@ -29,7 +29,7 @@ import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; import org.mockito.stubbing.Answer; import org.thingsboard.common.util.JacksonUtil; -import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.common.util.ThingsBoardExecutors; import org.thingsboard.rule.engine.AbstractRuleNodeUpgradeTest; import org.thingsboard.rule.engine.api.TbContext; import org.thingsboard.rule.engine.api.TbNode; @@ -54,7 +54,6 @@ import java.util.Random; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -78,8 +77,7 @@ public class TbMsgDeduplicationNodeTest extends AbstractRuleNodeUpgradeTest { private TbContext ctx; - private final ThingsBoardThreadFactory factory = ThingsBoardThreadFactory.forName("de-duplication-node-test"); - private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(factory); + private final ScheduledExecutorService executorService = ThingsBoardExecutors.newSingleThreadScheduledExecutor("de-duplication-node-test"); private final int deduplicationInterval = 1; private TenantId tenantId;