Browse Source

Merge fc8ddede16 into 767fdbef6d

pull/15429/merge
Oleksandra Matviienko 4 days ago
committed by GitHub
parent
commit
b68bb92e49
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 29
      application/src/main/java/org/thingsboard/server/controller/EntityQueryController.java
  2. 74
      application/src/main/java/org/thingsboard/server/service/query/DefaultEntityQueryService.java
  3. 3
      application/src/main/java/org/thingsboard/server/service/query/EntityQueryService.java
  4. 222
      application/src/test/java/org/thingsboard/server/controller/EntityQueryControllerTest.java

29
application/src/main/java/org/thingsboard/server/controller/EntityQueryController.java

@ -17,6 +17,7 @@ package org.thingsboard.server.controller;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
@ -31,6 +32,7 @@ import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.thingsboard.server.common.data.edqs.EdqsState;
import org.thingsboard.server.common.data.edqs.ToCoreEdqsRequest;
import org.thingsboard.server.common.data.exception.ThingsboardErrorCode;
import org.thingsboard.server.common.data.exception.ThingsboardException;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.id.UserId;
@ -49,6 +51,7 @@ import org.thingsboard.server.config.annotations.ApiOperation;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.query.EntityQueryService;
import org.thingsboard.server.service.security.permission.Operation;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AggHistoryCmd;
import static org.thingsboard.server.controller.ControllerConstants.ALARM_DATA_QUERY_DESCRIPTION;
import static org.thingsboard.server.controller.ControllerConstants.ATTRIBUTES_SCOPE_DESCRIPTION;
@ -149,6 +152,32 @@ public class EntityQueryController extends BaseController {
return entityQueryService.getKeysByQuery(getCurrentUser(), tenantId, query, isTimeseries, isAttributes, scope);
}
@ApiOperation(value = "Find Aggregated Historical Entity Data by Query",
notes = "Runs the entity data query and, for each matched entity, fetches a single aggregated value per key over [startTs, endTs] " +
"with per-key aggregation function. Optional previousStartTs/previousEndTs per key add a comparison window. " +
"REST equivalent of the WebSocket AggHistoryCmd. The aggregated values are returned in the 'aggLatest' field of each EntityData.")
@PreAuthorize("hasAnyAuthority('SYS_ADMIN', 'TENANT_ADMIN', 'CUSTOMER_USER')")
@PostMapping("/entitiesQuery/find/aggHistory")
public DeferredResult<PageData<EntityData>> findEntityDataAggHistoryByQuery(
@Parameter(description = "A JSON value representing the entity data query and aggregated history command.")
@RequestBody EntityDataAggHistoryRequest request) throws ThingsboardException {
checkNotNull(request);
checkNotNull(request.getQuery());
checkNotNull(request.getAggHistoryCmd());
AggHistoryCmd cmd = request.getAggHistoryCmd();
if (cmd.getEndTs() < cmd.getStartTs()) {
throw new ThingsboardException("endTs must be >= startTs", ThingsboardErrorCode.BAD_REQUEST_PARAMS);
}
resolveQuery(request.getQuery());
return entityQueryService.findEntityDataAggHistoryByQuery(getCurrentUser(), request.getQuery(), cmd);
}
@Data
public static class EntityDataAggHistoryRequest {
private EntityDataQuery query;
private AggHistoryCmd aggHistoryCmd;
}
@PreAuthorize("hasAnyAuthority('SYS_ADMIN')")
@PostMapping("/edqs/system/request")
public void processSystemEdqsRequest(@RequestBody ToCoreEdqsRequest request) {

74
application/src/main/java/org/thingsboard/server/service/query/DefaultEntityQueryService.java

@ -36,10 +36,14 @@ import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQueryResult;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.AlarmCountQuery;
import org.thingsboard.server.common.data.query.AlarmData;
import org.thingsboard.server.common.data.query.AlarmDataQuery;
import org.thingsboard.server.common.data.query.ComparisonTsValue;
import org.thingsboard.server.common.data.query.ComplexFilterPredicate;
import org.thingsboard.server.common.data.query.DynamicValue;
import org.thingsboard.server.common.data.query.EntityCountQuery;
@ -53,6 +57,7 @@ import org.thingsboard.server.common.data.query.FilterPredicateType;
import org.thingsboard.server.common.data.query.KeyFilter;
import org.thingsboard.server.common.data.query.KeyFilterPredicate;
import org.thingsboard.server.common.data.query.SimpleKeyFilterPredicate;
import org.thingsboard.server.common.data.query.TsValue;
import org.thingsboard.server.dao.alarm.AlarmService;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.entity.EntityService;
@ -63,10 +68,14 @@ import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.executors.DbCallbackExecutorService;
import org.thingsboard.server.service.security.AccessValidator;
import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.subscription.ReadTsKvQueryInfo;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AggHistoryCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AggKey;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -339,4 +348,69 @@ public class DefaultEntityQueryService implements EntityQueryService {
}, dbCallbackExecutor);
}
@Override
public DeferredResult<PageData<EntityData>> findEntityDataAggHistoryByQuery(SecurityUser securityUser, EntityDataQuery query, AggHistoryCmd cmd) {
DeferredResult<PageData<EntityData>> response = new DeferredResult<>();
PageData<EntityData> pageData = findEntityDataByQuery(securityUser, query);
List<EntityData> entityDataList = pageData.getData();
if (entityDataList.isEmpty() || cmd.getKeys() == null || cmd.getKeys().isEmpty()) {
response.setResult(pageData);
return response;
}
TenantId tenantId = securityUser.getTenantId();
Map<Integer, ReadTsKvQueryInfo> queries = new HashMap<>();
for (AggKey key : cmd.getKeys()) {
if (key.getPreviousValueOnly() == null || !key.getPreviousValueOnly()) {
var q = new BaseReadTsKvQuery(key.getKey(), cmd.getStartTs(), cmd.getEndTs(),
cmd.getEndTs() - cmd.getStartTs(), 1, key.getAgg());
queries.put(q.getId(), new ReadTsKvQueryInfo(key, q, false));
}
if (key.getPreviousStartTs() != null && key.getPreviousEndTs() != null && key.getPreviousEndTs() >= key.getPreviousStartTs()) {
var q = new BaseReadTsKvQuery(key.getKey(), key.getPreviousStartTs(), key.getPreviousEndTs(),
key.getPreviousEndTs() - key.getPreviousStartTs(), 1, key.getAgg());
queries.put(q.getId(), new ReadTsKvQueryInfo(key, q, true));
}
}
List<ReadTsKvQuery> queryList = queries.values().stream().map(ReadTsKvQueryInfo::getQuery).collect(Collectors.toList());
Map<EntityData, ListenableFuture<List<ReadTsKvQueryResult>>> fetchResultMap = new LinkedHashMap<>();
entityDataList.forEach(entityData -> fetchResultMap.put(entityData,
timeseriesService.findAllByQueries(tenantId, entityData.getEntityId(), queryList)));
Futures.addCallback(Futures.allAsList(fetchResultMap.values()), new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<List<ReadTsKvQueryResult>> ignored) {
fetchResultMap.forEach((entityData, future) -> {
try {
List<ReadTsKvQueryResult> queryResults = future.get();
if (queryResults != null) {
for (ReadTsKvQueryResult queryResult : queryResults) {
ReadTsKvQueryInfo info = queries.get(queryResult.getQueryId());
ComparisonTsValue comparisonTsValue = entityData.getAggLatest()
.computeIfAbsent(info.getKey().getId(), k -> new ComparisonTsValue());
if (info.isPrevious()) {
comparisonTsValue.setPrevious(queryResult.toTsValue(info.getQuery()));
} else {
comparisonTsValue.setCurrent(queryResult.toTsValue(info.getQuery()));
}
}
}
cmd.getKeys().forEach(key -> entityData.getAggLatest()
.putIfAbsent(key.getId(), new ComparisonTsValue(TsValue.EMPTY, TsValue.EMPTY)));
} catch (InterruptedException | ExecutionException e) {
log.warn("[{}] Failed to fetch aggregated historical data", entityData.getEntityId(), e);
cmd.getKeys().forEach(key -> entityData.getAggLatest()
.putIfAbsent(key.getId(), new ComparisonTsValue(TsValue.EMPTY, TsValue.EMPTY)));
}
});
response.setResult(pageData);
}
@Override
public void onFailure(Throwable t) {
log.error("Failed to fetch aggregated historical data", t);
response.setErrorResult(t);
}
}, dbCallbackExecutor);
return response;
}
}

