From d5b3a4ae7078ee30b5acc25af2235deed9082617 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Wed, 4 Mar 2026 14:44:54 +0100 Subject: [PATCH 1/6] Fix LwM2M Redis stores using separate connections for SCAN and GET MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Using the same connection for both SCAN cursor iteration and GET value fetches causes Jedis 5.x response-ordering corruption: the SCAN response parser receives a GET response (byte[]) where it expects a List, and vice versa, resulting in ClassCastException on startup. Fix: open two connections per getAll() call — one dedicated to the scan cursor and one for value fetches — eliminating any interleaving. Affected: TbRedisLwM2MClientStore, TbRedisLwM2MModelConfigStore, TbLwM2mRedisRegistrationStore. Co-Authored-By: Claude Sonnet 4.6 --- .../store/TbLwM2mRedisRegistrationStore.java | 18 ++- .../server/store/TbRedisLwM2MClientStore.java | 14 +- .../store/TbRedisLwM2MModelConfigStore.java | 18 ++- .../store/TbRedisLwM2MClientStoreTest.java | 137 ++++++++++++++++++ 4 files changed, 164 insertions(+), 23 deletions(-) create mode 100644 common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStoreTest.java diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStore.java index 4b2ef07994..8158e82a81 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStore.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStore.java @@ -323,22 +323,24 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab @Override public Iterator getAllRegistrations() { - try (var 
connection = connectionFactory.getConnection()) { + try (var scanConnection = connectionFactory.getConnection(); + var getConnection = connectionFactory.getConnection()) { Collection list = new LinkedList<>(); ScanOptions scanOptions = ScanOptions.scanOptions().count(100).match(REG_EP + "*").build(); List> scans = new ArrayList<>(); - if (connection instanceof RedisClusterConnection) { - ((RedisClusterConnection) connection).clusterGetNodes().forEach(node -> { - scans.add(((RedisClusterConnection) connection).scan(node, scanOptions)); - }); + if (scanConnection instanceof RedisClusterConnection clusterConnection) { + clusterConnection.clusterGetNodes().forEach(node -> + scans.add(clusterConnection.scan(node, scanOptions))); } else { - scans.add(connection.scan(scanOptions)); + scans.add(scanConnection.scan(scanOptions)); } scans.forEach(scan -> { scan.forEachRemaining(key -> { - byte[] element = connection.get(key); - list.add(deserializeReg(element)); + byte[] element = getConnection.get(key); + if (element != null) { + list.add(deserializeReg(element)); + } }); }); return list.iterator(); diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStore.java index 3293cd8b53..4beefb4896 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStore.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStore.java @@ -61,21 +61,21 @@ public class TbRedisLwM2MClientStore implements TbLwM2MClientStore { @Override public Set getAll() { - try (var connection = connectionFactory.getConnection()) { + try (var scanConnection = connectionFactory.getConnection(); + var getConnection = connectionFactory.getConnection()) { Set clients = new HashSet<>(); ScanOptions 
scanOptions = ScanOptions.scanOptions().count(100).match(CLIENT_EP + "*").build(); List> scans = new ArrayList<>(); - if (connection instanceof RedisClusterConnection) { - ((RedisClusterConnection) connection).clusterGetNodes().forEach(node -> { - scans.add(((RedisClusterConnection) connection).scan(node, scanOptions)); - }); + if (scanConnection instanceof RedisClusterConnection clusterConnection) { + clusterConnection.clusterGetNodes().forEach(node -> + scans.add(clusterConnection.scan(node, scanOptions))); } else { - scans.add(connection.scan(scanOptions)); + scans.add(scanConnection.scan(scanOptions)); } scans.forEach(scan -> { scan.forEachRemaining(key -> { - byte[] element = connection.get(key); + byte[] element = getConnection.get(key); if (element != null) { try { clients.add(deserialize(element)); diff --git a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MModelConfigStore.java b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MModelConfigStore.java index 73b6f3c8df..31a78234d0 100644 --- a/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MModelConfigStore.java +++ b/common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MModelConfigStore.java @@ -35,22 +35,24 @@ public class TbRedisLwM2MModelConfigStore implements TbLwM2MModelConfigStore { @Override public List getAll() { - try (var connection = connectionFactory.getConnection()) { + try (var scanConnection = connectionFactory.getConnection(); + var getConnection = connectionFactory.getConnection()) { List configs = new ArrayList<>(); ScanOptions scanOptions = ScanOptions.scanOptions().count(100).match(MODEL_EP + "*").build(); List> scans = new ArrayList<>(); - if (connection instanceof RedisClusterConnection) { - ((RedisClusterConnection) connection).clusterGetNodes().forEach(node -> { - 
scans.add(((RedisClusterConnection) connection).scan(node, scanOptions)); - }); + if (scanConnection instanceof RedisClusterConnection clusterConnection) { + clusterConnection.clusterGetNodes().forEach(node -> + scans.add(clusterConnection.scan(node, scanOptions))); } else { - scans.add(connection.scan(scanOptions)); + scans.add(scanConnection.scan(scanOptions)); } scans.forEach(scan -> { scan.forEachRemaining(key -> { - byte[] element = connection.get(key); - configs.add(JacksonUtil.fromBytes(element, LwM2MModelConfig.class)); + byte[] element = getConnection.get(key); + if (element != null) { + configs.add(JacksonUtil.fromBytes(element, LwM2MModelConfig.class)); + } }); }); return configs; diff --git a/common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStoreTest.java b/common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStoreTest.java new file mode 100644 index 0000000000..9fe29c1188 --- /dev/null +++ b/common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStoreTest.java @@ -0,0 +1,137 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.thingsboard.server.transport.lwm2m.server.store; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.core.Cursor; +import org.springframework.data.redis.core.ScanOptions; +import org.thingsboard.server.transport.lwm2m.server.client.LwM2MClientState; +import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient; + +import java.util.List; +import java.util.Set; +import java.util.function.Consumer; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.thingsboard.server.transport.lwm2m.server.store.util.LwM2MClientSerDes.serialize; + +/** + * Verifies that {@link TbRedisLwM2MClientStore#getAll()} uses separate connections for + * SCAN and GET operations to prevent Jedis 5.x response-ordering corruption that occurs + * when both commands share the same connection. 
+ */ +@ExtendWith(MockitoExtension.class) +class TbRedisLwM2MClientStoreTest { + + @Mock + RedisConnectionFactory connectionFactory; + + @Mock + RedisConnection scanConnection; + + @Mock + RedisConnection getConnection; + + TbRedisLwM2MClientStore store; + + @BeforeEach + void setUp() { + // First getConnection() call → scanConnection, second → getConnection + when(connectionFactory.getConnection()) + .thenReturn(scanConnection) + .thenReturn(getConnection); + store = new TbRedisLwM2MClientStore(connectionFactory); + } + + @Test + void getAll_returnsSingleClient() { + LwM2mClient client = new LwM2mClient("nodeId", "testEndpoint"); + client.setState(LwM2MClientState.REGISTERED); + byte[] key = "CLIENT#EP#testEndpoint".getBytes(); + byte[] value = serialize(client); + + // Cursor created before thenReturn to avoid Mockito unfinished-stubbing error + Cursor cursor = cursorOf(key); + when(scanConnection.scan(any(ScanOptions.class))).thenReturn(cursor); + when(getConnection.get(key)).thenReturn(value); + + Set result = store.getAll(); + + assertThat(result).hasSize(1); + assertThat(result.iterator().next().getEndpoint()).isEqualTo("testEndpoint"); + } + + @Test + void getAll_getIsNeverCalledOnScanConnection() { + Cursor cursor = cursorOf(); + when(scanConnection.scan(any(ScanOptions.class))).thenReturn(cursor); + + store.getAll(); + + verify(scanConnection, never()).get(any(byte[].class)); + } + + @Test + void getAll_scanIsNeverCalledOnGetConnection() { + Cursor cursor = cursorOf(); + when(scanConnection.scan(any(ScanOptions.class))).thenReturn(cursor); + + store.getAll(); + + verify(getConnection, never()).scan(any(ScanOptions.class)); + } + + @Test + void getAll_skipsKeyWhenValueIsNull() { + byte[] key = "CLIENT#EP#gone".getBytes(); + Cursor cursor = cursorOf(key); + when(scanConnection.scan(any(ScanOptions.class))).thenReturn(cursor); + // getConnection.get(key) returns null by default — no stubbing needed + + Set result = store.getAll(); + + 
assertThat(result).isEmpty(); + } + + /** + * Creates a mock {@link Cursor} that iterates over the given keys via {@code forEachRemaining}. + * The cursor is created separately (not inside a {@code thenReturn()} argument) to avoid + * Mockito's "unfinished stubbing" error caused by nested {@code when()} calls. + */ + @SuppressWarnings("unchecked") + private static Cursor cursorOf(byte[]... keys) { + Cursor cursor = mock(Cursor.class); + List keyList = List.of(keys); + doAnswer(inv -> { + Consumer action = inv.getArgument(0); + keyList.forEach(action); + return null; + }).when(cursor).forEachRemaining(any(Consumer.class)); + return cursor; + } +} From 455f62eaefcbf7f996af54f1d7da33238f797601 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 5 Mar 2026 16:19:59 +0100 Subject: [PATCH 2/6] fix: delay LwM2M client stop in FW update to prevent Execute response race When Execute RPC is sent to FW Update resource (/5/0/2), the test client's startUpdating() scheduled the client stop with 0 delay. This caused a race where the client stopped before the CoAP Execute response (2.04 Changed) was delivered to the server, resulting in RequestCanceledException and INTERNAL_SERVER_ERROR instead of CHANGED. Adding a 1-second delay before leshanClient.stop() ensures the CoAP response is transmitted and received before the client disconnects, fixing the flaky testExecuteUpdateFWById_Result_CHANGED test. 
Co-Authored-By: Claude Sonnet 4.6 --- .../server/transport/lwm2m/client/FwLwM2MDevice.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java index 9bed9bc483..6a7d631eb1 100644 --- a/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java +++ b/application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java @@ -193,7 +193,7 @@ public class FwLwM2MDevice extends BaseInstanceEnabler implements Destroyable { } catch (Exception e) { log.error("Error during firmware update", e); } - }, 0, TimeUnit.SECONDS); // start immediately, without further delay + }, 1, TimeUnit.SECONDS); // delay 1 sec to allow CoAP Execute response to be delivered before client stops } protected void setLeshanClient(LeshanClient leshanClient) { From 96b742189d25d832e47f95fdd056550629fc6db9 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 5 Mar 2026 14:11:33 +0100 Subject: [PATCH 3/6] Fix flaky Sparkplug connection test: handle 404 during device await doGet(url, Class) asserts HTTP 200 internally, so when the Sparkplug device hasn't been created yet the method throws AssertionError instead of returning null. Awaitility propagates Error immediately rather than continuing to poll, causing the test to fail after ~3 s instead of retrying for up to 200 s. Add .ignoreExceptions() to both await() calls in connectClientWithCorrectAccessTokenWithNDEATHCreatedDevices and connectClientWithCorrectAccessTokenWithNDEATHWithAliasCreatedDevices so that a transient 404 is treated as "condition not yet met" and polling continues as intended. 
Co-Authored-By: Claude Sonnet 4.6 --- .../mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java index 4e4d0debf9..a9bdbb55db 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java @@ -191,6 +191,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte AtomicReference device = new AtomicReference<>(); await(alias + "find device [" + deviceName + "] after created") .atMost(200, TimeUnit.SECONDS) + .ignoreExceptions() .until(() -> { device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class)); return device.get() != null; @@ -236,6 +237,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte AtomicReference device = new AtomicReference<>(); await(alias + "find device [" + deviceName + "] after created") .atMost(200, TimeUnit.SECONDS) + .ignoreExceptions() .until(() -> { device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class)); return device.get() != null; From 0e3381bca36a644fa97342bd752ebcfb816585e8 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Thu, 5 Mar 2026 15:26:12 +0100 Subject: [PATCH 4/6] Fix similar flaky await patterns in MQTT transport tests doGet/doGetAsyncTyped assert HTTP 200 internally, so any non-200 response throws AssertionError which Awaitility re-throws immediately instead of continuing to poll. 
Add .ignoreExceptions() to three additional await() polling loops that call HTTP helpers: - AbstractMqttV5ClientSparkplugAttributesTest: two doGetAsyncTyped calls polling for attribute keys after NBIRTH/DBIRTH - AbstractMqttAttributesIntegrationTest: doGetAsyncTyped polling for attribute values after client publish Co-Authored-By: Claude Sonnet 4.6 --- .../attributes/AbstractMqttAttributesIntegrationTest.java | 1 + .../attributes/AbstractMqttV5ClientSparkplugAttributesTest.java | 2 ++ 2 files changed, 3 insertions(+) diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java index a7307b2308..5f43aa7e00 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java @@ -420,6 +420,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt Awaitility.await() .atMost(10, TimeUnit.SECONDS) + .ignoreExceptions() .until(() -> { List> attributes = doGetAsyncTyped(attributeValuesUrl, new TypeReference<>() { }); diff --git a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java index 756c8e603c..d42adfbcf0 100644 --- a/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java +++ b/application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java @@ -468,6 +468,7 @@ public abstract class 
AbstractMqttV5ClientSparkplugAttributesTest extends Abstra AtomicReference> actualKeys = new AtomicReference<>(); await(alias + SparkplugMessageType.NBIRTH.name()) .atMost(40, TimeUnit.SECONDS) + .ignoreExceptions() .until(() -> { actualKeys.set(doGetAsyncTyped(urlTemplate, new TypeReference<>() { })); @@ -483,6 +484,7 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra AtomicReference> actualKeys = new AtomicReference<>(); await(alias + SparkplugMessageType.DBIRTH.name()) .atMost(40, TimeUnit.SECONDS) + .ignoreExceptions() .until(() -> { actualKeys.set(doGetAsyncTyped(urlTemplate, new TypeReference<>() { })); From 148dd17ce1e3c1a50bd5ee4d8e4aeb53f4fcccd8 Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Fri, 6 Mar 2026 09:07:06 +0100 Subject: [PATCH 5/6] Fix flaky TenantControllerTest by draining housekeeper before teardown MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tests like testFindTenantsByTitle create 261 tenants and delete them via deleteEntitiesAsync, which only waits for HTTP responses — not housekeeper completion. Each tenant deletion submits ~30 TenantEntitiesDeletionHousekeeper tasks (~7800 tasks total), which cascade further. The teardown's deleteTenant then waits for lag==0 with a 90s timeout, which is insufficient for this backlog and causes ConditionTimeoutException. Fix: add awaitHousekeeperDrained() (5-min timeout) called at the start of teardownWebTest so any pending housekeeper work from the test body drains before per-tenant teardown deletions begin. 
Co-Authored-By: Claude Sonnet 4.6 --- .../thingsboard/server/controller/AbstractWebTest.java | 9 +++++++++ .../src/test/resources/application-test.properties | 1 + 2 files changed, 10 insertions(+) diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java index 3b7286cf01..5f4b7e952f 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java @@ -405,6 +405,10 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { public void teardownWebTest() throws Exception { log.debug("Executing web test teardown"); + // Drain any pending housekeeper work left by the test body (e.g., bulk tenant deletes) + // before proceeding with teardown deletions, to avoid 90s per-tenant wait timing out. + awaitHousekeeperDrained(); + loginSysAdmin(); deleteTenant(tenantId); deleteDifferentTenant(); @@ -436,6 +440,11 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { .until(() -> storage.getLag("tb_housekeeper") == 0); } + protected void awaitHousekeeperDrained() { + Awaitility.await("housekeeper drained").atMost(5, TimeUnit.MINUTES).during(300, TimeUnit.MILLISECONDS) + .until(() -> storage.getLag("tb_housekeeper") == 0); + } + private List getAllTenants() throws Exception { List loadedTenants = new ArrayList<>(); PageLink pageLink = new PageLink(10); diff --git a/application/src/test/resources/application-test.properties b/application/src/test/resources/application-test.properties index e79289340c..7f0ab964d6 100644 --- a/application/src/test/resources/application-test.properties +++ b/application/src/test/resources/application-test.properties @@ -44,6 +44,7 @@ queue.transport_api.response_poll_interval=5 queue.transport.poll_interval=5 queue.core.poll-interval=5 queue.core.partitions=2 
+queue.core.housekeeper.task-reprocessing-delay-ms=0 queue.rule-engine.poll-interval=5 queue.rule-engine.stats.enabled=true From f73abcdebd1bcfd2b910f97169cd4a6d3bf3b60c Mon Sep 17 00:00:00 2001 From: Sergey Matvienko Date: Mon, 9 Mar 2026 09:30:31 +0100 Subject: [PATCH 6/6] bump frontend-maven-plugin version to 2.0.0 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index f00718c322..cd58b11f54 100755 --- a/pom.xml +++ b/pom.xml @@ -655,7 +655,7 @@ <plugin> <groupId>com.github.eirslett</groupId> <artifactId>frontend-maven-plugin</artifactId> - <version>1.12.0</version> + <version>2.0.0</version> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId>