Browse Source

Make ActivatedObservable a custom observable.

pull/1690/head
Steven Kirk 9 years ago
committed by Steven Kirk
parent
commit
6d3381dd6e
  1. 123
      src/Avalonia.Styling/Styling/ActivatedObservable.cs

123
src/Avalonia.Styling/Styling/ActivatedObservable.cs

@ -2,8 +2,8 @@
// Licensed under the MIT license. See licence.md file in the project root for full license information.
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Collections.Generic;
using System.Reactive.Disposables;
namespace Avalonia.Styling
{
@ -17,8 +17,17 @@ namespace Avalonia.Styling
/// value. When the activator produces false it will produce
/// <see cref="AvaloniaProperty.UnsetValue"/>.
/// </remarks>
internal class ActivatedObservable : ObservableBase<object>, IDescription
internal class ActivatedObservable : IObservable<object>, IDescription
{
private static readonly object NotSent = new object();
private readonly Listener _listener;
private List<IObserver<object>> _observers;
private IDisposable _activatorSubscription;
private IDisposable _sourceSubscription;
private bool? _active;
private object _value;
private object _last = NotSent;
/// <summary>
/// Initializes a new instance of the <see cref="ActivatedObservable"/> class.
/// </summary>
@ -36,6 +45,7 @@ namespace Avalonia.Styling
Activator = activator;
Description = description;
Source = source;
_listener = new Listener(this);
}
/// <summary>
@ -53,25 +63,96 @@ namespace Avalonia.Styling
/// </summary>
public IObservable<object> Source { get; }
/// <summary>
/// Notifies the provider that an observer is to receive notifications.
/// </summary>
/// <param name="observer">The observer.</param>
/// <returns>IDisposable object used to unsubscribe from the observable sequence.</returns>
protected override IDisposable SubscribeCore(IObserver<object> observer)
public IDisposable Subscribe(IObserver<object> observer)
{
var subscribe = _observers == null;
_observers = _observers ?? new List<IObserver<object>>();
_observers.Add(observer);
if (subscribe)
{
_sourceSubscription = Source.Subscribe(_listener);
_activatorSubscription = Activator.Subscribe(_listener);
}
return Disposable.Create(() =>
{
_observers.Remove(observer);
if (_observers.Count == 0)
{
_activatorSubscription.Dispose();
_sourceSubscription.Dispose();
_activatorSubscription = null;
_sourceSubscription = null;
}
});
}
private void NotifyCompleted()
{
foreach (var observer in _observers)
{
observer.OnCompleted();
}
_observers = null;
}
private void NotifyError(Exception error)
{
Contract.Requires<ArgumentNullException>(observer != null);
var sourceCompleted = Source.LastOrDefaultAsync().Select(_ => Unit.Default);
var activatorCompleted = Activator.LastOrDefaultAsync().Select(_ => Unit.Default);
var completed = sourceCompleted.Merge(activatorCompleted);
return Activator
.CombineLatest(Source, (x, y) => new { Active = x, Value = y })
.Select(x => x.Active ? x.Value : AvaloniaProperty.UnsetValue)
.DistinctUntilChanged()
.TakeUntil(completed)
.Subscribe(observer);
foreach (var observer in _observers)
{
observer.OnError(error);
}
_observers = null;
}
private void Update()
{
if (_active.HasValue)
{
var v = _active.Value ? _value : AvaloniaProperty.UnsetValue;
if (!Equals(v, _last))
{
foreach (var observer in _observers)
{
observer.OnNext(v);
}
_last = v;
}
}
}
private class Listener : IObserver<bool>, IObserver<object>
{
private readonly ActivatedObservable _parent;
public Listener(ActivatedObservable parent)
{
_parent = parent;
}
void IObserver<bool>.OnCompleted() => _parent.NotifyCompleted();
void IObserver<object>.OnCompleted() => _parent.NotifyCompleted();
void IObserver<bool>.OnError(Exception error) => _parent.NotifyError(error);
void IObserver<object>.OnError(Exception error) => _parent.NotifyError(error);
void IObserver<bool>.OnNext(bool value)
{
_parent._active = value;
_parent.Update();
}
void IObserver<object>.OnNext(object value)
{
_parent._value = value;
_parent.Update();
}
}
}
}

Loading…
Cancel
Save