diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java index 6fbe9d9864..5ed6c7ab17 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java @@ -422,6 +422,10 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { public void teardownWebTest() throws Exception { log.debug("Executing web test teardown"); + // Drain any pending housekeeper work left by the test body (e.g., bulk tenant deletes) + // before proceeding with teardown deletions, to avoid 90s per-tenant wait timing out. + awaitHousekeeperDrained(); + loginSysAdmin(); deleteTenant(tenantId); deleteDifferentTenant(); @@ -453,6 +457,11 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { .until(() -> storage.getLag("tb_housekeeper") == 0); } + protected void awaitHousekeeperDrained() { + Awaitility.await("housekeeper drained").atMost(5, TimeUnit.MINUTES).during(300, TimeUnit.MILLISECONDS) + .until(() -> storage.getLag("tb_housekeeper") == 0); + } + private List getAllTenants() throws Exception { List loadedTenants = new ArrayList<>(); PageLink pageLink = new PageLink(10); diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java index 9bed9bc483..6a7d631eb1 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java @@ -193,7 +193,7 @@ public class FwLwM2MDevice extends BaseInstanceEnabler implements Destroyable { } catch (Exception e) { log.error("Error during firmware update", e); } - }, 0, TimeUnit.SECONDS); // start immediately, without further delay + }, 1, TimeUnit.SECONDS); // delay 1 sec to allow CoAP Execute response to be delivered before client stops } protected void setLeshanClient(LeshanClient leshanClient) { diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java index a7307b2308..5f43aa7e00 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java @@ -420,6 +420,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt Awaitility.await() .atMost(10, TimeUnit.SECONDS) + .ignoreExceptions() .until(() -> { List> attributes = doGetAsyncTyped(attributeValuesUrl, new TypeReference<>() { }); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java index 4e4d0debf9..a9bdbb55db 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java @@ -191,6 +191,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte AtomicReference device = new AtomicReference<>(); await(alias + "find device [" + deviceName + "] after created") .atMost(200, TimeUnit.SECONDS) + .ignoreExceptions() .until(() -> { device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class)); return device.get() != null; @@ -236,6 +237,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte AtomicReference device = new AtomicReference<>(); await(alias + "find device [" + deviceName + "] after created") .atMost(200, TimeUnit.SECONDS) + .ignoreExceptions() .until(() -> { device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class)); return device.get() != null; diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java index 756c8e603c..d42adfbcf0 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java @@ -468,6 +468,7 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra AtomicReference> actualKeys = new AtomicReference<>(); await(alias + SparkplugMessageType.NBIRTH.name()) .atMost(40, TimeUnit.SECONDS) + .ignoreExceptions() .until(() -> { actualKeys.set(doGetAsyncTyped(urlTemplate, new TypeReference<>() { })); @@ -483,6 +484,7 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra AtomicReference> actualKeys = new AtomicReference<>(); await(alias + SparkplugMessageType.DBIRTH.name()) .atMost(40, TimeUnit.SECONDS) + .ignoreExceptions() .until(() -> { actualKeys.set(doGetAsyncTyped(urlTemplate, new TypeReference<>() { })); diff --git a/application/src/test/resources/application-test.properties b/application/src/test/resources/application-test.properties index e79289340c..7f0ab964d6 100644 --- a/application/src/test/resources/application-test.properties +++ b/application/src/test/resources/application-test.properties @@ -44,6 +44,7 @@ queue.transport_api.response_poll_interval=5 queue.transport.poll_interval=5 queue.core.poll-interval=5 queue.core.partitions=2 +queue.core.housekeeper.task-reprocessing-delay-ms=0 queue.rule-engine.poll-interval=5 queue.rule-engine.stats.enabled=true diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStore.java index 4b2ef07994..8158e82a81 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStore.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStore.java @@ -323,22 +323,24 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab @Override public Iterator getAllRegistrations() { - try (var connection = connectionFactory.getConnection()) { + try (var scanConnection = connectionFactory.getConnection(); + var getConnection = connectionFactory.getConnection()) { Collection list = new LinkedList<>(); ScanOptions scanOptions = ScanOptions.scanOptions().count(100).match(REG_EP + "*").build(); List> scans = new ArrayList<>(); - if (connection instanceof RedisClusterConnection) { - ((RedisClusterConnection) connection).clusterGetNodes().forEach(node -> { - scans.add(((RedisClusterConnection) connection).scan(node, scanOptions)); - }); + if (scanConnection instanceof RedisClusterConnection clusterConnection) { + clusterConnection.clusterGetNodes().forEach(node -> + scans.add(clusterConnection.scan(node, scanOptions))); } else { - scans.add(connection.scan(scanOptions)); + scans.add(scanConnection.scan(scanOptions)); } scans.forEach(scan -> { scan.forEachRemaining(key -> { - byte[] element = connection.get(key); - list.add(deserializeReg(element)); + byte[] element = getConnection.get(key); + if (element != null) { + list.add(deserializeReg(element)); + } }); }); return list.iterator(); diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStore.java index 3293cd8b53..4beefb4896 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStore.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStore.java @@ -61,21 +61,21 @@ public class TbRedisLwM2MClientStore implements TbLwM2MClientStore { @Override public Set getAll() { - try (var connection = connectionFactory.getConnection()) { + try (var scanConnection = connectionFactory.getConnection(); + var getConnection = connectionFactory.getConnection()) { Set clients = new HashSet<>(); ScanOptions scanOptions = ScanOptions.scanOptions().count(100).match(CLIENT_EP + "*").build(); List> scans = new ArrayList<>(); - if (connection instanceof RedisClusterConnection) { - ((RedisClusterConnection) connection).clusterGetNodes().forEach(node -> { - scans.add(((RedisClusterConnection) connection).scan(node, scanOptions)); - }); + if (scanConnection instanceof RedisClusterConnection clusterConnection) { + clusterConnection.clusterGetNodes().forEach(node -> + scans.add(clusterConnection.scan(node, scanOptions))); } else { - scans.add(connection.scan(scanOptions)); + scans.add(scanConnection.scan(scanOptions)); } scans.forEach(scan -> { scan.forEachRemaining(key -> { - byte[] element = connection.get(key); + byte[] element = getConnection.get(key); if (element != null) { try { clients.add(deserialize(element)); diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MModelConfigStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MModelConfigStore.java index 73b6f3c8df..31a78234d0 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MModelConfigStore.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MModelConfigStore.java @@ -35,22 +35,24 @@ public class TbRedisLwM2MModelConfigStore implements TbLwM2MModelConfigStore { @Override public List getAll() { - try (var connection = connectionFactory.getConnection()) { + try (var scanConnection = connectionFactory.getConnection(); + var getConnection = connectionFactory.getConnection()) { List configs = new ArrayList<>(); ScanOptions scanOptions = ScanOptions.scanOptions().count(100).match(MODEL_EP + "*").build(); List> scans = new ArrayList<>(); - if (connection instanceof RedisClusterConnection) { - ((RedisClusterConnection) connection).clusterGetNodes().forEach(node -> { - scans.add(((RedisClusterConnection) connection).scan(node, scanOptions)); - }); + if (scanConnection instanceof RedisClusterConnection clusterConnection) { + clusterConnection.clusterGetNodes().forEach(node -> + scans.add(clusterConnection.scan(node, scanOptions))); } else { - scans.add(connection.scan(scanOptions)); + scans.add(scanConnection.scan(scanOptions)); } scans.forEach(scan -> { scan.forEachRemaining(key -> { - byte[] element = connection.get(key); - configs.add(JacksonUtil.fromBytes(element, LwM2MModelConfig.class)); + byte[] element = getConnection.get(key); + if (element != null) { + configs.add(JacksonUtil.fromBytes(element, LwM2MModelConfig.class)); + } }); }); return configs; diff --git a/common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStoreTest.java b/common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStoreTest.java new file mode 100644 index 0000000000..9fe29c1188 --- /dev/null +++ b/common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStoreTest.java @@ -0,0 +1,137 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.transport.lwm2m.server.store; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.core.Cursor; +import org.springframework.data.redis.core.ScanOptions; +import org.thingsboard.server.transport.lwm2m.server.client.LwM2MClientState; +import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; + +import java.util.List; +import java.util.Set; +import java.util.function.Consumer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.thingsboard.server.transport.lwm2m.server.store.util.LwM2MClientSerDes.serialize; + +/** + * Verifies that {@link TbRedisLwM2MClientStore#getAll()} uses separate connections for + * SCAN and GET operations to prevent Jedis 5.x response-ordering corruption that occurs + * when both commands share the same connection. + */ +@ExtendWith(MockitoExtension.class) +class TbRedisLwM2MClientStoreTest { + + @Mock + RedisConnectionFactory connectionFactory; + + @Mock + RedisConnection scanConnection; + + @Mock + RedisConnection getConnection; + + TbRedisLwM2MClientStore store; + + @BeforeEach + void setUp() { + // First getConnection() call → scanConnection, second → getConnection + when(connectionFactory.getConnection()) + .thenReturn(scanConnection) + .thenReturn(getConnection); + store = new TbRedisLwM2MClientStore(connectionFactory); + } + + @Test + void getAll_returnsSingleClient() { + LwM2mClient client = new LwM2mClient("nodeId", "testEndpoint"); + client.setState(LwM2MClientState.REGISTERED); + byte[] key = "CLIENT#EP#testEndpoint".getBytes(); + byte[] value = serialize(client); + + // Cursor created before thenReturn to avoid Mockito unfinished-stubbing error + Cursor cursor = cursorOf(key); + when(scanConnection.scan(any(ScanOptions.class))).thenReturn(cursor); + when(getConnection.get(key)).thenReturn(value); + + Set result = store.getAll(); + + assertThat(result).hasSize(1); + assertThat(result.iterator().next().getEndpoint()).isEqualTo("testEndpoint"); + } + + @Test + void getAll_getIsNeverCalledOnScanConnection() { + Cursor cursor = cursorOf(); + when(scanConnection.scan(any(ScanOptions.class))).thenReturn(cursor); + + store.getAll(); + + verify(scanConnection, never()).get(any(byte[].class)); + } + + @Test + void getAll_scanIsNeverCalledOnGetConnection() { + Cursor cursor = cursorOf(); + when(scanConnection.scan(any(ScanOptions.class))).thenReturn(cursor); + + store.getAll(); + + verify(getConnection, never()).scan(any(ScanOptions.class)); + } + + @Test + void getAll_skipsKeyWhenValueIsNull() { + byte[] key = "CLIENT#EP#gone".getBytes(); + Cursor cursor = cursorOf(key); + when(scanConnection.scan(any(ScanOptions.class))).thenReturn(cursor); + // getConnection.get(key) returns null by default — no stubbing needed + + Set result = store.getAll(); + + assertThat(result).isEmpty(); + } + + /** + * Creates a mock {@link Cursor} that iterates over the given keys via {@code forEachRemaining}. + * The cursor is created separately (not inside a {@code thenReturn()} argument) to avoid + * Mockito's "unfinished stubbing" error caused by nested {@code when()} calls. + */ + @SuppressWarnings("unchecked") + private static Cursor cursorOf(byte[]... keys) { + Cursor cursor = mock(Cursor.class); + List keyList = List.of(keys); + doAnswer(inv -> { + Consumer action = inv.getArgument(0); + keyList.forEach(action); + return null; + }).when(cursor).forEachRemaining(any(Consumer.class)); + return cursor; + } +} diff --git a/pom.xml b/pom.xml index c8783c03ff..b72d39a26a 100755 --- a/pom.xml +++ b/pom.xml @@ -655,7 +655,7 @@ com.github.eirslett frontend-maven-plugin - 1.12.0 + 2.0.0 org.apache.maven.plugins