Browse Source

fix: add subscriberCmdIds reverse index and defensive guard in processPendingUpdates

pull/15363/head
ababak 2 months ago
parent
commit
3bd5d424e7
  1. 21
      ui-ngx/src/app/core/ws/telemetry-websocket.service.ts
  2. 11
      ui-ngx/src/app/core/ws/websocket.service.ts

21
ui-ngx/src/app/core/ws/telemetry-websocket.service.ts

@ -81,6 +81,12 @@ export class TelemetryWebsocketService extends WebsocketService<TelemetrySubscri
const cmdId = this.nextCmdId();
if (!(subscriptionCommand instanceof MarkAsReadCmd) && !(subscriptionCommand instanceof MarkAllAsReadCmd)) {
this.subscribersMap.set(cmdId, subscriber);
let cmdIds = this.subscriberCmdIds.get(subscriber);
if (!cmdIds) {
cmdIds = new Set<number>();
this.subscriberCmdIds.set(subscriber, cmdIds);
}
cmdIds.add(cmdId);
}
subscriptionCommand.cmdId = cmdId;
this.cmdWrapper.cmds.push(subscriptionCommand);
@ -141,6 +147,13 @@ export class TelemetryWebsocketService extends WebsocketService<TelemetrySubscri
const cmdId = subscriptionCommand.cmdId;
if (cmdId) {
this.subscribersMap.delete(cmdId);
const cmdIds = this.subscriberCmdIds.get(subscriber);
if (cmdIds) {
cmdIds.delete(cmdId);
if (cmdIds.size === 0) {
this.subscriberCmdIds.delete(subscriber);
}
}
}
}
);
@ -183,11 +196,9 @@ export class TelemetryWebsocketService extends WebsocketService<TelemetrySubscri
private syncCommandId(subscriptionCommand: WebsocketCmd, subscriber: TelemetrySubscriber) {
if (!this.subscribersMap.has(subscriptionCommand.cmdId)) {
for (const [cmdId, sub] of this.subscribersMap.entries()) {
if (sub === subscriber) {
subscriptionCommand.cmdId = cmdId;
break;
}
const cmdIds = this.subscriberCmdIds.get(subscriber);
if (cmdIds?.size) {
subscriptionCommand.cmdId = cmdIds.values().next().value;
}
}
}

11
ui-ngx/src/app/core/ws/websocket.service.ts

@ -50,6 +50,7 @@ export abstract class WebsocketService<T extends WsSubscriber> implements WsServ
lastCmdId = 0;
subscribersCount = 0;
subscribersMap = new Map<number, TelemetrySubscriber | NotificationSubscriber>();
subscriberCmdIds = new Map<WsSubscriber, Set<number>>();
reconnectSubscribers = new Set<WsSubscriber>();
pendingUpdates = new Set<T>();
@ -136,6 +137,7 @@ export abstract class WebsocketService<T extends WsSubscriber> implements WsServ
}
this.lastCmdId = 0;
this.subscribersMap.clear();
this.subscriberCmdIds.clear();
this.subscribersCount = 0;
this.cmdWrapper.clear();
if (close) {
@ -304,10 +306,13 @@ export abstract class WebsocketService<T extends WsSubscriber> implements WsServ
private processPendingUpdates() {
if (this.pendingUpdates.size > 0) {
this.pendingUpdates.forEach((subscriber) => {
this.update(subscriber);
});
const subscribers = Array.from(this.pendingUpdates);
this.pendingUpdates.clear();
for (const subscriber of subscribers) {
if (this.subscriberCmdIds.has(subscriber)) {
this.update(subscriber);
}
}
}
}
}

Loading…
Cancel
Save