From 43e84659668d3d6a5c286d871b03b828e81bdfcf Mon Sep 17 00:00:00 2001 From: Dmytro Skarzhynets Date: Mon, 16 Feb 2026 17:01:11 +0200 Subject: [PATCH] feat: add entity keys V2 endpoint with sample values --- .../controller/EntityQueryController.java | 58 ++++- .../query/DefaultEntityQueryService.java | 145 +++++++++++- .../service/query/EntityQueryService.java | 7 + .../server/controller/AbstractWebTest.java | 58 ++++- .../controller/DeviceControllerTest.java | 7 - .../controller/EntityQueryControllerTest.java | 224 +++++++++++++++++- .../EntityRelationControllerTest.java | 7 - .../dao/attributes/AttributesService.java | 11 +- .../server/dao/entity/EntityService.java | 3 + .../dao/timeseries/TimeseriesService.java | 7 +- .../data/query/AvailableEntityKeysV2.java | 99 ++++++++ .../server/dao/attributes/AttributesDao.java | 6 + .../dao/attributes/BaseAttributesService.java | 20 +- .../attributes/CachedAttributesService.java | 18 +- .../server/dao/entity/BaseEntityService.java | 86 +++++-- .../model/sqlts/latest/TsKvLatestEntity.java | 6 + .../sql/attributes/AttributeKvRepository.java | 57 +++++ .../dao/sql/attributes/JpaAttributeDao.java | 24 ++ .../CachedRedisSqlTimeseriesLatestDao.java | 10 + .../dao/sqlts/SqlTimeseriesLatestDao.java | 17 ++ .../latest/SearchTsKvLatestRepository.java | 20 ++ .../dao/timeseries/BaseTimeseriesService.java | 13 +- .../CassandraBaseTimeseriesLatestDao.java | 10 + .../dao/timeseries/TimeseriesLatestDao.java | 9 + .../attributes/BaseAttributesServiceTest.java | 103 +++++++- .../dao/sqlts/SqlTimeseriesLatestDaoTest.java | 68 ++++++ .../thingsboard/rest/client/RestClient.java | 22 +- 27 files changed, 1038 insertions(+), 77 deletions(-) create mode 100644 common/data/src/main/java/org/thingsboard/server/common/data/query/AvailableEntityKeysV2.java diff --git a/application/src/main/java/org/thingsboard/server/controller/EntityQueryController.java b/application/src/main/java/org/thingsboard/server/controller/EntityQueryController.java index bb025a5fcf..cde3129d53 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EntityQueryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EntityQueryController.java @@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.query.AlarmCountQuery; import org.thingsboard.server.common.data.query.AlarmData; import org.thingsboard.server.common.data.query.AlarmDataQuery; import org.thingsboard.server.common.data.query.AvailableEntityKeys; +import org.thingsboard.server.common.data.query.AvailableEntityKeysV2; import org.thingsboard.server.common.data.query.EntityCountQuery; import org.thingsboard.server.common.data.query.EntityData; import org.thingsboard.server.common.data.query.EntityDataPageLink; @@ -47,6 +48,8 @@ import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.query.EntityQueryService; import org.thingsboard.server.service.security.permission.Operation; +import java.util.Set; + import static org.thingsboard.server.controller.ControllerConstants.ALARM_DATA_QUERY_DESCRIPTION; import static org.thingsboard.server.controller.ControllerConstants.ENTITY_COUNT_QUERY_DESCRIPTION; import static org.thingsboard.server.controller.ControllerConstants.ENTITY_DATA_QUERY_DESCRIPTION; @@ -115,9 +118,11 @@ public class EntityQueryController extends BaseController { return entityQueryService.countAlarmsByQuery(getCurrentUser(), query); } + @Deprecated(forRemoval = true) @ApiOperation( - value = "Find Available Entity Keys by Query", + value = "Find Available Entity Keys by Query (deprecated)", notes = """ + **Deprecated.** Use the V2 endpoint (`POST /api/v2/entitiesQuery/find/keys`) instead.\n Returns unique time series and/or attribute key names from entities matching the query.\n Executes the Entity Data Query to find up to 100 entities, then fetches and aggregates all distinct key names.\n Primarily used for UI features like autocomplete suggestions.""" + TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH @@ -128,9 +133,6 @@ public class EntityQueryController extends BaseController { @Parameter(description = "Entity data query to find entities. Page size is capped at 100.") @RequestBody EntityDataQuery query, - // fixme: combination of timeseries = false and attributes = false is allowed, but always results in empty response, therefore does not make any sense - // such combinations should NOT be allowed, but changing this will break clients - @Parameter(description = """ When true, includes unique time series key names in the response. When false, the 'timeseries' list will be empty.""") @@ -155,6 +157,54 @@ public class EntityQueryController extends BaseController { return wrapFuture(entityQueryService.getKeysByQuery(getCurrentUser(), getTenantId(), query, includeTimeseries, includeAttributes, scope)); } + @ApiOperation( + value = "Find Available Entity Keys By Query", + notes = """ + Discovers unique time series and/or attribute key names available on entities that match the given query. + Works in two steps: first, the request body (an Entity Data Query) is executed to find matching entities + (page size is capped at 100); then, all distinct key names are collected from those entities.\n + Optionally, each key can include a sample — the most recent value (by timestamp) for that key + across all matched entities.""" + + TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH + ) + @PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')") + @PostMapping("/v2/entitiesQuery/find/keys") + public DeferredResult findAvailableEntityKeysByQueryV2( + @Parameter(description = "Entity data query to find entities. Page size is capped at 100.") + @RequestBody EntityDataQuery query, + + @Parameter(description = """ + When true, includes unique time series keys in the response. + When false, the 'timeseries' field is omitted. At least one of 'includeTimeseries' or 'includeAttributes' must be true.""") + @RequestParam(defaultValue = "true") boolean includeTimeseries, + + @Parameter(description = """ + When true, includes unique attribute keys in the response. + When false, the 'attributes' field is omitted. At least one of 'includeTimeseries' or 'includeAttributes' must be true.""") + @RequestParam(defaultValue = "true") boolean includeAttributes, + + @Parameter(description = """ + Filters attribute keys by scope. Only applies when 'includeAttributes' is true. + When not specified, scopes are auto-determined: all three scopes (server, client, shared) for device entities, + server scope only for other entity types.""", + schema = @Schema(allowableValues = {"SERVER_SCOPE", "SHARED_SCOPE", "CLIENT_SCOPE"})) + @RequestParam(required = false) Set scopes, + + @Parameter(description = """ + When true, each key entry includes a 'sample' object with the most recent value and timestamp. + When false, only key names are returned (sample is omitted from JSON).""") + @RequestParam(defaultValue = "false") boolean includeSamples + ) throws ThingsboardException { + resolveQuery(query); + EntityDataPageLink pageLink = query.getPageLink(); + if (pageLink.getPageSize() > MAX_PAGE_SIZE) { + pageLink.setPageSize(MAX_PAGE_SIZE); + } + return wrapFuture(entityQueryService.findAvailableEntityKeysByQuery( + getCurrentUser(), query, + includeTimeseries, includeAttributes, scopes, includeSamples)); + } + @PreAuthorize("hasAnyAuthority('SYS_ADMIN')") @PostMapping("/edqs/system/request") public void processSystemEdqsRequest(@RequestBody ToCoreEdqsRequest request) { diff --git a/application/src/main/java/org/thingsboard/server/service/query/DefaultEntityQueryService.java b/application/src/main/java/org/thingsboard/server/service/query/DefaultEntityQueryService.java index 4d76989a92..50f0f0e75a 100644 --- a/application/src/main/java/org/thingsboard/server/service/query/DefaultEntityQueryService.java +++ b/application/src/main/java/org/thingsboard/server/service/query/DefaultEntityQueryService.java @@ -15,13 +15,15 @@ */ package org.thingsboard.server.service.query; +import com.fasterxml.jackson.databind.JsonNode; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; -import org.springframework.util.CollectionUtils; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.KvUtil; import org.thingsboard.server.common.data.AttributeScope; import org.thingsboard.server.common.data.EntityType; @@ -30,11 +32,17 @@ import org.thingsboard.server.common.data.exception.ThingsboardException; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.DataType; +import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.query.AlarmCountQuery; import org.thingsboard.server.common.data.query.AlarmData; import org.thingsboard.server.common.data.query.AlarmDataQuery; import org.thingsboard.server.common.data.query.AvailableEntityKeys; +import org.thingsboard.server.common.data.query.AvailableEntityKeysV2; +import org.thingsboard.server.common.data.query.AvailableEntityKeysV2.KeyInfo; +import org.thingsboard.server.common.data.query.AvailableEntityKeysV2.KeySample; import org.thingsboard.server.common.data.query.ComplexFilterPredicate; import org.thingsboard.server.common.data.query.DynamicValue; import org.thingsboard.server.common.data.query.EntityCountQuery; @@ -59,11 +67,13 @@ import org.thingsboard.server.service.security.model.SecurityUser; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.TreeMap; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -253,7 +263,7 @@ public class DefaultEntityQueryService implements EntityQueryService { if (isAttributes) { Map> typesMap = ids.stream().collect(Collectors.groupingBy(EntityId::getEntityType)); List>> futures = new ArrayList<>(typesMap.size()); - typesMap.forEach((type, entityIds) -> futures.add(dbCallbackExecutor.submit(() -> attributesService.findAllKeysByEntityIds(tenantId, entityIds, scope)))); + typesMap.forEach((type, entityIds) -> futures.add(dbCallbackExecutor.submit(() -> attributesService.findAllKeysByEntityIdsAndScope(tenantId, entityIds, scope)))); attributesKeysFuture = Futures.transform(Futures.allAsList(futures), lists -> { if (CollectionUtils.isEmpty(lists)) { return Collections.emptyList(); @@ -274,4 +284,135 @@ public class DefaultEntityQueryService implements EntityQueryService { }, dbCallbackExecutor); } + @Override + public ListenableFuture findAvailableEntityKeysByQuery(SecurityUser securityUser, EntityDataQuery query, + boolean includeTimeseries, boolean includeAttributes, + Set scopes, boolean includeSamples) { + if (!includeTimeseries && !includeAttributes) { + return Futures.immediateFailedFuture( + new IllegalArgumentException("At least one of 'includeTimeseries' or 'includeAttributes' must be true")); + } + + return Futures.transformAsync(findEntityIdsByQueryAsync(securityUser, query), ids -> { + if (ids.isEmpty()) { + return immediateFuture(new AvailableEntityKeysV2( + Collections.emptySet(), + includeTimeseries ? Collections.emptyList() : null, + includeAttributes ? Collections.emptyMap() : null)); + } + + TenantId tenantId = securityUser.getTenantId(); + Set entityTypes = ids.stream().map(EntityId::getEntityType).collect(Collectors.toSet()); + + var tsFuture = includeTimeseries ? fetchTimeseriesKeys(tenantId, ids, includeSamples) : null; + + Set effectiveScopes = includeAttributes + ? resolveAttributeScopes(scopes, entityTypes) : Collections.emptySet(); + var attrFutures = effectiveScopes.stream() + .map(scope -> fetchAttributeKeys(tenantId, ids, scope, includeSamples)) + .toList(); + + return assembleResult(entityTypes, tsFuture, attrFutures); + }, dbCallbackExecutor); + } + + private ListenableFuture> findEntityIdsByQueryAsync(SecurityUser securityUser, EntityDataQuery query) { + return Futures.transform(entityService.findEntityDataByQueryAsync(securityUser.getTenantId(), securityUser.getCustomerId(), query), + page -> page.getData().stream() + .map(EntityData::getEntityId) + .toList(), + dbCallbackExecutor); + } + + private static Set resolveAttributeScopes(Set requestedScopes, Set entityTypes) { + boolean hasDevices = entityTypes.contains(EntityType.DEVICE); + Set scopes; + if (CollectionUtils.isNotEmpty(requestedScopes)) { + scopes = requestedScopes; + } else { // auto-determine scopes + scopes = hasDevices + ? Set.of(AttributeScope.SERVER_SCOPE, AttributeScope.CLIENT_SCOPE, AttributeScope.SHARED_SCOPE) + : Collections.singleton(AttributeScope.SERVER_SCOPE); + } + // Non-device entities only support SERVER_SCOPE + if (!hasDevices) { + return scopes.contains(AttributeScope.SERVER_SCOPE) + ? Collections.singleton(AttributeScope.SERVER_SCOPE) + : Collections.emptySet(); + } + return scopes; + } + + private ListenableFuture> fetchTimeseriesKeys(TenantId tenantId, List entityIds, boolean includeSamples) { + if (includeSamples) { + return Futures.transform( + timeseriesService.findLatestByEntityIdsAsync(tenantId, entityIds), + entries -> toKeyInfos(entries, true), + dbCallbackExecutor); + } + return Futures.transform( + timeseriesService.findAllKeysByEntityIdsAsync(tenantId, entityIds), + keys -> keys.stream().sorted().map(k -> new KeyInfo(k, null)).toList(), + dbCallbackExecutor); + } + + private ListenableFuture>> fetchAttributeKeys( + TenantId tenantId, List entityIds, AttributeScope scope, boolean includeSamples) { + if (includeSamples) { + return Futures.transform( + attributesService.findLatestByEntityIdsAndScopeAsync(tenantId, entityIds, scope), + entries -> Map.entry(scope, toKeyInfos(entries, true)), + dbCallbackExecutor); + } + return Futures.transform( + attributesService.findAllKeysByEntityIdsAndScopeAsync(tenantId, entityIds, scope), + keys -> Map.entry(scope, keys.stream().sorted().map(k -> new KeyInfo(k, null)).toList()), + dbCallbackExecutor); + } + + private ListenableFuture assembleResult( + Set entityTypes, + ListenableFuture> tsFuture, + List>>> attrFutures) { + var allAttrFuture = attrFutures.isEmpty() + ? immediateFuture(List.>>of()) + : Futures.allAsList(attrFutures); + + List> allFutures = new ArrayList<>(); + if (tsFuture != null) { + allFutures.add(tsFuture); + } + allFutures.add(allAttrFuture); + + var finalTsFuture = tsFuture; + return Futures.whenAllComplete(allFutures) + .call(() -> { + List tsKeys = finalTsFuture != null ? Futures.getDone(finalTsFuture) : null; + Map> attrMap = attrFutures.isEmpty() ? null : new TreeMap<>(); + if (attrMap != null) { + for (var entry : Futures.getDone(allAttrFuture)) { + attrMap.put(entry.getKey(), entry.getValue()); + } + } + return new AvailableEntityKeysV2(entityTypes, tsKeys, attrMap); + }, dbCallbackExecutor); + } + + private static List toKeyInfos(List entries, boolean includeSamples) { + return entries.stream() + .map(e -> new KeyInfo(e.getKey(), includeSamples ? toKeySample(e) : null)) + .sorted(Comparator.comparing(KeyInfo::key)) + .toList(); + } + + private static KeySample toKeySample(KvEntry entry) { + long ts = entry instanceof TsKvEntry tsKv ? tsKv.getTs() + : entry instanceof AttributeKvEntry attr ? attr.getLastUpdateTs() + : 0; + JsonNode value = entry.getDataType() == DataType.JSON + ? JacksonUtil.toJsonNode(entry.getJsonValue().get()) + : JacksonUtil.valueToTree(entry.getValue()); + return new KeySample(ts, value); + } + } diff --git a/application/src/main/java/org/thingsboard/server/service/query/EntityQueryService.java b/application/src/main/java/org/thingsboard/server/service/query/EntityQueryService.java index a90cf684a1..354f8e9278 100644 --- a/application/src/main/java/org/thingsboard/server/service/query/EntityQueryService.java +++ b/application/src/main/java/org/thingsboard/server/service/query/EntityQueryService.java @@ -23,11 +23,14 @@ import org.thingsboard.server.common.data.query.AlarmCountQuery; import org.thingsboard.server.common.data.query.AlarmData; import org.thingsboard.server.common.data.query.AlarmDataQuery; import org.thingsboard.server.common.data.query.AvailableEntityKeys; +import org.thingsboard.server.common.data.query.AvailableEntityKeysV2; import org.thingsboard.server.common.data.query.EntityCountQuery; import org.thingsboard.server.common.data.query.EntityData; import org.thingsboard.server.common.data.query.EntityDataQuery; import org.thingsboard.server.service.security.model.SecurityUser; +import java.util.Set; + public interface EntityQueryService { long countEntitiesByQuery(SecurityUser securityUser, EntityCountQuery query); @@ -41,4 +44,8 @@ public interface EntityQueryService { ListenableFuture getKeysByQuery(SecurityUser securityUser, TenantId tenantId, EntityDataQuery query, boolean isTimeseries, boolean isAttributes, AttributeScope scope); + ListenableFuture findAvailableEntityKeysByQuery(SecurityUser securityUser, EntityDataQuery query, + boolean includeTimeseries, boolean includeAttributes, + Set scopes, boolean includeSamples); + } diff --git a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java index eb053cff76..e8fcd6a723 100644 --- a/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java @@ -123,6 +123,8 @@ import org.thingsboard.server.common.data.id.UUIDBased; import org.thingsboard.server.common.data.id.UserId; import org.thingsboard.server.common.data.job.Job; import org.thingsboard.server.common.data.job.JobType; +import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.notification.Notification; import org.thingsboard.server.common.data.notification.NotificationDeliveryMethod; import org.thingsboard.server.common.data.notification.NotificationType; @@ -177,6 +179,7 @@ import java.nio.charset.StandardCharsets; import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -718,6 +721,10 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { return assetProfile; } + protected Device createDevice(String name) throws Exception { + return createDevice(name, "default", null, null); + } + protected Device createDevice(String name, String accessToken) throws Exception { return createDevice(name, "default", null, accessToken); } @@ -731,7 +738,11 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { deviceData.setTransportConfiguration(new DefaultDeviceTransportConfiguration()); deviceData.setConfiguration(new DefaultDeviceConfiguration()); device.setDeviceData(deviceData); - return doPost("/api/device?accessToken=" + accessToken, device, Device.class); + if (accessToken != null) { + return doPost("/api/device?accessToken=" + accessToken, device, Device.class); + } else { + return doPost("/api/device", device, Device.class); + } } protected Device assignDeviceToCustomer(DeviceId deviceId, CustomerId customerId) { @@ -1219,7 +1230,7 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { Awaitility.await("CF state for entity actor ready to refresh dynamic arguments").atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> { CalculatedFieldState calculatedFieldState = statesMap.get(cfId); boolean isReady = calculatedFieldState != null && ((GeofencingCalculatedFieldState) calculatedFieldState).getLastScheduledRefreshTs() < - System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(scheduledUpdateInterval); + System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(scheduledUpdateInterval); log.warn("entityId {}, cfId {}, state ready to refresh == {}", entityId, cfId, isReady); return isReady; }); @@ -1411,7 +1422,7 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { protected List findJobs(List types, List entities) throws Exception { return doGetTypedWithPageLink("/api/jobs?types=" + types.stream().map(Enum::name).collect(Collectors.joining(",")) + - "&entities=" + entities.stream().map(UUID::toString).collect(Collectors.joining(",")) + "&", + "&entities=" + entities.stream().map(UUID::toString).collect(Collectors.joining(",")) + "&", new TypeReference>() {}, new PageLink(100, 0, null, new SortOrder("createdTime", SortOrder.Direction.DESC))).getData(); } @@ -1425,12 +1436,37 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { protected void postTelemetry(EntityId entityId, String payload) throws Exception { doPostAsync("/api/plugins/telemetry/" + entityId.getEntityType() + "/" + entityId.getId() + - "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode(payload), 30_000L).andExpect(status().isOk()); + "/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode(payload), 30_000L).andExpect(status().isOk()); + } + + protected void postTelemetry(EntityId entityId, TsKvEntry entry) throws Exception { + var values = JacksonUtil.newObjectNode(); + JacksonUtil.addKvEntry(values, entry); + + var payload = JacksonUtil.newObjectNode() + .put("ts", entry.getTs()) + .set("values", values); + + var url = "/api/plugins/telemetry/" + entityId.getEntityType() + "/" + entityId.getId() + "/timeseries/any"; + doPostAsync(url, payload, 30_000L).andExpect(status().isOk()); } protected void postAttributes(EntityId entityId, AttributeScope scope, String payload) throws Exception { doPostAsync("/api/plugins/telemetry/" + entityId.getEntityType() + "/" + entityId.getId() + - "/attributes/" + scope, JacksonUtil.toJsonNode(payload), 30_000L).andExpect(status().isOk()); + "/attributes/" + scope, JacksonUtil.toJsonNode(payload), 30_000L).andExpect(status().isOk()); + } + + protected void postAttributes(EntityId entityId, AttributeScope scope, KvEntry... attributes) throws Exception { + postAttributes(entityId, scope, Arrays.asList(attributes)); + } + + protected void postAttributes(EntityId entityId, AttributeScope scope, Collection attributes) throws Exception { + var url = "/api/plugins/telemetry/" + entityId.getEntityType() + "/" + entityId.getId() + "/attributes/" + scope; + var payload = JacksonUtil.newObjectNode(); + for (KvEntry entry : attributes) { + JacksonUtil.addKvEntry(payload, entry); + } + doPostAsync(url, payload, 30_000L).andExpect(status().isOk()); } protected CalculatedField saveCalculatedField(CalculatedField calculatedField) { @@ -1439,7 +1475,7 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { protected PageData getEntityCalculatedFields(EntityId entityId, CalculatedFieldType type, PageLink pageLink) throws Exception { return doGetTypedWithPageLink("/api/" + entityId.getEntityType() + "/" + entityId.getId() + "/calculatedFields" + - (type != null ? "?type=" + type.name() + "&" : "?"), new TypeReference<>() {}, pageLink); + (type != null ? "?type=" + type.name() + "&" : "?"), new TypeReference<>() {}, pageLink); } protected PageData getCalculatedFieldNames(CalculatedFieldType type, PageLink pageLink) throws Exception { @@ -1452,11 +1488,11 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest { List entities, List names) throws Exception { return doGetTypedWithPageLink("/api/calculatedFields?" + - (type != null ? "types=" + type + "&" : "") + - (entityType != null ? "entityType=" + entityType + "&" : "") + - (entities != null ? "entities=" + String.join(",", - entities.stream().map(UUID::toString).toList()) + "&" : "") + - (names != null ? names.stream().map(name -> "name=" + name + "&").collect(Collectors.joining("")) : ""), + (type != null ? "types=" + type + "&" : "") + + (entityType != null ? "entityType=" + entityType + "&" : "") + + (entities != null ? "entities=" + String.join(",", + entities.stream().map(UUID::toString).toList()) + "&" : "") + + (names != null ? names.stream().map(name -> "name=" + name + "&").collect(Collectors.joining("")) : ""), new TypeReference>() {}, new PageLink(10)).getData(); } diff --git a/application/src/test/java/org/thingsboard/server/controller/DeviceControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/DeviceControllerTest.java index 7332ee26c6..07b68210bb 100644 --- a/application/src/test/java/org/thingsboard/server/controller/DeviceControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/DeviceControllerTest.java @@ -1732,11 +1732,4 @@ public class DeviceControllerTest extends AbstractControllerTest { assertThat(fifthDevice.getName()).isEqualTo("My unique device_2"); } - private Device createDevice(String name) { - Device device = new Device(); - device.setName(name); - device.setType("default"); - return doPost("/api/device", device, Device.class); - } - } diff --git a/application/src/test/java/org/thingsboard/server/controller/EntityQueryControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/EntityQueryControllerTest.java index e1173ef9a7..1c8060989b 100644 --- a/application/src/test/java/org/thingsboard/server/controller/EntityQueryControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/EntityQueryControllerTest.java @@ -17,6 +17,9 @@ package org.thingsboard.server.controller; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.BooleanNode; +import com.fasterxml.jackson.databind.node.DoubleNode; +import com.fasterxml.jackson.databind.node.IntNode; import com.fasterxml.jackson.databind.node.ObjectNode; import org.junit.After; import org.junit.Assert; @@ -43,12 +46,21 @@ import org.thingsboard.server.common.data.alarm.AlarmSeverity; import org.thingsboard.server.common.data.asset.Asset; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; +import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.BooleanDataEntry; +import org.thingsboard.server.common.data.kv.DoubleDataEntry; +import org.thingsboard.server.common.data.kv.JsonDataEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.query.AlarmCountQuery; import org.thingsboard.server.common.data.query.AlarmData; import org.thingsboard.server.common.data.query.AlarmDataPageLink; import org.thingsboard.server.common.data.query.AlarmDataQuery; import org.thingsboard.server.common.data.query.AliasEntityId; +import org.thingsboard.server.common.data.query.AvailableEntityKeysV2; +import org.thingsboard.server.common.data.query.AvailableEntityKeysV2.KeyInfo; import org.thingsboard.server.common.data.query.DeviceTypeFilter; import org.thingsboard.server.common.data.query.DynamicValue; import org.thingsboard.server.common.data.query.DynamicValueSourceType; @@ -84,6 +96,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.stream.Collectors; @@ -1329,7 +1342,7 @@ public class EntityQueryControllerTest extends AbstractControllerTest { //assign dashboard doPost("/api/customer/" + savedCustomer.getId().getId().toString() - + "/dashboard/" + savedDashboard.getId().getId().toString(), Dashboard.class); + + "/dashboard/" + savedDashboard.getId().getId().toString(), Dashboard.class); // check entity data query by customer User customerUser = new User(); @@ -1494,4 +1507,213 @@ public class EntityQueryControllerTest extends AbstractControllerTest { return nameFilter; } + // --- findAvailableEntityKeysV2 tests --- + + @Test + public void testFindAvailableKeysByQueryV2() throws Exception { + // GIVEN — two devices matched by query; a third device should not be matched + var device1 = createDevice("Test device 1"); + var device2 = createDevice("Test device 2"); + var unmatchedDevice = createDevice("Unmatched device"); + + // unmatched device has unique keys that must NOT appear in the result + postTelemetry(unmatchedDevice.getId(), new BasicTsKvEntry(9000, new DoubleDataEntry("unmatchedTs", 999.0))); + postAttributes(unmatchedDevice.getId(), AttributeScope.SHARED_SCOPE, new StringDataEntry("unmatchedAttr", "nope")); + + // device1: timeseries1 (Double) with two data points, and timeseries2 older data point + postTelemetry(device1.getId(), new BasicTsKvEntry(1000, new DoubleDataEntry("timeseries1", 10.0))); + postTelemetry(device1.getId(), new BasicTsKvEntry(2000, new DoubleDataEntry("timeseries1", 20.5))); + postTelemetry(device1.getId(), new BasicTsKvEntry(1000, new LongDataEntry("timeseries2", 100L))); + + // device2: timeseries2 (Long) with a newer data point, and timeseries3 only on this device + postTelemetry(device2.getId(), new BasicTsKvEntry(3000, new LongDataEntry("timeseries2", 300L))); + postTelemetry(device2.getId(), new BasicTsKvEntry(5000, new DoubleDataEntry("timeseries3", 99.9))); + + // device1: SHARED_SCOPE attributes + postAttributes(device1.getId(), AttributeScope.SHARED_SCOPE, + new BooleanDataEntry("sharedAttribute1", true), new DoubleDataEntry("sharedAttribute2", 3.14)); + + // device2: CLIENT_SCOPE attributes (saved via service to bypass API restriction) + attributesService.save(tenantId, device2.getId(), AttributeScope.CLIENT_SCOPE, List.of( + new BaseAttributeKvEntry(new JsonDataEntry("clientAttribute1", "{\"key\":\"val\"}"), System.currentTimeMillis()), + new BaseAttributeKvEntry(new BooleanDataEntry("clientAttribute2", false), System.currentTimeMillis()) + )).get(); + + // device1 also has SERVER_SCOPE attributes (should be omitted by scope filter) + postAttributes(device1.getId(), AttributeScope.SERVER_SCOPE, + new StringDataEntry("serverAttribute1", "sv1"), new LongDataEntry("serverAttribute2", 42L)); + + // WHEN — query matches both devices; request timeseries + only SHARED and CLIENT attribute scopes + DeviceTypeFilter filter = new DeviceTypeFilter(); + filter.setDeviceTypes(List.of("default")); + filter.setDeviceNameFilter("Test device"); + EntityDataPageLink pageLink = new EntityDataPageLink(100, 0, null, null); + EntityDataQuery query = new EntityDataQuery(filter, pageLink, List.of(), null, null); + + AvailableEntityKeysV2 result = findAvailableEntityKeysByQueryV2(query, + true, true, List.of(AttributeScope.SHARED_SCOPE, AttributeScope.CLIENT_SCOPE), true); + + // THEN + assertThat(result.entityTypes()).containsExactly(EntityType.DEVICE); + + // timeseries: keys collected from both devices, samples contain the freshest data points + assertThat(result.timeseries()).extracting(KeyInfo::key) + .containsExactly("timeseries1", "timeseries2", "timeseries3"); + assertThat(result.timeseries()).allSatisfy(ki -> assertThat(ki.sample()).isNotNull()); + assertKeySample(result.timeseries(), "timeseries1", new DoubleNode(20.5), 2000); // from device1 + assertKeySample(result.timeseries(), "timeseries2", new IntNode(300), 3000); // from device2 (newer) + assertKeySample(result.timeseries(), "timeseries3", new DoubleNode(99.9), 5000); // only on device2 + + // SERVER_SCOPE must be fully omitted from the response + assertThat(result.attributes()).containsOnlyKeys(AttributeScope.SHARED_SCOPE, AttributeScope.CLIENT_SCOPE); + + // SHARED_SCOPE: from device1 (alphabetical order) + assertThat(result.attributes().get(AttributeScope.SHARED_SCOPE)) + .extracting(KeyInfo::key).containsExactly("sharedAttribute1", "sharedAttribute2"); + assertKeySample(result.attributes().get(AttributeScope.SHARED_SCOPE), "sharedAttribute1", BooleanNode.TRUE); + assertKeySample(result.attributes().get(AttributeScope.SHARED_SCOPE), "sharedAttribute2", new DoubleNode(3.14)); + + // CLIENT_SCOPE: from device2 (alphabetical order) + assertThat(result.attributes().get(AttributeScope.CLIENT_SCOPE)) + .extracting(KeyInfo::key).containsExactly("clientAttribute1", "clientAttribute2"); + assertKeySample(result.attributes().get(AttributeScope.CLIENT_SCOPE), "clientAttribute1", JacksonUtil.toJsonNode("{\"key\":\"val\"}")); + assertKeySample(result.attributes().get(AttributeScope.CLIENT_SCOPE), "clientAttribute2", BooleanNode.FALSE); + } + + @Test + public void testFindAvailableKeysByQueryV2_withoutSamples() throws Exception { + // GIVEN + var device = createDevice("Test device"); + postTelemetry(device.getId(), new BasicTsKvEntry(System.currentTimeMillis(), new DoubleDataEntry("temperature", 10.0))); + postAttributes(device.getId(), AttributeScope.SERVER_SCOPE, new StringDataEntry("firmware", "v1.0")); + + // WHEN + AvailableEntityKeysV2 result = findAvailableEntityKeysByQueryV2( + buildDeviceQuery("Test device"), true, true, null, false); + + // THEN + assertThat(result.timeseries()).allSatisfy(ki -> assertThat(ki.sample()).isNull()); + assertThat(result.attributes().get(AttributeScope.SERVER_SCOPE)) + .allSatisfy(ki -> assertThat(ki.sample()).isNull()); + } + + @Test + public void testFindAvailableKeysByQueryV2_timeseriesOnly() throws Exception { + // GIVEN + var device = createDevice("Test device"); + postTelemetry(device.getId(), new BasicTsKvEntry(System.currentTimeMillis(), new DoubleDataEntry("temperature", 10.0))); + postAttributes(device.getId(), AttributeScope.SERVER_SCOPE, new StringDataEntry("firmware", "v1.0")); + + // WHEN + AvailableEntityKeysV2 result = findAvailableEntityKeysByQueryV2( + buildDeviceQuery("Test device"), true, false, null, false); + + // THEN + assertThat(result.timeseries()).extracting(KeyInfo::key).contains("temperature"); + assertThat(result.attributes()).isNull(); + } + + @Test + public void testFindAvailableKeysByQueryV2_attributesOnly() throws Exception { + // GIVEN + var device = createDevice("Test device"); + postTelemetry(device.getId(), new BasicTsKvEntry(System.currentTimeMillis(), new DoubleDataEntry("temperature", 10.0))); + postAttributes(device.getId(), AttributeScope.SERVER_SCOPE, new StringDataEntry("firmware", "v1.0")); + + // WHEN + AvailableEntityKeysV2 result = findAvailableEntityKeysByQueryV2( + buildDeviceQuery("Test device"), false, true, null, false); + + // THEN + assertThat(result.timeseries()).isNull(); + assertThat(result.attributes().get(AttributeScope.SERVER_SCOPE)) + .extracting(KeyInfo::key).contains("firmware"); + } + + @Test + public void testFindAvailableKeysByQueryV2_noMatchingEntities() throws Exception { + // WHEN + AvailableEntityKeysV2 result = findAvailableEntityKeysByQueryV2( + buildDeviceQuery("NonExistentDevice_" + UUID.randomUUID()), true, true, null, true); + + // THEN + assertThat(result.entityTypes()).isEmpty(); + assertThat(result.timeseries()).isEmpty(); + assertThat(result.attributes()).isEmpty(); + } + + @Test + public void testFindAvailableKeysByQueryV2_assetUsesServerScopeOnly() throws Exception { + // GIVEN + var asset = new Asset(); + asset.setName("Test asset"); + asset.setType("default"); + asset = doPost("/api/asset", asset, Asset.class); + postAttributes(asset.getId(), AttributeScope.SERVER_SCOPE, new StringDataEntry("location", "warehouse")); + + // WHEN + var filter = new SingleEntityFilter(); + filter.setSingleEntity(AliasEntityId.fromEntityId(asset.getId())); + var query = new EntityDataQuery(filter, new EntityDataPageLink(1, 0, null, null), Collections.emptyList(), null, null); + + AvailableEntityKeysV2 result = findAvailableEntityKeysByQueryV2(query, false, true, null, false); + + // THEN + assertThat(result.entityTypes()).containsExactly(EntityType.ASSET); + assertThat(result.attributes()).containsOnlyKeys(AttributeScope.SERVER_SCOPE); + assertThat(result.attributes().get(AttributeScope.SERVER_SCOPE)) + .extracting(KeyInfo::key).containsExactly("location"); + } + + @Test + public void testFindAvailableKeysByQueryV2_rejectsWhenNoKeyTypeRequested() throws Exception { + // WHEN / THEN + EntityDataQuery query = buildDeviceQuery("NonExistent"); + + doPostAsync("/api/v2/entitiesQuery/find/keys?includeTimeseries=false&includeAttributes=false", + query, 30_000L).andExpect(status().isBadRequest()); + } + + private AvailableEntityKeysV2 findAvailableEntityKeysByQueryV2(EntityDataQuery query, + boolean includeTimeseries, boolean includeAttributes, + List scopes, boolean includeSamples) throws Exception { + StringBuilder url = new StringBuilder("/api/v2/entitiesQuery/find/keys?") + .append("includeTimeseries=").append(includeTimeseries) + .append("&includeAttributes=").append(includeAttributes) + .append("&includeSamples=").append(includeSamples); + if (scopes != null) { + for (AttributeScope scope : scopes) { + url.append("&scopes=").append(scope); + } + } + return doPostAsyncWithTypedResponse(url.toString(), query, + new TypeReference<>() {}, status().isOk()); + } + + private static void assertKeySample(List keys, String expectedKey, JsonNode expectedValue, long expectedTs) { + KeyInfo keyInfo = findKeyInfo(keys, expectedKey); + assertThat(keyInfo.sample()).isNotNull(); + assertThat(keyInfo.sample().value()).isEqualTo(expectedValue); + assertThat(keyInfo.sample().ts()).isEqualTo(expectedTs); + } + + private static void assertKeySample(List keys, String expectedKey, JsonNode expectedValue) { + KeyInfo keyInfo = findKeyInfo(keys, expectedKey); + assertThat(keyInfo.sample()).isNotNull(); + assertThat(keyInfo.sample().value()).isEqualTo(expectedValue); + assertThat(keyInfo.sample().ts()).isGreaterThan(0); + } + + private static KeyInfo findKeyInfo(List keys, String key) { + return keys.stream() + .filter(ki -> ki.key().equals(key)).findFirst().orElseThrow(); + } + + private static EntityDataQuery buildDeviceQuery(String deviceName) { + var filter = new DeviceTypeFilter(); + filter.setDeviceTypes(Collections.singletonList("default")); + filter.setDeviceNameFilter(deviceName); + return new EntityDataQuery(filter, new EntityDataPageLink(1, 0, null, null), Collections.emptyList(), null, null); + } + } diff --git a/application/src/test/java/org/thingsboard/server/controller/EntityRelationControllerTest.java b/application/src/test/java/org/thingsboard/server/controller/EntityRelationControllerTest.java index b0c76a16a8..b4ce7179d2 100644 --- a/application/src/test/java/org/thingsboard/server/controller/EntityRelationControllerTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/EntityRelationControllerTest.java @@ -633,13 +633,6 @@ public class EntityRelationControllerTest extends AbstractControllerTest { deleteDifferentTenant(); } - private Device createDevice(String name) { - var device = new Device(); - device.setName(name); - device.setType("default"); - return doPost("/api/device", device, Device.class); - } - private ResultActions getRelation(EntityRelation relation) throws Exception { return doGet("/api/relation?" + "fromId=" + relation.getFrom().getId() + diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java index 3166c64835..c5e359e2d5 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java @@ -27,9 +27,6 @@ import java.util.Collection; import java.util.List; import java.util.Optional; -/** - * @author Andrew Shvayka - */ public interface AttributesService { ListenableFuture> find(TenantId tenantId, EntityId entityId, AttributeScope scope, String attributeKey); @@ -48,7 +45,13 @@ public interface AttributesService { List findAllKeysByEntityIds(TenantId tenantId, List entityIds); - List findAllKeysByEntityIds(TenantId tenantId, List entityIds, AttributeScope scope); + List findAllKeysByEntityIdsAndScope(TenantId tenantId, List entityIds, AttributeScope scope); + + ListenableFuture> findAllKeysByEntityIdsAndScopeAsync(TenantId tenantId, List entityIds, AttributeScope scope); + + List findLatestByEntityIdsAndScope(TenantId tenantId, List entityIds, AttributeScope scope); + + ListenableFuture> findLatestByEntityIdsAndScopeAsync(TenantId tenantId, List entityIds, AttributeScope scope); int removeAllByEntityId(TenantId tenantId, EntityId entityId); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/entity/EntityService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/entity/EntityService.java index 3a67c31b57..fdd722a266 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/entity/EntityService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/entity/EntityService.java @@ -16,6 +16,7 @@ package org.thingsboard.server.dao.entity; import com.google.common.util.concurrent.FluentFuture; +import com.google.common.util.concurrent.ListenableFuture; import org.thingsboard.server.common.data.EntityInfo; import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; @@ -51,4 +52,6 @@ public interface EntityService { PageData findEntityDataByQuery(TenantId tenantId, CustomerId customerId, EntityDataQuery query); + ListenableFuture> findEntityDataByQueryAsync(TenantId tenantId, CustomerId customerId, EntityDataQuery query); + } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java index 5bf2c63176..6d5727474a 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java @@ -30,9 +30,6 @@ import java.util.Collection; import java.util.List; import java.util.Optional; -/** - * @author Andrew Shvayka - */ public interface TimeseriesService { ListenableFuture> findAllByQueries(TenantId tenantId, EntityId entityId, List queries); @@ -65,6 +62,10 @@ public interface TimeseriesService { ListenableFuture> findAllKeysByEntityIdsAsync(TenantId tenantId, List entityIds); + List findLatestByEntityIds(TenantId tenantId, List entityIds); + + ListenableFuture> findLatestByEntityIdsAsync(TenantId tenantId, List entityIds); + void cleanup(long systemTtl); } diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/query/AvailableEntityKeysV2.java b/common/data/src/main/java/org/thingsboard/server/common/data/query/AvailableEntityKeysV2.java new file mode 100644 index 0000000000..536b70e4ff --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/query/AvailableEntityKeysV2.java @@ -0,0 +1,99 @@ +/** + * Copyright © 2016-2026 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.query; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.JsonNode; +import io.swagger.v3.oas.annotations.media.ArraySchema; +import io.swagger.v3.oas.annotations.media.Schema; +import org.jspecify.annotations.Nullable; +import org.thingsboard.server.common.data.AttributeScope; +import org.thingsboard.server.common.data.EntityType; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +@Schema( + description = """ + Contains unique time series and attribute key names discovered from entities matching a query, + optionally including a sample value for each key.""" +) +@JsonInclude(JsonInclude.Include.NON_NULL) +public record AvailableEntityKeysV2( + @Schema( + description = "Set of entity types found among the matched entities.", + example = "[\"DEVICE\", \"ASSET\"]", + requiredMode = Schema.RequiredMode.REQUIRED + ) + Set entityTypes, + + @ArraySchema( + arraySchema = @Schema( + description = """ + List of unique time series keys available on the matched entities, sorted alphabetically. + Omitted when timeseries keys were not requested.""", + nullable = true + ), + schema = @Schema(implementation = KeyInfo.class) + ) + @Nullable List timeseries, + + @Schema( + description = """ + Map of attribute scope to the list of unique attribute keys available on the matched entities. + Only scopes supported by the matched entity types are included. + Omitted when attribute keys were not requested or when none of the requested scopes apply to the matched entity types.""", + nullable = true + ) + @Nullable Map> attributes +) { + + @Schema(description = "Key name with an optional sample value.") + @JsonInclude(JsonInclude.Include.NON_NULL) + public record KeyInfo( + @Schema( + description = "Key name.", + example = "temperature", + requiredMode = Schema.RequiredMode.REQUIRED + ) + String key, + + @Schema( + description = "Most recent sample value for this key across the matched entities. Omitted when samples were not requested.", + nullable = true + ) + @Nullable KeySample sample + ) {} + + @Schema(description = "Most recent value and its timestamp.") + public record KeySample( + @Schema( + description = "Timestamp in milliseconds since epoch.", example = "1707000000000", + requiredMode = Schema.RequiredMode.REQUIRED + ) + long ts, + + @Schema( + description = "Sample value.", + example = "23.5", + requiredMode = Schema.RequiredMode.REQUIRED, + implementation = Object.class + ) + JsonNode value + ) {} + +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java index 2241b20e14..7c563debd6 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java @@ -55,6 +55,12 @@ public interface AttributesDao { List findAllKeysByEntityIdsAndScope(TenantId tenantId, List entityIds, AttributeScope scope); + ListenableFuture> findAllKeysByEntityIdsAndScopeAsync(TenantId tenantId, List entityIds, AttributeScope scope); + + List findLatestByEntityIdsAndScope(TenantId tenantId, List entityIds, AttributeScope scope); + + ListenableFuture> findLatestByEntityIdsAndScopeAsync(TenantId tenantId, List entityIds, AttributeScope scope); + List> removeAllByEntityId(TenantId tenantId, EntityId entityId); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java index 117c47b61c..ca4c937d4d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java @@ -45,9 +45,6 @@ import java.util.Optional; import static org.thingsboard.server.dao.attributes.AttributeUtils.validate; -/** - * @author Andrew Shvayka - */ @Service @ConditionalOnProperty(prefix = "cache.attributes", value = "enabled", havingValue = "false", matchIfMissing = true) @Primary @@ -92,7 +89,7 @@ public class BaseAttributesService implements AttributesService { } @Override - public List findAllKeysByEntityIds(TenantId tenantId, List entityIds, AttributeScope scope) { + public List findAllKeysByEntityIdsAndScope(TenantId tenantId, List entityIds, AttributeScope scope) { if (scope == null) { return attributesDao.findAllKeysByEntityIds(tenantId, entityIds); } else { @@ -100,6 +97,21 @@ public class BaseAttributesService implements AttributesService { } } + @Override + public ListenableFuture> findAllKeysByEntityIdsAndScopeAsync(TenantId tenantId, List entityIds, AttributeScope scope) { + return attributesDao.findAllKeysByEntityIdsAndScopeAsync(tenantId, entityIds, scope); + } + + @Override + public List findLatestByEntityIdsAndScope(TenantId tenantId, List entityIds, AttributeScope scope) { + return attributesDao.findLatestByEntityIdsAndScope(tenantId, entityIds, scope); + } + + @Override + public ListenableFuture> findLatestByEntityIdsAndScopeAsync(TenantId tenantId, List entityIds, AttributeScope scope) { + return attributesDao.findLatestByEntityIdsAndScopeAsync(tenantId, entityIds, scope); + } + @Override public ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { validate(entityId, scope); diff --git a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java index c150eabfa2..1c99dfc495 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java @@ -66,6 +66,7 @@ import static org.thingsboard.server.dao.attributes.AttributeUtils.validate; @Primary @Slf4j public class CachedAttributesService implements AttributesService { + private static final String STATS_NAME = "attributes.cache"; public static final String LOCAL_CACHE_TYPE = "caffeine"; @@ -212,7 +213,7 @@ public class CachedAttributesService implements AttributesService { } @Override - public List findAllKeysByEntityIds(TenantId tenantId, List entityIds, AttributeScope scope) { + public List findAllKeysByEntityIdsAndScope(TenantId tenantId, List entityIds, AttributeScope scope) { if (scope == null) { return attributesDao.findAllKeysByEntityIds(tenantId, entityIds); } else { @@ -220,6 +221,21 @@ public class CachedAttributesService implements AttributesService { } } + @Override + public ListenableFuture> findAllKeysByEntityIdsAndScopeAsync(TenantId tenantId, List entityIds, AttributeScope scope) { + return attributesDao.findAllKeysByEntityIdsAndScopeAsync(tenantId, entityIds, scope); + } + + @Override + public List findLatestByEntityIdsAndScope(TenantId tenantId, List entityIds, AttributeScope scope) { + return attributesDao.findLatestByEntityIdsAndScope(tenantId, entityIds, scope); + } + + @Override + public ListenableFuture> findLatestByEntityIdsAndScopeAsync(TenantId tenantId, List entityIds, AttributeScope scope) { + return attributesDao.findLatestByEntityIdsAndScopeAsync(tenantId, entityIds, scope); + } + @Override public ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) { validate(entityId, scope); diff --git a/dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java b/dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java index dc950cebd4..b20535dbf4 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java @@ -16,6 +16,9 @@ package org.thingsboard.server.dao.entity; import com.google.common.util.concurrent.FluentFuture; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; @@ -55,6 +58,7 @@ import org.thingsboard.server.common.msg.edqs.EdqsService; import org.thingsboard.server.common.stats.EdqsStatsService; import org.thingsboard.server.dao.exception.IncorrectParameterException; import org.thingsboard.server.dao.model.ModelConstants; +import org.thingsboard.server.dao.sql.JpaExecutorService; import java.util.ArrayList; import java.util.Collections; @@ -104,6 +108,9 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe @Autowired private EdqsStatsService edqsStatsService; + @Autowired + private JpaExecutorService jpaExecutorService; + @Override public long countEntitiesByQuery(TenantId tenantId, CustomerId customerId, EntityCountQuery query) { log.trace("Executing countEntitiesByQuery, tenantId [{}], customerId [{}], query [{}]", tenantId, customerId, query); @@ -142,41 +149,78 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe EdqsResponse response = processEdqsRequest(tenantId, customerId, request); result = response.getEntityDataQueryResult(); } else { - if (!isValidForOptimization(query)) { - result = entityQueryDao.findEntityDataByQuery(tenantId, customerId, query); - } else { - // 1 step - find entity data by filter and sort columns - PageData entityDataByQuery = findEntityIdsByFilterAndSorterColumns(tenantId, customerId, query); - if (entityDataByQuery == null || entityDataByQuery.getData().isEmpty()) { - result = entityDataByQuery; - } else { - // 2 step - find entity data by entity ids from the 1st step - List entities = fetchEntityDataByIdsFromInitialQuery(tenantId, customerId, query, entityDataByQuery.getData()); - result = new PageData<>(entities, entityDataByQuery.getTotalPages(), entityDataByQuery.getTotalElements(), entityDataByQuery.hasNext()); - } - } + result = findEntityDataByQueryInternal(tenantId, customerId, query); } edqsStatsService.reportEntityDataQuery(tenantId, query, System.nanoTime() - startNs); return result; } + @Override + public ListenableFuture> findEntityDataByQueryAsync(TenantId tenantId, CustomerId customerId, EntityDataQuery query) { + log.trace("Executing findEntityDataByQueryAsync, tenantId [{}], customerId [{}], query [{}]", tenantId, customerId, query); + + try { + validateId(tenantId, id -> INCORRECT_TENANT_ID + id); + validateId(customerId, id -> INCORRECT_CUSTOMER_ID + id); + validateEntityDataQuery(query); + } catch (Exception e) { + return Futures.immediateFailedFuture(e); + } + + if (edqsService.isApiEnabled() && validForEdqs(query) && !tenantId.isSysTenantId()) { + EdqsRequest request = EdqsRequest.builder() + .entityDataQuery(query) + .build(); + long startNs = System.nanoTime(); + return Futures.transform(processEdqsRequestAsync(tenantId, customerId, request), response -> { + edqsStatsService.reportEntityDataQuery(tenantId, query, System.nanoTime() - startNs); + return response.getEntityDataQueryResult(); + }, MoreExecutors.directExecutor()); + } + + return jpaExecutorService.submit(() -> { + long startNs = System.nanoTime(); + PageData result = findEntityDataByQueryInternal(tenantId, customerId, query); + edqsStatsService.reportEntityDataQuery(tenantId, query, System.nanoTime() - startNs); + return result; + }); + } + + private PageData findEntityDataByQueryInternal(TenantId tenantId, CustomerId customerId, EntityDataQuery query) { + if (!isValidForOptimization(query)) { + return entityQueryDao.findEntityDataByQuery(tenantId, customerId, query); + } + // 1 step - find entity data by filter and sort columns + PageData entityDataByQuery = findEntityIdsByFilterAndSorterColumns(tenantId, customerId, query); + if (entityDataByQuery == null || entityDataByQuery.getData().isEmpty()) { + return entityDataByQuery; + } + // 2 step - find entity data by entity ids from the 1st step + List entities = fetchEntityDataByIdsFromInitialQuery(tenantId, customerId, query, entityDataByQuery.getData()); + return new PageData<>(entities, entityDataByQuery.getTotalPages(), entityDataByQuery.getTotalElements(), entityDataByQuery.hasNext()); + } + private boolean validForEdqs(EntityCountQuery query) { // for compatibility with PE return true; } private EdqsResponse processEdqsRequest(TenantId tenantId, CustomerId customerId, EdqsRequest request) { - EdqsResponse response; try { - log.debug("[{}] Sending request to EDQS: {}", tenantId, request); - response = edqsApiService.processRequest(tenantId, customerId, request).get(); + return processEdqsRequestAsync(tenantId, customerId, request).get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } - log.debug("[{}] Received response from EDQS: {}", tenantId, response); - if (response.getError() != null) { - throw new RuntimeException(response.getError()); - } - return response; + } + + private ListenableFuture processEdqsRequestAsync(TenantId tenantId, CustomerId customerId, EdqsRequest request) { + log.debug("[{}] Sending request to EDQS: {}", tenantId, request); + return Futures.transform(edqsApiService.processRequest(tenantId, customerId, request), response -> { + log.debug("[{}] Received response from EDQS: {}", tenantId, response); + if (response.getError() != null) { + throw new RuntimeException(response.getError()); + } + return response; + }, MoreExecutors.directExecutor()); } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/latest/TsKvLatestEntity.java b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/latest/TsKvLatestEntity.java index 44a38bf445..971c06c7a0 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/latest/TsKvLatestEntity.java +++ b/dao/src/main/java/org/thingsboard/server/dao/model/sqlts/latest/TsKvLatestEntity.java @@ -65,6 +65,12 @@ import static org.thingsboard.server.dao.model.ModelConstants.VERSION_COLUMN; query = SearchTsKvLatestRepository.FIND_ALL_BY_ENTITY_ID_QUERY, resultSetMapping = "tsKvLatestFindMapping", resultClass = TsKvLatestEntity.class + ), + @NamedNativeQuery( + name = SearchTsKvLatestRepository.FIND_LATEST_BY_ENTITY_IDS, + query = SearchTsKvLatestRepository.FIND_LATEST_BY_ENTITY_IDS_QUERY, + resultSetMapping = "tsKvLatestFindMapping", + resultClass = TsKvLatestEntity.class ) }) public final class TsKvLatestEntity extends AbstractTsKvEntity { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java index aa7f0490d7..8a3405549e 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java @@ -20,6 +20,14 @@ import org.springframework.data.jpa.repository.Modifying; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; import org.springframework.transaction.annotation.Transactional; +import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; +import org.thingsboard.server.common.data.kv.BooleanDataEntry; +import org.thingsboard.server.common.data.kv.DoubleDataEntry; +import org.thingsboard.server.common.data.kv.JsonDataEntry; +import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; +import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.dao.model.sql.AttributeKvCompositeKey; import org.thingsboard.server.dao.model.sql.AttributeKvEntity; @@ -60,6 +68,19 @@ public interface AttributeKvRepository extends JpaRepository findAllKeysByEntityIdsAndAttributeType(@Param("entityIds") List entityIds, @Param("attributeType") int attributeType); + @Query(value = """ + SELECT DISTINCT ON (a.attribute_key) + kd.key AS strKey, + a.bool_v AS boolV, a.str_v AS strV, a.long_v AS longV, + a.dbl_v AS dblV, a.json_v AS jsonV, + a.last_update_ts AS lastUpdateTs, a.version AS version + FROM attribute_kv a + INNER JOIN key_dictionary kd ON a.attribute_key = kd.key_id + WHERE a.entity_id IN :entityIds AND a.attribute_type = :attributeType + ORDER BY a.attribute_key, a.last_update_ts DESC""", nativeQuery = true) + List findLatestByEntityIdsAndAttributeType(@Param("entityIds") List entityIds, + @Param("attributeType") int attributeType); + @Query(value = "SELECT attribute_key, attribute_type, entity_id, bool_v, dbl_v, json_v, last_update_ts, long_v, str_v, version FROM attribute_kv WHERE (entity_id, attribute_type, attribute_key) > " + "(:entityId, :attributeType, :attributeKey) ORDER BY entity_id, attribute_type, attribute_key LIMIT :batchSize", nativeQuery = true) List findNextBatch(@Param("entityId") UUID entityId, @@ -67,4 +88,40 @@ public interface AttributeKvRepository extends JpaRepository> findAllKeysByEntityIdsAndScopeAsync(TenantId tenantId, List entityIds, AttributeScope scope) { + return service.submit(() -> findAllKeysByEntityIdsAndScope(tenantId, entityIds, scope)); + } + + @Override + public List findLatestByEntityIdsAndScope(TenantId tenantId, List entityIds, AttributeScope scope) { + if (CollectionUtils.isEmpty(entityIds)) { + return Collections.emptyList(); + } + var uniqueIds = entityIds.stream().map(EntityId::getId).distinct().toList(); + return attributeKvRepository.findLatestByEntityIdsAndAttributeType(uniqueIds, scope.getId()) + .stream() + .map(AttributeKvRepository.AttributeKvProjection::toAttributeKvEntry) + .toList(); + } + + @Override + public ListenableFuture> findLatestByEntityIdsAndScopeAsync(TenantId tenantId, List entityIds, AttributeScope scope) { + return service.submit(() -> findLatestByEntityIdsAndScope(tenantId, entityIds, scope)); + } + @Override public ListenableFuture save(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, AttributeKvEntry attribute) { AttributeKvEntity entity = new AttributeKvEntity(); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java index eabbdfed80..143ab53e18 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java @@ -172,4 +172,14 @@ public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseries return sqlDao.findAllKeysByEntityIdsAsync(tenantId, entityIds); } + @Override + public List findLatestByEntityIds(TenantId tenantId, List entityIds) { + return sqlDao.findLatestByEntityIds(tenantId, entityIds); + } + + @Override + public ListenableFuture> findLatestByEntityIdsAsync(TenantId tenantId, List entityIds) { + return sqlDao.findLatestByEntityIdsAsync(tenantId, entityIds); + } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java index d807794ddc..b33ceca63b 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.MoreExecutors; import jakarta.annotation.PostConstruct; import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections4.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @@ -54,6 +55,7 @@ import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao; import org.thingsboard.server.dao.util.SqlTsLatestAnyDao; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -189,6 +191,21 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme return service.submit(() -> findAllKeysByEntityIds(tenantId, entityIds)); } + @Override + public List findLatestByEntityIds(TenantId tenantId, List entityIds) { + if (CollectionUtils.isEmpty(entityIds)) { + return Collections.emptyList(); + } + return DaoUtil.convertDataList( + searchTsKvLatestRepository.findLatestByEntityIds(entityIds.stream().map(EntityId::getId).toList()) + ); + } + + @Override + public ListenableFuture> findLatestByEntityIdsAsync(TenantId tenantId, List entityIds) { + return service.submit(() -> findLatestByEntityIds(tenantId, entityIds)); + } + private ListenableFuture getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query, Long version) { ListenableFuture> future = findNewLatestEntryFuture(tenantId, entityId, query); return Futures.transformAsync(future, entryList -> { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/latest/SearchTsKvLatestRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/latest/SearchTsKvLatestRepository.java index 0ab1994059..8b3a66612c 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/latest/SearchTsKvLatestRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/latest/SearchTsKvLatestRepository.java @@ -34,6 +34,20 @@ public class SearchTsKvLatestRepository { " ts_kv_latest.bool_v AS boolValue, ts_kv_latest.long_v AS longValue, ts_kv_latest.dbl_v AS doubleValue, ts_kv_latest.json_v AS jsonValue, ts_kv_latest.ts AS ts, ts_kv_latest.version AS version FROM ts_kv_latest " + "INNER JOIN key_dictionary ON ts_kv_latest.key = key_dictionary.key_id WHERE ts_kv_latest.entity_id = cast(:id AS uuid)"; + public static final String FIND_LATEST_BY_ENTITY_IDS = "findLatestByEntityIds"; + + public static final String FIND_LATEST_BY_ENTITY_IDS_QUERY = """ + SELECT DISTINCT ON (ts_kv_latest.key) + ts_kv_latest.entity_id AS entityId, ts_kv_latest.key AS key, + key_dictionary.key AS strKey, ts_kv_latest.str_v AS strValue, + ts_kv_latest.bool_v AS boolValue, ts_kv_latest.long_v AS longValue, + ts_kv_latest.dbl_v AS doubleValue, ts_kv_latest.json_v AS jsonValue, + ts_kv_latest.ts AS ts, ts_kv_latest.version AS version + FROM ts_kv_latest + INNER JOIN key_dictionary ON ts_kv_latest.key = key_dictionary.key_id + WHERE ts_kv_latest.entity_id IN (:entityIds) + ORDER BY ts_kv_latest.key, ts_kv_latest.ts DESC"""; + @PersistenceContext private EntityManager entityManager; @@ -43,4 +57,10 @@ public class SearchTsKvLatestRepository { .getResultList(); } + public List findLatestByEntityIds(List entityIds) { + return entityManager.createNamedQuery(FIND_LATEST_BY_ENTITY_IDS, TsKvLatestEntity.class) + .setParameter("entityIds", entityIds) + .getResultList(); + } + } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index 747197470c..70e04eb5e3 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -55,9 +55,6 @@ import java.util.stream.Collectors; import static org.thingsboard.server.common.data.StringUtils.isBlank; -/** - * @author Andrew Shvayka - */ @Service @Slf4j public class BaseTimeseriesService implements TimeseriesService { @@ -161,6 +158,16 @@ public class BaseTimeseriesService implements TimeseriesService { return timeseriesLatestDao.findAllKeysByEntityIdsAsync(tenantId, entityIds); } + @Override + public List findLatestByEntityIds(TenantId tenantId, List entityIds) { + return timeseriesLatestDao.findLatestByEntityIds(tenantId, entityIds); + } + + @Override + public ListenableFuture> findLatestByEntityIdsAsync(TenantId tenantId, List entityIds) { + return timeseriesLatestDao.findLatestByEntityIdsAsync(tenantId, entityIds); + } + @Override public void cleanup(long systemTtl) { timeseriesDao.cleanup(systemTtl); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java index 6bcc88c2df..b3cc8496a0 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java @@ -104,6 +104,16 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes return Futures.immediateFuture(Collections.emptyList()); } + @Override + public List findLatestByEntityIds(TenantId tenantId, List entityIds) { + return Collections.emptyList(); + } + + @Override + public ListenableFuture> findLatestByEntityIdsAsync(TenantId tenantId, List entityIds) { + return Futures.immediateFuture(Collections.emptyList()); + } + @Override public ListenableFuture saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) { BoundStatementBuilder stmtBuilder = new BoundStatementBuilder(getLatestStmt().bind()); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java index 26e784b760..c7582944a9 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java @@ -52,4 +52,13 @@ public interface TimeseriesLatestDao { ListenableFuture> findAllKeysByEntityIdsAsync(TenantId tenantId, List entityIds); + /** + * For each unique timeseries key across the given entities, returns the single most recent {@link TsKvEntry} + * (i.e. the entry with the highest timestamp). If the same key exists on multiple entities, + * only the freshest value is kept. Useful for discovering available keys together with a representative sample value. + */ + List findLatestByEntityIds(TenantId tenantId, List entityIds); + + ListenableFuture> findLatestByEntityIdsAsync(TenantId tenantId, List entityIds); + } diff --git a/dao/src/test/java/org/thingsboard/server/dao/service/attributes/BaseAttributesServiceTest.java b/dao/src/test/java/org/thingsboard/server/dao/service/attributes/BaseAttributesServiceTest.java index d3aa0fde0f..373112293d 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/service/attributes/BaseAttributesServiceTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/service/attributes/BaseAttributesServiceTest.java @@ -23,7 +23,6 @@ import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.awaitility.Awaitility; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import org.thingsboard.server.common.data.AttributeScope; @@ -31,8 +30,12 @@ import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; +import org.thingsboard.server.common.data.kv.BooleanDataEntry; +import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.KvEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; +import org.thingsboard.server.dao.attributes.AttributesDao; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.service.AbstractServiceTest; @@ -40,6 +43,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.UUID; import java.util.concurrent.Executors; @@ -48,9 +52,6 @@ import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; -/** - * @author Andrew Shvayka - */ @Slf4j public abstract class BaseAttributesServiceTest extends AbstractServiceTest { @@ -60,9 +61,8 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest { @Autowired private AttributesService attributesService; - @Before - public void before() { - } + @Autowired + private AttributesDao attributesDao; @Test public void saveAndFetch() throws Exception { @@ -223,7 +223,7 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest { saveAttribute(tenantId, deviceId, AttributeScope.SERVER_SCOPE, "key2", "123"); Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> { - List keys = attributesService.findAllKeysByEntityIds(tenantId, List.of(deviceId), AttributeScope.SERVER_SCOPE); + List keys = attributesService.findAllKeysByEntityIdsAndScope(tenantId, List.of(deviceId), AttributeScope.SERVER_SCOPE); assertThat(keys).containsOnly("key1", "key2"); }); } @@ -241,6 +241,84 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest { }); } + @Test + public void findLatestByEntityIdsAndScope_returnsOneEntryPerKey() { + var device1 = new DeviceId(UUID.randomUUID()); + var device2 = new DeviceId(UUID.randomUUID()); + + // Both devices have "temperature", device2 has a newer ts + saveAttribute(tenantId, device1, AttributeScope.SERVER_SCOPE, 1000, new DoubleDataEntry("temperature", 20.0)); + saveAttribute(tenantId, device2, AttributeScope.SERVER_SCOPE, 2000, new DoubleDataEntry("temperature", 25.0)); + // Only device1 has "humidity" + saveAttribute(tenantId, device1, AttributeScope.SERVER_SCOPE, 1500, new LongDataEntry("humidity", 60L)); + // Only device2 has "active" + saveAttribute(tenantId, device2, AttributeScope.SERVER_SCOPE, 3000, new BooleanDataEntry("active", true)); + + List results = attributesDao.findLatestByEntityIdsAndScope(tenantId, List.of(device1, device2), AttributeScope.SERVER_SCOPE); + Map byKey = results.stream().collect(Collectors.toMap(AttributeKvEntry::getKey, e -> e)); + + Assert.assertEquals(3, byKey.size()); + + // "temperature" should pick device2's value (ts=2000 > ts=1000) + AttributeKvEntry temp = byKey.get("temperature"); + Assert.assertNotNull(temp); + Assert.assertEquals(25.0, temp.getDoubleValue().orElseThrow(), 0.0); + Assert.assertEquals(2000, temp.getLastUpdateTs()); + + // "humidity" — only device1 has it + AttributeKvEntry humidity = byKey.get("humidity"); + Assert.assertNotNull(humidity); + Assert.assertEquals(60L, (long) humidity.getLongValue().orElseThrow()); + Assert.assertEquals(1500, humidity.getLastUpdateTs()); + + // "active" — only device2 has it + AttributeKvEntry active = byKey.get("active"); + Assert.assertNotNull(active); + Assert.assertEquals(true, active.getBooleanValue().orElseThrow()); + Assert.assertEquals(3000, active.getLastUpdateTs()); + } + + @Test + public void findLatestByEntityIdsAndScope_emptyList() { + List results = attributesDao.findLatestByEntityIdsAndScope(tenantId, List.of(), AttributeScope.SERVER_SCOPE); + Assert.assertTrue(results.isEmpty()); + } + + @Test + public void findLatestByEntityIdsAndScope_singleEntity() throws Exception { + var device = new DeviceId(UUID.randomUUID()); + saveAttribute(tenantId, device, AttributeScope.SERVER_SCOPE, 1000, new StringDataEntry("key1", "value1")); + saveAttribute(tenantId, device, AttributeScope.SERVER_SCOPE, 2000, new StringDataEntry("key2", "value2")); + + // sync + List results = attributesDao.findLatestByEntityIdsAndScope(tenantId, List.of(device), AttributeScope.SERVER_SCOPE); + Assert.assertEquals(2, results.size()); + Map byKey = results.stream().collect(Collectors.toMap(AttributeKvEntry::getKey, e -> e)); + Assert.assertEquals("value1", byKey.get("key1").getStrValue().orElseThrow()); + Assert.assertEquals(1000, byKey.get("key1").getLastUpdateTs()); + Assert.assertEquals("value2", byKey.get("key2").getStrValue().orElseThrow()); + Assert.assertEquals(2000, byKey.get("key2").getLastUpdateTs()); + + // async — same result + List asyncResults = attributesDao.findLatestByEntityIdsAndScopeAsync(tenantId, List.of(device), AttributeScope.SERVER_SCOPE).get(); + Assert.assertEquals(results, asyncResults); + } + + @Test + public void findLatestByEntityIdsAndScope_filtersScope() { + var device = new DeviceId(UUID.randomUUID()); + saveAttribute(tenantId, device, AttributeScope.SERVER_SCOPE, 1000, new StringDataEntry("serverKey", "sv")); + saveAttribute(tenantId, device, AttributeScope.CLIENT_SCOPE, 1000, new StringDataEntry("clientKey", "cv")); + + List serverResults = attributesDao.findLatestByEntityIdsAndScope(tenantId, List.of(device), AttributeScope.SERVER_SCOPE); + Assert.assertEquals(1, serverResults.size()); + Assert.assertEquals("serverKey", serverResults.get(0).getKey()); + + List clientResults = attributesDao.findLatestByEntityIdsAndScope(tenantId, List.of(device), AttributeScope.CLIENT_SCOPE); + Assert.assertEquals(1, clientResults.size()); + Assert.assertEquals("clientKey", clientResults.get(0).getKey()); + } + private void testConcurrentFetchAndUpdate(TenantId tenantId, DeviceId deviceId, ListeningExecutorService pool) throws Exception { var scope = AttributeScope.SERVER_SCOPE; var key = "TEST"; @@ -313,6 +391,15 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest { } } + private void saveAttribute(TenantId tenantId, DeviceId deviceId, AttributeScope scope, long ts, KvEntry value) { + try { + attributesService.save(tenantId, deviceId, scope, Collections.singletonList(new BaseAttributeKvEntry(ts, value))).get(10, TimeUnit.SECONDS); + } catch (Exception e) { + log.warn("Failed to save attribute", e.getCause()); + throw new RuntimeException(e); + } + } + private void equalsIgnoreVersion(AttributeKvEntry expected, AttributeKvEntry actual) { Assert.assertEquals(expected.getKey(), actual.getKey()); Assert.assertEquals(expected.getValue(), actual.getValue()); diff --git a/dao/src/test/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDaoTest.java b/dao/src/test/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDaoTest.java index 96f3390b13..02abf1e5fd 100644 --- a/dao/src/test/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDaoTest.java +++ b/dao/src/test/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDaoTest.java @@ -22,6 +22,9 @@ import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import org.thingsboard.server.common.data.id.DeviceId; import org.thingsboard.server.common.data.kv.BasicTsKvEntry; +import org.thingsboard.server.common.data.kv.BooleanDataEntry; +import org.thingsboard.server.common.data.kv.DoubleDataEntry; +import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.dao.service.AbstractServiceTest; @@ -30,7 +33,9 @@ import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.UUID; +import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -102,6 +107,69 @@ public class SqlTimeseriesLatestDaoTest extends AbstractServiceTest { } } + @Test + public void findLatestByEntityIds_returnsOneEntryPerKey() throws Exception { + DeviceId device1 = new DeviceId(UUID.randomUUID()); + DeviceId device2 = new DeviceId(UUID.randomUUID()); + + // Both devices have "temperature" key, device2 has a newer ts + timeseriesLatestDao.saveLatest(tenantId, device1, new BasicTsKvEntry(1000, new DoubleDataEntry("temperature", 20.0))).get(); + timeseriesLatestDao.saveLatest(tenantId, device2, new BasicTsKvEntry(2000, new DoubleDataEntry("temperature", 25.0))).get(); + // Only device1 has "humidity" + timeseriesLatestDao.saveLatest(tenantId, device1, new BasicTsKvEntry(1500, new LongDataEntry("humidity", 60L))).get(); + // Only device2 has "active" + timeseriesLatestDao.saveLatest(tenantId, device2, new BasicTsKvEntry(3000, new BooleanDataEntry("active", true))).get(); + + List results = timeseriesLatestDao.findLatestByEntityIds(tenantId, List.of(device1, device2)); + Map byKey = results.stream().collect(Collectors.toMap(TsKvEntry::getKey, e -> e)); + + assertEquals(3, byKey.size()); + + // "temperature" should pick device2's value (ts=2000 > ts=1000) + TsKvEntry temp = byKey.get("temperature"); + assertNotNull(temp); + assertEquals(25.0, temp.getDoubleValue().orElseThrow()); + assertEquals(2000, temp.getTs()); + + // "humidity" — only device1 has it + TsKvEntry humidity = byKey.get("humidity"); + assertNotNull(humidity); + assertEquals(60L, humidity.getLongValue().orElseThrow()); + assertEquals(1500, humidity.getTs()); + + // "active" — only device2 has it + TsKvEntry active = byKey.get("active"); + assertNotNull(active); + assertEquals(true, active.getBooleanValue().orElseThrow()); + assertEquals(3000, active.getTs()); + } + + @Test + public void findLatestByEntityIds_emptyList() { + List results = timeseriesLatestDao.findLatestByEntityIds(tenantId, List.of()); + assertTrue(results.isEmpty()); + } + + @Test + public void findLatestByEntityIds_singleEntity() throws Exception { + DeviceId device = new DeviceId(UUID.randomUUID()); + timeseriesLatestDao.saveLatest(tenantId, device, new BasicTsKvEntry(1000, new StringDataEntry("key1", "value1"))).get(); + timeseriesLatestDao.saveLatest(tenantId, device, new BasicTsKvEntry(2000, new StringDataEntry("key2", "value2"))).get(); + + // sync + List results = timeseriesLatestDao.findLatestByEntityIds(tenantId, List.of(device)); + assertEquals(2, results.size()); + Map byKey = results.stream().collect(Collectors.toMap(TsKvEntry::getKey, e -> e)); + assertEquals("value1", byKey.get("key1").getStrValue().orElseThrow()); + assertEquals(1000, byKey.get("key1").getTs()); + assertEquals("value2", byKey.get("key2").getStrValue().orElseThrow()); + assertEquals(2000, byKey.get("key2").getTs()); + + // async — same result + List asyncResults = timeseriesLatestDao.findLatestByEntityIdsAsync(tenantId, List.of(device)).get(); + assertEquals(results, asyncResults); + } + private TsKvEntry createEntry(String key, long ts) { return new BasicTsKvEntry(ts, new StringDataEntry(key, RandomStringUtils.random(10))); } diff --git a/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java b/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java index edef60f30d..f133527ddc 100644 --- a/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java +++ b/rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java @@ -22,7 +22,6 @@ import com.google.common.base.Strings; import lombok.Getter; import lombok.SneakyThrows; import org.apache.commons.io.IOUtils; -import org.apache.hc.core5.net.URIBuilder; import org.apache.commons.lang3.concurrent.LazyInitializer; import org.apache.hc.core5.net.URIBuilder; import org.springframework.core.ParameterizedTypeReference; @@ -172,6 +171,7 @@ import org.thingsboard.server.common.data.query.AlarmCountQuery; import org.thingsboard.server.common.data.query.AlarmData; import org.thingsboard.server.common.data.query.AlarmDataQuery; import org.thingsboard.server.common.data.query.AvailableEntityKeys; +import org.thingsboard.server.common.data.query.AvailableEntityKeysV2; import org.thingsboard.server.common.data.query.EntityCountQuery; import org.thingsboard.server.common.data.query.EntityData; import org.thingsboard.server.common.data.query.EntityDataQuery; @@ -1898,6 +1898,10 @@ public class RestClient implements Closeable { }).getBody(); } + /** + * @deprecated Use {@link #findAvailableEntityKeysV2(EntityDataQuery, boolean, boolean, Set, boolean)} instead. + */ + @Deprecated(forRemoval = true) public AvailableEntityKeys findAvailableEntityKeysByQuery(EntityDataQuery query, boolean includeTimeseries, boolean includeAttributes, AttributeScope scope) { var uri = UriComponentsBuilder.fromUriString(baseURL) .path("/api/entitiesQuery/find/keys") @@ -1909,6 +1913,22 @@ public class RestClient implements Closeable { return restTemplate.exchange(uri, HttpMethod.POST, new HttpEntity<>(query), new ParameterizedTypeReference() {}).getBody(); } + @SneakyThrows(URISyntaxException.class) + public AvailableEntityKeysV2 findAvailableEntityKeysV2( + EntityDataQuery query, boolean includeTimeseries, boolean includeAttributes, Set scopes, boolean includeSamples + ) { + var builder = new URIBuilder(baseURL).appendPath("/api/v2/entitiesQuery/find/keys") + .addParameter("includeTimeseries", String.valueOf(includeTimeseries)) + .addParameter("includeAttributes", String.valueOf(includeAttributes)) + .addParameter("includeSamples", String.valueOf(includeSamples)); + if (scopes != null) { + for (AttributeScope scope : scopes) { + builder.addParameter("scopes", scope.name()); + } + } + return restTemplate.exchange(builder.build(), HttpMethod.POST, new HttpEntity<>(query), new ParameterizedTypeReference() {}).getBody(); + } + public PageData findAlarmDataByQuery(AlarmDataQuery query) { return restTemplate.exchange( baseURL + "/api/alarmsQuery/find",