Browse Source

Merge remote-tracking branch 'upstream/lts-4.2' into merge/lts-rc/141125

pull/14345/head
Vladyslav_Prykhodko 7 months ago
parent
commit
0ca44579f6
  1. 52
      application/src/main/java/org/thingsboard/server/service/install/DefaultDatabaseSchemaSettingsService.java
  2. 9
      application/src/main/java/org/thingsboard/server/service/install/InstallScripts.java
  3. 2
      application/src/main/java/org/thingsboard/server/service/system/SystemInfoService.java
  4. 272
      application/src/main/java/org/thingsboard/server/service/system/SystemPatchApplier.java
  5. 3
      application/src/test/java/org/thingsboard/server/system/BaseHttpDeviceApiTest.java
  6. 4
      application/src/test/java/org/thingsboard/server/system/BaseRestApiLimitsTest.java
  7. 1
      application/src/test/java/org/thingsboard/server/system/RestTemplateConvertersTest.java
  8. 410
      application/src/test/java/org/thingsboard/server/system/SystemPatchApplierTest.java
  9. 3
      application/src/test/java/org/thingsboard/server/system/sql/DeviceApiSqlTest.java
  10. 1
      application/src/test/java/org/thingsboard/server/system/sql/RestApiLimitsSqlTest.java
  11. 23
      common/util/src/main/java/org/thingsboard/common/util/JacksonUtil.java
  12. 42
      rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java
  13. 566
      rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java

52
application/src/main/java/org/thingsboard/server/service/install/DefaultDatabaseSchemaSettingsService.java

@ -17,7 +17,6 @@ package org.thingsboard.server.service.install;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Profile;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Service;
import org.thingsboard.server.service.install.update.DefaultDataUpdateService;
@ -25,14 +24,13 @@ import org.thingsboard.server.service.install.update.DefaultDataUpdateService;
import java.util.List;
@Service
@Profile("install")
@Slf4j
@RequiredArgsConstructor
public class DefaultDatabaseSchemaSettingsService implements DatabaseSchemaSettingsService {
// This list should include all versions which are compatible for the upgrade.
// The compatibility cycle usually breaks when we have some scripts written in Java that may not work after new release.
private static final List<String> SUPPORTED_VERSIONS_FOR_UPGRADE = List.of("4.2.0");
// This list should include all versions that are compatible for the upgrade in 4 digits format (like 4.2.0.0, etc.).
// The compatibility cycle usually breaks when we have some scripts written in Java that may not work after a new release.
private static final List<String> SUPPORTED_VERSIONS_FOR_UPGRADE = List.of("4.2.0.0");
private final ProjectInfo projectInfo;
private final JdbcTemplate jdbcTemplate;
@ -80,7 +78,7 @@ public class DefaultDatabaseSchemaSettingsService implements DatabaseSchemaSetti
@Override
public String getPackageSchemaVersion() {
if (packageSchemaVersion == null) {
packageSchemaVersion = projectInfo.getProjectVersion();
packageSchemaVersion = normalizeVersion(projectInfo.getProjectVersion());
}
return packageSchemaVersion;
}
@ -88,17 +86,28 @@ public class DefaultDatabaseSchemaSettingsService implements DatabaseSchemaSetti
@Override
public String getDbSchemaVersion() {
if (schemaVersionFromDb == null) {
Long version = getSchemaVersionFromDb();
if (version == null) {
Long dbVersion = getSchemaVersionFromDb();
if (dbVersion == null) {
onSchemaSettingsError("Upgrade failed: the database schema version is missing.");
}
@SuppressWarnings("DataFlowIssue")
long major = version / 1000000;
long minor = (version % 1000000) / 1000;
long patch = version % 1000;
schemaVersionFromDb = major + "." + minor + "." + patch;
long version = dbVersion;
if (version < 1_000_000_000) {
// Old format: MMM mmm ppp (e.g., 4002001 = 4.2.1)
long major = version / 1_000_000;
long minor = (version % 1_000_000) / 1000;
long maintenance = version % 1000;
schemaVersionFromDb = major + "." + minor + "." + maintenance + ".0";
} else {
// New format: MMM mmm mmm ppp (e.g., 4002001001 = 4.2.1.1)
long major = version / 1_000_000_000;
long minor = (version % 1_000_000_000) / 1_000_000;
long maintenance = (version % 1_000_000) / 1000;
long patch = version % 1000;
schemaVersionFromDb = major + "." + minor + "." + maintenance + "." + patch;
}
}
return schemaVersionFromDb;
}
@ -116,13 +125,26 @@ public class DefaultDatabaseSchemaSettingsService implements DatabaseSchemaSetti
long major = Integer.parseInt(versionParts[0]);
long minor = Integer.parseInt(versionParts[1]);
long patch = versionParts.length > 2 ? Integer.parseInt(versionParts[2]) : 0;
long maintenance = Integer.parseInt(versionParts[2]);
long patch = Integer.parseInt(versionParts[3]);
return major * 1000000 + minor * 1000 + patch;
return major * 1_000_000_000L + minor * 1_000_000L + maintenance * 1000L + patch;
}
private void onSchemaSettingsError(String message) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> log.error(message)));
throw new RuntimeException(message);
}
private String normalizeVersion(String version) {
String[] parts = version.split("\\.");
int major = Integer.parseInt(parts[0]);
int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
int maintenance = parts.length > 2 ? Integer.parseInt(parts[2]) : 0;
int patch = parts.length > 3 ? Integer.parseInt(parts[3]) : 0;
return major + "." + minor + "." + maintenance + "." + patch;
}
}

