Browse Source

Entity data query fixes

pull/3053/head
Igor Kulikov 6 years ago
parent
commit
e6e9be18ea
  1. 9
      application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java
  2. 26
      application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java
  3. 26
      dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java
  4. 41
      ui-ngx/src/app/core/api/entity-data-subscription.ts
  5. 4
      ui-ngx/src/app/shared/models/telemetry/telemetry.models.ts

9
application/src/main/java/org/thingsboard/server/service/subscription/DefaultTbEntityDataSubscriptionService.java

@ -401,9 +401,14 @@ public class DefaultTbEntityDataSubscriptionService implements TbEntityDataSubsc
new EntityDataUpdate(ctx.getCmdId(), SubscriptionErrorCode.INTERNAL_ERROR.getCode(), "Failed to fetch historical data!"));
}
});
EntityDataUpdate update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null);
EntityDataUpdate update;
if (!ctx.isInitialDataSent()) {
update = new EntityDataUpdate(ctx.getCmdId(), ctx.getData(), null);
ctx.setInitialDataSent(true);
} else {
update = new EntityDataUpdate(ctx.getCmdId(), null, ctx.getData().getData());
}
wsService.sendWsMsg(ctx.getSessionId(), update);
ctx.setInitialDataSent(true);
return ctx;
}, wsCallBackExecutor);
}

26
application/src/main/java/org/thingsboard/server/service/subscription/TbEntityDataSubCtx.java

