/*
 * Squidex Headless CMS
 *
 * @license
 * Copyright (c) Squidex UG (haftungsbeschränkt). All rights reserved.
 */

import { EMPTY, Observable, ReplaySubject, throwError } from 'rxjs';
import { catchError, distinctUntilChanged, filter, map, onErrorResumeNext, share, switchMap } from 'rxjs/operators';
import { DialogService } from './../services/dialog.service';
import { Version, versioned, Versioned } from './version';

// NOTE(review): the generic parameter lists in this file were reconstructed from how T and R
// are used in the signatures and bodies. Confirm the defaults against existing call sites.

/**
 * Creates an operator that transforms the payload of every versioned item and keeps its version.
 *
 * @param project Receives the payload and the version of each emitted item and returns the new payload.
 * @returns An operator that re-wraps the projected payload with the original version via `versioned`.
 */
// The `any` defaults keep call sites compiling when the type arguments cannot be inferred.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export function mapVersioned<T = any, R = any>(project: (value: T, version: Version) => R) {
    return function mapOperation(source: Observable<Versioned<T>>) {
        return source.pipe(map<Versioned<T>, Versioned<R>>(({ version, payload }) => {
            return versioned(version, project(payload, version));
        }));
    };
}

/**
 * Error handling options for `shareSubscribed` and `shareMapSubscribed`.
 *
 * - `silent`: when truthy, errors are not passed to `DialogService.notifyError`.
 * - `throw`: when truthy, the error is re-emitted on the internal subscription instead of being swallowed.
 */
type Options = { silent?: boolean; throw?: boolean };

/**
 * Same as `shareMapSubscribed`, but emits the source values unchanged.
 *
 * @param dialogs Used to report errors of the source.
 * @param options Optional error handling options.
 */
export function shareSubscribed<T>(dialogs: DialogService, options?: Options) {
    return shareMapSubscribed<T, T>(dialogs, x => x, options);
}

/**
 * Creates an operator that shares the source, subscribes to it immediately and maps the shared values.
 *
 * The source is multicast through a `ReplaySubject(1)` that is never reset, so it is subscribed to at
 * most once and late subscribers receive the most recent value. Because the operator subscribes
 * internally, the source runs even if the caller never subscribes to the returned observable.
 *
 * @param dialogs Used to report errors of the source, unless `options.silent` is set.
 * @param project Maps each shared value before it reaches the caller.
 * @param options Optional error handling options.
 * @returns An operator whose result emits the projected values. Errors of the source are still
 * delivered to subscribers of the result; only the internal subscription handles them.
 */
export function shareMapSubscribed<T, R = T>(dialogs: DialogService, project: (value: T) => R, options?: Options) {
    return function mapOperation(source: Observable<T>) {
        const shared = source.pipe(share({
            connector: () => new ReplaySubject<T>(1),
            resetOnError: false,
            resetOnComplete: false,
            resetOnRefCountZero: false,
        }));

        // Internal subscription: triggers the source and reports its errors exactly once,
        // independent of how many consumers subscribe to the returned observable.
        shared.pipe(
            catchError(error => {
                if (!options?.silent) {
                    dialogs.notifyError(error);
                }

                if (options?.throw) {
                    // Re-emitted into a subscription without an error handler.
                    return throwError(() => error);
                }

                return EMPTY;
            }))
            .subscribe();

        return shared.pipe(map(project));
    };
}

/**
 * Creates an operator that drops empty values and suppresses consecutive duplicates.
 *
 * The filter is a truthiness check, so besides `null` and `undefined` it also removes other falsy
 * values such as `0`, `''` and `false`.
 */
export function defined<T>() {
    return function mapOperation(source: Observable<T | undefined | null>): Observable<T> {
        // The map only narrows the type; the filter has already removed null and undefined.
        return source.pipe(filter(x => !!x), map(x => x!), distinctUntilChanged());
    };
}

/**
 * Creates an operator that behaves like `switchMap`, but does not forward errors.
 *
 * `onErrorResumeNext()` is used without fallback observables, so an error is expected to end the
 * result with a completion instead of an error notification.
 *
 * @param project Maps each source value to the inner observable to switch to.
 */
export function switchSafe<T, R>(project: (source: T) => Observable<R>) {
    return function mapOperation(source: Observable<T>) {
        return source.pipe(switchMap(project), onErrorResumeNext());
    };
}

/**
 * Creates an observable that synchronously emits the given values on subscription and never completes.
 *
 * @param values The values to emit, in order.
 */
export function ofForever<T>(...values: ReadonlyArray<T>) {
    return new Observable<T>(s => {
        for (const value of values) {
            s.next(value);
        }
    });
}