diff --git a/backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs b/backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs index 2ba110d71..6a4c4962a 100644 --- a/backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs +++ b/backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs @@ -46,61 +46,69 @@ namespace Migrations.Migrations.MongoDb var actionBlock = new ActionBlock(async batch => { - var writes = new List>(); - - foreach (var document in batch) + try { - var eventStream = document["EventStream"].AsString; + var writes = new List>(); - if (TryGetAppId(document, out var appId)) + foreach (var document in batch) { - if (!eventStream.StartsWith("app-", StringComparison.OrdinalIgnoreCase)) + var eventStream = document["EventStream"].AsString; + + if (TryGetAppId(document, out var appId)) { - var indexOfType = eventStream.IndexOf('-'); - var indexOfId = indexOfType + 1; + if (!eventStream.StartsWith("app-", StringComparison.OrdinalIgnoreCase)) + { + var indexOfType = eventStream.IndexOf('-'); + var indexOfId = indexOfType + 1; - var indexOfOldId = eventStream.LastIndexOf("--", StringComparison.OrdinalIgnoreCase); + var indexOfOldId = eventStream.LastIndexOf("--", StringComparison.OrdinalIgnoreCase); - if (indexOfOldId > 0) - { - indexOfId = indexOfOldId + 2; - } + if (indexOfOldId > 0) + { + indexOfId = indexOfOldId + 2; + } + + var domainType = eventStream.Substring(0, indexOfType); + var domainId = eventStream[indexOfId..]; + + var newDomainId = DomainId.Combine(DomainId.Create(appId), DomainId.Create(domainId)).ToString(); + var newStreamName = $"{domainType}-{newDomainId}"; - var domainType = eventStream.Substring(0, indexOfType); - var domainId = eventStream[indexOfId..]; + document["EventStream"] = newStreamName; - var newDomainId = DomainId.Combine(DomainId.Create(appId), DomainId.Create(domainId)).ToString(); - var newStreamName = $"{domainType}-{newDomainId}"; + foreach (var @event in document["Events"].AsBsonArray) + { + var metadata = @event["Metadata"].AsBsonDocument; - document["EventStream"] = newStreamName; + metadata["AggregateId"] = newDomainId; + } + } foreach (var @event in document["Events"].AsBsonArray) { var metadata = @event["Metadata"].AsBsonDocument; - metadata["AggregateId"] = newDomainId; + metadata.Remove("AppId"); } } - foreach (var @event in document["Events"].AsBsonArray) - { - var metadata = @event["Metadata"].AsBsonDocument; + var filter = Builders.Filter.Eq("_id", document["_id"].AsString); - metadata.Remove("AppId"); - } + writes.Add(new ReplaceOneModel(filter, document) + { + IsUpsert = true + }); } - var filter = Builders.Filter.Eq("_id", document["_id"].AsString); - - writes.Add(new ReplaceOneModel(filter, document) + if (writes.Count > 0) { - IsUpsert = true - }); + await collectionNew.BulkWriteAsync(writes, writeOptions); + } } - - if (writes.Count > 0) + catch (OperationCanceledException ex) { - await collectionNew.BulkWriteAsync(writes, writeOptions); + // Dataflow swallows operation cancelled exception. + throw new AggregateException(ex); } }, new ExecutionDataflowBlockOptions { diff --git a/backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs b/backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs index d40b43ad3..247382d43 100644 --- a/backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs +++ b/backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs @@ -102,39 +102,47 @@ namespace Migrations.Migrations.MongoDb var actionBlock = new ActionBlock(async batch => { - var writes = new List>(); - - foreach (var document in batch) + try { - var appId = document["_ai"].AsString; - - var documentIdOld = document["_id"].AsString; + var writes = new List>(); - if (documentIdOld.Contains("--", StringComparison.OrdinalIgnoreCase)) + foreach (var document in batch) { - var index = documentIdOld.LastIndexOf("--", StringComparison.OrdinalIgnoreCase); + var appId = document["_ai"].AsString; - documentIdOld = documentIdOld[(index + 2)..]; - } + var documentIdOld = document["_id"].AsString; + + if (documentIdOld.Contains("--", StringComparison.OrdinalIgnoreCase)) + { + var index = documentIdOld.LastIndexOf("--", StringComparison.OrdinalIgnoreCase); + + documentIdOld = documentIdOld[(index + 2)..]; + } - var documentIdNew = DomainId.Combine(DomainId.Create(appId), DomainId.Create(documentIdOld)).ToString(); + var documentIdNew = DomainId.Combine(DomainId.Create(appId), DomainId.Create(documentIdOld)).ToString(); - document["id"] = documentIdOld; - document["_id"] = documentIdNew; + document["id"] = documentIdOld; + document["_id"] = documentIdNew; - extraAction?.Invoke(document); + extraAction?.Invoke(document); - var filter = Builders.Filter.Eq("_id", documentIdNew); + var filter = Builders.Filter.Eq("_id", documentIdNew); - writes.Add(new ReplaceOneModel(filter, document) + writes.Add(new ReplaceOneModel(filter, document) + { + IsUpsert = true + }); + } + + if (writes.Count > 0) { - IsUpsert = true - }); + await collectionNew.BulkWriteAsync(writes, writeOptions); + } } - - if (writes.Count > 0) + catch (OperationCanceledException ex) { - await collectionNew.BulkWriteAsync(writes, writeOptions); + // Dataflow swallows operation cancelled exception. + throw new AggregateException(ex); } }, new ExecutionDataflowBlockOptions { diff --git a/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetsBulkUpdateCommandMiddleware.cs b/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetsBulkUpdateCommandMiddleware.cs index cb14ee925..655d9213b 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetsBulkUpdateCommandMiddleware.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetsBulkUpdateCommandMiddleware.cs @@ -59,14 +59,30 @@ namespace Squidex.Domain.Apps.Entities.Assets.DomainObject var createCommandsBlock = new TransformBlock(task => { - return CreateCommand(task); + try + { + return CreateCommand(task); + } + catch (OperationCanceledException ex) + { + // Dataflow swallows operation cancelled exception. + throw new AggregateException(ex); + } }, executionOptions); var executeCommandBlock = new ActionBlock(async command => { - if (command != null) + try + { + if (command != null) + { + await ExecuteCommandAsync(command); + } + } + catch (OperationCanceledException ex) { - await ExecuteCommandAsync(command); + // Dataflow swallows operation cancelled exception. + throw new AggregateException(ex); } }, executionOptions); @@ -115,23 +131,16 @@ namespace Squidex.Domain.Apps.Entities.Assets.DomainObject private static async Task ExecuteCommandAsync(BulkTaskCommand bulkCommand) { var (task, id, command) = bulkCommand; - - Exception? exception = null; try { await task.Bus.PublishAsync(command); + + task.Results.Add(new BulkUpdateResultItem(id, task.JobIndex)); } catch (Exception ex) { - exception = ex; + task.Results.Add(new BulkUpdateResultItem(id, task.JobIndex, ex)); } - - task.Results.Add(new BulkUpdateResultItem - { - Id = id, - JobIndex = task.JobIndex, - Exception = exception - }); } private BulkTaskCommand? CreateCommand(BulkTask task) @@ -148,13 +157,7 @@ namespace Squidex.Domain.Apps.Entities.Assets.DomainObject } catch (Exception ex) { - task.Results.Add(new BulkUpdateResultItem - { - Id = id, - JobIndex = task.JobIndex, - Exception = ex - }); - + task.Results.Add(new BulkUpdateResultItem(id, task.JobIndex, ex)); return null; } } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs b/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs index 44f81a1bf..db70680b4 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs @@ -325,20 +325,28 @@ namespace Squidex.Domain.Apps.Entities.Backup var writeBlock = new ActionBlock<(string, Envelope)[]>(async batch => { - var commits = new List(batch.Length); - - foreach (var (stream, @event) in batch) + try { - var offset = runningStreamMapper.GetStreamOffset(stream); + var commits = new List(batch.Length); - commits.Add(EventCommit.Create(stream, offset, @event, eventDataFormatter)); - } + foreach (var (stream, @event) in batch) + { + var offset = runningStreamMapper.GetStreamOffset(stream); - await eventStore.AppendUnsafeAsync(commits); + commits.Add(EventCommit.Create(stream, offset, @event, eventDataFormatter)); + } - handled += commits.Count; + await eventStore.AppendUnsafeAsync(commits); - Log($"Reading {reader.ReadEvents}/{handled} events and {reader.ReadAttachments} attachments completed.", true); + handled += commits.Count; + + Log($"Reading {reader.ReadEvents}/{handled} events and {reader.ReadAttachments} attachments completed.", true); + } + catch (OperationCanceledException ex) + { + // Dataflow swallows operation cancelled exception. + throw new AggregateException(ex); + } }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, diff --git a/backend/src/Squidex.Domain.Apps.Entities/BulkUpdateResultItem.cs b/backend/src/Squidex.Domain.Apps.Entities/BulkUpdateResultItem.cs index efbfd8132..1154efc82 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/BulkUpdateResultItem.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/BulkUpdateResultItem.cs @@ -8,14 +8,9 @@ using System; using Squidex.Infrastructure; +#pragma warning disable SA1313 // Parameter names should begin with lower-case letter + namespace Squidex.Domain.Apps.Entities { - public sealed class BulkUpdateResultItem - { - public DomainId? Id { get; set; } - - public int JobIndex { get; set; } - - public Exception? Exception { get; set; } - } + public sealed record BulkUpdateResultItem(DomainId? Id, int JobIndex, Exception? Exception = null); } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentsBulkUpdateCommandMiddleware.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentsBulkUpdateCommandMiddleware.cs index 6ff129cba..9b2575f26 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentsBulkUpdateCommandMiddleware.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentsBulkUpdateCommandMiddleware.cs @@ -67,12 +67,28 @@ namespace Squidex.Domain.Apps.Entities.Contents.DomainObject var createCommandsBlock = new TransformManyBlock(async task => { - return await CreateCommandsAsync(task); - }, executionOptions); + try + { + return await CreateCommandsAsync(task); + } + catch (OperationCanceledException ex) + { + // Dataflow swallows operation cancelled exception. + throw new AggregateException(ex); + } + }, executionOptions); var executeCommandBlock = new ActionBlock(async command => { - await ExecuteCommandAsync(command); + try + { + await ExecuteCommandAsync(command); + } + catch (OperationCanceledException ex) + { + // Dataflow swallows operation cancelled exception. + throw new AggregateException(ex); + } }, executionOptions); createCommandsBlock.LinkTo(executeCommandBlock, new DataflowLinkOptions @@ -124,22 +140,16 @@ namespace Squidex.Domain.Apps.Entities.Contents.DomainObject { var (task, id, command) = bulkCommand; - Exception? exception = null; try { await task.Bus.PublishAsync(command); + + task.Results.Add(new BulkUpdateResultItem(id, task.JobIndex)); } catch (Exception ex) { - exception = ex; + task.Results.Add(new BulkUpdateResultItem(id, task.JobIndex, ex)); } - - task.Results.Add(new BulkUpdateResultItem - { - Id = id, - JobIndex = task.JobIndex, - Exception = exception - }); } private async Task> CreateCommandsAsync(BulkTask task) @@ -167,22 +177,13 @@ namespace Squidex.Domain.Apps.Entities.Contents.DomainObject } catch (Exception ex) { - task.Results.Add(new BulkUpdateResultItem - { - Id = id, - JobIndex = task.JobIndex, - Exception = ex - }); + task.Results.Add(new BulkUpdateResultItem(id, task.JobIndex, ex)); } } } catch (Exception ex) { - task.Results.Add(new BulkUpdateResultItem - { - JobIndex = task.JobIndex, - Exception = ex - }); + task.Results.Add(new BulkUpdateResultItem(null, task.JobIndex, ex)); } return commands; diff --git a/backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs b/backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs index 802fff4ed..7932737a2 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs @@ -190,9 +190,17 @@ namespace Squidex.Infrastructure.MongoDb var actionBlock = new ActionBlock(async x => { - if (!combined.IsCancellationRequested) + try { - await processor(x); + if (!combined.IsCancellationRequested) + { + await processor(x); + } + } + catch (OperationCanceledException ex) + { + // Dataflow swallows operation cancelled exception. + throw new AggregateException(ex); } }, new ExecutionDataflowBlockOptions diff --git a/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs b/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs index b4bebf1bb..1cd68835b 100644 --- a/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs +++ b/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs @@ -91,26 +91,34 @@ namespace Squidex.Infrastructure.Commands var workerBlock = new ActionBlock(async ids => { - await using (var context = store.WithBatchContext(typeof(T))) + try { - await context.LoadAsync(ids); - - foreach (var id in ids) + await using (var context = store.WithBatchContext(typeof(T))) { - try - { - var domainObject = Factory.Create(serviceProvider, context); - - domainObject.Setup(id); + await context.LoadAsync(ids); - await domainObject.RebuildStateAsync(); - } - catch (DomainObjectNotFoundException) + foreach (var id in ids) { - return; + try + { + var domainObject = Factory.Create(serviceProvider, context); + + domainObject.Setup(id); + + await domainObject.RebuildStateAsync(); + } + catch (DomainObjectNotFoundException) + { + return; + } } } } + catch (OperationCanceledException ex) + { + // Dataflow swallows operation cancelled exception. + throw new AggregateException(ex); + } }, new ExecutionDataflowBlockOptions { diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs index 25be745ee..0a048f434 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs @@ -50,24 +50,32 @@ namespace Squidex.Infrastructure.EventSourcing.Grains var parse = new TransformBlock(job => { - if (job.StoredEvent != null) + try { - job.ShouldHandle = eventConsumer.Handles(job.StoredEvent); - } - - if (job.ShouldHandle) - { - try + if (job.StoredEvent != null) { - job.Event = eventDataFormatter.ParseIfKnown(job.StoredEvent!); + job.ShouldHandle = eventConsumer.Handles(job.StoredEvent); } - catch (Exception ex) + + if (job.ShouldHandle) { - job.Exception = ex; + try + { + job.Event = eventDataFormatter.ParseIfKnown(job.StoredEvent!); + } + catch (Exception ex) + { + job.Exception = ex; + } } - } - return job; + return job; + } + catch (OperationCanceledException ex) + { + // Dataflow swallows operation cancelled exception. + throw new AggregateException(ex); + } }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, @@ -82,24 +90,32 @@ namespace Squidex.Infrastructure.EventSourcing.Grains var handle = new ActionBlock>(async jobs => { - var sender = eventSubscription?.Sender; - - foreach (var jobsBySender in jobs.GroupBy(x => x.Sender)) + try { - if (sender != null && ReferenceEquals(jobsBySender.Key, sender)) - { - var exception = jobs.FirstOrDefault(x => x.Exception != null)?.Exception; + var sender = eventSubscription?.Sender; - if (exception != null) - { - await grain.OnErrorAsync(sender, exception); - } - else + foreach (var jobsBySender in jobs.GroupBy(x => x.Sender)) + { + if (sender != null && ReferenceEquals(jobsBySender.Key, sender)) { - await grain.OnEventsAsync(sender, GetEvents(jobsBySender), GetPosition(jobsBySender)); + var exception = jobs.FirstOrDefault(x => x.Exception != null)?.Exception; + + if (exception != null) + { + await grain.OnErrorAsync(sender, exception); + } + else + { + await grain.OnEventsAsync(sender, GetEvents(jobsBySender), GetPosition(jobsBySender)); + } } } } + catch (OperationCanceledException ex) + { + // Dataflow swallows operation cancelled exception. + throw new AggregateException(ex); + } }, new ExecutionDataflowBlockOptions { diff --git a/backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs b/backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs index da818d2b6..dc3f89e13 100644 --- a/backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs +++ b/backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs @@ -44,7 +44,18 @@ namespace Squidex.Infrastructure.Tasks workerOption.MaxDegreeOfParallelism = 1; workerOption.MaxMessagesPerTask = 1; - workers[i] = new ActionBlock(action, workerOption); + workers[i] = new ActionBlock(async input => + { + try + { + await action(input); + } + catch (OperationCanceledException ex) + { + // Dataflow swallows operation cancelled exception. + throw new AggregateException(ex); + } + }, workerOption); } var distributorOption = new ExecutionDataflowBlockOptions @@ -54,12 +65,20 @@ namespace Squidex.Infrastructure.Tasks BoundedCapacity = 1 }; - distributor = new ActionBlock(x => + distributor = new ActionBlock(async input => { - var partition = Math.Abs(partitioner(x)) % workers.Length; + try + { + var partition = Math.Abs(partitioner(input)) % workers.Length; - return workers[partition].SendAsync(x); - }, distributorOption); + await workers[partition].SendAsync(input); + } + catch (OperationCanceledException ex) + { + // Dataflow swallows operation cancelled exception. + throw new AggregateException(ex); + } + }, distributorOption); distributor.Completion.ContinueWith(x => {