@ -191,12 +191,14 @@ public class TbEntityDataSubCtx {
if (latestCtxValues != null) {
latestCtxValues.forEach((k, v) -> {
TsValue update = latestUpdate.get(k);
if (update.getTs() < v.getTs()) {
log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs());
latestUpdate.remove(k);
} else if ((update.getTs() == v.getTs() && update.getValue().equals(v.getValue()))) {
log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs());
latestUpdate.remove(k);
if (update != null) {
if (update.getTs() < v.getTs()) {
log.trace("[{}][{}][{}] Removed stale update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs());
latestUpdate.remove(k);
} else if ((update.getTs() == v.getTs() && update.getValue().equals(v.getValue()))) {
log.trace("[{}][{}][{}] Removed duplicate update for key: {} and ts: {}", sessionId, cmdId, subscriptionUpdate.getSubscriptionId(), k, update.getTs());
latestUpdate.remove(k);
}
}
});
//Setting new values
@ -204,8 +206,16 @@ public class TbEntityDataSubCtx {
}
}
if (!latestUpdate.isEmpty()) {
Map<EntityKeyType, Map<String, TsValue>> latestMap = Collections.singletonMap(keyType, latestUpdate);
entityData = new EntityData(entityId, latestMap, null);
if (resultToLatestValues) {
Map<EntityKeyType, Map<String, TsValue>> latestMap = Collections.singletonMap(keyType, latestUpdate);
entityData = new EntityData(entityId, latestMap, null);
} else {
Map<String, TsValue[]> tsMap = new HashMap<>();
latestUpdate.forEach((key, tsValue) -> {
tsMap.put(key, new TsValue[]{tsValue});
});
entityData = new EntityData(entityId, null, tsMap);
}
wsService.sendWsMsg(sessionId, new EntityDataUpdate(cmdId, null, Collections.singletonList(entityData)));
}
} else {

26
dao/src/main/java/org/thingsboard/server/dao/sql/query/DefaultEntityQueryRepository.java

@ -206,25 +206,31 @@ public class DefaultEntityQueryRepository implements EntityQueryRepository {
case RELATIONS_QUERY:
case DEVICE_SEARCH_QUERY:
case ASSET_SEARCH_QUERY:
ctx.addUuidParameter("permissions_tenant_id", tenantId.getId());
ctx.addUuidParameter("permissions_customer_id", customerId.getId());
return "e.tenant_id=:permissions_tenant_id and e.customer_id=:permissions_customer_id";
return this.defaultPermissionQuery(ctx, tenantId, customerId, entityType);
default:
if (entityType == EntityType.TENANT) {
ctx.addUuidParameter("permissions_tenant_id", tenantId.getId());
return "e.id=:permissions_tenant_id";
} else if (entityType == EntityType.CUSTOMER) {
ctx.addUuidParameter("permissions_tenant_id", tenantId.getId());
ctx.addUuidParameter("permissions_customer_id", customerId.getId());
return "e.tenant_id=:permissions_tenant_id and e.id=:permissions_customer_id";
} else {
ctx.addUuidParameter("permissions_tenant_id", tenantId.getId());
ctx.addUuidParameter("permissions_customer_id", customerId.getId());
return "e.tenant_id=:permissions_tenant_id and e.customer_id=:permissions_customer_id";
return this.defaultPermissionQuery(ctx, tenantId, customerId, entityType);
}
}
}
private String defaultPermissionQuery(EntityQueryContext ctx, TenantId tenantId, CustomerId customerId, EntityType entityType) {
ctx.addUuidParameter("permissions_tenant_id", tenantId.getId());
if (customerId != null && !customerId.isNullUid()) {
ctx.addUuidParameter("permissions_customer_id", customerId.getId());
if (entityType == EntityType.CUSTOMER) {
return "e.tenant_id=:permissions_tenant_id and e.id=:permissions_customer_id";
} else {
return "e.tenant_id=:permissions_tenant_id and e.customer_id=:permissions_customer_id";
}
} else {
return "e.tenant_id=:permissions_tenant_id";
}
}
private String buildEntityFilterQuery(EntityQueryContext ctx, EntityFilter entityFilter) {
switch (entityFilter.getType()) {
case SINGLE_ENTITY:

41
ui-ngx/src/app/core/api/entity-data-subscription.ts

@ -222,21 +222,26 @@ export class EntityDataSubscription {
);
this.subscriber.reconnect$.subscribe(() => {
const newSubsTw: SubscriptionTimewindow = this.listener.updateRealtimeSubscription();
this.listener.setRealtimeSubscription(newSubsTw);
this.subsTw = newSubsTw;
if (this.started && !this.entityDataSubscriptionOptions.isLatestDataSubscription) {
this.subsCommand.tsCmd.startTs = this.subsTw.startTs;
this.subsCommand.tsCmd.timeWindow = this.subsTw.aggregation.timeWindow;
this.subsCommand.tsCmd.interval = this.subsTw.aggregation.interval;
this.subsCommand.tsCmd.limit = this.subsTw.aggregation.limit;
this.subsCommand.tsCmd.agg = this.subsTw.aggregation.type;
if (this.subsTw.aggregation.stateData) {
this.subsCommand.historyCmd.startTs = this.subsTw.startTs - YEAR;
this.subsCommand.historyCmd.endTs = this.subsTw.startTs;
this.subsCommand.historyCmd.interval = this.subsTw.aggregation.interval;
this.subsCommand.historyCmd.limit = this.subsTw.aggregation.limit;
this.subsCommand.historyCmd.agg = this.subsTw.aggregation.type;
if (this.entityDataSubscriptionOptions.type === widgetType.timeseries &&
!this.history && this.tsFields.length) {
const newSubsTw: SubscriptionTimewindow = this.listener.updateRealtimeSubscription();
this.subsTw = newSubsTw;
this.subsCommand.tsCmd.startTs = this.subsTw.startTs;
this.subsCommand.tsCmd.timeWindow = this.subsTw.aggregation.timeWindow;
this.subsCommand.tsCmd.interval = this.subsTw.aggregation.interval;
this.subsCommand.tsCmd.limit = this.subsTw.aggregation.limit;
this.subsCommand.tsCmd.agg = this.subsTw.aggregation.type;
if (this.subsTw.aggregation.stateData) {
this.subsCommand.historyCmd.startTs = this.subsTw.startTs - YEAR;
this.subsCommand.historyCmd.endTs = this.subsTw.startTs;
this.subsCommand.historyCmd.interval = this.subsTw.aggregation.interval;
this.subsCommand.historyCmd.limit = this.subsTw.aggregation.limit;
this.subsCommand.historyCmd.agg = this.subsTw.aggregation.type;
}
this.dataAggregators.forEach((dataAggregator) => {
dataAggregator.reset(newSubsTw.startTs, newSubsTw.aggregation.timeWindow, newSubsTw.aggregation.interval);
});
}
this.subsCommand.query = this.dataCommand.query;
this.subscriber.subscriptionCommands = [this.subsCommand];
@ -370,8 +375,10 @@ export class EntityDataSubscription {
};
}
}
this.subscriber.subscriptionCommands = [this.subsCommand];
this.subscriber.update();
if (!this.subsCommand.isEmpty()) {
this.subscriber.subscriptionCommands = [this.subsCommand];
this.subscriber.update();
}
} else if (this.datasourceType === DatasourceType.function) {
this.frequency = 1000;
if (this.entityDataSubscriptionOptions.type === widgetType.timeseries) {
@ -480,7 +487,7 @@ export class EntityDataSubscription {
}
if (this.entityDataSubscriptionOptions.type === widgetType.timeseries && entityData.timeseries) {
const subscriptionData = this.toSubscriptionData(entityData.timeseries, true);
if (aggregate) {
if (!this.history && aggregate) {
this.dataAggregators[dataIndex].onData({data: subscriptionData}, false, false, true);
} else {
this.onData(subscriptionData, DataKeyType.timeseries, dataIndex, true, dataUpdatedCb);

4
ui-ngx/src/app/shared/models/telemetry/telemetry.models.ts

@ -157,6 +157,10 @@ export class EntityDataCmd implements WebsocketCmd {
historyCmd?: EntityHistoryCmd;
latestCmd?: LatestValueCmd;
tsCmd?: TimeSeriesCmd;
public isEmpty(): boolean {
return !this.query && !this.historyCmd && !this.latestCmd && !this.tsCmd;
}
}
export class EntityDataUnsubscribeCmd implements WebsocketCmd {

Loading…
Cancel
Save