From 9baa2ca297f40a5072c26f669991cb1db14daf25 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Fri, 9 Mar 2018 20:52:26 +0100 Subject: [PATCH] Mongo queries improved --- .../Actions/ElasticSearchActionHandler.cs | 1 - .../EventSourcing/MongoEventStore_Reader.cs | 5 +- .../MongoDb/MongoExtensions.cs | 48 ++++++ .../Squidex.Infrastructure.MongoDb.csproj | 1 + src/Squidex/Squidex.csproj | 2 +- .../MongoDb/MongoExtensionsTests.cs | 161 ++++++++++++++++++ 6 files changed, 213 insertions(+), 5 deletions(-) create mode 100644 tests/Squidex.Infrastructure.Tests/MongoDb/MongoExtensionsTests.cs diff --git a/src/Squidex.Domain.Apps.Core.Operations/HandleRules/Actions/ElasticSearchActionHandler.cs b/src/Squidex.Domain.Apps.Core.Operations/HandleRules/Actions/ElasticSearchActionHandler.cs index 5ef0359bc..a65813fce 100644 --- a/src/Squidex.Domain.Apps.Core.Operations/HandleRules/Actions/ElasticSearchActionHandler.cs +++ b/src/Squidex.Domain.Apps.Core.Operations/HandleRules/Actions/ElasticSearchActionHandler.cs @@ -8,7 +8,6 @@ using System; using System.Threading.Tasks; using Elasticsearch.Net; -using Newtonsoft.Json; using Newtonsoft.Json.Linq; using Squidex.Domain.Apps.Core.Contents; using Squidex.Domain.Apps.Core.Rules; diff --git a/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs index eed2d0bce..175b74942 100644 --- a/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs +++ b/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs @@ -7,7 +7,6 @@ using System; using System.Collections.Generic; -using System.Reactive.Linq; using System.Threading; using System.Threading.Tasks; using MongoDB.Driver; @@ -87,9 +86,9 @@ namespace Squidex.Infrastructure.EventSourcing return QueryAsync(callback, lastPosition, filter, ct); } - private async Task QueryAsync(Func callback, StreamPosition lastPosition, FilterDefinition filter, CancellationToken ct) + private Task QueryAsync(Func callback, StreamPosition lastPosition, FilterDefinition filter, CancellationToken ct) { - await Collection.Find(filter).Sort(Sort.Ascending(TimestampField)).ForEachAsync(async commit => + return Collection.Find(filter).Sort(Sort.Ascending(TimestampField)).ForEachPipelineAsync(async commit => { var eventStreamOffset = (int)commit.EventStreamOffset; diff --git a/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs b/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs index 1db3eff76..96ff8de11 100644 --- a/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs +++ b/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs @@ -7,7 +7,9 @@ using System; using System.Linq.Expressions; +using System.Threading; using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; using MongoDB.Bson; using MongoDB.Driver; using Squidex.Infrastructure.States; @@ -128,5 +130,51 @@ namespace Squidex.Infrastructure.MongoDb } } } + + public static async Task ForEachPipelineAsync(this IAsyncCursorSource source, Func processor, CancellationToken cancellationToken = default(CancellationToken)) + { + var cursor = await source.ToCursorAsync(cancellationToken); + + await cursor.ForEachPipelineAsync(processor, cancellationToken); + } + + public static async Task ForEachPipelineAsync(this IAsyncCursor source, Func processor, CancellationToken cancellationToken = default(CancellationToken)) + { + var actionBlock = + new ActionBlock(processor, + new ExecutionDataflowBlockOptions + { + MaxDegreeOfParallelism = 1, + MaxMessagesPerTask = 1, + BoundedCapacity = 100 + }); + + using (var selfToken = new CancellationTokenSource()) + { + using (var combined = CancellationTokenSource.CreateLinkedTokenSource(selfToken.Token, cancellationToken)) + { + try + { + await source.ForEachAsync(async i => + { + if (!await actionBlock.SendAsync(i, combined.Token)) + { + selfToken.Cancel(); + } + }, combined.Token); + + actionBlock.Complete(); + } + catch (Exception ex) + { + ((IDataflowBlock)actionBlock).Fault(ex); + } + finally + { + await actionBlock.Completion; + } + } + } + } } } diff --git a/src/Squidex.Infrastructure.MongoDb/Squidex.Infrastructure.MongoDb.csproj b/src/Squidex.Infrastructure.MongoDb/Squidex.Infrastructure.MongoDb.csproj index 43fd73478..e7f9a5feb 100644 --- a/src/Squidex.Infrastructure.MongoDb/Squidex.Infrastructure.MongoDb.csproj +++ b/src/Squidex.Infrastructure.MongoDb/Squidex.Infrastructure.MongoDb.csproj @@ -15,6 +15,7 @@ + diff --git a/src/Squidex/Squidex.csproj b/src/Squidex/Squidex.csproj index 2ebea0cc2..3c5d95107 100644 --- a/src/Squidex/Squidex.csproj +++ b/src/Squidex/Squidex.csproj @@ -69,7 +69,7 @@ - + diff --git a/tests/Squidex.Infrastructure.Tests/MongoDb/MongoExtensionsTests.cs b/tests/Squidex.Infrastructure.Tests/MongoDb/MongoExtensionsTests.cs new file mode 100644 index 000000000..d48b967b5 --- /dev/null +++ b/tests/Squidex.Infrastructure.Tests/MongoDb/MongoExtensionsTests.cs @@ -0,0 +1,161 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using MongoDB.Driver; +using Squidex.Infrastructure.Tasks; +using Xunit; + +namespace Squidex.Infrastructure.MongoDb +{ + public class MongoExtensionsTests + { + public sealed class Cursor : IAsyncCursor + { + private readonly List items = new List(); + private int index = -1; + + public IEnumerable Current + { + get + { + if (items[index] is Exception ex) + { + throw ex; + } + + return Enumerable.Repeat((T)items[index], 1); + } + } + + public Cursor Add(T item) + { + items.Add(item); + + return this; + } + + public Cursor Add(Exception ex) + { + items.Add(ex); + + return this; + } + + public void Dispose() + { + } + + public bool MoveNext(CancellationToken cancellationToken = default(CancellationToken)) + { + index++; + + return index < items.Count; + } + + public async Task MoveNextAsync(CancellationToken cancellationToken = default(CancellationToken)) + { + await Task.Delay(1, cancellationToken); + + return MoveNext(cancellationToken); + } + } + + [Fact] + public async Task Should_enumerate_over_items() + { + var result = new List(); + + var cursor = new Cursor().Add(0).Add(1).Add(1).Add(2).Add(3).Add(5); + + await cursor.ForEachPipelineAsync(x => + { + result.Add(x); + return TaskHelper.Done; + }); + + Assert.Equal(new List { 0, 1, 1, 2, 3, 5 }, result); + } + + [Fact] + public async Task Should_break_when_cursor_failed() + { + var ex = new InvalidOperationException(); + + var result = new List(); + + var cursor = new Cursor().Add(0).Add(1).Add(1).Add(ex).Add(2).Add(3).Add(5); + + await Assert.ThrowsAsync(() => + { + return cursor.ForEachPipelineAsync(x => + { + result.Add(x); + return TaskHelper.Done; + }); + }); + + Assert.Equal(new List { 0, 1, 1 }, result); + } + + [Fact] + public async Task Should_break_when_handler_failed() + { + var ex = new InvalidOperationException(); + + var result = new List(); + + var cursor = new Cursor().Add(0).Add(1).Add(1).Add(2).Add(3).Add(5); + + await Assert.ThrowsAsync(() => + { + return cursor.ForEachPipelineAsync(x => + { + if (x == 2) + { + throw ex; + } + + result.Add(x); + return TaskHelper.Done; + }); + }); + + Assert.Equal(new List { 0, 1, 1 }, result); + } + + [Fact] + public async Task Should_stop_when_cancelled1() + { + var cts = new CancellationTokenSource(); + + var result = new List(); + + var cursor = new Cursor().Add(0).Add(1).Add(1).Add(2).Add(3).Add(5); + + await Assert.ThrowsAsync(() => + { + return cursor.ForEachPipelineAsync(x => + { + if (x == 2) + { + cts.Cancel(); + } + + result.Add(x); + return TaskHelper.Done; + }, cts.Token); + }); + + Assert.Equal(new List { 0, 1, 1, 2 }, result); + } + } +}