Browse Source

feat: add entity keys V2 endpoint with sample values

pull/15044/head
Dmytro Skarzhynets 3 months ago
parent
commit
43e8465966
  1. 58
      application/src/main/java/org/thingsboard/server/controller/EntityQueryController.java
  2. 145
      application/src/main/java/org/thingsboard/server/service/query/DefaultEntityQueryService.java
  3. 7
      application/src/main/java/org/thingsboard/server/service/query/EntityQueryService.java
  4. 58
      application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java
  5. 7
      application/src/test/java/org/thingsboard/server/controller/DeviceControllerTest.java
  6. 224
      application/src/test/java/org/thingsboard/server/controller/EntityQueryControllerTest.java
  7. 7
      application/src/test/java/org/thingsboard/server/controller/EntityRelationControllerTest.java
  8. 11
      common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java
  9. 3
      common/dao-api/src/main/java/org/thingsboard/server/dao/entity/EntityService.java
  10. 7
      common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
  11. 99
      common/data/src/main/java/org/thingsboard/server/common/data/query/AvailableEntityKeysV2.java
  12. 6
      dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java
  13. 20
      dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java
  14. 18
      dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java
  15. 86
      dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java
  16. 6
      dao/src/main/java/org/thingsboard/server/dao/model/sqlts/latest/TsKvLatestEntity.java
  17. 57
      dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java
  18. 24
      dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java
  19. 10
      dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java
  20. 17
      dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java
  21. 20
      dao/src/main/java/org/thingsboard/server/dao/sqlts/latest/SearchTsKvLatestRepository.java
  22. 13
      dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
  23. 10
      dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java
  24. 9
      dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java
  25. 103
      dao/src/test/java/org/thingsboard/server/dao/service/attributes/BaseAttributesServiceTest.java
  26. 68
      dao/src/test/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDaoTest.java
  27. 22
      rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java

58
application/src/main/java/org/thingsboard/server/controller/EntityQueryController.java

