Browse Source

Merge pull request #5750 from YevhenBondarenko/feature/ts-notification

[3.3.3] ts latest notifications
pull/5787/head
Andrew Shvayka 5 years ago
committed by GitHub
parent
commit
2d0890b43e
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 7
      application/src/main/java/org/thingsboard/server/controller/TelemetryController.java
  2. 11
      application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java
  3. 34
      application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java
  4. 2
      application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java
  5. 22
      application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java
  6. 2
      application/src/main/java/org/thingsboard/server/service/telemetry/AbstractSubscriptionService.java
  7. 40
      application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
  8. 10
      common/cluster-api/src/main/proto/queue.proto
  9. 5
      common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
  10. 1
      common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java
  11. 36
      common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvLatestRemovingResult.java
  12. 52
      dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java
  13. 15
      dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
  14. 46
      dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java
  15. 3
      dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java
  16. 3
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java
  17. 5
      ui-ngx/src/app/modules/home/models/datasource/attribute-datasource.ts

7
application/src/main/java/org/thingsboard/server/controller/TelemetryController.java

@ -573,10 +573,9 @@ public class TelemetryController extends BaseController {
for (String key : keys) {
deleteTsKvQueries.add(new BaseDeleteTsKvQuery(key, deleteFromTs, deleteToTs, rewriteLatestIfDeleted));
}
ListenableFuture<List<Void>> future = tsService.remove(user.getTenantId(), entityId, deleteTsKvQueries);
Futures.addCallback(future, new FutureCallback<>() {
tsSubService.deleteTimeseriesAndNotify(tenantId, entityId, keys, deleteTsKvQueries, new FutureCallback<>() {
@Override
public void onSuccess(@Nullable List<Void> tmp) {
public void onSuccess(@Nullable Void tmp) {
logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, null);
result.setResult(new ResponseEntity<>(HttpStatus.OK));
}
@ -586,7 +585,7 @@ public class TelemetryController extends BaseController {
logTimeseriesDeleted(user, entityId, keys, deleteFromTs, deleteToTs, t);
result.setResult(new ResponseEntity<>(HttpStatus.INTERNAL_SERVER_ERROR));
}
}, executor);
});
});
}

11
application/src/main/java/org/thingsboard/server/service/queue/DefaultTbCoreConsumerService.java

