/*
 * Squidex Headless CMS
 *
 * @license
 * Copyright (c) Squidex UG (haftungsbeschränkt). All rights reserved.
 */

// tslint:disable: only-arrow-functions

import { empty, Observable } from 'rxjs';
import { catchError, distinctUntilChanged, filter, map, onErrorResumeNext, publishReplay, refCount, switchMap } from 'rxjs/operators';
import { DialogService } from './../services/dialog.service';
import { Version, versioned, Versioned } from './version';

/**
 * Creates an operator that transforms the payload of every versioned value
 * while keeping the version it arrived with.
 *
 * The type parameters default to `any` so that call sites which omit one or
 * both type arguments keep compiling.
 *
 * @param project Receives the payload and its version and returns the new payload.
 * @returns An operator function to be passed to `Observable.pipe`.
 */
export function mapVersioned<T = any, R = any>(project: (value: T, version: Version) => R) {
    return function mapOperation(source: Observable<Versioned<T>>) {
        return source.pipe(map<Versioned<T>, Versioned<R>>(({ version, payload }) => {
            // Re-wrap the projected payload together with the unchanged version.
            return versioned(version, project(payload, version));
        }));
    };
}

/**
 * Options shared by the `share*Subscribed` operators.
 *
 * `silent`: when truthy, errors are not reported through the dialog service.
 */
type Options = { silent?: boolean };

/**
 * Same as {@link shareMapSubscribed} with an identity projection: the values
 * are passed through unchanged.
 *
 * @param dialogs Dialog service used to report errors.
 * @param options Pass `{ silent: true }` to suppress the error notification.
 * @returns An operator function to be passed to `Observable.pipe`.
 */
export function shareSubscribed<T>(dialogs: DialogService, options?: Options) {
    return shareMapSubscribed<T, T>(dialogs, x => x, options);
}

/**
 * Creates an operator that shares the source (replaying its emissions to late
 * subscribers), subscribes to it immediately and returns the shared stream
 * mapped through `project`.
 *
 * The internal subscription is created as soon as the operator is applied, so
 * the source runs even when the caller never subscribes to the result. Errors
 * seen by that internal subscription are reported with `dialogs.notifyError`
 * unless `options.silent` is set. The returned observable has no error
 * handler of its own, so its subscribers still receive the error.
 *
 * @param dialogs Dialog service used to report errors.
 * @param project Maps every value of the shared source.
 * @param options Pass `{ silent: true }` to suppress the error notification.
 * @returns An operator function to be passed to `Observable.pipe`.
 */
export function shareMapSubscribed<T, R = T>(dialogs: DialogService, project: (value: T) => R, options?: Options) {
    return function mapOperation(source: Observable<T>) {
        const shared = source.pipe(publishReplay(), refCount());

        // Eager, fire-and-forget subscription: triggers the source and handles
        // the error notification exactly once, independent of the caller.
        shared.pipe(
            catchError(error => {
                if (!options || !options.silent) {
                    dialogs.notifyError(error);
                }

                return empty();
            }))
            .subscribe();

        return shared.pipe(map(x => project(x)));
    };
}

/**
 * Creates an operator that drops falsy values and suppresses consecutive
 * duplicates.
 *
 * Note that the filter uses truthiness (`!!x`), so besides `null` and
 * `undefined` it also removes `0`, `''`, `false` and `NaN`.
 *
 * @returns An operator function to be passed to `Observable.pipe`.
 */
export function defined<T>() {
    return function mapOperation(source: Observable<T | undefined | null>): Observable<T> {
        return source.pipe(filter(x => !!x), map(x => x!), distinctUntilChanged());
    };
}

/**
 * Creates an operator that behaves like `switchMap(project)` followed by
 * `onErrorResumeNext()`, so an error from the source or from an inner
 * observable is not propagated to the subscriber.
 *
 * @param project Maps each source value to the inner observable to switch to.
 * @returns An operator function to be passed to `Observable.pipe`.
 */
export function switchSafe<T, R>(project: (source: T) => Observable<R>) {
    return function mapOperation(source: Observable<T>) {
        return source.pipe(switchMap(project), onErrorResumeNext());
    };
}

/**
 * Creates an observable that synchronously emits the given values to each
 * subscriber and then stays open: `complete` is never called.
 *
 * @param values The values to emit, in order.
 * @returns An observable that emits `values` and never completes.
 */
export function ofForever<T>(...values: ReadonlyArray<T>) {
    return new Observable<T>(s => {
        for (const value of values) {
            s.next(value);
        }
    });
}