diff --git a/backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs b/backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs index bded9e846..8b852db8e 100644 --- a/backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs +++ b/backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs @@ -15,6 +15,7 @@ using MongoDB.Bson; using MongoDB.Driver; using Squidex.Infrastructure; using Squidex.Infrastructure.Migrations; +using Squidex.Infrastructure.Tasks; namespace Migrations.Migrations.MongoDb { @@ -118,10 +119,7 @@ namespace Migrations.Migrations.MongoDb BoundedCapacity = SizeOfQueue }); - batchBlock.LinkTo(actionBlock, new DataflowLinkOptions - { - PropagateCompletion = true - }); + batchBlock.BidirectionalLinkTo(actionBlock); await collectionOld.Find(new BsonDocument()).ForEachAsync(batchBlock.SendAsync, ct); diff --git a/backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs b/backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs index da158ce1f..e9d12cbfe 100644 --- a/backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs +++ b/backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs @@ -15,6 +15,7 @@ using MongoDB.Driver; using Squidex.Infrastructure; using Squidex.Infrastructure.Migrations; using Squidex.Infrastructure.MongoDb; +using Squidex.Infrastructure.Tasks; namespace Migrations.Migrations.MongoDb { @@ -152,10 +153,7 @@ namespace Migrations.Migrations.MongoDb BoundedCapacity = SizeOfQueue }); - batchBlock.LinkTo(actionBlock, new DataflowLinkOptions - { - PropagateCompletion = true - }); + batchBlock.BidirectionalLinkTo(actionBlock); await collectionOld.Find(new BsonDocument()).ForEachAsync(batchBlock.SendAsync, ct); diff --git a/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetsBulkUpdateCommandMiddleware.cs b/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetsBulkUpdateCommandMiddleware.cs index 4a15ada0e..20057aeaa 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetsBulkUpdateCommandMiddleware.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetsBulkUpdateCommandMiddleware.cs @@ -1,4 +1,4 @@ -// ========================================================================== +// ========================================================================== // Squidex Headless CMS // ========================================================================== // Copyright (c) Squidex UG (haftungsbeschraenkt) @@ -14,6 +14,7 @@ using Squidex.Domain.Apps.Entities.Contents; using Squidex.Infrastructure; using Squidex.Infrastructure.Commands; using Squidex.Infrastructure.Reflection; +using Squidex.Infrastructure.Tasks; using Squidex.Shared; #pragma warning disable SA1313 // Parameter names should begin with lower-case letter @@ -84,10 +85,7 @@ namespace Squidex.Domain.Apps.Entities.Assets.DomainObject } }, executionOptions); - createCommandsBlock.LinkTo(executeCommandBlock, new DataflowLinkOptions - { - PropagateCompletion = true - }); + createCommandsBlock.BidirectionalLinkTo(executeCommandBlock); contextProvider.Context.Change(b => b .WithoutAssetEnrichment() diff --git a/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs b/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs index 3862f9600..291678386 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs @@ -344,10 +344,7 @@ namespace Squidex.Domain.Apps.Entities.Backup BoundedCapacity = BatchSize * 2 }); - batchBlock.LinkTo(writeBlock, new DataflowLinkOptions - { - PropagateCompletion = true - }); + batchBlock.BidirectionalLinkTo(writeBlock); await reader.ReadEventsAsync(streamNameResolver, eventDataFormatter, async job => { diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentsBulkUpdateCommandMiddleware.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentsBulkUpdateCommandMiddleware.cs index 8ac0a09d2..449d64fad 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentsBulkUpdateCommandMiddleware.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentsBulkUpdateCommandMiddleware.cs @@ -1,4 +1,4 @@ -// ========================================================================== +// ========================================================================== // Squidex Headless CMS // ========================================================================== // Copyright (c) Squidex UG (haftungsbeschraenkt) @@ -17,6 +17,7 @@ using Squidex.Domain.Apps.Entities.Schemas; using Squidex.Infrastructure; using Squidex.Infrastructure.Commands; using Squidex.Infrastructure.Reflection; +using Squidex.Infrastructure.Tasks; using Squidex.Infrastructure.Translations; using Squidex.Shared; @@ -88,10 +89,7 @@ namespace Squidex.Domain.Apps.Entities.Contents.DomainObject } }, executionOptions); - createCommandsBlock.LinkTo(executeCommandBlock, new DataflowLinkOptions - { - PropagateCompletion = true - }); + createCommandsBlock.BidirectionalLinkTo(executeCommandBlock); contextProvider.Context.Change(b => b .WithoutContentEnrichment() diff --git a/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs b/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs index 5ce8770b9..9b4fbd01e 100644 --- a/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs +++ b/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs @@ -14,6 +14,7 @@ using Microsoft.Extensions.DependencyInjection; using Squidex.Caching; using Squidex.Infrastructure.EventSourcing; using Squidex.Infrastructure.States; +using Squidex.Infrastructure.Tasks; #pragma warning disable RECS0108 // Warns about static fields in generic types @@ -131,10 +132,7 @@ namespace Squidex.Infrastructure.Commands BoundedCapacity = batchSize }); - batchBlock.LinkTo(workerBlock, new DataflowLinkOptions - { - PropagateCompletion = true - }); + batchBlock.BidirectionalLinkTo(workerBlock); var handledIds = new HashSet(); diff --git a/backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs b/backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs index dc3f89e13..174b050c8 100644 --- a/backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs +++ b/backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs @@ -9,11 +9,10 @@ using System; using System.Linq; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; -using Squidex.Infrastructure.Reflection; namespace Squidex.Infrastructure.Tasks { - public class PartitionedActionBlock : ITargetBlock + public sealed class PartitionedActionBlock : ITargetBlock { private readonly ITargetBlock distributor; private readonly ActionBlock[] workers; @@ -39,32 +38,16 @@ namespace Squidex.Infrastructure.Tasks for (var i = 0; i < dataflowBlockOptions.MaxDegreeOfParallelism; i++) { - var workerOption = SimpleMapper.Map(dataflowBlockOptions, new ExecutionDataflowBlockOptions()); - - workerOption.MaxDegreeOfParallelism = 1; - workerOption.MaxMessagesPerTask = 1; - - workers[i] = new ActionBlock(async input => + workers[i] = new ActionBlock(action, new ExecutionDataflowBlockOptions() { - try - { - await action(input); - } - catch (OperationCanceledException ex) - { - // Dataflow swallows operation cancelled exception. - throw new AggregateException(ex); - } - }, workerOption); + BoundedCapacity = dataflowBlockOptions.BoundedCapacity, + CancellationToken = dataflowBlockOptions.CancellationToken, + MaxDegreeOfParallelism = 1, + MaxMessagesPerTask = 1, + TaskScheduler = dataflowBlockOptions.TaskScheduler, + }); } - var distributorOption = new ExecutionDataflowBlockOptions - { - MaxDegreeOfParallelism = 1, - MaxMessagesPerTask = 1, - BoundedCapacity = 1 - }; - distributor = new ActionBlock(async input => { try @@ -78,15 +61,14 @@ namespace Squidex.Infrastructure.Tasks // Dataflow swallows operation cancelled exception. throw new AggregateException(ex); } - }, distributorOption); - - distributor.Completion.ContinueWith(x => + }, new ExecutionDataflowBlockOptions { - foreach (var worker in workers) - { - worker.Complete(); - } + BoundedCapacity = 1, + MaxDegreeOfParallelism = 1, + MaxMessagesPerTask = 1 }); + + LinkCompletion(); } public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock? source, bool consumeToAccept) @@ -103,5 +85,36 @@ namespace Squidex.Infrastructure.Tasks { distributor.Fault(exception); } + +#pragma warning disable RECS0165 // Asynchronous methods should return a Task instead of void + private async void LinkCompletion() +#pragma warning restore RECS0165 // Asynchronous methods should return a Task instead of void + { + try + { + await distributor.Completion.ConfigureAwait(false); + } +#pragma warning disable RECS0022 // A catch clause that catches System.Exception and has an empty body + catch +#pragma warning restore RECS0022 // A catch clause that catches System.Exception and has an empty body + { + // we do not want to change the stacktrace of the exception. + } + + if (distributor.Completion.IsFaulted && distributor.Completion.Exception != null) + { + foreach (var worker in workers) + { + ((IDataflowBlock)worker).Fault(distributor.Completion.Exception); + } + } + else + { + foreach (var worker in workers) + { + worker.Complete(); + } + } + } } } diff --git a/backend/src/Squidex.Infrastructure/Tasks/TaskExtensions.cs b/backend/src/Squidex.Infrastructure/Tasks/TaskExtensions.cs index ad515f55f..74fe44b2e 100644 --- a/backend/src/Squidex.Infrastructure/Tasks/TaskExtensions.cs +++ b/backend/src/Squidex.Infrastructure/Tasks/TaskExtensions.cs @@ -8,6 +8,7 @@ using System; using System.Threading; using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; namespace Squidex.Infrastructure.Tasks { @@ -53,5 +54,31 @@ namespace Squidex.Infrastructure.Tasks return await task; } } + +#pragma warning disable RECS0165 // Asynchronous methods should return a Task instead of void + public static async void BidirectionalLinkTo(this ISourceBlock source, ITargetBlock target) +#pragma warning restore RECS0165 // Asynchronous methods should return a Task instead of void + { + source.LinkTo(target, new DataflowLinkOptions + { + PropagateCompletion = true + }); + + try + { + await target.Completion.ConfigureAwait(false); + } +#pragma warning disable RECS0022 // A catch clause that catches System.Exception and has an empty body + catch +#pragma warning restore RECS0022 // A catch clause that catches System.Exception and has an empty body + { + // we do not want to change the stacktrace of the exception. + } + + if (target.Completion.IsFaulted && target.Completion.Exception != null) + { + source.Fault(target.Completion.Exception.Flatten()); + } + } } }