|
|
|
@ -193,7 +193,18 @@ namespace Perspex.Markup.Data |
|
|
|
|
|
|
|
if (_node != null) |
|
|
|
{ |
|
|
|
var subscription = _node.Subscribe(observer); |
|
|
|
IObservable<object> source = _node; |
|
|
|
|
|
|
|
if (_rootObservable != null) |
|
|
|
{ |
|
|
|
source = source.TakeUntil(Complete(_rootObservable)); |
|
|
|
} |
|
|
|
else if (_update != null) |
|
|
|
{ |
|
|
|
source = source.TakeUntil(Complete(_update)); |
|
|
|
} |
|
|
|
|
|
|
|
var subscription = source.Subscribe(observer); |
|
|
|
|
|
|
|
return Disposable.Create(() => |
|
|
|
{ |
|
|
|
@ -221,6 +232,13 @@ namespace Perspex.Markup.Data |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private static IObservable<Unit> Complete<T>(IObservable<T> input) |
|
|
|
{ |
|
|
|
return Observable.Merge( |
|
|
|
input.TakeLast(1).Select(_ => Unit.Default), |
|
|
|
input.IsEmpty().Where(x => x).Select(_ => Unit.Default)); |
|
|
|
} |
|
|
|
|
|
|
|
private void IncrementCount() |
|
|
|
{ |
|
|
|
if (_count++ == 0 && _node != null) |
|
|
|
|