diff --git a/src/Avalonia.Styling/Styling/ActivatedObservable.cs b/src/Avalonia.Styling/Styling/ActivatedObservable.cs index 2cd324fff4..1d9af6ed41 100644 --- a/src/Avalonia.Styling/Styling/ActivatedObservable.cs +++ b/src/Avalonia.Styling/Styling/ActivatedObservable.cs @@ -2,8 +2,8 @@ // Licensed under the MIT license. See licence.md file in the project root for full license information. using System; -using System.Reactive; -using System.Reactive.Linq; +using System.Collections.Generic; +using System.Reactive.Disposables; namespace Avalonia.Styling { @@ -17,8 +17,17 @@ namespace Avalonia.Styling /// value. When the activator produces false it will produce /// . /// - internal class ActivatedObservable : ObservableBase, IDescription + internal class ActivatedObservable : IObservable, IDescription { + private static readonly object NotSent = new object(); + private readonly Listener _listener; + private List> _observers; + private IDisposable _activatorSubscription; + private IDisposable _sourceSubscription; + private bool? _active; + private object _value; + private object _last = NotSent; + /// /// Initializes a new instance of the class. /// @@ -36,6 +45,7 @@ namespace Avalonia.Styling Activator = activator; Description = description; Source = source; + _listener = new Listener(this); } /// @@ -53,25 +63,96 @@ namespace Avalonia.Styling /// public IObservable Source { get; } - /// - /// Notifies the provider that an observer is to receive notifications. - /// - /// The observer. - /// IDisposable object used to unsubscribe from the observable sequence. - protected override IDisposable SubscribeCore(IObserver observer) + public IDisposable Subscribe(IObserver observer) + { + var subscribe = _observers == null; + + _observers = _observers ?? new List>(); + _observers.Add(observer); + + if (subscribe) + { + _sourceSubscription = Source.Subscribe(_listener); + _activatorSubscription = Activator.Subscribe(_listener); + } + + return Disposable.Create(() => + { + _observers.Remove(observer); + + if (_observers.Count == 0) + { + _activatorSubscription.Dispose(); + _sourceSubscription.Dispose(); + _activatorSubscription = null; + _sourceSubscription = null; + } + }); + } + + private void NotifyCompleted() + { + foreach (var observer in _observers) + { + observer.OnCompleted(); + } + + _observers = null; + } + + private void NotifyError(Exception error) { - Contract.Requires(observer != null); - - var sourceCompleted = Source.LastOrDefaultAsync().Select(_ => Unit.Default); - var activatorCompleted = Activator.LastOrDefaultAsync().Select(_ => Unit.Default); - var completed = sourceCompleted.Merge(activatorCompleted); - - return Activator - .CombineLatest(Source, (x, y) => new { Active = x, Value = y }) - .Select(x => x.Active ? x.Value : AvaloniaProperty.UnsetValue) - .DistinctUntilChanged() - .TakeUntil(completed) - .Subscribe(observer); + foreach (var observer in _observers) + { + observer.OnError(error); + } + + _observers = null; + } + + private void Update() + { + if (_active.HasValue) + { + var v = _active.Value ? _value : AvaloniaProperty.UnsetValue; + + if (!Equals(v, _last)) + { + foreach (var observer in _observers) + { + observer.OnNext(v); + } + + _last = v; + } + } + } + + private class Listener : IObserver, IObserver + { + private readonly ActivatedObservable _parent; + + public Listener(ActivatedObservable parent) + { + _parent = parent; + } + + void IObserver.OnCompleted() => _parent.NotifyCompleted(); + void IObserver.OnCompleted() => _parent.NotifyCompleted(); + void IObserver.OnError(Exception error) => _parent.NotifyError(error); + void IObserver.OnError(Exception error) => _parent.NotifyError(error); + + void IObserver.OnNext(bool value) + { + _parent._active = value; + _parent.Update(); + } + + void IObserver.OnNext(object value) + { + _parent._value = value; + _parent.Update(); + } } } }