Browse Source

Merge pull request #3237 from YevhenBondarenko/master

EntityView improvements
pull/3243/head
Igor Kulikov 6 years ago
committed by GitHub
parent
commit
c98fbc7cf8
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 56
      application/src/main/java/org/thingsboard/server/controller/EntityViewController.java
  2. 24
      application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java
  3. 30
      application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java
  4. 69
      application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
  5. 2
      common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
  6. 52
      dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
  7. 3
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java

56
application/src/main/java/org/thingsboard/server/controller/EntityViewController.java

@ -58,6 +58,7 @@ import org.thingsboard.server.service.security.permission.Resource;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -123,10 +124,10 @@ public class EntityViewController extends BaseController {
futures.add(deleteAttributesFromEntityView(existingEntityView, DataConstants.SERVER_SCOPE, existingEntityView.getKeys().getAttributes().getCs(), getCurrentUser())); futures.add(deleteAttributesFromEntityView(existingEntityView, DataConstants.SERVER_SCOPE, existingEntityView.getKeys().getAttributes().getCs(), getCurrentUser()));
futures.add(deleteAttributesFromEntityView(existingEntityView, DataConstants.SHARED_SCOPE, existingEntityView.getKeys().getAttributes().getCs(), getCurrentUser())); futures.add(deleteAttributesFromEntityView(existingEntityView, DataConstants.SHARED_SCOPE, existingEntityView.getKeys().getAttributes().getCs(), getCurrentUser()));
} }
if (existingEntityView.getKeys().getTimeseries() != null && !existingEntityView.getKeys().getTimeseries().isEmpty()) {
futures.add(deleteLatestFromEntityView(existingEntityView, existingEntityView.getKeys().getTimeseries(), getCurrentUser()));
}
} }
List<String> tsKeys = existingEntityView.getKeys() != null && existingEntityView.getKeys().getTimeseries() != null ?
existingEntityView.getKeys().getTimeseries() : Collections.emptyList();
futures.add(deleteLatestFromEntityView(existingEntityView, tsKeys, getCurrentUser()));
} }
EntityView savedEntityView = checkNotNull(entityViewService.saveEntityView(entityView)); EntityView savedEntityView = checkNotNull(entityViewService.saveEntityView(entityView));
@ -136,9 +137,7 @@ public class EntityViewController extends BaseController {
futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.SERVER_SCOPE, savedEntityView.getKeys().getAttributes().getSs(), getCurrentUser())); futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.SERVER_SCOPE, savedEntityView.getKeys().getAttributes().getSs(), getCurrentUser()));
futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.SHARED_SCOPE, savedEntityView.getKeys().getAttributes().getSh(), getCurrentUser())); futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.SHARED_SCOPE, savedEntityView.getKeys().getAttributes().getSh(), getCurrentUser()));
} }
if (savedEntityView.getKeys().getTimeseries() != null && !savedEntityView.getKeys().getTimeseries().isEmpty()) { futures.add(copyLatestFromEntityToEntityView(savedEntityView, getCurrentUser()));
futures.add(copyLatestFromEntityToEntityView(savedEntityView, getCurrentUser()));
}
} }
for (ListenableFuture<?> future : futures) { for (ListenableFuture<?> future : futures) {
try { try {
@ -184,7 +183,27 @@ public class EntityViewController extends BaseController {
} }
}); });
} else { } else {
resultFuture.set(null); tsSubService.deleteAllLatest(entityView.getTenantId(), entityId, new FutureCallback<Collection<String>>() {
@Override
public void onSuccess(@Nullable Collection<String> keys) {
try {
logTimeseriesDeleted(user, entityId, new ArrayList<>(keys), null);
} catch (ThingsboardException e) {
log.error("Failed to log timeseries delete", e);
}
resultFuture.set(null);
}
@Override
public void onFailure(Throwable t) {
try {
logTimeseriesDeleted(user, entityId, Collections.emptyList(), t);
} catch (ThingsboardException e) {
log.error("Failed to log timeseries delete", e);
}
resultFuture.setException(t);
}
});
} }
return resultFuture; return resultFuture;
} }
@ -222,18 +241,21 @@ public class EntityViewController extends BaseController {
private ListenableFuture<List<Void>> copyLatestFromEntityToEntityView(EntityView entityView, SecurityUser user) { private ListenableFuture<List<Void>> copyLatestFromEntityToEntityView(EntityView entityView, SecurityUser user) {
EntityViewId entityId = entityView.getId(); EntityViewId entityId = entityView.getId();
List<String> keys = entityView.getKeys().getTimeseries(); List<String> keys = entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null ?
long startTime = entityView.getStartTimeMs(); entityView.getKeys().getTimeseries() : Collections.emptyList();
long endTime = entityView.getEndTimeMs(); long startTs = entityView.getStartTimeMs();
ListenableFuture<List<TsKvEntry>> latestFuture; long endTs = entityView.getEndTimeMs() == 0 ? Long.MAX_VALUE : entityView.getEndTimeMs();
if (startTime == 0 && endTime == 0) { ListenableFuture<List<String>> keysFuture;
latestFuture = tsService.findLatest(user.getTenantId(), entityView.getEntityId(), keys); if (keys.isEmpty()) {
keysFuture = Futures.transform(tsService.findAllLatest(user.getTenantId(),
entityView.getEntityId()), latest -> latest.stream().map(TsKvEntry::getKey).collect(Collectors.toList()), MoreExecutors.directExecutor());
} else { } else {
long startTs = startTime; keysFuture = Futures.immediateFuture(keys);
long endTs = endTime == 0 ? System.currentTimeMillis() : endTime;
List<ReadTsKvQuery> queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, "DESC")).collect(Collectors.toList());
latestFuture = tsService.findAll(user.getTenantId(), entityView.getEntityId(), queries);
} }
ListenableFuture<List<TsKvEntry>> latestFuture = Futures.transformAsync(keysFuture, fetchKeys -> {
List<ReadTsKvQuery> queries = fetchKeys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, "DESC")).collect(Collectors.toList());
return tsService.findAll(user.getTenantId(), entityView.getEntityId(), queries);
}, MoreExecutors.directExecutor());
return Futures.transform(latestFuture, latestValues -> { return Futures.transform(latestFuture, latestValues -> {
if (latestValues != null && !latestValues.isEmpty()) { if (latestValues != null && !latestValues.isEmpty()) {
tsSubService.saveLatestAndNotify(entityView.getTenantId(), entityId, latestValues, new FutureCallback<Void>() { tsSubService.saveLatestAndNotify(entityView.getTenantId(), entityId, latestValues, new FutureCallback<Void>() {

24
application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java

@ -43,6 +43,7 @@ import org.thingsboard.server.service.install.InstallScripts;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -145,18 +146,21 @@ public class DefaultDataUpdateService implements DataUpdateService {
private ListenableFuture<List<Void>> updateEntityViewLatestTelemetry(EntityView entityView) { private ListenableFuture<List<Void>> updateEntityViewLatestTelemetry(EntityView entityView) {
EntityViewId entityId = entityView.getId(); EntityViewId entityId = entityView.getId();
List<String> keys = entityView.getKeys().getTimeseries(); List<String> keys = entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null ?
long startTime = entityView.getStartTimeMs(); entityView.getKeys().getTimeseries() : Collections.emptyList();
long endTime = entityView.getEndTimeMs(); long startTs = entityView.getStartTimeMs();
ListenableFuture<List<TsKvEntry>> latestFuture; long endTs = entityView.getEndTimeMs() == 0 ? Long.MAX_VALUE : entityView.getEndTimeMs();
if (startTime == 0 && endTime == 0) { ListenableFuture<List<String>> keysFuture;
latestFuture = tsService.findLatest(TenantId.SYS_TENANT_ID, entityView.getEntityId(), keys); if (keys.isEmpty()) {
keysFuture = Futures.transform(tsService.findAllLatest(TenantId.SYS_TENANT_ID,
entityView.getEntityId()), latest -> latest.stream().map(TsKvEntry::getKey).collect(Collectors.toList()), MoreExecutors.directExecutor());
} else { } else {
long startTs = startTime; keysFuture = Futures.immediateFuture(keys);
long endTs = endTime == 0 ? System.currentTimeMillis() : endTime;
List<ReadTsKvQuery> queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, "DESC")).collect(Collectors.toList());
latestFuture = tsService.findAll(TenantId.SYS_TENANT_ID, entityView.getEntityId(), queries);
} }
ListenableFuture<List<TsKvEntry>> latestFuture = Futures.transformAsync(keysFuture, fetchKeys -> {
List<ReadTsKvQuery> queries = fetchKeys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, "DESC")).collect(Collectors.toList());
return tsService.findAll(TenantId.SYS_TENANT_ID, entityView.getEntityId(), queries);
}, MoreExecutors.directExecutor());
return Futures.transformAsync(latestFuture, latestValues -> { return Futures.transformAsync(latestFuture, latestValues -> {
if (latestValues != null && !latestValues.isEmpty()) { if (latestValues != null && !latestValues.isEmpty()) {
ListenableFuture<List<Void>> saveFuture = tsService.saveLatest(TenantId.SYS_TENANT_ID, entityId, latestValues); ListenableFuture<List<Void>> saveFuture = tsService.saveLatest(TenantId.SYS_TENANT_ID, entityId, latestValues);

30
application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java

@ -36,6 +36,7 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.queue.TbClusterService; import org.thingsboard.server.service.queue.TbClusterService;
import org.thingsboard.server.service.telemetry.DefaultTelemetryWebSocketService;
import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate; import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate;
import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate; import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate;
@ -114,11 +115,6 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
//TODO 3.1: replace null callbacks with callbacks from websocket service. //TODO 3.1: replace null callbacks with callbacks from websocket service.
@Override @Override
public void addSubscription(TbSubscription subscription) { public void addSubscription(TbSubscription subscription) {
EntityId entityId = subscription.getEntityId();
// Telemetry subscription on Entity Views are handled differently, because we need to allow only certain keys and time ranges;
if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW) && TbSubscriptionType.TIMESERIES.equals(subscription.getType())) {
subscription = resolveEntityViewSubscription((TbTimeseriesSubscription) subscription);
}
pushSubscriptionToManagerService(subscription, true); pushSubscriptionToManagerService(subscription, true);
registerSubscription(subscription); registerSubscription(subscription);
} }
@ -203,30 +199,6 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
} }
} }
private TbSubscription resolveEntityViewSubscription(TbTimeseriesSubscription subscription) {
EntityView entityView = entityViewService.findEntityViewById(TenantId.SYS_TENANT_ID, new EntityViewId(subscription.getEntityId().getId()));
Map<String, Long> keyStates;
if (subscription.isAllKeys()) {
keyStates = entityView.getKeys().getTimeseries().stream().collect(Collectors.toMap(k -> k, k -> 0L));
} else {
keyStates = subscription.getKeyStates().entrySet()
.stream().filter(entry -> entityView.getKeys().getTimeseries().contains(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
return TbTimeseriesSubscription.builder()
.serviceId(subscription.getServiceId())
.sessionId(subscription.getSessionId())
.subscriptionId(subscription.getSubscriptionId())
.tenantId(subscription.getTenantId())
.entityId(entityView.getEntityId())
.startTime(entityView.getStartTimeMs())
.endTime(entityView.getEndTimeMs())
.allKeys(false)
.keyStates(keyStates).build();
}
private void registerSubscription(TbSubscription subscription) { private void registerSubscription(TbSubscription subscription) {
Map<Integer, TbSubscription> sessionSubscriptions = subscriptionsBySessionId.computeIfAbsent(subscription.getSessionId(), k -> new ConcurrentHashMap<>()); Map<Integer, TbSubscription> sessionSubscriptions = subscriptionsBySessionId.computeIfAbsent(subscription.getSessionId(), k -> new ConcurrentHashMap<>());
sessionSubscriptions.put(subscription.getSubscriptionId(), subscription); sessionSubscriptions.put(subscription.getSubscriptionId(), subscription);

69
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java

@ -52,9 +52,12 @@ import javax.annotation.Nullable;
import javax.annotation.PostConstruct; import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -122,33 +125,37 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
@Override @Override
public void onSuccess(@Nullable List<EntityView> result) { public void onSuccess(@Nullable List<EntityView> result) {
if (result != null) { if (result != null) {
Map<String, List<TsKvEntry>> tsMap = new HashMap<>();
for (TsKvEntry entry : ts) {
tsMap.computeIfAbsent(entry.getKey(), s -> new ArrayList<>()).add(entry);
}
for (EntityView entityView : result) { for (EntityView entityView : result) {
if (entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null List<String> keys = entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null ?
&& !entityView.getKeys().getTimeseries().isEmpty()) { entityView.getKeys().getTimeseries() : new ArrayList<>(tsMap.keySet());
List<TsKvEntry> entityViewLatest = new ArrayList<>(); List<TsKvEntry> entityViewLatest = new ArrayList<>();
for (String key : entityView.getKeys().getTimeseries()) { long startTs = entityView.getStartTimeMs();
long startTime = entityView.getStartTimeMs(); long endTs = entityView.getEndTimeMs() == 0 ? Long.MAX_VALUE : entityView.getEndTimeMs();
long endTime = entityView.getEndTimeMs(); for (String key : keys) {
long startTs = startTime; List<TsKvEntry> entries = tsMap.get(key);
long endTs = endTime == 0 ? System.currentTimeMillis() : endTime; if (entries != null) {
Optional<TsKvEntry> tsKvEntry = ts.stream() Optional<TsKvEntry> tsKvEntry = entries.stream()
.filter(entry -> entry.getKey().equals(key) && entry.getTs() > startTs && entry.getTs() <= endTs) .filter(entry -> entry.getTs() > startTs && entry.getTs() <= endTs)
.max(Comparator.comparingLong(TsKvEntry::getTs)); .max(Comparator.comparingLong(TsKvEntry::getTs));
if (tsKvEntry.isPresent()) { if (tsKvEntry.isPresent()) {
entityViewLatest.add(tsKvEntry.get()); entityViewLatest.add(tsKvEntry.get());
} }
} }
if (!entityViewLatest.isEmpty()) { }
saveLatestAndNotify(tenantId, entityView.getId(), entityViewLatest, new FutureCallback<Void>() { if (!entityViewLatest.isEmpty()) {
@Override saveLatestAndNotify(tenantId, entityView.getId(), entityViewLatest, new FutureCallback<Void>() {
public void onSuccess(@Nullable Void tmp) { @Override
} public void onSuccess(@Nullable Void tmp) {
}
@Override @Override
public void onFailure(Throwable t) { public void onFailure(Throwable t) {
} }
}); });
}
} }
} }
} }
@ -189,6 +196,22 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
addMainCallback(deleteFuture, callback); addMainCallback(deleteFuture, callback);
} }
@Override
public void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback<Collection<String>> callback) {
ListenableFuture<Collection<String>> deleteFuture = tsService.removeAllLatest(tenantId, entityId);
Futures.addCallback(deleteFuture, new FutureCallback<Collection<String>>() {
@Override
public void onSuccess(@Nullable Collection<String> result) {
callback.onSuccess(result);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
}, tsCallBackExecutor);
}
@Override @Override
public void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback) { public void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback) {
saveAndNotify(tenantId, entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(key, value) saveAndNotify(tenantId, entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(key, value)
@ -255,10 +278,10 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
} }
} }
private void addMainCallback(ListenableFuture<List<Void>> saveFuture, final FutureCallback<Void> callback) { private <S, R> void addMainCallback(ListenableFuture<S> saveFuture, final FutureCallback<R> callback) {
Futures.addCallback(saveFuture, new FutureCallback<List<Void>>() { Futures.addCallback(saveFuture, new FutureCallback<S>() {
@Override @Override
public void onSuccess(@Nullable List<Void> result) { public void onSuccess(@Nullable S result) {
callback.onSuccess(null); callback.onSuccess(null);
} }

2
common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java

@ -45,4 +45,6 @@ public interface TimeseriesService {
ListenableFuture<List<Void>> remove(TenantId tenantId, EntityId entityId, List<DeleteTsKvQuery> queries); ListenableFuture<List<Void>> remove(TenantId tenantId, EntityId entityId, List<DeleteTsKvQuery> queries);
ListenableFuture<List<Void>> removeLatest(TenantId tenantId, EntityId entityId, Collection<String> keys); ListenableFuture<List<Void>> removeLatest(TenantId tenantId, EntityId entityId, Collection<String> keys);
ListenableFuture<Collection<String>> removeAllLatest(TenantId tenantId, EntityId entityId);
} }

52
dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java

@ -18,6 +18,7 @@ package org.thingsboard.server.dao.timeseries;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.annotation.Value;
@ -39,6 +40,7 @@ import org.thingsboard.server.dao.service.Validator;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -72,9 +74,11 @@ public class BaseTimeseriesService implements TimeseriesService {
queries.forEach(this::validate); queries.forEach(this::validate);
if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) { if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
EntityView entityView = entityViewService.findEntityViewById(tenantId, (EntityViewId) entityId); EntityView entityView = entityViewService.findEntityViewById(tenantId, (EntityViewId) entityId);
List<String> keys = entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null ?
entityView.getKeys().getTimeseries() : Collections.emptyList();
List<ReadTsKvQuery> filteredQueries = List<ReadTsKvQuery> filteredQueries =
queries.stream() queries.stream()
.filter(query -> entityView.getKeys().getTimeseries().isEmpty() || entityView.getKeys().getTimeseries().contains(query.getKey())) .filter(query -> keys.isEmpty() || keys.contains(query.getKey()))
.collect(Collectors.toList()); .collect(Collectors.toList());
return timeseriesDao.findAllAsync(tenantId, entityView.getEntityId(), updateQueriesForEntityView(entityView, filteredQueries)); return timeseriesDao.findAllAsync(tenantId, entityView.getEntityId(), updateQueriesForEntityView(entityView, filteredQueries));
} }
@ -86,27 +90,6 @@ public class BaseTimeseriesService implements TimeseriesService {
validate(entityId); validate(entityId);
List<ListenableFuture<TsKvEntry>> futures = Lists.newArrayListWithExpectedSize(keys.size()); List<ListenableFuture<TsKvEntry>> futures = Lists.newArrayListWithExpectedSize(keys.size());
keys.forEach(key -> Validator.validateString(key, "Incorrect key " + key)); keys.forEach(key -> Validator.validateString(key, "Incorrect key " + key));
if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
EntityView entityView = entityViewService.findEntityViewById(tenantId, (EntityViewId) entityId);
List<String> filteredKeys = new ArrayList<>(keys);
if (entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null &&
!entityView.getKeys().getTimeseries().isEmpty()) {
filteredKeys.retainAll(entityView.getKeys().getTimeseries());
}
List<ReadTsKvQuery> queries =
filteredKeys.stream()
.map(key -> {
long endTs = entityView.getEndTimeMs() != 0 ? entityView.getEndTimeMs() : Long.MAX_VALUE;
return new BaseReadTsKvQuery(key, entityView.getStartTimeMs(), endTs, 1, "DESC");
})
.collect(Collectors.toList());
if (queries.size() > 0) {
return timeseriesDao.findAllAsync(tenantId, entityView.getEntityId(), queries);
} else {
return Futures.immediateFuture(new ArrayList<>());
}
}
keys.forEach(key -> futures.add(timeseriesLatestDao.findLatest(tenantId, entityId, key))); keys.forEach(key -> futures.add(timeseriesLatestDao.findLatest(tenantId, entityId, key)));
return Futures.allAsList(futures); return Futures.allAsList(futures);
} }
@ -114,17 +97,7 @@ public class BaseTimeseriesService implements TimeseriesService {
@Override @Override
public ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId) { public ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId) {
validate(entityId); validate(entityId);
if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) { return timeseriesLatestDao.findAllLatest(tenantId, entityId);
EntityView entityView = entityViewService.findEntityViewById(tenantId, (EntityViewId) entityId);
if (entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null &&
!entityView.getKeys().getTimeseries().isEmpty()) {
return findLatest(tenantId, entityId, entityView.getKeys().getTimeseries());
} else {
return Futures.immediateFuture(new ArrayList<>());
}
} else {
return timeseriesLatestDao.findAllLatest(tenantId, entityId);
}
} }
@Override @Override
@ -212,6 +185,19 @@ public class BaseTimeseriesService implements TimeseriesService {
return Futures.allAsList(futures); return Futures.allAsList(futures);
} }
@Override
public ListenableFuture<Collection<String>> removeAllLatest(TenantId tenantId, EntityId entityId) {
validate(entityId);
return Futures.transformAsync(this.findAllLatest(tenantId, entityId), latest -> {
if (!latest.isEmpty()) {
Collection<String> keys = latest.stream().map(TsKvEntry::getKey).collect(Collectors.toList());
return Futures.transform(this.removeLatest(tenantId, entityId, keys), res -> keys, MoreExecutors.directExecutor());
} else {
return Futures.immediateFuture(Collections.emptyList());
}
}, MoreExecutors.directExecutor());
}
private void deleteAndRegisterFutures(TenantId tenantId, List<ListenableFuture<Void>> futures, EntityId entityId, DeleteTsKvQuery query) { private void deleteAndRegisterFutures(TenantId tenantId, List<ListenableFuture<Void>> futures, EntityId entityId, DeleteTsKvQuery query) {
futures.add(timeseriesDao.remove(tenantId, entityId, query)); futures.add(timeseriesDao.remove(tenantId, entityId, query));
futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query)); futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query));

3
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java

@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -50,5 +51,7 @@ public interface RuleEngineTelemetryService {
void deleteLatest(TenantId tenantId, EntityId entityId, List<String> keys, FutureCallback<Void> callback); void deleteLatest(TenantId tenantId, EntityId entityId, List<String> keys, FutureCallback<Void> callback);
void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback<Collection<String>> callback);
} }

Loading…
Cancel
Save