From 26a41b7c32aebbdaf8aa3d95fc8358d112c25dc2 Mon Sep 17 00:00:00 2001 From: ViacheslavKlimov Date: Mon, 5 May 2025 17:41:30 +0300 Subject: [PATCH] Introduce TbTelemetryService --- .../controller/TelemetryController.java | 23 ++--- .../telemetry/DefaultTbTelemetryService.java | 92 +++++++++++++++++++ .../service/telemetry/TbTelemetryService.java | 43 +++++++++ 3 files changed, 144 insertions(+), 14 deletions(-) create mode 100644 application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTbTelemetryService.java create mode 100644 application/src/main/java/org/thingsboard/server/service/telemetry/TbTelemetryService.java diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index 6ca767dc01..d93e973073 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -92,6 +92,7 @@ import org.thingsboard.server.service.security.AccessValidator; import org.thingsboard.server.service.security.model.SecurityUser; import org.thingsboard.server.service.security.permission.Operation; import org.thingsboard.server.service.telemetry.AttributeData; +import org.thingsboard.server.service.telemetry.TbTelemetryService; import org.thingsboard.server.service.telemetry.TsData; import java.util.ArrayList; @@ -155,6 +156,9 @@ public class TelemetryController extends BaseController { @Autowired private AccessValidator accessValidator; + @Autowired + private TbTelemetryService tbTelemetryService; + @Value("${transport.json.max_string_value_length:0}") private int maxStringValueLength; @@ -323,20 +327,11 @@ public class TelemetryController extends BaseController { @RequestParam(name = "orderBy", defaultValue = "DESC") String orderBy, @Parameter(description = STRICT_DATA_TYPES_DESCRIPTION) @RequestParam(name = "useStrictDataTypes", required = false, defaultValue = "false") Boolean useStrictDataTypes) throws ThingsboardException { - return accessValidator.validateEntityAndCallback(getCurrentUser(), Operation.READ_TELEMETRY, entityType, entityIdStr, - (result, tenantId, entityId) -> { - AggregationParams params; - Aggregation agg = Aggregation.valueOf(aggStr); - if (Aggregation.NONE.equals(agg)) { - params = AggregationParams.none(); - } else if (intervalType == null || IntervalType.MILLISECONDS.equals(intervalType)) { - params = interval == 0L ? AggregationParams.none() : AggregationParams.milliseconds(agg, interval); - } else { - params = AggregationParams.calendar(agg, intervalType, timeZone); - } - List queries = toKeysList(keys).stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, params, limit, orderBy)).collect(Collectors.toList()); - Futures.addCallback(tsService.findAll(tenantId, entityId, queries), getTsKvListCallback(result, useStrictDataTypes), MoreExecutors.directExecutor()); - }); + DeferredResult response = new DeferredResult<>(); + Futures.addCallback(tbTelemetryService.getTimeseries(EntityIdFactory.getByTypeAndId(entityType, entityIdStr), toKeysList(keys), startTs, endTs, + intervalType, interval, timeZone, limit, Aggregation.valueOf(aggStr), orderBy, useStrictDataTypes, getCurrentUser()), + getTsKvListCallback(response, useStrictDataTypes), MoreExecutors.directExecutor()); + return response; } @ApiOperation(value = "Save device attributes (saveDeviceAttributes)", diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTbTelemetryService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTbTelemetryService.java new file mode 100644 index 0000000000..d55f44a027 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTbTelemetryService.java @@ -0,0 +1,92 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.telemetry; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.kv.Aggregation; +import org.thingsboard.server.common.data.kv.AggregationParams; +import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; +import org.thingsboard.server.common.data.kv.IntervalType; +import org.thingsboard.server.common.data.kv.ReadTsKvQuery; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.dao.timeseries.TimeseriesService; +import org.thingsboard.server.service.security.AccessValidator; +import org.thingsboard.server.service.security.ValidationResult; +import org.thingsboard.server.service.security.model.SecurityUser; +import org.thingsboard.server.service.security.permission.Operation; + +import java.util.List; +import java.util.stream.Collectors; + +@Service +@Slf4j +@RequiredArgsConstructor +public class DefaultTbTelemetryService implements TbTelemetryService { + + private final TimeseriesService tsService; + private final AccessValidator accessValidator; + + @Override + public ListenableFuture> getTimeseries(EntityId entityId, List keys, Long startTs, Long endTs, IntervalType intervalType, + Long interval, String timeZone, Integer limit, Aggregation agg, String orderBy, + Boolean useStrictDataTypes, SecurityUser currentUser) { + SettableFuture> future = SettableFuture.create(); + accessValidator.validate(currentUser, Operation.READ_TELEMETRY, entityId, new FutureCallback<>() { + @Override + public void onSuccess(ValidationResult validationResult) { + try { + AggregationParams params; + if (Aggregation.NONE.equals(agg)) { + params = AggregationParams.none(); + } else if (intervalType == null || IntervalType.MILLISECONDS.equals(intervalType)) { + params = interval == 0L ? AggregationParams.none() : AggregationParams.milliseconds(agg, interval); + } else { + params = AggregationParams.calendar(agg, intervalType, timeZone); + } + List queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, params, limit, orderBy)).collect(Collectors.toList()); + Futures.addCallback(tsService.findAll(currentUser.getTenantId(), entityId, queries), new FutureCallback<>() { + @Override + public void onSuccess(List result) { + future.set(result); + } + + @Override + public void onFailure(Throwable t) { + future.setException(t); + } + }, MoreExecutors.directExecutor()); + } catch (Throwable e) { + onFailure(e); + } + } + + @Override + public void onFailure(Throwable t) { + future.setException(t); + } + }); + return future; + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TbTelemetryService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TbTelemetryService.java new file mode 100644 index 0000000000..9820e62592 --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TbTelemetryService.java @@ -0,0 +1,43 @@ +/** + * Copyright © 2016-2025 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.telemetry; + +import com.google.common.util.concurrent.ListenableFuture; +import org.thingsboard.server.common.data.exception.ThingsboardException; +import org.thingsboard.server.common.data.id.EntityId; +import org.thingsboard.server.common.data.kv.Aggregation; +import org.thingsboard.server.common.data.kv.IntervalType; +import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.service.security.model.SecurityUser; + +import java.util.List; + +public interface TbTelemetryService { + + ListenableFuture> getTimeseries(EntityId entityId, + List keys, + Long startTs, + Long endTs, + IntervalType intervalType, + Long interval, + String timeZone, + Integer limit, + Aggregation agg, + String orderBy, + Boolean useStrictDataTypes, + SecurityUser currentUser) throws ThingsboardException; + +}