diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java index 689f80f757..330b096b78 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java @@ -54,6 +54,7 @@ import org.thingsboard.server.service.telemetry.TelemetryWebSocketService; import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef; import org.thingsboard.server.service.telemetry.cmd.v2.AlarmDataCmd; import org.thingsboard.server.service.telemetry.cmd.v2.AlarmDataUpdate; +import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountCmd; import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd; import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate; import org.thingsboard.server.service.telemetry.cmd.v2.EntityHistoryCmd; @@ -92,7 +93,7 @@ import java.util.stream.Collectors; public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubscriptionService { private static final int DEFAULT_LIMIT = 100; - private final Map> subscriptionsBySessionId = new ConcurrentHashMap<>(); + private final Map> subscriptionsBySessionId = new ConcurrentHashMap<>(); @Autowired private TelemetryWebSocketService wsService; @@ -202,7 +203,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc //TODO: validate number of dynamic page links against rate limits. Ignore dynamic flag if limit is reached. TbEntityDataSubCtx finalCtx = ctx; ScheduledFuture task = scheduler.scheduleWithFixedDelay( - () -> refreshDynamicQuery(tenantId, customerId, finalCtx), + () -> refreshDynamicQuery(finalCtx), dynamicPageLinkRefreshInterval, dynamicPageLinkRefreshInterval, TimeUnit.SECONDS); finalCtx.setRefreshTask(task); } @@ -235,6 +236,26 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc }, wsCallBackExecutor); } + @Override + public void handleCmd(TelemetryWebSocketSessionRef session, EntityCountCmd cmd) { + TbEntityCountSubCtx ctx = getSubCtx(session.getSessionId(), cmd.getCmdId()); + if (ctx == null) { + ctx = createSubCtx(session, cmd); + long start = System.currentTimeMillis(); + ctx.fetchData(); + long end = System.currentTimeMillis(); + stats.getRegularQueryInvocationCnt().incrementAndGet(); + stats.getRegularQueryTimeSpent().addAndGet(end - start); + TbEntityCountSubCtx finalCtx = ctx; + ScheduledFuture task = scheduler.scheduleWithFixedDelay( + () -> refreshDynamicQuery(finalCtx), + dynamicPageLinkRefreshInterval, dynamicPageLinkRefreshInterval, TimeUnit.SECONDS); + finalCtx.setRefreshTask(task); + } else { + log.debug("[{}][{}] Received duplicate command: {}", session.getSessionId(), cmd.getCmdId(), cmd); + } + } + @Override public void handleCmd(TelemetryWebSocketSessionRef session, AlarmDataCmd cmd) { TbAlarmDataSubCtx ctx = getSubCtx(session.getSessionId(), cmd.getCmdId()); @@ -267,7 +288,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc } } - private void refreshDynamicQuery(TenantId tenantId, CustomerId customerId, TbEntityDataSubCtx finalCtx) { + private void refreshDynamicQuery(TbAbstractSubCtx finalCtx) { try { long start = System.currentTimeMillis(); finalCtx.update(); @@ -299,7 +320,7 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc } private TbEntityDataSubCtx createSubCtx(TelemetryWebSocketSessionRef sessionRef, EntityDataCmd cmd) { - Map sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>()); + Map sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>()); TbEntityDataSubCtx ctx = new TbEntityDataSubCtx(serviceId, wsService, entityService, localSubscriptionService, attributesService, stats, sessionRef, cmd.getCmdId(), maxEntitiesPerDataSubscription); if (cmd.getQuery() != null) { @@ -309,8 +330,20 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc return ctx; } + private TbEntityCountSubCtx createSubCtx(TelemetryWebSocketSessionRef sessionRef, EntityCountCmd cmd) { + Map sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>()); + TbEntityCountSubCtx ctx = new TbEntityCountSubCtx(serviceId, wsService, entityService, localSubscriptionService, + attributesService, stats, sessionRef, cmd.getCmdId()); + if (cmd.getQuery() != null) { + ctx.setAndResolveQuery(cmd.getQuery()); + } + sessionSubs.put(cmd.getCmdId(), ctx); + return ctx; + } + + private TbAlarmDataSubCtx createSubCtx(TelemetryWebSocketSessionRef sessionRef, AlarmDataCmd cmd) { - Map sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>()); + Map sessionSubs = subscriptionsBySessionId.computeIfAbsent(sessionRef.getSessionId(), k -> new HashMap<>()); TbAlarmDataSubCtx ctx = new TbAlarmDataSubCtx(serviceId, wsService, entityService, localSubscriptionService, attributesService, stats, alarmService, sessionRef, cmd.getCmdId(), maxEntitiesPerAlarmSubscription); ctx.setAndResolveQuery(cmd.getQuery()); @@ -319,8 +352,8 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc } @SuppressWarnings("unchecked") - private T getSubCtx(String sessionId, int cmdId) { - Map sessionSubs = subscriptionsBySessionId.get(sessionId); + private T getSubCtx(String sessionId, int cmdId) { + Map sessionSubs = subscriptionsBySessionId.get(sessionId); if (sessionSubs != null) { return (T) sessionSubs.get(cmdId); } else { @@ -464,17 +497,16 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc cleanupAndCancel(getSubCtx(sessionId, cmd.getCmdId())); } - private void cleanupAndCancel(TbAbstractDataSubCtx ctx) { + private void cleanupAndCancel(TbAbstractSubCtx ctx) { if (ctx != null) { ctx.cancelTasks(); - ctx.clearEntitySubscriptions(); - ctx.clearDynamicValueSubscriptions(); + ctx.clearSubscriptions(); } } @Override public void cancelAllSessionSubscriptions(String sessionId) { - Map sessionSubs = subscriptionsBySessionId.remove(sessionId); + Map sessionSubs = subscriptionsBySessionId.remove(sessionId); if (sessionSubs != null) { sessionSubs.values().forEach(this::cleanupAndCancel); } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java index a200f31a3f..08f7bfa1a6 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAbstractDataSubCtx.java @@ -15,32 +15,16 @@ */ package org.thingsboard.server.service.subscription; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; -import lombok.Data; import lombok.Getter; -import lombok.Setter; import lombok.extern.slf4j.Slf4j; -import org.thingsboard.server.common.data.id.CustomerId; import org.thingsboard.server.common.data.id.EntityId; -import org.thingsboard.server.common.data.id.TenantId; -import org.thingsboard.server.common.data.id.UserId; -import org.thingsboard.server.common.data.kv.AttributeKvEntry; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.query.AbstractDataQuery; -import org.thingsboard.server.common.data.query.ComplexFilterPredicate; -import org.thingsboard.server.common.data.query.DynamicValue; -import org.thingsboard.server.common.data.query.DynamicValueSourceType; import org.thingsboard.server.common.data.query.EntityData; import org.thingsboard.server.common.data.query.EntityDataPageLink; import org.thingsboard.server.common.data.query.EntityDataQuery; import org.thingsboard.server.common.data.query.EntityKey; import org.thingsboard.server.common.data.query.EntityKeyType; -import org.thingsboard.server.common.data.query.FilterPredicateType; -import org.thingsboard.server.common.data.query.KeyFilter; -import org.thingsboard.server.common.data.query.KeyFilterPredicate; -import org.thingsboard.server.common.data.query.SimpleKeyFilterPredicate; import org.thingsboard.server.common.data.query.TsValue; import org.thingsboard.server.dao.attributes.AttributesService; import org.thingsboard.server.dao.entity.EntityService; @@ -52,140 +36,25 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ScheduledFuture; import java.util.function.Function; import java.util.stream.Collectors; @Slf4j -@Data -public abstract class TbAbstractDataSubCtx> { +public abstract class TbAbstractDataSubCtx> extends TbAbstractSubCtx { - protected final String serviceId; - protected final SubscriptionServiceStatistics stats; - protected final TelemetryWebSocketService wsService; - protected final EntityService entityService; - protected final TbLocalSubscriptionService localSubscriptionService; - protected final AttributesService attributesService; - protected final TelemetryWebSocketSessionRef sessionRef; - protected final int cmdId; protected final Map subToEntityIdMap; - protected final Set subToDynamicValueKeySet; - @Getter - protected final Map> dynamicValues; @Getter protected PageData data; - @Getter - @Setter - protected T query; - @Setter - protected volatile ScheduledFuture refreshTask; public TbAbstractDataSubCtx(String serviceId, TelemetryWebSocketService wsService, EntityService entityService, TbLocalSubscriptionService localSubscriptionService, AttributesService attributesService, SubscriptionServiceStatistics stats, TelemetryWebSocketSessionRef sessionRef, int cmdId) { - this.serviceId = serviceId; - this.wsService = wsService; - this.entityService = entityService; - this.localSubscriptionService = localSubscriptionService; - this.attributesService = attributesService; - this.stats = stats; - this.sessionRef = sessionRef; - this.cmdId = cmdId; + super(serviceId, wsService, entityService, localSubscriptionService, attributesService, stats, sessionRef, cmdId); this.subToEntityIdMap = new ConcurrentHashMap<>(); - this.subToDynamicValueKeySet = ConcurrentHashMap.newKeySet(); - this.dynamicValues = new ConcurrentHashMap<>(); - } - - public void setAndResolveQuery(T query) { - dynamicValues.clear(); - this.query = query; - if (query != null && query.getKeyFilters() != null) { - for (KeyFilter filter : query.getKeyFilters()) { - registerDynamicValues(filter.getPredicate()); - } - } - resolve(getTenantId(), getCustomerId(), getUserId()); - } - - public void resolve(TenantId tenantId, CustomerId customerId, UserId userId) { - List> futures = new ArrayList<>(); - for (DynamicValueKey key : dynamicValues.keySet()) { - switch (key.getSourceType()) { - case CURRENT_TENANT: - futures.add(resolveEntityValue(tenantId, tenantId, key)); - break; - case CURRENT_CUSTOMER: - if (customerId != null && !customerId.isNullUid()) { - futures.add(resolveEntityValue(tenantId, customerId, key)); - } - break; - case CURRENT_USER: - if (userId != null && !userId.isNullUid()) { - futures.add(resolveEntityValue(tenantId, userId, key)); - } - break; - } - } - try { - Map> tmpSubMap = new HashMap<>(); - for (DynamicValueKeySub sub : Futures.successfulAsList(futures).get()) { - tmpSubMap.computeIfAbsent(sub.getEntityId(), tmp -> new HashMap<>()).put(sub.getKey().getSourceAttribute(), sub); - } - for (EntityId entityId : tmpSubMap.keySet()) { - Map keyStates = new HashMap<>(); - Map dynamicValueKeySubMap = tmpSubMap.get(entityId); - dynamicValueKeySubMap.forEach((k, v) -> keyStates.put(k, v.getLastUpdateTs())); - int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet(); - TbAttributeSubscription sub = TbAttributeSubscription.builder() - .serviceId(serviceId) - .sessionId(sessionRef.getSessionId()) - .subscriptionId(subIdx) - .tenantId(sessionRef.getSecurityCtx().getTenantId()) - .entityId(entityId) - .updateConsumer((s, subscriptionUpdate) -> dynamicValueSubUpdate(s, subscriptionUpdate, dynamicValueKeySubMap)) - .allKeys(false) - .keyStates(keyStates) - .scope(TbAttributeSubscriptionScope.SERVER_SCOPE) - .build(); - subToDynamicValueKeySet.add(subIdx); - localSubscriptionService.addSubscription(sub); - } - } catch (InterruptedException | ExecutionException e) { - log.info("[{}][{}][{}] Failed to resolve dynamic values: {}", tenantId, customerId, userId, dynamicValues.keySet()); - } - - } - - private void dynamicValueSubUpdate(String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, - Map dynamicValueKeySubMap) { - Map latestUpdate = new HashMap<>(); - subscriptionUpdate.getData().forEach((k, v) -> { - Object[] data = (Object[]) v.get(0); - latestUpdate.put(k, new TsValue((Long) data[0], (String) data[1])); - }); - - boolean invalidateFilter = false; - for (Map.Entry entry : latestUpdate.entrySet()) { - String k = entry.getKey(); - TsValue tsValue = entry.getValue(); - DynamicValueKeySub sub = dynamicValueKeySubMap.get(k); - if (sub.updateValue(tsValue)) { - invalidateFilter = true; - updateDynamicValuesByKey(sub, tsValue); - } - } - - if (invalidateFilter) { - update(); - } } public void fetchData() { @@ -231,104 +100,10 @@ public abstract class TbAbstractDataSubCtx lastUpdateTs && (lastUpdateValue == null || !lastUpdateValue.equals(value.getValue()))) { - this.lastUpdateTs = value.getTs(); - this.lastUpdateValue = value.getValue(); - return true; - } else { - return false; - } - } - } - - private ListenableFuture resolveEntityValue(TenantId tenantId, EntityId entityId, DynamicValueKey key) { - ListenableFuture> entry = attributesService.find(tenantId, entityId, - TbAttributeSubscriptionScope.SERVER_SCOPE.name(), key.getSourceAttribute()); - return Futures.transform(entry, attributeOpt -> { - DynamicValueKeySub sub = new DynamicValueKeySub(key, entityId); - if (attributeOpt.isPresent()) { - AttributeKvEntry attribute = attributeOpt.get(); - sub.setLastUpdateTs(attribute.getLastUpdateTs()); - sub.setLastUpdateValue(attribute.getValueAsString()); - updateDynamicValuesByKey(sub, new TsValue(attribute.getLastUpdateTs(), attribute.getValueAsString())); - } - return sub; - }, MoreExecutors.directExecutor()); - } - - @SuppressWarnings("unchecked") - private void updateDynamicValuesByKey(DynamicValueKeySub sub, TsValue tsValue) { - DynamicValueKey dvk = sub.getKey(); - switch (dvk.getPredicateType()) { - case STRING: - dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(tsValue.getValue())); - break; - case NUMERIC: - try { - Double dValue = Double.parseDouble(tsValue.getValue()); - dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(dValue)); - } catch (NumberFormatException e) { - dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(null)); - } - break; - case BOOLEAN: - Boolean bValue = Boolean.parseBoolean(tsValue.getValue()); - dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(bValue)); - break; - } - } - - @SuppressWarnings("unchecked") - private void registerDynamicValues(KeyFilterPredicate predicate) { - switch (predicate.getType()) { - case STRING: - case NUMERIC: - case BOOLEAN: - Optional value = getDynamicValueFromSimplePredicate((SimpleKeyFilterPredicate) predicate); - if (value.isPresent()) { - DynamicValue dynamicValue = value.get(); - DynamicValueKey key = new DynamicValueKey( - predicate.getType(), - dynamicValue.getSourceType(), - dynamicValue.getSourceAttribute()); - dynamicValues.computeIfAbsent(key, tmp -> new ArrayList<>()).add(dynamicValue); - } - break; - case COMPLEX: - ((ComplexFilterPredicate) predicate).getPredicates().forEach(this::registerDynamicValues); - } - } - - private Optional> getDynamicValueFromSimplePredicate(SimpleKeyFilterPredicate predicate) { - if (predicate.getValue().getUserValue() == null) { - return Optional.ofNullable(predicate.getValue().getDynamicValue()); - } else { - return Optional.empty(); - } - } - - public String getSessionId() { - return sessionRef.getSessionId(); - } - - public TenantId getTenantId() { - return sessionRef.getSecurityCtx().getTenantId(); - } - - public CustomerId getCustomerId() { - return sessionRef.getSecurityCtx().getCustomerId(); - } - - public UserId getUserId() { - return sessionRef.getSecurityCtx().getId(); + @Override + public void clearSubscriptions() { + clearEntitySubscriptions(); + super.clearSubscriptions(); } public void clearEntitySubscriptions() { @@ -340,26 +115,6 @@ public abstract class TbAbstractDataSubCtx task) { - this.refreshTask = task; - } - - public void cancelTasks() { - if (this.refreshTask != null) { - log.trace("[{}][{}] Canceling old refresh task", sessionRef.getSessionId(), cmdId); - this.refreshTask.cancel(true); - } - } - public void createSubscriptions(List keys, boolean resultToLatestValues) { Map> keysByType = getEntityKeyByTypeMap(keys); for (EntityData entityData : data.getData()) { @@ -459,14 +214,4 @@ public abstract class TbAbstractDataSubCtx { + + protected final String serviceId; + protected final SubscriptionServiceStatistics stats; + protected final TelemetryWebSocketService wsService; + protected final EntityService entityService; + protected final TbLocalSubscriptionService localSubscriptionService; + protected final AttributesService attributesService; + protected final TelemetryWebSocketSessionRef sessionRef; + protected final int cmdId; + protected final Set subToDynamicValueKeySet; + @Getter + protected final Map> dynamicValues; + @Getter + @Setter + protected T query; + @Setter + protected volatile ScheduledFuture refreshTask; + + public TbAbstractSubCtx(String serviceId, TelemetryWebSocketService wsService, + EntityService entityService, TbLocalSubscriptionService localSubscriptionService, + AttributesService attributesService, SubscriptionServiceStatistics stats, + TelemetryWebSocketSessionRef sessionRef, int cmdId) { + this.serviceId = serviceId; + this.wsService = wsService; + this.entityService = entityService; + this.localSubscriptionService = localSubscriptionService; + this.attributesService = attributesService; + this.stats = stats; + this.sessionRef = sessionRef; + this.cmdId = cmdId; + this.subToDynamicValueKeySet = ConcurrentHashMap.newKeySet(); + this.dynamicValues = new ConcurrentHashMap<>(); + } + + public void setAndResolveQuery(T query) { + dynamicValues.clear(); + this.query = query; + if (query != null && query.getKeyFilters() != null) { + for (KeyFilter filter : query.getKeyFilters()) { + registerDynamicValues(filter.getPredicate()); + } + } + resolve(getTenantId(), getCustomerId(), getUserId()); + } + + public void resolve(TenantId tenantId, CustomerId customerId, UserId userId) { + List> futures = new ArrayList<>(); + for (DynamicValueKey key : dynamicValues.keySet()) { + switch (key.getSourceType()) { + case CURRENT_TENANT: + futures.add(resolveEntityValue(tenantId, tenantId, key)); + break; + case CURRENT_CUSTOMER: + if (customerId != null && !customerId.isNullUid()) { + futures.add(resolveEntityValue(tenantId, customerId, key)); + } + break; + case CURRENT_USER: + if (userId != null && !userId.isNullUid()) { + futures.add(resolveEntityValue(tenantId, userId, key)); + } + break; + } + } + try { + Map> tmpSubMap = new HashMap<>(); + for (DynamicValueKeySub sub : Futures.successfulAsList(futures).get()) { + tmpSubMap.computeIfAbsent(sub.getEntityId(), tmp -> new HashMap<>()).put(sub.getKey().getSourceAttribute(), sub); + } + for (EntityId entityId : tmpSubMap.keySet()) { + Map keyStates = new HashMap<>(); + Map dynamicValueKeySubMap = tmpSubMap.get(entityId); + dynamicValueKeySubMap.forEach((k, v) -> keyStates.put(k, v.getLastUpdateTs())); + int subIdx = sessionRef.getSessionSubIdSeq().incrementAndGet(); + TbAttributeSubscription sub = TbAttributeSubscription.builder() + .serviceId(serviceId) + .sessionId(sessionRef.getSessionId()) + .subscriptionId(subIdx) + .tenantId(sessionRef.getSecurityCtx().getTenantId()) + .entityId(entityId) + .updateConsumer((s, subscriptionUpdate) -> dynamicValueSubUpdate(s, subscriptionUpdate, dynamicValueKeySubMap)) + .allKeys(false) + .keyStates(keyStates) + .scope(TbAttributeSubscriptionScope.SERVER_SCOPE) + .build(); + subToDynamicValueKeySet.add(subIdx); + localSubscriptionService.addSubscription(sub); + } + } catch (InterruptedException | ExecutionException e) { + log.info("[{}][{}][{}] Failed to resolve dynamic values: {}", tenantId, customerId, userId, dynamicValues.keySet()); + } + + } + + private void dynamicValueSubUpdate(String sessionId, TelemetrySubscriptionUpdate subscriptionUpdate, + Map dynamicValueKeySubMap) { + Map latestUpdate = new HashMap<>(); + subscriptionUpdate.getData().forEach((k, v) -> { + Object[] data = (Object[]) v.get(0); + latestUpdate.put(k, new TsValue((Long) data[0], (String) data[1])); + }); + + boolean invalidateFilter = false; + for (Map.Entry entry : latestUpdate.entrySet()) { + String k = entry.getKey(); + TsValue tsValue = entry.getValue(); + DynamicValueKeySub sub = dynamicValueKeySubMap.get(k); + if (sub.updateValue(tsValue)) { + invalidateFilter = true; + updateDynamicValuesByKey(sub, tsValue); + } + } + + if (invalidateFilter) { + update(); + } + } + + public abstract void fetchData(); + + protected abstract void update(); + + public void clearSubscriptions() { + clearDynamicValueSubscriptions(); + } + + @Data + private static class DynamicValueKeySub { + private final DynamicValueKey key; + private final EntityId entityId; + private long lastUpdateTs; + private String lastUpdateValue; + + boolean updateValue(TsValue value) { + if (value.getTs() > lastUpdateTs && (lastUpdateValue == null || !lastUpdateValue.equals(value.getValue()))) { + this.lastUpdateTs = value.getTs(); + this.lastUpdateValue = value.getValue(); + return true; + } else { + return false; + } + } + } + + private ListenableFuture resolveEntityValue(TenantId tenantId, EntityId entityId, DynamicValueKey key) { + ListenableFuture> entry = attributesService.find(tenantId, entityId, + TbAttributeSubscriptionScope.SERVER_SCOPE.name(), key.getSourceAttribute()); + return Futures.transform(entry, attributeOpt -> { + DynamicValueKeySub sub = new DynamicValueKeySub(key, entityId); + if (attributeOpt.isPresent()) { + AttributeKvEntry attribute = attributeOpt.get(); + sub.setLastUpdateTs(attribute.getLastUpdateTs()); + sub.setLastUpdateValue(attribute.getValueAsString()); + updateDynamicValuesByKey(sub, new TsValue(attribute.getLastUpdateTs(), attribute.getValueAsString())); + } + return sub; + }, MoreExecutors.directExecutor()); + } + + @SuppressWarnings("unchecked") + protected void updateDynamicValuesByKey(DynamicValueKeySub sub, TsValue tsValue) { + DynamicValueKey dvk = sub.getKey(); + switch (dvk.getPredicateType()) { + case STRING: + dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(tsValue.getValue())); + break; + case NUMERIC: + try { + Double dValue = Double.parseDouble(tsValue.getValue()); + dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(dValue)); + } catch (NumberFormatException e) { + dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(null)); + } + break; + case BOOLEAN: + Boolean bValue = Boolean.parseBoolean(tsValue.getValue()); + dynamicValues.get(dvk).forEach(dynamicValue -> dynamicValue.setResolvedValue(bValue)); + break; + } + } + + @SuppressWarnings("unchecked") + private void registerDynamicValues(KeyFilterPredicate predicate) { + switch (predicate.getType()) { + case STRING: + case NUMERIC: + case BOOLEAN: + Optional value = getDynamicValueFromSimplePredicate((SimpleKeyFilterPredicate) predicate); + if (value.isPresent()) { + DynamicValue dynamicValue = value.get(); + DynamicValueKey key = new DynamicValueKey( + predicate.getType(), + dynamicValue.getSourceType(), + dynamicValue.getSourceAttribute()); + dynamicValues.computeIfAbsent(key, tmp -> new ArrayList<>()).add(dynamicValue); + } + break; + case COMPLEX: + ((ComplexFilterPredicate) predicate).getPredicates().forEach(this::registerDynamicValues); + } + } + + private Optional> getDynamicValueFromSimplePredicate(SimpleKeyFilterPredicate predicate) { + if (predicate.getValue().getUserValue() == null) { + return Optional.ofNullable(predicate.getValue().getDynamicValue()); + } else { + return Optional.empty(); + } + } + + public String getSessionId() { + return sessionRef.getSessionId(); + } + + public TenantId getTenantId() { + return sessionRef.getSecurityCtx().getTenantId(); + } + + public CustomerId getCustomerId() { + return sessionRef.getSecurityCtx().getCustomerId(); + } + + public UserId getUserId() { + return sessionRef.getSecurityCtx().getId(); + } + + protected void clearDynamicValueSubscriptions() { + if (subToDynamicValueKeySet != null) { + for (Integer subId : subToDynamicValueKeySet) { + localSubscriptionService.cancelSubscription(sessionRef.getSessionId(), subId); + } + subToDynamicValueKeySet.clear(); + } + } + + public void setRefreshTask(ScheduledFuture task) { + this.refreshTask = task; + } + + public void cancelTasks() { + if (this.refreshTask != null) { + log.trace("[{}][{}] Canceling old refresh task", sessionRef.getSessionId(), cmdId); + this.refreshTask.cancel(true); + } + } + + @Data + public static class DynamicValueKey { + @Getter + private final FilterPredicateType predicateType; + @Getter + private final DynamicValueSourceType sourceType; + @Getter + private final String sourceAttribute; + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java index 95f15fcb3f..8af9ba2330 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbAlarmDataSubCtx.java @@ -90,8 +90,7 @@ public class TbAlarmDataSubCtx extends TbAbstractDataSubCtx { AlarmDataUpdate update; if (!entitiesMap.isEmpty()) { long start = System.currentTimeMillis(); - PageData alarms = alarmService.findAlarmDataByQueryForEntities(getTenantId(), getCustomerId(), - query, getOrderedEntityIds()); + PageData alarms = alarmService.findAlarmDataByQueryForEntities(getTenantId(), getCustomerId(), query, getOrderedEntityIds()); long end = System.currentTimeMillis(); stats.getAlarmQueryInvocationCnt().incrementAndGet(); stats.getAlarmQueryTimeSpent().addAndGet(end - start); diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityCountSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityCountSubCtx.java new file mode 100644 index 0000000000..e97892a37d --- /dev/null +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityCountSubCtx.java @@ -0,0 +1,55 @@ +/** + * Copyright © 2016-2021 The Thingsboard Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.thingsboard.server.service.subscription; + +import lombok.extern.slf4j.Slf4j; +import org.thingsboard.server.common.data.query.EntityCountQuery; +import org.thingsboard.server.common.data.query.EntityKeyType; +import org.thingsboard.server.dao.attributes.AttributesService; +import org.thingsboard.server.dao.entity.EntityService; +import org.thingsboard.server.service.telemetry.TelemetryWebSocketService; +import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef; +import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountUpdate; +import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate; +import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate; + +@Slf4j +public class TbEntityCountSubCtx extends TbAbstractSubCtx { + + private volatile int result; + + public TbEntityCountSubCtx(String serviceId, TelemetryWebSocketService wsService, EntityService entityService, + TbLocalSubscriptionService localSubscriptionService, AttributesService attributesService, + SubscriptionServiceStatistics stats, TelemetryWebSocketSessionRef sessionRef, int cmdId) { + super(serviceId, wsService, entityService, localSubscriptionService, attributesService, stats, sessionRef, cmdId); + } + + @Override + public void fetchData() { + result = (int) entityService.countEntitiesByQuery(getTenantId(), getCustomerId(), query); + wsService.sendWsMsg(sessionRef.getSessionId(), new EntityCountUpdate(cmdId, result)); + } + + @Override + protected void update() { + int newCount = (int) entityService.countEntitiesByQuery(getTenantId(), getCustomerId(), query); + if (newCount != result) { + result = newCount; + wsService.sendWsMsg(sessionRef.getSessionId(), new EntityCountUpdate(cmdId, result)); + } + } + +} diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubscriptionService.java index 60abdeaa9c..8760997bfd 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubscriptionService.java @@ -17,6 +17,7 @@ package org.thingsboard.server.service.subscription; import org.thingsboard.server.service.telemetry.TelemetryWebSocketSessionRef; import org.thingsboard.server.service.telemetry.cmd.v2.AlarmDataCmd; +import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountCmd; import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd; import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUnsubscribeCmd; import org.thingsboard.server.service.telemetry.cmd.v2.UnsubscribeCmd; @@ -25,6 +26,8 @@ public interface TbEntityDataSubscriptionService { void handleCmd(TelemetryWebSocketSessionRef sessionId, EntityDataCmd cmd); + void handleCmd(TelemetryWebSocketSessionRef sessionId, EntityCountCmd cmd); + void handleCmd(TelemetryWebSocketSessionRef sessionId, AlarmDataCmd cmd); void cancelSubscription(String sessionId, UnsubscribeCmd subscriptionId); diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java index 2efbc2d0a8..eec45a9c8d 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/DefaultTelemetryWebSocketService.java @@ -51,22 +51,22 @@ import org.thingsboard.server.service.security.ValidationResult; import org.thingsboard.server.service.security.ValidationResultCode; import org.thingsboard.server.service.security.model.UserPrincipal; import org.thingsboard.server.service.security.permission.Operation; +import org.thingsboard.server.service.subscription.TbAttributeSubscription; +import org.thingsboard.server.service.subscription.TbAttributeSubscriptionScope; import org.thingsboard.server.service.subscription.TbEntityDataSubscriptionService; import org.thingsboard.server.service.subscription.TbLocalSubscriptionService; -import org.thingsboard.server.service.subscription.TbAttributeSubscriptionScope; -import org.thingsboard.server.service.subscription.TbAttributeSubscription; import org.thingsboard.server.service.subscription.TbTimeseriesSubscription; +import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper; import org.thingsboard.server.service.telemetry.cmd.v1.AttributesSubscriptionCmd; import org.thingsboard.server.service.telemetry.cmd.v1.GetHistoryCmd; import org.thingsboard.server.service.telemetry.cmd.v1.SubscriptionCmd; import org.thingsboard.server.service.telemetry.cmd.v1.TelemetryPluginCmd; -import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper; import org.thingsboard.server.service.telemetry.cmd.v1.TimeseriesSubscriptionCmd; import org.thingsboard.server.service.telemetry.cmd.v2.AlarmDataCmd; -import org.thingsboard.server.service.telemetry.cmd.v2.AlarmDataUnsubscribeCmd; +import org.thingsboard.server.service.telemetry.cmd.v2.CmdUpdate; import org.thingsboard.server.service.telemetry.cmd.v2.DataUpdate; +import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountCmd; import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd; -import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUnsubscribeCmd; import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate; import org.thingsboard.server.service.telemetry.cmd.v2.UnsubscribeCmd; import org.thingsboard.server.service.telemetry.exception.UnauthorizedException; @@ -216,12 +216,18 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi if (cmdsWrapper.getAlarmDataCmds() != null) { cmdsWrapper.getAlarmDataCmds().forEach(cmd -> handleWsAlarmDataCmd(sessionRef, cmd)); } + if (cmdsWrapper.getEntityCountCmds() != null) { + cmdsWrapper.getEntityCountCmds().forEach(cmd -> handleWsEntityCountCmd(sessionRef, cmd)); + } if (cmdsWrapper.getEntityDataUnsubscribeCmds() != null) { cmdsWrapper.getEntityDataUnsubscribeCmds().forEach(cmd -> handleWsDataUnsubscribeCmd(sessionRef, cmd)); } if (cmdsWrapper.getAlarmDataUnsubscribeCmds() != null) { cmdsWrapper.getAlarmDataUnsubscribeCmds().forEach(cmd -> handleWsDataUnsubscribeCmd(sessionRef, cmd)); } + if (cmdsWrapper.getEntityCountUnsubscribeCmds() != null) { + cmdsWrapper.getEntityCountUnsubscribeCmds().forEach(cmd -> handleWsDataUnsubscribeCmd(sessionRef, cmd)); + } } } catch (IOException e) { log.warn("Failed to decode subscription cmd: {}", e.getMessage(), e); @@ -239,6 +245,16 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi } } + private void handleWsEntityCountCmd(TelemetryWebSocketSessionRef sessionRef, EntityCountCmd cmd) { + String sessionId = sessionRef.getSessionId(); + log.debug("[{}] Processing: {}", sessionId, cmd); + + if (validateSessionMetadata(sessionRef, cmd.getCmdId(), sessionId) + && validateSubscriptionCmd(sessionRef, cmd)) { + entityDataSubService.handleCmd(sessionRef, cmd); + } + } + private void handleWsAlarmDataCmd(TelemetryWebSocketSessionRef sessionRef, AlarmDataCmd cmd) { String sessionId = sessionRef.getSessionId(); log.debug("[{}] Processing: {}", sessionId, cmd); @@ -264,7 +280,7 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi } @Override - public void sendWsMsg(String sessionId, DataUpdate update) { + public void sendWsMsg(String sessionId, CmdUpdate update) { sendWsMsg(sessionId, update.getCmdId(), update); } @@ -679,6 +695,20 @@ public class DefaultTelemetryWebSocketService implements TelemetryWebSocketServi return true; } + private boolean validateSubscriptionCmd(TelemetryWebSocketSessionRef sessionRef, EntityCountCmd cmd) { + if (cmd.getCmdId() < 0) { + TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, + "Cmd id is negative value!"); + sendWsMsg(sessionRef, update); + return false; + } else if (cmd.getQuery() == null) { + TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, "Query is empty!"); + sendWsMsg(sessionRef, update); + return false; + } + return true; + } + private boolean validateSubscriptionCmd(TelemetryWebSocketSessionRef sessionRef, AlarmDataCmd cmd) { if (cmd.getCmdId() < 0) { TelemetrySubscriptionUpdate update = new TelemetrySubscriptionUpdate(cmd.getCmdId(), SubscriptionErrorCode.BAD_REQUEST, diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketService.java b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketService.java index 6834d26b4d..c8cdbd06af 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketService.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/TelemetryWebSocketService.java @@ -15,6 +15,7 @@ */ package org.thingsboard.server.service.telemetry; +import org.thingsboard.server.service.telemetry.cmd.v2.CmdUpdate; import org.thingsboard.server.service.telemetry.cmd.v2.DataUpdate; import org.thingsboard.server.service.telemetry.sub.TelemetrySubscriptionUpdate; @@ -29,6 +30,6 @@ public interface TelemetryWebSocketService { void sendWsMsg(String sessionId, TelemetrySubscriptionUpdate update); - void sendWsMsg(String sessionId, DataUpdate update); + void sendWsMsg(String sessionId, CmdUpdate update); } diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/cmd/v2/CmdUpdate.java b/application/src/main/java/org/thingsboard/server/service/telemetry/cmd/v2/CmdUpdate.java index 1a78a4d29a..351dfec15c 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/cmd/v2/CmdUpdate.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/cmd/v2/CmdUpdate.java @@ -15,11 +15,13 @@ */ package org.thingsboard.server.service.telemetry.cmd.v2; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.AllArgsConstructor; import lombok.Data; @Data @AllArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) public abstract class CmdUpdate { private final int cmdId; diff --git a/application/src/main/java/org/thingsboard/server/service/telemetry/cmd/v2/DataUpdate.java b/application/src/main/java/org/thingsboard/server/service/telemetry/cmd/v2/DataUpdate.java index 5da682502c..d0658044d0 100644 --- a/application/src/main/java/org/thingsboard/server/service/telemetry/cmd/v2/DataUpdate.java +++ b/application/src/main/java/org/thingsboard/server/service/telemetry/cmd/v2/DataUpdate.java @@ -15,18 +15,12 @@ */ package org.thingsboard.server.service.telemetry.cmd.v2; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.EqualsAndHashCode; import lombok.Getter; -import lombok.ToString; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.service.telemetry.sub.SubscriptionErrorCode; import java.util.List; -@JsonIgnoreProperties(ignoreUnknown = true) public abstract class DataUpdate extends CmdUpdate { @Getter diff --git a/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java b/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java index 299413d9c9..2249940c48 100644 --- a/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java +++ b/application/src/test/java/org/thingsboard/server/controller/BaseWebsocketApiTest.java @@ -35,16 +35,23 @@ import org.thingsboard.server.common.data.kv.LongDataEntry; import org.thingsboard.server.common.data.kv.TsKvEntry; import org.thingsboard.server.common.data.page.PageData; import org.thingsboard.server.common.data.query.DeviceTypeFilter; +import org.thingsboard.server.common.data.query.EntityCountQuery; import org.thingsboard.server.common.data.query.EntityData; import org.thingsboard.server.common.data.query.EntityDataPageLink; import org.thingsboard.server.common.data.query.EntityDataQuery; import org.thingsboard.server.common.data.query.EntityKey; import org.thingsboard.server.common.data.query.EntityKeyType; +import org.thingsboard.server.common.data.query.EntityKeyValueType; +import org.thingsboard.server.common.data.query.FilterPredicateValue; +import org.thingsboard.server.common.data.query.KeyFilter; +import org.thingsboard.server.common.data.query.NumericFilterPredicate; import org.thingsboard.server.common.data.query.TsValue; import org.thingsboard.server.common.data.security.Authority; import org.thingsboard.server.service.subscription.TbAttributeSubscriptionScope; import org.thingsboard.server.service.telemetry.TelemetrySubscriptionService; import org.thingsboard.server.service.telemetry.cmd.TelemetryPluginCmdsWrapper; +import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountCmd; +import org.thingsboard.server.service.telemetry.cmd.v2.EntityCountUpdate; import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataCmd; import org.thingsboard.server.service.telemetry.cmd.v2.EntityDataUpdate; import org.thingsboard.server.service.telemetry.cmd.v2.EntityHistoryCmd; @@ -243,6 +250,98 @@ public class BaseWebsocketApiTest extends AbstractWebsocketTest { Assert.assertEquals(new TsValue(dataPoint4.getTs(), dataPoint4.getValueAsString()), tsValues[0]); } + @Test + public void testEntityCountWsCmd() throws Exception { + Device device = new Device(); + device.setName("Device"); + device.setType("default"); + device.setLabel("testLabel" + (int) (Math.random() * 1000)); + device = doPost("/api/device", device, Device.class); + + AttributeKvEntry dataPoint1 = new BaseAttributeKvEntry(System.currentTimeMillis(), new LongDataEntry("temperature", 42L)); + sendAttributes(device, TbAttributeSubscriptionScope.SERVER_SCOPE, Collections.singletonList(dataPoint1)); + + DeviceTypeFilter dtf1 = new DeviceTypeFilter(); + dtf1.setDeviceNameFilter("D"); + dtf1.setDeviceType("default"); + EntityCountQuery edq1 = new EntityCountQuery(dtf1, Collections.emptyList()); + + EntityCountCmd cmd1 = new EntityCountCmd(1, edq1); + + TelemetryPluginCmdsWrapper wrapper1 = new TelemetryPluginCmdsWrapper(); + wrapper1.setEntityCountCmds(Collections.singletonList(cmd1)); + + wsClient.send(mapper.writeValueAsString(wrapper1)); + String msg1 = wsClient.waitForReply(); + EntityCountUpdate update1 = mapper.readValue(msg1, EntityCountUpdate.class); + Assert.assertEquals(1, update1.getCmdId()); + Assert.assertEquals(1, update1.getCount()); + + DeviceTypeFilter dtf2 = new DeviceTypeFilter(); + dtf2.setDeviceNameFilter("D"); + dtf2.setDeviceType("non-existing-device-type"); + EntityCountQuery edq2 = new EntityCountQuery(dtf2, Collections.emptyList()); + + EntityCountCmd cmd2 = new EntityCountCmd(2, edq2); + + TelemetryPluginCmdsWrapper wrapper2 = new TelemetryPluginCmdsWrapper(); + wrapper2.setEntityCountCmds(Collections.singletonList(cmd2)); + wsClient.send(mapper.writeValueAsString(wrapper2)); + + String msg2 = wsClient.waitForReply(); + EntityCountUpdate update2 = mapper.readValue(msg2, EntityCountUpdate.class); + Assert.assertEquals(2, update2.getCmdId()); + Assert.assertEquals(0, update2.getCount()); + + KeyFilter highTemperatureFilter = new KeyFilter(); + highTemperatureFilter.setKey(new EntityKey(EntityKeyType.ATTRIBUTE, "temperature")); + NumericFilterPredicate predicate = new NumericFilterPredicate(); + predicate.setValue(FilterPredicateValue.fromDouble(40)); + predicate.setOperation(NumericFilterPredicate.NumericOperation.GREATER); + highTemperatureFilter.setPredicate(predicate); + highTemperatureFilter.setValueType(EntityKeyValueType.NUMERIC); + + DeviceTypeFilter dtf3 = new DeviceTypeFilter(); + dtf3.setDeviceNameFilter("D"); + dtf3.setDeviceType("default"); + EntityCountQuery edq3 = new EntityCountQuery(dtf3, Collections.singletonList(highTemperatureFilter)); + + EntityCountCmd cmd3 = new EntityCountCmd(3, edq3); + + TelemetryPluginCmdsWrapper wrapper3 = new TelemetryPluginCmdsWrapper(); + wrapper3.setEntityCountCmds(Collections.singletonList(cmd3)); + wsClient.send(mapper.writeValueAsString(wrapper3)); + + String msg3 = wsClient.waitForReply(); + EntityCountUpdate update3 = mapper.readValue(msg3, EntityCountUpdate.class); + Assert.assertEquals(3, update3.getCmdId()); + Assert.assertEquals(1, update3.getCount()); + + KeyFilter highTemperatureFilter2 = new KeyFilter(); + highTemperatureFilter2.setKey(new EntityKey(EntityKeyType.ATTRIBUTE, "temperature")); + NumericFilterPredicate predicate2 = new NumericFilterPredicate(); + predicate2.setValue(FilterPredicateValue.fromDouble(50)); + predicate2.setOperation(NumericFilterPredicate.NumericOperation.GREATER); + highTemperatureFilter2.setPredicate(predicate2); + highTemperatureFilter2.setValueType(EntityKeyValueType.NUMERIC); + + DeviceTypeFilter dtf4 = new DeviceTypeFilter(); + dtf4.setDeviceNameFilter("D"); + dtf4.setDeviceType("default"); + EntityCountQuery edq4 = new EntityCountQuery(dtf4, Collections.singletonList(highTemperatureFilter2)); + + EntityCountCmd cmd4 = new EntityCountCmd(4, edq4); + + TelemetryPluginCmdsWrapper wrapper4 = new TelemetryPluginCmdsWrapper(); + wrapper4.setEntityCountCmds(Collections.singletonList(cmd4)); + wsClient.send(mapper.writeValueAsString(wrapper4)); + + String msg4 = wsClient.waitForReply(); + EntityCountUpdate update4 = mapper.readValue(msg4, EntityCountUpdate.class); + Assert.assertEquals(4, update4.getCmdId()); + Assert.assertEquals(0, update4.getCount()); + } + @Test public void testEntityDataLatestWidgetFlow() throws Exception { Device device = new Device(); diff --git a/application/src/test/java/org/thingsboard/server/controller/ControllerSqlTestSuite.java b/application/src/test/java/org/thingsboard/server/controller/ControllerSqlTestSuite.java index 3186765130..a386f18c37 100644 --- a/application/src/test/java/org/thingsboard/server/controller/ControllerSqlTestSuite.java +++ b/application/src/test/java/org/thingsboard/server/controller/ControllerSqlTestSuite.java @@ -26,9 +26,9 @@ import java.util.Arrays; @RunWith(ClasspathSuite.class) @ClasspathSuite.ClassnameFilters({ -// "org.thingsboard.server.controller.sql.WebsocketApiSqlTest", + "org.thingsboard.server.controller.sql.WebsocketApiSqlTest", // "org.thingsboard.server.controller.sql.TenantProfileControllerSqlTest", - "org.thingsboard.server.controller.sql.*Test", +// "org.thingsboard.server.controller.sql.*Test", }) public class ControllerSqlTestSuite { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java index 8e8bd6638e..170a2edfa2 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java @@ -249,18 +249,70 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository { public long countEntitiesByQuery(TenantId tenantId, CustomerId customerId, EntityCountQuery query) { EntityType entityType = resolveEntityType(query.getEntityFilter()); QueryContext ctx = new QueryContext(new QuerySecurityContext(tenantId, customerId, entityType)); - ctx.append("select count(e.id) from "); - ctx.append(addEntityTableQuery(ctx, query.getEntityFilter())); - ctx.append(" e where "); - ctx.append(buildEntityWhere(ctx, query.getEntityFilter(), Collections.emptyList())); - return transactionTemplate.execute(status -> { - long startTs = System.currentTimeMillis(); - try { - return jdbcTemplate.queryForObject(ctx.getQuery(), ctx, Long.class); - } finally { - queryLog.logQuery(ctx, ctx.getQuery(), System.currentTimeMillis() - startTs); + if (query.getKeyFilters() == null || query.getKeyFilters().isEmpty()) { + ctx.append("select count(e.id) from "); + ctx.append(addEntityTableQuery(ctx, query.getEntityFilter())); + ctx.append(" e where "); + ctx.append(buildEntityWhere(ctx, query.getEntityFilter(), Collections.emptyList())); + return transactionTemplate.execute(status -> { + long startTs = System.currentTimeMillis(); + try { + return jdbcTemplate.queryForObject(ctx.getQuery(), ctx, Long.class); + } finally { + queryLog.logQuery(ctx, ctx.getQuery(), System.currentTimeMillis() - startTs); + } + }); + } else { + List mappings = EntityKeyMapping.prepareEntityCountKeyMapping(query); + + List selectionMapping = mappings.stream().filter(EntityKeyMapping::isSelection) + .collect(Collectors.toList()); + List entityFieldsSelectionMapping = selectionMapping.stream().filter(mapping -> !mapping.isLatest()) + .collect(Collectors.toList()); + + List filterMapping = mappings.stream().filter(EntityKeyMapping::hasFilter) + .collect(Collectors.toList()); + List entityFieldsFiltersMapping = filterMapping.stream().filter(mapping -> !mapping.isLatest()) + .collect(Collectors.toList()); + + List allLatestMappings = mappings.stream().filter(EntityKeyMapping::isLatest) + .collect(Collectors.toList()); + + + String entityWhereClause = DefaultEntityQueryRepository.this.buildEntityWhere(ctx, query.getEntityFilter(), entityFieldsFiltersMapping); + String latestJoinsCnt = EntityKeyMapping.buildLatestJoins(ctx, query.getEntityFilter(), entityType, allLatestMappings, true); + String entityFieldsSelection = EntityKeyMapping.buildSelections(entityFieldsSelectionMapping, query.getEntityFilter().getType(), entityType); + String entityTypeStr; + if (query.getEntityFilter().getType().equals(EntityFilterType.RELATIONS_QUERY)) { + entityTypeStr = "e.entity_type"; + } else { + entityTypeStr = "'" + entityType.name() + "'"; } - }); + if (!StringUtils.isEmpty(entityFieldsSelection)) { + entityFieldsSelection = String.format("e.id id, %s entity_type, %s", entityTypeStr, entityFieldsSelection); + } else { + entityFieldsSelection = String.format("e.id id, %s entity_type", entityTypeStr); + } + + String fromClauseCount = String.format("from (select %s from (select %s from %s e where %s) entities %s ) result %s", + "entities.*", + entityFieldsSelection, + addEntityTableQuery(ctx, query.getEntityFilter()), + entityWhereClause, + latestJoinsCnt, + ""); + + String countQuery = String.format("select count(id) %s", fromClauseCount); + + return transactionTemplate.execute(status -> { + long startTs = System.currentTimeMillis(); + try { + return jdbcTemplate.queryForObject(countQuery, ctx, Long.class); + } finally { + queryLog.logQuery(ctx, ctx.getQuery(), System.currentTimeMillis() - startTs); + } + }); + } } @Override diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/EntityKeyMapping.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/EntityKeyMapping.java index 0bacdf6349..313a44f4a3 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/EntityKeyMapping.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/EntityKeyMapping.java @@ -21,6 +21,7 @@ import org.thingsboard.server.common.data.DataConstants; import org.thingsboard.server.common.data.EntityType; import org.thingsboard.server.common.data.query.BooleanFilterPredicate; import org.thingsboard.server.common.data.query.ComplexFilterPredicate; +import org.thingsboard.server.common.data.query.EntityCountQuery; import org.thingsboard.server.common.data.query.EntityDataQuery; import org.thingsboard.server.common.data.query.EntityDataSortOrder; import org.thingsboard.server.common.data.query.EntityFilter; @@ -380,6 +381,30 @@ public class EntityKeyMapping { return mappings; } + public static List prepareEntityCountKeyMapping(EntityCountQuery query) { + Map> filters = + query.getKeyFilters() != null ? + query.getKeyFilters().stream().collect(Collectors.groupingBy(KeyFilter::getKey)) : Collections.emptyMap(); + int index = 2; + List mappings = new ArrayList<>(); + if (!filters.isEmpty()) { + for (EntityKey filterField : filters.keySet()) { + EntityKeyMapping mapping = new EntityKeyMapping(); + mapping.setIndex(index); + mapping.setAlias(String.format("alias%s", index)); + mapping.setKeyFilters(filters.get(filterField)); + mapping.setLatest(!filterField.getType().equals(EntityKeyType.ENTITY_FIELD)); + mapping.setSelection(false); + mapping.setEntityKey(filterField); + mappings.add(mapping); + index += 1; + } + } + + return mappings; + } + + private String buildAttributeSelection() { return buildTimeSeriesOrAttrSelection(true); }