committed by
GitHub
26 changed files with 1004 additions and 550 deletions
@ -1,42 +0,0 @@ |
|||
// Copyright (c) The Avalonia Project. All rights reserved.
|
|||
// Licensed under the MIT license. See licence.md file in the project root for full license information.
|
|||
|
|||
using System; |
|||
using System.Reactive; |
|||
using System.Reactive.Disposables; |
|||
|
|||
namespace Avalonia.Reactive |
|||
{ |
|||
/// <summary>
|
|||
/// An <see cref="IObservable{T}"/> with an additional description.
|
|||
/// </summary>
|
|||
/// <typeparam name="T">The type of the elements in the sequence.</typeparam>
|
|||
public class AvaloniaObservable<T> : ObservableBase<T>, IDescription |
|||
{ |
|||
private readonly Func<IObserver<T>, IDisposable> _subscribe; |
|||
|
|||
/// <summary>
|
|||
/// Initializes a new instance of the <see cref="AvaloniaObservable{T}"/> class.
|
|||
/// </summary>
|
|||
/// <param name="subscribe">The subscribe function.</param>
|
|||
/// <param name="description">The description of the observable.</param>
|
|||
public AvaloniaObservable(Func<IObserver<T>, IDisposable> subscribe, string description) |
|||
{ |
|||
Contract.Requires<ArgumentNullException>(subscribe != null); |
|||
|
|||
_subscribe = subscribe; |
|||
Description = description; |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// Gets the description of the observable.
|
|||
/// </summary>
|
|||
public string Description { get; } |
|||
|
|||
/// <inheritdoc/>
|
|||
protected override IDisposable SubscribeCore(IObserver<T> observer) |
|||
{ |
|||
return _subscribe(observer) ?? Disposable.Empty; |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,46 @@ |
|||
using System; |
|||
|
|||
namespace Avalonia.Reactive |
|||
{ |
|||
internal class AvaloniaPropertyChangedObservable : |
|||
LightweightObservableBase<AvaloniaPropertyChangedEventArgs>, |
|||
IDescription |
|||
{ |
|||
private readonly WeakReference<IAvaloniaObject> _target; |
|||
private readonly AvaloniaProperty _property; |
|||
|
|||
public AvaloniaPropertyChangedObservable( |
|||
IAvaloniaObject target, |
|||
AvaloniaProperty property) |
|||
{ |
|||
_target = new WeakReference<IAvaloniaObject>(target); |
|||
_property = property; |
|||
} |
|||
|
|||
public string Description => $"{_target.GetType().Name}.{_property.Name}"; |
|||
|
|||
protected override void Initialize() |
|||
{ |
|||
if (_target.TryGetTarget(out var target)) |
|||
{ |
|||
target.PropertyChanged += PropertyChanged; |
|||
} |
|||
} |
|||
|
|||
protected override void Deinitialize() |
|||
{ |
|||
if (_target.TryGetTarget(out var target)) |
|||
{ |
|||
target.PropertyChanged -= PropertyChanged; |
|||
} |
|||
} |
|||
|
|||
private void PropertyChanged(object sender, AvaloniaPropertyChangedEventArgs e) |
|||
{ |
|||
if (e.Property == _property) |
|||
{ |
|||
PublishNext(e); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,52 @@ |
|||
using System; |
|||
|
|||
namespace Avalonia.Reactive |
|||
{ |
|||
internal class AvaloniaPropertyObservable<T> : LightweightObservableBase<T>, IDescription |
|||
{ |
|||
private readonly WeakReference<IAvaloniaObject> _target; |
|||
private readonly AvaloniaProperty _property; |
|||
private T _value; |
|||
|
|||
public AvaloniaPropertyObservable( |
|||
IAvaloniaObject target, |
|||
AvaloniaProperty property) |
|||
{ |
|||
_target = new WeakReference<IAvaloniaObject>(target); |
|||
_property = property; |
|||
} |
|||
|
|||
public string Description => $"{_target.GetType().Name}.{_property.Name}"; |
|||
|
|||
protected override void Initialize() |
|||
{ |
|||
if (_target.TryGetTarget(out var target)) |
|||
{ |
|||
_value = (T)target.GetValue(_property); |
|||
target.PropertyChanged += PropertyChanged; |
|||
} |
|||
} |
|||
|
|||
protected override void Deinitialize() |
|||
{ |
|||
if (_target.TryGetTarget(out var target)) |
|||
{ |
|||
target.PropertyChanged -= PropertyChanged; |
|||
} |
|||
} |
|||
|
|||
protected override void Subscribed(IObserver<T> observer, bool first) |
|||
{ |
|||
observer.OnNext(_value); |
|||
} |
|||
|
|||
private void PropertyChanged(object sender, AvaloniaPropertyChangedEventArgs e) |
|||
{ |
|||
if (e.Property == _property) |
|||
{ |
|||
_value = (T)e.NewValue; |
|||
PublishNext(_value); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,202 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Reactive; |
|||
using System.Reactive.Disposables; |
|||
using System.Threading; |
|||
using Avalonia.Threading; |
|||
|
|||
namespace Avalonia.Reactive |
|||
{ |
|||
/// <summary>
|
|||
/// Lightweight base class for observable implementations.
|
|||
/// </summary>
|
|||
/// <typeparam name="T">The observable type.</typeparam>
|
|||
/// <remarks>
|
|||
/// <see cref="ObservableBase{T}"/> is rather heavyweight in terms of allocations and memory
|
|||
/// usage. This class provides a more lightweight base for some internal observable types
|
|||
/// in the Avalonia framework.
|
|||
/// </remarks>
|
|||
public abstract class LightweightObservableBase<T> : IObservable<T> |
|||
{ |
|||
private Exception _error; |
|||
private List<IObserver<T>> _observers = new List<IObserver<T>>(); |
|||
|
|||
public IDisposable Subscribe(IObserver<T> observer) |
|||
{ |
|||
Contract.Requires<ArgumentNullException>(observer != null); |
|||
Dispatcher.UIThread.VerifyAccess(); |
|||
|
|||
var first = false; |
|||
|
|||
for (; ; ) |
|||
{ |
|||
if (Volatile.Read(ref _observers) == null) |
|||
{ |
|||
if (_error != null) |
|||
{ |
|||
observer.OnError(_error); |
|||
} |
|||
else |
|||
{ |
|||
observer.OnCompleted(); |
|||
} |
|||
|
|||
return Disposable.Empty; |
|||
} |
|||
|
|||
lock (this) |
|||
{ |
|||
if (_observers == null) |
|||
{ |
|||
continue; |
|||
} |
|||
|
|||
first = _observers.Count == 0; |
|||
_observers.Add(observer); |
|||
break; |
|||
} |
|||
} |
|||
|
|||
if (first) |
|||
{ |
|||
Initialize(); |
|||
} |
|||
|
|||
Subscribed(observer, first); |
|||
|
|||
return new RemoveObserver(this, observer); |
|||
} |
|||
|
|||
void Remove(IObserver<T> observer) |
|||
{ |
|||
if (Volatile.Read(ref _observers) != null) |
|||
{ |
|||
lock (this) |
|||
{ |
|||
var observers = _observers; |
|||
|
|||
if (observers != null) |
|||
{ |
|||
observers.Remove(observer); |
|||
|
|||
if (observers.Count == 0) |
|||
{ |
|||
observers.TrimExcess(); |
|||
} |
|||
else |
|||
{ |
|||
return; |
|||
} |
|||
} else |
|||
{ |
|||
return; |
|||
} |
|||
} |
|||
|
|||
Deinitialize(); |
|||
} |
|||
} |
|||
|
|||
sealed class RemoveObserver : IDisposable |
|||
{ |
|||
LightweightObservableBase<T> _parent; |
|||
|
|||
IObserver<T> _observer; |
|||
|
|||
public RemoveObserver(LightweightObservableBase<T> parent, IObserver<T> observer) |
|||
{ |
|||
_parent = parent; |
|||
Volatile.Write(ref _observer, observer); |
|||
} |
|||
|
|||
public void Dispose() |
|||
{ |
|||
var observer = _observer; |
|||
Interlocked.Exchange(ref _parent, null)?.Remove(observer); |
|||
_observer = null; |
|||
} |
|||
} |
|||
|
|||
protected abstract void Initialize(); |
|||
protected abstract void Deinitialize(); |
|||
|
|||
protected void PublishNext(T value) |
|||
{ |
|||
if (Volatile.Read(ref _observers) != null) |
|||
{ |
|||
IObserver<T>[] observers; |
|||
|
|||
lock (this) |
|||
{ |
|||
if (_observers == null) |
|||
{ |
|||
return; |
|||
} |
|||
observers = _observers.ToArray(); |
|||
} |
|||
|
|||
foreach (var observer in observers) |
|||
{ |
|||
observer.OnNext(value); |
|||
} |
|||
} |
|||
} |
|||
|
|||
protected void PublishCompleted() |
|||
{ |
|||
if (Volatile.Read(ref _observers) != null) |
|||
{ |
|||
IObserver<T>[] observers; |
|||
|
|||
lock (this) |
|||
{ |
|||
if (_observers == null) |
|||
{ |
|||
return; |
|||
} |
|||
observers = _observers.ToArray(); |
|||
Volatile.Write(ref _observers, null); |
|||
} |
|||
|
|||
foreach (var observer in observers) |
|||
{ |
|||
observer.OnCompleted(); |
|||
} |
|||
|
|||
Deinitialize(); |
|||
} |
|||
} |
|||
|
|||
protected void PublishError(Exception error) |
|||
{ |
|||
if (Volatile.Read(ref _observers) != null) |
|||
{ |
|||
|
|||
IObserver<T>[] observers; |
|||
|
|||
lock (this) |
|||
{ |
|||
if (_observers == null) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
_error = error; |
|||
observers = _observers.ToArray(); |
|||
Volatile.Write(ref _observers, null); |
|||
} |
|||
|
|||
foreach (var observer in observers) |
|||
{ |
|||
observer.OnError(error); |
|||
} |
|||
|
|||
Deinitialize(); |
|||
} |
|||
} |
|||
|
|||
protected virtual void Subscribed(IObserver<T> observer, bool first) |
|||
{ |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,76 @@ |
|||
using System; |
|||
using Avalonia.Threading; |
|||
|
|||
namespace Avalonia.Reactive |
|||
{ |
|||
public abstract class SingleSubscriberObservableBase<T> : IObservable<T>, IDisposable |
|||
{ |
|||
private Exception _error; |
|||
private IObserver<T> _observer; |
|||
private bool _completed; |
|||
|
|||
public IDisposable Subscribe(IObserver<T> observer) |
|||
{ |
|||
Contract.Requires<ArgumentNullException>(observer != null); |
|||
Dispatcher.UIThread.VerifyAccess(); |
|||
|
|||
if (_observer != null) |
|||
{ |
|||
throw new InvalidOperationException("The observable can only be subscribed once."); |
|||
} |
|||
|
|||
if (_error != null) |
|||
{ |
|||
observer.OnError(_error); |
|||
} |
|||
else if (_completed) |
|||
{ |
|||
observer.OnCompleted(); |
|||
} |
|||
else |
|||
{ |
|||
_observer = observer; |
|||
Subscribed(); |
|||
} |
|||
|
|||
return this; |
|||
} |
|||
|
|||
void IDisposable.Dispose() |
|||
{ |
|||
Unsubscribed(); |
|||
_observer = null; |
|||
} |
|||
|
|||
protected abstract void Unsubscribed(); |
|||
|
|||
protected void PublishNext(T value) |
|||
{ |
|||
_observer?.OnNext(value); |
|||
} |
|||
|
|||
protected void PublishCompleted() |
|||
{ |
|||
if (_observer != null) |
|||
{ |
|||
_observer.OnCompleted(); |
|||
_completed = true; |
|||
Unsubscribed(); |
|||
_observer = null; |
|||
} |
|||
} |
|||
|
|||
protected void PublishError(Exception error) |
|||
{ |
|||
if (_observer != null) |
|||
{ |
|||
_observer.OnError(error); |
|||
_error = error; |
|||
Unsubscribed(); |
|||
_observer = null; |
|||
} |
|||
} |
|||
|
|||
protected abstract void Subscribed(); |
|||
} |
|||
} |
|||
@ -1,85 +0,0 @@ |
|||
// Copyright (c) The Avalonia Project. All rights reserved.
|
|||
// Licensed under the MIT license. See licence.md file in the project root for full license information.
|
|||
|
|||
using System; |
|||
using System.Reactive; |
|||
using System.Reactive.Disposables; |
|||
using System.Reactive.Linq; |
|||
using System.Reactive.Subjects; |
|||
using Avalonia.Utilities; |
|||
|
|||
namespace Avalonia.Reactive |
|||
{ |
|||
internal class WeakPropertyChangedObservable : ObservableBase<object>, |
|||
IWeakSubscriber<AvaloniaPropertyChangedEventArgs>, IDescription |
|||
{ |
|||
private WeakReference<IAvaloniaObject> _sourceReference; |
|||
private readonly AvaloniaProperty _property; |
|||
private readonly Subject<object> _changed = new Subject<object>(); |
|||
|
|||
private int _count; |
|||
|
|||
public WeakPropertyChangedObservable( |
|||
WeakReference<IAvaloniaObject> source, |
|||
AvaloniaProperty property, |
|||
string description) |
|||
{ |
|||
_sourceReference = source; |
|||
_property = property; |
|||
Description = description; |
|||
} |
|||
|
|||
public string Description { get; } |
|||
|
|||
public void OnEvent(object sender, AvaloniaPropertyChangedEventArgs e) |
|||
{ |
|||
if (e.Property == _property) |
|||
{ |
|||
_changed.OnNext(e.NewValue); |
|||
} |
|||
} |
|||
|
|||
protected override IDisposable SubscribeCore(IObserver<object> observer) |
|||
{ |
|||
IAvaloniaObject instance; |
|||
|
|||
if (_sourceReference.TryGetTarget(out instance)) |
|||
{ |
|||
if (_count++ == 0) |
|||
{ |
|||
WeakSubscriptionManager.Subscribe( |
|||
instance, |
|||
nameof(instance.PropertyChanged), |
|||
this); |
|||
} |
|||
|
|||
observer.OnNext(instance.GetValue(_property)); |
|||
|
|||
return Observable.Using(() => Disposable.Create(DecrementCount), _ => _changed) |
|||
.Subscribe(observer); |
|||
} |
|||
else |
|||
{ |
|||
_changed.OnCompleted(); |
|||
observer.OnCompleted(); |
|||
return Disposable.Empty; |
|||
} |
|||
} |
|||
|
|||
private void DecrementCount() |
|||
{ |
|||
if (--_count == 0) |
|||
{ |
|||
IAvaloniaObject instance; |
|||
|
|||
if (_sourceReference.TryGetTarget(out instance)) |
|||
{ |
|||
WeakSubscriptionManager.Unsubscribe( |
|||
instance, |
|||
nameof(instance.PropertyChanged), |
|||
this); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
} |
|||
Loading…
Reference in new issue