9
application/src/main/java/org/thingsboard/server/service/install/InstallScripts.java

@ -65,9 +65,6 @@ import java.util.stream.Stream;
import static org.thingsboard.server.utils.LwM2mObjectModelUtils.toLwm2mResource;
/**
* Created by ashvayka on 18.04.18.
*/
@Component
@Slf4j
public class InstallScripts {
@ -134,6 +131,10 @@ public class InstallScripts {
return Paths.get(getDataDir(), JSON_DIR, EDGE_DIR, RULE_CHAINS_DIR);
}
public Path getWidgetTypesDir() {
return Paths.get(getDataDir(), JSON_DIR, SYSTEM_DIR, WIDGET_TYPES_DIR);
}
public String getDataDir() {
if (!StringUtils.isEmpty(dataDir)) {
if (!Paths.get(this.dataDir).toFile().isDirectory()) {
@ -237,7 +238,7 @@ public class InstallScripts {
}
);
}
Path widgetTypesDir = Paths.get(getDataDir(), JSON_DIR, SYSTEM_DIR, WIDGET_TYPES_DIR);
Path widgetTypesDir = getWidgetTypesDir();
if (Files.exists(widgetTypesDir)) {
try (Stream<Path> dirStream = listDir(widgetTypesDir).filter(path -> path.toString().endsWith(JSON_EXT))) {
dirStream.forEach(

2
application/src/main/java/org/thingsboard/server/service/system/SystemInfoService.java

@ -19,7 +19,9 @@ import org.thingsboard.server.common.data.FeaturesInfo;
import org.thingsboard.server.common.data.SystemInfo;
public interface SystemInfoService {
SystemInfo getSystemInfo();
FeaturesInfo getFeaturesInfo();
}

272
application/src/main/java/org/thingsboard/server/service/system/SystemPatchApplier.java

@ -0,0 +1,272 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.service.system;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.hash.Hashing;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.widget.WidgetTypeDetails;
import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.install.DatabaseSchemaSettingsService;
import org.thingsboard.server.service.install.InstallScripts;
import org.thingsboard.server.service.install.update.DefaultDataUpdateService;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
/**
* Runs at application startup and applies no-downtime data updates
* when the package PATCH version increases (e.g., 4.2.1.0 -> 4.2.1.1).
*/
@Slf4j
@Component
@TbCoreComponent
@RequiredArgsConstructor
public class SystemPatchApplier {
private static final long ADVISORY_LOCK_ID = 7536891047216478431L;
private final JdbcTemplate jdbcTemplate;
private final InstallScripts installScripts;
private final DatabaseSchemaSettingsService schemaSettingsService;
private final WidgetTypeService widgetTypeService;
@PostConstruct
private void init() {
ExecutorService executor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("system-patch-applier"));
executor.submit(() -> {
try {
applyPatchIfNeeded();
} catch (Exception e) {
log.error("Failed to apply system data patch updates", e);
} finally {
executor.shutdown();
}
});
}
private void applyPatchIfNeeded() {
boolean skipVersionCheck = DefaultDataUpdateService.getEnv("SKIP_PATCH_VERSION_CHECK", false);
if (!skipVersionCheck && !isVersionChanged()) {
return;
}
if (!acquireAdvisoryLock()) {
log.trace("Could not acquire advisory lock. Another node is processing patch updates.");
return;
}
try {
int updated = updateWidgetTypes();
log.info("Updated {} widget types", updated);
schemaSettingsService.updateSchemaVersion();
log.info("System data patch update completed successfully");
} finally {
releaseAdvisoryLock();
}
}
private boolean isVersionChanged() {
String packageVersion = schemaSettingsService.getPackageSchemaVersion();
String dbVersion = schemaSettingsService.getDbSchemaVersion();
log.trace("Package version: {}, DB schema version: {}", packageVersion, dbVersion);
VersionInfo packageVersionInfo = parseVersion(packageVersion);
VersionInfo dbVersionInfo = parseVersion(dbVersion);
if (packageVersionInfo == null || dbVersionInfo == null) {
log.warn("Unable to parse versions. Package: {}, DB: {}", packageVersion, dbVersion);
return false;
}
if (!isPatchVersionChanged(packageVersionInfo, dbVersionInfo)) {
return false;
}
log.info("Patch version increased from {} to {}. Starting system data update.", dbVersion, packageVersion);
return true;
}
private boolean isPatchVersionChanged(VersionInfo packageVersion, VersionInfo dbVersion) {
return packageVersion.major == dbVersion.major && packageVersion.minor == dbVersion.minor
&& packageVersion.maintenance == dbVersion.maintenance && packageVersion.patch > dbVersion.patch;
}
private int updateWidgetTypes() {
AtomicInteger updated = new AtomicInteger();
Path widgetTypesDir = installScripts.getWidgetTypesDir();
if (!Files.exists(widgetTypesDir)) {
log.trace("Widget types directory does not exist: {}", widgetTypesDir);
return 0;
}
try (Stream<Path> dirStream = listDir(widgetTypesDir).filter(path -> path.toString().endsWith(InstallScripts.JSON_EXT))) {
dirStream.forEach(
path -> {
try {
if (updateWidgetTypeFromFile(path)) {
updated.incrementAndGet();
}
} catch (Exception e) {
log.error("Unable to update widget type from json: [{}]", path.toString());
throw new RuntimeException("Unable to update widget type from json", e);
}
}
);
}
return updated.get();
}
private boolean updateWidgetTypeFromFile(Path filePath) {
JsonNode json = JacksonUtil.toJsonNode(filePath.toFile());
WidgetTypeDetails fileWidgetType = JacksonUtil.treeToValue(json, WidgetTypeDetails.class);
String fqn = fileWidgetType.getFqn();
WidgetTypeDetails existingWidgetType = widgetTypeService.findWidgetTypeDetailsByTenantIdAndFqn(TenantId.SYS_TENANT_ID, fqn);
if (existingWidgetType == null) {
// We expect only update here, so it's probably never happening, but for test purpose leave it like this:
throw new RuntimeException("Widget type not found: " + fqn);
}
if (isWidgetTypeChanged(existingWidgetType, fileWidgetType)) {
existingWidgetType.setDescription(fileWidgetType.getDescription());
existingWidgetType.setName(fileWidgetType.getName());
existingWidgetType.setDescriptor(fileWidgetType.getDescriptor());
widgetTypeService.saveWidgetType(existingWidgetType);
log.trace("Updated widget type: {}", fqn);
return true;
}
log.trace("Widget type unchanged: {}", fqn);
return false;
}
private boolean isWidgetTypeChanged(WidgetTypeDetails existing, WidgetTypeDetails file) {
if (!isDescriptorEqual(existing.getDescriptor(), file.getDescriptor())) {
return true;
}
if (!Objects.equals(existing.getName(), file.getName())) {
return true;
}
return !Objects.equals(existing.getDescription(), file.getDescription());
}
private boolean isDescriptorEqual(JsonNode desc1, JsonNode desc2) {
if (desc1 == null && desc2 == null) {
return true;
}
if (desc1 == null || desc2 == null) {
return false;
}
try {
String hash1 = computeChecksum(desc1);
String hash2 = computeChecksum(desc2);
return Objects.equals(hash1, hash2);
} catch (Exception e) {
log.warn("Failed to compare descriptors using checksum, falling back to equals", e);
return desc1.equals(desc2);
}
}
private String computeChecksum(JsonNode node) {
String canonicalString = JacksonUtil.toCanonicalString(node);
if (canonicalString == null) {
return null;
}
return Hashing.sha256().hashBytes(canonicalString.getBytes()).toString();
}
private boolean acquireAdvisoryLock() {
try {
Boolean acquired = jdbcTemplate.queryForObject(
"SELECT pg_try_advisory_lock(?)",
Boolean.class,
ADVISORY_LOCK_ID
);
if (Boolean.TRUE.equals(acquired)) {
log.trace("Acquired advisory lock");
return true;
}
return false;
} catch (Exception e) {
log.error("Failed to acquire advisory lock", e);
return false;
}
}
private void releaseAdvisoryLock() {
try {
jdbcTemplate.queryForObject(
"SELECT pg_advisory_unlock(?)",
Boolean.class,
ADVISORY_LOCK_ID
);
log.debug("Released advisory lock");
} catch (Exception e) {
log.error("Failed to release advisory lock", e);
}
}
private VersionInfo parseVersion(String version) {
try {
String[] parts = version.split("\\.");
int major = Integer.parseInt(parts[0]);
int minor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
int maintenance = parts.length > 2 ? Integer.parseInt(parts[2]) : 0;
int patch = parts.length > 3 ? Integer.parseInt(parts[3]) : 0;
return new VersionInfo(major, minor, maintenance, patch);
} catch (Exception e) {
log.error("Failed to parse version: {}", version, e);
return null;
}
}
private Stream<Path> listDir(Path dir) {
try {
return Files.list(dir);
} catch (NoSuchFileException e) {
return Stream.empty();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
public record VersionInfo(int major, int minor, int maintenance, int patch) {}
}

3
application/src/test/java/org/thingsboard/server/system/BaseHttpDeviceApiTest.java

@ -37,9 +37,6 @@ import static org.springframework.test.web.servlet.request.MockMvcRequestBuilder
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.request;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
/**
* @author Andrew Shvayka
*/
@TestPropertySource(properties = {
"transport.http.enabled=true",
"transport.http.max_payload_size=/api/v1/*/rpc/**=10000;/api/v1/**=20000"

4
application/src/test/java/org/thingsboard/server/system/BaseRestApiLimitsTest.java

@ -45,10 +45,6 @@ import java.util.concurrent.TimeoutException;
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
/**
* @author Illia Barkov
*/
@Slf4j
public abstract class BaseRestApiLimitsTest extends AbstractControllerTest {

1
application/src/test/java/org/thingsboard/server/system/RestTemplateConvertersTest.java

@ -29,7 +29,6 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.test.web.client.match.MockRestRequestMatchers.requestTo;
import static org.springframework.test.web.client.response.MockRestResponseCreators.withSuccess;
@Slf4j
public class RestTemplateConvertersTest {

410
application/src/test/java/org/thingsboard/server/system/SystemPatchApplierTest.java

@ -0,0 +1,410 @@
/**
* Copyright © 2016-2025 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.system;
import com.fasterxml.jackson.databind.JsonNode;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.test.util.ReflectionTestUtils;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.WidgetTypeId;
import org.thingsboard.server.common.data.widget.WidgetTypeDetails;
import org.thingsboard.server.dao.widget.WidgetTypeService;
import org.thingsboard.server.service.install.InstallScripts;
import org.thingsboard.server.service.system.SystemPatchApplier;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.contains;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
public class SystemPatchApplierTest {
@Mock
private JdbcTemplate jdbcTemplate;
@Mock
private InstallScripts installScripts;
@Mock
private WidgetTypeService widgetTypeService;
@InjectMocks
private SystemPatchApplier reconciler;
@TempDir
Path tempDir;
@ParameterizedTest(name = "Parse version {0} should return major={1}, minor={2}, patch={3}")
@CsvSource({
"4.2.1, 4, 2, 1, 0",
"4.2.0, 4, 2, 0, 0",
"4.2, 4, 2, 0, 0",
"4.0.1.2, 4, 0, 1, 2",
"4, 4, 0, 0, 0",
"1.0.5.7, 1, 0, 5, 7",
"10.20.30.40, 10, 20, 30, 40",
"0.0.1, 0, 0, 1, 0"
})
void testParseVersion(String versionString, int expectedMajor, int expectedMinor, int expectedMaintenance, int expectedPatch) {
SystemPatchApplier.VersionInfo version = ReflectionTestUtils.invokeMethod(reconciler, "parseVersion", versionString);
assertNotNull(version, "Version should not be null for: " + versionString);
assertEquals(expectedMajor, version.major(), "Major version mismatch");
assertEquals(expectedMinor, version.minor(), "Minor version mismatch");
assertEquals(expectedMaintenance, version.maintenance(), "Maintenance version mismatch");
assertEquals(expectedPatch, version.patch(), "Patch version mismatch");
}
@ParameterizedTest(name = "Parse invalid version: {0}")
@CsvSource({
"invalid",
"a.b.c",
"1.2.y.x",
"''",
"1.x.3"
})
void testParseInvalidVersion(String invalidVersion) {
SystemPatchApplier.VersionInfo version = ReflectionTestUtils.invokeMethod(reconciler, "parseVersion", invalidVersion);
assertNull(version, "Version should be null for invalid input: " + invalidVersion);
}
@Test
void whenLockIsNotAcquired_thenAcquiredIsSuccess() {
when(jdbcTemplate.queryForObject(anyString(), eq(Boolean.class), anyLong())).thenReturn(true);
Boolean acquired = ReflectionTestUtils.invokeMethod(reconciler, "acquireAdvisoryLock");
assertEquals(Boolean.TRUE, acquired);
verify(jdbcTemplate).queryForObject(contains("pg_try_advisory_lock"), eq(Boolean.class), anyLong());
}
@Test
void whenLockIsAlreadyAcquired_thenAcquiredIsFailed() {
when(jdbcTemplate.queryForObject(anyString(), eq(Boolean.class), anyLong())).thenReturn(false);
Boolean acquired = ReflectionTestUtils.invokeMethod(reconciler, "acquireAdvisoryLock");
assertNotEquals(Boolean.TRUE, acquired);
}
@Test
void testReleaseAdvisoryLock() {
when(jdbcTemplate.queryForObject(anyString(), eq(Boolean.class), anyLong()))
.thenReturn(true);
ReflectionTestUtils.invokeMethod(reconciler, "releaseAdvisoryLock");
verify(jdbcTemplate).queryForObject(
contains("pg_advisory_unlock"), eq(Boolean.class), anyLong());
}
@Test
void whenWidgetNotFound_thenThrowException() throws Exception {
Path widgetTypesDir = tempDir.resolve("widget_types");
Files.createDirectories(widgetTypesDir);
when(installScripts.getWidgetTypesDir()).thenReturn(widgetTypesDir);
WidgetTypeDetails testWidget = createTestWidgetType("test_widget", "Test Widget");
String json = JacksonUtil.toString(testWidget);
assertNotNull(json);
Files.writeString(widgetTypesDir.resolve("test_widget.json"), json);
when(widgetTypeService.findWidgetTypeDetailsByTenantIdAndFqn(TenantId.SYS_TENANT_ID, "test_widget")).thenReturn(null);
assertThrows(RuntimeException.class, () -> ReflectionTestUtils.invokeMethod(reconciler, "updateWidgetTypes"));
}
@Test
void whenDescriptorChanged_thenUpdateTheExistingWidget() throws Exception {
Path widgetTypesDir = tempDir.resolve("widget_types");
Files.createDirectories(widgetTypesDir);
when(installScripts.getWidgetTypesDir()).thenReturn(widgetTypesDir);
WidgetTypeDetails fileWidget = createTestWidgetType("test_widget", "Test Widget");
fileWidget.setDescriptor(JacksonUtil.toJsonNode("{\"type\":\"latest\",\"version\":2}"));
String json = JacksonUtil.toString(fileWidget);
assertNotNull(json);
Files.writeString(widgetTypesDir.resolve("test_widget.json"), json);
WidgetTypeDetails existingWidget = createTestWidgetType("test_widget", "Test Widget");
existingWidget.setId(new WidgetTypeId(UUID.randomUUID()));
existingWidget.setDescriptor(JacksonUtil.toJsonNode("{\"type\":\"latest\",\"version\":1}"));
when(widgetTypeService.findWidgetTypeDetailsByTenantIdAndFqn(TenantId.SYS_TENANT_ID, "test_widget"))
.thenReturn(existingWidget);
Integer updated = ReflectionTestUtils.invokeMethod(reconciler, "updateWidgetTypes");
assertEquals(1, updated);
verify(widgetTypeService).saveWidgetType(argThat(w ->
w.getDescriptor().get("version").asInt() == 2
));
}
@Test
void whenNameChanged_thenUpdateTheExistingWidget() throws Exception {
Path widgetTypesDir = tempDir.resolve("widget_types");
Files.createDirectories(widgetTypesDir);
when(installScripts.getWidgetTypesDir()).thenReturn(widgetTypesDir);
WidgetTypeDetails fileWidget = createTestWidgetType("test_widget", "New Name");
String json = JacksonUtil.toString(fileWidget);
assertNotNull(json);
Files.writeString(widgetTypesDir.resolve("test_widget.json"), json);
WidgetTypeDetails existingWidget = createTestWidgetType("test_widget", "Old Name");
existingWidget.setId(new WidgetTypeId(UUID.randomUUID()));
when(widgetTypeService.findWidgetTypeDetailsByTenantIdAndFqn(TenantId.SYS_TENANT_ID, "test_widget"))
.thenReturn(existingWidget);
Integer updated = ReflectionTestUtils.invokeMethod(reconciler, "updateWidgetTypes");
assertEquals(1, updated);
verify(widgetTypeService).saveWidgetType(argThat(w -> "New Name".equals(w.getName())));
}
@Test
void whenNothingChanged_thenSkipTheUpdateOfTheExistingWidget() throws Exception {
Path widgetTypesDir = tempDir.resolve("widget_types");
Files.createDirectories(widgetTypesDir);
when(installScripts.getWidgetTypesDir()).thenReturn(widgetTypesDir);
WidgetTypeDetails fileWidget = createTestWidgetType("test_widget", "Test Widget");
String json = JacksonUtil.toString(fileWidget);
assertNotNull(json);
Files.writeString(widgetTypesDir.resolve("test_widget.json"), json);
WidgetTypeDetails existingWidget = createTestWidgetType("test_widget", "Test Widget");
existingWidget.setId(new WidgetTypeId(UUID.randomUUID()));
when(widgetTypeService.findWidgetTypeDetailsByTenantIdAndFqn(TenantId.SYS_TENANT_ID, "test_widget"))
.thenReturn(existingWidget);
Integer updated = ReflectionTestUtils.invokeMethod(reconciler, "updateWidgetTypes");
assertEquals(0, updated);
verify(widgetTypeService, never()).saveWidgetType(any());
}
@ParameterizedTest(name = "{0}")
@MethodSource("provideDescriptorComparisonTestCases")
void testIfDescriptorsAreEqual(String testName, JsonNode desc1, JsonNode desc2, boolean expectedEqual) {
Boolean result = ReflectionTestUtils.invokeMethod(reconciler, "isDescriptorEqual", desc1, desc2);
assertEquals(expectedEqual, result, testName);
}
@Test
void whenDescriptorChanged_thenReturnWidgetTypeChanged() {
WidgetTypeDetails existing = createTestWidgetType("test", "Test");
existing.setDescriptor(JacksonUtil.toJsonNode("{\"version\":1}"));
WidgetTypeDetails file = createTestWidgetType("test", "Test");
file.setDescriptor(JacksonUtil.toJsonNode("{\"version\":2}"));
boolean result = Boolean.TRUE.equals(ReflectionTestUtils.invokeMethod(reconciler, "isWidgetTypeChanged", existing, file));
assertTrue(result);
}
@Test
void whenNameChanged_thenReturnWidgetTypeChanged() {
WidgetTypeDetails existing = createTestWidgetType("test", "Old Name");
WidgetTypeDetails file = createTestWidgetType("test", "New Name");
boolean result = Boolean.TRUE.equals(ReflectionTestUtils.invokeMethod(reconciler, "isWidgetTypeChanged", existing, file));
assertTrue(result);
}
@Test
void whenDescriptionChanged_thenReturnWidgetTypeChanged() {
WidgetTypeDetails existing = createTestWidgetType("test", "Test");
existing.setDescription("Old description");
WidgetTypeDetails file = createTestWidgetType("test", "Test");
file.setDescription("New description");
boolean result = Boolean.TRUE.equals(ReflectionTestUtils.invokeMethod(reconciler, "isWidgetTypeChanged", existing, file));
assertTrue(result);
}
@Test
void whenWidgetTypeAreIdentical_thenNoUpdateIsPerformed() {
WidgetTypeDetails existing = createTestWidgetType("test", "Test");
WidgetTypeDetails file = createTestWidgetType("test", "Test");
boolean result = Boolean.TRUE.equals(ReflectionTestUtils.invokeMethod(reconciler, "isWidgetTypeChanged", existing, file));
assertFalse(result);
}
@Test
void whenLockIsHeldByOneThread_thenSecondThreadCannotAcquireLock() throws Exception {
CountDownLatch lockAcquired = new CountDownLatch(1);
CountDownLatch startSecondThread = new CountDownLatch(1);
CountDownLatch testComplete = new CountDownLatch(1);
AtomicBoolean firstThreadAcquiredLock = new AtomicBoolean(false);
AtomicBoolean secondThreadAcquiredLock = new AtomicBoolean(false);
AtomicBoolean firstThreadSavedWidget = new AtomicBoolean(false);
AtomicBoolean secondThreadSavedWidget = new AtomicBoolean(false);
Path widgetTypesDir = tempDir.resolve("widget_types");
Files.createDirectories(widgetTypesDir);
when(installScripts.getWidgetTypesDir()).thenReturn(widgetTypesDir);
WidgetTypeDetails fileWidget = createTestWidgetType("test_widget", "Test Widget");
fileWidget.setDescriptor(JacksonUtil.toJsonNode("{\"type\":\"latest\",\"version\":2}"));
String toString = JacksonUtil.toCanonicalString(fileWidget);
assertNotNull(toString);
Files.writeString(widgetTypesDir.resolve("test_widget.json"), toString);
WidgetTypeDetails existingWidget = createTestWidgetType("test_widget", "Test Widget");
existingWidget.setId(new WidgetTypeId(UUID.randomUUID()));
existingWidget.setDescriptor(JacksonUtil.toJsonNode("{\"type\":\"latest\",\"version\":1}"));
when(widgetTypeService.findWidgetTypeDetailsByTenantIdAndFqn(TenantId.SYS_TENANT_ID, "test_widget")).thenReturn(existingWidget);
when(jdbcTemplate.queryForObject(contains("pg_try_advisory_lock"), eq(Boolean.class), anyLong()))
.thenReturn(true)
.thenReturn(false);
when(jdbcTemplate.queryForObject(contains("pg_advisory_unlock"), eq(Boolean.class), anyLong()))
.thenReturn(true);
// The first thread-acquires lock and performs update
Thread firstThread = new Thread(() -> {
try {
Boolean acquired = ReflectionTestUtils.invokeMethod(reconciler, "acquireAdvisoryLock");
firstThreadAcquiredLock.set(Boolean.TRUE.equals(acquired));
if (firstThreadAcquiredLock.get()) {
lockAcquired.countDown();
startSecondThread.await(5, TimeUnit.SECONDS);
// Simulate work while holding lock
Thread.sleep(100);
Integer updated = ReflectionTestUtils.invokeMethod(reconciler, "updateWidgetTypes");
firstThreadSavedWidget.set(updated != null && updated > 0);
ReflectionTestUtils.invokeMethod(reconciler, "releaseAdvisoryLock");
}
} catch (Exception ignored) {
} finally {
testComplete.countDown();
}
});
// Second thread - attempts to acquire lock but fails
Thread secondThread = new Thread(() -> {
try {
lockAcquired.await(5, TimeUnit.SECONDS);
startSecondThread.countDown();
Boolean acquired = ReflectionTestUtils.invokeMethod(reconciler, "acquireAdvisoryLock");
secondThreadAcquiredLock.set(Boolean.TRUE.equals(acquired));
if (secondThreadAcquiredLock.get()) {
Integer updated = ReflectionTestUtils.invokeMethod(reconciler, "updateWidgetTypes");
secondThreadSavedWidget.set(updated != null && updated > 0);
ReflectionTestUtils.invokeMethod(reconciler, "releaseAdvisoryLock");
}
} catch (Exception ignored) {}
});
firstThread.start();
secondThread.start();
assertTrue(testComplete.await(10, TimeUnit.SECONDS), "Test should complete within timeout");
firstThread.join(1000);
secondThread.join(1000);
assertTrue(firstThreadAcquiredLock.get(), "First thread should acquire lock");
assertFalse(secondThreadAcquiredLock.get(), "Second thread should NOT acquire lock");
assertTrue(firstThreadSavedWidget.get(), "First thread should save widget");
assertFalse(secondThreadSavedWidget.get(), "Second thread should NOT save widget");
verify(widgetTypeService, times(1)).saveWidgetType(any());
}
private static Stream<Arguments> provideDescriptorComparisonTestCases() {
return Stream.of(
Arguments.of("Both null", null, null, true),
Arguments.of("First null", null, JacksonUtil.newObjectNode(), false),
Arguments.of("Second null", JacksonUtil.newObjectNode(), null, false),
Arguments.of("Same content",
JacksonUtil.toJsonNode("{\"type\":\"latest\",\"version\":1}"),
JacksonUtil.toJsonNode("{\"type\":\"latest\",\"version\":1}"),
true),
Arguments.of("Different content",
JacksonUtil.toJsonNode("{\"type\":\"latest\",\"version\":1}"),
JacksonUtil.toJsonNode("{\"type\":\"latest\",\"version\":2}"),
false),
Arguments.of("Different key order but same content",
JacksonUtil.toJsonNode("{\"version\":1,\"type\":\"latest\"}"),
JacksonUtil.toJsonNode("{\"type\":\"latest\",\"version\":1}"),
true),
Arguments.of("Empty objects",
JacksonUtil.toJsonNode("{}"),
JacksonUtil.toJsonNode("{}"),
true)
);
}
private WidgetTypeDetails createTestWidgetType(String fqn, String name) {
WidgetTypeDetails widget = new WidgetTypeDetails();
widget.setFqn(fqn);
widget.setName(name);
widget.setDescription("Test description");
widget.setTenantId(TenantId.SYS_TENANT_ID);
widget.setDescriptor(JacksonUtil.toJsonNode("{\"type\":\"latest\"}"));
return widget;
}
}

3
application/src/test/java/org/thingsboard/server/system/sql/DeviceApiSqlTest.java

@ -18,9 +18,6 @@ package org.thingsboard.server.system.sql;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.system.BaseHttpDeviceApiTest;
/**
* Created by Valerii Sosliuk on 6/27/2017.
*/
@DaoSqlTest
public class DeviceApiSqlTest extends BaseHttpDeviceApiTest {
}

1
application/src/test/java/org/thingsboard/server/system/sql/RestApiLimitsSqlTest.java

@ -18,7 +18,6 @@ package org.thingsboard.server.system.sql;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.system.BaseRestApiLimitsTest;
@DaoSqlTest
public class RestApiLimitsSqlTest extends BaseRestApiLimitsTest {
}

23
common/util/src/main/java/org/thingsboard/common/util/JacksonUtil.java

@ -83,6 +83,12 @@ public class JacksonUtil {
.addModule(new Jdk8Module())
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.build();
public static final ObjectMapper CANONICAL_JSON_MAPPER = JsonMapper.builder()
.addModule(new Jdk8Module())
.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true)
.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true)
.serializationInclusion(com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL)
.build();
public static ObjectMapper getObjectMapperWithJavaTimeModule() {
return JsonMapper.builder()
@ -207,6 +213,23 @@ public class JacksonUtil {
return data;
}
public static String toCanonicalString(Object value) {
try {
if (value == null) {
return null;
}
if (value instanceof JsonNode) {
Object pojo = CANONICAL_JSON_MAPPER.convertValue(value, Object.class);
return CANONICAL_JSON_MAPPER.writeValueAsString(pojo);
}
return CANONICAL_JSON_MAPPER.writeValueAsString(value);
} catch (Exception e) {
throw new IllegalArgumentException("The given Json object value cannot be transformed to a canonical String: " + value, e);
}
}
public static <T> T treeToValue(JsonNode node, Class<T> clazz) {
try {
return OBJECT_MAPPER.treeToValue(node, clazz);

42
rule-engine/rule-engine-components/src/main/java/org/thingsboard/rule/engine/profile/DeviceState.java

@ -17,6 +17,7 @@ package org.thingsboard.rule.engine.profile;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.TbContext;
@ -29,9 +30,14 @@ import org.thingsboard.server.common.data.Device;
import org.thingsboard.server.common.data.DeviceProfile;
import org.thingsboard.server.common.data.StringUtils;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.device.profile.AlarmCondition;
import org.thingsboard.server.common.data.device.profile.AlarmConditionFilterKey;
import org.thingsboard.server.common.data.device.profile.AlarmConditionKeyType;
import org.thingsboard.server.common.data.device.profile.AlarmConditionSpec;
import org.thingsboard.server.common.data.device.profile.AlarmConditionSpecType;
import org.thingsboard.server.common.data.device.profile.AlarmRule;
import org.thingsboard.server.common.data.device.profile.DeviceProfileAlarm;
import org.thingsboard.server.common.data.device.profile.DurationAlarmConditionSpec;
import org.thingsboard.server.common.data.exception.ApiUsageLimitsExceededException;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.DeviceProfileId;
@ -39,6 +45,8 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.query.DynamicValue;
import org.thingsboard.server.common.data.query.DynamicValueSourceType;
import org.thingsboard.server.common.data.query.EntityKey;
import org.thingsboard.server.common.data.query.EntityKeyType;
import org.thingsboard.server.common.data.rule.RuleNodeState;
@ -56,6 +64,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.thingsboard.server.common.data.msg.TbMsgType.ACTIVITY_EVENT;
import static org.thingsboard.server.common.data.msg.TbMsgType.ALARM_ACK;
@ -87,6 +96,10 @@ class DeviceState {
this.deviceId = deviceId;
this.deviceProfile = deviceProfile;
if (hasDurationRulesWithDynamicValueFromCurrentDevice(deviceProfile)) {
latestValues = fetchLatestValues(ctx, deviceId);
}
this.dynamicPredicateValueCtx = new DynamicPredicateValueCtxImpl(ctx.getTenantId(), deviceId, ctx);
if (config.isPersistAlarmRulesState()) {
@ -116,7 +129,10 @@ class DeviceState {
public void updateProfile(TbContext ctx, DeviceProfile deviceProfile) throws ExecutionException, InterruptedException {
Set<AlarmConditionFilterKey> oldKeys = Set.copyOf(this.deviceProfile.getEntityKeys());
this.deviceProfile.updateDeviceProfile(deviceProfile);
if (latestValues != null) {
if (latestValues == null && hasDurationRulesWithDynamicValueFromCurrentDevice(this.deviceProfile)) {
latestValues = fetchLatestValues(ctx, deviceId);
} else if (latestValues != null) {
Set<AlarmConditionFilterKey> keysToFetch = new HashSet<>(this.deviceProfile.getEntityKeys());
keysToFetch.removeAll(oldKeys);
if (!keysToFetch.isEmpty()) {
@ -134,10 +150,31 @@ class DeviceState {
}
}
private static boolean hasDurationRulesWithDynamicValueFromCurrentDevice(ProfileState deviceProfile) {
return deviceProfile.getAlarmSettings().stream().anyMatch(DeviceState::isDurationRuleWithDynamicValueFromCurrentDevice);
}
private static boolean isDurationRuleWithDynamicValueFromCurrentDevice(DeviceProfileAlarm alarm) {
return Stream.concat(alarm.getCreateRules().values().stream(), Stream.ofNullable(alarm.getClearRule()))
.map(AlarmRule::getCondition)
.map(AlarmCondition::getSpec)
.anyMatch(spec -> isDurationRule(spec) && hasDynamicDurationValueFromCurrentDevice((DurationAlarmConditionSpec) spec));
}
private static boolean isDurationRule(AlarmConditionSpec spec) {
return spec instanceof DurationAlarmConditionSpec durationSpec && durationSpec.getType() == AlarmConditionSpecType.DURATION;
}
private static boolean hasDynamicDurationValueFromCurrentDevice(DurationAlarmConditionSpec spec) {
DynamicValue<Long> dynamicValue = spec.getPredicate().getDynamicValue();
return dynamicValue != null && dynamicValue.getSourceType() == DynamicValueSourceType.CURRENT_DEVICE;
}
public void harvestAlarms(TbContext ctx, long ts) throws ExecutionException, InterruptedException {
log.debug("[{}] Going to harvest alarms: {}", ctx.getSelfId(), ts);
boolean stateChanged = false;
for (AlarmState state : alarmStates.values()) {
state.setDataSnapshot(latestValues);
stateChanged |= state.process(ctx, ts);
}
if (persistState && stateChanged) {
@ -347,7 +384,8 @@ class DeviceState {
return EntityKeyType.ATTRIBUTE;
}
private DataSnapshot fetchLatestValues(TbContext ctx, EntityId originator) throws ExecutionException, InterruptedException {
@SneakyThrows
private DataSnapshot fetchLatestValues(TbContext ctx, EntityId originator) {
Set<AlarmConditionFilterKey> entityKeysToFetch = deviceProfile.getEntityKeys();
DataSnapshot result = new DataSnapshot(entityKeysToFetch);
addEntityKeysToSnapshot(ctx, originator, entityKeysToFetch, result);

566
rule-engine/rule-engine-components/src/test/java/org/thingsboard/rule/engine/profile/TbDeviceProfileNodeTest.java

File diff suppressed because it is too large
Loading…
Cancel
Save