@ -26,14 +26,15 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.actors.ActorSystemContext;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.rpc.RpcError;
import org.thingsboard.server.common.msg.MsgType;
import org.thingsboard.server.common.msg.TbActorMsg;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.common.transport.util.DataDecodingEncodingService;
import org.thingsboard.server.dao.tenant.TbTenantProfileCache;
@ -47,6 +48,7 @@ import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeDeleteProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesDeleteProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
@ -64,7 +66,6 @@ import org.thingsboard.server.service.ota.OtaPackageStateService;
import org.thingsboard.server.service.profile.TbDeviceProfileCache;
import org.thingsboard.server.service.queue.processing.AbstractConsumerService;
import org.thingsboard.server.service.queue.processing.IdMsgPair;
import org.thingsboard.server.common.msg.rpc.FromDeviceRpcResponse;
import org.thingsboard.server.service.rpc.TbCoreDeviceRpcService;
import org.thingsboard.server.service.rpc.ToDeviceRpcRequestActorMsg;
import org.thingsboard.server.service.state.DeviceStateService;
@ -467,6 +468,12 @@ public class DefaultTbCoreConsumerService extends AbstractConsumerService<ToCore
new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
proto.getScope(), proto.getKeysList(), callback);
} else if (msg.hasTsDelete()) {
TbTimeSeriesDeleteProto proto = msg.getTsDelete();
subscriptionManagerService.onTimeSeriesDelete(
new TenantId(new UUID(proto.getTenantIdMSB(), proto.getTenantIdLSB())),
TbSubscriptionUtils.toEntityId(proto.getEntityType(), proto.getEntityIdMSB(), proto.getEntityIdLSB()),
proto.getKeysList(), callback);
} else if (msg.hasAlarmUpdate()) {
TbAlarmUpdateProto proto = msg.getAlarmUpdate();
subscriptionManagerService.onAlarmUpdate(

34
application/src/main/java/org/thingsboard/server/service/subscription/DefaultSubscriptionManagerService.java

@ -19,8 +19,10 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.thingsboard.common.util.DonAsynchron;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.common.util.ThingsBoardThreadFactory;
import org.thingsboard.rule.engine.api.msg.DeviceAttributesEventNotificationMsg;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.common.data.DataConstants;
import org.thingsboard.server.common.data.EntityType;
import org.thingsboard.server.common.data.alarm.Alarm;
@ -40,20 +42,20 @@ import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.dao.attributes.AttributesService;
import org.thingsboard.server.dao.timeseries.TimeseriesService;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.gen.transport.TransportProtos.*;
import org.thingsboard.server.gen.transport.TransportProtos.LocalSubscriptionServiceMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmSubscriptionUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateTsValue;
import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateValueListProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreNotificationMsg;
import org.thingsboard.server.queue.TbQueueProducer;
import org.thingsboard.server.queue.common.TbProtoQueueMsg;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.discovery.PartitionService;
import org.thingsboard.server.queue.discovery.TbApplicationEventListener;
import org.thingsboard.server.queue.discovery.TbServiceInfoProvider;
import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent;
import org.thingsboard.server.queue.provider.TbQueueProducerProvider;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.cluster.TbClusterService;
import org.thingsboard.server.service.state.DefaultDeviceStateService;
import org.thingsboard.server.service.state.DeviceStateService;
import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate;
@ -337,6 +339,30 @@ public class DefaultSubscriptionManagerService extends TbApplicationEventListene
callback.onSuccess();
}
@Override
public void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List<String> keys, TbCallback callback) {
onLocalTelemetrySubUpdate(entityId,
s -> {
if (TbSubscriptionType.TIMESERIES.equals(s.getType())) {
return (TbTimeseriesSubscription) s;
} else {
return null;
}
}, s -> true, s -> {
List<TsKvEntry> subscriptionUpdate = null;
for (String key : keys) {
if (s.isAllKeys() || s.getKeyStates().containsKey(key)) {
if (subscriptionUpdate == null) {
subscriptionUpdate = new ArrayList<>();
}
subscriptionUpdate.add(new BasicTsKvEntry(0, new StringDataEntry(key, null)));
}
}
return subscriptionUpdate;
}, false);
callback.onSuccess();
}
private <T extends TbSubscription> void onLocalTelemetrySubUpdate(EntityId entityId,
Function<TbSubscription, T> castFunction,
Predicate<T> filterFunction,

2
application/src/main/java/org/thingsboard/server/service/subscription/SubscriptionManagerService.java

@ -40,6 +40,8 @@ public interface SubscriptionManagerService extends ApplicationListener<Partitio
void onAttributesDelete(TenantId tenantId, EntityId entityId, String scope, List<String> keys, TbCallback empty);
void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List<String> keys, TbCallback callback);
void onAlarmUpdate(TenantId tenantId, EntityId entityId, Alarm alarm, TbCallback callback);
void onAlarmDeleted(TenantId tenantId, EntityId entityId, Alarm alarm, TbCallback callback);

22
application/src/main/java/org/thingsboard/server/service/subscription/TbSubscriptionUtils.java

