195 changed files with 2144 additions and 735 deletions
@ -1,6 +1,7 @@ |
|||
<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> |
|||
<ItemGroup> |
|||
<PackageReference Include="System.ValueTuple" Version="4.5.0" /> |
|||
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4" /> |
|||
<PackageReference Include="System.Runtime.CompilerServices.Unsafe" Version="4.6.0" /> |
|||
</ItemGroup> |
|||
</Project> |
|||
|
|||
@ -0,0 +1,62 @@ |
|||
using System; |
|||
using System.Threading.Tasks; |
|||
|
|||
namespace Avalonia.Reactive; |
|||
|
|||
internal class AnonymousObserver<T> : IObserver<T> |
|||
{ |
|||
private static readonly Action<Exception> ThrowsOnError = ex => throw ex; |
|||
private static readonly Action NoOpCompleted = () => { }; |
|||
private readonly Action<T> _onNext; |
|||
private readonly Action<Exception> _onError; |
|||
private readonly Action _onCompleted; |
|||
|
|||
public AnonymousObserver(TaskCompletionSource<T> tcs) |
|||
{ |
|||
if (tcs is null) |
|||
{ |
|||
throw new ArgumentNullException(nameof(tcs)); |
|||
} |
|||
|
|||
_onNext = tcs.SetResult; |
|||
_onError = tcs.SetException; |
|||
_onCompleted = NoOpCompleted; |
|||
} |
|||
|
|||
public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted) |
|||
{ |
|||
_onNext = onNext ?? throw new ArgumentNullException(nameof(onNext)); |
|||
_onError = onError ?? throw new ArgumentNullException(nameof(onError)); |
|||
_onCompleted = onCompleted ?? throw new ArgumentNullException(nameof(onCompleted)); |
|||
} |
|||
|
|||
public AnonymousObserver(Action<T> onNext) |
|||
: this(onNext, ThrowsOnError, NoOpCompleted) |
|||
{ |
|||
} |
|||
|
|||
public AnonymousObserver(Action<T> onNext, Action<Exception> onError) |
|||
: this(onNext, onError, NoOpCompleted) |
|||
{ |
|||
} |
|||
|
|||
public AnonymousObserver(Action<T> onNext, Action onCompleted) |
|||
: this(onNext, ThrowsOnError, onCompleted) |
|||
{ |
|||
} |
|||
|
|||
public void OnCompleted() |
|||
{ |
|||
_onCompleted.Invoke(); |
|||
} |
|||
|
|||
public void OnError(Exception error) |
|||
{ |
|||
_onError.Invoke(error); |
|||
} |
|||
|
|||
public void OnNext(T value) |
|||
{ |
|||
_onNext.Invoke(value); |
|||
} |
|||
} |
|||
@ -0,0 +1,23 @@ |
|||
using System; |
|||
|
|||
namespace Avalonia.Reactive; |
|||
|
|||
internal class CombinedSubject<T> : IAvaloniaSubject<T> |
|||
{ |
|||
private readonly IObserver<T> _observer; |
|||
private readonly IObservable<T> _observable; |
|||
|
|||
public CombinedSubject(IObserver<T> observer, IObservable<T> observable) |
|||
{ |
|||
_observer = observer; |
|||
_observable = observable; |
|||
} |
|||
|
|||
public void OnCompleted() => _observer.OnCompleted(); |
|||
|
|||
public void OnError(Exception error) => _observer.OnError(error); |
|||
|
|||
public void OnNext(T value) => _observer.OnNext(value); |
|||
|
|||
public IDisposable Subscribe(IObserver<T> observer) => _observable.Subscribe(observer); |
|||
} |
|||
@ -0,0 +1,427 @@ |
|||
using System; |
|||
using System.Collections; |
|||
using System.Collections.Generic; |
|||
using System.Threading; |
|||
|
|||
namespace Avalonia.Reactive; |
|||
|
|||
internal sealed class CompositeDisposable : ICollection<IDisposable>, IDisposable |
|||
{ |
|||
private readonly object _gate = new object(); |
|||
private bool _disposed; |
|||
private List<IDisposable?> _disposables; |
|||
private int _count; |
|||
private const int ShrinkThreshold = 64; |
|||
|
|||
/// <summary>
|
|||
/// Initializes a new instance of the <see cref="CompositeDisposable"/> class with the specified number of disposables.
|
|||
/// </summary>
|
|||
/// <param name="capacity">The number of disposables that the new CompositeDisposable can initially store.</param>
|
|||
/// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than zero.</exception>
|
|||
public CompositeDisposable(int capacity) |
|||
{ |
|||
if (capacity < 0) |
|||
{ |
|||
throw new ArgumentOutOfRangeException(nameof(capacity)); |
|||
} |
|||
|
|||
_disposables = new List<IDisposable?>(capacity); |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// Initializes a new instance of the <see cref="CompositeDisposable"/> class from a group of disposables.
|
|||
/// </summary>
|
|||
/// <param name="disposables">Disposables that will be disposed together.</param>
|
|||
/// <exception cref="ArgumentNullException"><paramref name="disposables"/> is <c>null</c>.</exception>
|
|||
/// <exception cref="ArgumentException">Any of the disposables in the <paramref name="disposables"/> collection is <c>null</c>.</exception>
|
|||
public CompositeDisposable(params IDisposable[] disposables) |
|||
{ |
|||
if (disposables == null) |
|||
{ |
|||
throw new ArgumentNullException(nameof(disposables)); |
|||
} |
|||
|
|||
_disposables = ToList(disposables); |
|||
|
|||
// _count can be read by other threads and thus should be properly visible
|
|||
// also releases the _disposables contents so it becomes thread-safe
|
|||
Volatile.Write(ref _count, _disposables.Count); |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// Initializes a new instance of the <see cref="CompositeDisposable"/> class from a group of disposables.
|
|||
/// </summary>
|
|||
/// <param name="disposables">Disposables that will be disposed together.</param>
|
|||
/// <exception cref="ArgumentNullException"><paramref name="disposables"/> is <c>null</c>.</exception>
|
|||
/// <exception cref="ArgumentException">Any of the disposables in the <paramref name="disposables"/> collection is <c>null</c>.</exception>
|
|||
public CompositeDisposable(IList<IDisposable> disposables) |
|||
{ |
|||
if (disposables == null) |
|||
{ |
|||
throw new ArgumentNullException(nameof(disposables)); |
|||
} |
|||
|
|||
_disposables = ToList(disposables); |
|||
|
|||
// _count can be read by other threads and thus should be properly visible
|
|||
// also releases the _disposables contents so it becomes thread-safe
|
|||
Volatile.Write(ref _count, _disposables.Count); |
|||
} |
|||
|
|||
private static List<IDisposable?> ToList(IEnumerable<IDisposable> disposables) |
|||
{ |
|||
var capacity = disposables switch |
|||
{ |
|||
IDisposable[] a => a.Length, |
|||
ICollection<IDisposable> c => c.Count, |
|||
_ => 12 |
|||
}; |
|||
|
|||
var list = new List<IDisposable?>(capacity); |
|||
|
|||
// do the copy and null-check in one step to avoid a
|
|||
// second loop for just checking for null items
|
|||
foreach (var d in disposables) |
|||
{ |
|||
if (d == null) |
|||
{ |
|||
throw new ArgumentException("Disposables can't contain null", nameof(disposables)); |
|||
} |
|||
|
|||
list.Add(d); |
|||
} |
|||
|
|||
return list; |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// Gets the number of disposables contained in the <see cref="CompositeDisposable"/>.
|
|||
/// </summary>
|
|||
public int Count => Volatile.Read(ref _count); |
|||
|
|||
/// <summary>
|
|||
/// Adds a disposable to the <see cref="CompositeDisposable"/> or disposes the disposable if the <see cref="CompositeDisposable"/> is disposed.
|
|||
/// </summary>
|
|||
/// <param name="item">Disposable to add.</param>
|
|||
/// <exception cref="ArgumentNullException"><paramref name="item"/> is <c>null</c>.</exception>
|
|||
public void Add(IDisposable item) |
|||
{ |
|||
if (item == null) |
|||
{ |
|||
throw new ArgumentNullException(nameof(item)); |
|||
} |
|||
|
|||
lock (_gate) |
|||
{ |
|||
if (!_disposed) |
|||
{ |
|||
_disposables.Add(item); |
|||
|
|||
// If read atomically outside the lock, it should be written atomically inside
|
|||
// the plain read on _count is fine here because manipulation always happens
|
|||
// from inside a lock.
|
|||
Volatile.Write(ref _count, _count + 1); |
|||
return; |
|||
} |
|||
} |
|||
|
|||
item.Dispose(); |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// Removes and disposes the first occurrence of a disposable from the <see cref="CompositeDisposable"/>.
|
|||
/// </summary>
|
|||
/// <param name="item">Disposable to remove.</param>
|
|||
/// <returns>true if found; false otherwise.</returns>
|
|||
/// <exception cref="ArgumentNullException"><paramref name="item"/> is <c>null</c>.</exception>
|
|||
public bool Remove(IDisposable item) |
|||
{ |
|||
if (item == null) |
|||
{ |
|||
throw new ArgumentNullException(nameof(item)); |
|||
} |
|||
|
|||
lock (_gate) |
|||
{ |
|||
// this composite was already disposed and if the item was in there
|
|||
// it has been already removed/disposed
|
|||
if (_disposed) |
|||
{ |
|||
return false; |
|||
} |
|||
|
|||
//
|
|||
// List<T> doesn't shrink the size of the underlying array but does collapse the array
|
|||
// by copying the tail one position to the left of the removal index. We don't need
|
|||
// index-based lookup but only ordering for sequential disposal. So, instead of spending
|
|||
// cycles on the Array.Copy imposed by Remove, we use a null sentinel value. We also
|
|||
// do manual Swiss cheese detection to shrink the list if there's a lot of holes in it.
|
|||
//
|
|||
|
|||
// read fields as infrequently as possible
|
|||
var current = _disposables; |
|||
|
|||
var i = current.IndexOf(item); |
|||
if (i < 0) |
|||
{ |
|||
// not found, just return
|
|||
return false; |
|||
} |
|||
|
|||
current[i] = null; |
|||
|
|||
if (current.Capacity > ShrinkThreshold && _count < current.Capacity / 2) |
|||
{ |
|||
var fresh = new List<IDisposable?>(current.Capacity / 2); |
|||
|
|||
foreach (var d in current) |
|||
{ |
|||
if (d != null) |
|||
{ |
|||
fresh.Add(d); |
|||
} |
|||
} |
|||
|
|||
_disposables = fresh; |
|||
} |
|||
|
|||
// make sure the Count property sees an atomic update
|
|||
Volatile.Write(ref _count, _count - 1); |
|||
} |
|||
|
|||
// if we get here, the item was found and removed from the list
|
|||
// just dispose it and report success
|
|||
|
|||
item.Dispose(); |
|||
|
|||
return true; |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// Disposes all disposables in the group and removes them from the group.
|
|||
/// </summary>
|
|||
public void Dispose() |
|||
{ |
|||
List<IDisposable?>? currentDisposables = null; |
|||
|
|||
lock (_gate) |
|||
{ |
|||
if (!_disposed) |
|||
{ |
|||
currentDisposables = _disposables; |
|||
|
|||
// nulling out the reference is faster no risk to
|
|||
// future Add/Remove because _disposed will be true
|
|||
// and thus _disposables won't be touched again.
|
|||
_disposables = null!; // NB: All accesses are guarded by _disposed checks.
|
|||
|
|||
Volatile.Write(ref _count, 0); |
|||
Volatile.Write(ref _disposed, true); |
|||
} |
|||
} |
|||
|
|||
if (currentDisposables != null) |
|||
{ |
|||
foreach (var d in currentDisposables) |
|||
{ |
|||
d?.Dispose(); |
|||
} |
|||
} |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// Removes and disposes all disposables from the <see cref="CompositeDisposable"/>, but does not dispose the <see cref="CompositeDisposable"/>.
|
|||
/// </summary>
|
|||
public void Clear() |
|||
{ |
|||
IDisposable?[] previousDisposables; |
|||
|
|||
lock (_gate) |
|||
{ |
|||
// disposed composites are always clear
|
|||
if (_disposed) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
var current = _disposables; |
|||
|
|||
previousDisposables = current.ToArray(); |
|||
current.Clear(); |
|||
|
|||
Volatile.Write(ref _count, 0); |
|||
} |
|||
|
|||
foreach (var d in previousDisposables) |
|||
{ |
|||
d?.Dispose(); |
|||
} |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// Determines whether the <see cref="CompositeDisposable"/> contains a specific disposable.
|
|||
/// </summary>
|
|||
/// <param name="item">Disposable to search for.</param>
|
|||
/// <returns>true if the disposable was found; otherwise, false.</returns>
|
|||
/// <exception cref="ArgumentNullException"><paramref name="item"/> is <c>null</c>.</exception>
|
|||
public bool Contains(IDisposable item) |
|||
{ |
|||
if (item == null) |
|||
{ |
|||
throw new ArgumentNullException(nameof(item)); |
|||
} |
|||
|
|||
lock (_gate) |
|||
{ |
|||
if (_disposed) |
|||
{ |
|||
return false; |
|||
} |
|||
|
|||
return _disposables.Contains(item); |
|||
} |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// Copies the disposables contained in the <see cref="CompositeDisposable"/> to an array, starting at a particular array index.
|
|||
/// </summary>
|
|||
/// <param name="array">Array to copy the contained disposables to.</param>
|
|||
/// <param name="arrayIndex">Target index at which to copy the first disposable of the group.</param>
|
|||
/// <exception cref="ArgumentNullException"><paramref name="array"/> is <c>null</c>.</exception>
|
|||
/// <exception cref="ArgumentOutOfRangeException"><paramref name="arrayIndex"/> is less than zero. -or - <paramref name="arrayIndex"/> is larger than or equal to the array length.</exception>
|
|||
public void CopyTo(IDisposable[] array, int arrayIndex) |
|||
{ |
|||
if (array == null) |
|||
{ |
|||
throw new ArgumentNullException(nameof(array)); |
|||
} |
|||
|
|||
if (arrayIndex < 0 || arrayIndex >= array.Length) |
|||
{ |
|||
throw new ArgumentOutOfRangeException(nameof(arrayIndex)); |
|||
} |
|||
|
|||
lock (_gate) |
|||
{ |
|||
// disposed composites are always empty
|
|||
if (_disposed) |
|||
{ |
|||
return; |
|||
} |
|||
|
|||
if (arrayIndex + _count > array.Length) |
|||
{ |
|||
// there is not enough space beyond arrayIndex
|
|||
// to accommodate all _count disposables in this composite
|
|||
throw new ArgumentOutOfRangeException(nameof(arrayIndex)); |
|||
} |
|||
|
|||
var i = arrayIndex; |
|||
|
|||
foreach (var d in _disposables) |
|||
{ |
|||
if (d != null) |
|||
{ |
|||
array[i++] = d; |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// Always returns false.
|
|||
/// </summary>
|
|||
public bool IsReadOnly => false; |
|||
|
|||
/// <summary>
|
|||
/// Returns an enumerator that iterates through the <see cref="CompositeDisposable"/>.
|
|||
/// </summary>
|
|||
/// <returns>An enumerator to iterate over the disposables.</returns>
|
|||
public IEnumerator<IDisposable> GetEnumerator() |
|||
{ |
|||
lock (_gate) |
|||
{ |
|||
if (_disposed || _count == 0) |
|||
{ |
|||
return EmptyEnumerator; |
|||
} |
|||
|
|||
// the copy is unavoidable but the creation
|
|||
// of an outer IEnumerable is avoidable
|
|||
return new CompositeEnumerator(_disposables.ToArray()); |
|||
} |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// Returns an enumerator that iterates through the <see cref="CompositeDisposable"/>.
|
|||
/// </summary>
|
|||
/// <returns>An enumerator to iterate over the disposables.</returns>
|
|||
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); |
|||
|
|||
/// <summary>
|
|||
/// Gets a value that indicates whether the object is disposed.
|
|||
/// </summary>
|
|||
public bool IsDisposed => Volatile.Read(ref _disposed); |
|||
|
|||
/// <summary>
|
|||
/// An empty enumerator for the <see cref="GetEnumerator"/>
|
|||
/// method to avoid allocation on disposed or empty composites.
|
|||
/// </summary>
|
|||
private static readonly CompositeEnumerator EmptyEnumerator = |
|||
new CompositeEnumerator(Array.Empty<IDisposable?>()); |
|||
|
|||
/// <summary>
|
|||
/// An enumerator for an array of disposables.
|
|||
/// </summary>
|
|||
private sealed class CompositeEnumerator : IEnumerator<IDisposable> |
|||
{ |
|||
private readonly IDisposable?[] _disposables; |
|||
private int _index; |
|||
|
|||
public CompositeEnumerator(IDisposable?[] disposables) |
|||
{ |
|||
_disposables = disposables; |
|||
_index = -1; |
|||
} |
|||
|
|||
public IDisposable Current => _disposables[_index]!; // NB: _index is only advanced to non-null positions.
|
|||
|
|||
object IEnumerator.Current => _disposables[_index]!; |
|||
|
|||
public void Dispose() |
|||
{ |
|||
// Avoid retention of the referenced disposables
|
|||
// beyond the lifecycle of the enumerator.
|
|||
// Not sure if this happens by default to
|
|||
// generic array enumerators though.
|
|||
var disposables = _disposables; |
|||
Array.Clear(disposables, 0, disposables.Length); |
|||
} |
|||
|
|||
public bool MoveNext() |
|||
{ |
|||
var disposables = _disposables; |
|||
|
|||
for (;;) |
|||
{ |
|||
var idx = ++_index; |
|||
|
|||
if (idx >= disposables.Length) |
|||
{ |
|||
return false; |
|||
} |
|||
|
|||
// inlined that filter for null elements
|
|||
if (disposables[idx] != null) |
|||
{ |
|||
return true; |
|||
} |
|||
} |
|||
} |
|||
|
|||
public void Reset() |
|||
{ |
|||
_index = -1; |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,98 @@ |
|||
using System; |
|||
using System.Threading; |
|||
|
|||
namespace Avalonia.Reactive; |
|||
|
|||
/// <summary>
|
|||
/// Provides a set of static methods for creating <see cref="IDisposable"/> objects.
|
|||
/// </summary>
|
|||
internal static class Disposable |
|||
{ |
|||
/// <summary>
|
|||
/// Represents a disposable that does nothing on disposal.
|
|||
/// </summary>
|
|||
private sealed class EmptyDisposable : IDisposable |
|||
{ |
|||
public static readonly EmptyDisposable Instance = new(); |
|||
|
|||
private EmptyDisposable() |
|||
{ |
|||
} |
|||
|
|||
public void Dispose() |
|||
{ |
|||
// no op
|
|||
} |
|||
} |
|||
|
|||
internal sealed class AnonymousDisposable : IDisposable |
|||
{ |
|||
private volatile Action? _dispose; |
|||
public AnonymousDisposable(Action dispose) |
|||
{ |
|||
_dispose = dispose; |
|||
} |
|||
public bool IsDisposed => _dispose == null; |
|||
public void Dispose() |
|||
{ |
|||
Interlocked.Exchange(ref _dispose, null)?.Invoke(); |
|||
} |
|||
} |
|||
|
|||
internal sealed class AnonymousDisposable<TState> : IDisposable |
|||
{ |
|||
private TState _state; |
|||
private volatile Action<TState>? _dispose; |
|||
|
|||
public AnonymousDisposable(TState state, Action<TState> dispose) |
|||
{ |
|||
_state = state; |
|||
_dispose = dispose; |
|||
} |
|||
|
|||
public bool IsDisposed => _dispose == null; |
|||
public void Dispose() |
|||
{ |
|||
Interlocked.Exchange(ref _dispose, null)?.Invoke(_state); |
|||
_state = default!; |
|||
} |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// Gets the disposable that does nothing when disposed.
|
|||
/// </summary>
|
|||
public static IDisposable Empty => EmptyDisposable.Instance; |
|||
|
|||
/// <summary>
|
|||
/// Creates a disposable object that invokes the specified action when disposed.
|
|||
/// </summary>
|
|||
/// <param name="dispose">Action to run during the first call to <see cref="IDisposable.Dispose"/>. The action is guaranteed to be run at most once.</param>
|
|||
/// <returns>The disposable object that runs the given action upon disposal.</returns>
|
|||
/// <exception cref="ArgumentNullException"><paramref name="dispose"/> is <c>null</c>.</exception>
|
|||
public static IDisposable Create(Action dispose) |
|||
{ |
|||
if (dispose == null) |
|||
{ |
|||
throw new ArgumentNullException(nameof(dispose)); |
|||
} |
|||
|
|||
return new AnonymousDisposable(dispose); |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// Creates a disposable object that invokes the specified action when disposed.
|
|||
/// </summary>
|
|||
/// <param name="state">The state to be passed to the action.</param>
|
|||
/// <param name="dispose">Action to run during the first call to <see cref="IDisposable.Dispose"/>. The action is guaranteed to be run at most once.</param>
|
|||
/// <returns>The disposable object that runs the given action upon disposal.</returns>
|
|||
/// <exception cref="ArgumentNullException"><paramref name="dispose"/> is <c>null</c>.</exception>
|
|||
public static IDisposable Create<TState>(TState state, Action<TState> dispose) |
|||
{ |
|||
if (dispose == null) |
|||
{ |
|||
throw new ArgumentNullException(nameof(dispose)); |
|||
} |
|||
|
|||
return new AnonymousDisposable<TState>(state, dispose); |
|||
} |
|||
} |
|||
@ -0,0 +1,37 @@ |
|||
using System; |
|||
using Avalonia.Reactive; |
|||
|
|||
namespace Avalonia.Reactive; |
|||
|
|||
/// <summary>
|
|||
/// Extension methods associated with the IDisposable interface.
|
|||
/// </summary>
|
|||
internal static class DisposableMixin |
|||
{ |
|||
/// <summary>
|
|||
/// Ensures the provided disposable is disposed with the specified <see cref="CompositeDisposable"/>.
|
|||
/// </summary>
|
|||
/// <typeparam name="T">
|
|||
/// The type of the disposable.
|
|||
/// </typeparam>
|
|||
/// <param name="item">
|
|||
/// The disposable we are going to want to be disposed by the CompositeDisposable.
|
|||
/// </param>
|
|||
/// <param name="compositeDisposable">
|
|||
/// The <see cref="CompositeDisposable"/> to which <paramref name="item"/> will be added.
|
|||
/// </param>
|
|||
/// <returns>
|
|||
/// The disposable.
|
|||
/// </returns>
|
|||
public static T DisposeWith<T>(this T item, CompositeDisposable compositeDisposable) |
|||
where T : IDisposable |
|||
{ |
|||
if (compositeDisposable is null) |
|||
{ |
|||
throw new ArgumentNullException(nameof(compositeDisposable)); |
|||
} |
|||
|
|||
compositeDisposable.Add(item); |
|||
return item; |
|||
} |
|||
} |
|||
@ -0,0 +1,8 @@ |
|||
using System; |
|||
|
|||
namespace Avalonia.Reactive; |
|||
|
|||
internal interface IAvaloniaSubject<T> : IObserver<T>, IObservable<T> /*, ISubject<T> */ |
|||
{ |
|||
|
|||
} |
|||
@ -0,0 +1,30 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Diagnostics; |
|||
using System.Runtime.InteropServices; |
|||
using System.Threading; |
|||
using Avalonia.Threading; |
|||
|
|||
namespace Avalonia.Reactive; |
|||
|
|||
internal class LightweightSubject<T> : LightweightObservableBase<T>, IAvaloniaSubject<T> |
|||
{ |
|||
public void OnCompleted() |
|||
{ |
|||
PublishCompleted(); |
|||
} |
|||
|
|||
public void OnError(Exception error) |
|||
{ |
|||
PublishError(error); |
|||
} |
|||
|
|||
public void OnNext(T value) |
|||
{ |
|||
PublishNext(value); |
|||
} |
|||
|
|||
protected override void Initialize() { } |
|||
|
|||
protected override void Deinitialize() { } |
|||
} |
|||
@ -0,0 +1,238 @@ |
|||
using System; |
|||
using System.Collections; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using Avalonia.Reactive.Operators; |
|||
using Avalonia.Threading; |
|||
|
|||
namespace Avalonia.Reactive; |
|||
|
|||
/// <summary>
|
|||
/// Provides common observable methods as a replacement for the Rx framework.
|
|||
/// </summary>
|
|||
internal static class Observable |
|||
{ |
|||
public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe) |
|||
{ |
|||
return new CreateWithDisposableObservable<TSource>(subscribe); |
|||
} |
|||
|
|||
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> action) |
|||
{ |
|||
return source.Subscribe(new AnonymousObserver<T>(action)); |
|||
} |
|||
|
|||
public static IObservable<TResult> Select<TSource, TResult>(this IObservable<TSource> source, Func<TSource, TResult> selector) |
|||
{ |
|||
return Create<TResult>(obs => |
|||
{ |
|||
return source.Subscribe(new AnonymousObserver<TSource>( |
|||
input => |
|||
{ |
|||
TResult value; |
|||
try |
|||
{ |
|||
value = selector(input); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
obs.OnError(ex); |
|||
return; |
|||
} |
|||
|
|||
obs.OnNext(value); |
|||
}, obs.OnError, obs.OnCompleted)); |
|||
}); |
|||
} |
|||
|
|||
public static IObservable<TSource> Where<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate) |
|||
{ |
|||
return Create<TSource>(obs => |
|||
{ |
|||
return source.Subscribe(new AnonymousObserver<TSource>( |
|||
input => |
|||
{ |
|||
bool shouldRun; |
|||
try |
|||
{ |
|||
shouldRun = predicate(input); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
obs.OnError(ex); |
|||
return; |
|||
} |
|||
if (shouldRun) |
|||
{ |
|||
obs.OnNext(input); |
|||
} |
|||
}, obs.OnError, obs.OnCompleted)); |
|||
}); |
|||
} |
|||
|
|||
public static IObservable<TSource> Switch<TSource>( |
|||
this IObservable<IObservable<TSource>> sources) |
|||
{ |
|||
return new Switch<TSource>(sources); |
|||
} |
|||
|
|||
public static IObservable<TResult> CombineLatest<TFirst, TSecond, TResult>( |
|||
this IObservable<TFirst> first, IObservable<TSecond> second, |
|||
Func<TFirst, TSecond, TResult> resultSelector) |
|||
{ |
|||
return new CombineLatest<TFirst, TSecond, TResult>(first, second, resultSelector); |
|||
} |
|||
|
|||
public static IObservable<TInput[]> CombineLatest<TInput>( |
|||
this IEnumerable<IObservable<TInput>> inputs) |
|||
{ |
|||
return new CombineLatest<TInput, TInput[]>(inputs, items => items); |
|||
} |
|||
|
|||
public static IObservable<T> Skip<T>(this IObservable<T> source, int skipCount) |
|||
{ |
|||
if (skipCount <= 0) |
|||
{ |
|||
throw new ArgumentException("Skip count must be bigger than zero", nameof(skipCount)); |
|||
} |
|||
|
|||
return Create<T>(obs => |
|||
{ |
|||
var remaining = skipCount; |
|||
return source.Subscribe(new AnonymousObserver<T>( |
|||
input => |
|||
{ |
|||
if (remaining <= 0) |
|||
{ |
|||
obs.OnNext(input); |
|||
} |
|||
else |
|||
{ |
|||
remaining--; |
|||
} |
|||
}, obs.OnError, obs.OnCompleted)); |
|||
}); |
|||
} |
|||
|
|||
public static IObservable<T> Take<T>(this IObservable<T> source, int takeCount) |
|||
{ |
|||
if (takeCount <= 0) |
|||
{ |
|||
return Empty<T>(); |
|||
} |
|||
|
|||
return Create<T>(obs => |
|||
{ |
|||
var remaining = takeCount; |
|||
IDisposable? sub = null; |
|||
sub = source.Subscribe(new AnonymousObserver<T>( |
|||
input => |
|||
{ |
|||
if (remaining > 0) |
|||
{ |
|||
--remaining; |
|||
obs.OnNext(input); |
|||
|
|||
if (remaining == 0) |
|||
{ |
|||
sub?.Dispose(); |
|||
obs.OnCompleted(); |
|||
} |
|||
} |
|||
}, obs.OnError, obs.OnCompleted)); |
|||
return sub; |
|||
}); |
|||
} |
|||
|
|||
public static IObservable<EventArgs> FromEventPattern(Action<EventHandler> addHandler, Action<EventHandler> removeHandler) |
|||
{ |
|||
return Create<EventArgs>(observer => |
|||
{ |
|||
var handler = new Action<EventArgs>(observer.OnNext); |
|||
var converted = new EventHandler((_, args) => handler(args)); |
|||
addHandler(converted); |
|||
|
|||
return Disposable.Create(() => removeHandler(converted)); |
|||
}); |
|||
} |
|||
|
|||
public static IObservable<T> Return<T>(T value) |
|||
{ |
|||
return new ReturnImpl<T>(value); |
|||
} |
|||
|
|||
public static IObservable<T> Empty<T>() |
|||
{ |
|||
return EmptyImpl<T>.Instance; |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// Returns an observable that fires once with the specified value and never completes.
|
|||
/// </summary>
|
|||
/// <typeparam name="T">The type of the value.</typeparam>
|
|||
/// <param name="value">The value.</param>
|
|||
/// <returns>The observable.</returns>
|
|||
public static IObservable<T> SingleValue<T>(T value) |
|||
{ |
|||
return new SingleValueImpl<T>(value); |
|||
} |
|||
|
|||
private sealed class SingleValueImpl<T> : IObservable<T> |
|||
{ |
|||
private readonly T _value; |
|||
|
|||
public SingleValueImpl(T value) |
|||
{ |
|||
_value = value; |
|||
} |
|||
public IDisposable Subscribe(IObserver<T> observer) |
|||
{ |
|||
observer.OnNext(_value); |
|||
return Disposable.Empty; |
|||
} |
|||
} |
|||
|
|||
private sealed class ReturnImpl<T> : IObservable<T> |
|||
{ |
|||
private readonly T _value; |
|||
|
|||
public ReturnImpl(T value) |
|||
{ |
|||
_value = value; |
|||
} |
|||
public IDisposable Subscribe(IObserver<T> observer) |
|||
{ |
|||
observer.OnNext(_value); |
|||
observer.OnCompleted(); |
|||
return Disposable.Empty; |
|||
} |
|||
} |
|||
|
|||
internal sealed class EmptyImpl<TResult> : IObservable<TResult> |
|||
{ |
|||
internal static readonly IObservable<TResult> Instance = new EmptyImpl<TResult>(); |
|||
|
|||
private EmptyImpl() { } |
|||
|
|||
public IDisposable Subscribe(IObserver<TResult> observer) |
|||
{ |
|||
observer.OnCompleted(); |
|||
return Disposable.Empty; |
|||
} |
|||
} |
|||
|
|||
private sealed class CreateWithDisposableObservable<TSource> : IObservable<TSource> |
|||
{ |
|||
private readonly Func<IObserver<TSource>, IDisposable> _subscribe; |
|||
|
|||
public CreateWithDisposableObservable(Func<IObserver<TSource>, IDisposable> subscribe) |
|||
{ |
|||
_subscribe = subscribe; |
|||
} |
|||
|
|||
public IDisposable Subscribe(IObserver<TSource> observer) |
|||
{ |
|||
return _subscribe(observer); |
|||
} |
|||
} |
|||
} |
|||
@ -1,37 +0,0 @@ |
|||
using System; |
|||
using System.Reactive.Disposables; |
|||
|
|||
namespace Avalonia.Reactive |
|||
{ |
|||
/// <summary>
|
|||
/// Provides common observable methods not found in standard Rx framework.
|
|||
/// </summary>
|
|||
public static class ObservableEx |
|||
{ |
|||
/// <summary>
|
|||
/// Returns an observable that fires once with the specified value and never completes.
|
|||
/// </summary>
|
|||
/// <typeparam name="T">The type of the value.</typeparam>
|
|||
/// <param name="value">The value.</param>
|
|||
/// <returns>The observable.</returns>
|
|||
public static IObservable<T> SingleValue<T>(T value) |
|||
{ |
|||
return new SingleValueImpl<T>(value); |
|||
} |
|||
|
|||
private class SingleValueImpl<T> : IObservable<T> |
|||
{ |
|||
private T _value; |
|||
|
|||
public SingleValueImpl(T value) |
|||
{ |
|||
_value = value; |
|||
} |
|||
public IDisposable Subscribe(IObserver<T> observer) |
|||
{ |
|||
observer.OnNext(_value); |
|||
return Disposable.Empty; |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,374 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Collections.ObjectModel; |
|||
using System.Linq; |
|||
|
|||
namespace Avalonia.Reactive.Operators; |
|||
|
|||
// Code based on https://github.com/dotnet/reactive/blob/main/Rx.NET/Source/src/System.Reactive/Linq/Observable/CombineLatest.cs
|
|||
|
|||
internal sealed class CombineLatest<TFirst, TSecond, TResult> : IObservable<TResult> |
|||
{ |
|||
private readonly IObservable<TFirst> _first; |
|||
private readonly IObservable<TSecond> _second; |
|||
private readonly Func<TFirst, TSecond, TResult> _resultSelector; |
|||
|
|||
public CombineLatest(IObservable<TFirst> first, IObservable<TSecond> second, |
|||
Func<TFirst, TSecond, TResult> resultSelector) |
|||
{ |
|||
_first = first; |
|||
_second = second; |
|||
_resultSelector = resultSelector; |
|||
} |
|||
|
|||
public IDisposable Subscribe(IObserver<TResult> observer) |
|||
{ |
|||
var sink = new _(_resultSelector, observer); |
|||
sink.Run(_first, _second); |
|||
return sink; |
|||
} |
|||
|
|||
internal sealed class _ : IdentitySink<TResult> |
|||
{ |
|||
private readonly Func<TFirst, TSecond, TResult> _resultSelector; |
|||
private readonly object _gate = new object(); |
|||
|
|||
public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> observer) |
|||
: base(observer) |
|||
{ |
|||
_resultSelector = resultSelector; |
|||
_firstDisposable = null!; |
|||
_secondDisposable = null!; |
|||
} |
|||
|
|||
private IDisposable _firstDisposable; |
|||
private IDisposable _secondDisposable; |
|||
|
|||
public void Run(IObservable<TFirst> first, IObservable<TSecond> second) |
|||
{ |
|||
var fstO = new FirstObserver(this); |
|||
var sndO = new SecondObserver(this); |
|||
|
|||
fstO.SetOther(sndO); |
|||
sndO.SetOther(fstO); |
|||
|
|||
_firstDisposable = first.Subscribe(fstO); |
|||
_secondDisposable = second.Subscribe(sndO); |
|||
} |
|||
|
|||
protected override void Dispose(bool disposing) |
|||
{ |
|||
if (disposing) |
|||
{ |
|||
_firstDisposable.Dispose(); |
|||
_secondDisposable.Dispose(); |
|||
} |
|||
|
|||
base.Dispose(disposing); |
|||
} |
|||
|
|||
private sealed class FirstObserver : IObserver<TFirst> |
|||
{ |
|||
private readonly _ _parent; |
|||
private SecondObserver _other; |
|||
|
|||
public FirstObserver(_ parent) |
|||
{ |
|||
_parent = parent; |
|||
_other = default!; // NB: Will be set by SetOther.
|
|||
} |
|||
|
|||
public void SetOther(SecondObserver other) { _other = other; } |
|||
|
|||
public bool HasValue { get; private set; } |
|||
public TFirst? Value { get; private set; } |
|||
public bool Done { get; private set; } |
|||
|
|||
public void OnNext(TFirst value) |
|||
{ |
|||
lock (_parent._gate) |
|||
{ |
|||
HasValue = true; |
|||
Value = value; |
|||
|
|||
if (_other.HasValue) |
|||
{ |
|||
TResult res; |
|||
try |
|||
{ |
|||
res = _parent._resultSelector(value, _other.Value!); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
_parent.ForwardOnError(ex); |
|||
return; |
|||
} |
|||
|
|||
_parent.ForwardOnNext(res); |
|||
} |
|||
else if (_other.Done) |
|||
{ |
|||
_parent.ForwardOnCompleted(); |
|||
} |
|||
} |
|||
} |
|||
|
|||
public void OnError(Exception error) |
|||
{ |
|||
lock (_parent._gate) |
|||
{ |
|||
_parent.ForwardOnError(error); |
|||
} |
|||
} |
|||
|
|||
public void OnCompleted() |
|||
{ |
|||
lock (_parent._gate) |
|||
{ |
|||
Done = true; |
|||
|
|||
if (_other.Done) |
|||
{ |
|||
_parent.ForwardOnCompleted(); |
|||
} |
|||
else |
|||
{ |
|||
_parent._firstDisposable.Dispose(); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
private sealed class SecondObserver : IObserver<TSecond> |
|||
{ |
|||
private readonly _ _parent; |
|||
private FirstObserver _other; |
|||
|
|||
public SecondObserver(_ parent) |
|||
{ |
|||
_parent = parent; |
|||
_other = default!; // NB: Will be set by SetOther.
|
|||
} |
|||
|
|||
public void SetOther(FirstObserver other) { _other = other; } |
|||
|
|||
public bool HasValue { get; private set; } |
|||
public TSecond? Value { get; private set; } |
|||
public bool Done { get; private set; } |
|||
|
|||
public void OnNext(TSecond value) |
|||
{ |
|||
lock (_parent._gate) |
|||
{ |
|||
HasValue = true; |
|||
Value = value; |
|||
|
|||
if (_other.HasValue) |
|||
{ |
|||
TResult res; |
|||
try |
|||
{ |
|||
res = _parent._resultSelector(_other.Value!, value); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
_parent.ForwardOnError(ex); |
|||
return; |
|||
} |
|||
|
|||
_parent.ForwardOnNext(res); |
|||
} |
|||
else if (_other.Done) |
|||
{ |
|||
_parent.ForwardOnCompleted(); |
|||
} |
|||
} |
|||
} |
|||
|
|||
public void OnError(Exception error) |
|||
{ |
|||
lock (_parent._gate) |
|||
{ |
|||
_parent.ForwardOnError(error); |
|||
} |
|||
} |
|||
|
|||
public void OnCompleted() |
|||
{ |
|||
lock (_parent._gate) |
|||
{ |
|||
Done = true; |
|||
|
|||
if (_other.Done) |
|||
{ |
|||
_parent.ForwardOnCompleted(); |
|||
} |
|||
else |
|||
{ |
|||
_parent._secondDisposable.Dispose(); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
internal sealed class CombineLatest<TSource, TResult> : IObservable<TResult> |
|||
{ |
|||
private readonly IEnumerable<IObservable<TSource>> _sources; |
|||
private readonly Func<TSource[], TResult> _resultSelector; |
|||
|
|||
public CombineLatest(IEnumerable<IObservable<TSource>> sources, Func<TSource[], TResult> resultSelector) |
|||
{ |
|||
_sources = sources; |
|||
_resultSelector = resultSelector; |
|||
} |
|||
|
|||
public IDisposable Subscribe(IObserver<TResult> observer) |
|||
{ |
|||
var sink = new _(_resultSelector, observer); |
|||
sink.Run(_sources); |
|||
return sink; |
|||
} |
|||
|
|||
internal sealed class _ : IdentitySink<TResult> |
|||
{ |
|||
private readonly object _gate = new object(); |
|||
private readonly Func<TSource[], TResult> _resultSelector; |
|||
|
|||
public _(Func<TSource[], TResult> resultSelector, IObserver<TResult> observer) |
|||
: base(observer) |
|||
{ |
|||
_resultSelector = resultSelector; |
|||
|
|||
// NB: These will be set in Run before getting used.
|
|||
_hasValue = null!; |
|||
_values = null!; |
|||
_isDone = null!; |
|||
_subscriptions = null!; |
|||
} |
|||
|
|||
private bool[] _hasValue; |
|||
private bool _hasValueAll; |
|||
private TSource[] _values; |
|||
private bool[] _isDone; |
|||
private IDisposable[] _subscriptions; |
|||
|
|||
public void Run(IEnumerable<IObservable<TSource>> sources) |
|||
{ |
|||
var srcs = sources.ToArray(); |
|||
|
|||
var N = srcs.Length; |
|||
|
|||
_hasValue = new bool[N]; |
|||
_hasValueAll = false; |
|||
|
|||
_values = new TSource[N]; |
|||
|
|||
_isDone = new bool[N]; |
|||
|
|||
_subscriptions = new IDisposable[N]; |
|||
|
|||
for (var i = 0; i < N; i++) |
|||
{ |
|||
var j = i; |
|||
|
|||
var o = new SourceObserver(this, j); |
|||
_subscriptions[j] = o; |
|||
|
|||
o.Disposable = srcs[j].Subscribe(o); |
|||
} |
|||
|
|||
SetUpstream(new CompositeDisposable(_subscriptions)); |
|||
} |
|||
|
|||
private void OnNext(int index, TSource value) |
|||
{ |
|||
lock (_gate) |
|||
{ |
|||
_values[index] = value; |
|||
|
|||
_hasValue[index] = true; |
|||
|
|||
if (_hasValueAll || (_hasValueAll = _hasValue.All(v => v))) |
|||
{ |
|||
TResult res; |
|||
try |
|||
{ |
|||
res = _resultSelector(_values); |
|||
} |
|||
catch (Exception ex) |
|||
{ |
|||
ForwardOnError(ex); |
|||
return; |
|||
} |
|||
|
|||
ForwardOnNext(res); |
|||
} |
|||
else if (_isDone.Where((_, i) => i != index).All(d => d)) |
|||
{ |
|||
ForwardOnCompleted(); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private new void OnError(Exception error) |
|||
{ |
|||
lock (_gate) |
|||
{ |
|||
ForwardOnError(error); |
|||
} |
|||
} |
|||
|
|||
private void OnCompleted(int index) |
|||
{ |
|||
lock (_gate) |
|||
{ |
|||
_isDone[index] = true; |
|||
|
|||
if (_isDone.All(d => d)) |
|||
{ |
|||
ForwardOnCompleted(); |
|||
} |
|||
else |
|||
{ |
|||
_subscriptions[index].Dispose(); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private sealed class SourceObserver : IObserver<TSource>, IDisposable |
|||
{ |
|||
private readonly _ _parent; |
|||
private readonly int _index; |
|||
|
|||
public SourceObserver(_ parent, int index) |
|||
{ |
|||
_parent = parent; |
|||
_index = index; |
|||
} |
|||
|
|||
public IDisposable? Disposable { get; set; } |
|||
|
|||
public void OnNext(TSource value) |
|||
{ |
|||
_parent.OnNext(_index, value); |
|||
} |
|||
|
|||
public void OnError(Exception error) |
|||
{ |
|||
_parent.OnError(error); |
|||
} |
|||
|
|||
public void OnCompleted() |
|||
{ |
|||
_parent.OnCompleted(_index); |
|||
} |
|||
|
|||
public void Dispose() |
|||
{ |
|||
Disposable?.Dispose(); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,109 @@ |
|||
using System; |
|||
using System.Threading; |
|||
|
|||
namespace Avalonia.Reactive.Operators; |
|||
|
|||
internal abstract class Sink<TTarget> : IDisposable |
|||
{ |
|||
private IDisposable? _upstream; |
|||
private volatile IObserver<TTarget> _observer; |
|||
|
|||
protected Sink(IObserver<TTarget> observer) |
|||
{ |
|||
_observer = observer; |
|||
} |
|||
|
|||
public void Dispose() |
|||
{ |
|||
Dispose(true); |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// Override this method to dispose additional resources.
|
|||
/// The method is guaranteed to be called at most once.
|
|||
/// </summary>
|
|||
/// <param name="disposing">If true, the method was called from <see cref="Dispose()"/>.</param>
|
|||
protected virtual void Dispose(bool disposing) |
|||
{ |
|||
//Calling base.Dispose(true) is not a proper disposal, so we can omit the assignment here.
|
|||
//Sink is internal so this can pretty much be enforced.
|
|||
//_observer = NopObserver<TTarget>.Instance;
|
|||
|
|||
_upstream?.Dispose(); |
|||
} |
|||
|
|||
public void ForwardOnNext(TTarget value) |
|||
{ |
|||
_observer.OnNext(value); |
|||
} |
|||
|
|||
public void ForwardOnCompleted() |
|||
{ |
|||
_observer.OnCompleted(); |
|||
Dispose(); |
|||
} |
|||
|
|||
public void ForwardOnError(Exception error) |
|||
{ |
|||
_observer.OnError(error); |
|||
Dispose(); |
|||
} |
|||
|
|||
protected void SetUpstream(IDisposable upstream) |
|||
{ |
|||
_upstream = upstream; |
|||
} |
|||
|
|||
protected void DisposeUpstream() |
|||
{ |
|||
_upstream?.Dispose(); |
|||
} |
|||
} |
|||
|
|||
internal abstract class Sink<TSource, TTarget> : Sink<TTarget>, IObserver<TSource> |
|||
{ |
|||
protected Sink(IObserver<TTarget> observer) : base(observer) |
|||
{ |
|||
} |
|||
|
|||
public virtual void Run(IObservable<TSource> source) |
|||
{ |
|||
SetUpstream(source.Subscribe(this)); |
|||
} |
|||
|
|||
public abstract void OnNext(TSource value); |
|||
|
|||
public virtual void OnError(Exception error) => ForwardOnError(error); |
|||
|
|||
public virtual void OnCompleted() => ForwardOnCompleted(); |
|||
|
|||
public IObserver<TTarget> GetForwarder() => new _(this); |
|||
|
|||
private sealed class _ : IObserver<TTarget> |
|||
{ |
|||
private readonly Sink<TSource, TTarget> _forward; |
|||
|
|||
public _(Sink<TSource, TTarget> forward) |
|||
{ |
|||
_forward = forward; |
|||
} |
|||
|
|||
public void OnNext(TTarget value) => _forward.ForwardOnNext(value); |
|||
|
|||
public void OnError(Exception error) => _forward.ForwardOnError(error); |
|||
|
|||
public void OnCompleted() => _forward.ForwardOnCompleted(); |
|||
} |
|||
} |
|||
|
|||
internal abstract class IdentitySink<T> : Sink<T, T> |
|||
{ |
|||
protected IdentitySink(IObserver<T> observer) : base(observer) |
|||
{ |
|||
} |
|||
|
|||
public override void OnNext(T value) |
|||
{ |
|||
ForwardOnNext(value); |
|||
} |
|||
} |
|||
@ -0,0 +1,144 @@ |
|||
using System; |
|||
|
|||
namespace Avalonia.Reactive.Operators; |
|||
|
|||
// Code based on https://github.com/dotnet/reactive/blob/main/Rx.NET/Source/src/System.Reactive/Linq/Observable/Switch.cs
|
|||
|
|||
internal sealed class Switch<TSource> : IObservable<TSource> |
|||
{ |
|||
private readonly IObservable<IObservable<TSource>> _sources; |
|||
|
|||
public Switch(IObservable<IObservable<TSource>> sources) |
|||
{ |
|||
_sources = sources; |
|||
} |
|||
|
|||
public IDisposable Subscribe(IObserver<TSource> observer) |
|||
{ |
|||
return _sources.Subscribe(new _(observer)); |
|||
} |
|||
|
|||
internal sealed class _ : Sink<IObservable<TSource>, TSource> |
|||
{ |
|||
private readonly object _gate = new object(); |
|||
|
|||
public _(IObserver<TSource> observer) |
|||
: base(observer) |
|||
{ |
|||
} |
|||
|
|||
private IDisposable? _innerSerialDisposable; |
|||
private bool _isStopped; |
|||
private ulong _latest; |
|||
private bool _hasLatest; |
|||
|
|||
protected override void Dispose(bool disposing) |
|||
{ |
|||
if (disposing) |
|||
{ |
|||
_innerSerialDisposable?.Dispose(); |
|||
} |
|||
|
|||
base.Dispose(disposing); |
|||
} |
|||
|
|||
public override void OnNext(IObservable<TSource> value) |
|||
{ |
|||
ulong id; |
|||
|
|||
lock (_gate) |
|||
{ |
|||
id = unchecked(++_latest); |
|||
_hasLatest = true; |
|||
} |
|||
|
|||
var innerObserver = new InnerObserver(this, id); |
|||
|
|||
_innerSerialDisposable = innerObserver; |
|||
innerObserver.Disposable = value.Subscribe(innerObserver); |
|||
} |
|||
|
|||
public override void OnError(Exception error) |
|||
{ |
|||
lock (_gate) |
|||
{ |
|||
ForwardOnError(error); |
|||
} |
|||
} |
|||
|
|||
public override void OnCompleted() |
|||
{ |
|||
lock (_gate) |
|||
{ |
|||
DisposeUpstream(); |
|||
|
|||
_isStopped = true; |
|||
if (!_hasLatest) |
|||
{ |
|||
ForwardOnCompleted(); |
|||
} |
|||
} |
|||
} |
|||
|
|||
private sealed class InnerObserver : IObserver<TSource>, IDisposable |
|||
{ |
|||
private readonly _ _parent; |
|||
private readonly ulong _id; |
|||
|
|||
public InnerObserver(_ parent, ulong id) |
|||
{ |
|||
_parent = parent; |
|||
_id = id; |
|||
} |
|||
|
|||
public IDisposable? Disposable { get; set; } |
|||
|
|||
public void OnNext(TSource value) |
|||
{ |
|||
lock (_parent._gate) |
|||
{ |
|||
if (_parent._latest == _id) |
|||
{ |
|||
_parent.ForwardOnNext(value); |
|||
} |
|||
} |
|||
} |
|||
|
|||
public void OnError(Exception error) |
|||
{ |
|||
lock (_parent._gate) |
|||
{ |
|||
Dispose(); |
|||
|
|||
if (_parent._latest == _id) |
|||
{ |
|||
_parent.ForwardOnError(error); |
|||
} |
|||
} |
|||
} |
|||
|
|||
public void OnCompleted() |
|||
{ |
|||
lock (_parent._gate) |
|||
{ |
|||
Dispose(); |
|||
|
|||
if (_parent._latest == _id) |
|||
{ |
|||
_parent._hasLatest = false; |
|||
|
|||
if (_parent._isStopped) |
|||
{ |
|||
_parent.ForwardOnCompleted(); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
public void Dispose() |
|||
{ |
|||
Disposable?.Dispose(); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
@ -1,18 +0,0 @@ |
|||
using System; |
|||
|
|||
namespace Avalonia.Utilities |
|||
{ |
|||
/// <summary>
|
|||
/// Defines a listener to a event subscribed vis the <see cref="WeakObservable"/>.
|
|||
/// </summary>
|
|||
/// <typeparam name="T">The type of the event arguments.</typeparam>
|
|||
public interface IWeakSubscriber<T> where T : EventArgs |
|||
{ |
|||
/// <summary>
|
|||
/// Invoked when the subscribed event is raised.
|
|||
/// </summary>
|
|||
/// <param name="sender">The event sender.</param>
|
|||
/// <param name="e">The event arguments.</param>
|
|||
void OnEvent(object? sender, T e); |
|||
} |
|||
} |
|||
@ -1,60 +0,0 @@ |
|||
using System; |
|||
using System.Reactive; |
|||
using System.Reactive.Linq; |
|||
|
|||
namespace Avalonia.Utilities |
|||
{ |
|||
/// <summary>
|
|||
/// Provides extension methods for working with weak event handlers.
|
|||
/// </summary>
|
|||
public static class WeakObservable |
|||
{ |
|||
|
|||
private class Handler<TEventArgs> |
|||
: IWeakSubscriber<TEventArgs>, |
|||
IWeakEventSubscriber<TEventArgs> where TEventArgs : EventArgs |
|||
{ |
|||
private IObserver<EventPattern<object, TEventArgs>> _observer; |
|||
|
|||
public Handler(IObserver<EventPattern<object, TEventArgs>> observer) |
|||
{ |
|||
_observer = observer; |
|||
} |
|||
|
|||
public void OnEvent(object? sender, TEventArgs e) |
|||
{ |
|||
_observer.OnNext(new EventPattern<object, TEventArgs>(sender, e)); |
|||
} |
|||
|
|||
public void OnEvent(object? sender, WeakEvent ev, TEventArgs e) |
|||
{ |
|||
_observer.OnNext(new EventPattern<object, TEventArgs>(sender, e)); |
|||
} |
|||
} |
|||
|
|||
/// <summary>
|
|||
/// Converts a WeakEvent conforming to the standard .NET event pattern into an observable
|
|||
/// sequence, subscribing weakly.
|
|||
/// </summary>
|
|||
/// <typeparam name="TTarget">The type of target.</typeparam>
|
|||
/// <typeparam name="TEventArgs">The type of the event args.</typeparam>
|
|||
/// <param name="target">Object instance that exposes the event to convert.</param>
|
|||
/// <param name="ev">The weak event to convert.</param>
|
|||
/// <returns></returns>
|
|||
public static IObservable<EventPattern<object, TEventArgs>> FromEventPattern<TTarget, TEventArgs>( |
|||
TTarget target, WeakEvent<TTarget, TEventArgs> ev) |
|||
where TEventArgs : EventArgs where TTarget : class |
|||
{ |
|||
_ = target ?? throw new ArgumentNullException(nameof(target)); |
|||
_ = ev ?? throw new ArgumentNullException(nameof(ev)); |
|||
|
|||
return Observable.Create<EventPattern<object, TEventArgs>>(observer => |
|||
{ |
|||
var handler = new Handler<TEventArgs>(observer); |
|||
ev.Subscribe(target, handler); |
|||
return () => ev.Unsubscribe(target, handler); |
|||
}).Publish().RefCount(); |
|||
} |
|||
|
|||
} |
|||
} |
|||
@ -1,38 +0,0 @@ |
|||
using System; |
|||
using System.Reactive.Disposables; |
|||
|
|||
namespace Avalonia.Controls.Mixins |
|||
{ |
|||
/// <summary>
|
|||
/// Extension methods associated with the IDisposable interface.
|
|||
/// </summary>
|
|||
public static class DisposableMixin |
|||
{ |
|||
/// <summary>
|
|||
/// Ensures the provided disposable is disposed with the specified <see cref="CompositeDisposable"/>.
|
|||
/// </summary>
|
|||
/// <typeparam name="T">
|
|||
/// The type of the disposable.
|
|||
/// </typeparam>
|
|||
/// <param name="item">
|
|||
/// The disposable we are going to want to be disposed by the CompositeDisposable.
|
|||
/// </param>
|
|||
/// <param name="compositeDisposable">
|
|||
/// The <see cref="CompositeDisposable"/> to which <paramref name="item"/> will be added.
|
|||
/// </param>
|
|||
/// <returns>
|
|||
/// The disposable.
|
|||
/// </returns>
|
|||
public static T DisposeWith<T>(this T item, CompositeDisposable compositeDisposable) |
|||
where T : IDisposable |
|||
{ |
|||
if (compositeDisposable is null) |
|||
{ |
|||
throw new ArgumentNullException(nameof(compositeDisposable)); |
|||
} |
|||
|
|||
compositeDisposable.Add(item); |
|||
return item; |
|||
} |
|||
} |
|||
} |
|||
Some files were not shown because too many files changed in this diff
Loading…
Reference in new issue