@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.query.AlarmCountQuery;
import org.thingsboard.server.common.data.query.AlarmData;
import org.thingsboard.server.common.data.query.AlarmDataQuery;
import org.thingsboard.server.common.data.query.AvailableEntityKeys;
import org.thingsboard.server.common.data.query.AvailableEntityKeysV2;
import org.thingsboard.server.common.data.query.EntityCountQuery;
import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.common.data.query.EntityDataPageLink;
@ -47,6 +48,8 @@ import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.query.EntityQueryService;
import org.thingsboard.server.service.security.permission.Operation;
import java.util.Set;
import static org.thingsboard.server.controller.ControllerConstants.ALARM_DATA_QUERY_DESCRIPTION;
import static org.thingsboard.server.controller.ControllerConstants.ENTITY_COUNT_QUERY_DESCRIPTION;
import static org.thingsboard.server.controller.ControllerConstants.ENTITY_DATA_QUERY_DESCRIPTION;
@ -115,9 +118,11 @@ public class EntityQueryController extends BaseController {
return entityQueryService.countAlarmsByQuery(getCurrentUser(), query);
}
@Deprecated(forRemoval = true)
@ApiOperation(
value = "Find Available Entity Keys by Query",
value = "Find Available Entity Keys by Query (deprecated)",
notes = """
**Deprecated.** Use the V2 endpoint (`POST /api/v2/entitiesQuery/find/keys`) instead.\n
Returns unique time series and/or attribute key names from entities matching the query.\n
Executes the Entity Data Query to find up to 100 entities, then fetches and aggregates all distinct key names.\n
Primarily used for UI features like autocomplete suggestions.""" + TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH
@ -128,9 +133,6 @@ public class EntityQueryController extends BaseController {
@Parameter(description = "Entity data query to find entities. Page size is capped at 100.")
@RequestBody EntityDataQuery query,
// fixme: combination of timeseries = false and attributes = false is allowed, but always results in empty response, therefore does not make any sense
// such combinations should NOT be allowed, but changing this will break clients
@Parameter(description = """
When true, includes unique time series key names in the response.
When false, the 'timeseries' list will be empty.""")
@ -155,6 +157,54 @@ public class EntityQueryController extends BaseController {
return wrapFuture(entityQueryService.getKeysByQuery(getCurrentUser(), getTenantId(), query, includeTimeseries, includeAttributes, scope));
}
@ApiOperation(
value = "Find Available Entity Keys By Query",
notes = """
Discovers unique time series and/or attribute key names available on entities that match the given query.
Works in two steps: first, the request body (an Entity Data Query) is executed to find matching entities
(page size is capped at 100); then, all distinct key names are collected from those entities.\n
Optionally, each key can include a sample the most recent value (by timestamp) for that key
across all matched entities."""
+ TENANT_OR_CUSTOMER_AUTHORITY_PARAGRAPH
)
@PreAuthorize("hasAnyAuthority('TENANT_ADMIN', 'CUSTOMER_USER')")
@PostMapping("/v2/entitiesQuery/find/keys")
public DeferredResult<AvailableEntityKeysV2> findAvailableEntityKeysByQueryV2(
@Parameter(description = "Entity data query to find entities. Page size is capped at 100.")
@RequestBody EntityDataQuery query,
@Parameter(description = """
When true, includes unique time series keys in the response.
When false, the 'timeseries' field is omitted. At least one of 'includeTimeseries' or 'includeAttributes' must be true.""")
@RequestParam(defaultValue = "true") boolean includeTimeseries,
@Parameter(description = """
When true, includes unique attribute keys in the response.
When false, the 'attributes' field is omitted. At least one of 'includeTimeseries' or 'includeAttributes' must be true.""")
@RequestParam(defaultValue = "true") boolean includeAttributes,
@Parameter(description = """
Filters attribute keys by scope. Only applies when 'includeAttributes' is true.
When not specified, scopes are auto-determined: all three scopes (server, client, shared) for device entities,
server scope only for other entity types.""",
schema = @Schema(allowableValues = {"SERVER_SCOPE", "SHARED_SCOPE", "CLIENT_SCOPE"}))
@RequestParam(required = false) Set<AttributeScope> scopes,
@Parameter(description = """
When true, each key entry includes a 'sample' object with the most recent value and timestamp.
When false, only key names are returned (sample is omitted from JSON).""")
@RequestParam(defaultValue = "false") boolean includeSamples
) throws ThingsboardException {
resolveQuery(query);
EntityDataPageLink pageLink = query.getPageLink();
if (pageLink.getPageSize() > MAX_PAGE_SIZE) {
pageLink.setPageSize(MAX_PAGE_SIZE);
}
return wrapFuture(entityQueryService.findAvailableEntityKeysByQuery(
getCurrentUser(), query,
includeTimeseries, includeAttributes, scopes, includeSamples));
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN')")
@PostMapping("/edqs/system/request")
public void processSystemEdqsRequest(@RequestBody ToCoreEdqsRequest request) {

145
application/src/main/java/org/thingsboard/server/service/query/DefaultEntityQueryService.java

@ -15,13 +15,15 @@
*/
package org.thingsboard.server.service.query;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.KvUtil;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.EntityType;
@ -30,11 +32,17 @@ import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.DataType;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.AlarmCountQuery;
import org.thingsboard.server.common.data.query.AlarmData;
import org.thingsboard.server.common.data.query.AlarmDataQuery;
import org.thingsboard.server.common.data.query.AvailableEntityKeys;
import org.thingsboard.server.common.data.query.AvailableEntityKeysV2;
import org.thingsboard.server.common.data.query.AvailableEntityKeysV2.KeyInfo;
import org.thingsboard.server.common.data.query.AvailableEntityKeysV2.KeySample;
import org.thingsboard.server.common.data.query.ComplexFilterPredicate;
import org.thingsboard.server.common.data.query.DynamicValue;
import org.thingsboard.server.common.data.query.EntityCountQuery;
@ -59,11 +67,13 @@ import org.thingsboard.server.service.security.model.SecurityUser;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@ -253,7 +263,7 @@ public class DefaultEntityQueryService implements EntityQueryService {
if (isAttributes) {
Map<EntityType, List<EntityId>> typesMap = ids.stream().collect(Collectors.groupingBy(EntityId::getEntityType));
List<ListenableFuture<List<String>>> futures = new ArrayList<>(typesMap.size());
typesMap.forEach((type, entityIds) -> futures.add(dbCallbackExecutor.submit(() -> attributesService.findAllKeysByEntityIds(tenantId, entityIds, scope))));
typesMap.forEach((type, entityIds) -> futures.add(dbCallbackExecutor.submit(() -> attributesService.findAllKeysByEntityIdsAndScope(tenantId, entityIds, scope))));
attributesKeysFuture = Futures.transform(Futures.allAsList(futures), lists -> {
if (CollectionUtils.isEmpty(lists)) {
return Collections.emptyList();
@ -274,4 +284,135 @@ public class DefaultEntityQueryService implements EntityQueryService {
}, dbCallbackExecutor);
}
@Override
public ListenableFuture<AvailableEntityKeysV2> findAvailableEntityKeysByQuery(SecurityUser securityUser, EntityDataQuery query,
boolean includeTimeseries, boolean includeAttributes,
Set<AttributeScope> scopes, boolean includeSamples) {
if (!includeTimeseries && !includeAttributes) {
return Futures.immediateFailedFuture(
new IllegalArgumentException("At least one of 'includeTimeseries' or 'includeAttributes' must be true"));
}
return Futures.transformAsync(findEntityIdsByQueryAsync(securityUser, query), ids -> {
if (ids.isEmpty()) {
return immediateFuture(new AvailableEntityKeysV2(
Collections.emptySet(),
includeTimeseries ? Collections.emptyList() : null,
includeAttributes ? Collections.emptyMap() : null));
}
TenantId tenantId = securityUser.getTenantId();
Set<EntityType> entityTypes = ids.stream().map(EntityId::getEntityType).collect(Collectors.toSet());
var tsFuture = includeTimeseries ? fetchTimeseriesKeys(tenantId, ids, includeSamples) : null;
Set<AttributeScope> effectiveScopes = includeAttributes
? resolveAttributeScopes(scopes, entityTypes) : Collections.emptySet();
var attrFutures = effectiveScopes.stream()
.map(scope -> fetchAttributeKeys(tenantId, ids, scope, includeSamples))
.toList();
return assembleResult(entityTypes, tsFuture, attrFutures);
}, dbCallbackExecutor);
}
private ListenableFuture<List<EntityId>> findEntityIdsByQueryAsync(SecurityUser securityUser, EntityDataQuery query) {
return Futures.transform(entityService.findEntityDataByQueryAsync(securityUser.getTenantId(), securityUser.getCustomerId(), query),
page -> page.getData().stream()
.map(EntityData::getEntityId)
.toList(),
dbCallbackExecutor);
}
private static Set<AttributeScope> resolveAttributeScopes(Set<AttributeScope> requestedScopes, Set<EntityType> entityTypes) {
boolean hasDevices = entityTypes.contains(EntityType.DEVICE);
Set<AttributeScope> scopes;
if (CollectionUtils.isNotEmpty(requestedScopes)) {
scopes = requestedScopes;
} else { // auto-determine scopes
scopes = hasDevices
? Set.of(AttributeScope.SERVER_SCOPE, AttributeScope.CLIENT_SCOPE, AttributeScope.SHARED_SCOPE)
: Collections.singleton(AttributeScope.SERVER_SCOPE);
}
// Non-device entities only support SERVER_SCOPE
if (!hasDevices) {
return scopes.contains(AttributeScope.SERVER_SCOPE)
? Collections.singleton(AttributeScope.SERVER_SCOPE)
: Collections.emptySet();
}
return scopes;
}
private ListenableFuture<List<KeyInfo>> fetchTimeseriesKeys(TenantId tenantId, List<EntityId> entityIds, boolean includeSamples) {
if (includeSamples) {
return Futures.transform(
timeseriesService.findLatestByEntityIdsAsync(tenantId, entityIds),
entries -> toKeyInfos(entries, true),
dbCallbackExecutor);
}
return Futures.transform(
timeseriesService.findAllKeysByEntityIdsAsync(tenantId, entityIds),
keys -> keys.stream().sorted().map(k -> new KeyInfo(k, null)).toList(),
dbCallbackExecutor);
}
private ListenableFuture<Map.Entry<AttributeScope, List<KeyInfo>>> fetchAttributeKeys(
TenantId tenantId, List<EntityId> entityIds, AttributeScope scope, boolean includeSamples) {
if (includeSamples) {
return Futures.transform(
attributesService.findLatestByEntityIdsAndScopeAsync(tenantId, entityIds, scope),
entries -> Map.entry(scope, toKeyInfos(entries, true)),
dbCallbackExecutor);
}
return Futures.transform(
attributesService.findAllKeysByEntityIdsAndScopeAsync(tenantId, entityIds, scope),
keys -> Map.entry(scope, keys.stream().sorted().map(k -> new KeyInfo(k, null)).toList()),
dbCallbackExecutor);
}
private ListenableFuture<AvailableEntityKeysV2> assembleResult(
Set<EntityType> entityTypes,
ListenableFuture<List<KeyInfo>> tsFuture,
List<ListenableFuture<Map.Entry<AttributeScope, List<KeyInfo>>>> attrFutures) {
var allAttrFuture = attrFutures.isEmpty()
? immediateFuture(List.<Map.Entry<AttributeScope, List<KeyInfo>>>of())
: Futures.allAsList(attrFutures);
List<ListenableFuture<?>> allFutures = new ArrayList<>();
if (tsFuture != null) {
allFutures.add(tsFuture);
}
allFutures.add(allAttrFuture);
var finalTsFuture = tsFuture;
return Futures.whenAllComplete(allFutures)
.call(() -> {
List<KeyInfo> tsKeys = finalTsFuture != null ? Futures.getDone(finalTsFuture) : null;
Map<AttributeScope, List<KeyInfo>> attrMap = attrFutures.isEmpty() ? null : new TreeMap<>();
if (attrMap != null) {
for (var entry : Futures.getDone(allAttrFuture)) {
attrMap.put(entry.getKey(), entry.getValue());
}
}
return new AvailableEntityKeysV2(entityTypes, tsKeys, attrMap);
}, dbCallbackExecutor);
}
private static List<KeyInfo> toKeyInfos(List<? extends KvEntry> entries, boolean includeSamples) {
return entries.stream()
.map(e -> new KeyInfo(e.getKey(), includeSamples ? toKeySample(e) : null))
.sorted(Comparator.comparing(KeyInfo::key))
.toList();
}
private static KeySample toKeySample(KvEntry entry) {
long ts = entry instanceof TsKvEntry tsKv ? tsKv.getTs()
: entry instanceof AttributeKvEntry attr ? attr.getLastUpdateTs()
: 0;
JsonNode value = entry.getDataType() == DataType.JSON
? JacksonUtil.toJsonNode(entry.getJsonValue().get())
: JacksonUtil.valueToTree(entry.getValue());
return new KeySample(ts, value);
}
}

7
application/src/main/java/org/thingsboard/server/service/query/EntityQueryService.java

@ -23,11 +23,14 @@ import org.thingsboard.server.common.data.query.AlarmCountQuery;
import org.thingsboard.server.common.data.query.AlarmData;
import org.thingsboard.server.common.data.query.AlarmDataQuery;
import org.thingsboard.server.common.data.query.AvailableEntityKeys;
import org.thingsboard.server.common.data.query.AvailableEntityKeysV2;
import org.thingsboard.server.common.data.query.EntityCountQuery;
import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.common.data.query.EntityDataQuery;
import org.thingsboard.server.service.security.model.SecurityUser;
import java.util.Set;
public interface EntityQueryService {
long countEntitiesByQuery(SecurityUser securityUser, EntityCountQuery query);
@ -41,4 +44,8 @@ public interface EntityQueryService {
ListenableFuture<AvailableEntityKeys> getKeysByQuery(SecurityUser securityUser, TenantId tenantId, EntityDataQuery query,
boolean isTimeseries, boolean isAttributes, AttributeScope scope);
ListenableFuture<AvailableEntityKeysV2> findAvailableEntityKeysByQuery(SecurityUser securityUser, EntityDataQuery query,
boolean includeTimeseries, boolean includeAttributes,
Set<AttributeScope> scopes, boolean includeSamples);
}

58
application/src/test/java/org/thingsboard/server/controller/AbstractWebTest.java

@ -123,6 +123,8 @@ import org.thingsboard.server.common.data.id.UUIDBased;
import org.thingsboard.server.common.data.id.UserId;
import org.thingsboard.server.common.data.job.Job;
import org.thingsboard.server.common.data.job.JobType;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.notification.Notification;
import org.thingsboard.server.common.data.notification.NotificationDeliveryMethod;
import org.thingsboard.server.common.data.notification.NotificationType;
@ -177,6 +179,7 @@ import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@ -718,6 +721,10 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
return assetProfile;
}
protected Device createDevice(String name) throws Exception {
return createDevice(name, "default", null, null);
}
protected Device createDevice(String name, String accessToken) throws Exception {
return createDevice(name, "default", null, accessToken);
}
@ -731,7 +738,11 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
deviceData.setTransportConfiguration(new DefaultDeviceTransportConfiguration());
deviceData.setConfiguration(new DefaultDeviceConfiguration());
device.setDeviceData(deviceData);
return doPost("/api/device?accessToken=" + accessToken, device, Device.class);
if (accessToken != null) {
return doPost("/api/device?accessToken=" + accessToken, device, Device.class);
} else {
return doPost("/api/device", device, Device.class);
}
}
protected Device assignDeviceToCustomer(DeviceId deviceId, CustomerId customerId) {
@ -1219,7 +1230,7 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
Awaitility.await("CF state for entity actor ready to refresh dynamic arguments").atMost(TIMEOUT, TimeUnit.SECONDS).until(() -> {
CalculatedFieldState calculatedFieldState = statesMap.get(cfId);
boolean isReady = calculatedFieldState != null && ((GeofencingCalculatedFieldState) calculatedFieldState).getLastScheduledRefreshTs() <
System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(scheduledUpdateInterval);
System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(scheduledUpdateInterval);
log.warn("entityId {}, cfId {}, state ready to refresh == {}", entityId, cfId, isReady);
return isReady;
});
@ -1411,7 +1422,7 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
protected List<Job> findJobs(List<JobType> types, List<UUID> entities) throws Exception {
return doGetTypedWithPageLink("/api/jobs?types=" + types.stream().map(Enum::name).collect(Collectors.joining(",")) +
"&entities=" + entities.stream().map(UUID::toString).collect(Collectors.joining(",")) + "&",
"&entities=" + entities.stream().map(UUID::toString).collect(Collectors.joining(",")) + "&",
new TypeReference<PageData<Job>>() {}, new PageLink(100, 0, null, new SortOrder("createdTime", SortOrder.Direction.DESC))).getData();
}
@ -1425,12 +1436,37 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
protected void postTelemetry(EntityId entityId, String payload) throws Exception {
doPostAsync("/api/plugins/telemetry/" + entityId.getEntityType() + "/" + entityId.getId() +
"/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode(payload), 30_000L).andExpect(status().isOk());
"/timeseries/" + DataConstants.SERVER_SCOPE, JacksonUtil.toJsonNode(payload), 30_000L).andExpect(status().isOk());
}
protected void postTelemetry(EntityId entityId, TsKvEntry entry) throws Exception {
var values = JacksonUtil.newObjectNode();
JacksonUtil.addKvEntry(values, entry);
var payload = JacksonUtil.newObjectNode()
.put("ts", entry.getTs())
.set("values", values);
var url = "/api/plugins/telemetry/" + entityId.getEntityType() + "/" + entityId.getId() + "/timeseries/any";
doPostAsync(url, payload, 30_000L).andExpect(status().isOk());
}
protected void postAttributes(EntityId entityId, AttributeScope scope, String payload) throws Exception {
doPostAsync("/api/plugins/telemetry/" + entityId.getEntityType() + "/" + entityId.getId() +
"/attributes/" + scope, JacksonUtil.toJsonNode(payload), 30_000L).andExpect(status().isOk());
"/attributes/" + scope, JacksonUtil.toJsonNode(payload), 30_000L).andExpect(status().isOk());
}
protected void postAttributes(EntityId entityId, AttributeScope scope, KvEntry... attributes) throws Exception {
postAttributes(entityId, scope, Arrays.asList(attributes));
}
protected void postAttributes(EntityId entityId, AttributeScope scope, Collection<? extends KvEntry> attributes) throws Exception {
var url = "/api/plugins/telemetry/" + entityId.getEntityType() + "/" + entityId.getId() + "/attributes/" + scope;
var payload = JacksonUtil.newObjectNode();
for (KvEntry entry : attributes) {
JacksonUtil.addKvEntry(payload, entry);
}
doPostAsync(url, payload, 30_000L).andExpect(status().isOk());
}
protected CalculatedField saveCalculatedField(CalculatedField calculatedField) {
@ -1439,7 +1475,7 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
protected PageData<CalculatedField> getEntityCalculatedFields(EntityId entityId, CalculatedFieldType type, PageLink pageLink) throws Exception {
return doGetTypedWithPageLink("/api/" + entityId.getEntityType() + "/" + entityId.getId() + "/calculatedFields" +
(type != null ? "?type=" + type.name() + "&" : "?"), new TypeReference<>() {}, pageLink);
(type != null ? "?type=" + type.name() + "&" : "?"), new TypeReference<>() {}, pageLink);
}
protected PageData<String> getCalculatedFieldNames(CalculatedFieldType type, PageLink pageLink) throws Exception {
@ -1452,11 +1488,11 @@ public abstract class AbstractWebTest extends AbstractInMemoryStorageTest {
List<UUID> entities,
List<String> names) throws Exception {
return doGetTypedWithPageLink("/api/calculatedFields?" +
(type != null ? "types=" + type + "&" : "") +
(entityType != null ? "entityType=" + entityType + "&" : "") +
(entities != null ? "entities=" + String.join(",",
entities.stream().map(UUID::toString).toList()) + "&" : "") +
(names != null ? names.stream().map(name -> "name=" + name + "&").collect(Collectors.joining("")) : ""),
(type != null ? "types=" + type + "&" : "") +
(entityType != null ? "entityType=" + entityType + "&" : "") +
(entities != null ? "entities=" + String.join(",",
entities.stream().map(UUID::toString).toList()) + "&" : "") +
(names != null ? names.stream().map(name -> "name=" + name + "&").collect(Collectors.joining("")) : ""),
new TypeReference<PageData<CalculatedFieldInfo>>() {}, new PageLink(10)).getData();
}

7
application/src/test/java/org/thingsboard/server/controller/DeviceControllerTest.java

@ -1732,11 +1732,4 @@ public class DeviceControllerTest extends AbstractControllerTest {
assertThat(fifthDevice.getName()).isEqualTo("My unique device_2");
}
private Device createDevice(String name) {
Device device = new Device();
device.setName(name);
device.setType("default");
return doPost("/api/device", device, Device.class);
}
}

224
application/src/test/java/org/thingsboard/server/controller/EntityQueryControllerTest.java

@ -17,6 +17,9 @@ package org.thingsboard.server.controller;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.BooleanNode;
import com.fasterxml.jackson.databind.node.DoubleNode;
import com.fasterxml.jackson.databind.node.IntNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.junit.After;
import org.junit.Assert;
@ -43,12 +46,21 @@ import org.thingsboard.server.common.data.alarm.AlarmSeverity;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.JsonDataEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.AlarmCountQuery;
import org.thingsboard.server.common.data.query.AlarmData;
import org.thingsboard.server.common.data.query.AlarmDataPageLink;
import org.thingsboard.server.common.data.query.AlarmDataQuery;
import org.thingsboard.server.common.data.query.AliasEntityId;
import org.thingsboard.server.common.data.query.AvailableEntityKeysV2;
import org.thingsboard.server.common.data.query.AvailableEntityKeysV2.KeyInfo;
import org.thingsboard.server.common.data.query.DeviceTypeFilter;
import org.thingsboard.server.common.data.query.DynamicValue;
import org.thingsboard.server.common.data.query.DynamicValueSourceType;
@ -84,6 +96,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
@ -1329,7 +1342,7 @@ public class EntityQueryControllerTest extends AbstractControllerTest {
//assign dashboard
doPost("/api/customer/" + savedCustomer.getId().getId().toString()
+ "/dashboard/" + savedDashboard.getId().getId().toString(), Dashboard.class);
+ "/dashboard/" + savedDashboard.getId().getId().toString(), Dashboard.class);
// check entity data query by customer
User customerUser = new User();
@ -1494,4 +1507,213 @@ public class EntityQueryControllerTest extends AbstractControllerTest {
return nameFilter;
}
// --- findAvailableEntityKeysV2 tests ---
@Test
public void testFindAvailableKeysByQueryV2() throws Exception {
// GIVEN — two devices matched by query; a third device should not be matched
var device1 = createDevice("Test device 1");
var device2 = createDevice("Test device 2");
var unmatchedDevice = createDevice("Unmatched device");
// unmatched device has unique keys that must NOT appear in the result
postTelemetry(unmatchedDevice.getId(), new BasicTsKvEntry(9000, new DoubleDataEntry("unmatchedTs", 999.0)));
postAttributes(unmatchedDevice.getId(), AttributeScope.SHARED_SCOPE, new StringDataEntry("unmatchedAttr", "nope"));
// device1: timeseries1 (Double) with two data points, and timeseries2 older data point
postTelemetry(device1.getId(), new BasicTsKvEntry(1000, new DoubleDataEntry("timeseries1", 10.0)));
postTelemetry(device1.getId(), new BasicTsKvEntry(2000, new DoubleDataEntry("timeseries1", 20.5)));
postTelemetry(device1.getId(), new BasicTsKvEntry(1000, new LongDataEntry("timeseries2", 100L)));
// device2: timeseries2 (Long) with a newer data point, and timeseries3 only on this device
postTelemetry(device2.getId(), new BasicTsKvEntry(3000, new LongDataEntry("timeseries2", 300L)));
postTelemetry(device2.getId(), new BasicTsKvEntry(5000, new DoubleDataEntry("timeseries3", 99.9)));
// device1: SHARED_SCOPE attributes
postAttributes(device1.getId(), AttributeScope.SHARED_SCOPE,
new BooleanDataEntry("sharedAttribute1", true), new DoubleDataEntry("sharedAttribute2", 3.14));
// device2: CLIENT_SCOPE attributes (saved via service to bypass API restriction)
attributesService.save(tenantId, device2.getId(), AttributeScope.CLIENT_SCOPE, List.of(
new BaseAttributeKvEntry(new JsonDataEntry("clientAttribute1", "{\"key\":\"val\"}"), System.currentTimeMillis()),
new BaseAttributeKvEntry(new BooleanDataEntry("clientAttribute2", false), System.currentTimeMillis())
)).get();
// device1 also has SERVER_SCOPE attributes (should be omitted by scope filter)
postAttributes(device1.getId(), AttributeScope.SERVER_SCOPE,
new StringDataEntry("serverAttribute1", "sv1"), new LongDataEntry("serverAttribute2", 42L));
// WHEN — query matches both devices; request timeseries + only SHARED and CLIENT attribute scopes
DeviceTypeFilter filter = new DeviceTypeFilter();
filter.setDeviceTypes(List.of("default"));
filter.setDeviceNameFilter("Test device");
EntityDataPageLink pageLink = new EntityDataPageLink(100, 0, null, null);
EntityDataQuery query = new EntityDataQuery(filter, pageLink, List.of(), null, null);
AvailableEntityKeysV2 result = findAvailableEntityKeysByQueryV2(query,
true, true, List.of(AttributeScope.SHARED_SCOPE, AttributeScope.CLIENT_SCOPE), true);
// THEN
assertThat(result.entityTypes()).containsExactly(EntityType.DEVICE);
// timeseries: keys collected from both devices, samples contain the freshest data points
assertThat(result.timeseries()).extracting(KeyInfo::key)
.containsExactly("timeseries1", "timeseries2", "timeseries3");
assertThat(result.timeseries()).allSatisfy(ki -> assertThat(ki.sample()).isNotNull());
assertKeySample(result.timeseries(), "timeseries1", new DoubleNode(20.5), 2000); // from device1
assertKeySample(result.timeseries(), "timeseries2", new IntNode(300), 3000); // from device2 (newer)
assertKeySample(result.timeseries(), "timeseries3", new DoubleNode(99.9), 5000); // only on device2
// SERVER_SCOPE must be fully omitted from the response
assertThat(result.attributes()).containsOnlyKeys(AttributeScope.SHARED_SCOPE, AttributeScope.CLIENT_SCOPE);
// SHARED_SCOPE: from device1 (alphabetical order)
assertThat(result.attributes().get(AttributeScope.SHARED_SCOPE))
.extracting(KeyInfo::key).containsExactly("sharedAttribute1", "sharedAttribute2");
assertKeySample(result.attributes().get(AttributeScope.SHARED_SCOPE), "sharedAttribute1", BooleanNode.TRUE);
assertKeySample(result.attributes().get(AttributeScope.SHARED_SCOPE), "sharedAttribute2", new DoubleNode(3.14));
// CLIENT_SCOPE: from device2 (alphabetical order)
assertThat(result.attributes().get(AttributeScope.CLIENT_SCOPE))
.extracting(KeyInfo::key).containsExactly("clientAttribute1", "clientAttribute2");
assertKeySample(result.attributes().get(AttributeScope.CLIENT_SCOPE), "clientAttribute1", JacksonUtil.toJsonNode("{\"key\":\"val\"}"));
assertKeySample(result.attributes().get(AttributeScope.CLIENT_SCOPE), "clientAttribute2", BooleanNode.FALSE);
}
@Test
public void testFindAvailableKeysByQueryV2_withoutSamples() throws Exception {
// GIVEN
var device = createDevice("Test device");
postTelemetry(device.getId(), new BasicTsKvEntry(System.currentTimeMillis(), new DoubleDataEntry("temperature", 10.0)));
postAttributes(device.getId(), AttributeScope.SERVER_SCOPE, new StringDataEntry("firmware", "v1.0"));
// WHEN
AvailableEntityKeysV2 result = findAvailableEntityKeysByQueryV2(
buildDeviceQuery("Test device"), true, true, null, false);
// THEN
assertThat(result.timeseries()).allSatisfy(ki -> assertThat(ki.sample()).isNull());
assertThat(result.attributes().get(AttributeScope.SERVER_SCOPE))
.allSatisfy(ki -> assertThat(ki.sample()).isNull());
}
@Test
public void testFindAvailableKeysByQueryV2_timeseriesOnly() throws Exception {
// GIVEN
var device = createDevice("Test device");
postTelemetry(device.getId(), new BasicTsKvEntry(System.currentTimeMillis(), new DoubleDataEntry("temperature", 10.0)));
postAttributes(device.getId(), AttributeScope.SERVER_SCOPE, new StringDataEntry("firmware", "v1.0"));
// WHEN
AvailableEntityKeysV2 result = findAvailableEntityKeysByQueryV2(
buildDeviceQuery("Test device"), true, false, null, false);
// THEN
assertThat(result.timeseries()).extracting(KeyInfo::key).contains("temperature");
assertThat(result.attributes()).isNull();
}
@Test
public void testFindAvailableKeysByQueryV2_attributesOnly() throws Exception {
// GIVEN
var device = createDevice("Test device");
postTelemetry(device.getId(), new BasicTsKvEntry(System.currentTimeMillis(), new DoubleDataEntry("temperature", 10.0)));
postAttributes(device.getId(), AttributeScope.SERVER_SCOPE, new StringDataEntry("firmware", "v1.0"));
// WHEN
AvailableEntityKeysV2 result = findAvailableEntityKeysByQueryV2(
buildDeviceQuery("Test device"), false, true, null, false);
// THEN
assertThat(result.timeseries()).isNull();
assertThat(result.attributes().get(AttributeScope.SERVER_SCOPE))
.extracting(KeyInfo::key).contains("firmware");
}
@Test
public void testFindAvailableKeysByQueryV2_noMatchingEntities() throws Exception {
// WHEN
AvailableEntityKeysV2 result = findAvailableEntityKeysByQueryV2(
buildDeviceQuery("NonExistentDevice_" + UUID.randomUUID()), true, true, null, true);
// THEN
assertThat(result.entityTypes()).isEmpty();
assertThat(result.timeseries()).isEmpty();
assertThat(result.attributes()).isEmpty();
}
@Test
public void testFindAvailableKeysByQueryV2_assetUsesServerScopeOnly() throws Exception {
// GIVEN
var asset = new Asset();
asset.setName("Test asset");
asset.setType("default");
asset = doPost("/api/asset", asset, Asset.class);
postAttributes(asset.getId(), AttributeScope.SERVER_SCOPE, new StringDataEntry("location", "warehouse"));
// WHEN
var filter = new SingleEntityFilter();
filter.setSingleEntity(AliasEntityId.fromEntityId(asset.getId()));
var query = new EntityDataQuery(filter, new EntityDataPageLink(1, 0, null, null), Collections.emptyList(), null, null);
AvailableEntityKeysV2 result = findAvailableEntityKeysByQueryV2(query, false, true, null, false);
// THEN
assertThat(result.entityTypes()).containsExactly(EntityType.ASSET);
assertThat(result.attributes()).containsOnlyKeys(AttributeScope.SERVER_SCOPE);
assertThat(result.attributes().get(AttributeScope.SERVER_SCOPE))
.extracting(KeyInfo::key).containsExactly("location");
}
@Test
public void testFindAvailableKeysByQueryV2_rejectsWhenNoKeyTypeRequested() throws Exception {
// WHEN / THEN
EntityDataQuery query = buildDeviceQuery("NonExistent");
doPostAsync("/api/v2/entitiesQuery/find/keys?includeTimeseries=false&includeAttributes=false",
query, 30_000L).andExpect(status().isBadRequest());
}
private AvailableEntityKeysV2 findAvailableEntityKeysByQueryV2(EntityDataQuery query,
boolean includeTimeseries, boolean includeAttributes,
List<AttributeScope> scopes, boolean includeSamples) throws Exception {
StringBuilder url = new StringBuilder("/api/v2/entitiesQuery/find/keys?")
.append("includeTimeseries=").append(includeTimeseries)
.append("&includeAttributes=").append(includeAttributes)
.append("&includeSamples=").append(includeSamples);
if (scopes != null) {
for (AttributeScope scope : scopes) {
url.append("&scopes=").append(scope);
}
}
return doPostAsyncWithTypedResponse(url.toString(), query,
new TypeReference<>() {}, status().isOk());
}
private static void assertKeySample(List<KeyInfo> keys, String expectedKey, JsonNode expectedValue, long expectedTs) {
KeyInfo keyInfo = findKeyInfo(keys, expectedKey);
assertThat(keyInfo.sample()).isNotNull();
assertThat(keyInfo.sample().value()).isEqualTo(expectedValue);
assertThat(keyInfo.sample().ts()).isEqualTo(expectedTs);
}
private static void assertKeySample(List<KeyInfo> keys, String expectedKey, JsonNode expectedValue) {
KeyInfo keyInfo = findKeyInfo(keys, expectedKey);
assertThat(keyInfo.sample()).isNotNull();
assertThat(keyInfo.sample().value()).isEqualTo(expectedValue);
assertThat(keyInfo.sample().ts()).isGreaterThan(0);
}
private static KeyInfo findKeyInfo(List<KeyInfo> keys, String key) {
return keys.stream()
.filter(ki -> ki.key().equals(key)).findFirst().orElseThrow();
}
private static EntityDataQuery buildDeviceQuery(String deviceName) {
var filter = new DeviceTypeFilter();
filter.setDeviceTypes(Collections.singletonList("default"));
filter.setDeviceNameFilter(deviceName);
return new EntityDataQuery(filter, new EntityDataPageLink(1, 0, null, null), Collections.emptyList(), null, null);
}
}

7
application/src/test/java/org/thingsboard/server/controller/EntityRelationControllerTest.java

@ -633,13 +633,6 @@ public class EntityRelationControllerTest extends AbstractControllerTest {
deleteDifferentTenant();
}
private Device createDevice(String name) {
var device = new Device();
device.setName(name);
device.setType("default");
return doPost("/api/device", device, Device.class);
}
private ResultActions getRelation(EntityRelation relation) throws Exception {
return doGet("/api/relation?" +
"fromId=" + relation.getFrom().getId() +

11
common/dao-api/src/main/java/org/thingsboard/server/dao/attributes/AttributesService.java

@ -27,9 +27,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Optional;
/**
* @author Andrew Shvayka
*/
public interface AttributesService {
ListenableFuture<Optional<AttributeKvEntry>> find(TenantId tenantId, EntityId entityId, AttributeScope scope, String attributeKey);
@ -48,7 +45,13 @@ public interface AttributesService {
List<String> findAllKeysByEntityIds(TenantId tenantId, List<EntityId> entityIds);
List<String> findAllKeysByEntityIds(TenantId tenantId, List<EntityId> entityIds, AttributeScope scope);
List<String> findAllKeysByEntityIdsAndScope(TenantId tenantId, List<EntityId> entityIds, AttributeScope scope);
ListenableFuture<List<String>> findAllKeysByEntityIdsAndScopeAsync(TenantId tenantId, List<EntityId> entityIds, AttributeScope scope);
List<AttributeKvEntry> findLatestByEntityIdsAndScope(TenantId tenantId, List<EntityId> entityIds, AttributeScope scope);
ListenableFuture<List<AttributeKvEntry>> findLatestByEntityIdsAndScopeAsync(TenantId tenantId, List<EntityId> entityIds, AttributeScope scope);
int removeAllByEntityId(TenantId tenantId, EntityId entityId);

3
common/dao-api/src/main/java/org/thingsboard/server/dao/entity/EntityService.java

@ -16,6 +16,7 @@
package org.thingsboard.server.dao.entity;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.ListenableFuture;
import org.thingsboard.server.common.data.EntityInfo;
import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
@ -51,4 +52,6 @@ public interface EntityService {
PageData<EntityData> findEntityDataByQuery(TenantId tenantId, CustomerId customerId, EntityDataQuery query);
ListenableFuture<PageData<EntityData>> findEntityDataByQueryAsync(TenantId tenantId, CustomerId customerId, EntityDataQuery query);
}

7
common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java

@ -30,9 +30,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Optional;
/**
* @author Andrew Shvayka
*/
public interface TimeseriesService {
ListenableFuture<List<ReadTsKvQueryResult>> findAllByQueries(TenantId tenantId, EntityId entityId, List<ReadTsKvQuery> queries);
@ -65,6 +62,10 @@ public interface TimeseriesService {
ListenableFuture<List<String>> findAllKeysByEntityIdsAsync(TenantId tenantId, List<EntityId> entityIds);
List<TsKvEntry> findLatestByEntityIds(TenantId tenantId, List<EntityId> entityIds);
ListenableFuture<List<TsKvEntry>> findLatestByEntityIdsAsync(TenantId tenantId, List<EntityId> entityIds);
void cleanup(long systemTtl);
}

99
common/data/src/main/java/org/thingsboard/server/common/data/query/AvailableEntityKeysV2.java

@ -0,0 +1,99 @@
/**
* Copyright © 2016-2026 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.query;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.JsonNode;
import io.swagger.v3.oas.annotations.media.ArraySchema;
import io.swagger.v3.oas.annotations.media.Schema;
import org.jspecify.annotations.Nullable;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.EntityType;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Schema(
description = """
Contains unique time series and attribute key names discovered from entities matching a query,
optionally including a sample value for each key."""
)
@JsonInclude(JsonInclude.Include.NON_NULL)
public record AvailableEntityKeysV2(
@Schema(
description = "Set of entity types found among the matched entities.",
example = "[\"DEVICE\", \"ASSET\"]",
requiredMode = Schema.RequiredMode.REQUIRED
)
Set<EntityType> entityTypes,
@ArraySchema(
arraySchema = @Schema(
description = """
List of unique time series keys available on the matched entities, sorted alphabetically.
Omitted when timeseries keys were not requested.""",
nullable = true
),
schema = @Schema(implementation = KeyInfo.class)
)
@Nullable List<KeyInfo> timeseries,
@Schema(
description = """
Map of attribute scope to the list of unique attribute keys available on the matched entities.
Only scopes supported by the matched entity types are included.
Omitted when attribute keys were not requested or when none of the requested scopes apply to the matched entity types.""",
nullable = true
)
@Nullable Map<AttributeScope, List<KeyInfo>> attributes
) {
@Schema(description = "Key name with an optional sample value.")
@JsonInclude(JsonInclude.Include.NON_NULL)
public record KeyInfo(
@Schema(
description = "Key name.",
example = "temperature",
requiredMode = Schema.RequiredMode.REQUIRED
)
String key,
@Schema(
description = "Most recent sample value for this key across the matched entities. Omitted when samples were not requested.",
nullable = true
)
@Nullable KeySample sample
) {}
@Schema(description = "Most recent value and its timestamp.")
public record KeySample(
@Schema(
description = "Timestamp in milliseconds since epoch.", example = "1707000000000",
requiredMode = Schema.RequiredMode.REQUIRED
)
long ts,
@Schema(
description = "Sample value.",
example = "23.5",
requiredMode = Schema.RequiredMode.REQUIRED,
implementation = Object.class
)
JsonNode value
) {}
}

6
dao/src/main/java/org/thingsboard/server/dao/attributes/AttributesDao.java

@ -55,6 +55,12 @@ public interface AttributesDao {
List<String> findAllKeysByEntityIdsAndScope(TenantId tenantId, List<EntityId> entityIds, AttributeScope scope);
ListenableFuture<List<String>> findAllKeysByEntityIdsAndScopeAsync(TenantId tenantId, List<EntityId> entityIds, AttributeScope scope);
List<AttributeKvEntry> findLatestByEntityIdsAndScope(TenantId tenantId, List<EntityId> entityIds, AttributeScope scope);
ListenableFuture<List<AttributeKvEntry>> findLatestByEntityIdsAndScopeAsync(TenantId tenantId, List<EntityId> entityIds, AttributeScope scope);
List<Pair<AttributeScope, String>> removeAllByEntityId(TenantId tenantId, EntityId entityId);
}

20
dao/src/main/java/org/thingsboard/server/dao/attributes/BaseAttributesService.java

@ -45,9 +45,6 @@ import java.util.Optional;
import static org.thingsboard.server.dao.attributes.AttributeUtils.validate;
/**
* @author Andrew Shvayka
*/
@Service
@ConditionalOnProperty(prefix = "cache.attributes", value = "enabled", havingValue = "false", matchIfMissing = true)
@Primary
@ -92,7 +89,7 @@ public class BaseAttributesService implements AttributesService {
}
@Override
public List<String> findAllKeysByEntityIds(TenantId tenantId, List<EntityId> entityIds, AttributeScope scope) {
public List<String> findAllKeysByEntityIdsAndScope(TenantId tenantId, List<EntityId> entityIds, AttributeScope scope) {
if (scope == null) {
return attributesDao.findAllKeysByEntityIds(tenantId, entityIds);
} else {
@ -100,6 +97,21 @@ public class BaseAttributesService implements AttributesService {
}
}
@Override
public ListenableFuture<List<String>> findAllKeysByEntityIdsAndScopeAsync(TenantId tenantId, List<EntityId> entityIds, AttributeScope scope) {
return attributesDao.findAllKeysByEntityIdsAndScopeAsync(tenantId, entityIds, scope);
}
@Override
public List<AttributeKvEntry> findLatestByEntityIdsAndScope(TenantId tenantId, List<EntityId> entityIds, AttributeScope scope) {
return attributesDao.findLatestByEntityIdsAndScope(tenantId, entityIds, scope);
}
@Override
public ListenableFuture<List<AttributeKvEntry>> findLatestByEntityIdsAndScopeAsync(TenantId tenantId, List<EntityId> entityIds, AttributeScope scope) {
return attributesDao.findLatestByEntityIdsAndScopeAsync(tenantId, entityIds, scope);
}
@Override
public ListenableFuture<AttributesSaveResult> save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) {
validate(entityId, scope);

18
dao/src/main/java/org/thingsboard/server/dao/attributes/CachedAttributesService.java

@ -66,6 +66,7 @@ import static org.thingsboard.server.dao.attributes.AttributeUtils.validate;
@Primary
@Slf4j
public class CachedAttributesService implements AttributesService {
private static final String STATS_NAME = "attributes.cache";
public static final String LOCAL_CACHE_TYPE = "caffeine";
@ -212,7 +213,7 @@ public class CachedAttributesService implements AttributesService {
}
@Override
public List<String> findAllKeysByEntityIds(TenantId tenantId, List<EntityId> entityIds, AttributeScope scope) {
public List<String> findAllKeysByEntityIdsAndScope(TenantId tenantId, List<EntityId> entityIds, AttributeScope scope) {
if (scope == null) {
return attributesDao.findAllKeysByEntityIds(tenantId, entityIds);
} else {
@ -220,6 +221,21 @@ public class CachedAttributesService implements AttributesService {
}
}
@Override
public ListenableFuture<List<String>> findAllKeysByEntityIdsAndScopeAsync(TenantId tenantId, List<EntityId> entityIds, AttributeScope scope) {
return attributesDao.findAllKeysByEntityIdsAndScopeAsync(tenantId, entityIds, scope);
}
@Override
public List<AttributeKvEntry> findLatestByEntityIdsAndScope(TenantId tenantId, List<EntityId> entityIds, AttributeScope scope) {
return attributesDao.findLatestByEntityIdsAndScope(tenantId, entityIds, scope);
}
@Override
public ListenableFuture<List<AttributeKvEntry>> findLatestByEntityIdsAndScopeAsync(TenantId tenantId, List<EntityId> entityIds, AttributeScope scope) {
return attributesDao.findLatestByEntityIdsAndScopeAsync(tenantId, entityIds, scope);
}
@Override
public ListenableFuture<AttributesSaveResult> save(TenantId tenantId, EntityId entityId, AttributeScope scope, AttributeKvEntry attribute) {
validate(entityId, scope);

86
dao/src/main/java/org/thingsboard/server/dao/entity/BaseEntityService.java

@ -16,6 +16,9 @@
package org.thingsboard.server.dao.entity;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
@ -55,6 +58,7 @@ import org.thingsboard.server.common.msg.edqs.EdqsService;
import org.thingsboard.server.common.stats.EdqsStatsService;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.sql.JpaExecutorService;
import java.util.ArrayList;
import java.util.Collections;
@ -104,6 +108,9 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe
@Autowired
private EdqsStatsService edqsStatsService;
@Autowired
private JpaExecutorService jpaExecutorService;
@Override
public long countEntitiesByQuery(TenantId tenantId, CustomerId customerId, EntityCountQuery query) {
log.trace("Executing countEntitiesByQuery, tenantId [{}], customerId [{}], query [{}]", tenantId, customerId, query);
@ -142,41 +149,78 @@ public class BaseEntityService extends AbstractEntityService implements EntitySe
EdqsResponse response = processEdqsRequest(tenantId, customerId, request);
result = response.getEntityDataQueryResult();
} else {
if (!isValidForOptimization(query)) {
result = entityQueryDao.findEntityDataByQuery(tenantId, customerId, query);
} else {
// 1 step - find entity data by filter and sort columns
PageData<EntityData> entityDataByQuery = findEntityIdsByFilterAndSorterColumns(tenantId, customerId, query);
if (entityDataByQuery == null || entityDataByQuery.getData().isEmpty()) {
result = entityDataByQuery;
} else {
// 2 step - find entity data by entity ids from the 1st step
List<EntityData> entities = fetchEntityDataByIdsFromInitialQuery(tenantId, customerId, query, entityDataByQuery.getData());
result = new PageData<>(entities, entityDataByQuery.getTotalPages(), entityDataByQuery.getTotalElements(), entityDataByQuery.hasNext());
}
}
result = findEntityDataByQueryInternal(tenantId, customerId, query);
}
edqsStatsService.reportEntityDataQuery(tenantId, query, System.nanoTime() - startNs);
return result;
}
@Override
public ListenableFuture<PageData<EntityData>> findEntityDataByQueryAsync(TenantId tenantId, CustomerId customerId, EntityDataQuery query) {
log.trace("Executing findEntityDataByQueryAsync, tenantId [{}], customerId [{}], query [{}]", tenantId, customerId, query);
try {
validateId(tenantId, id -> INCORRECT_TENANT_ID + id);
validateId(customerId, id -> INCORRECT_CUSTOMER_ID + id);
validateEntityDataQuery(query);
} catch (Exception e) {
return Futures.immediateFailedFuture(e);
}
if (edqsService.isApiEnabled() && validForEdqs(query) && !tenantId.isSysTenantId()) {
EdqsRequest request = EdqsRequest.builder()
.entityDataQuery(query)
.build();
long startNs = System.nanoTime();
return Futures.transform(processEdqsRequestAsync(tenantId, customerId, request), response -> {
edqsStatsService.reportEntityDataQuery(tenantId, query, System.nanoTime() - startNs);
return response.getEntityDataQueryResult();
}, MoreExecutors.directExecutor());
}
return jpaExecutorService.submit(() -> {
long startNs = System.nanoTime();
PageData<EntityData> result = findEntityDataByQueryInternal(tenantId, customerId, query);
edqsStatsService.reportEntityDataQuery(tenantId, query, System.nanoTime() - startNs);
return result;
});
}
private PageData<EntityData> findEntityDataByQueryInternal(TenantId tenantId, CustomerId customerId, EntityDataQuery query) {
if (!isValidForOptimization(query)) {
return entityQueryDao.findEntityDataByQuery(tenantId, customerId, query);
}
// 1 step - find entity data by filter and sort columns
PageData<EntityData> entityDataByQuery = findEntityIdsByFilterAndSorterColumns(tenantId, customerId, query);
if (entityDataByQuery == null || entityDataByQuery.getData().isEmpty()) {
return entityDataByQuery;
}
// 2 step - find entity data by entity ids from the 1st step
List<EntityData> entities = fetchEntityDataByIdsFromInitialQuery(tenantId, customerId, query, entityDataByQuery.getData());
return new PageData<>(entities, entityDataByQuery.getTotalPages(), entityDataByQuery.getTotalElements(), entityDataByQuery.hasNext());
}
private boolean validForEdqs(EntityCountQuery query) { // for compatibility with PE
return true;
}
private EdqsResponse processEdqsRequest(TenantId tenantId, CustomerId customerId, EdqsRequest request) {
EdqsResponse response;
try {
log.debug("[{}] Sending request to EDQS: {}", tenantId, request);
response = edqsApiService.processRequest(tenantId, customerId, request).get();
return processEdqsRequestAsync(tenantId, customerId, request).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
log.debug("[{}] Received response from EDQS: {}", tenantId, response);
if (response.getError() != null) {
throw new RuntimeException(response.getError());
}
return response;
}
private ListenableFuture<EdqsResponse> processEdqsRequestAsync(TenantId tenantId, CustomerId customerId, EdqsRequest request) {
log.debug("[{}] Sending request to EDQS: {}", tenantId, request);
return Futures.transform(edqsApiService.processRequest(tenantId, customerId, request), response -> {
log.debug("[{}] Received response from EDQS: {}", tenantId, response);
if (response.getError() != null) {
throw new RuntimeException(response.getError());
}
return response;
}, MoreExecutors.directExecutor());
}
@Override

6
dao/src/main/java/org/thingsboard/server/dao/model/sqlts/latest/TsKvLatestEntity.java

@ -65,6 +65,12 @@ import static org.thingsboard.server.dao.model.ModelConstants.VERSION_COLUMN;
query = SearchTsKvLatestRepository.FIND_ALL_BY_ENTITY_ID_QUERY,
resultSetMapping = "tsKvLatestFindMapping",
resultClass = TsKvLatestEntity.class
),
@NamedNativeQuery(
name = SearchTsKvLatestRepository.FIND_LATEST_BY_ENTITY_IDS,
query = SearchTsKvLatestRepository.FIND_LATEST_BY_ENTITY_IDS_QUERY,
resultSetMapping = "tsKvLatestFindMapping",
resultClass = TsKvLatestEntity.class
)
})
public final class TsKvLatestEntity extends AbstractTsKvEntity {

57
dao/src/main/java/org/thingsboard/server/dao/sql/attributes/AttributeKvRepository.java

@ -20,6 +20,14 @@ import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.transaction.annotation.Transactional;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.JsonDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.dao.model.sql.AttributeKvCompositeKey;
import org.thingsboard.server.dao.model.sql.AttributeKvEntity;
@ -60,6 +68,19 @@ public interface AttributeKvRepository extends JpaRepository<AttributeKvEntity,
List<Integer> findAllKeysByEntityIdsAndAttributeType(@Param("entityIds") List<UUID> entityIds,
@Param("attributeType") int attributeType);
@Query(value = """
SELECT DISTINCT ON (a.attribute_key)
kd.key AS strKey,
a.bool_v AS boolV, a.str_v AS strV, a.long_v AS longV,
a.dbl_v AS dblV, a.json_v AS jsonV,
a.last_update_ts AS lastUpdateTs, a.version AS version
FROM attribute_kv a
INNER JOIN key_dictionary kd ON a.attribute_key = kd.key_id
WHERE a.entity_id IN :entityIds AND a.attribute_type = :attributeType
ORDER BY a.attribute_key, a.last_update_ts DESC""", nativeQuery = true)
List<AttributeKvProjection> findLatestByEntityIdsAndAttributeType(@Param("entityIds") List<UUID> entityIds,
@Param("attributeType") int attributeType);
@Query(value = "SELECT attribute_key, attribute_type, entity_id, bool_v, dbl_v, json_v, last_update_ts, long_v, str_v, version FROM attribute_kv WHERE (entity_id, attribute_type, attribute_key) > " +
"(:entityId, :attributeType, :attributeKey) ORDER BY entity_id, attribute_type, attribute_key LIMIT :batchSize", nativeQuery = true)
List<AttributeKvEntity> findNextBatch(@Param("entityId") UUID entityId,
@ -67,4 +88,40 @@ public interface AttributeKvRepository extends JpaRepository<AttributeKvEntity,
@Param("attributeKey") int attributeKey,
@Param("batchSize") int batchSize);
interface AttributeKvProjection {
String getStrKey();
Boolean getBoolV();
String getStrV();
Long getLongV();
Double getDblV();
String getJsonV();
Long getLastUpdateTs();
Long getVersion();
static AttributeKvEntry toAttributeKvEntry(AttributeKvProjection p) {
KvEntry kvEntry = null;
if (p.getStrV() != null) {
kvEntry = new StringDataEntry(p.getStrKey(), p.getStrV());
} else if (p.getBoolV() != null) {
kvEntry = new BooleanDataEntry(p.getStrKey(), p.getBoolV());
} else if (p.getDblV() != null) {
kvEntry = new DoubleDataEntry(p.getStrKey(), p.getDblV());
} else if (p.getLongV() != null) {
kvEntry = new LongDataEntry(p.getStrKey(), p.getLongV());
} else if (p.getJsonV() != null) {
kvEntry = new JsonDataEntry(p.getStrKey(), p.getJsonV());
}
return new BaseAttributeKvEntry(kvEntry, p.getLastUpdateTs(), p.getVersion());
}
}
}

24
dao/src/main/java/org/thingsboard/server/dao/sql/attributes/JpaAttributeDao.java

@ -20,6 +20,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@ -46,6 +47,7 @@ import org.thingsboard.server.dao.util.SqlDao;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
@ -185,6 +187,28 @@ public class JpaAttributeDao extends JpaAbstractDaoListeningExecutorService impl
.toList();
}
@Override
public ListenableFuture<List<String>> findAllKeysByEntityIdsAndScopeAsync(TenantId tenantId, List<EntityId> entityIds, AttributeScope scope) {
return service.submit(() -> findAllKeysByEntityIdsAndScope(tenantId, entityIds, scope));
}
@Override
public List<AttributeKvEntry> findLatestByEntityIdsAndScope(TenantId tenantId, List<EntityId> entityIds, AttributeScope scope) {
if (CollectionUtils.isEmpty(entityIds)) {
return Collections.emptyList();
}
var uniqueIds = entityIds.stream().map(EntityId::getId).distinct().toList();
return attributeKvRepository.findLatestByEntityIdsAndAttributeType(uniqueIds, scope.getId())
.stream()
.map(AttributeKvRepository.AttributeKvProjection::toAttributeKvEntry)
.toList();
}
@Override
public ListenableFuture<List<AttributeKvEntry>> findLatestByEntityIdsAndScopeAsync(TenantId tenantId, List<EntityId> entityIds, AttributeScope scope) {
return service.submit(() -> findLatestByEntityIdsAndScope(tenantId, entityIds, scope));
}
@Override
public ListenableFuture<Long> save(TenantId tenantId, EntityId entityId, AttributeScope attributeScope, AttributeKvEntry attribute) {
AttributeKvEntity entity = new AttributeKvEntity();

10
dao/src/main/java/org/thingsboard/server/dao/sqlts/CachedRedisSqlTimeseriesLatestDao.java

@ -172,4 +172,14 @@ public class CachedRedisSqlTimeseriesLatestDao extends BaseAbstractSqlTimeseries
return sqlDao.findAllKeysByEntityIdsAsync(tenantId, entityIds);
}
@Override
public List<TsKvEntry> findLatestByEntityIds(TenantId tenantId, List<EntityId> entityIds) {
return sqlDao.findLatestByEntityIds(tenantId, entityIds);
}
@Override
public ListenableFuture<List<TsKvEntry>> findLatestByEntityIdsAsync(TenantId tenantId, List<EntityId> entityIds) {
return sqlDao.findLatestByEntityIdsAsync(tenantId, entityIds);
}
}

17
dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java

@ -22,6 +22,7 @@ import com.google.common.util.concurrent.MoreExecutors;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@ -54,6 +55,7 @@ import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao;
import org.thingsboard.server.dao.util.SqlTsLatestAnyDao;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@ -189,6 +191,21 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
return service.submit(() -> findAllKeysByEntityIds(tenantId, entityIds));
}
@Override
public List<TsKvEntry> findLatestByEntityIds(TenantId tenantId, List<EntityId> entityIds) {
if (CollectionUtils.isEmpty(entityIds)) {
return Collections.emptyList();
}
return DaoUtil.convertDataList(
searchTsKvLatestRepository.findLatestByEntityIds(entityIds.stream().map(EntityId::getId).toList())
);
}
@Override
public ListenableFuture<List<TsKvEntry>> findLatestByEntityIdsAsync(TenantId tenantId, List<EntityId> entityIds) {
return service.submit(() -> findLatestByEntityIds(tenantId, entityIds));
}
private ListenableFuture<TsKvLatestRemovingResult> getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query, Long version) {
ListenableFuture<List<TsKvEntry>> future = findNewLatestEntryFuture(tenantId, entityId, query);
return Futures.transformAsync(future, entryList -> {

20
dao/src/main/java/org/thingsboard/server/dao/sqlts/latest/SearchTsKvLatestRepository.java

@ -34,6 +34,20 @@ public class SearchTsKvLatestRepository {
" ts_kv_latest.bool_v AS boolValue, ts_kv_latest.long_v AS longValue, ts_kv_latest.dbl_v AS doubleValue, ts_kv_latest.json_v AS jsonValue, ts_kv_latest.ts AS ts, ts_kv_latest.version AS version FROM ts_kv_latest " +
"INNER JOIN key_dictionary ON ts_kv_latest.key = key_dictionary.key_id WHERE ts_kv_latest.entity_id = cast(:id AS uuid)";
public static final String FIND_LATEST_BY_ENTITY_IDS = "findLatestByEntityIds";
public static final String FIND_LATEST_BY_ENTITY_IDS_QUERY = """
SELECT DISTINCT ON (ts_kv_latest.key)
ts_kv_latest.entity_id AS entityId, ts_kv_latest.key AS key,
key_dictionary.key AS strKey, ts_kv_latest.str_v AS strValue,
ts_kv_latest.bool_v AS boolValue, ts_kv_latest.long_v AS longValue,
ts_kv_latest.dbl_v AS doubleValue, ts_kv_latest.json_v AS jsonValue,
ts_kv_latest.ts AS ts, ts_kv_latest.version AS version
FROM ts_kv_latest
INNER JOIN key_dictionary ON ts_kv_latest.key = key_dictionary.key_id
WHERE ts_kv_latest.entity_id IN (:entityIds)
ORDER BY ts_kv_latest.key, ts_kv_latest.ts DESC""";
@PersistenceContext
private EntityManager entityManager;
@ -43,4 +57,10 @@ public class SearchTsKvLatestRepository {
.getResultList();
}
public List<TsKvLatestEntity> findLatestByEntityIds(List<UUID> entityIds) {
return entityManager.createNamedQuery(FIND_LATEST_BY_ENTITY_IDS, TsKvLatestEntity.class)
.setParameter("entityIds", entityIds)
.getResultList();
}
}

13
dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java

@ -55,9 +55,6 @@ import java.util.stream.Collectors;
import static org.thingsboard.server.common.data.StringUtils.isBlank;
/**
* @author Andrew Shvayka
*/
@Service
@Slf4j
public class BaseTimeseriesService implements TimeseriesService {
@ -161,6 +158,16 @@ public class BaseTimeseriesService implements TimeseriesService {
return timeseriesLatestDao.findAllKeysByEntityIdsAsync(tenantId, entityIds);
}
@Override
public List<TsKvEntry> findLatestByEntityIds(TenantId tenantId, List<EntityId> entityIds) {
return timeseriesLatestDao.findLatestByEntityIds(tenantId, entityIds);
}
@Override
public ListenableFuture<List<TsKvEntry>> findLatestByEntityIdsAsync(TenantId tenantId, List<EntityId> entityIds) {
return timeseriesLatestDao.findLatestByEntityIdsAsync(tenantId, entityIds);
}
@Override
public void cleanup(long systemTtl) {
timeseriesDao.cleanup(systemTtl);

10
dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java

@ -104,6 +104,16 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes
return Futures.immediateFuture(Collections.emptyList());
}
@Override
public List<TsKvEntry> findLatestByEntityIds(TenantId tenantId, List<EntityId> entityIds) {
return Collections.emptyList();
}
@Override
public ListenableFuture<List<TsKvEntry>> findLatestByEntityIdsAsync(TenantId tenantId, List<EntityId> entityIds) {
return Futures.immediateFuture(Collections.emptyList());
}
@Override
public ListenableFuture<Long> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry) {
BoundStatementBuilder stmtBuilder = new BoundStatementBuilder(getLatestStmt().bind());

9
dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java

@ -52,4 +52,13 @@ public interface TimeseriesLatestDao {
ListenableFuture<List<String>> findAllKeysByEntityIdsAsync(TenantId tenantId, List<EntityId> entityIds);
/**
* For each unique timeseries key across the given entities, returns the single most recent {@link TsKvEntry}
* (i.e. the entry with the highest timestamp). If the same key exists on multiple entities,
* only the freshest value is kept. Useful for discovering available keys together with a representative sample value.
*/
List<TsKvEntry> findLatestByEntityIds(TenantId tenantId, List<EntityId> entityIds);
ListenableFuture<List<TsKvEntry>> findLatestByEntityIdsAsync(TenantId tenantId, List<EntityId> entityIds);
}

103
dao/src/test/java/org/thingsboard/server/dao/service/attributes/BaseAttributesServiceTest.java

@ -23,7 +23,6 @@ import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.common.data.AttributeScope;
@ -31,8 +30,12 @@ import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.dao.attributes.AttributesDao;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.service.AbstractServiceTest;
@ -40,6 +43,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.Executors;
@ -48,9 +52,6 @@ import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
/**
* @author Andrew Shvayka
*/
@Slf4j
public abstract class BaseAttributesServiceTest extends AbstractServiceTest {
@ -60,9 +61,8 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest {
@Autowired
private AttributesService attributesService;
@Before
public void before() {
}
@Autowired
private AttributesDao attributesDao;
@Test
public void saveAndFetch() throws Exception {
@ -223,7 +223,7 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest {
saveAttribute(tenantId, deviceId, AttributeScope.SERVER_SCOPE, "key2", "123");
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
List<String> keys = attributesService.findAllKeysByEntityIds(tenantId, List.of(deviceId), AttributeScope.SERVER_SCOPE);
List<String> keys = attributesService.findAllKeysByEntityIdsAndScope(tenantId, List.of(deviceId), AttributeScope.SERVER_SCOPE);
assertThat(keys).containsOnly("key1", "key2");
});
}
@ -241,6 +241,84 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest {
});
}
@Test
public void findLatestByEntityIdsAndScope_returnsOneEntryPerKey() {
var device1 = new DeviceId(UUID.randomUUID());
var device2 = new DeviceId(UUID.randomUUID());
// Both devices have "temperature", device2 has a newer ts
saveAttribute(tenantId, device1, AttributeScope.SERVER_SCOPE, 1000, new DoubleDataEntry("temperature", 20.0));
saveAttribute(tenantId, device2, AttributeScope.SERVER_SCOPE, 2000, new DoubleDataEntry("temperature", 25.0));
// Only device1 has "humidity"
saveAttribute(tenantId, device1, AttributeScope.SERVER_SCOPE, 1500, new LongDataEntry("humidity", 60L));
// Only device2 has "active"
saveAttribute(tenantId, device2, AttributeScope.SERVER_SCOPE, 3000, new BooleanDataEntry("active", true));
List<AttributeKvEntry> results = attributesDao.findLatestByEntityIdsAndScope(tenantId, List.of(device1, device2), AttributeScope.SERVER_SCOPE);
Map<String, AttributeKvEntry> byKey = results.stream().collect(Collectors.toMap(AttributeKvEntry::getKey, e -> e));
Assert.assertEquals(3, byKey.size());
// "temperature" should pick device2's value (ts=2000 > ts=1000)
AttributeKvEntry temp = byKey.get("temperature");
Assert.assertNotNull(temp);
Assert.assertEquals(25.0, temp.getDoubleValue().orElseThrow(), 0.0);
Assert.assertEquals(2000, temp.getLastUpdateTs());
// "humidity" — only device1 has it
AttributeKvEntry humidity = byKey.get("humidity");
Assert.assertNotNull(humidity);
Assert.assertEquals(60L, (long) humidity.getLongValue().orElseThrow());
Assert.assertEquals(1500, humidity.getLastUpdateTs());
// "active" — only device2 has it
AttributeKvEntry active = byKey.get("active");
Assert.assertNotNull(active);
Assert.assertEquals(true, active.getBooleanValue().orElseThrow());
Assert.assertEquals(3000, active.getLastUpdateTs());
}
@Test
public void findLatestByEntityIdsAndScope_emptyList() {
List<AttributeKvEntry> results = attributesDao.findLatestByEntityIdsAndScope(tenantId, List.of(), AttributeScope.SERVER_SCOPE);
Assert.assertTrue(results.isEmpty());
}
@Test
public void findLatestByEntityIdsAndScope_singleEntity() throws Exception {
var device = new DeviceId(UUID.randomUUID());
saveAttribute(tenantId, device, AttributeScope.SERVER_SCOPE, 1000, new StringDataEntry("key1", "value1"));
saveAttribute(tenantId, device, AttributeScope.SERVER_SCOPE, 2000, new StringDataEntry("key2", "value2"));
// sync
List<AttributeKvEntry> results = attributesDao.findLatestByEntityIdsAndScope(tenantId, List.of(device), AttributeScope.SERVER_SCOPE);
Assert.assertEquals(2, results.size());
Map<String, AttributeKvEntry> byKey = results.stream().collect(Collectors.toMap(AttributeKvEntry::getKey, e -> e));
Assert.assertEquals("value1", byKey.get("key1").getStrValue().orElseThrow());
Assert.assertEquals(1000, byKey.get("key1").getLastUpdateTs());
Assert.assertEquals("value2", byKey.get("key2").getStrValue().orElseThrow());
Assert.assertEquals(2000, byKey.get("key2").getLastUpdateTs());
// async — same result
List<AttributeKvEntry> asyncResults = attributesDao.findLatestByEntityIdsAndScopeAsync(tenantId, List.of(device), AttributeScope.SERVER_SCOPE).get();
Assert.assertEquals(results, asyncResults);
}
@Test
public void findLatestByEntityIdsAndScope_filtersScope() {
var device = new DeviceId(UUID.randomUUID());
saveAttribute(tenantId, device, AttributeScope.SERVER_SCOPE, 1000, new StringDataEntry("serverKey", "sv"));
saveAttribute(tenantId, device, AttributeScope.CLIENT_SCOPE, 1000, new StringDataEntry("clientKey", "cv"));
List<AttributeKvEntry> serverResults = attributesDao.findLatestByEntityIdsAndScope(tenantId, List.of(device), AttributeScope.SERVER_SCOPE);
Assert.assertEquals(1, serverResults.size());
Assert.assertEquals("serverKey", serverResults.get(0).getKey());
List<AttributeKvEntry> clientResults = attributesDao.findLatestByEntityIdsAndScope(tenantId, List.of(device), AttributeScope.CLIENT_SCOPE);
Assert.assertEquals(1, clientResults.size());
Assert.assertEquals("clientKey", clientResults.get(0).getKey());
}
private void testConcurrentFetchAndUpdate(TenantId tenantId, DeviceId deviceId, ListeningExecutorService pool) throws Exception {
var scope = AttributeScope.SERVER_SCOPE;
var key = "TEST";
@ -313,6 +391,15 @@ public abstract class BaseAttributesServiceTest extends AbstractServiceTest {
}
}
private void saveAttribute(TenantId tenantId, DeviceId deviceId, AttributeScope scope, long ts, KvEntry value) {
try {
attributesService.save(tenantId, deviceId, scope, Collections.singletonList(new BaseAttributeKvEntry(ts, value))).get(10, TimeUnit.SECONDS);
} catch (Exception e) {
log.warn("Failed to save attribute", e.getCause());
throw new RuntimeException(e);
}
}
private void equalsIgnoreVersion(AttributeKvEntry expected, AttributeKvEntry actual) {
Assert.assertEquals(expected.getKey(), actual.getKey());
Assert.assertEquals(expected.getValue(), actual.getValue());

68
dao/src/test/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDaoTest.java

@ -22,6 +22,9 @@ import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.service.AbstractServiceTest;
@ -30,7 +33,9 @@ import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@ -102,6 +107,69 @@ public class SqlTimeseriesLatestDaoTest extends AbstractServiceTest {
}
}
@Test
public void findLatestByEntityIds_returnsOneEntryPerKey() throws Exception {
DeviceId device1 = new DeviceId(UUID.randomUUID());
DeviceId device2 = new DeviceId(UUID.randomUUID());
// Both devices have "temperature" key, device2 has a newer ts
timeseriesLatestDao.saveLatest(tenantId, device1, new BasicTsKvEntry(1000, new DoubleDataEntry("temperature", 20.0))).get();
timeseriesLatestDao.saveLatest(tenantId, device2, new BasicTsKvEntry(2000, new DoubleDataEntry("temperature", 25.0))).get();
// Only device1 has "humidity"
timeseriesLatestDao.saveLatest(tenantId, device1, new BasicTsKvEntry(1500, new LongDataEntry("humidity", 60L))).get();
// Only device2 has "active"
timeseriesLatestDao.saveLatest(tenantId, device2, new BasicTsKvEntry(3000, new BooleanDataEntry("active", true))).get();
List<TsKvEntry> results = timeseriesLatestDao.findLatestByEntityIds(tenantId, List.of(device1, device2));
Map<String, TsKvEntry> byKey = results.stream().collect(Collectors.toMap(TsKvEntry::getKey, e -> e));
assertEquals(3, byKey.size());
// "temperature" should pick device2's value (ts=2000 > ts=1000)
TsKvEntry temp = byKey.get("temperature");
assertNotNull(temp);
assertEquals(25.0, temp.getDoubleValue().orElseThrow());
assertEquals(2000, temp.getTs());
// "humidity" — only device1 has it
TsKvEntry humidity = byKey.get("humidity");
assertNotNull(humidity);
assertEquals(60L, humidity.getLongValue().orElseThrow());
assertEquals(1500, humidity.getTs());
// "active" — only device2 has it
TsKvEntry active = byKey.get("active");
assertNotNull(active);
assertEquals(true, active.getBooleanValue().orElseThrow());
assertEquals(3000, active.getTs());
}
@Test
public void findLatestByEntityIds_emptyList() {
List<TsKvEntry> results = timeseriesLatestDao.findLatestByEntityIds(tenantId, List.of());
assertTrue(results.isEmpty());
}
@Test
public void findLatestByEntityIds_singleEntity() throws Exception {
DeviceId device = new DeviceId(UUID.randomUUID());
timeseriesLatestDao.saveLatest(tenantId, device, new BasicTsKvEntry(1000, new StringDataEntry("key1", "value1"))).get();
timeseriesLatestDao.saveLatest(tenantId, device, new BasicTsKvEntry(2000, new StringDataEntry("key2", "value2"))).get();
// sync
List<TsKvEntry> results = timeseriesLatestDao.findLatestByEntityIds(tenantId, List.of(device));
assertEquals(2, results.size());
Map<String, TsKvEntry> byKey = results.stream().collect(Collectors.toMap(TsKvEntry::getKey, e -> e));
assertEquals("value1", byKey.get("key1").getStrValue().orElseThrow());
assertEquals(1000, byKey.get("key1").getTs());
assertEquals("value2", byKey.get("key2").getStrValue().orElseThrow());
assertEquals(2000, byKey.get("key2").getTs());
// async — same result
List<TsKvEntry> asyncResults = timeseriesLatestDao.findLatestByEntityIdsAsync(tenantId, List.of(device)).get();
assertEquals(results, asyncResults);
}
private TsKvEntry createEntry(String key, long ts) {
return new BasicTsKvEntry(ts, new StringDataEntry(key, RandomStringUtils.random(10)));
}

22
rest-client/src/main/java/org/thingsboard/rest/client/RestClient.java

@ -22,7 +22,6 @@ import com.google.common.base.Strings;
import lombok.Getter;
import lombok.SneakyThrows;
import org.apache.commons.io.IOUtils;
import org.apache.hc.core5.net.URIBuilder;
import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.apache.hc.core5.net.URIBuilder;
import org.springframework.core.ParameterizedTypeReference;
@ -172,6 +171,7 @@ import org.thingsboard.server.common.data.query.AlarmCountQuery;
import org.thingsboard.server.common.data.query.AlarmData;
import org.thingsboard.server.common.data.query.AlarmDataQuery;
import org.thingsboard.server.common.data.query.AvailableEntityKeys;
import org.thingsboard.server.common.data.query.AvailableEntityKeysV2;
import org.thingsboard.server.common.data.query.EntityCountQuery;
import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.common.data.query.EntityDataQuery;
@ -1898,6 +1898,10 @@ public class RestClient implements Closeable {
}).getBody();
}
/**
* @deprecated Use {@link #findAvailableEntityKeysV2(EntityDataQuery, boolean, boolean, Set, boolean)} instead.
*/
@Deprecated(forRemoval = true)
public AvailableEntityKeys findAvailableEntityKeysByQuery(EntityDataQuery query, boolean includeTimeseries, boolean includeAttributes, AttributeScope scope) {
var uri = UriComponentsBuilder.fromUriString(baseURL)
.path("/api/entitiesQuery/find/keys")
@ -1909,6 +1913,22 @@ public class RestClient implements Closeable {
return restTemplate.exchange(uri, HttpMethod.POST, new HttpEntity<>(query), new ParameterizedTypeReference<AvailableEntityKeys>() {}).getBody();
}
@SneakyThrows(URISyntaxException.class)
public AvailableEntityKeysV2 findAvailableEntityKeysV2(
EntityDataQuery query, boolean includeTimeseries, boolean includeAttributes, Set<AttributeScope> scopes, boolean includeSamples
) {
var builder = new URIBuilder(baseURL).appendPath("/api/v2/entitiesQuery/find/keys")
.addParameter("includeTimeseries", String.valueOf(includeTimeseries))
.addParameter("includeAttributes", String.valueOf(includeAttributes))
.addParameter("includeSamples", String.valueOf(includeSamples));
if (scopes != null) {
for (AttributeScope scope : scopes) {
builder.addParameter("scopes", scope.name());
}
}
return restTemplate.exchange(builder.build(), HttpMethod.POST, new HttpEntity<>(query), new ParameterizedTypeReference<AvailableEntityKeysV2>() {}).getBody();
}
public PageData<AlarmData> findAlarmDataByQuery(AlarmDataQuery query) {
return restTemplate.exchange(
baseURL + "/api/alarmsQuery/find",

Loading…
Cancel
Save