Browse Source

Merge remote-tracking branch 'origin/lts-4.2' into lts-4.3

pull/15439/head
Viacheslav Klimov 3 months ago
parent
commit
20b98b7093
Failed to extract signature
  1. 9
      application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java
  2. 2
      application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java
  3. 1
      application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java
  4. 2
      application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java
  5. 2
      application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java
  6. 1
      application/src/test/resources/application-test.properties
  7. 18
      common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStore.java
  8. 14
      common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStore.java
  9. 18
      common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MModelConfigStore.java
  10. 137
      common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStoreTest.java
  11. 2
      pom.xml

9
application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java

@ -422,6 +422,10 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
public void teardownWebTest() throws Exception {
log.debug("Executing web test teardown");
// Drain any pending housekeeper work left by the test body (e.g., bulk tenant deletes)
// before proceeding with teardown deletions, to avoid 90s per-tenant wait timing out.
awaitHousekeeperDrained();
loginSysAdmin();
deleteTenant(tenantId);
deleteDifferentTenant();
@ -453,6 +457,11 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
.until(() -> storage.getLag("tb_housekeeper") == 0);
}
protected void awaitHousekeeperDrained() {
Awaitility.await("housekeeper drained").atMost(5, TimeUnit.MINUTES).during(300, TimeUnit.MILLISECONDS)
.until(() -> storage.getLag("tb_housekeeper") == 0);
}
private List<Tenant> getAllTenants() throws Exception {
List<Tenant> loadedTenants = new ArrayList<>();
PageLink pageLink = new PageLink(10);

2
application/src/test/java/org/thingsboard/server/transport/lwm2m/client/FwLwM2MDevice.java

@ -193,7 +193,7 @@ public class FwLwM2MDevice extends BaseInstanceEnabler implements Destroyable {
} catch (Exception e) {
log.error("Error during firmware update", e);
}
}, 0, TimeUnit.SECONDS); // start immediately, without further delay
}, 1, TimeUnit.SECONDS); // delay 1 sec to allow CoAP Execute response to be delivered before client stops
}
protected void setLeshanClient(LeshanClient leshanClient) {

1
application/src/test/java/org/thingsboard/server/transport/mqtt/mqttv3/attributes/AbstractMqttAttributesIntegrationTest.java

@ -420,6 +420,7 @@ public abstract class AbstractMqttAttributesIntegrationTest extends AbstractMqtt
Awaitility.await()
.atMost(10, TimeUnit.SECONDS)
.ignoreExceptions()
.until(() -> {
List<Map<String, Object>> attributes = doGetAsyncTyped(attributeValuesUrl, new TypeReference<>() {
});

2
application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/AbstractMqttV5ClientSparkplugTest.java

@ -191,6 +191,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
AtomicReference<Device> device = new AtomicReference<>();
await(alias + "find device [" + deviceName + "] after created")
.atMost(200, TimeUnit.SECONDS)
.ignoreExceptions()
.until(() -> {
device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class));
return device.get() != null;
@ -236,6 +237,7 @@ public abstract class AbstractMqttV5ClientSparkplugTest extends AbstractMqttInte
AtomicReference<Device> device = new AtomicReference<>();
await(alias + "find device [" + deviceName + "] after created")
.atMost(200, TimeUnit.SECONDS)
.ignoreExceptions()
.until(() -> {
device.set(doGet("/api/tenant/devices?deviceName=" + deviceName, Device.class));
return device.get() != null;

2
application/src/test/java/org/thingsboard/server/transport/mqtt/sparkplug/attributes/AbstractMqttV5ClientSparkplugAttributesTest.java

@ -468,6 +468,7 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
AtomicReference<List<String>> actualKeys = new AtomicReference<>();
await(alias + SparkplugMessageType.NBIRTH.name())
.atMost(40, TimeUnit.SECONDS)
.ignoreExceptions()
.until(() -> {
actualKeys.set(doGetAsyncTyped(urlTemplate, new TypeReference<>() {
}));
@ -483,6 +484,7 @@ public abstract class AbstractMqttV5ClientSparkplugAttributesTest extends Abstra
AtomicReference<List<String>> actualKeys = new AtomicReference<>();
await(alias + SparkplugMessageType.DBIRTH.name())
.atMost(40, TimeUnit.SECONDS)
.ignoreExceptions()
.until(() -> {
actualKeys.set(doGetAsyncTyped(urlTemplate, new TypeReference<>() {
}));

1
application/src/test/resources/application-test.properties

@ -44,6 +44,7 @@ queue.transport_api.response_poll_interval=5
queue.transport.poll_interval=5
queue.core.poll-interval=5
queue.core.partitions=2
queue.core.housekeeper.task-reprocessing-delay-ms=0
queue.rule-engine.poll-interval=5
queue.rule-engine.stats.enabled=true

18
common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbLwM2mRedisRegistrationStore.java

@ -323,22 +323,24 @@ public class TbLwM2mRedisRegistrationStore implements RegistrationStore, Startab
@Override
public Iterator<Registration> getAllRegistrations() {
try (var connection = connectionFactory.getConnection()) {
try (var scanConnection = connectionFactory.getConnection();
var getConnection = connectionFactory.getConnection()) {
Collection<Registration> list = new LinkedList<>();
ScanOptions scanOptions = ScanOptions.scanOptions().count(100).match(REG_EP + "*").build();
List<Cursor<byte[]>> scans = new ArrayList<>();
if (connection instanceof RedisClusterConnection) {
((RedisClusterConnection) connection).clusterGetNodes().forEach(node -> {
scans.add(((RedisClusterConnection) connection).scan(node, scanOptions));
});
if (scanConnection instanceof RedisClusterConnection clusterConnection) {
clusterConnection.clusterGetNodes().forEach(node ->
scans.add(clusterConnection.scan(node, scanOptions)));
} else {
scans.add(connection.scan(scanOptions));
scans.add(scanConnection.scan(scanOptions));
}
scans.forEach(scan -> {
scan.forEachRemaining(key -> {
byte[] element = connection.get(key);
list.add(deserializeReg(element));
byte[] element = getConnection.get(key);
if (element != null) {
list.add(deserializeReg(element));
}
});
});
return list.iterator();

14
common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStore.java

@ -61,21 +61,21 @@ public class TbRedisLwM2MClientStore implements TbLwM2MClientStore {
@Override
public Set<LwM2mClient> getAll() {
try (var connection = connectionFactory.getConnection()) {
try (var scanConnection = connectionFactory.getConnection();
var getConnection = connectionFactory.getConnection()) {
Set<LwM2mClient> clients = new HashSet<>();
ScanOptions scanOptions = ScanOptions.scanOptions().count(100).match(CLIENT_EP + "*").build();
List<Cursor<byte[]>> scans = new ArrayList<>();
if (connection instanceof RedisClusterConnection) {
((RedisClusterConnection) connection).clusterGetNodes().forEach(node -> {
scans.add(((RedisClusterConnection) connection).scan(node, scanOptions));
});
if (scanConnection instanceof RedisClusterConnection clusterConnection) {
clusterConnection.clusterGetNodes().forEach(node ->
scans.add(clusterConnection.scan(node, scanOptions)));
} else {
scans.add(connection.scan(scanOptions));
scans.add(scanConnection.scan(scanOptions));
}
scans.forEach(scan -> {
scan.forEachRemaining(key -> {
byte[] element = connection.get(key);
byte[] element = getConnection.get(key);
if (element != null) {
try {
clients.add(deserialize(element));

18
common/transport/lwm2m/src/main/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MModelConfigStore.java

@ -35,22 +35,24 @@ public class TbRedisLwM2MModelConfigStore implements TbLwM2MModelConfigStore {
@Override
public List<LwM2MModelConfig> getAll() {
try (var connection = connectionFactory.getConnection()) {
try (var scanConnection = connectionFactory.getConnection();
var getConnection = connectionFactory.getConnection()) {
List<LwM2MModelConfig> configs = new ArrayList<>();
ScanOptions scanOptions = ScanOptions.scanOptions().count(100).match(MODEL_EP + "*").build();
List<Cursor<byte[]>> scans = new ArrayList<>();
if (connection instanceof RedisClusterConnection) {
((RedisClusterConnection) connection).clusterGetNodes().forEach(node -> {
scans.add(((RedisClusterConnection) connection).scan(node, scanOptions));
});
if (scanConnection instanceof RedisClusterConnection clusterConnection) {
clusterConnection.clusterGetNodes().forEach(node ->
scans.add(clusterConnection.scan(node, scanOptions)));
} else {
scans.add(connection.scan(scanOptions));
scans.add(scanConnection.scan(scanOptions));
}
scans.forEach(scan -> {
scan.forEachRemaining(key -> {
byte[] element = connection.get(key);
configs.add(JacksonUtil.fromBytes(element, LwM2MModelConfig.class));
byte[] element = getConnection.get(key);
if (element != null) {
configs.add(JacksonUtil.fromBytes(element, LwM2MModelConfig.class));
}
});
});
return configs;

137
common/transport/lwm2m/src/test/java/org/thingsboard/server/transport/lwm2m/server/store/TbRedisLwM2MClientStoreTest.java

@ -0,0 +1,137 @@
/**
* Copyright © 2016-2026 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.transport.lwm2m.server.store;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.ScanOptions;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2MClientState;
import org.thingsboard.server.transport.lwm2m.server.client.LwM2mClient;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.thingsboard.server.transport.lwm2m.server.store.util.LwM2MClientSerDes.serialize;
/**
* Verifies that {@link TbRedisLwM2MClientStore#getAll()} uses separate connections for
* SCAN and GET operations to prevent Jedis 5.x response-ordering corruption that occurs
* when both commands share the same connection.
*/
@ExtendWith(MockitoExtension.class)
class TbRedisLwM2MClientStoreTest {
@Mock
RedisConnectionFactory connectionFactory;
@Mock
RedisConnection scanConnection;
@Mock
RedisConnection getConnection;
TbRedisLwM2MClientStore store;
@BeforeEach
void setUp() {
// First getConnection() call → scanConnection, second → getConnection
when(connectionFactory.getConnection())
.thenReturn(scanConnection)
.thenReturn(getConnection);
store = new TbRedisLwM2MClientStore(connectionFactory);
}
@Test
void getAll_returnsSingleClient() {
LwM2mClient client = new LwM2mClient("nodeId", "testEndpoint");
client.setState(LwM2MClientState.REGISTERED);
byte[] key = "CLIENT#EP#testEndpoint".getBytes();
byte[] value = serialize(client);
// Cursor created before thenReturn to avoid Mockito unfinished-stubbing error
Cursor<byte[]> cursor = cursorOf(key);
when(scanConnection.scan(any(ScanOptions.class))).thenReturn(cursor);
when(getConnection.get(key)).thenReturn(value);
Set<LwM2mClient> result = store.getAll();
assertThat(result).hasSize(1);
assertThat(result.iterator().next().getEndpoint()).isEqualTo("testEndpoint");
}
@Test
void getAll_getIsNeverCalledOnScanConnection() {
Cursor<byte[]> cursor = cursorOf();
when(scanConnection.scan(any(ScanOptions.class))).thenReturn(cursor);
store.getAll();
verify(scanConnection, never()).get(any(byte[].class));
}
@Test
void getAll_scanIsNeverCalledOnGetConnection() {
Cursor<byte[]> cursor = cursorOf();
when(scanConnection.scan(any(ScanOptions.class))).thenReturn(cursor);
store.getAll();
verify(getConnection, never()).scan(any(ScanOptions.class));
}
@Test
void getAll_skipsKeyWhenValueIsNull() {
byte[] key = "CLIENT#EP#gone".getBytes();
Cursor<byte[]> cursor = cursorOf(key);
when(scanConnection.scan(any(ScanOptions.class))).thenReturn(cursor);
// getConnection.get(key) returns null by default — no stubbing needed
Set<LwM2mClient> result = store.getAll();
assertThat(result).isEmpty();
}
/**
* Creates a mock {@link Cursor} that iterates over the given keys via {@code forEachRemaining}.
* The cursor is created separately (not inside a {@code thenReturn()} argument) to avoid
* Mockito's "unfinished stubbing" error caused by nested {@code when()} calls.
*/
@SuppressWarnings("unchecked")
private static Cursor<byte[]> cursorOf(byte[]... keys) {
Cursor<byte[]> cursor = mock(Cursor.class);
List<byte[]> keyList = List.of(keys);
doAnswer(inv -> {
Consumer<byte[]> action = inv.getArgument(0);
keyList.forEach(action);
return null;
}).when(cursor).forEachRemaining(any(Consumer.class));
return cursor;
}
}

2
pom.xml

@ -655,7 +655,7 @@
<plugin>
<groupId>com.github.eirslett</groupId>
<artifactId>frontend-maven-plugin</artifactId>
<version>1.12.0</version>
<version>2.0.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>

Loading…
Cancel
Save