diff --git a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java index 87f566c90f..dcf9459965 100644 --- a/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java +++ b/common/queue/src/main/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplate.java @@ -98,7 +98,10 @@ public abstract class AbstractTbQueueConsumerTemplate i doSubscribe(partitions); subscribed = true; } - records = partitions.isEmpty() ? emptyList() : doPoll(durationInMillis); + if (partitions.isEmpty()) { + return sleepAndReturnEmpty(startNanos, durationInMillis); + } + records = doPoll(durationInMillis); } finally { consumerLock.unlock(); } diff --git a/common/queue/src/test/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplateTest.java b/common/queue/src/test/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplateTest.java new file mode 100644 index 0000000000..a016408ad4 --- /dev/null +++ b/common/queue/src/test/java/org/thingsboard/server/queue/common/AbstractTbQueueConsumerTemplateTest.java @@ -0,0 +1,146 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.queue.common; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; +import org.thingsboard.server.queue.TbQueueMsg; + +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.BDDMockito.never; +import static org.mockito.BDDMockito.spy; +import static org.mockito.BDDMockito.times; +import static org.mockito.BDDMockito.verify; + +@Slf4j +@ExtendWith(MockitoExtension.class) +public class AbstractTbQueueConsumerTemplateTest { + + private static final long POLL_DURATION_MS = 100L; + private static final long SLEEP_TOLERANCE_MS = 20L; + + @Test + public void givenEmptyPartitionsAndLongPollingSupported_whenPoll_thenSleepsAndDoesNotCallDoPoll() { + // Regression: with empty partitions AND isLongPollingSupported()==true (e.g. Kafka), + // poll() previously returned instantly with no sleep, causing the consumer loop to busy-spin. + TestConsumer consumer = spy(new TestConsumer("test-topic", true)); + consumer.subscribe(Collections.emptySet()); + + long startNs = System.nanoTime(); + List result = consumer.poll(POLL_DURATION_MS); + long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); + + assertThat(result, is(empty())); + verify(consumer, never()).doPoll(anyLong()); + assertThat("poll() must sleep ~durationInMillis when partitions are empty (no busy-wait)", + elapsedMs, greaterThanOrEqualTo(POLL_DURATION_MS - SLEEP_TOLERANCE_MS)); + } + + @Test + public void givenEmptyPartitionsAndNoLongPolling_whenPoll_thenSleepsAndDoesNotCallDoPoll() { + TestConsumer consumer = spy(new TestConsumer("test-topic", false)); + consumer.subscribe(Collections.emptySet()); + + long startNs = System.nanoTime(); + List result = consumer.poll(POLL_DURATION_MS); + long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); + + assertThat(result, is(empty())); + verify(consumer, never()).doPoll(anyLong()); + assertThat(elapsedMs, greaterThanOrEqualTo(POLL_DURATION_MS - SLEEP_TOLERANCE_MS)); + } + + @Test + public void givenNonEmptyPartitions_whenPoll_thenCallsDoPoll() { + TestConsumer consumer = spy(new TestConsumer("test-topic", true)); + consumer.subscribe(Collections.singleton(new TopicPartitionInfo("test-topic", null, 0, true))); + + List result = consumer.poll(POLL_DURATION_MS); + + assertThat(result, is(empty())); + verify(consumer, times(1)).doPoll(POLL_DURATION_MS); + } + + @Test + public void givenPartitionsBecomeEmptyAfterRebalance_whenPollAgain_thenStopsCallingDoPoll() { + // Reproduces the observed trigger: a rebalance leaves the consumer with an empty + // partition assignment. Subsequent poll() calls must not busy-spin or call doPoll(). + TestConsumer consumer = spy(new TestConsumer("test-topic", true)); + consumer.subscribe(Collections.singleton(new TopicPartitionInfo("test-topic", null, 0, true))); + consumer.poll(POLL_DURATION_MS); + verify(consumer, times(1)).doPoll(POLL_DURATION_MS); + + consumer.subscribe(Collections.emptySet()); + + long startNs = System.nanoTime(); + List result = consumer.poll(POLL_DURATION_MS); + long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs); + + assertThat(result, is(empty())); + verify(consumer, times(1)).doPoll(anyLong()); + assertThat(elapsedMs, greaterThanOrEqualTo(POLL_DURATION_MS - SLEEP_TOLERANCE_MS)); + } + + static class TestConsumer extends AbstractTbQueueConsumerTemplate { + + private final boolean longPollingSupported; + + TestConsumer(String topic, boolean longPollingSupported) { + super(topic); + this.longPollingSupported = longPollingSupported; + } + + @Override + protected List doPoll(long durationInMillis) { + return Collections.emptyList(); + } + + @Override + protected TbQueueMsg decode(Object record) { + return null; + } + + @Override + protected void doSubscribe(Set partitions) { + } + + @Override + protected void doCommit() { + } + + @Override + protected void doUnsubscribe() { + } + + @Override + protected boolean isLongPollingSupported() { + return longPollingSupported; + } + } + +} diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/SnmpTransportContext.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/SnmpTransportContext.java index 7d92639f1a..af5e1f04b2 100644 --- a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/SnmpTransportContext.java +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/SnmpTransportContext.java @@ -160,7 +160,6 @@ public class SnmpTransportContext extends TransportContext { return; } sessions.put(device.getId(), sessionContext); - snmpTransportService.createQueryingTasks(sessionContext); log.info("Established SNMP device session for device {}", device.getId()); } @@ -224,6 +223,8 @@ public class SnmpTransportContext extends TransportContext { registerTransportSession(sessionContext, msg); }); transportService.lifecycleEvent(sessionContext.getTenantId(), sessionContext.getDeviceId(), ComponentLifecycleEvent.STARTED, true, null); + snmpTransportService.createQueryingTasks(sessionContext); + log.info("[{}] Session registered and querying tasks created", sessionContext.getDeviceId()); } else { log.warn("[{}] Failed to process device auth", sessionContext.getDeviceId()); } diff --git a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java index 0c39b7e740..a15a21b6e3 100644 --- a/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java +++ b/common/transport/snmp/src/main/java/org/thingsboard/server/transport/snmp/service/SnmpTransportService.java @@ -164,7 +164,7 @@ public class SnmpTransportService implements TbTransportService, CommandResponde ScheduledTask scheduledTask = new ScheduledTask(); scheduledTask.init(() -> { try { - if (sessionContext.isActive()) { + if (sessionContext.isActive() && sessionContext.isConnected()) { return sendRequest(sessionContext, repeatingCommunicationConfig); } } catch (Exception e) { @@ -390,7 +390,11 @@ public class SnmpTransportService implements TbTransportService, CommandResponde JsonObject responseData = responseDataMappers.get(requestContext.getCommunicationSpec()).map(response, requestContext); if (responseData.size() == 0) { - log.warn("[{}] No values in the response", sessionContext.getDeviceId()); + log.warn("[{}] No values in the response for spec {}. Response PDUs count: {}, Mappings count: {}", + sessionContext.getDeviceId(), requestContext.getCommunicationSpec(), + response.size(), requestContext.getResponseMappings().size()); + log.debug("[{}] No values in the response for spec {}. Response PDUs: {}", + sessionContext.getDeviceId(), requestContext.getCommunicationSpec(), response); throw new IllegalArgumentException("No values in the response"); } diff --git a/msa/js-executor/package.json b/msa/js-executor/package.json index b857f1e085..8a1007b2c2 100644 --- a/msa/js-executor/package.json +++ b/msa/js-executor/package.json @@ -6,7 +6,7 @@ "main": "server.ts", "bin": "server.js", "scripts": { - "pkg": "tsc && pkg -t node22-linux-x64,node22-win-x64 --out-path ./target ./target/src && node install.js", + "pkg": "tsc && pkg -t node22-linux-x64 --output ./target/thingsboard-js-executor-linux ./target/src && pkg -t node22-win-x64 --no-bytecode --public-packages \"*\" --public --output ./target/thingsboard-js-executor-win.exe ./target/src && node install.js", "test": "echo \"Error: no test specified\" && exit 1", "start": "nodemon --watch '.' --ext 'ts' --exec 'ts-node server.ts'", "start-prod": "nodemon --watch '.' --ext 'ts' --exec 'NODE_ENV=production ts-node server.ts'", diff --git a/msa/web-ui/package.json b/msa/web-ui/package.json index 9e9b643402..f93bc52c9b 100644 --- a/msa/web-ui/package.json +++ b/msa/web-ui/package.json @@ -6,7 +6,7 @@ "main": "server.ts", "bin": "server.js", "scripts": { - "pkg": "tsc && pkg -t node22-linux-x64,node22-win-x64 --out-path ./target ./target/src && node install.js", + "pkg": "tsc && pkg -t node22-linux-x64 --output ./target/thingsboard-web-ui-linux ./target/src && pkg -t node22-win-x64 --no-bytecode --public-packages \"*\" --public --output ./target/thingsboard-web-ui-win.exe ./target/src && node install.js", "test": "echo \"Error: no test specified\" && exit 1", "start": "nodemon --watch '.' --ext 'ts' --exec 'WEB_FOLDER=./target/web ts-node server.ts'", "start-prod": "nodemon --watch '.' --ext 'ts' --exec 'WEB_FOLDER=./target/web NODE_ENV=production ts-node server.ts'", diff --git a/pom.xml b/pom.xml index 621142dd83..1694652e50 100755 --- a/pom.xml +++ b/pom.xml @@ -143,6 +143,7 @@ 3.8.0 1.18.4 1.8.0-TB + 2.5.9 2.38.0 1.24 1.11.0 @@ -1366,6 +1367,11 @@ postgresql ${postgresql.version} + + org.apache.opennlp + opennlp-tools + ${opennlp-tools.version} + commons-io commons-io