// Copyright (c) The Avalonia Project. All rights reserved.
// Licensed under the MIT license. See licence.md file in the project root for full license information.

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reflection;
using System.Threading.Tasks;
using Avalonia.Data;

namespace Avalonia.Markup.Data.Plugins
{
    /// <summary>
    /// Handles binding to <see cref="Task"/>s for the '^' stream binding operator.
    /// </summary>
    public class TaskStreamPlugin : IStreamPlugin
    {
        /// <summary>
        /// Checks whether this plugin handles the specified value.
        /// </summary>
        /// <param name="reference">A weak reference to the value.</param>
        /// <returns>True if the plugin can handle the value; otherwise false.</returns>
        public virtual bool Match(WeakReference reference) => reference.Target is Task;

        /// <summary>
        /// Starts producing output based on the specified value.
        /// </summary>
        /// <param name="reference">A weak reference to the object.</param>
        /// <returns>
        /// An observable that produces the output for the value. The observable is empty
        /// when the target is not a <see cref="Task"/> or exposes no <c>Result</c> property.
        /// </returns>
        public virtual IObservable<object> Start(WeakReference reference)
        {
            var task = reference.Target as Task;

            if (task == null || GetResultProperty(task) == null)
            {
                return Observable.Empty<object>();
            }

            // IsCompleted covers RanToCompletion, Faulted and Canceled, so an already
            // finished task is translated immediately rather than via a continuation.
            if (task.IsCompleted)
            {
                return HandleCompleted(task);
            }

            var subject = new Subject<object>();

            // The continuation is marshalled back to the synchronization context that
            // called Start. Note that FromCurrentSynchronizationContext throws an
            // InvalidOperationException if there is no current synchronization context.
            task.ContinueWith(
                completed => HandleCompleted(completed).Subscribe(subject),
                TaskScheduler.FromCurrentSynchronizationContext());

            return subject;
        }

        /// <summary>
        /// Converts a finished task into an observable producing its outcome.
        /// </summary>
        /// <param name="task">The task, which must have reached a final state.</param>
        /// <returns>
        /// An observable producing the task's result when it ran to completion, or a
        /// <see cref="BindingNotification"/> error when it faulted or was cancelled. The
        /// observable is empty when the task exposes no <c>Result</c> property.
        /// </returns>
        /// <exception cref="AvaloniaInternalException">
        /// The task has a <c>Result</c> property but has not yet reached a final state.
        /// </exception>
        protected IObservable<object> HandleCompleted(Task task)
        {
            var resultProperty = GetResultProperty(task);

            if (resultProperty == null)
            {
                return Observable.Empty<object>();
            }

            switch (task.Status)
            {
                case TaskStatus.RanToCompletion:
                    return Observable.Return(resultProperty.GetValue(task));
                case TaskStatus.Faulted:
                    return Observable.Return<object>(
                        new BindingNotification(task.Exception, BindingErrorType.Error));
                case TaskStatus.Canceled:
                    // A cancelled task is complete but has neither a result nor an
                    // exception of its own; report the cancellation as a binding error.
                    return Observable.Return<object>(
                        new BindingNotification(
                            new TaskCanceledException(task),
                            BindingErrorType.Error));
                default:
                    throw new AvaloniaInternalException("HandleCompleted called for non-completed Task.");
            }
        }

        /// <summary>
        /// Looks up the public <c>Result</c> property on the task's runtime type.
        /// </summary>
        /// <param name="task">The task to inspect.</param>
        /// <returns>The property, or null if the task type has no <c>Result</c>.</returns>
        private static PropertyInfo GetResultProperty(Task task)
        {
            // GetRuntimeProperty includes inherited properties. A declared-only lookup
            // misses Result on runtime-internal subclasses of Task<T>, such as the
            // tasks produced by async methods.
            return task.GetType().GetRuntimeProperty("Result");
        }
    }
}