using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reflection;
using System.Threading.Tasks;

namespace Avalonia.Data.Core.Plugins
{
    /// <summary>
    /// Handles binding to <see cref="Task"/>s for the '^' stream binding operator.
    /// </summary>
    public class TaskStreamPlugin : IStreamPlugin
    {
        /// <summary>
        /// Checks whether this plugin handles the specified value.
        /// </summary>
        /// <param name="reference">A weak reference to the value.</param>
        /// <returns>True if the plugin can handle the value; otherwise false.</returns>
        public virtual bool Match(WeakReference<object?> reference)
        {
            // A dead reference yields a null target, which is never a Task.
            return reference.TryGetTarget(out var target) && target is Task;
        }

        /// <summary>
        /// Starts producing output based on the specified value.
        /// </summary>
        /// <param name="reference">A weak reference to the object.</param>
        /// <returns>
        /// An observable that produces the output for the value: the task's <c>Result</c> when it
        /// runs to completion, a <see cref="BindingNotification"/> carrying the exception when it
        /// faults, and nothing (the stream just completes) when it is canceled. An empty
        /// observable is returned when the target is not a task or its runtime type exposes no
        /// <c>Result</c> property.
        /// </returns>
        public virtual IObservable<object?> Start(WeakReference<object?> reference)
        {
            if (!reference.TryGetTarget(out var target) || !(target is Task task))
            {
                return Observable.Empty<object?>();
            }

            // The result is read via reflection because the task's result type is not known
            // statically here.
            var resultProperty = task.GetType().GetRuntimeProperty("Result");

            if (resultProperty == null)
            {
                return Observable.Empty<object?>();
            }

            // IsCompleted covers RanToCompletion, Faulted and Canceled, so an already-canceled
            // task is answered immediately instead of being routed through a continuation.
            if (task.IsCompleted)
            {
                return HandleCompleted(task, resultProperty);
            }

            // AsyncSubject caches the final value and completion, so an observer that subscribes
            // after the continuation has run still receives the outcome.
            var subject = new AsyncSubject<object?>();

            // The continuation is scheduled on the scheduler created from the synchronization
            // context that is current when Start is called.
            _ = task.ContinueWith(
                completed => HandleCompleted(completed, resultProperty).Subscribe(subject),
                TaskScheduler.FromCurrentSynchronizationContext());

            return subject;
        }

        /// <summary>
        /// Converts the outcome of a finished task into an observable sequence.
        /// </summary>
        /// <param name="task">The task, which must be in a final state.</param>
        /// <param name="resultProperty">The <c>Result</c> property of the task's runtime type.</param>
        /// <returns>
        /// A single-value observable holding the result or an error notification, or an empty
        /// observable for a canceled task.
        /// </returns>
        /// <exception cref="AvaloniaInternalException">
        /// The task has not reached a final state.
        /// </exception>
        private static IObservable<object?> HandleCompleted(Task task, PropertyInfo resultProperty)
        {
            switch (task.Status)
            {
                case TaskStatus.RanToCompletion:
                    return Observable.Return<object?>(resultProperty.GetValue(task));
                case TaskStatus.Faulted:
                    return Observable.Return<object?>(
                        new BindingNotification(task.Exception!, BindingErrorType.Error));
                case TaskStatus.Canceled:
                    // A canceled task has neither a result nor an exception to publish; complete
                    // the stream rather than throwing inside a continuation where the exception
                    // would go unobserved and the subscriber would never be signalled.
                    return Observable.Empty<object?>();
                default:
                    throw new AvaloniaInternalException("HandleCompleted called for non-completed Task.");
            }
        }
    }
}