Browse Source

Merge da340fa554 into 767fdbef6d

pull/15363/merge
Artem Babak 5 days ago
committed by GitHub
parent
commit
25531586ad
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 26
      ui-ngx/src/app/core/ws/telemetry-websocket.service.ts
  2. 17
      ui-ngx/src/app/core/ws/websocket.service.ts

26
ui-ngx/src/app/core/ws/telemetry-websocket.service.ts

@ -50,6 +50,7 @@ import {
UnreadCountSubCmd,
UnreadSubCmd,
UnsubscribeCmd,
WebsocketCmd,
WebsocketDataMsg
} from '@app/shared/models/telemetry/telemetry.models';
import { Store } from '@ngrx/store';
@ -80,6 +81,12 @@ export class TelemetryWebsocketService extends WebsocketService<TelemetrySubscri
const cmdId = this.nextCmdId();
if (!(subscriptionCommand instanceof MarkAsReadCmd) && !(subscriptionCommand instanceof MarkAllAsReadCmd)) {
this.subscribersMap.set(cmdId, subscriber);
let cmdIds = this.subscriberCmdIds.get(subscriber);
if (!cmdIds) {
cmdIds = new Set<number>();
this.subscriberCmdIds.set(subscriber, cmdIds);
}
cmdIds.add(cmdId);
}
subscriptionCommand.cmdId = cmdId;
this.cmdWrapper.cmds.push(subscriptionCommand);
@ -94,11 +101,14 @@ export class TelemetryWebsocketService extends WebsocketService<TelemetrySubscri
subscriber.subscriptionCommands.forEach(
(subscriptionCommand) => {
if (subscriptionCommand.cmdId && (subscriptionCommand instanceof EntityDataCmd || subscriptionCommand instanceof UnreadSubCmd)) {
this.syncCommandId(subscriptionCommand, subscriber);
this.cmdWrapper.cmds.push(subscriptionCommand);
}
}
);
this.publishCommands();
} else {
this.pendingUpdates.add(subscriber);
}
}
@ -137,6 +147,13 @@ export class TelemetryWebsocketService extends WebsocketService<TelemetrySubscri
const cmdId = subscriptionCommand.cmdId;
if (cmdId) {
this.subscribersMap.delete(cmdId);
const cmdIds = this.subscriberCmdIds.get(subscriber);
if (cmdIds) {
cmdIds.delete(cmdId);
if (cmdIds.size === 0) {
this.subscriberCmdIds.delete(subscriber);
}
}
}
}
);
@ -177,4 +194,13 @@ export class TelemetryWebsocketService extends WebsocketService<TelemetrySubscri
}
}
private syncCommandId(subscriptionCommand: WebsocketCmd, subscriber: TelemetrySubscriber) {
if (this.subscribersMap.get(subscriptionCommand.cmdId) !== subscriber) {
const cmdIds = this.subscriberCmdIds.get(subscriber);
if (cmdIds?.size) {
subscriptionCommand.cmdId = cmdIds.values().next().value;
}
}
}
}

17
ui-ngx/src/app/core/ws/websocket.service.ts

@ -50,8 +50,10 @@ export abstract class WebsocketService<T extends WsSubscriber> implements WsServ
lastCmdId = 0;
subscribersCount = 0;
subscribersMap = new Map<number, TelemetrySubscriber | NotificationSubscriber>();
subscriberCmdIds = new Map<WsSubscriber, Set<number>>();
reconnectSubscribers = new Set<WsSubscriber>();
pendingUpdates = new Set<T>();
wsUri: string;
@ -135,11 +137,13 @@ export abstract class WebsocketService<T extends WsSubscriber> implements WsServ
}
this.lastCmdId = 0;
this.subscribersMap.clear();
this.subscriberCmdIds.clear();
this.subscribersCount = 0;
this.cmdWrapper.clear();
if (close) {
this.reconnectAttempts = 0;
this.lastShownCloseCode = null;
this.pendingUpdates.clear();
this.closeSocket();
}
}
@ -223,6 +227,7 @@ export abstract class WebsocketService<T extends WsSubscriber> implements WsServ
}
);
this.reconnectSubscribers.clear();
this.processPendingUpdates();
} else {
this.publishCommands();
}
@ -298,4 +303,16 @@ export abstract class WebsocketService<T extends WsSubscriber> implements WsServ
message, type: notificationType
}));
}
private processPendingUpdates() {
if (this.pendingUpdates.size > 0) {
const subscribers = Array.from(this.pendingUpdates);
this.pendingUpdates.clear();
for (const subscriber of subscribers) {
if (this.subscriberCmdIds.has(subscriber)) {
this.update(subscriber);
}
}
}
}
}

Loading…
Cancel
Save