From b79c2407c49aceb814a17d5a6e99198c5119d020 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 14 Dec 2021 09:12:25 +0200 Subject: [PATCH 1/5] ts notification --- .../controller/TelemetryController.java | 7 ++-- .../queue/DefaultTbCoreConsumerService.java | 11 ++++-- .../DefaultSubscriptionManagerService.java | 34 ++++++++++++++++--- .../SubscriptionManagerService.java | 2 ++ .../subscription/TbSubscriptionUtils.java | 22 +++++++++--- .../DefaultTelemetrySubscriptionService.java | 22 ++++++++++++ common/cluster-api/src/main/proto/queue.proto | 10 ++++++ .../common/data/kv/BaseDeleteTsKvQuery.java | 4 ++- .../api/RuleEngineTelemetryService.java | 3 +- 9 files changed, 99 insertions(+), 16 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index 7a6a1620ae..70a6b029ed 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -573,10 +573,9 @@ public class TelemetryController extends BaseController { for (String key : keys) { deleteTsKvQueries.add(new BaseDeleteTsKvQuery(key, deleteFromTs, deleteToTs, rewriteLatestIfDeleted)); } - ListenableFuture> future = tsService.remove(user.getTenantId(), entityId, deleteTsKvQueries); - Futures.addCallback(future, new FutureCallback<>() { + tsSubService.deleteTimeseriesAndNotify(tenantId, entityId, keys, deleteTsKvQueries, new FutureCallback<>() { @Override - public void onSuccess(@Nullable List tmp) { + public void onSuccess(@Nullable Void tmp) { logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, null); result.setResult(new ResponseEntity<>(HttpStatus.OK)); } @@ -586,7 +585,7 @@ public class TelemetryController extends BaseController { logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, t); result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR)); } - }, executor); + }); }); } diff --git a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java index ad7a0340cf..3602aa804e 100644 --- a/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java +++ b/application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java @@ -26,14 +26,15 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.common.util.ThingsBoardThreadFactory; -import org.thingsboard.server.common.data.rpc.RpcError; import org.thingsboard.server.actors.ActorSystemContext; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.id.TenantId; +import org.thingsboard.server.common.data.rpc.RpcError; import org.thingsboard.server.common.msg.MsgType; import org.thingsboard.server.common.msg.TbActorMsg; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; +import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; import org.thingsboard.server.dao.tenant.TbTenantProfileCache; @@ -47,6 +48,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmUpdateProto; import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeDeleteProto; import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeUpdateProto; import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseProto; +import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesDeleteProto; import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg; @@ -64,7 +66,6 @@ import org.thingsboard.server.service.ota.OtaPackageStateService; import org.thingsboard.server.service.profile.TbDeviceProfileCache; import org.thingsboard.server.service.queue.processing.AbstractConsumerService; import org.thingsboard.server.service.queue.processing.IdMsgPair; -import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse; import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService; import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg; import org.thingsboard.server.service.state.DeviceStateService; @@ -467,6 +468,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService keys, TbCallback callback) { + onLocalTelemetrySubUpdate(entityId, + s -> { + if (TbSubscriptionType.TIMESERIES.equals(s.getType())) { + return (TbTimeseriesSubscription) s; + } else { + return null; + } + }, s -> true, s -> { + List subscriptionUpdate = null; + for (String key : keys) { + if (s.isAllKeys() || s.getKeyStates().containsKey(key)) { + if (subscriptionUpdate == null) { + subscriptionUpdate = new ArrayList<>(); + } + subscriptionUpdate.add(new BasicTsKvEntry(0, new StringDataEntry(key, null))); + } + } + return subscriptionUpdate; + }, false); + callback.onSuccess(); + } + private void onLocalTelemetrySubUpdate(EntityId entityId, Function castFunction, Predicate filterFunction, diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java b/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java index 37850b701f..67f2e10614 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java @@ -40,6 +40,8 @@ public interface SubscriptionManagerService extends ApplicationListener keys, TbCallback empty); + void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List keys, TbCallback callback); + void onAlarmUpdate(TenantId tenantId, EntityId entityId, Alarm alarm, TbCallback callback); void onAlarmDeleted(TenantId tenantId, EntityId entityId, Alarm alarm, TbCallback callback); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java index 25b67d3a06..82e0b17f67 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.subscription; +import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.common.data.alarm.Alarm; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.EntityIdFactory; @@ -30,25 +31,25 @@ import org.thingsboard.server.common.data.kv.KvEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; -import org.thingsboard.common.util.JacksonUtil; import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto; import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType; import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionMgrMsgProto; +import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmDeleteProto; +import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmUpdateProto; +import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeDeleteProto; import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeSubscriptionProto; import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeUpdateProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeDeleteProto; import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseProto; import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionKetStateProto; import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionProto; import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateProto; import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateTsValue; +import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesDeleteProto; import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesSubscriptionProto; import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto; import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg; import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmUpdateProto; -import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmDeleteProto; import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate; import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode; import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate; @@ -207,6 +208,19 @@ public class TbSubscriptionUtils { return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build(); } + public static ToCoreMsg toTimeseriesDeleteProto(TenantId tenantId, EntityId entityId, List keys) { + TbTimeSeriesDeleteProto.Builder builder = TbTimeSeriesDeleteProto.newBuilder(); + builder.setEntityType(entityId.getEntityType().name()); + builder.setEntityIdMSB(entityId.getId().getMostSignificantBits()); + builder.setEntityIdLSB(entityId.getId().getLeastSignificantBits()); + builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits()); + builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits()); + builder.addAllKeys(keys); + SubscriptionMgrMsgProto.Builder msgBuilder = SubscriptionMgrMsgProto.newBuilder(); + msgBuilder.setTsDelete(builder); + return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build(); + } + public static ToCoreMsg toAttributesUpdateProto(TenantId tenantId, EntityId entityId, String scope, List attributes) { TbAttributeUpdateProto.Builder builder = TbAttributeUpdateProto.newBuilder(); builder.setEntityType(entityId.getEntityType().name()); diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index a3771988cf..dba664013d 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -31,6 +31,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; import org.thingsboard.server.common.data.kv.BooleanDataEntry; +import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; @@ -271,6 +272,13 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer }, tsCallBackExecutor); } + @Override + public void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List keys, List deleteTsKvQueries, FutureCallback callback) { + ListenableFuture> deleteFuture = tsService.remove(tenantId, entityId, deleteTsKvQueries); + addVoidCallback(deleteFuture, callback); + addWsCallback(deleteFuture, success -> onTimeSeriesDelete(tenantId, entityId, keys)); + } + @Override public void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback callback) { saveAndNotify(tenantId, entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(key, value) @@ -337,6 +345,20 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } } + private void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List keys) { + TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); + if (currentPartitions.contains(tpi)) { + if (subscriptionManagerService.isPresent()) { + subscriptionManagerService.get().onTimeSeriesDelete(tenantId, entityId, keys, TbCallback.EMPTY); + } else { + log.warn("Possible misconfiguration because subscriptionManagerService is null!"); + } + } else { + TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toTimeseriesDeleteProto(tenantId, entityId, keys); + clusterService.pushMsgToCore(tpi, entityId.getId(), toCoreMsg, null); + } + } + private void addVoidCallback(ListenableFuture saveFuture, final FutureCallback callback) { Futures.addCallback(saveFuture, new FutureCallback() { @Override diff --git a/common/cluster-api/src/main/proto/queue.proto b/common/cluster-api/src/main/proto/queue.proto index dca92c1c7a..d576f38340 100644 --- a/common/cluster-api/src/main/proto/queue.proto +++ b/common/cluster-api/src/main/proto/queue.proto @@ -561,6 +561,15 @@ message TbAttributeDeleteProto { repeated string keys = 7; } +message TbTimeSeriesDeleteProto { + string entityType = 1; + int64 entityIdMSB = 2; + int64 entityIdLSB = 3; + int64 tenantIdMSB = 4; + int64 tenantIdLSB = 5; + repeated string keys = 6; +} + message TbTimeSeriesUpdateProto { string entityType = 1; int64 entityIdMSB = 2; @@ -614,6 +623,7 @@ message SubscriptionMgrMsgProto { TbAlarmSubscriptionProto alarmSub = 7; TbAlarmUpdateProto alarmUpdate = 8; TbAlarmDeleteProto alarmDelete = 9; + TbTimeSeriesDeleteProto tsDelete = 10; } message LocalSubscriptionServiceMsgProto { diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java index 6c9dfbd198..3ffedaa69a 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java @@ -31,5 +31,7 @@ public class BaseDeleteTsKvQuery extends BaseTsKvQuery implements DeleteTsKvQuer this(key, startTs, endTs, false); } - + public Boolean getRewriteLatestIfDeleted() { + return rewriteLatestIfDeleted; + } } diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java index e641c83144..a19d0fad0c 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java @@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; +import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; import org.thingsboard.server.common.data.kv.TsKvEntry; import java.util.Collection; @@ -54,5 +55,5 @@ public interface RuleEngineTelemetryService { void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback> callback); - + void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List keys, List deleteTsKvQueries, FutureCallback callback); } From b8d49b66a6f6926a3697c9e1ccd67e3c59dca672 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Fri, 17 Dec 2021 16:36:19 +0200 Subject: [PATCH 2/5] send notifications by rewriteLatestIfDeleted --- .../controller/TelemetryController.java | 2 +- .../AbstractSubscriptionService.java | 2 +- .../DefaultTelemetrySubscriptionService.java | 27 ++++++++++++++----- .../dao/timeseries/TimeseriesService.java | 4 +-- .../dao/sqlts/SqlTimeseriesLatestDao.java | 13 ++++----- .../dao/timeseries/BaseTimeseriesService.java | 14 +++++----- .../CassandraBaseTimeseriesLatestDao.java | 11 ++++---- .../dao/timeseries/TimeseriesLatestDao.java | 2 +- .../api/RuleEngineTelemetryService.java | 2 +- .../models/datasource/attribute-datasource.ts | 10 ++++++- 10 files changed, 55 insertions(+), 32 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index 70a6b029ed..dfe0701642 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -573,7 +573,7 @@ public class TelemetryController extends BaseController { for (String key : keys) { deleteTsKvQueries.add(new BaseDeleteTsKvQuery(key, deleteFromTs, deleteToTs, rewriteLatestIfDeleted)); } - tsSubService.deleteTimeseriesAndNotify(tenantId, entityId, keys, deleteTsKvQueries, new FutureCallback<>() { + tsSubService.deleteTimeseriesAndNotify(tenantId, entityId, keys, deleteTsKvQueries, rewriteLatestIfDeleted, new FutureCallback<>() { @Override public void onSuccess(@Nullable Void tmp) { logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, null); diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/AbstractSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/AbstractSubscriptionService.java index bbd3811075..3512c37134 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/AbstractSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/AbstractSubscriptionService.java @@ -90,7 +90,7 @@ public abstract class AbstractSubscriptionService extends TbApplicationEventList Futures.addCallback(saveFuture, new FutureCallback() { @Override public void onSuccess(@Nullable T result) { - callback.accept(null); + callback.accept(result); } @Override diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index dba664013d..8a911962f0 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -22,6 +22,7 @@ import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.thingsboard.common.util.ThingsBoardThreadFactory; +import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.common.data.ApiUsageRecordKey; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.EntityView; @@ -46,7 +47,6 @@ import org.thingsboard.server.gen.transport.TransportProtos; import org.thingsboard.server.queue.discovery.PartitionService; import org.thingsboard.server.queue.usagestats.TbApiUsageClient; import org.thingsboard.server.service.apiusage.TbApiUsageStateService; -import org.thingsboard.server.cluster.TbClusterService; import org.thingsboard.server.service.subscription.TbSubscriptionUtils; import javax.annotation.Nullable; @@ -59,9 +59,12 @@ import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.stream.Collectors; /** * Created by ashvayka on 27.03.18. @@ -252,7 +255,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Override public void deleteLatestInternal(TenantId tenantId, EntityId entityId, List keys, FutureCallback callback) { - ListenableFuture> deleteFuture = tsService.removeLatest(tenantId, entityId, keys); + ListenableFuture> deleteFuture = tsService.removeLatest(tenantId, entityId, keys); addVoidCallback(deleteFuture, callback); } @@ -273,10 +276,10 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } @Override - public void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List keys, List deleteTsKvQueries, FutureCallback callback) { - ListenableFuture> deleteFuture = tsService.remove(tenantId, entityId, deleteTsKvQueries); + public void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List keys, List deleteTsKvQueries, boolean rewriteLatestIfDeleted, FutureCallback callback) { + ListenableFuture> deleteFuture = tsService.remove(tenantId, entityId, deleteTsKvQueries); addVoidCallback(deleteFuture, callback); - addWsCallback(deleteFuture, success -> onTimeSeriesDelete(tenantId, entityId, keys)); + addWsCallback(deleteFuture, list -> onTimeSeriesDelete(tenantId, entityId, keys, list, rewriteLatestIfDeleted)); } @Override @@ -345,11 +348,21 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } } - private void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List keys) { + private void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List keys, List ts, boolean rewriteLatestIfDeleted) { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); if (currentPartitions.contains(tpi)) { if (subscriptionManagerService.isPresent()) { - subscriptionManagerService.get().onTimeSeriesDelete(tenantId, entityId, keys, TbCallback.EMPTY); + Set updated; + if (rewriteLatestIfDeleted) { + List filteredTs = ts.stream().filter(Objects::nonNull).collect(Collectors.toList()); + subscriptionManagerService.get().onTimeSeriesUpdate(tenantId, entityId, ts, TbCallback.EMPTY); + updated = filteredTs.stream().map(TsKvEntry::getKey).collect(Collectors.toSet()); + } else { + updated = Collections.emptySet(); + } + + List deleted = keys.stream().filter(key -> updated.isEmpty() || !updated.remove(key)).collect(Collectors.toList()); + subscriptionManagerService.get().onTimeSeriesDelete(tenantId, entityId, deleted, TbCallback.EMPTY); } else { log.warn("Possible misconfiguration because subscriptionManagerService is null!"); } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java index b1a2541fc7..4da8c14eeb 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java @@ -43,9 +43,9 @@ public interface TimeseriesService { ListenableFuture> saveLatest(TenantId tenantId, EntityId entityId, List tsKvEntry); - ListenableFuture> remove(TenantId tenantId, EntityId entityId, List queries); + ListenableFuture> remove(TenantId tenantId, EntityId entityId, List queries); - ListenableFuture> removeLatest(TenantId tenantId, EntityId entityId, Collection keys); + ListenableFuture> removeLatest(TenantId tenantId, EntityId entityId, Collection keys); ListenableFuture> removeAllLatest(TenantId tenantId, EntityId entityId); diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java index 3937b7c2c5..3534a4c253 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java @@ -147,7 +147,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme } @Override - public ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { + public ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { return getRemoveLatestFuture(tenantId, entityId, query); } @@ -175,11 +175,12 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme return tsKvLatestRepository.findAllKeysByEntityIds(entityIds.stream().map(EntityId::getId).collect(Collectors.toList())); } - private ListenableFuture getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { + private ListenableFuture getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { ListenableFuture> future = findNewLatestEntryFuture(tenantId, entityId, query); return Futures.transformAsync(future, entryList -> { if (entryList.size() == 1) { - return getSaveLatestFuture(entityId, entryList.get(0)); + TsKvEntry entry = entryList.get(0); + return Futures.transform(getSaveLatestFuture(entityId, entry), v -> entry, MoreExecutors.directExecutor()); } else { log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey()); } @@ -212,7 +213,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme return Futures.immediateFuture(result); } - protected ListenableFuture getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { + protected ListenableFuture getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { ListenableFuture latestFuture = getFindLatestFuture(entityId, query.getKey()); ListenableFuture booleanFuture = Futures.transform(latestFuture, tsKvEntry -> { @@ -233,12 +234,12 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme return Futures.immediateFuture(null); }, service); - final SimpleListenableFuture resultFuture = new SimpleListenableFuture<>(); + final SimpleListenableFuture resultFuture = new SimpleListenableFuture<>(); Futures.addCallback(removedLatestFuture, new FutureCallback() { @Override public void onSuccess(@Nullable Void result) { if (query.getRewriteLatestIfDeleted()) { - ListenableFuture savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> { + ListenableFuture savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> { if (isRemove) { return getNewLatestEntryFuture(tenantId, entityId, query); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index 3170f6c256..95be63dae7 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -195,10 +195,10 @@ public class BaseTimeseriesService implements TimeseriesService { } @Override - public ListenableFuture> remove(TenantId tenantId, EntityId entityId, List deleteTsKvQueries) { + public ListenableFuture> remove(TenantId tenantId, EntityId entityId, List deleteTsKvQueries) { validate(entityId); deleteTsKvQueries.forEach(BaseTimeseriesService::validate); - List> futures = Lists.newArrayListWithExpectedSize(deleteTsKvQueries.size() * DELETES_PER_ENTRY); + List> futures = Lists.newArrayListWithExpectedSize(deleteTsKvQueries.size() * DELETES_PER_ENTRY); for (DeleteTsKvQuery tsKvQuery : deleteTsKvQueries) { deleteAndRegisterFutures(tenantId, futures, entityId, tsKvQuery); } @@ -206,9 +206,9 @@ public class BaseTimeseriesService implements TimeseriesService { } @Override - public ListenableFuture> removeLatest(TenantId tenantId, EntityId entityId, Collection keys) { + public ListenableFuture> removeLatest(TenantId tenantId, EntityId entityId, Collection keys) { validate(entityId); - List> futures = Lists.newArrayListWithExpectedSize(keys.size()); + List> futures = Lists.newArrayListWithExpectedSize(keys.size()); for (String key : keys) { DeleteTsKvQuery query = new BaseDeleteTsKvQuery(key, 0, System.currentTimeMillis(), false); futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query)); @@ -229,10 +229,10 @@ public class BaseTimeseriesService implements TimeseriesService { }, MoreExecutors.directExecutor()); } - private void deleteAndRegisterFutures(TenantId tenantId, List> futures, EntityId entityId, DeleteTsKvQuery query) { - futures.add(timeseriesDao.remove(tenantId, entityId, query)); + private void deleteAndRegisterFutures(TenantId tenantId, List> futures, EntityId entityId, DeleteTsKvQuery query) { + futures.add(Futures.transform(timeseriesDao.remove(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor())); futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query)); - futures.add(timeseriesDao.removePartition(tenantId, entityId, query)); + futures.add(Futures.transform(timeseriesDao.removePartition(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor())); } private static void validate(EntityId entityId) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java index 0b4ae71e85..6e9bea0ecb 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java @@ -114,7 +114,7 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes } @Override - public ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { + public ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { ListenableFuture latestEntryFuture = findLatest(tenantId, entityId, query.getKey()); ListenableFuture booleanFuture = Futures.transform(latestEntryFuture, latestEntry -> { @@ -134,12 +134,12 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes return Futures.immediateFuture(null); }, readResultsProcessingExecutor); - final SimpleListenableFuture resultFuture = new SimpleListenableFuture<>(); + final SimpleListenableFuture resultFuture = new SimpleListenableFuture<>(); Futures.addCallback(removedLatestFuture, new FutureCallback() { @Override public void onSuccess(@Nullable Void result) { if (query.getRewriteLatestIfDeleted()) { - ListenableFuture savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> { + ListenableFuture savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> { if (isRemove) { return getNewLatestEntryFuture(tenantId, entityId, query); } @@ -164,7 +164,7 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes return resultFuture; } - private ListenableFuture getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { + private ListenableFuture getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { long startTs = 0; long endTs = query.getStartTs() - 1; ReadTsKvQuery findNewLatestQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1, @@ -173,7 +173,8 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes return Futures.transformAsync(future, entryList -> { if (entryList.size() == 1) { - return saveLatest(tenantId, entityId, entryList.get(0)); + TsKvEntry entry = entryList.get(0); + return Futures.transform(saveLatest(tenantId, entityId, entryList.get(0)), v -> entry, MoreExecutors.directExecutor()); } else { log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey()); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java index f19cc461dc..d0253db0bd 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java @@ -32,7 +32,7 @@ public interface TimeseriesLatestDao { ListenableFuture saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry); - ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query); + ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query); List findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java index a19d0fad0c..fe0b39c299 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java @@ -55,5 +55,5 @@ public interface RuleEngineTelemetryService { void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback> callback); - void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List keys, List deleteTsKvQueries, FutureCallback callback); + void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List keys, List deleteTsKvQueries, boolean rewriteLatestIfDeleted, FutureCallback callback); } diff --git a/ui-ngx/src/app/modules/home/models/datasource/attribute-datasource.ts b/ui-ngx/src/app/modules/home/models/datasource/attribute-datasource.ts index 89a0c8f5b2..99907bcaa9 100644 --- a/ui-ngx/src/app/modules/home/models/datasource/attribute-datasource.ts +++ b/ui-ngx/src/app/modules/home/models/datasource/attribute-datasource.ts @@ -88,7 +88,15 @@ export class AttributeDatasource implements DataSource { fetchAttributes(entityId: EntityId, attributesScope: TelemetryType, pageLink: PageLink): Observable> { return this.getAllAttributes(entityId, attributesScope).pipe( - map((data) => pageLink.filterData(data)) + map((data) => { + let filteredData = []; + for(let key in filteredData) { + if(data[key]['value'] !== null) { + filteredData.push(data[key]); + } + } + return pageLink.filterData(filteredData); + }) ); } From 6d61b249e57d3ff3c2079cd9849ae295bbec9fd4 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Mon, 20 Dec 2021 15:23:28 +0200 Subject: [PATCH 3/5] improvements (not notify if lates ts has not been deleted) --- .../controller/TelemetryController.java | 2 +- .../DefaultTelemetrySubscriptionService.java | 35 +++++++------ .../dao/timeseries/TimeseriesService.java | 5 +- .../data/kv/TsKvLatestRemovingResult.java | 36 +++++++++++++ .../dao/sqlts/SqlTimeseriesLatestDao.java | 51 +++++-------------- .../dao/timeseries/BaseTimeseriesService.java | 11 ++-- .../CassandraBaseTimeseriesLatestDao.java | 45 +++++----------- .../dao/timeseries/TimeseriesLatestDao.java | 3 +- .../api/RuleEngineTelemetryService.java | 2 +- .../models/datasource/attribute-datasource.ts | 2 +- 10 files changed, 94 insertions(+), 98 deletions(-) create mode 100644 common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvLatestRemovingResult.java diff --git a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java index dfe0701642..70a6b029ed 100644 --- a/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java +++ b/application/src/main/java/org/thingsboard/server/controller/TelemetryController.java @@ -573,7 +573,7 @@ public class TelemetryController extends BaseController { for (String key : keys) { deleteTsKvQueries.add(new BaseDeleteTsKvQuery(key, deleteFromTs, deleteToTs, rewriteLatestIfDeleted)); } - tsSubService.deleteTimeseriesAndNotify(tenantId, entityId, keys, deleteTsKvQueries, rewriteLatestIfDeleted, new FutureCallback<>() { + tsSubService.deleteTimeseriesAndNotify(tenantId, entityId, keys, deleteTsKvQueries, new FutureCallback<>() { @Override public void onSuccess(@Nullable Void tmp) { logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, null); diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index 8a911962f0..8100b12464 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -37,6 +37,7 @@ import org.thingsboard.server.common.data.kv.DoubleDataEntry; import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; import org.thingsboard.server.common.msg.queue.ServiceType; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; @@ -61,10 +62,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.stream.Collectors; /** * Created by ashvayka on 27.03.18. @@ -255,7 +254,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Override public void deleteLatestInternal(TenantId tenantId, EntityId entityId, List keys, FutureCallback callback) { - ListenableFuture> deleteFuture = tsService.removeLatest(tenantId, entityId, keys); + ListenableFuture> deleteFuture = tsService.removeLatest(tenantId, entityId, keys); addVoidCallback(deleteFuture, callback); } @@ -276,10 +275,10 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } @Override - public void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List keys, List deleteTsKvQueries, boolean rewriteLatestIfDeleted, FutureCallback callback) { - ListenableFuture> deleteFuture = tsService.remove(tenantId, entityId, deleteTsKvQueries); + public void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List keys, List deleteTsKvQueries, FutureCallback callback) { + ListenableFuture> deleteFuture = tsService.remove(tenantId, entityId, deleteTsKvQueries); addVoidCallback(deleteFuture, callback); - addWsCallback(deleteFuture, list -> onTimeSeriesDelete(tenantId, entityId, keys, list, rewriteLatestIfDeleted)); + addWsCallback(deleteFuture, list -> onTimeSeriesDelete(tenantId, entityId, keys, list)); } @Override @@ -348,20 +347,24 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } } - private void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List keys, List ts, boolean rewriteLatestIfDeleted) { + private void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List keys, List ts) { TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId); if (currentPartitions.contains(tpi)) { if (subscriptionManagerService.isPresent()) { - Set updated; - if (rewriteLatestIfDeleted) { - List filteredTs = ts.stream().filter(Objects::nonNull).collect(Collectors.toList()); - subscriptionManagerService.get().onTimeSeriesUpdate(tenantId, entityId, ts, TbCallback.EMPTY); - updated = filteredTs.stream().map(TsKvEntry::getKey).collect(Collectors.toSet()); - } else { - updated = Collections.emptySet(); - } + List updated = new ArrayList<>(); + List deleted = new ArrayList<>(); + + ts.stream().filter(Objects::nonNull).forEach(res -> { + if (res.isRemoved()) { + if (res.getData() != null) { + updated.add(res.getData()); + } else { + deleted.add(res.getKey()); + } + } + }); - List deleted = keys.stream().filter(key -> updated.isEmpty() || !updated.remove(key)).collect(Collectors.toList()); + subscriptionManagerService.get().onTimeSeriesUpdate(tenantId, entityId, updated, TbCallback.EMPTY); subscriptionManagerService.get().onTimeSeriesDelete(tenantId, entityId, deleted, TbCallback.EMPTY); } else { log.warn("Possible misconfiguration because subscriptionManagerService is null!"); diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java index 4da8c14eeb..c707e12a46 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java @@ -21,6 +21,7 @@ import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; +import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; import org.thingsboard.server.common.data.kv.TsKvEntry; import java.util.Collection; @@ -43,9 +44,9 @@ public interface TimeseriesService { ListenableFuture> saveLatest(TenantId tenantId, EntityId entityId, List tsKvEntry); - ListenableFuture> remove(TenantId tenantId, EntityId entityId, List queries); + ListenableFuture> remove(TenantId tenantId, EntityId entityId, List queries); - ListenableFuture> removeLatest(TenantId tenantId, EntityId entityId, Collection keys); + ListenableFuture> removeLatest(TenantId tenantId, EntityId entityId, Collection keys); ListenableFuture> removeAllLatest(TenantId tenantId, EntityId entityId); diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvLatestRemovingResult.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvLatestRemovingResult.java new file mode 100644 index 0000000000..4c67dea2aa --- /dev/null +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvLatestRemovingResult.java @@ -0,0 +1,36 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.common.data.kv; + +import lombok.Data; + +@Data +public class TsKvLatestRemovingResult { + private String key; + private TsKvEntry data; + private boolean removed; + + public TsKvLatestRemovingResult(TsKvEntry data) { + this.key = data.getKey(); + this.data = data; + this.removed = true; + } + + public TsKvLatestRemovingResult(String key, boolean removed) { + this.key = key; + this.removed = removed; + } +} diff --git a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java index 3534a4c253..1c5d28b715 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java @@ -16,7 +16,6 @@ package org.thingsboard.server.dao.sqlts; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.MoreExecutors; @@ -34,6 +33,7 @@ import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; import org.thingsboard.server.common.data.kv.StringDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; +import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; import org.thingsboard.server.common.stats.StatsFactory; import org.thingsboard.server.dao.DaoUtil; import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity; @@ -45,11 +45,9 @@ import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper; import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository; import org.thingsboard.server.dao.sqlts.latest.SearchTsKvLatestRepository; import org.thingsboard.server.dao.sqlts.latest.TsKvLatestRepository; -import org.thingsboard.server.dao.timeseries.SimpleListenableFuture; import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao; import org.thingsboard.server.dao.util.SqlTsLatestAnyDao; -import javax.annotation.Nullable; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.ArrayList; @@ -59,7 +57,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.stream.Collectors; @@ -147,7 +144,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme } @Override - public ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { + public ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { return getRemoveLatestFuture(tenantId, entityId, query); } @@ -175,16 +172,16 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme return tsKvLatestRepository.findAllKeysByEntityIds(entityIds.stream().map(EntityId::getId).collect(Collectors.toList())); } - private ListenableFuture getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { + private ListenableFuture getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { ListenableFuture> future = findNewLatestEntryFuture(tenantId, entityId, query); return Futures.transformAsync(future, entryList -> { if (entryList.size() == 1) { TsKvEntry entry = entryList.get(0); - return Futures.transform(getSaveLatestFuture(entityId, entry), v -> entry, MoreExecutors.directExecutor()); + return Futures.transform(getSaveLatestFuture(entityId, entry), v -> new TsKvLatestRemovingResult(entry), MoreExecutors.directExecutor()); } else { log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey()); } - return Futures.immediateFuture(null); + return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), true)); }, service); } @@ -213,7 +210,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme return Futures.immediateFuture(result); } - protected ListenableFuture getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { + protected ListenableFuture getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { ListenableFuture latestFuture = getFindLatestFuture(entityId, query.getKey()); ListenableFuture booleanFuture = Futures.transform(latestFuture, tsKvEntry -> { @@ -221,47 +218,25 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme return ts > query.getStartTs() && ts <= query.getEndTs(); }, service); - ListenableFuture removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> { + ListenableFuture removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> { if (isRemove) { TsKvLatestEntity latestEntity = new TsKvLatestEntity(); latestEntity.setEntityId(entityId.getId()); latestEntity.setKey(getOrSaveKeyId(query.getKey())); return service.submit(() -> { tsKvLatestRepository.delete(latestEntity); - return null; + return true; }); } - return Futures.immediateFuture(null); + return Futures.immediateFuture(false); }, service); - final SimpleListenableFuture resultFuture = new SimpleListenableFuture<>(); - Futures.addCallback(removedLatestFuture, new FutureCallback() { - @Override - public void onSuccess(@Nullable Void result) { - if (query.getRewriteLatestIfDeleted()) { - ListenableFuture savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> { - if (isRemove) { - return getNewLatestEntryFuture(tenantId, entityId, query); - } - return Futures.immediateFuture(null); - }, service); - - try { - resultFuture.set(savedLatestFuture.get()); - } catch (InterruptedException | ExecutionException e) { - log.warn("Could not get latest saved value for [{}], {}", entityId, query.getKey(), e); - } - } else { - resultFuture.set(null); - } - } - - @Override - public void onFailure(Throwable t) { - log.warn("[{}] Failed to process remove of the latest value", entityId, t); + return Futures.transformAsync(removedLatestFuture, isRemoved -> { + if (isRemoved && query.getRewriteLatestIfDeleted()) { + return getNewLatestEntryFuture(tenantId, entityId, query); } + return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), isRemoved)); }, MoreExecutors.directExecutor()); - return resultFuture; } protected ListenableFuture> getFindAllLatestFuture(EntityId entityId) { diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index 95be63dae7..39bed5359d 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.kv.BaseDeleteTsKvQuery; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; +import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.dao.entityview.EntityViewService; import org.thingsboard.server.dao.exception.IncorrectParameterException; @@ -195,10 +196,10 @@ public class BaseTimeseriesService implements TimeseriesService { } @Override - public ListenableFuture> remove(TenantId tenantId, EntityId entityId, List deleteTsKvQueries) { + public ListenableFuture> remove(TenantId tenantId, EntityId entityId, List deleteTsKvQueries) { validate(entityId); deleteTsKvQueries.forEach(BaseTimeseriesService::validate); - List> futures = Lists.newArrayListWithExpectedSize(deleteTsKvQueries.size() * DELETES_PER_ENTRY); + List> futures = Lists.newArrayListWithExpectedSize(deleteTsKvQueries.size() * DELETES_PER_ENTRY); for (DeleteTsKvQuery tsKvQuery : deleteTsKvQueries) { deleteAndRegisterFutures(tenantId, futures, entityId, tsKvQuery); } @@ -206,9 +207,9 @@ public class BaseTimeseriesService implements TimeseriesService { } @Override - public ListenableFuture> removeLatest(TenantId tenantId, EntityId entityId, Collection keys) { + public ListenableFuture> removeLatest(TenantId tenantId, EntityId entityId, Collection keys) { validate(entityId); - List> futures = Lists.newArrayListWithExpectedSize(keys.size()); + List> futures = Lists.newArrayListWithExpectedSize(keys.size()); for (String key : keys) { DeleteTsKvQuery query = new BaseDeleteTsKvQuery(key, 0, System.currentTimeMillis(), false); futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query)); @@ -229,7 +230,7 @@ public class BaseTimeseriesService implements TimeseriesService { }, MoreExecutors.directExecutor()); } - private void deleteAndRegisterFutures(TenantId tenantId, List> futures, EntityId entityId, DeleteTsKvQuery query) { + private void deleteAndRegisterFutures(TenantId tenantId, List> futures, EntityId entityId, DeleteTsKvQuery query) { futures.add(Futures.transform(timeseriesDao.remove(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor())); futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query)); futures.add(Futures.transform(timeseriesDao.removePartition(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor())); diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java index 6e9bea0ecb..5b1a4d2eb0 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java @@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.kv.Aggregation; import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery; import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; import org.thingsboard.server.common.data.kv.ReadTsKvQuery; +import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.dao.model.ModelConstants; import org.thingsboard.server.dao.nosql.TbResultSet; @@ -114,7 +115,7 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes } @Override - public ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { + public ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { ListenableFuture latestEntryFuture = findLatest(tenantId, entityId, query.getKey()); ListenableFuture booleanFuture = Futures.transform(latestEntryFuture, latestEntry -> { @@ -127,44 +128,22 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes return false; }, readResultsProcessingExecutor); - ListenableFuture removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> { + ListenableFuture removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> { if (isRemove) { - return deleteLatest(tenantId, entityId, query.getKey()); + return Futures.transform(deleteLatest(tenantId, entityId, query.getKey()), res -> true, MoreExecutors.directExecutor()); } - return Futures.immediateFuture(null); + return Futures.immediateFuture(false); }, readResultsProcessingExecutor); - final SimpleListenableFuture resultFuture = new SimpleListenableFuture<>(); - Futures.addCallback(removedLatestFuture, new FutureCallback() { - @Override - public void onSuccess(@Nullable Void result) { - if (query.getRewriteLatestIfDeleted()) { - ListenableFuture savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> { - if (isRemove) { - return getNewLatestEntryFuture(tenantId, entityId, query); - } - return Futures.immediateFuture(null); - }, readResultsProcessingExecutor); - - try { - resultFuture.set(savedLatestFuture.get()); - } catch (InterruptedException | ExecutionException e) { - log.warn("Could not get latest saved value for [{}], {}", entityId, query.getKey(), e); - } - } else { - resultFuture.set(null); - } - } - - @Override - public void onFailure(Throwable t) { - log.warn("[{}] Failed to process remove of the latest value", entityId, t); + return Futures.transformAsync(removedLatestFuture, isRemoved -> { + if (isRemoved && query.getRewriteLatestIfDeleted()) { + return getNewLatestEntryFuture(tenantId, entityId, query); } + return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), isRemoved)); }, MoreExecutors.directExecutor()); - return resultFuture; } - private ListenableFuture getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { + private ListenableFuture getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) { long startTs = 0; long endTs = query.getStartTs() - 1; ReadTsKvQuery findNewLatestQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1, @@ -174,11 +153,11 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes return Futures.transformAsync(future, entryList -> { if (entryList.size() == 1) { TsKvEntry entry = entryList.get(0); - return Futures.transform(saveLatest(tenantId, entityId, entryList.get(0)), v -> entry, MoreExecutors.directExecutor()); + return Futures.transform(saveLatest(tenantId, entityId, entryList.get(0)), v -> new TsKvLatestRemovingResult(entry), MoreExecutors.directExecutor()); } else { log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey()); } - return Futures.immediateFuture(null); + return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), true)); }, readResultsProcessingExecutor); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java index d0253db0bd..f83a41c010 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java @@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId; import org.thingsboard.server.common.data.id.EntityId; import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; +import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; import org.thingsboard.server.common.data.kv.TsKvEntry; import java.util.List; @@ -32,7 +33,7 @@ public interface TimeseriesLatestDao { ListenableFuture saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry); - ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query); + ListenableFuture removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query); List findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java index fe0b39c299..a19d0fad0c 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java @@ -55,5 +55,5 @@ public interface RuleEngineTelemetryService { void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback> callback); - void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List keys, List deleteTsKvQueries, boolean rewriteLatestIfDeleted, FutureCallback callback); + void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List keys, List deleteTsKvQueries, FutureCallback callback); } diff --git a/ui-ngx/src/app/modules/home/models/datasource/attribute-datasource.ts b/ui-ngx/src/app/modules/home/models/datasource/attribute-datasource.ts index 99907bcaa9..6e6c937a8e 100644 --- a/ui-ngx/src/app/modules/home/models/datasource/attribute-datasource.ts +++ b/ui-ngx/src/app/modules/home/models/datasource/attribute-datasource.ts @@ -90,7 +90,7 @@ export class AttributeDatasource implements DataSource { return this.getAllAttributes(entityId, attributesScope).pipe( map((data) => { let filteredData = []; - for(let key in filteredData) { + for(let key in data) { if(data[key]['value'] !== null) { filteredData.push(data[key]); } From 07824ed50c89205aa0df5afdae4eb0164f7dcccf Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Mon, 20 Dec 2021 15:38:08 +0200 Subject: [PATCH 4/5] refactored --- .../thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java index 3ffedaa69a..161883313d 100644 --- a/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java +++ b/common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java @@ -31,7 +31,4 @@ public class BaseDeleteTsKvQuery extends BaseTsKvQuery implements DeleteTsKvQuer this(key, startTs, endTs, false); } - public Boolean getRewriteLatestIfDeleted() { - return rewriteLatestIfDeleted; - } } From 00167d5872a855cc31ceb02058cc18404e86021a Mon Sep 17 00:00:00 2001 From: Vladyslav_Prykhodko Date: Thu, 23 Dec 2021 11:44:17 +0200 Subject: [PATCH 5/5] UI: Refactoring attribute-datasource --- .../modules/home/models/datasource/attribute-datasource.ts | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/ui-ngx/src/app/modules/home/models/datasource/attribute-datasource.ts b/ui-ngx/src/app/modules/home/models/datasource/attribute-datasource.ts index 6e6c937a8e..9b5cbd1598 100644 --- a/ui-ngx/src/app/modules/home/models/datasource/attribute-datasource.ts +++ b/ui-ngx/src/app/modules/home/models/datasource/attribute-datasource.ts @@ -89,12 +89,7 @@ export class AttributeDatasource implements DataSource { pageLink: PageLink): Observable> { return this.getAllAttributes(entityId, attributesScope).pipe( map((data) => { - let filteredData = []; - for(let key in data) { - if(data[key]['value'] !== null) { - filteredData.push(data[key]); - } - } + const filteredData = data.filter(attrData => attrData.lastUpdateTs !== 0 && attrData.value !== null); return pageLink.filterData(filteredData); }) );