3
application/src/main/java/org/thingsboard/server/service/query/EntityQueryService.java

@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.query.EntityCountQuery;
import org.thingsboard.server.common.data.query.EntityData;
import org.thingsboard.server.common.data.query.EntityDataQuery;
import org.thingsboard.server.service.security.model.SecurityUser;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AggHistoryCmd;
public interface EntityQueryService {
@ -40,4 +41,6 @@ public interface EntityQueryService {
DeferredResult<ResponseEntity> getKeysByQuery(SecurityUser securityUser, TenantId tenantId, EntityDataQuery query,
boolean isTimeseries, boolean isAttributes, String attributesScope);
DeferredResult<PageData<EntityData>> findEntityDataAggHistoryByQuery(SecurityUser securityUser, EntityDataQuery query, AggHistoryCmd aggHistoryCmd);
}

222
application/src/test/java/org/thingsboard/server/controller/EntityQueryControllerTest.java

@ -18,6 +18,8 @@ package org.thingsboard.server.controller;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.util.concurrent.FutureCallback;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -27,6 +29,7 @@ import org.springframework.test.context.TestPropertySource;
import org.springframework.test.web.servlet.ResultActions;
import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.rule.engine.api.TimeseriesSaveRequest;
import org.thingsboard.server.common.data.AttributeScope;
import org.thingsboard.server.common.data.Customer;
import org.thingsboard.server.common.data.Dashboard;
@ -42,6 +45,10 @@ import org.thingsboard.server.common.data.alarm.AlarmSeverity;
import org.thingsboard.server.common.data.asset.Asset;
import org.thingsboard.server.common.data.id.DeviceId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.BasicTsKvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.page.PageData;
import org.thingsboard.server.common.data.query.AlarmCountQuery;
import org.thingsboard.server.common.data.query.AlarmData;
@ -75,14 +82,20 @@ import org.thingsboard.server.common.data.relation.RelationEntityTypeFilter;
import org.thingsboard.server.common.data.security.Authority;
import org.thingsboard.server.common.data.tenant.profile.DefaultTenantProfileConfiguration;
import org.thingsboard.server.common.data.tenant.profile.TenantProfileData;
import org.thingsboard.server.common.data.query.ComparisonTsValue;
import org.thingsboard.server.dao.queue.QueueStatsService;
import org.thingsboard.server.dao.service.DaoSqlTest;
import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AggHistoryCmd;
import org.thingsboard.server.service.ws.telemetry.cmd.v2.AggKey;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
@ -107,6 +120,9 @@ public class EntityQueryControllerTest extends AbstractControllerTest {
@Autowired
private QueueStatsService queueStatsService;
@Autowired
private TelemetrySubscriptionService tsService;
@Before
public void beforeTest() throws Exception {
loginSysAdmin();
@ -1329,6 +1345,212 @@ public class EntityQueryControllerTest extends AbstractControllerTest {
return numericFilter;
}
@Test
public void testFindEntityDataAggHistoryByQuery() throws Exception {
Device device = new Device();
device.setName("AggHistoryDevice");
device.setType("default");
device = doPost("/api/device", device, Device.class);
long now = System.currentTimeMillis();
long startTs = now - TimeUnit.MINUTES.toMillis(30);
long endTs = now;
long previousStartTs = now - TimeUnit.MINUTES.toMillis(60);
long previousEndTs = startTs - 1;
List<TsKvEntry> currentData = new ArrayList<>();
for (int i = 0; i < 20; i++) {
long ts = startTs + i * TimeUnit.MINUTES.toMillis(1);
currentData.add(new BasicTsKvEntry(ts, new LongDataEntry("temperature", 30L)));
}
List<TsKvEntry> previousData = new ArrayList<>();
for (int i = 0; i < 10; i++) {
long ts = previousStartTs + i * TimeUnit.MINUTES.toMillis(1);
previousData.add(new BasicTsKvEntry(ts, new LongDataEntry("temperature", 30L)));
}
sendTelemetry(device, currentData);
sendTelemetry(device, previousData);
DeviceTypeFilter filter = new DeviceTypeFilter(List.of("default"), "AggHistoryDevice");
EntityDataPageLink pageLink = new EntityDataPageLink(10, 0, null, null);
EntityDataQuery query = new EntityDataQuery(filter, pageLink, Collections.emptyList(), Collections.emptyList(), null);
AggKey key = new AggKey();
key.setId(1);
key.setKey("temperature");
key.setAgg(Aggregation.SUM);
key.setPreviousStartTs(previousStartTs);
key.setPreviousEndTs(previousEndTs);
AggHistoryCmd cmd = new AggHistoryCmd();
cmd.setKeys(List.of(key));
cmd.setStartTs(startTs);
cmd.setEndTs(endTs);
ObjectNode request = JacksonUtil.newObjectNode();
request.set("query", JacksonUtil.valueToTree(query));
request.set("aggHistoryCmd", JacksonUtil.valueToTree(cmd));
PageData<EntityData> result = doPostAsyncWithTypedResponse("/api/entitiesQuery/find/aggHistory",
request, new TypeReference<>() {}, status().isOk());
assertThat(result.getData()).hasSize(1);
EntityData entityData = result.getData().get(0);
assertThat(entityData.getEntityId()).isEqualTo(device.getId());
ComparisonTsValue agg = entityData.getAggLatest().get(1);
assertThat(agg).isNotNull();
assertThat(agg.getCurrent().getValue()).isEqualTo("600");
assertThat(agg.getPrevious().getValue()).isEqualTo("300");
}
@Test
public void testFindEntityDataAggHistoryByQuery_previousValueOnly() throws Exception {
Device device = new Device();
device.setName("AggPrevOnlyDevice");
device.setType("default");
device = doPost("/api/device", device, Device.class);
long now = System.currentTimeMillis();
long startTs = now - TimeUnit.MINUTES.toMillis(30);
long endTs = now;
long previousStartTs = now - TimeUnit.MINUTES.toMillis(60);
long previousEndTs = startTs - 1;
List<TsKvEntry> currentData = new ArrayList<>();
for (int i = 0; i < 5; i++) {
currentData.add(new BasicTsKvEntry(startTs + i * TimeUnit.MINUTES.toMillis(1),
new LongDataEntry("temperature", 100L)));
}
List<TsKvEntry> previousData = new ArrayList<>();
for (int i = 0; i < 3; i++) {
previousData.add(new BasicTsKvEntry(previousStartTs + i * TimeUnit.MINUTES.toMillis(1),
new LongDataEntry("temperature", 10L)));
}
sendTelemetry(device, currentData);
sendTelemetry(device, previousData);
AggKey key = new AggKey();
key.setId(7);
key.setKey("temperature");
key.setAgg(Aggregation.SUM);
key.setPreviousStartTs(previousStartTs);
key.setPreviousEndTs(previousEndTs);
key.setPreviousValueOnly(true);
PageData<EntityData> result = postAggHistory(device, startTs, endTs, List.of(key));
ComparisonTsValue agg = result.getData().get(0).getAggLatest().get(7);
assertThat(agg.getPrevious().getValue()).isEqualTo("30");
assertThat(agg.getCurrent()).isNull();
}
@Test
public void testFindEntityDataAggHistoryByQuery_noTelemetry() throws Exception {
Device device = new Device();
device.setName("AggEmptyDevice");
device.setType("default");
device = doPost("/api/device", device, Device.class);
long now = System.currentTimeMillis();
AggKey key = new AggKey();
key.setId(1);
key.setKey("temperature");
key.setAgg(Aggregation.SUM);
PageData<EntityData> result = postAggHistory(device, now - TimeUnit.HOURS.toMillis(1), now, List.of(key));
ComparisonTsValue agg = result.getData().get(0).getAggLatest().get(1);
assertThat(agg).isNotNull();
assertThat(agg.getCurrent().getValue()).isEqualTo("0");
assertThat(agg.getPrevious()).isNull();
}
@Test
public void testFindEntityDataAggHistoryByQuery_multipleKeysMixedAggregation() throws Exception {
Device device = new Device();
device.setName("AggMultiKeyDevice");
device.setType("default");
device = doPost("/api/device", device, Device.class);
long now = System.currentTimeMillis();
long startTs = now - TimeUnit.MINUTES.toMillis(30);
long endTs = now;
List<TsKvEntry> data = new ArrayList<>();
long[] values = {10L, 20L, 30L, 40L, 50L};
for (int i = 0; i < values.length; i++) {
data.add(new BasicTsKvEntry(startTs + i * TimeUnit.MINUTES.toMillis(1),
new LongDataEntry("temperature", values[i])));
}
sendTelemetry(device, data);
AggKey minKey = new AggKey();
minKey.setId(1);
minKey.setKey("temperature");
minKey.setAgg(Aggregation.MIN);
AggKey maxKey = new AggKey();
maxKey.setId(2);
maxKey.setKey("temperature");
maxKey.setAgg(Aggregation.MAX);
AggKey avgKey = new AggKey();
avgKey.setId(3);
avgKey.setKey("temperature");
avgKey.setAgg(Aggregation.AVG);
PageData<EntityData> result = postAggHistory(device, startTs, endTs, List.of(minKey, maxKey, avgKey));
var aggLatest = result.getData().get(0).getAggLatest();
assertThat(aggLatest.get(1).getCurrent().getValue()).isEqualTo("10");
assertThat(aggLatest.get(2).getCurrent().getValue()).isEqualTo("50");
assertThat(Double.parseDouble(aggLatest.get(3).getCurrent().getValue())).isEqualTo(30.0);
}
private PageData<EntityData> postAggHistory(Device device, long startTs, long endTs, List<AggKey> keys) throws Exception {
DeviceTypeFilter filter = new DeviceTypeFilter(List.of("default"), device.getName());
EntityDataPageLink pageLink = new EntityDataPageLink(10, 0, null, null);
EntityDataQuery query = new EntityDataQuery(filter, pageLink, Collections.emptyList(), Collections.emptyList(), null);
AggHistoryCmd cmd = new AggHistoryCmd();
cmd.setKeys(keys);
cmd.setStartTs(startTs);
cmd.setEndTs(endTs);
ObjectNode request = JacksonUtil.newObjectNode();
request.set("query", JacksonUtil.valueToTree(query));
request.set("aggHistoryCmd", JacksonUtil.valueToTree(cmd));
return doPostAsyncWithTypedResponse("/api/entitiesQuery/find/aggHistory",
request, new TypeReference<>() {}, status().isOk());
}
private void sendTelemetry(Device device, List<TsKvEntry> tsData) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Throwable> error = new AtomicReference<>();
tsService.saveTimeseries(TimeseriesSaveRequest.builder()
.tenantId(device.getTenantId())
.entityId(device.getId())
.entries(tsData)
.callback(new FutureCallback<>() {
@Override
public void onSuccess(@Nullable Void result) {
latch.countDown();
}
@Override
public void onFailure(Throwable t) {
error.set(t);
latch.countDown();
}
})
.build());
assertThat(latch.await(TIMEOUT, TimeUnit.SECONDS)).isTrue();
if (error.get() != null) {
throw new AssertionError("Failed to save telemetry", error.get());
}
}
private KeyFilter buildStringKeyFilter(EntityKeyType entityKeyType, String name, StringFilterPredicate.StringOperation operation, String value) {
KeyFilter nameFilter = new KeyFilter();
nameFilter.setKey(new EntityKey(entityKeyType, name));

Loading…
Cancel
Save