committed by
GitHub
224 changed files with 2384 additions and 874 deletions
@ -1,6 +1,7 @@ |
|||||
<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> |
<Project DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003"> |
||||
<ItemGroup Condition="'$(TargetFramework)' != 'net6'"> |
<ItemGroup Condition="'$(TargetFramework)' != 'net6'"> |
||||
<PackageReference Include="System.ValueTuple" Version="4.5.0" /> |
<PackageReference Include="System.ValueTuple" Version="4.5.0" /> |
||||
|
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4" /> |
||||
<PackageReference Include="System.Runtime.CompilerServices.Unsafe" Version="4.6.0" /> |
<PackageReference Include="System.Runtime.CompilerServices.Unsafe" Version="4.6.0" /> |
||||
</ItemGroup> |
</ItemGroup> |
||||
</Project> |
</Project> |
||||
|
|||||
@ -0,0 +1,62 @@ |
|||||
|
using System; |
||||
|
using System.Threading.Tasks; |
||||
|
|
||||
|
namespace Avalonia.Reactive; |
||||
|
|
||||
|
internal class AnonymousObserver<T> : IObserver<T> |
||||
|
{ |
||||
|
private static readonly Action<Exception> ThrowsOnError = ex => throw ex; |
||||
|
private static readonly Action NoOpCompleted = () => { }; |
||||
|
private readonly Action<T> _onNext; |
||||
|
private readonly Action<Exception> _onError; |
||||
|
private readonly Action _onCompleted; |
||||
|
|
||||
|
public AnonymousObserver(TaskCompletionSource<T> tcs) |
||||
|
{ |
||||
|
if (tcs is null) |
||||
|
{ |
||||
|
throw new ArgumentNullException(nameof(tcs)); |
||||
|
} |
||||
|
|
||||
|
_onNext = tcs.SetResult; |
||||
|
_onError = tcs.SetException; |
||||
|
_onCompleted = NoOpCompleted; |
||||
|
} |
||||
|
|
||||
|
public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted) |
||||
|
{ |
||||
|
_onNext = onNext ?? throw new ArgumentNullException(nameof(onNext)); |
||||
|
_onError = onError ?? throw new ArgumentNullException(nameof(onError)); |
||||
|
_onCompleted = onCompleted ?? throw new ArgumentNullException(nameof(onCompleted)); |
||||
|
} |
||||
|
|
||||
|
public AnonymousObserver(Action<T> onNext) |
||||
|
: this(onNext, ThrowsOnError, NoOpCompleted) |
||||
|
{ |
||||
|
} |
||||
|
|
||||
|
public AnonymousObserver(Action<T> onNext, Action<Exception> onError) |
||||
|
: this(onNext, onError, NoOpCompleted) |
||||
|
{ |
||||
|
} |
||||
|
|
||||
|
public AnonymousObserver(Action<T> onNext, Action onCompleted) |
||||
|
: this(onNext, ThrowsOnError, onCompleted) |
||||
|
{ |
||||
|
} |
||||
|
|
||||
|
public void OnCompleted() |
||||
|
{ |
||||
|
_onCompleted.Invoke(); |
||||
|
} |
||||
|
|
||||
|
public void OnError(Exception error) |
||||
|
{ |
||||
|
_onError.Invoke(error); |
||||
|
} |
||||
|
|
||||
|
public void OnNext(T value) |
||||
|
{ |
||||
|
_onNext.Invoke(value); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,23 @@ |
|||||
|
using System; |
||||
|
|
||||
|
namespace Avalonia.Reactive; |
||||
|
|
||||
|
internal class CombinedSubject<T> : IAvaloniaSubject<T> |
||||
|
{ |
||||
|
private readonly IObserver<T> _observer; |
||||
|
private readonly IObservable<T> _observable; |
||||
|
|
||||
|
public CombinedSubject(IObserver<T> observer, IObservable<T> observable) |
||||
|
{ |
||||
|
_observer = observer; |
||||
|
_observable = observable; |
||||
|
} |
||||
|
|
||||
|
public void OnCompleted() => _observer.OnCompleted(); |
||||
|
|
||||
|
public void OnError(Exception error) => _observer.OnError(error); |
||||
|
|
||||
|
public void OnNext(T value) => _observer.OnNext(value); |
||||
|
|
||||
|
public IDisposable Subscribe(IObserver<T> observer) => _observable.Subscribe(observer); |
||||
|
} |
||||
@ -0,0 +1,427 @@ |
|||||
|
using System; |
||||
|
using System.Collections; |
||||
|
using System.Collections.Generic; |
||||
|
using System.Threading; |
||||
|
|
||||
|
namespace Avalonia.Reactive; |
||||
|
|
||||
|
internal sealed class CompositeDisposable : ICollection<IDisposable>, IDisposable |
||||
|
{ |
||||
|
private readonly object _gate = new object(); |
||||
|
private bool _disposed; |
||||
|
private List<IDisposable?> _disposables; |
||||
|
private int _count; |
||||
|
private const int ShrinkThreshold = 64; |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Initializes a new instance of the <see cref="CompositeDisposable"/> class with the specified number of disposables.
|
||||
|
/// </summary>
|
||||
|
/// <param name="capacity">The number of disposables that the new CompositeDisposable can initially store.</param>
|
||||
|
/// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than zero.</exception>
|
||||
|
public CompositeDisposable(int capacity) |
||||
|
{ |
||||
|
if (capacity < 0) |
||||
|
{ |
||||
|
throw new ArgumentOutOfRangeException(nameof(capacity)); |
||||
|
} |
||||
|
|
||||
|
_disposables = new List<IDisposable?>(capacity); |
||||
|
} |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Initializes a new instance of the <see cref="CompositeDisposable"/> class from a group of disposables.
|
||||
|
/// </summary>
|
||||
|
/// <param name="disposables">Disposables that will be disposed together.</param>
|
||||
|
/// <exception cref="ArgumentNullException"><paramref name="disposables"/> is <c>null</c>.</exception>
|
||||
|
/// <exception cref="ArgumentException">Any of the disposables in the <paramref name="disposables"/> collection is <c>null</c>.</exception>
|
||||
|
public CompositeDisposable(params IDisposable[] disposables) |
||||
|
{ |
||||
|
if (disposables == null) |
||||
|
{ |
||||
|
throw new ArgumentNullException(nameof(disposables)); |
||||
|
} |
||||
|
|
||||
|
_disposables = ToList(disposables); |
||||
|
|
||||
|
// _count can be read by other threads and thus should be properly visible
|
||||
|
// also releases the _disposables contents so it becomes thread-safe
|
||||
|
Volatile.Write(ref _count, _disposables.Count); |
||||
|
} |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Initializes a new instance of the <see cref="CompositeDisposable"/> class from a group of disposables.
|
||||
|
/// </summary>
|
||||
|
/// <param name="disposables">Disposables that will be disposed together.</param>
|
||||
|
/// <exception cref="ArgumentNullException"><paramref name="disposables"/> is <c>null</c>.</exception>
|
||||
|
/// <exception cref="ArgumentException">Any of the disposables in the <paramref name="disposables"/> collection is <c>null</c>.</exception>
|
||||
|
public CompositeDisposable(IList<IDisposable> disposables) |
||||
|
{ |
||||
|
if (disposables == null) |
||||
|
{ |
||||
|
throw new ArgumentNullException(nameof(disposables)); |
||||
|
} |
||||
|
|
||||
|
_disposables = ToList(disposables); |
||||
|
|
||||
|
// _count can be read by other threads and thus should be properly visible
|
||||
|
// also releases the _disposables contents so it becomes thread-safe
|
||||
|
Volatile.Write(ref _count, _disposables.Count); |
||||
|
} |
||||
|
|
||||
|
private static List<IDisposable?> ToList(IEnumerable<IDisposable> disposables) |
||||
|
{ |
||||
|
var capacity = disposables switch |
||||
|
{ |
||||
|
IDisposable[] a => a.Length, |
||||
|
ICollection<IDisposable> c => c.Count, |
||||
|
_ => 12 |
||||
|
}; |
||||
|
|
||||
|
var list = new List<IDisposable?>(capacity); |
||||
|
|
||||
|
// do the copy and null-check in one step to avoid a
|
||||
|
// second loop for just checking for null items
|
||||
|
foreach (var d in disposables) |
||||
|
{ |
||||
|
if (d == null) |
||||
|
{ |
||||
|
throw new ArgumentException("Disposables can't contain null", nameof(disposables)); |
||||
|
} |
||||
|
|
||||
|
list.Add(d); |
||||
|
} |
||||
|
|
||||
|
return list; |
||||
|
} |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Gets the number of disposables contained in the <see cref="CompositeDisposable"/>.
|
||||
|
/// </summary>
|
||||
|
public int Count => Volatile.Read(ref _count); |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Adds a disposable to the <see cref="CompositeDisposable"/> or disposes the disposable if the <see cref="CompositeDisposable"/> is disposed.
|
||||
|
/// </summary>
|
||||
|
/// <param name="item">Disposable to add.</param>
|
||||
|
/// <exception cref="ArgumentNullException"><paramref name="item"/> is <c>null</c>.</exception>
|
||||
|
public void Add(IDisposable item) |
||||
|
{ |
||||
|
if (item == null) |
||||
|
{ |
||||
|
throw new ArgumentNullException(nameof(item)); |
||||
|
} |
||||
|
|
||||
|
lock (_gate) |
||||
|
{ |
||||
|
if (!_disposed) |
||||
|
{ |
||||
|
_disposables.Add(item); |
||||
|
|
||||
|
// If read atomically outside the lock, it should be written atomically inside
|
||||
|
// the plain read on _count is fine here because manipulation always happens
|
||||
|
// from inside a lock.
|
||||
|
Volatile.Write(ref _count, _count + 1); |
||||
|
return; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
item.Dispose(); |
||||
|
} |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Removes and disposes the first occurrence of a disposable from the <see cref="CompositeDisposable"/>.
|
||||
|
/// </summary>
|
||||
|
/// <param name="item">Disposable to remove.</param>
|
||||
|
/// <returns>true if found; false otherwise.</returns>
|
||||
|
/// <exception cref="ArgumentNullException"><paramref name="item"/> is <c>null</c>.</exception>
|
||||
|
public bool Remove(IDisposable item) |
||||
|
{ |
||||
|
if (item == null) |
||||
|
{ |
||||
|
throw new ArgumentNullException(nameof(item)); |
||||
|
} |
||||
|
|
||||
|
lock (_gate) |
||||
|
{ |
||||
|
// this composite was already disposed and if the item was in there
|
||||
|
// it has been already removed/disposed
|
||||
|
if (_disposed) |
||||
|
{ |
||||
|
return false; |
||||
|
} |
||||
|
|
||||
|
//
|
||||
|
// List<T> doesn't shrink the size of the underlying array but does collapse the array
|
||||
|
// by copying the tail one position to the left of the removal index. We don't need
|
||||
|
// index-based lookup but only ordering for sequential disposal. So, instead of spending
|
||||
|
// cycles on the Array.Copy imposed by Remove, we use a null sentinel value. We also
|
||||
|
// do manual Swiss cheese detection to shrink the list if there's a lot of holes in it.
|
||||
|
//
|
||||
|
|
||||
|
// read fields as infrequently as possible
|
||||
|
var current = _disposables; |
||||
|
|
||||
|
var i = current.IndexOf(item); |
||||
|
if (i < 0) |
||||
|
{ |
||||
|
// not found, just return
|
||||
|
return false; |
||||
|
} |
||||
|
|
||||
|
current[i] = null; |
||||
|
|
||||
|
if (current.Capacity > ShrinkThreshold && _count < current.Capacity / 2) |
||||
|
{ |
||||
|
var fresh = new List<IDisposable?>(current.Capacity / 2); |
||||
|
|
||||
|
foreach (var d in current) |
||||
|
{ |
||||
|
if (d != null) |
||||
|
{ |
||||
|
fresh.Add(d); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
_disposables = fresh; |
||||
|
} |
||||
|
|
||||
|
// make sure the Count property sees an atomic update
|
||||
|
Volatile.Write(ref _count, _count - 1); |
||||
|
} |
||||
|
|
||||
|
// if we get here, the item was found and removed from the list
|
||||
|
// just dispose it and report success
|
||||
|
|
||||
|
item.Dispose(); |
||||
|
|
||||
|
return true; |
||||
|
} |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Disposes all disposables in the group and removes them from the group.
|
||||
|
/// </summary>
|
||||
|
public void Dispose() |
||||
|
{ |
||||
|
List<IDisposable?>? currentDisposables = null; |
||||
|
|
||||
|
lock (_gate) |
||||
|
{ |
||||
|
if (!_disposed) |
||||
|
{ |
||||
|
currentDisposables = _disposables; |
||||
|
|
||||
|
// nulling out the reference is faster no risk to
|
||||
|
// future Add/Remove because _disposed will be true
|
||||
|
// and thus _disposables won't be touched again.
|
||||
|
_disposables = null!; // NB: All accesses are guarded by _disposed checks.
|
||||
|
|
||||
|
Volatile.Write(ref _count, 0); |
||||
|
Volatile.Write(ref _disposed, true); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
if (currentDisposables != null) |
||||
|
{ |
||||
|
foreach (var d in currentDisposables) |
||||
|
{ |
||||
|
d?.Dispose(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Removes and disposes all disposables from the <see cref="CompositeDisposable"/>, but does not dispose the <see cref="CompositeDisposable"/>.
|
||||
|
/// </summary>
|
||||
|
public void Clear() |
||||
|
{ |
||||
|
IDisposable?[] previousDisposables; |
||||
|
|
||||
|
lock (_gate) |
||||
|
{ |
||||
|
// disposed composites are always clear
|
||||
|
if (_disposed) |
||||
|
{ |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
var current = _disposables; |
||||
|
|
||||
|
previousDisposables = current.ToArray(); |
||||
|
current.Clear(); |
||||
|
|
||||
|
Volatile.Write(ref _count, 0); |
||||
|
} |
||||
|
|
||||
|
foreach (var d in previousDisposables) |
||||
|
{ |
||||
|
d?.Dispose(); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Determines whether the <see cref="CompositeDisposable"/> contains a specific disposable.
|
||||
|
/// </summary>
|
||||
|
/// <param name="item">Disposable to search for.</param>
|
||||
|
/// <returns>true if the disposable was found; otherwise, false.</returns>
|
||||
|
/// <exception cref="ArgumentNullException"><paramref name="item"/> is <c>null</c>.</exception>
|
||||
|
public bool Contains(IDisposable item) |
||||
|
{ |
||||
|
if (item == null) |
||||
|
{ |
||||
|
throw new ArgumentNullException(nameof(item)); |
||||
|
} |
||||
|
|
||||
|
lock (_gate) |
||||
|
{ |
||||
|
if (_disposed) |
||||
|
{ |
||||
|
return false; |
||||
|
} |
||||
|
|
||||
|
return _disposables.Contains(item); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Copies the disposables contained in the <see cref="CompositeDisposable"/> to an array, starting at a particular array index.
|
||||
|
/// </summary>
|
||||
|
/// <param name="array">Array to copy the contained disposables to.</param>
|
||||
|
/// <param name="arrayIndex">Target index at which to copy the first disposable of the group.</param>
|
||||
|
/// <exception cref="ArgumentNullException"><paramref name="array"/> is <c>null</c>.</exception>
|
||||
|
/// <exception cref="ArgumentOutOfRangeException"><paramref name="arrayIndex"/> is less than zero. -or - <paramref name="arrayIndex"/> is larger than or equal to the array length.</exception>
|
||||
|
public void CopyTo(IDisposable[] array, int arrayIndex) |
||||
|
{ |
||||
|
if (array == null) |
||||
|
{ |
||||
|
throw new ArgumentNullException(nameof(array)); |
||||
|
} |
||||
|
|
||||
|
if (arrayIndex < 0 || arrayIndex >= array.Length) |
||||
|
{ |
||||
|
throw new ArgumentOutOfRangeException(nameof(arrayIndex)); |
||||
|
} |
||||
|
|
||||
|
lock (_gate) |
||||
|
{ |
||||
|
// disposed composites are always empty
|
||||
|
if (_disposed) |
||||
|
{ |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
if (arrayIndex + _count > array.Length) |
||||
|
{ |
||||
|
// there is not enough space beyond arrayIndex
|
||||
|
// to accommodate all _count disposables in this composite
|
||||
|
throw new ArgumentOutOfRangeException(nameof(arrayIndex)); |
||||
|
} |
||||
|
|
||||
|
var i = arrayIndex; |
||||
|
|
||||
|
foreach (var d in _disposables) |
||||
|
{ |
||||
|
if (d != null) |
||||
|
{ |
||||
|
array[i++] = d; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Always returns false.
|
||||
|
/// </summary>
|
||||
|
public bool IsReadOnly => false; |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Returns an enumerator that iterates through the <see cref="CompositeDisposable"/>.
|
||||
|
/// </summary>
|
||||
|
/// <returns>An enumerator to iterate over the disposables.</returns>
|
||||
|
public IEnumerator<IDisposable> GetEnumerator() |
||||
|
{ |
||||
|
lock (_gate) |
||||
|
{ |
||||
|
if (_disposed || _count == 0) |
||||
|
{ |
||||
|
return EmptyEnumerator; |
||||
|
} |
||||
|
|
||||
|
// the copy is unavoidable but the creation
|
||||
|
// of an outer IEnumerable is avoidable
|
||||
|
return new CompositeEnumerator(_disposables.ToArray()); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Returns an enumerator that iterates through the <see cref="CompositeDisposable"/>.
|
||||
|
/// </summary>
|
||||
|
/// <returns>An enumerator to iterate over the disposables.</returns>
|
||||
|
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Gets a value that indicates whether the object is disposed.
|
||||
|
/// </summary>
|
||||
|
public bool IsDisposed => Volatile.Read(ref _disposed); |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// An empty enumerator for the <see cref="GetEnumerator"/>
|
||||
|
/// method to avoid allocation on disposed or empty composites.
|
||||
|
/// </summary>
|
||||
|
private static readonly CompositeEnumerator EmptyEnumerator = |
||||
|
new CompositeEnumerator(Array.Empty<IDisposable?>()); |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// An enumerator for an array of disposables.
|
||||
|
/// </summary>
|
||||
|
private sealed class CompositeEnumerator : IEnumerator<IDisposable> |
||||
|
{ |
||||
|
private readonly IDisposable?[] _disposables; |
||||
|
private int _index; |
||||
|
|
||||
|
public CompositeEnumerator(IDisposable?[] disposables) |
||||
|
{ |
||||
|
_disposables = disposables; |
||||
|
_index = -1; |
||||
|
} |
||||
|
|
||||
|
public IDisposable Current => _disposables[_index]!; // NB: _index is only advanced to non-null positions.
|
||||
|
|
||||
|
object IEnumerator.Current => _disposables[_index]!; |
||||
|
|
||||
|
public void Dispose() |
||||
|
{ |
||||
|
// Avoid retention of the referenced disposables
|
||||
|
// beyond the lifecycle of the enumerator.
|
||||
|
// Not sure if this happens by default to
|
||||
|
// generic array enumerators though.
|
||||
|
var disposables = _disposables; |
||||
|
Array.Clear(disposables, 0, disposables.Length); |
||||
|
} |
||||
|
|
||||
|
public bool MoveNext() |
||||
|
{ |
||||
|
var disposables = _disposables; |
||||
|
|
||||
|
for (;;) |
||||
|
{ |
||||
|
var idx = ++_index; |
||||
|
|
||||
|
if (idx >= disposables.Length) |
||||
|
{ |
||||
|
return false; |
||||
|
} |
||||
|
|
||||
|
// inlined that filter for null elements
|
||||
|
if (disposables[idx] != null) |
||||
|
{ |
||||
|
return true; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public void Reset() |
||||
|
{ |
||||
|
_index = -1; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,98 @@ |
|||||
|
using System; |
||||
|
using System.Threading; |
||||
|
|
||||
|
namespace Avalonia.Reactive; |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Provides a set of static methods for creating <see cref="IDisposable"/> objects.
|
||||
|
/// </summary>
|
||||
|
internal static class Disposable |
||||
|
{ |
||||
|
/// <summary>
|
||||
|
/// Represents a disposable that does nothing on disposal.
|
||||
|
/// </summary>
|
||||
|
private sealed class EmptyDisposable : IDisposable |
||||
|
{ |
||||
|
public static readonly EmptyDisposable Instance = new(); |
||||
|
|
||||
|
private EmptyDisposable() |
||||
|
{ |
||||
|
} |
||||
|
|
||||
|
public void Dispose() |
||||
|
{ |
||||
|
// no op
|
||||
|
} |
||||
|
} |
||||
|
|
||||
|
internal sealed class AnonymousDisposable : IDisposable |
||||
|
{ |
||||
|
private volatile Action? _dispose; |
||||
|
public AnonymousDisposable(Action dispose) |
||||
|
{ |
||||
|
_dispose = dispose; |
||||
|
} |
||||
|
public bool IsDisposed => _dispose == null; |
||||
|
public void Dispose() |
||||
|
{ |
||||
|
Interlocked.Exchange(ref _dispose, null)?.Invoke(); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
internal sealed class AnonymousDisposable<TState> : IDisposable |
||||
|
{ |
||||
|
private TState _state; |
||||
|
private volatile Action<TState>? _dispose; |
||||
|
|
||||
|
public AnonymousDisposable(TState state, Action<TState> dispose) |
||||
|
{ |
||||
|
_state = state; |
||||
|
_dispose = dispose; |
||||
|
} |
||||
|
|
||||
|
public bool IsDisposed => _dispose == null; |
||||
|
public void Dispose() |
||||
|
{ |
||||
|
Interlocked.Exchange(ref _dispose, null)?.Invoke(_state); |
||||
|
_state = default!; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Gets the disposable that does nothing when disposed.
|
||||
|
/// </summary>
|
||||
|
public static IDisposable Empty => EmptyDisposable.Instance; |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Creates a disposable object that invokes the specified action when disposed.
|
||||
|
/// </summary>
|
||||
|
/// <param name="dispose">Action to run during the first call to <see cref="IDisposable.Dispose"/>. The action is guaranteed to be run at most once.</param>
|
||||
|
/// <returns>The disposable object that runs the given action upon disposal.</returns>
|
||||
|
/// <exception cref="ArgumentNullException"><paramref name="dispose"/> is <c>null</c>.</exception>
|
||||
|
public static IDisposable Create(Action dispose) |
||||
|
{ |
||||
|
if (dispose == null) |
||||
|
{ |
||||
|
throw new ArgumentNullException(nameof(dispose)); |
||||
|
} |
||||
|
|
||||
|
return new AnonymousDisposable(dispose); |
||||
|
} |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Creates a disposable object that invokes the specified action when disposed.
|
||||
|
/// </summary>
|
||||
|
/// <param name="state">The state to be passed to the action.</param>
|
||||
|
/// <param name="dispose">Action to run during the first call to <see cref="IDisposable.Dispose"/>. The action is guaranteed to be run at most once.</param>
|
||||
|
/// <returns>The disposable object that runs the given action upon disposal.</returns>
|
||||
|
/// <exception cref="ArgumentNullException"><paramref name="dispose"/> is <c>null</c>.</exception>
|
||||
|
public static IDisposable Create<TState>(TState state, Action<TState> dispose) |
||||
|
{ |
||||
|
if (dispose == null) |
||||
|
{ |
||||
|
throw new ArgumentNullException(nameof(dispose)); |
||||
|
} |
||||
|
|
||||
|
return new AnonymousDisposable<TState>(state, dispose); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,37 @@ |
|||||
|
using System; |
||||
|
using Avalonia.Reactive; |
||||
|
|
||||
|
namespace Avalonia.Reactive; |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Extension methods associated with the IDisposable interface.
|
||||
|
/// </summary>
|
||||
|
internal static class DisposableMixin |
||||
|
{ |
||||
|
/// <summary>
|
||||
|
/// Ensures the provided disposable is disposed with the specified <see cref="CompositeDisposable"/>.
|
||||
|
/// </summary>
|
||||
|
/// <typeparam name="T">
|
||||
|
/// The type of the disposable.
|
||||
|
/// </typeparam>
|
||||
|
/// <param name="item">
|
||||
|
/// The disposable we are going to want to be disposed by the CompositeDisposable.
|
||||
|
/// </param>
|
||||
|
/// <param name="compositeDisposable">
|
||||
|
/// The <see cref="CompositeDisposable"/> to which <paramref name="item"/> will be added.
|
||||
|
/// </param>
|
||||
|
/// <returns>
|
||||
|
/// The disposable.
|
||||
|
/// </returns>
|
||||
|
public static T DisposeWith<T>(this T item, CompositeDisposable compositeDisposable) |
||||
|
where T : IDisposable |
||||
|
{ |
||||
|
if (compositeDisposable is null) |
||||
|
{ |
||||
|
throw new ArgumentNullException(nameof(compositeDisposable)); |
||||
|
} |
||||
|
|
||||
|
compositeDisposable.Add(item); |
||||
|
return item; |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,8 @@ |
|||||
|
using System; |
||||
|
|
||||
|
namespace Avalonia.Reactive; |
||||
|
|
||||
|
internal interface IAvaloniaSubject<T> : IObserver<T>, IObservable<T> /*, ISubject<T> */ |
||||
|
{ |
||||
|
|
||||
|
} |
||||
@ -0,0 +1,30 @@ |
|||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using System.Diagnostics; |
||||
|
using System.Runtime.InteropServices; |
||||
|
using System.Threading; |
||||
|
using Avalonia.Threading; |
||||
|
|
||||
|
namespace Avalonia.Reactive; |
||||
|
|
||||
|
internal class LightweightSubject<T> : LightweightObservableBase<T>, IAvaloniaSubject<T> |
||||
|
{ |
||||
|
public void OnCompleted() |
||||
|
{ |
||||
|
PublishCompleted(); |
||||
|
} |
||||
|
|
||||
|
public void OnError(Exception error) |
||||
|
{ |
||||
|
PublishError(error); |
||||
|
} |
||||
|
|
||||
|
public void OnNext(T value) |
||||
|
{ |
||||
|
PublishNext(value); |
||||
|
} |
||||
|
|
||||
|
protected override void Initialize() { } |
||||
|
|
||||
|
protected override void Deinitialize() { } |
||||
|
} |
||||
@ -0,0 +1,247 @@ |
|||||
|
using System; |
||||
|
using System.Collections; |
||||
|
using System.Collections.Generic; |
||||
|
using System.Linq; |
||||
|
using Avalonia.Reactive.Operators; |
||||
|
using Avalonia.Threading; |
||||
|
|
||||
|
namespace Avalonia.Reactive; |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Provides common observable methods as a replacement for the Rx framework.
|
||||
|
/// </summary>
|
||||
|
internal static class Observable |
||||
|
{ |
||||
|
public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe) |
||||
|
{ |
||||
|
return new CreateWithDisposableObservable<TSource>(subscribe); |
||||
|
} |
||||
|
|
||||
|
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> action) |
||||
|
{ |
||||
|
return source.Subscribe(new AnonymousObserver<T>(action)); |
||||
|
} |
||||
|
|
||||
|
public static IObservable<TResult> Select<TSource, TResult>(this IObservable<TSource> source, Func<TSource, TResult> selector) |
||||
|
{ |
||||
|
return Create<TResult>(obs => |
||||
|
{ |
||||
|
return source.Subscribe(new AnonymousObserver<TSource>( |
||||
|
input => |
||||
|
{ |
||||
|
TResult value; |
||||
|
try |
||||
|
{ |
||||
|
value = selector(input); |
||||
|
} |
||||
|
catch (Exception ex) |
||||
|
{ |
||||
|
obs.OnError(ex); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
obs.OnNext(value); |
||||
|
}, obs.OnError, obs.OnCompleted)); |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
public static IObservable<TSource> StartWith<TSource>(this IObservable<TSource> source, TSource value) |
||||
|
{ |
||||
|
return Create<TSource>(obs => |
||||
|
{ |
||||
|
obs.OnNext(value); |
||||
|
return source.Subscribe(obs); |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
public static IObservable<TSource> Where<TSource>(this IObservable<TSource> source, Func<TSource, bool> predicate) |
||||
|
{ |
||||
|
return Create<TSource>(obs => |
||||
|
{ |
||||
|
return source.Subscribe(new AnonymousObserver<TSource>( |
||||
|
input => |
||||
|
{ |
||||
|
bool shouldRun; |
||||
|
try |
||||
|
{ |
||||
|
shouldRun = predicate(input); |
||||
|
} |
||||
|
catch (Exception ex) |
||||
|
{ |
||||
|
obs.OnError(ex); |
||||
|
return; |
||||
|
} |
||||
|
if (shouldRun) |
||||
|
{ |
||||
|
obs.OnNext(input); |
||||
|
} |
||||
|
}, obs.OnError, obs.OnCompleted)); |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
public static IObservable<TSource> Switch<TSource>( |
||||
|
this IObservable<IObservable<TSource>> sources) |
||||
|
{ |
||||
|
return new Switch<TSource>(sources); |
||||
|
} |
||||
|
|
||||
|
public static IObservable<TResult> CombineLatest<TFirst, TSecond, TResult>( |
||||
|
this IObservable<TFirst> first, IObservable<TSecond> second, |
||||
|
Func<TFirst, TSecond, TResult> resultSelector) |
||||
|
{ |
||||
|
return new CombineLatest<TFirst, TSecond, TResult>(first, second, resultSelector); |
||||
|
} |
||||
|
|
||||
|
public static IObservable<TInput[]> CombineLatest<TInput>( |
||||
|
this IEnumerable<IObservable<TInput>> inputs) |
||||
|
{ |
||||
|
return new CombineLatest<TInput, TInput[]>(inputs, items => items); |
||||
|
} |
||||
|
|
||||
|
public static IObservable<T> Skip<T>(this IObservable<T> source, int skipCount) |
||||
|
{ |
||||
|
if (skipCount <= 0) |
||||
|
{ |
||||
|
throw new ArgumentException("Skip count must be bigger than zero", nameof(skipCount)); |
||||
|
} |
||||
|
|
||||
|
return Create<T>(obs => |
||||
|
{ |
||||
|
var remaining = skipCount; |
||||
|
return source.Subscribe(new AnonymousObserver<T>( |
||||
|
input => |
||||
|
{ |
||||
|
if (remaining <= 0) |
||||
|
{ |
||||
|
obs.OnNext(input); |
||||
|
} |
||||
|
else |
||||
|
{ |
||||
|
remaining--; |
||||
|
} |
||||
|
}, obs.OnError, obs.OnCompleted)); |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
public static IObservable<T> Take<T>(this IObservable<T> source, int takeCount) |
||||
|
{ |
||||
|
if (takeCount <= 0) |
||||
|
{ |
||||
|
return Empty<T>(); |
||||
|
} |
||||
|
|
||||
|
return Create<T>(obs => |
||||
|
{ |
||||
|
var remaining = takeCount; |
||||
|
IDisposable? sub = null; |
||||
|
sub = source.Subscribe(new AnonymousObserver<T>( |
||||
|
input => |
||||
|
{ |
||||
|
if (remaining > 0) |
||||
|
{ |
||||
|
--remaining; |
||||
|
obs.OnNext(input); |
||||
|
|
||||
|
if (remaining == 0) |
||||
|
{ |
||||
|
sub?.Dispose(); |
||||
|
obs.OnCompleted(); |
||||
|
} |
||||
|
} |
||||
|
}, obs.OnError, obs.OnCompleted)); |
||||
|
return sub; |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
public static IObservable<EventArgs> FromEventPattern(Action<EventHandler> addHandler, Action<EventHandler> removeHandler) |
||||
|
{ |
||||
|
return Create<EventArgs>(observer => |
||||
|
{ |
||||
|
var handler = new Action<EventArgs>(observer.OnNext); |
||||
|
var converted = new EventHandler((_, args) => handler(args)); |
||||
|
addHandler(converted); |
||||
|
|
||||
|
return Disposable.Create(() => removeHandler(converted)); |
||||
|
}); |
||||
|
} |
||||
|
|
||||
|
public static IObservable<T> Return<T>(T value) |
||||
|
{ |
||||
|
return new ReturnImpl<T>(value); |
||||
|
} |
||||
|
|
||||
|
public static IObservable<T> Empty<T>() |
||||
|
{ |
||||
|
return EmptyImpl<T>.Instance; |
||||
|
} |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Returns an observable that fires once with the specified value and never completes.
|
||||
|
/// </summary>
|
||||
|
/// <typeparam name="T">The type of the value.</typeparam>
|
||||
|
/// <param name="value">The value.</param>
|
||||
|
/// <returns>The observable.</returns>
|
||||
|
public static IObservable<T> SingleValue<T>(T value) |
||||
|
{ |
||||
|
return new SingleValueImpl<T>(value); |
||||
|
} |
||||
|
|
||||
|
private sealed class SingleValueImpl<T> : IObservable<T> |
||||
|
{ |
||||
|
private readonly T _value; |
||||
|
|
||||
|
public SingleValueImpl(T value) |
||||
|
{ |
||||
|
_value = value; |
||||
|
} |
||||
|
public IDisposable Subscribe(IObserver<T> observer) |
||||
|
{ |
||||
|
observer.OnNext(_value); |
||||
|
return Disposable.Empty; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private sealed class ReturnImpl<T> : IObservable<T> |
||||
|
{ |
||||
|
private readonly T _value; |
||||
|
|
||||
|
public ReturnImpl(T value) |
||||
|
{ |
||||
|
_value = value; |
||||
|
} |
||||
|
public IDisposable Subscribe(IObserver<T> observer) |
||||
|
{ |
||||
|
observer.OnNext(_value); |
||||
|
observer.OnCompleted(); |
||||
|
return Disposable.Empty; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
internal sealed class EmptyImpl<TResult> : IObservable<TResult> |
||||
|
{ |
||||
|
internal static readonly IObservable<TResult> Instance = new EmptyImpl<TResult>(); |
||||
|
|
||||
|
private EmptyImpl() { } |
||||
|
|
||||
|
public IDisposable Subscribe(IObserver<TResult> observer) |
||||
|
{ |
||||
|
observer.OnCompleted(); |
||||
|
return Disposable.Empty; |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private sealed class CreateWithDisposableObservable<TSource> : IObservable<TSource> |
||||
|
{ |
||||
|
private readonly Func<IObserver<TSource>, IDisposable> _subscribe; |
||||
|
|
||||
|
public CreateWithDisposableObservable(Func<IObserver<TSource>, IDisposable> subscribe) |
||||
|
{ |
||||
|
_subscribe = subscribe; |
||||
|
} |
||||
|
|
||||
|
public IDisposable Subscribe(IObserver<TSource> observer) |
||||
|
{ |
||||
|
return _subscribe(observer); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -1,37 +0,0 @@ |
|||||
using System; |
|
||||
using System.Reactive.Disposables; |
|
||||
|
|
||||
namespace Avalonia.Reactive |
|
||||
{ |
|
||||
/// <summary>
|
|
||||
/// Provides common observable methods not found in standard Rx framework.
|
|
||||
/// </summary>
|
|
||||
public static class ObservableEx |
|
||||
{ |
|
||||
/// <summary>
|
|
||||
/// Returns an observable that fires once with the specified value and never completes.
|
|
||||
/// </summary>
|
|
||||
/// <typeparam name="T">The type of the value.</typeparam>
|
|
||||
/// <param name="value">The value.</param>
|
|
||||
/// <returns>The observable.</returns>
|
|
||||
public static IObservable<T> SingleValue<T>(T value) |
|
||||
{ |
|
||||
return new SingleValueImpl<T>(value); |
|
||||
} |
|
||||
|
|
||||
private class SingleValueImpl<T> : IObservable<T> |
|
||||
{ |
|
||||
private T _value; |
|
||||
|
|
||||
public SingleValueImpl(T value) |
|
||||
{ |
|
||||
_value = value; |
|
||||
} |
|
||||
public IDisposable Subscribe(IObserver<T> observer) |
|
||||
{ |
|
||||
observer.OnNext(_value); |
|
||||
return Disposable.Empty; |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
} |
|
||||
@ -0,0 +1,374 @@ |
|||||
|
using System; |
||||
|
using System.Collections.Generic; |
||||
|
using System.Collections.ObjectModel; |
||||
|
using System.Linq; |
||||
|
|
||||
|
namespace Avalonia.Reactive.Operators; |
||||
|
|
||||
|
// Code based on https://github.com/dotnet/reactive/blob/main/Rx.NET/Source/src/System.Reactive/Linq/Observable/CombineLatest.cs
|
||||
|
|
||||
|
internal sealed class CombineLatest<TFirst, TSecond, TResult> : IObservable<TResult> |
||||
|
{ |
||||
|
private readonly IObservable<TFirst> _first; |
||||
|
private readonly IObservable<TSecond> _second; |
||||
|
private readonly Func<TFirst, TSecond, TResult> _resultSelector; |
||||
|
|
||||
|
public CombineLatest(IObservable<TFirst> first, IObservable<TSecond> second, |
||||
|
Func<TFirst, TSecond, TResult> resultSelector) |
||||
|
{ |
||||
|
_first = first; |
||||
|
_second = second; |
||||
|
_resultSelector = resultSelector; |
||||
|
} |
||||
|
|
||||
|
public IDisposable Subscribe(IObserver<TResult> observer) |
||||
|
{ |
||||
|
var sink = new _(_resultSelector, observer); |
||||
|
sink.Run(_first, _second); |
||||
|
return sink; |
||||
|
} |
||||
|
|
||||
|
internal sealed class _ : IdentitySink<TResult> |
||||
|
{ |
||||
|
private readonly Func<TFirst, TSecond, TResult> _resultSelector; |
||||
|
private readonly object _gate = new object(); |
||||
|
|
||||
|
public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> observer) |
||||
|
: base(observer) |
||||
|
{ |
||||
|
_resultSelector = resultSelector; |
||||
|
_firstDisposable = null!; |
||||
|
_secondDisposable = null!; |
||||
|
} |
||||
|
|
||||
|
private IDisposable _firstDisposable; |
||||
|
private IDisposable _secondDisposable; |
||||
|
|
||||
|
public void Run(IObservable<TFirst> first, IObservable<TSecond> second) |
||||
|
{ |
||||
|
var fstO = new FirstObserver(this); |
||||
|
var sndO = new SecondObserver(this); |
||||
|
|
||||
|
fstO.SetOther(sndO); |
||||
|
sndO.SetOther(fstO); |
||||
|
|
||||
|
_firstDisposable = first.Subscribe(fstO); |
||||
|
_secondDisposable = second.Subscribe(sndO); |
||||
|
} |
||||
|
|
||||
|
protected override void Dispose(bool disposing) |
||||
|
{ |
||||
|
if (disposing) |
||||
|
{ |
||||
|
_firstDisposable.Dispose(); |
||||
|
_secondDisposable.Dispose(); |
||||
|
} |
||||
|
|
||||
|
base.Dispose(disposing); |
||||
|
} |
||||
|
|
||||
|
private sealed class FirstObserver : IObserver<TFirst> |
||||
|
{ |
||||
|
private readonly _ _parent; |
||||
|
private SecondObserver _other; |
||||
|
|
||||
|
public FirstObserver(_ parent) |
||||
|
{ |
||||
|
_parent = parent; |
||||
|
_other = default!; // NB: Will be set by SetOther.
|
||||
|
} |
||||
|
|
||||
|
public void SetOther(SecondObserver other) { _other = other; } |
||||
|
|
||||
|
public bool HasValue { get; private set; } |
||||
|
public TFirst? Value { get; private set; } |
||||
|
public bool Done { get; private set; } |
||||
|
|
||||
|
public void OnNext(TFirst value) |
||||
|
{ |
||||
|
lock (_parent._gate) |
||||
|
{ |
||||
|
HasValue = true; |
||||
|
Value = value; |
||||
|
|
||||
|
if (_other.HasValue) |
||||
|
{ |
||||
|
TResult res; |
||||
|
try |
||||
|
{ |
||||
|
res = _parent._resultSelector(value, _other.Value!); |
||||
|
} |
||||
|
catch (Exception ex) |
||||
|
{ |
||||
|
_parent.ForwardOnError(ex); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
_parent.ForwardOnNext(res); |
||||
|
} |
||||
|
else if (_other.Done) |
||||
|
{ |
||||
|
_parent.ForwardOnCompleted(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public void OnError(Exception error) |
||||
|
{ |
||||
|
lock (_parent._gate) |
||||
|
{ |
||||
|
_parent.ForwardOnError(error); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public void OnCompleted() |
||||
|
{ |
||||
|
lock (_parent._gate) |
||||
|
{ |
||||
|
Done = true; |
||||
|
|
||||
|
if (_other.Done) |
||||
|
{ |
||||
|
_parent.ForwardOnCompleted(); |
||||
|
} |
||||
|
else |
||||
|
{ |
||||
|
_parent._firstDisposable.Dispose(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private sealed class SecondObserver : IObserver<TSecond> |
||||
|
{ |
||||
|
private readonly _ _parent; |
||||
|
private FirstObserver _other; |
||||
|
|
||||
|
public SecondObserver(_ parent) |
||||
|
{ |
||||
|
_parent = parent; |
||||
|
_other = default!; // NB: Will be set by SetOther.
|
||||
|
} |
||||
|
|
||||
|
public void SetOther(FirstObserver other) { _other = other; } |
||||
|
|
||||
|
public bool HasValue { get; private set; } |
||||
|
public TSecond? Value { get; private set; } |
||||
|
public bool Done { get; private set; } |
||||
|
|
||||
|
public void OnNext(TSecond value) |
||||
|
{ |
||||
|
lock (_parent._gate) |
||||
|
{ |
||||
|
HasValue = true; |
||||
|
Value = value; |
||||
|
|
||||
|
if (_other.HasValue) |
||||
|
{ |
||||
|
TResult res; |
||||
|
try |
||||
|
{ |
||||
|
res = _parent._resultSelector(_other.Value!, value); |
||||
|
} |
||||
|
catch (Exception ex) |
||||
|
{ |
||||
|
_parent.ForwardOnError(ex); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
_parent.ForwardOnNext(res); |
||||
|
} |
||||
|
else if (_other.Done) |
||||
|
{ |
||||
|
_parent.ForwardOnCompleted(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public void OnError(Exception error) |
||||
|
{ |
||||
|
lock (_parent._gate) |
||||
|
{ |
||||
|
_parent.ForwardOnError(error); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public void OnCompleted() |
||||
|
{ |
||||
|
lock (_parent._gate) |
||||
|
{ |
||||
|
Done = true; |
||||
|
|
||||
|
if (_other.Done) |
||||
|
{ |
||||
|
_parent.ForwardOnCompleted(); |
||||
|
} |
||||
|
else |
||||
|
{ |
||||
|
_parent._secondDisposable.Dispose(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
internal sealed class CombineLatest<TSource, TResult> : IObservable<TResult> |
||||
|
{ |
||||
|
private readonly IEnumerable<IObservable<TSource>> _sources; |
||||
|
private readonly Func<TSource[], TResult> _resultSelector; |
||||
|
|
||||
|
public CombineLatest(IEnumerable<IObservable<TSource>> sources, Func<TSource[], TResult> resultSelector) |
||||
|
{ |
||||
|
_sources = sources; |
||||
|
_resultSelector = resultSelector; |
||||
|
} |
||||
|
|
||||
|
public IDisposable Subscribe(IObserver<TResult> observer) |
||||
|
{ |
||||
|
var sink = new _(_resultSelector, observer); |
||||
|
sink.Run(_sources); |
||||
|
return sink; |
||||
|
} |
||||
|
|
||||
|
internal sealed class _ : IdentitySink<TResult> |
||||
|
{ |
||||
|
private readonly object _gate = new object(); |
||||
|
private readonly Func<TSource[], TResult> _resultSelector; |
||||
|
|
||||
|
public _(Func<TSource[], TResult> resultSelector, IObserver<TResult> observer) |
||||
|
: base(observer) |
||||
|
{ |
||||
|
_resultSelector = resultSelector; |
||||
|
|
||||
|
// NB: These will be set in Run before getting used.
|
||||
|
_hasValue = null!; |
||||
|
_values = null!; |
||||
|
_isDone = null!; |
||||
|
_subscriptions = null!; |
||||
|
} |
||||
|
|
||||
|
private bool[] _hasValue; |
||||
|
private bool _hasValueAll; |
||||
|
private TSource[] _values; |
||||
|
private bool[] _isDone; |
||||
|
private IDisposable[] _subscriptions; |
||||
|
|
||||
|
public void Run(IEnumerable<IObservable<TSource>> sources) |
||||
|
{ |
||||
|
var srcs = sources.ToArray(); |
||||
|
|
||||
|
var N = srcs.Length; |
||||
|
|
||||
|
_hasValue = new bool[N]; |
||||
|
_hasValueAll = false; |
||||
|
|
||||
|
_values = new TSource[N]; |
||||
|
|
||||
|
_isDone = new bool[N]; |
||||
|
|
||||
|
_subscriptions = new IDisposable[N]; |
||||
|
|
||||
|
for (var i = 0; i < N; i++) |
||||
|
{ |
||||
|
var j = i; |
||||
|
|
||||
|
var o = new SourceObserver(this, j); |
||||
|
_subscriptions[j] = o; |
||||
|
|
||||
|
o.Disposable = srcs[j].Subscribe(o); |
||||
|
} |
||||
|
|
||||
|
SetUpstream(new CompositeDisposable(_subscriptions)); |
||||
|
} |
||||
|
|
||||
|
private void OnNext(int index, TSource value) |
||||
|
{ |
||||
|
lock (_gate) |
||||
|
{ |
||||
|
_values[index] = value; |
||||
|
|
||||
|
_hasValue[index] = true; |
||||
|
|
||||
|
if (_hasValueAll || (_hasValueAll = _hasValue.All(v => v))) |
||||
|
{ |
||||
|
TResult res; |
||||
|
try |
||||
|
{ |
||||
|
res = _resultSelector(_values); |
||||
|
} |
||||
|
catch (Exception ex) |
||||
|
{ |
||||
|
ForwardOnError(ex); |
||||
|
return; |
||||
|
} |
||||
|
|
||||
|
ForwardOnNext(res); |
||||
|
} |
||||
|
else if (_isDone.Where((_, i) => i != index).All(d => d)) |
||||
|
{ |
||||
|
ForwardOnCompleted(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private new void OnError(Exception error) |
||||
|
{ |
||||
|
lock (_gate) |
||||
|
{ |
||||
|
ForwardOnError(error); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private void OnCompleted(int index) |
||||
|
{ |
||||
|
lock (_gate) |
||||
|
{ |
||||
|
_isDone[index] = true; |
||||
|
|
||||
|
if (_isDone.All(d => d)) |
||||
|
{ |
||||
|
ForwardOnCompleted(); |
||||
|
} |
||||
|
else |
||||
|
{ |
||||
|
_subscriptions[index].Dispose(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private sealed class SourceObserver : IObserver<TSource>, IDisposable |
||||
|
{ |
||||
|
private readonly _ _parent; |
||||
|
private readonly int _index; |
||||
|
|
||||
|
public SourceObserver(_ parent, int index) |
||||
|
{ |
||||
|
_parent = parent; |
||||
|
_index = index; |
||||
|
} |
||||
|
|
||||
|
public IDisposable? Disposable { get; set; } |
||||
|
|
||||
|
public void OnNext(TSource value) |
||||
|
{ |
||||
|
_parent.OnNext(_index, value); |
||||
|
} |
||||
|
|
||||
|
public void OnError(Exception error) |
||||
|
{ |
||||
|
_parent.OnError(error); |
||||
|
} |
||||
|
|
||||
|
public void OnCompleted() |
||||
|
{ |
||||
|
_parent.OnCompleted(_index); |
||||
|
} |
||||
|
|
||||
|
public void Dispose() |
||||
|
{ |
||||
|
Disposable?.Dispose(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,111 @@ |
|||||
|
using System; |
||||
|
using System.Threading; |
||||
|
|
||||
|
namespace Avalonia.Reactive.Operators; |
||||
|
|
||||
|
// Code based on https://github.com/dotnet/reactive/blob/main/Rx.NET/Source/src/System.Reactive/Internal/Sink.cs
|
||||
|
|
||||
|
internal abstract class Sink<TTarget> : IDisposable |
||||
|
{ |
||||
|
private IDisposable? _upstream; |
||||
|
private volatile IObserver<TTarget> _observer; |
||||
|
|
||||
|
protected Sink(IObserver<TTarget> observer) |
||||
|
{ |
||||
|
_observer = observer; |
||||
|
} |
||||
|
|
||||
|
public void Dispose() |
||||
|
{ |
||||
|
Dispose(true); |
||||
|
} |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Override this method to dispose additional resources.
|
||||
|
/// The method is guaranteed to be called at most once.
|
||||
|
/// </summary>
|
||||
|
/// <param name="disposing">If true, the method was called from <see cref="Dispose()"/>.</param>
|
||||
|
protected virtual void Dispose(bool disposing) |
||||
|
{ |
||||
|
//Calling base.Dispose(true) is not a proper disposal, so we can omit the assignment here.
|
||||
|
//Sink is internal so this can pretty much be enforced.
|
||||
|
//_observer = NopObserver<TTarget>.Instance;
|
||||
|
|
||||
|
_upstream?.Dispose(); |
||||
|
} |
||||
|
|
||||
|
public void ForwardOnNext(TTarget value) |
||||
|
{ |
||||
|
_observer.OnNext(value); |
||||
|
} |
||||
|
|
||||
|
public void ForwardOnCompleted() |
||||
|
{ |
||||
|
_observer.OnCompleted(); |
||||
|
Dispose(); |
||||
|
} |
||||
|
|
||||
|
public void ForwardOnError(Exception error) |
||||
|
{ |
||||
|
_observer.OnError(error); |
||||
|
Dispose(); |
||||
|
} |
||||
|
|
||||
|
protected void SetUpstream(IDisposable upstream) |
||||
|
{ |
||||
|
_upstream = upstream; |
||||
|
} |
||||
|
|
||||
|
protected void DisposeUpstream() |
||||
|
{ |
||||
|
_upstream?.Dispose(); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
internal abstract class Sink<TSource, TTarget> : Sink<TTarget>, IObserver<TSource> |
||||
|
{ |
||||
|
protected Sink(IObserver<TTarget> observer) : base(observer) |
||||
|
{ |
||||
|
} |
||||
|
|
||||
|
public virtual void Run(IObservable<TSource> source) |
||||
|
{ |
||||
|
SetUpstream(source.Subscribe(this)); |
||||
|
} |
||||
|
|
||||
|
public abstract void OnNext(TSource value); |
||||
|
|
||||
|
public virtual void OnError(Exception error) => ForwardOnError(error); |
||||
|
|
||||
|
public virtual void OnCompleted() => ForwardOnCompleted(); |
||||
|
|
||||
|
public IObserver<TTarget> GetForwarder() => new _(this); |
||||
|
|
||||
|
private sealed class _ : IObserver<TTarget> |
||||
|
{ |
||||
|
private readonly Sink<TSource, TTarget> _forward; |
||||
|
|
||||
|
public _(Sink<TSource, TTarget> forward) |
||||
|
{ |
||||
|
_forward = forward; |
||||
|
} |
||||
|
|
||||
|
public void OnNext(TTarget value) => _forward.ForwardOnNext(value); |
||||
|
|
||||
|
public void OnError(Exception error) => _forward.ForwardOnError(error); |
||||
|
|
||||
|
public void OnCompleted() => _forward.ForwardOnCompleted(); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
internal abstract class IdentitySink<T> : Sink<T, T> |
||||
|
{ |
||||
|
protected IdentitySink(IObserver<T> observer) : base(observer) |
||||
|
{ |
||||
|
} |
||||
|
|
||||
|
public override void OnNext(T value) |
||||
|
{ |
||||
|
ForwardOnNext(value); |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,144 @@ |
|||||
|
using System; |
||||
|
|
||||
|
namespace Avalonia.Reactive.Operators; |
||||
|
|
||||
|
// Code based on https://github.com/dotnet/reactive/blob/main/Rx.NET/Source/src/System.Reactive/Linq/Observable/Switch.cs
|
||||
|
|
||||
|
internal sealed class Switch<TSource> : IObservable<TSource> |
||||
|
{ |
||||
|
private readonly IObservable<IObservable<TSource>> _sources; |
||||
|
|
||||
|
public Switch(IObservable<IObservable<TSource>> sources) |
||||
|
{ |
||||
|
_sources = sources; |
||||
|
} |
||||
|
|
||||
|
public IDisposable Subscribe(IObserver<TSource> observer) |
||||
|
{ |
||||
|
return _sources.Subscribe(new _(observer)); |
||||
|
} |
||||
|
|
||||
|
internal sealed class _ : Sink<IObservable<TSource>, TSource> |
||||
|
{ |
||||
|
private readonly object _gate = new object(); |
||||
|
|
||||
|
public _(IObserver<TSource> observer) |
||||
|
: base(observer) |
||||
|
{ |
||||
|
} |
||||
|
|
||||
|
private IDisposable? _innerSerialDisposable; |
||||
|
private bool _isStopped; |
||||
|
private ulong _latest; |
||||
|
private bool _hasLatest; |
||||
|
|
||||
|
protected override void Dispose(bool disposing) |
||||
|
{ |
||||
|
if (disposing) |
||||
|
{ |
||||
|
_innerSerialDisposable?.Dispose(); |
||||
|
} |
||||
|
|
||||
|
base.Dispose(disposing); |
||||
|
} |
||||
|
|
||||
|
public override void OnNext(IObservable<TSource> value) |
||||
|
{ |
||||
|
ulong id; |
||||
|
|
||||
|
lock (_gate) |
||||
|
{ |
||||
|
id = unchecked(++_latest); |
||||
|
_hasLatest = true; |
||||
|
} |
||||
|
|
||||
|
var innerObserver = new InnerObserver(this, id); |
||||
|
|
||||
|
_innerSerialDisposable = innerObserver; |
||||
|
innerObserver.Disposable = value.Subscribe(innerObserver); |
||||
|
} |
||||
|
|
||||
|
public override void OnError(Exception error) |
||||
|
{ |
||||
|
lock (_gate) |
||||
|
{ |
||||
|
ForwardOnError(error); |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public override void OnCompleted() |
||||
|
{ |
||||
|
lock (_gate) |
||||
|
{ |
||||
|
DisposeUpstream(); |
||||
|
|
||||
|
_isStopped = true; |
||||
|
if (!_hasLatest) |
||||
|
{ |
||||
|
ForwardOnCompleted(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
private sealed class InnerObserver : IObserver<TSource>, IDisposable |
||||
|
{ |
||||
|
private readonly _ _parent; |
||||
|
private readonly ulong _id; |
||||
|
|
||||
|
public InnerObserver(_ parent, ulong id) |
||||
|
{ |
||||
|
_parent = parent; |
||||
|
_id = id; |
||||
|
} |
||||
|
|
||||
|
public IDisposable? Disposable { get; set; } |
||||
|
|
||||
|
public void OnNext(TSource value) |
||||
|
{ |
||||
|
lock (_parent._gate) |
||||
|
{ |
||||
|
if (_parent._latest == _id) |
||||
|
{ |
||||
|
_parent.ForwardOnNext(value); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public void OnError(Exception error) |
||||
|
{ |
||||
|
lock (_parent._gate) |
||||
|
{ |
||||
|
Dispose(); |
||||
|
|
||||
|
if (_parent._latest == _id) |
||||
|
{ |
||||
|
_parent.ForwardOnError(error); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public void OnCompleted() |
||||
|
{ |
||||
|
lock (_parent._gate) |
||||
|
{ |
||||
|
Dispose(); |
||||
|
|
||||
|
if (_parent._latest == _id) |
||||
|
{ |
||||
|
_parent._hasLatest = false; |
||||
|
|
||||
|
if (_parent._isStopped) |
||||
|
{ |
||||
|
_parent.ForwardOnCompleted(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public void Dispose() |
||||
|
{ |
||||
|
Disposable?.Dispose(); |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
} |
||||
@ -0,0 +1,35 @@ |
|||||
|
using System; |
||||
|
using System.Threading; |
||||
|
|
||||
|
namespace Avalonia.Reactive; |
||||
|
|
||||
|
/// <summary>
|
||||
|
/// Represents a disposable resource whose underlying disposable resource can be replaced by another disposable resource, causing automatic disposal of the previous underlying disposable resource.
|
||||
|
/// </summary>
|
||||
|
internal sealed class SerialDisposableValue : IDisposable |
||||
|
{ |
||||
|
private IDisposable? _current; |
||||
|
private bool _disposed; |
||||
|
|
||||
|
public IDisposable? Disposable |
||||
|
{ |
||||
|
get => _current; |
||||
|
set |
||||
|
{ |
||||
|
_current?.Dispose(); |
||||
|
_current = value; |
||||
|
|
||||
|
if (_disposed) |
||||
|
{ |
||||
|
_current?.Dispose(); |
||||
|
_current = null; |
||||
|
} |
||||
|
} |
||||
|
} |
||||
|
|
||||
|
public void Dispose() |
||||
|
{ |
||||
|
_disposed = true; |
||||
|
_current?.Dispose(); |
||||
|
} |
||||
|
} |
||||
@ -1,18 +0,0 @@ |
|||||
using System; |
|
||||
|
|
||||
namespace Avalonia.Utilities |
|
||||
{ |
|
||||
/// <summary>
|
|
||||
/// Defines a listener to a event subscribed vis the <see cref="WeakObservable"/>.
|
|
||||
/// </summary>
|
|
||||
/// <typeparam name="T">The type of the event arguments.</typeparam>
|
|
||||
public interface IWeakSubscriber<T> where T : EventArgs |
|
||||
{ |
|
||||
/// <summary>
|
|
||||
/// Invoked when the subscribed event is raised.
|
|
||||
/// </summary>
|
|
||||
/// <param name="sender">The event sender.</param>
|
|
||||
/// <param name="e">The event arguments.</param>
|
|
||||
void OnEvent(object? sender, T e); |
|
||||
} |
|
||||
} |
|
||||
@ -1,60 +0,0 @@ |
|||||
using System; |
|
||||
using System.Reactive; |
|
||||
using System.Reactive.Linq; |
|
||||
|
|
||||
namespace Avalonia.Utilities |
|
||||
{ |
|
||||
/// <summary>
|
|
||||
/// Provides extension methods for working with weak event handlers.
|
|
||||
/// </summary>
|
|
||||
public static class WeakObservable |
|
||||
{ |
|
||||
|
|
||||
private class Handler<TEventArgs> |
|
||||
: IWeakSubscriber<TEventArgs>, |
|
||||
IWeakEventSubscriber<TEventArgs> where TEventArgs : EventArgs |
|
||||
{ |
|
||||
private IObserver<EventPattern<object, TEventArgs>> _observer; |
|
||||
|
|
||||
public Handler(IObserver<EventPattern<object, TEventArgs>> observer) |
|
||||
{ |
|
||||
_observer = observer; |
|
||||
} |
|
||||
|
|
||||
public void OnEvent(object? sender, TEventArgs e) |
|
||||
{ |
|
||||
_observer.OnNext(new EventPattern<object, TEventArgs>(sender, e)); |
|
||||
} |
|
||||
|
|
||||
public void OnEvent(object? sender, WeakEvent ev, TEventArgs e) |
|
||||
{ |
|
||||
_observer.OnNext(new EventPattern<object, TEventArgs>(sender, e)); |
|
||||
} |
|
||||
} |
|
||||
|
|
||||
/// <summary>
|
|
||||
/// Converts a WeakEvent conforming to the standard .NET event pattern into an observable
|
|
||||
/// sequence, subscribing weakly.
|
|
||||
/// </summary>
|
|
||||
/// <typeparam name="TTarget">The type of target.</typeparam>
|
|
||||
/// <typeparam name="TEventArgs">The type of the event args.</typeparam>
|
|
||||
/// <param name="target">Object instance that exposes the event to convert.</param>
|
|
||||
/// <param name="ev">The weak event to convert.</param>
|
|
||||
/// <returns></returns>
|
|
||||
public static IObservable<EventPattern<object, TEventArgs>> FromEventPattern<TTarget, TEventArgs>( |
|
||||
TTarget target, WeakEvent<TTarget, TEventArgs> ev) |
|
||||
where TEventArgs : EventArgs where TTarget : class |
|
||||
{ |
|
||||
_ = target ?? throw new ArgumentNullException(nameof(target)); |
|
||||
_ = ev ?? throw new ArgumentNullException(nameof(ev)); |
|
||||
|
|
||||
return Observable.Create<EventPattern<object, TEventArgs>>(observer => |
|
||||
{ |
|
||||
var handler = new Handler<TEventArgs>(observer); |
|
||||
ev.Subscribe(target, handler); |
|
||||
return () => ev.Unsubscribe(target, handler); |
|
||||
}).Publish().RefCount(); |
|
||||
} |
|
||||
|
|
||||
} |
|
||||
} |
|
||||
Some files were not shown because too many files changed in this diff
Loading…
Reference in new issue