Browse Source

EntityView improvements

pull/3228/head
YevhenBondarenko 6 years ago
parent
commit
6458db74fa
  1. 56
      application/src/main/java/org/thingsboard/server/controller/EntityViewController.java
  2. 24
      application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java
  3. 30
      application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java
  4. 69
      application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java
  5. 2
      common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java
  6. 52
      dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java
  7. 3
      rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java

56
application/src/main/java/org/thingsboard/server/controller/EntityViewController.java

@ -58,6 +58,7 @@ import org.thingsboard.server.service.security.permission.Resource;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@ -123,10 +124,10 @@ public class EntityViewController extends BaseController {
futures.add(deleteAttributesFromEntityView(existingEntityView, DataConstants.SERVER_SCOPE, existingEntityView.getKeys().getAttributes().getCs(), getCurrentUser()));
futures.add(deleteAttributesFromEntityView(existingEntityView, DataConstants.SHARED_SCOPE, existingEntityView.getKeys().getAttributes().getCs(), getCurrentUser()));
}
if (existingEntityView.getKeys().getTimeseries() != null && !existingEntityView.getKeys().getTimeseries().isEmpty()) {
futures.add(deleteLatestFromEntityView(existingEntityView, existingEntityView.getKeys().getTimeseries(), getCurrentUser()));
}
}
List<String> tsKeys = existingEntityView.getKeys() != null && existingEntityView.getKeys().getTimeseries() != null ?
existingEntityView.getKeys().getTimeseries() : Collections.emptyList();
futures.add(deleteLatestFromEntityView(existingEntityView, tsKeys, getCurrentUser()));
}
EntityView savedEntityView = checkNotNull(entityViewService.saveEntityView(entityView));
@ -136,9 +137,7 @@ public class EntityViewController extends BaseController {
futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.SERVER_SCOPE, savedEntityView.getKeys().getAttributes().getSs(), getCurrentUser()));
futures.add(copyAttributesFromEntityToEntityView(savedEntityView, DataConstants.SHARED_SCOPE, savedEntityView.getKeys().getAttributes().getSh(), getCurrentUser()));
}
if (savedEntityView.getKeys().getTimeseries() != null && !savedEntityView.getKeys().getTimeseries().isEmpty()) {
futures.add(copyLatestFromEntityToEntityView(savedEntityView, getCurrentUser()));
}
futures.add(copyLatestFromEntityToEntityView(savedEntityView, getCurrentUser()));
}
for (ListenableFuture<?> future : futures) {
try {
@ -184,7 +183,27 @@ public class EntityViewController extends BaseController {
}
});
} else {
resultFuture.set(null);
tsSubService.deleteAllLatest(entityView.getTenantId(), entityId, new FutureCallback<Collection<String>>() {
@Override
public void onSuccess(@Nullable Collection<String> keys) {
try {
logTimeseriesDeleted(user, entityId, new ArrayList<>(keys), null);
} catch (ThingsboardException e) {
log.error("Failed to log timeseries delete", e);
}
resultFuture.set(null);
}
@Override
public void onFailure(Throwable t) {
try {
logTimeseriesDeleted(user, entityId, Collections.emptyList(), t);
} catch (ThingsboardException e) {
log.error("Failed to log timeseries delete", e);
}
resultFuture.setException(t);
}
});
}
return resultFuture;
}
@ -222,18 +241,21 @@ public class EntityViewController extends BaseController {
private ListenableFuture<List<Void>> copyLatestFromEntityToEntityView(EntityView entityView, SecurityUser user) {
EntityViewId entityId = entityView.getId();
List<String> keys = entityView.getKeys().getTimeseries();
long startTime = entityView.getStartTimeMs();
long endTime = entityView.getEndTimeMs();
ListenableFuture<List<TsKvEntry>> latestFuture;
if (startTime == 0 && endTime == 0) {
latestFuture = tsService.findLatest(user.getTenantId(), entityView.getEntityId(), keys);
List<String> keys = entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null ?
entityView.getKeys().getTimeseries() : Collections.emptyList();
long startTs = entityView.getStartTimeMs();
long endTs = entityView.getEndTimeMs() == 0 ? Long.MAX_VALUE : entityView.getEndTimeMs();
ListenableFuture<List<String>> keysFuture;
if (keys.isEmpty()) {
keysFuture = Futures.transform(tsService.findAllLatest(user.getTenantId(),
entityView.getEntityId()), latest -> latest.stream().map(TsKvEntry::getKey).collect(Collectors.toList()), MoreExecutors.directExecutor());
} else {
long startTs = startTime;
long endTs = endTime == 0 ? System.currentTimeMillis() : endTime;
List<ReadTsKvQuery> queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, "DESC")).collect(Collectors.toList());
latestFuture = tsService.findAll(user.getTenantId(), entityView.getEntityId(), queries);
keysFuture = Futures.immediateFuture(keys);
}
ListenableFuture<List<TsKvEntry>> latestFuture = Futures.transformAsync(keysFuture, fetchKeys -> {
List<ReadTsKvQuery> queries = fetchKeys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, "DESC")).collect(Collectors.toList());
return tsService.findAll(user.getTenantId(), entityView.getEntityId(), queries);
}, MoreExecutors.directExecutor());
return Futures.transform(latestFuture, latestValues -> {
if (latestValues != null && !latestValues.isEmpty()) {
tsSubService.saveLatestAndNotify(entityView.getTenantId(), entityId, latestValues, new FutureCallback<Void>() {

24
application/src/main/java/org/thingsboard/server/service/install/update/DefaultDataUpdateService.java

@ -43,6 +43,7 @@ import org.thingsboard.server.service.install.InstallScripts;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@ -145,18 +146,21 @@ public class DefaultDataUpdateService implements DataUpdateService {
private ListenableFuture<List<Void>> updateEntityViewLatestTelemetry(EntityView entityView) {
EntityViewId entityId = entityView.getId();
List<String> keys = entityView.getKeys().getTimeseries();
long startTime = entityView.getStartTimeMs();
long endTime = entityView.getEndTimeMs();
ListenableFuture<List<TsKvEntry>> latestFuture;
if (startTime == 0 && endTime == 0) {
latestFuture = tsService.findLatest(TenantId.SYS_TENANT_ID, entityView.getEntityId(), keys);
List<String> keys = entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null ?
entityView.getKeys().getTimeseries() : Collections.emptyList();
long startTs = entityView.getStartTimeMs();
long endTs = entityView.getEndTimeMs() == 0 ? Long.MAX_VALUE : entityView.getEndTimeMs();
ListenableFuture<List<String>> keysFuture;
if (keys.isEmpty()) {
keysFuture = Futures.transform(tsService.findAllLatest(TenantId.SYS_TENANT_ID,
entityView.getEntityId()), latest -> latest.stream().map(TsKvEntry::getKey).collect(Collectors.toList()), MoreExecutors.directExecutor());
} else {
long startTs = startTime;
long endTs = endTime == 0 ? System.currentTimeMillis() : endTime;
List<ReadTsKvQuery> queries = keys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, "DESC")).collect(Collectors.toList());
latestFuture = tsService.findAll(TenantId.SYS_TENANT_ID, entityView.getEntityId(), queries);
keysFuture = Futures.immediateFuture(keys);
}
ListenableFuture<List<TsKvEntry>> latestFuture = Futures.transformAsync(keysFuture, fetchKeys -> {
List<ReadTsKvQuery> queries = fetchKeys.stream().map(key -> new BaseReadTsKvQuery(key, startTs, endTs, 1, "DESC")).collect(Collectors.toList());
return tsService.findAll(TenantId.SYS_TENANT_ID, entityView.getEntityId(), queries);
}, MoreExecutors.directExecutor());
return Futures.transformAsync(latestFuture, latestValues -> {
if (latestValues != null && !latestValues.isEmpty()) {
ListenableFuture<List<Void>> saveFuture = tsService.saveLatest(TenantId.SYS_TENANT_ID, entityId, latestValues);

30
application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbLocalSubscriptionService.java

@ -36,6 +36,7 @@ import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
import org.thingsboard.server.common.msg.queue.TbCallback;
import org.thingsboard.server.queue.util.TbCoreComponent;
import org.thingsboard.server.service.queue.TbClusterService;
import org.thingsboard.server.service.telemetry.DefaultTelemetryWebSocketService;
import org.thingsboard.server.service.telemetry.sub.AlarmSubscriptionUpdate;
import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate;
@ -114,11 +115,6 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
//TODO 3.1: replace null callbacks with callbacks from websocket service.
@Override
public void addSubscription(TbSubscription subscription) {
EntityId entityId = subscription.getEntityId();
// Telemetry subscription on Entity Views are handled differently, because we need to allow only certain keys and time ranges;
if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW) && TbSubscriptionType.TIMESERIES.equals(subscription.getType())) {
subscription = resolveEntityViewSubscription((TbTimeseriesSubscription) subscription);
}
pushSubscriptionToManagerService(subscription, true);
registerSubscription(subscription);
}
@ -203,30 +199,6 @@ public class DefaultTbLocalSubscriptionService implements TbLocalSubscriptionSer
}
}
private TbSubscription resolveEntityViewSubscription(TbTimeseriesSubscription subscription) {
EntityView entityView = entityViewService.findEntityViewById(TenantId.SYS_TENANT_ID, new EntityViewId(subscription.getEntityId().getId()));
Map<String, Long> keyStates;
if (subscription.isAllKeys()) {
keyStates = entityView.getKeys().getTimeseries().stream().collect(Collectors.toMap(k -> k, k -> 0L));
} else {
keyStates = subscription.getKeyStates().entrySet()
.stream().filter(entry -> entityView.getKeys().getTimeseries().contains(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
return TbTimeseriesSubscription.builder()
.serviceId(subscription.getServiceId())
.sessionId(subscription.getSessionId())
.subscriptionId(subscription.getSubscriptionId())
.tenantId(subscription.getTenantId())
.entityId(entityView.getEntityId())
.startTime(entityView.getStartTimeMs())
.endTime(entityView.getEndTimeMs())
.allKeys(false)
.keyStates(keyStates).build();
}
private void registerSubscription(TbSubscription subscription) {
Map<Integer, TbSubscription> sessionSubscriptions = subscriptionsBySessionId.computeIfAbsent(subscription.getSessionId(), k -> new ConcurrentHashMap<>());
sessionSubscriptions.put(subscription.getSubscriptionId(), subscription);

69
application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetrySubscriptionService.java

@ -52,9 +52,12 @@ import javax.annotation.Nullable;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -122,33 +125,37 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
@Override
public void onSuccess(@Nullable List<EntityView> result) {
if (result != null) {
Map<String, List<TsKvEntry>> tsMap = new HashMap<>();
for (TsKvEntry entry : ts) {
tsMap.computeIfAbsent(entry.getKey(), s -> new ArrayList<>()).add(entry);
}
for (EntityView entityView : result) {
if (entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null
&& !entityView.getKeys().getTimeseries().isEmpty()) {
List<TsKvEntry> entityViewLatest = new ArrayList<>();
for (String key : entityView.getKeys().getTimeseries()) {
long startTime = entityView.getStartTimeMs();
long endTime = entityView.getEndTimeMs();
long startTs = startTime;
long endTs = endTime == 0 ? System.currentTimeMillis() : endTime;
Optional<TsKvEntry> tsKvEntry = ts.stream()
.filter(entry -> entry.getKey().equals(key) && entry.getTs() > startTs && entry.getTs() <= endTs)
List<String> keys = entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null ?
entityView.getKeys().getTimeseries() : new ArrayList<>(tsMap.keySet());
List<TsKvEntry> entityViewLatest = new ArrayList<>();
long startTs = entityView.getStartTimeMs();
long endTs = entityView.getEndTimeMs() == 0 ? Long.MAX_VALUE : entityView.getEndTimeMs();
for (String key : keys) {
List<TsKvEntry> entries = tsMap.get(key);
if (entries != null) {
Optional<TsKvEntry> tsKvEntry = entries.stream()
.filter(entry -> entry.getTs() > startTs && entry.getTs() <= endTs)
.max(Comparator.comparingLong(TsKvEntry::getTs));
if (tsKvEntry.isPresent()) {
entityViewLatest.add(tsKvEntry.get());
}
}
if (!entityViewLatest.isEmpty()) {
saveLatestAndNotify(tenantId, entityView.getId(), entityViewLatest, new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void tmp) {
}
}
if (!entityViewLatest.isEmpty()) {
saveLatestAndNotify(tenantId, entityView.getId(), entityViewLatest, new FutureCallback<Void>() {
@Override
public void onSuccess(@Nullable Void tmp) {
}
@Override
public void onFailure(Throwable t) {
}
});
}
@Override
public void onFailure(Throwable t) {
}
});
}
}
}
@ -189,6 +196,22 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
addMainCallback(deleteFuture, callback);
}
@Override
public void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback<Collection<String>> callback) {
ListenableFuture<Collection<String>> deleteFuture = tsService.removeAllLatest(tenantId, entityId);
Futures.addCallback(deleteFuture, new FutureCallback<Collection<String>>() {
@Override
public void onSuccess(@Nullable Collection<String> result) {
callback.onSuccess(result);
}
@Override
public void onFailure(Throwable t) {
callback.onFailure(t);
}
}, tsCallBackExecutor);
}
@Override
public void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback) {
saveAndNotify(tenantId, entityId, scope, Collections.singletonList(new BaseAttributeKvEntry(new LongDataEntry(key, value)
@ -255,10 +278,10 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
}
}
private void addMainCallback(ListenableFuture<List<Void>> saveFuture, final FutureCallback<Void> callback) {
Futures.addCallback(saveFuture, new FutureCallback<List<Void>>() {
private <S, R> void addMainCallback(ListenableFuture<S> saveFuture, final FutureCallback<R> callback) {
Futures.addCallback(saveFuture, new FutureCallback<S>() {
@Override
public void onSuccess(@Nullable List<Void> result) {
public void onSuccess(@Nullable S result) {
callback.onSuccess(null);
}

2
common/dao-api/src/main/java/org/thingsboard/server/dao/timeseries/TimeseriesService.java

@ -45,4 +45,6 @@ public interface TimeseriesService {
ListenableFuture<List<Void>> remove(TenantId tenantId, EntityId entityId, List<DeleteTsKvQuery> queries);
ListenableFuture<List<Void>> removeLatest(TenantId tenantId, EntityId entityId, Collection<String> keys);
ListenableFuture<Collection<String>> removeAllLatest(TenantId tenantId, EntityId entityId);
}

52
dao/src/main/java/org/thingsboard/server/dao/timeseries/BaseTimeseriesService.java

@ -18,6 +18,7 @@ package org.thingsboard.server.dao.timeseries;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
@ -39,6 +40,7 @@ import org.thingsboard.server.dao.service.Validator;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@ -72,9 +74,11 @@ public class BaseTimeseriesService implements TimeseriesService {
queries.forEach(this::validate);
if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
EntityView entityView = entityViewService.findEntityViewById(tenantId, (EntityViewId) entityId);
List<String> keys = entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null ?
entityView.getKeys().getTimeseries() : Collections.emptyList();
List<ReadTsKvQuery> filteredQueries =
queries.stream()
.filter(query -> entityView.getKeys().getTimeseries().isEmpty() || entityView.getKeys().getTimeseries().contains(query.getKey()))
.filter(query -> keys.isEmpty() || keys.contains(query.getKey()))
.collect(Collectors.toList());
return timeseriesDao.findAllAsync(tenantId, entityView.getEntityId(), updateQueriesForEntityView(entityView, filteredQueries));
}
@ -86,27 +90,6 @@ public class BaseTimeseriesService implements TimeseriesService {
validate(entityId);
List<ListenableFuture<TsKvEntry>> futures = Lists.newArrayListWithExpectedSize(keys.size());
keys.forEach(key -> Validator.validateString(key, "Incorrect key " + key));
if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
EntityView entityView = entityViewService.findEntityViewById(tenantId, (EntityViewId) entityId);
List<String> filteredKeys = new ArrayList<>(keys);
if (entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null &&
!entityView.getKeys().getTimeseries().isEmpty()) {
filteredKeys.retainAll(entityView.getKeys().getTimeseries());
}
List<ReadTsKvQuery> queries =
filteredKeys.stream()
.map(key -> {
long endTs = entityView.getEndTimeMs() != 0 ? entityView.getEndTimeMs() : Long.MAX_VALUE;
return new BaseReadTsKvQuery(key, entityView.getStartTimeMs(), endTs, 1, "DESC");
})
.collect(Collectors.toList());
if (queries.size() > 0) {
return timeseriesDao.findAllAsync(tenantId, entityView.getEntityId(), queries);
} else {
return Futures.immediateFuture(new ArrayList<>());
}
}
keys.forEach(key -> futures.add(timeseriesLatestDao.findLatest(tenantId, entityId, key)));
return Futures.allAsList(futures);
}
@ -114,17 +97,7 @@ public class BaseTimeseriesService implements TimeseriesService {
@Override
public ListenableFuture<List<TsKvEntry>> findAllLatest(TenantId tenantId, EntityId entityId) {
validate(entityId);
if (entityId.getEntityType().equals(EntityType.ENTITY_VIEW)) {
EntityView entityView = entityViewService.findEntityViewById(tenantId, (EntityViewId) entityId);
if (entityView.getKeys() != null && entityView.getKeys().getTimeseries() != null &&
!entityView.getKeys().getTimeseries().isEmpty()) {
return findLatest(tenantId, entityId, entityView.getKeys().getTimeseries());
} else {
return Futures.immediateFuture(new ArrayList<>());
}
} else {
return timeseriesLatestDao.findAllLatest(tenantId, entityId);
}
return timeseriesLatestDao.findAllLatest(tenantId, entityId);
}
@Override
@ -212,6 +185,19 @@ public class BaseTimeseriesService implements TimeseriesService {
return Futures.allAsList(futures);
}
@Override
public ListenableFuture<Collection<String>> removeAllLatest(TenantId tenantId, EntityId entityId) {
validate(entityId);
return Futures.transformAsync(this.findAllLatest(tenantId, entityId), latest -> {
if (!latest.isEmpty()) {
Collection<String> keys = latest.stream().map(TsKvEntry::getKey).collect(Collectors.toList());
return Futures.transform(this.removeLatest(tenantId, entityId, keys), res -> keys, MoreExecutors.directExecutor());
} else {
return Futures.immediateFuture(Collections.emptyList());
}
}, MoreExecutors.directExecutor());
}
private void deleteAndRegisterFutures(TenantId tenantId, List<ListenableFuture<Void>> futures, EntityId entityId, DeleteTsKvQuery query) {
futures.add(timeseriesDao.remove(tenantId, entityId, query));
futures.add(timeseriesLatestDao.removeLatest(tenantId, entityId, query));

3
rule-engine/rule-engine-api/src/main/java/org/thingsboard/rule/engine/api/RuleEngineTelemetryService.java

@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.id.TenantId;
import org.thingsboard.server.common.data.kv.AttributeKvEntry;
import org.thingsboard.server.common.data.kv.TsKvEntry;
import java.util.Collection;
import java.util.List;
import java.util.Set;
@ -50,5 +51,7 @@ public interface RuleEngineTelemetryService {
void deleteLatest(TenantId tenantId, EntityId entityId, List<String> keys, FutureCallback<Void> callback);
void deleteAllLatest(TenantId tenantId, EntityId entityId, FutureCallback<Collection<String>> callback);
}

Loading…
Cancel
Save