diff --git a/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs b/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs index 90240e3fe..3681a4aa7 100644 --- a/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs +++ b/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs @@ -161,19 +161,24 @@ namespace Squidex.Infrastructure.MongoDb public static async Task ForEachPipelineAsync(this IAsyncCursor source, Func processor, CancellationToken cancellationToken = default(CancellationToken)) { - var actionBlock = - new ActionBlock(processor, - new ExecutionDataflowBlockOptions - { - MaxDegreeOfParallelism = 1, - MaxMessagesPerTask = 1, - BoundedCapacity = 100 - }); - using (var selfToken = new CancellationTokenSource()) { using (var combined = CancellationTokenSource.CreateLinkedTokenSource(selfToken.Token, cancellationToken)) { + var actionBlock = + new ActionBlock(async x => + { + if (!combined.IsCancellationRequested) + { + await processor(x); + } + }, + new ExecutionDataflowBlockOptions + { + MaxDegreeOfParallelism = 1, + MaxMessagesPerTask = 1, + BoundedCapacity = 100 + }); try { await source.ForEachAsync(async i => diff --git a/tests/Squidex.Infrastructure.Tests/MongoDb/MongoExtensionsTests.cs b/tests/Squidex.Infrastructure.Tests/MongoDb/MongoExtensionsTests.cs index d48b967b5..d0601dd3c 100644 --- a/tests/Squidex.Infrastructure.Tests/MongoDb/MongoExtensionsTests.cs +++ b/tests/Squidex.Infrastructure.Tests/MongoDb/MongoExtensionsTests.cs @@ -151,6 +151,7 @@ namespace Squidex.Infrastructure.MongoDb } result.Add(x); + return TaskHelper.Done; }, cts.Token); });