diff --git a/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs index 8160252f3..d81be0421 100644 --- a/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs +++ b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs @@ -7,7 +7,6 @@ using System; using System.Collections.Generic; -using System.Reactive.Linq; using System.Threading; using System.Threading.Tasks; using MongoDB.Driver; @@ -87,9 +86,9 @@ namespace Squidex.Infrastructure.EventSourcing return QueryAsync(callback, lastPosition, filter, ct); } - private async Task QueryAsync(Func callback, StreamPosition lastPosition, FilterDefinition filter, CancellationToken ct) + private Task QueryAsync(Func callback, StreamPosition lastPosition, FilterDefinition filter, CancellationToken ct) { - await Collection.Find(filter).Sort(Sort.Ascending(TimestampField)).ForEachAsync(async commit => + return Collection.Find(filter).Sort(Sort.Ascending(TimestampField)).ForEachPipelineAsync(async commit => { var eventStreamOffset = (int)commit.EventStreamOffset; diff --git a/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs b/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs index 1db3eff76..96ff8de11 100644 --- a/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs +++ b/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs @@ -7,7 +7,9 @@ using System; using System.Linq.Expressions; +using System.Threading; using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; using MongoDB.Bson; using MongoDB.Driver; using Squidex.Infrastructure.States; @@ -128,5 +130,51 @@ namespace Squidex.Infrastructure.MongoDb } } } + + public static async Task ForEachPipelineAsync(this IAsyncCursorSource source, Func processor, CancellationToken cancellationToken = default(CancellationToken)) + { + var cursor = await source.ToCursorAsync(cancellationToken); + + await cursor.ForEachPipelineAsync(processor, cancellationToken); + } + + public static async Task ForEachPipelineAsync(this IAsyncCursor source, Func processor, CancellationToken cancellationToken = default(CancellationToken)) + { + var actionBlock = + new ActionBlock(processor, + new ExecutionDataflowBlockOptions + { + MaxDegreeOfParallelism = 1, + MaxMessagesPerTask = 1, + BoundedCapacity = 100 + }); + + using (var selfToken = new CancellationTokenSource()) + { + using (var combined = CancellationTokenSource.CreateLinkedTokenSource(selfToken.Token, cancellationToken)) + { + try + { + await source.ForEachAsync(async i => + { + if (!await actionBlock.SendAsync(i, combined.Token)) + { + selfToken.Cancel(); + } + }, combined.Token); + + actionBlock.Complete(); + } + catch (Exception ex) + { + ((IDataflowBlock)actionBlock).Fault(ex); + } + finally + { + await actionBlock.Completion; + } + } + } + } } } diff --git a/src/Squidex.Infrastructure.MongoDb/Squidex.Infrastructure.MongoDb.csproj b/src/Squidex.Infrastructure.MongoDb/Squidex.Infrastructure.MongoDb.csproj index 70a012240..bef0bc3ae 100644 --- a/src/Squidex.Infrastructure.MongoDb/Squidex.Infrastructure.MongoDb.csproj +++ b/src/Squidex.Infrastructure.MongoDb/Squidex.Infrastructure.MongoDb.csproj @@ -15,6 +15,7 @@ + diff --git a/tests/Squidex.Infrastructure.Tests/MongoDb/MongoExtensionsTests.cs b/tests/Squidex.Infrastructure.Tests/MongoDb/MongoExtensionsTests.cs new file mode 100644 index 000000000..d48b967b5 --- /dev/null +++ b/tests/Squidex.Infrastructure.Tests/MongoDb/MongoExtensionsTests.cs @@ -0,0 +1,161 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using MongoDB.Driver; +using Squidex.Infrastructure.Tasks; +using Xunit; + +namespace Squidex.Infrastructure.MongoDb +{ + public class MongoExtensionsTests + { + public sealed class Cursor : IAsyncCursor + { + private readonly List items = new List(); + private int index = -1; + + public IEnumerable Current + { + get + { + if (items[index] is Exception ex) + { + throw ex; + } + + return Enumerable.Repeat((T)items[index], 1); + } + } + + public Cursor Add(T item) + { + items.Add(item); + + return this; + } + + public Cursor Add(Exception ex) + { + items.Add(ex); + + return this; + } + + public void Dispose() + { + } + + public bool MoveNext(CancellationToken cancellationToken = default(CancellationToken)) + { + index++; + + return index < items.Count; + } + + public async Task MoveNextAsync(CancellationToken cancellationToken = default(CancellationToken)) + { + await Task.Delay(1, cancellationToken); + + return MoveNext(cancellationToken); + } + } + + [Fact] + public async Task Should_enumerate_over_items() + { + var result = new List(); + + var cursor = new Cursor().Add(0).Add(1).Add(1).Add(2).Add(3).Add(5); + + await cursor.ForEachPipelineAsync(x => + { + result.Add(x); + return TaskHelper.Done; + }); + + Assert.Equal(new List { 0, 1, 1, 2, 3, 5 }, result); + } + + [Fact] + public async Task Should_break_when_cursor_failed() + { + var ex = new InvalidOperationException(); + + var result = new List(); + + var cursor = new Cursor().Add(0).Add(1).Add(1).Add(ex).Add(2).Add(3).Add(5); + + await Assert.ThrowsAsync(() => + { + return cursor.ForEachPipelineAsync(x => + { + result.Add(x); + return TaskHelper.Done; + }); + }); + + Assert.Equal(new List { 0, 1, 1 }, result); + } + + [Fact] + public async Task Should_break_when_handler_failed() + { + var ex = new InvalidOperationException(); + + var result = new List(); + + var cursor = new Cursor().Add(0).Add(1).Add(1).Add(2).Add(3).Add(5); + + await Assert.ThrowsAsync(() => + { + return cursor.ForEachPipelineAsync(x => + { + if (x == 2) + { + throw ex; + } + + result.Add(x); + return TaskHelper.Done; + }); + }); + + Assert.Equal(new List { 0, 1, 1 }, result); + } + + [Fact] + public async Task Should_stop_when_cancelled1() + { + var cts = new CancellationTokenSource(); + + var result = new List(); + + var cursor = new Cursor().Add(0).Add(1).Add(1).Add(2).Add(3).Add(5); + + await Assert.ThrowsAsync(() => + { + return cursor.ForEachPipelineAsync(x => + { + if (x == 2) + { + cts.Cancel(); + } + + result.Add(x); + return TaskHelper.Done; + }, cts.Token); + }); + + Assert.Equal(new List { 0, 1, 1, 2 }, result); + } + } +}