diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java index 80c9b80b55..a9277a50e8 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java @@ -401,9 +401,14 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to fetch historical data!")); } }); - EntityDataUpdate update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null); + EntityDataUpdate update; + if (!ctx.isInitialDataSent()) { + update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null); + ctx.setInitialDataSent(true); + } else { + update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData()); + } wsService.sendWsMsg(ctx.getSessionId(), update); - ctx.setInitialDataSent(true); return ctx; }, wsCallBackExecutor); } diff --git a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java index de3811d16b..47f0885ccf 100644 --- a/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java +++ b/application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java @@ -191,12 +191,14 @@ public class TbEntityDataSubCtx { if (latestCtxValues != null) { latestCtxValues.forEach((k, v) -> { TsValue update = latestUpdate.get(k); - if (update.getTs() < v.getTs()) { - log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); - latestUpdate.remove(k); - } else if ((update.getTs() == v.getTs() && update.getValue().equals(v.getValue()))) { - log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); - latestUpdate.remove(k); + if (update != null) { + if (update.getTs() < v.getTs()) { + log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); + latestUpdate.remove(k); + } else if ((update.getTs() == v.getTs() && update.getValue().equals(v.getValue()))) { + log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs()); + latestUpdate.remove(k); + } } }); //Setting new values @@ -204,8 +206,16 @@ public class TbEntityDataSubCtx { } } if (!latestUpdate.isEmpty()) { - Map> latestMap = Collections.singletonMap(keyType, latestUpdate); - entityData = new EntityData(entityId, latestMap, null); + if (resultToLatestValues) { + Map> latestMap = Collections.singletonMap(keyType, latestUpdate); + entityData = new EntityData(entityId, latestMap, null); + } else { + Map tsMap = new HashMap<>(); + latestUpdate.forEach((key, tsValue) -> { + tsMap.put(key, new TsValue[]{tsValue}); + }); + entityData = new EntityData(entityId, null, tsMap); + } wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData))); } } else { diff --git a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java index 8342b83c0f..86f5e183e8 100644 --- a/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java +++ b/dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java @@ -206,25 +206,31 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository { case RELATIONS_QUERY: case DEVICE_SEARCH_QUERY: case ASSET_SEARCH_QUERY: - ctx.addUuidParameter("permissions_tenant_id", tenantId.getId()); - ctx.addUuidParameter("permissions_customer_id", customerId.getId()); - return "e.tenant_id=:permissions_tenant_id and e.customer_id=:permissions_customer_id"; + return this.defaultPermissionQuery(ctx, tenantId, customerId, entityType); default: if (entityType == EntityType.TENANT) { ctx.addUuidParameter("permissions_tenant_id", tenantId.getId()); return "e.id=:permissions_tenant_id"; - } else if (entityType == EntityType.CUSTOMER) { - ctx.addUuidParameter("permissions_tenant_id", tenantId.getId()); - ctx.addUuidParameter("permissions_customer_id", customerId.getId()); - return "e.tenant_id=:permissions_tenant_id and e.id=:permissions_customer_id"; } else { - ctx.addUuidParameter("permissions_tenant_id", tenantId.getId()); - ctx.addUuidParameter("permissions_customer_id", customerId.getId()); - return "e.tenant_id=:permissions_tenant_id and e.customer_id=:permissions_customer_id"; + return this.defaultPermissionQuery(ctx, tenantId, customerId, entityType); } } } + private String defaultPermissionQuery(EntityQueryContext ctx, TenantId tenantId, CustomerId customerId, EntityType entityType) { + ctx.addUuidParameter("permissions_tenant_id", tenantId.getId()); + if (customerId != null && !customerId.isNullUid()) { + ctx.addUuidParameter("permissions_customer_id", customerId.getId()); + if (entityType == EntityType.CUSTOMER) { + return "e.tenant_id=:permissions_tenant_id and e.id=:permissions_customer_id"; + } else { + return "e.tenant_id=:permissions_tenant_id and e.customer_id=:permissions_customer_id"; + } + } else { + return "e.tenant_id=:permissions_tenant_id"; + } + } + private String buildEntityFilterQuery(EntityQueryContext ctx, EntityFilter entityFilter) { switch (entityFilter.getType()) { case SINGLE_ENTITY: diff --git a/ui-ngx/src/app/core/api/entity-data-subscription.ts b/ui-ngx/src/app/core/api/entity-data-subscription.ts index 6a3f9a7f48..5db1827236 100644 --- a/ui-ngx/src/app/core/api/entity-data-subscription.ts +++ b/ui-ngx/src/app/core/api/entity-data-subscription.ts @@ -222,21 +222,26 @@ export class EntityDataSubscription { ); this.subscriber.reconnect$.subscribe(() => { - const newSubsTw: SubscriptionTimewindow = this.listener.updateRealtimeSubscription(); - this.listener.setRealtimeSubscription(newSubsTw); - this.subsTw = newSubsTw; if (this.started && !this.entityDataSubscriptionOptions.isLatestDataSubscription) { - this.subsCommand.tsCmd.startTs = this.subsTw.startTs; - this.subsCommand.tsCmd.timeWindow = this.subsTw.aggregation.timeWindow; - this.subsCommand.tsCmd.interval = this.subsTw.aggregation.interval; - this.subsCommand.tsCmd.limit = this.subsTw.aggregation.limit; - this.subsCommand.tsCmd.agg = this.subsTw.aggregation.type; - if (this.subsTw.aggregation.stateData) { - this.subsCommand.historyCmd.startTs = this.subsTw.startTs - YEAR; - this.subsCommand.historyCmd.endTs = this.subsTw.startTs; - this.subsCommand.historyCmd.interval = this.subsTw.aggregation.interval; - this.subsCommand.historyCmd.limit = this.subsTw.aggregation.limit; - this.subsCommand.historyCmd.agg = this.subsTw.aggregation.type; + if (this.entityDataSubscriptionOptions.type === widgetType.timeseries && + !this.history && this.tsFields.length) { + const newSubsTw: SubscriptionTimewindow = this.listener.updateRealtimeSubscription(); + this.subsTw = newSubsTw; + this.subsCommand.tsCmd.startTs = this.subsTw.startTs; + this.subsCommand.tsCmd.timeWindow = this.subsTw.aggregation.timeWindow; + this.subsCommand.tsCmd.interval = this.subsTw.aggregation.interval; + this.subsCommand.tsCmd.limit = this.subsTw.aggregation.limit; + this.subsCommand.tsCmd.agg = this.subsTw.aggregation.type; + if (this.subsTw.aggregation.stateData) { + this.subsCommand.historyCmd.startTs = this.subsTw.startTs - YEAR; + this.subsCommand.historyCmd.endTs = this.subsTw.startTs; + this.subsCommand.historyCmd.interval = this.subsTw.aggregation.interval; + this.subsCommand.historyCmd.limit = this.subsTw.aggregation.limit; + this.subsCommand.historyCmd.agg = this.subsTw.aggregation.type; + } + this.dataAggregators.forEach((dataAggregator) => { + dataAggregator.reset(newSubsTw.startTs, newSubsTw.aggregation.timeWindow, newSubsTw.aggregation.interval); + }); } this.subsCommand.query = this.dataCommand.query; this.subscriber.subscriptionCommands = [this.subsCommand]; @@ -370,8 +375,10 @@ export class EntityDataSubscription { }; } } - this.subscriber.subscriptionCommands = [this.subsCommand]; - this.subscriber.update(); + if (!this.subsCommand.isEmpty()) { + this.subscriber.subscriptionCommands = [this.subsCommand]; + this.subscriber.update(); + } } else if (this.datasourceType === DatasourceType.function) { this.frequency = 1000; if (this.entityDataSubscriptionOptions.type === widgetType.timeseries) { @@ -480,7 +487,7 @@ export class EntityDataSubscription { } if (this.entityDataSubscriptionOptions.type === widgetType.timeseries && entityData.timeseries) { const subscriptionData = this.toSubscriptionData(entityData.timeseries, true); - if (aggregate) { + if (!this.history && aggregate) { this.dataAggregators[dataIndex].onData({data: subscriptionData}, false, false, true); } else { this.onData(subscriptionData, DataKeyType.timeseries, dataIndex, true, dataUpdatedCb); diff --git a/ui-ngx/src/app/shared/models/telemetry/telemetry.models.ts b/ui-ngx/src/app/shared/models/telemetry/telemetry.models.ts index c0ad2c80a0..c35946dcba 100644 --- a/ui-ngx/src/app/shared/models/telemetry/telemetry.models.ts +++ b/ui-ngx/src/app/shared/models/telemetry/telemetry.models.ts @@ -157,6 +157,10 @@ export class EntityDataCmd implements WebsocketCmd { historyCmd?: EntityHistoryCmd; latestCmd?: LatestValueCmd; tsCmd?: TimeSeriesCmd; + + public isEmpty(): boolean { + return !this.query && !this.historyCmd && !this.latestCmd && !this.tsCmd; + } } export class EntityDataUnsubscribeCmd implements WebsocketCmd {