From 6458db74fa1db57c5bde7764acaf5d41db0bb640 Mon Sep 17 00:00:00 2001 From: YevhenBondarenko Date: Tue, 4 Aug 2020 20:15:31 +0300 Subject: [PATCH] EntityView improvements --- .../controller/EntityViewController.java | 56 ++++++++++----- .../update/DefaultDataUpdateService.java | 24 ++++--- .../DefaultTbLocalSubscriptionService.java | 30 +------- .../DefaultTelemetrySubscriptionService.java | 69 ++++++++++++------- .../dao/timeseries/TimeseriesService.java | 2 + .../dao/timeseries/BaseTimeseriesService.java | 52 +++++--------- .../api/RuleEngineTelemetryService.java | 3 + 7 files changed, 124 insertions(+), 112 deletions(-) diff --git a/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java b/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java index 788d8dcc37..0faa70727b 100644 --- a/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java +++ b/application/src/main/java/org/thingsboard/server/controller/EntityViewController.java @@ -58,6 +58,7 @@ import org.thingsboard.server.service.security.permission.Resource; import javax.annotation.Nullable; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -123,10 +124,10 @@ public class EntityViewController extends BaseController { futures.add(deleteAttributesFromEntityView(existingEntityView, DataConstants.SERVER_SCOPE, existingEntityView.getKeys().getAttributes().getCs(), getCurrentUser())); futures.add(deleteAttributesFromEntityView(existingEntityView, DataConstants.SHARED_SCOPE, existingEntityView.getKeys().getAttributes().getCs(), getCurrentUser())); } - if (existingEntityView.getKeys().getTimeseries() != null && !existingEntityView.getKeys().getTimeseries().isEmpty()) { - futures.add(deleteLatestFromEntityView(existingEntityView, existingEntityView.getKeys().getTimeseries(), getCurrentUser())); - } } + List tsKeys = existingEntityView.getKeys() != null && existingEntityView.getKeys().getTimeseries() != null ? + existingEntityView.getKeys().getTimeseries() : Collections.emptyList(); + futures.add(deleteLatestFromEntityView(existingEntityView, tsKeys, getCurrentUser())); } EntityView savedEntityView = checkNotNull(entityViewService.saveEntityView(entityView)); @@ -136,9 +137,7 @@ public class EntityViewController extends BaseController { futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.SERVER_SCOPE, savedEntityView.getKeys().getAttributes().getSs(), getCurrentUser())); futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.SHARED_SCOPE, savedEntityView.getKeys().getAttributes().getSh(), getCurrentUser())); } - if (savedEntityView.getKeys().getTimeseries() != null && !savedEntityView.getKeys().getTimeseries().isEmpty()) { - futures.add(copyLatestFromEntityToEntityView(savedEntityView, getCurrentUser())); - } + futures.add(copyLatestFromEntityToEntityView(savedEntityView, getCurrentUser())); } for (ListenableFuture future : futures) { try { @@ -184,7 +183,27 @@ public class EntityViewController extends BaseController { } }); } else { - resultFuture.set(null); + tsSubService.deleteAllLatest(entityView.getTenantId(), entityId, new FutureCallback>() { + @Override + public void onSuccess(@Nullable Collection keys) { + try { + logTimeseriesDeleted(user, entityId, new ArrayList<>(keys), null); + } catch (ThingsboardException e) { + log.error("Failed to log timeseries delete", e); + } + resultFuture.set(null); + } + + @Override + public void onFailure(Throwable t) { + try { + logTimeseriesDeleted(user, entityId, Collections.emptyList(), t); + } catch (ThingsboardException e) { + log.error("Failed to log timeseries delete", e); + } + resultFuture.setException(t); + } + }); } return resultFuture; } @@ -222,18 +241,21 @@ public class EntityViewController extends BaseController { private ListenableFuture> copyLatestFromEntityToEntityView(EntityView entityView, SecurityUser user) { EntityViewId entityId = entityView.getId(); - List keys = entityView.getKeys().getTimeseries(); - long startTime = entityView.getStartTimeMs(); - long endTime = entityView.getEndTimeMs(); - ListenableFuture> latestFuture; - if (startTime == 0 && endTime == 0) { - latestFuture = tsService.findLatest(user.getTenantId(), entityView.getEntityId(), keys); + List keys = entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null ? + entityView.getKeys().getTimeseries() : Collections.emptyList(); + long startTs = entityView.getStartTimeMs(); + long endTs = entityView.getEndTimeMs() == 0 ? Long.MAX_VALUE : entityView.getEndTimeMs(); + ListenableFuture> keysFuture; + if (keys.isEmpty()) { + keysFuture = Futures.transform(tsService.findAllLatest(user.getTenantId(), + entityView.getEntityId()), latest -> latest.stream().map(TsKvEntry::getKey).collect(Collectors.toList()), MoreExecutors.directExecutor()); } else { - long startTs = startTime; - long endTs = endTime == 0 ? System.currentTimeMillis() : endTime; - List queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, "DESC")).collect(Collectors.toList()); - latestFuture = tsService.findAll(user.getTenantId(), entityView.getEntityId(), queries); + keysFuture = Futures.immediateFuture(keys); } + ListenableFuture> latestFuture = Futures.transformAsync(keysFuture, fetchKeys -> { + List queries = fetchKeys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, "DESC")).collect(Collectors.toList()); + return tsService.findAll(user.getTenantId(), entityView.getEntityId(), queries); + }, MoreExecutors.directExecutor()); return Futures.transform(latestFuture, latestValues -> { if (latestValues != null && !latestValues.isEmpty()) { tsSubService.saveLatestAndNotify(entityView.getTenantId(), entityId, latestValues, new FutureCallback() { diff --git a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java index dd61dc0d0f..76f5b47896 100644 --- a/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java +++ b/application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java @@ -43,6 +43,7 @@ import org.thingsboard.server.service.install.InstallScripts; import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @@ -145,18 +146,21 @@ public class DefaultDataUpdateService implements DataUpdateService { private ListenableFuture> updateEntityViewLatestTelemetry(EntityView entityView) { EntityViewId entityId = entityView.getId(); - List keys = entityView.getKeys().getTimeseries(); - long startTime = entityView.getStartTimeMs(); - long endTime = entityView.getEndTimeMs(); - ListenableFuture> latestFuture; - if (startTime == 0 && endTime == 0) { - latestFuture = tsService.findLatest(TenantId.SYS_TENANT_ID, entityView.getEntityId(), keys); + List keys = entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null ? + entityView.getKeys().getTimeseries() : Collections.emptyList(); + long startTs = entityView.getStartTimeMs(); + long endTs = entityView.getEndTimeMs() == 0 ? Long.MAX_VALUE : entityView.getEndTimeMs(); + ListenableFuture> keysFuture; + if (keys.isEmpty()) { + keysFuture = Futures.transform(tsService.findAllLatest(TenantId.SYS_TENANT_ID, + entityView.getEntityId()), latest -> latest.stream().map(TsKvEntry::getKey).collect(Collectors.toList()), MoreExecutors.directExecutor()); } else { - long startTs = startTime; - long endTs = endTime == 0 ? System.currentTimeMillis() : endTime; - List queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, "DESC")).collect(Collectors.toList()); - latestFuture = tsService.findAll(TenantId.SYS_TENANT_ID, entityView.getEntityId(), queries); + keysFuture = Futures.immediateFuture(keys); } + ListenableFuture> latestFuture = Futures.transformAsync(keysFuture, fetchKeys -> { + List queries = fetchKeys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, "DESC")).collect(Collectors.toList()); + return tsService.findAll(TenantId.SYS_TENANT_ID, entityView.getEntityId(), queries); + }, MoreExecutors.directExecutor()); return Futures.transformAsync(latestFuture, latestValues -> { if (latestValues != null && !latestValues.isEmpty()) { ListenableFuture> saveFuture = tsService.saveLatest(TenantId.SYS_TENANT_ID, entityId, latestValues); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java index 4614247f7c..89233d2f4f 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java @@ -36,6 +36,7 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; import org.thingsboard.server.common.msg.queue.TbCallback; import org.thingsboard.server.queue.util.TbCoreComponent; import org.thingsboard.server.service.queue.TbClusterService; +import org.thingsboard.server.service.telemetry.DefaultTelemetryWebSocketService; import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate; import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate; @@ -114,11 +115,6 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer //TODO 3.1: replace null callbacks with callbacks from websocket service. @Override public void addSubscription(TbSubscription subscription) { - EntityId entityId = subscription.getEntityId(); - // Telemetry subscription on Entity Views are handled differently, because we need to allow only certain keys and time ranges; - if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW) && TbSubscriptionType.TIMESERIES.equals(subscription.getType())) { - subscription = resolveEntityViewSubscription((TbTimeseriesSubscription) subscription); - } pushSubscriptionToManagerService(subscription, true); registerSubscription(subscription); } @@ -203,30 +199,6 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer } } - private TbSubscription resolveEntityViewSubscription(TbTimeseriesSubscription subscription) { - EntityView entityView = entityViewService.findEntityViewById(TenantId.SYS_TENANT_ID, new EntityViewId(subscription.getEntityId().getId())); - - Map keyStates; - if (subscription.isAllKeys()) { - keyStates = entityView.getKeys().getTimeseries().stream().collect(Collectors.toMap(k -> k, k -> 0L)); - } else { - keyStates = subscription.getKeyStates().entrySet() - .stream().filter(entry -> entityView.getKeys().getTimeseries().contains(entry.getKey())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } - - return TbTimeseriesSubscription.builder() - .serviceId(subscription.getServiceId()) - .sessionId(subscription.getSessionId()) - .subscriptionId(subscription.getSubscriptionId()) - .tenantId(subscription.getTenantId()) - .entityId(entityView.getEntityId()) - .startTime(entityView.getStartTimeMs()) - .endTime(entityView.getEndTimeMs()) - .allKeys(false) - .keyStates(keyStates).build(); - } - private void registerSubscription(TbSubscription subscription) { Map sessionSubscriptions = subscriptionsBySessionId.computeIfAbsent(subscription.getSessionId(), k -> new ConcurrentHashMap<>()); sessionSubscriptions.put(subscription.getSubscriptionId(), subscription); diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java index a21d839639..16d95a48ce 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java @@ -52,9 +52,12 @@ import javax.annotation.Nullable; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -122,33 +125,37 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @Override public void onSuccess(@Nullable List result) { if (result != null) { + Map> tsMap = new HashMap<>(); + for (TsKvEntry entry : ts) { + tsMap.computeIfAbsent(entry.getKey(), s -> new ArrayList<>()).add(entry); + } for (EntityView entityView : result) { - if (entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null - && !entityView.getKeys().getTimeseries().isEmpty()) { - List entityViewLatest = new ArrayList<>(); - for (String key : entityView.getKeys().getTimeseries()) { - long startTime = entityView.getStartTimeMs(); - long endTime = entityView.getEndTimeMs(); - long startTs = startTime; - long endTs = endTime == 0 ? System.currentTimeMillis() : endTime; - Optional tsKvEntry = ts.stream() - .filter(entry -> entry.getKey().equals(key) && entry.getTs() > startTs && entry.getTs() <= endTs) + List keys = entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null ? + entityView.getKeys().getTimeseries() : new ArrayList<>(tsMap.keySet()); + List entityViewLatest = new ArrayList<>(); + long startTs = entityView.getStartTimeMs(); + long endTs = entityView.getEndTimeMs() == 0 ? Long.MAX_VALUE : entityView.getEndTimeMs(); + for (String key : keys) { + List entries = tsMap.get(key); + if (entries != null) { + Optional tsKvEntry = entries.stream() + .filter(entry -> entry.getTs() > startTs && entry.getTs() <= endTs) .max(Comparator.comparingLong(TsKvEntry::getTs)); if (tsKvEntry.isPresent()) { entityViewLatest.add(tsKvEntry.get()); } } - if (!entityViewLatest.isEmpty()) { - saveLatestAndNotify(tenantId, entityView.getId(), entityViewLatest, new FutureCallback() { - @Override - public void onSuccess(@Nullable Void tmp) { - } + } + if (!entityViewLatest.isEmpty()) { + saveLatestAndNotify(tenantId, entityView.getId(), entityViewLatest, new FutureCallback() { + @Override + public void onSuccess(@Nullable Void tmp) { + } - @Override - public void onFailure(Throwable t) { - } - }); - } + @Override + public void onFailure(Throwable t) { + } + }); } } } @@ -189,6 +196,22 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer addMainCallback(deleteFuture, callback); } + @Override + public void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback> callback) { + ListenableFuture> deleteFuture = tsService.removeAllLatest(tenantId, entityId); + Futures.addCallback(deleteFuture, new FutureCallback>() { + @Override + public void onSuccess(@Nullable Collection result) { + callback.onSuccess(result); + } + + @Override + public void onFailure(Throwable t) { + callback.onFailure(t); + } + }, tsCallBackExecutor); + } + @Override public void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback callback) { saveAndNotify(tenantId, entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(key, value) @@ -255,10 +278,10 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer } } - private void addMainCallback(ListenableFuture> saveFuture, final FutureCallback callback) { - Futures.addCallback(saveFuture, new FutureCallback>() { + private void addMainCallback(ListenableFuture saveFuture, final FutureCallback callback) { + Futures.addCallback(saveFuture, new FutureCallback() { @Override - public void onSuccess(@Nullable List result) { + public void onSuccess(@Nullable S result) { callback.onSuccess(null); } diff --git a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java index a490562ac9..2a5a701f9e 100644 --- a/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java +++ b/common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java @@ -45,4 +45,6 @@ public interface TimeseriesService { ListenableFuture> remove(TenantId tenantId, EntityId entityId, List queries); ListenableFuture> removeLatest(TenantId tenantId, EntityId entityId, Collection keys); + + ListenableFuture> removeAllLatest(TenantId tenantId, EntityId entityId); } diff --git a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java index 3c21ace038..b86d6128f6 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java +++ b/dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java @@ -18,6 +18,7 @@ package org.thingsboard.server.dao.timeseries; import com.google.common.collect.Lists; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; @@ -39,6 +40,7 @@ import org.thingsboard.server.dao.service.Validator; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -72,9 +74,11 @@ public class BaseTimeseriesService implements TimeseriesService { queries.forEach(this::validate); if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) { EntityView entityView = entityViewService.findEntityViewById(tenantId, (EntityViewId) entityId); + List keys = entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null ? + entityView.getKeys().getTimeseries() : Collections.emptyList(); List filteredQueries = queries.stream() - .filter(query -> entityView.getKeys().getTimeseries().isEmpty() || entityView.getKeys().getTimeseries().contains(query.getKey())) + .filter(query -> keys.isEmpty() || keys.contains(query.getKey())) .collect(Collectors.toList()); return timeseriesDao.findAllAsync(tenantId, entityView.getEntityId(), updateQueriesForEntityView(entityView, filteredQueries)); } @@ -86,27 +90,6 @@ public class BaseTimeseriesService implements TimeseriesService { validate(entityId); List> futures = Lists.newArrayListWithExpectedSize(keys.size()); keys.forEach(key -> Validator.validateString(key, "Incorrect key " + key)); - if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) { - EntityView entityView = entityViewService.findEntityViewById(tenantId, (EntityViewId) entityId); - List filteredKeys = new ArrayList<>(keys); - if (entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null && - !entityView.getKeys().getTimeseries().isEmpty()) { - filteredKeys.retainAll(entityView.getKeys().getTimeseries()); - } - List queries = - filteredKeys.stream() - .map(key -> { - long endTs = entityView.getEndTimeMs() != 0 ? entityView.getEndTimeMs() : Long.MAX_VALUE; - return new BaseReadTsKvQuery(key, entityView.getStartTimeMs(), endTs, 1, "DESC"); - }) - .collect(Collectors.toList()); - - if (queries.size() > 0) { - return timeseriesDao.findAllAsync(tenantId, entityView.getEntityId(), queries); - } else { - return Futures.immediateFuture(new ArrayList<>()); - } - } keys.forEach(key -> futures.add(timeseriesLatestDao.findLatest(tenantId, entityId, key))); return Futures.allAsList(futures); } @@ -114,17 +97,7 @@ public class BaseTimeseriesService implements TimeseriesService { @Override public ListenableFuture> findAllLatest(TenantId tenantId, EntityId entityId) { validate(entityId); - if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) { - EntityView entityView = entityViewService.findEntityViewById(tenantId, (EntityViewId) entityId); - if (entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null && - !entityView.getKeys().getTimeseries().isEmpty()) { - return findLatest(tenantId, entityId, entityView.getKeys().getTimeseries()); - } else { - return Futures.immediateFuture(new ArrayList<>()); - } - } else { - return timeseriesLatestDao.findAllLatest(tenantId, entityId); - } + return timeseriesLatestDao.findAllLatest(tenantId, entityId); } @Override @@ -212,6 +185,19 @@ public class BaseTimeseriesService implements TimeseriesService { return Futures.allAsList(futures); } + @Override + public ListenableFuture> removeAllLatest(TenantId tenantId, EntityId entityId) { + validate(entityId); + return Futures.transformAsync(this.findAllLatest(tenantId, entityId), latest -> { + if (!latest.isEmpty()) { + Collection keys = latest.stream().map(TsKvEntry::getKey).collect(Collectors.toList()); + return Futures.transform(this.removeLatest(tenantId, entityId, keys), res -> keys, MoreExecutors.directExecutor()); + } else { + return Futures.immediateFuture(Collections.emptyList()); + } + }, MoreExecutors.directExecutor()); + } + private void deleteAndRegisterFutures(TenantId tenantId, List> futures, EntityId entityId, DeleteTsKvQuery query) { futures.add(timeseriesDao.remove(tenantId, entityId, query)); futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query)); diff --git a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java index 8cb410bf19..07cc897d19 100644 --- a/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java +++ b/rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java @@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.id.TenantId; import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; +import java.util.Collection; import java.util.List; import java.util.Set; @@ -50,5 +51,7 @@ public interface RuleEngineTelemetryService { void deleteLatest(TenantId tenantId, EntityId entityId, List keys, FutureCallback callback); + void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback> callback); + }