@ -15,6 +15,7 @@
*/
package org.thingsboard.server.service.subscription;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.common.data.alarm.Alarm;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.EntityIdFactory;
@ -30,25 +31,25 @@ import org.thingsboard.server.common.data.kv.KvEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.common.util.JacksonUtil;
import org.thingsboard.server.gen.transport.TransportProtos;
import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto;
import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType;
import org.thingsboard.server.gen.transport.TransportProtos.SubscriptionMgrMsgProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmDeleteProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeDeleteProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeSubscriptionProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAttributeDeleteProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionCloseProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionKetStateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbSubscriptionUpdateTsValue;
import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesDeleteProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesSubscriptionProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbTimeSeriesUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.ToCoreMsg;
import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmUpdateProto;
import org.thingsboard.server.gen.transport.TransportProtos.TbAlarmDeleteProto;
import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate;
import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode;
import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate;
@ -207,6 +208,19 @@ public class TbSubscriptionUtils {
return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build();
}
public static ToCoreMsg toTimeseriesDeleteProto(TenantId tenantId, EntityId entityId, List<String> keys) {
TbTimeSeriesDeleteProto.Builder builder = TbTimeSeriesDeleteProto.newBuilder();
builder.setEntityType(entityId.getEntityType().name());
builder.setEntityIdMSB(entityId.getId().getMostSignificantBits());
builder.setEntityIdLSB(entityId.getId().getLeastSignificantBits());
builder.setTenantIdMSB(tenantId.getId().getMostSignificantBits());
builder.setTenantIdLSB(tenantId.getId().getLeastSignificantBits());
builder.addAllKeys(keys);
SubscriptionMgrMsgProto.Builder msgBuilder = SubscriptionMgrMsgProto.newBuilder();
msgBuilder.setTsDelete(builder);
return ToCoreMsg.newBuilder().setToSubscriptionMgrMsg(msgBuilder.build()).build();
}
public static ToCoreMsg toAttributesUpdateProto(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes) {
TbAttributeUpdateProto.Builder builder = TbAttributeUpdateProto.newBuilder();
builder.setEntityType(entityId.getEntityType().name());

2
application/src/main/java/org/thingsboard/server/service/telemetry/AbstractSubscriptionService.java

@ -90,7 +90,7 @@ public abstract class AbstractSubscriptionService extends TbApplicationEventList
Futures.addCallback(saveFuture, new FutureCallback<T>() {
@Override
public void onSuccess(@Nullable T result) {
callback.accept(null);
callback.accept(result);
}
@Override

40
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java

@ -33,10 +33,12 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
import org.thingsboard.server.common.data.kv.BooleanDataEntry;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.DoubleDataEntry;
import org.thingsboard.server.common.data.kv.LongDataEntry;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
import org.thingsboard.server.common.msg.queue.ServiceType;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
@ -59,6 +61,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -279,7 +282,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
@Override
public void deleteLatestInternal(TenantId tenantId, EntityId entityId, List<String> keys, FutureCallback<Void> callback) {
ListenableFuture<List<Void>> deleteFuture = tsService.removeLatest(tenantId, entityId, keys);
ListenableFuture<List<TsKvLatestRemovingResult>> deleteFuture = tsService.removeLatest(tenantId, entityId, keys);
addVoidCallback(deleteFuture, callback);
}
@ -299,6 +302,13 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
}, tsCallBackExecutor);
}
@Override
public void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List<String> keys, List<DeleteTsKvQuery> deleteTsKvQueries, FutureCallback<Void> callback) {
ListenableFuture<List<TsKvLatestRemovingResult>> deleteFuture = tsService.remove(tenantId, entityId, deleteTsKvQueries);
addVoidCallback(deleteFuture, callback);
addWsCallback(deleteFuture, list -> onTimeSeriesDelete(tenantId, entityId, keys, list));
}
@Override
public void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback) {
saveAndNotify(tenantId, entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(key, value)
@ -365,6 +375,34 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
}
}
private void onTimeSeriesDelete(TenantId tenantId, EntityId entityId, List<String> keys, List<TsKvLatestRemovingResult> ts) {
TopicPartitionInfo tpi = partitionService.resolve(ServiceType.TB_CORE, tenantId, entityId);
if (currentPartitions.contains(tpi)) {
if (subscriptionManagerService.isPresent()) {
List<TsKvEntry> updated = new ArrayList<>();
List<String> deleted = new ArrayList<>();
ts.stream().filter(Objects::nonNull).forEach(res -> {
if (res.isRemoved()) {
if (res.getData() != null) {
updated.add(res.getData());
} else {
deleted.add(res.getKey());
}
}
});
subscriptionManagerService.get().onTimeSeriesUpdate(tenantId, entityId, updated, TbCallback.EMPTY);
subscriptionManagerService.get().onTimeSeriesDelete(tenantId, entityId, deleted, TbCallback.EMPTY);
} else {
log.warn("Possible misconfiguration because subscriptionManagerService is null!");
}
} else {
TransportProtos.ToCoreMsg toCoreMsg = TbSubscriptionUtils.toTimeseriesDeleteProto(tenantId, entityId, keys);
clusterService.pushMsgToCore(tpi, entityId.getId(), toCoreMsg, null);
}
}
private <S> void addVoidCallback(ListenableFuture<S> saveFuture, final FutureCallback<Void> callback) {
Futures.addCallback(saveFuture, new FutureCallback<S>() {
@Override

10
common/cluster-api/src/main/proto/queue.proto

@ -561,6 +561,15 @@ message TbAttributeDeleteProto {
repeated string keys = 7;
}
message TbTimeSeriesDeleteProto {
string entityType = 1;
int64 entityIdMSB = 2;
int64 entityIdLSB = 3;
int64 tenantIdMSB = 4;
int64 tenantIdLSB = 5;
repeated string keys = 6;
}
message TbTimeSeriesUpdateProto {
string entityType = 1;
int64 entityIdMSB = 2;
@ -614,6 +623,7 @@ message SubscriptionMgrMsgProto {
TbAlarmSubscriptionProto alarmSub = 7;
TbAlarmUpdateProto alarmUpdate = 8;
TbAlarmDeleteProto alarmDelete = 9;
TbTimeSeriesDeleteProto tsDelete = 10;
}
message LocalSubscriptionServiceMsgProto {

5
common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java

@ -21,6 +21,7 @@ import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import java.util.Collection;
@ -45,9 +46,9 @@ public interface TimeseriesService {
ListenableFuture<List<Void>> saveLatest(TenantId tenantId, EntityId entityId, List<TsKvEntry> tsKvEntry);
ListenableFuture<List<Void>> remove(TenantId tenantId, EntityId entityId, List<DeleteTsKvQuery> queries);
ListenableFuture<List<TsKvLatestRemovingResult>> remove(TenantId tenantId, EntityId entityId, List<DeleteTsKvQuery> queries);
ListenableFuture<List<Void>> removeLatest(TenantId tenantId, EntityId entityId, Collection<String> keys);
ListenableFuture<List<TsKvLatestRemovingResult>> removeLatest(TenantId tenantId, EntityId entityId, Collection<String> keys);
ListenableFuture<Collection<String>> removeAllLatest(TenantId tenantId, EntityId entityId);

1
common/data/src/main/java/org/thingsboard/server/common/data/kv/BaseDeleteTsKvQuery.java

@ -31,5 +31,4 @@ public class BaseDeleteTsKvQuery extends BaseTsKvQuery implements DeleteTsKvQuer
this(key, startTs, endTs, false);
}
}

36
common/data/src/main/java/org/thingsboard/server/common/data/kv/TsKvLatestRemovingResult.java

@ -0,0 +1,36 @@
/**
* Copyright © 2016-2021 The Thingsboard Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.common.data.kv;
import lombok.Data;
@Data
public class TsKvLatestRemovingResult {
private String key;
private TsKvEntry data;
private boolean removed;
public TsKvLatestRemovingResult(TsKvEntry data) {
this.key = data.getKey();
this.data = data;
this.removed = true;
}
public TsKvLatestRemovingResult(String key, boolean removed) {
this.key = key;
this.removed = removed;
}
}

52
dao/src/main/java/org/thingsboard/server/dao/sqlts/SqlTimeseriesLatestDao.java

@ -16,7 +16,6 @@
package org.thingsboard.server.dao.sqlts;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
@ -34,6 +33,7 @@ import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.StringDataEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
import org.thingsboard.server.common.stats.StatsFactory;
import org.thingsboard.server.dao.DaoUtil;
import org.thingsboard.server.dao.model.sql.AbstractTsKvEntity;
@ -45,11 +45,9 @@ import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
import org.thingsboard.server.dao.sqlts.insert.latest.InsertLatestTsRepository;
import org.thingsboard.server.dao.sqlts.latest.SearchTsKvLatestRepository;
import org.thingsboard.server.dao.sqlts.latest.TsKvLatestRepository;
import org.thingsboard.server.dao.timeseries.SimpleListenableFuture;
import org.thingsboard.server.dao.timeseries.TimeseriesLatestDao;
import org.thingsboard.server.dao.util.SqlTsLatestAnyDao;
import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
@ -59,7 +57,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -147,7 +144,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
}
@Override
public ListenableFuture<Void> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
public ListenableFuture<TsKvLatestRemovingResult> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
return getRemoveLatestFuture(tenantId, entityId, query);
}
@ -175,15 +172,16 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
return tsKvLatestRepository.findAllKeysByEntityIds(entityIds.stream().map(EntityId::getId).collect(Collectors.toList()));
}
private ListenableFuture<Void> getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
private ListenableFuture<TsKvLatestRemovingResult> getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
ListenableFuture<List<TsKvEntry>> future = findNewLatestEntryFuture(tenantId, entityId, query);
return Futures.transformAsync(future, entryList -> {
if (entryList.size() == 1) {
return getSaveLatestFuture(entityId, entryList.get(0));
TsKvEntry entry = entryList.get(0);
return Futures.transform(getSaveLatestFuture(entityId, entry), v -> new TsKvLatestRemovingResult(entry), MoreExecutors.directExecutor());
} else {
log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey());
}
return Futures.immediateFuture(null);
return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), true));
}, service);
}
@ -212,7 +210,7 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
return Futures.immediateFuture(result);
}
protected ListenableFuture<Void> getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
protected ListenableFuture<TsKvLatestRemovingResult> getRemoveLatestFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
ListenableFuture<TsKvEntry> latestFuture = getFindLatestFuture(entityId, query.getKey());
ListenableFuture<Boolean> booleanFuture = Futures.transform(latestFuture, tsKvEntry -> {
@ -220,47 +218,25 @@ public class SqlTimeseriesLatestDao extends BaseAbstractSqlTimeseriesDao impleme
return ts > query.getStartTs() && ts <= query.getEndTs();
}, service);
ListenableFuture<Void> removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
ListenableFuture<Boolean> removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
if (isRemove) {
TsKvLatestEntity latestEntity = new TsKvLatestEntity();
latestEntity.setEntityId(entityId.getId());
latestEntity.setKey(getOrSaveKeyId(query.getKey()));
return service.submit(() -> {
tsKvLatestRepository.delete(latestEntity);
return null;
return true;
});
}
return Futures.immediateFuture(null);
return Futures.immediateFuture(false);
}, service);
final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
Futures.addCallback(removedLatestFuture, new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void result) {
if (query.getRewriteLatestIfDeleted()) {
ListenableFuture<Void> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
if (isRemove) {
return getNewLatestEntryFuture(tenantId, entityId, query);
}
return Futures.immediateFuture(null);
}, service);
try {
resultFuture.set(savedLatestFuture.get());
} catch (InterruptedException | ExecutionException e) {
log.warn("Could not get latest saved value for [{}], {}", entityId, query.getKey(), e);
}
} else {
resultFuture.set(null);
}
}
@Override
public void onFailure(Throwable t) {
log.warn("[{}] Failed to process remove of the latest value", entityId, t);
return Futures.transformAsync(removedLatestFuture, isRemoved -> {
if (isRemoved && query.getRewriteLatestIfDeleted()) {
return getNewLatestEntryFuture(tenantId, entityId, query);
}
return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), isRemoved));
}, MoreExecutors.directExecutor());
return resultFuture;
}
protected ListenableFuture<List<TsKvEntry>> getFindAllLatestFuture(EntityId entityId) {

15
dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java

@ -36,6 +36,7 @@ import org.thingsboard.server.common.data.kv.BaseDeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.entityview.EntityViewService;
import org.thingsboard.server.dao.exception.IncorrectParameterException;
@ -218,10 +219,10 @@ public class BaseTimeseriesService implements TimeseriesService {
}
@Override
public ListenableFuture<List<Void>> remove(TenantId tenantId, EntityId entityId, List<DeleteTsKvQuery> deleteTsKvQueries) {
public ListenableFuture<List<TsKvLatestRemovingResult>> remove(TenantId tenantId, EntityId entityId, List<DeleteTsKvQuery> deleteTsKvQueries) {
validate(entityId);
deleteTsKvQueries.forEach(BaseTimeseriesService::validate);
List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(deleteTsKvQueries.size() * DELETES_PER_ENTRY);
List<ListenableFuture<TsKvLatestRemovingResult>> futures = Lists.newArrayListWithExpectedSize(deleteTsKvQueries.size() * DELETES_PER_ENTRY);
for (DeleteTsKvQuery tsKvQuery : deleteTsKvQueries) {
deleteAndRegisterFutures(tenantId, futures, entityId, tsKvQuery);
}
@ -229,9 +230,9 @@ public class BaseTimeseriesService implements TimeseriesService {
}
@Override
public ListenableFuture<List<Void>> removeLatest(TenantId tenantId, EntityId entityId, Collection<String> keys) {
public ListenableFuture<List<TsKvLatestRemovingResult>> removeLatest(TenantId tenantId, EntityId entityId, Collection<String> keys) {
validate(entityId);
List<ListenableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(keys.size());
List<ListenableFuture<TsKvLatestRemovingResult>> futures = Lists.newArrayListWithExpectedSize(keys.size());
for (String key : keys) {
DeleteTsKvQuery query = new BaseDeleteTsKvQuery(key, 0, System.currentTimeMillis(), false);
futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query));
@ -252,10 +253,10 @@ public class BaseTimeseriesService implements TimeseriesService {
}, MoreExecutors.directExecutor());
}
private void deleteAndRegisterFutures(TenantId tenantId, List<ListenableFuture<Void>> futures, EntityId entityId, DeleteTsKvQuery query) {
futures.add(timeseriesDao.remove(tenantId, entityId, query));
private void deleteAndRegisterFutures(TenantId tenantId, List<ListenableFuture<TsKvLatestRemovingResult>> futures, EntityId entityId, DeleteTsKvQuery query) {
futures.add(Futures.transform(timeseriesDao.remove(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor()));
futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query));
futures.add(timeseriesDao.removePartition(tenantId, entityId, query));
futures.add(Futures.transform(timeseriesDao.removePartition(tenantId, entityId, query), v -> null, MoreExecutors.directExecutor()));
}
private static void validate(EntityId entityId) {

46
dao/src/main/java/org/thingsboard/server/dao/timeseries/CassandraBaseTimeseriesLatestDao.java

@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.kv.Aggregation;
import org.thingsboard.server.common.data.kv.BaseReadTsKvQuery;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.ReadTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import org.thingsboard.server.dao.model.ModelConstants;
import org.thingsboard.server.dao.nosql.TbResultSet;
@ -114,7 +115,7 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes
}
@Override
public ListenableFuture<Void> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
public ListenableFuture<TsKvLatestRemovingResult> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
ListenableFuture<TsKvEntry> latestEntryFuture = findLatest(tenantId, entityId, query.getKey());
ListenableFuture<Boolean> booleanFuture = Futures.transform(latestEntryFuture, latestEntry -> {
@ -127,44 +128,22 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes
return false;
}, readResultsProcessingExecutor);
ListenableFuture<Void> removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
ListenableFuture<Boolean> removedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
if (isRemove) {
return deleteLatest(tenantId, entityId, query.getKey());
return Futures.transform(deleteLatest(tenantId, entityId, query.getKey()), res -> true, MoreExecutors.directExecutor());
}
return Futures.immediateFuture(null);
return Futures.immediateFuture(false);
}, readResultsProcessingExecutor);
final SimpleListenableFuture<Void> resultFuture = new SimpleListenableFuture<>();
Futures.addCallback(removedLatestFuture, new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void result) {
if (query.getRewriteLatestIfDeleted()) {
ListenableFuture<Void> savedLatestFuture = Futures.transformAsync(booleanFuture, isRemove -> {
if (isRemove) {
return getNewLatestEntryFuture(tenantId, entityId, query);
}
return Futures.immediateFuture(null);
}, readResultsProcessingExecutor);
try {
resultFuture.set(savedLatestFuture.get());
} catch (InterruptedException | ExecutionException e) {
log.warn("Could not get latest saved value for [{}], {}", entityId, query.getKey(), e);
}
} else {
resultFuture.set(null);
}
}
@Override
public void onFailure(Throwable t) {
log.warn("[{}] Failed to process remove of the latest value", entityId, t);
return Futures.transformAsync(removedLatestFuture, isRemoved -> {
if (isRemoved && query.getRewriteLatestIfDeleted()) {
return getNewLatestEntryFuture(tenantId, entityId, query);
}
return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), isRemoved));
}, MoreExecutors.directExecutor());
return resultFuture;
}
private ListenableFuture<Void> getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
private ListenableFuture<TsKvLatestRemovingResult> getNewLatestEntryFuture(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query) {
long startTs = 0;
long endTs = query.getStartTs() - 1;
ReadTsKvQuery findNewLatestQuery = new BaseReadTsKvQuery(query.getKey(), startTs, endTs, endTs - startTs, 1,
@ -173,11 +152,12 @@ public class CassandraBaseTimeseriesLatestDao extends AbstractCassandraBaseTimes
return Futures.transformAsync(future, entryList -> {
if (entryList.size() == 1) {
return saveLatest(tenantId, entityId, entryList.get(0));
TsKvEntry entry = entryList.get(0);
return Futures.transform(saveLatest(tenantId, entityId, entryList.get(0)), v -> new TsKvLatestRemovingResult(entry), MoreExecutors.directExecutor());
} else {
log.trace("Could not find new latest value for [{}], key - {}", entityId, query.getKey());
}
return Futures.immediateFuture(null);
return Futures.immediateFuture(new TsKvLatestRemovingResult(query.getKey(), true));
}, readResultsProcessingExecutor);
}

3
dao/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesLatestDao.java

@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import java.util.List;
@ -32,7 +33,7 @@ public interface TimeseriesLatestDao {
ListenableFuture<Void> saveLatest(TenantId tenantId, EntityId entityId, TsKvEntry tsKvEntry);
ListenableFuture<Void> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query);
ListenableFuture<TsKvLatestRemovingResult> removeLatest(TenantId tenantId, EntityId entityId, DeleteTsKvQuery query);
List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId);

3
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java

@ -20,6 +20,7 @@ import org.thingsboard.server.common.data.id.CustomerId;
import org.thingsboard.server.common.data.id.EntityId;
import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import java.util.Collection;
@ -56,5 +57,5 @@ public interface RuleEngineTelemetryService {
void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback<Collection<String>> callback);
void deleteTimeseriesAndNotify(TenantId tenantId, EntityId entityId, List<String> keys, List<DeleteTsKvQuery> deleteTsKvQueries, FutureCallback<Void> callback);
}

5
ui-ngx/src/app/modules/home/models/datasource/attribute-datasource.ts

@ -88,7 +88,10 @@ export class AttributeDatasource implements DataSource<AttributeData> {
fetchAttributes(entityId: EntityId, attributesScope: TelemetryType,
pageLink: PageLink): Observable<PageData<AttributeData>> {
return this.getAllAttributes(entityId, attributesScope).pipe(
map((data) => pageLink.filterData(data))
map((data) => {
const filteredData = data.filter(attrData => attrData.lastUpdateTs !== 0 && attrData.value !== null);
return pageLink.filterData(filteredData);
})
);
}

Loading…
Cancel
Save