Browse Source

Catch operation cancelled exception in TPL dataflow.

pull/687/head
Sebastian 5 years ago
parent
commit
ed3a455e3f
  1. 72
      backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs
  2. 50
      backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs
  3. 43
      backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetsBulkUpdateCommandMiddleware.cs
  4. 26
      backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs
  5. 11
      backend/src/Squidex.Domain.Apps.Entities/BulkUpdateResultItem.cs
  6. 47
      backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentsBulkUpdateCommandMiddleware.cs
  7. 12
      backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs
  8. 34
      backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs
  9. 64
      backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs
  10. 29
      backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs

72
backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs

@ -46,61 +46,69 @@ namespace Migrations.Migrations.MongoDb
var actionBlock = new ActionBlock<BsonDocument[]>(async batch =>
{
var writes = new List<WriteModel<BsonDocument>>();
foreach (var document in batch)
try
{
var eventStream = document["EventStream"].AsString;
var writes = new List<WriteModel<BsonDocument>>();
if (TryGetAppId(document, out var appId))
foreach (var document in batch)
{
if (!eventStream.StartsWith("app-", StringComparison.OrdinalIgnoreCase))
var eventStream = document["EventStream"].AsString;
if (TryGetAppId(document, out var appId))
{
var indexOfType = eventStream.IndexOf('-');
var indexOfId = indexOfType + 1;
if (!eventStream.StartsWith("app-", StringComparison.OrdinalIgnoreCase))
{
var indexOfType = eventStream.IndexOf('-');
var indexOfId = indexOfType + 1;
var indexOfOldId = eventStream.LastIndexOf("--", StringComparison.OrdinalIgnoreCase);
var indexOfOldId = eventStream.LastIndexOf("--", StringComparison.OrdinalIgnoreCase);
if (indexOfOldId > 0)
{
indexOfId = indexOfOldId + 2;
}
if (indexOfOldId > 0)
{
indexOfId = indexOfOldId + 2;
}
var domainType = eventStream.Substring(0, indexOfType);
var domainId = eventStream[indexOfId..];
var newDomainId = DomainId.Combine(DomainId.Create(appId), DomainId.Create(domainId)).ToString();
var newStreamName = $"{domainType}-{newDomainId}";
var domainType = eventStream.Substring(0, indexOfType);
var domainId = eventStream[indexOfId..];
document["EventStream"] = newStreamName;
var newDomainId = DomainId.Combine(DomainId.Create(appId), DomainId.Create(domainId)).ToString();
var newStreamName = $"{domainType}-{newDomainId}";
foreach (var @event in document["Events"].AsBsonArray)
{
var metadata = @event["Metadata"].AsBsonDocument;
document["EventStream"] = newStreamName;
metadata["AggregateId"] = newDomainId;
}
}
foreach (var @event in document["Events"].AsBsonArray)
{
var metadata = @event["Metadata"].AsBsonDocument;
metadata["AggregateId"] = newDomainId;
metadata.Remove("AppId");
}
}
foreach (var @event in document["Events"].AsBsonArray)
{
var metadata = @event["Metadata"].AsBsonDocument;
var filter = Builders<BsonDocument>.Filter.Eq("_id", document["_id"].AsString);
metadata.Remove("AppId");
}
writes.Add(new ReplaceOneModel<BsonDocument>(filter, document)
{
IsUpsert = true
});
}
var filter = Builders<BsonDocument>.Filter.Eq("_id", document["_id"].AsString);
writes.Add(new ReplaceOneModel<BsonDocument>(filter, document)
if (writes.Count > 0)
{
IsUpsert = true
});
await collectionNew.BulkWriteAsync(writes, writeOptions);
}
}
if (writes.Count > 0)
catch (OperationCanceledException ex)
{
await collectionNew.BulkWriteAsync(writes, writeOptions);
// Dataflow swallows operation cancelled exception.
throw new AggregateException(ex);
}
}, new ExecutionDataflowBlockOptions
{

50
backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs

@ -102,39 +102,47 @@ namespace Migrations.Migrations.MongoDb
var actionBlock = new ActionBlock<BsonDocument[]>(async batch =>
{
var writes = new List<WriteModel<BsonDocument>>();
foreach (var document in batch)
try
{
var appId = document["_ai"].AsString;
var documentIdOld = document["_id"].AsString;
var writes = new List<WriteModel<BsonDocument>>();
if (documentIdOld.Contains("--", StringComparison.OrdinalIgnoreCase))
foreach (var document in batch)
{
var index = documentIdOld.LastIndexOf("--", StringComparison.OrdinalIgnoreCase);
var appId = document["_ai"].AsString;
documentIdOld = documentIdOld[(index + 2)..];
}
var documentIdOld = document["_id"].AsString;
if (documentIdOld.Contains("--", StringComparison.OrdinalIgnoreCase))
{
var index = documentIdOld.LastIndexOf("--", StringComparison.OrdinalIgnoreCase);
documentIdOld = documentIdOld[(index + 2)..];
}
var documentIdNew = DomainId.Combine(DomainId.Create(appId), DomainId.Create(documentIdOld)).ToString();
var documentIdNew = DomainId.Combine(DomainId.Create(appId), DomainId.Create(documentIdOld)).ToString();
document["id"] = documentIdOld;
document["_id"] = documentIdNew;
document["id"] = documentIdOld;
document["_id"] = documentIdNew;
extraAction?.Invoke(document);
extraAction?.Invoke(document);
var filter = Builders<BsonDocument>.Filter.Eq("_id", documentIdNew);
var filter = Builders<BsonDocument>.Filter.Eq("_id", documentIdNew);
writes.Add(new ReplaceOneModel<BsonDocument>(filter, document)
writes.Add(new ReplaceOneModel<BsonDocument>(filter, document)
{
IsUpsert = true
});
}
if (writes.Count > 0)
{
IsUpsert = true
});
await collectionNew.BulkWriteAsync(writes, writeOptions);
}
}
if (writes.Count > 0)
catch (OperationCanceledException ex)
{
await collectionNew.BulkWriteAsync(writes, writeOptions);
// Dataflow swallows operation cancelled exception.
throw new AggregateException(ex);
}
}, new ExecutionDataflowBlockOptions
{

43
backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetsBulkUpdateCommandMiddleware.cs

@ -59,14 +59,30 @@ namespace Squidex.Domain.Apps.Entities.Assets.DomainObject
var createCommandsBlock = new TransformBlock<BulkTask, BulkTaskCommand?>(task =>
{
return CreateCommand(task);
try
{
return CreateCommand(task);
}
catch (OperationCanceledException ex)
{
// Dataflow swallows operation cancelled exception.
throw new AggregateException(ex);
}
}, executionOptions);
var executeCommandBlock = new ActionBlock<BulkTaskCommand?>(async command =>
{
if (command != null)
try
{
if (command != null)
{
await ExecuteCommandAsync(command);
}
}
catch (OperationCanceledException ex)
{
await ExecuteCommandAsync(command);
// Dataflow swallows operation cancelled exception.
throw new AggregateException(ex);
}
}, executionOptions);
@ -115,23 +131,16 @@ namespace Squidex.Domain.Apps.Entities.Assets.DomainObject
private static async Task ExecuteCommandAsync(BulkTaskCommand bulkCommand)
{
var (task, id, command) = bulkCommand;
Exception? exception = null;
try
{
await task.Bus.PublishAsync(command);
task.Results.Add(new BulkUpdateResultItem(id, task.JobIndex));
}
catch (Exception ex)
{
exception = ex;
task.Results.Add(new BulkUpdateResultItem(id, task.JobIndex, ex));
}
task.Results.Add(new BulkUpdateResultItem
{
Id = id,
JobIndex = task.JobIndex,
Exception = exception
});
}
private BulkTaskCommand? CreateCommand(BulkTask task)
@ -148,13 +157,7 @@ namespace Squidex.Domain.Apps.Entities.Assets.DomainObject
}
catch (Exception ex)
{
task.Results.Add(new BulkUpdateResultItem
{
Id = id,
JobIndex = task.JobIndex,
Exception = ex
});
task.Results.Add(new BulkUpdateResultItem(id, task.JobIndex, ex));
return null;
}
}

26
backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs

@ -325,20 +325,28 @@ namespace Squidex.Domain.Apps.Entities.Backup
var writeBlock = new ActionBlock<(string, Envelope<IEvent>)[]>(async batch =>
{
var commits = new List<EventCommit>(batch.Length);
foreach (var (stream, @event) in batch)
try
{
var offset = runningStreamMapper.GetStreamOffset(stream);
var commits = new List<EventCommit>(batch.Length);
commits.Add(EventCommit.Create(stream, offset, @event, eventDataFormatter));
}
foreach (var (stream, @event) in batch)
{
var offset = runningStreamMapper.GetStreamOffset(stream);
await eventStore.AppendUnsafeAsync(commits);
commits.Add(EventCommit.Create(stream, offset, @event, eventDataFormatter));
}
handled += commits.Count;
await eventStore.AppendUnsafeAsync(commits);
Log($"Reading {reader.ReadEvents}/{handled} events and {reader.ReadAttachments} attachments completed.", true);
handled += commits.Count;
Log($"Reading {reader.ReadEvents}/{handled} events and {reader.ReadAttachments} attachments completed.", true);
}
catch (OperationCanceledException ex)
{
// Dataflow swallows operation cancelled exception.
throw new AggregateException(ex);
}
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,

11
backend/src/Squidex.Domain.Apps.Entities/BulkUpdateResultItem.cs

@ -8,14 +8,9 @@
using System;
using Squidex.Infrastructure;
#pragma warning disable SA1313 // Parameter names should begin with lower-case letter
namespace Squidex.Domain.Apps.Entities
{
public sealed class BulkUpdateResultItem
{
public DomainId? Id { get; set; }
public int JobIndex { get; set; }
public Exception? Exception { get; set; }
}
public sealed record BulkUpdateResultItem(DomainId? Id, int JobIndex, Exception? Exception = null);
}

47
backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentsBulkUpdateCommandMiddleware.cs

@ -67,12 +67,28 @@ namespace Squidex.Domain.Apps.Entities.Contents.DomainObject
var createCommandsBlock = new TransformManyBlock<BulkTask, BulkTaskCommand>(async task =>
{
return await CreateCommandsAsync(task);
}, executionOptions);
try
{
return await CreateCommandsAsync(task);
}
catch (OperationCanceledException ex)
{
// Dataflow swallows operation cancelled exception.
throw new AggregateException(ex);
}
}, executionOptions);
var executeCommandBlock = new ActionBlock<BulkTaskCommand>(async command =>
{
await ExecuteCommandAsync(command);
try
{
await ExecuteCommandAsync(command);
}
catch (OperationCanceledException ex)
{
// Dataflow swallows operation cancelled exception.
throw new AggregateException(ex);
}
}, executionOptions);
createCommandsBlock.LinkTo(executeCommandBlock, new DataflowLinkOptions
@ -124,22 +140,16 @@ namespace Squidex.Domain.Apps.Entities.Contents.DomainObject
{
var (task, id, command) = bulkCommand;
Exception? exception = null;
try
{
await task.Bus.PublishAsync(command);
task.Results.Add(new BulkUpdateResultItem(id, task.JobIndex));
}
catch (Exception ex)
{
exception = ex;
task.Results.Add(new BulkUpdateResultItem(id, task.JobIndex, ex));
}
task.Results.Add(new BulkUpdateResultItem
{
Id = id,
JobIndex = task.JobIndex,
Exception = exception
});
}
private async Task<IEnumerable<BulkTaskCommand>> CreateCommandsAsync(BulkTask task)
@ -167,22 +177,13 @@ namespace Squidex.Domain.Apps.Entities.Contents.DomainObject
}
catch (Exception ex)
{
task.Results.Add(new BulkUpdateResultItem
{
Id = id,
JobIndex = task.JobIndex,
Exception = ex
});
task.Results.Add(new BulkUpdateResultItem(id, task.JobIndex, ex));
}
}
}
catch (Exception ex)
{
task.Results.Add(new BulkUpdateResultItem
{
JobIndex = task.JobIndex,
Exception = ex
});
task.Results.Add(new BulkUpdateResultItem(null, task.JobIndex, ex));
}
return commands;

12
backend/src/Squidex.Infrastructure.MongoDb/MongoDb/MongoExtensions.cs

@ -190,9 +190,17 @@ namespace Squidex.Infrastructure.MongoDb
var actionBlock =
new ActionBlock<T>(async x =>
{
if (!combined.IsCancellationRequested)
try
{
await processor(x);
if (!combined.IsCancellationRequested)
{
await processor(x);
}
}
catch (OperationCanceledException ex)
{
// Dataflow swallows operation cancelled exception.
throw new AggregateException(ex);
}
},
new ExecutionDataflowBlockOptions

34
backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs

@ -91,26 +91,34 @@ namespace Squidex.Infrastructure.Commands
var workerBlock = new ActionBlock<DomainId[]>(async ids =>
{
await using (var context = store.WithBatchContext(typeof(T)))
try
{
await context.LoadAsync(ids);
foreach (var id in ids)
await using (var context = store.WithBatchContext(typeof(T)))
{
try
{
var domainObject = Factory<T, TState>.Create(serviceProvider, context);
domainObject.Setup(id);
await context.LoadAsync(ids);
await domainObject.RebuildStateAsync();
}
catch (DomainObjectNotFoundException)
foreach (var id in ids)
{
return;
try
{
var domainObject = Factory<T, TState>.Create(serviceProvider, context);
domainObject.Setup(id);
await domainObject.RebuildStateAsync();
}
catch (DomainObjectNotFoundException)
{
return;
}
}
}
}
catch (OperationCanceledException ex)
{
// Dataflow swallows operation cancelled exception.
throw new AggregateException(ex);
}
},
new ExecutionDataflowBlockOptions
{

64
backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs

@ -50,24 +50,32 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
var parse = new TransformBlock<Job, Job>(job =>
{
if (job.StoredEvent != null)
try
{
job.ShouldHandle = eventConsumer.Handles(job.StoredEvent);
}
if (job.ShouldHandle)
{
try
if (job.StoredEvent != null)
{
job.Event = eventDataFormatter.ParseIfKnown(job.StoredEvent!);
job.ShouldHandle = eventConsumer.Handles(job.StoredEvent);
}
catch (Exception ex)
if (job.ShouldHandle)
{
job.Exception = ex;
try
{
job.Event = eventDataFormatter.ParseIfKnown(job.StoredEvent!);
}
catch (Exception ex)
{
job.Exception = ex;
}
}
}
return job;
return job;
}
catch (OperationCanceledException ex)
{
// Dataflow swallows operation cancelled exception.
throw new AggregateException(ex);
}
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 1,
@ -82,24 +90,32 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
var handle = new ActionBlock<IList<Job>>(async jobs =>
{
var sender = eventSubscription?.Sender;
foreach (var jobsBySender in jobs.GroupBy(x => x.Sender))
try
{
if (sender != null && ReferenceEquals(jobsBySender.Key, sender))
{
var exception = jobs.FirstOrDefault(x => x.Exception != null)?.Exception;
var sender = eventSubscription?.Sender;
if (exception != null)
{
await grain.OnErrorAsync(sender, exception);
}
else
foreach (var jobsBySender in jobs.GroupBy(x => x.Sender))
{
if (sender != null && ReferenceEquals(jobsBySender.Key, sender))
{
await grain.OnEventsAsync(sender, GetEvents(jobsBySender), GetPosition(jobsBySender));
var exception = jobs.FirstOrDefault(x => x.Exception != null)?.Exception;
if (exception != null)
{
await grain.OnErrorAsync(sender, exception);
}
else
{
await grain.OnEventsAsync(sender, GetEvents(jobsBySender), GetPosition(jobsBySender));
}
}
}
}
catch (OperationCanceledException ex)
{
// Dataflow swallows operation cancelled exception.
throw new AggregateException(ex);
}
},
new ExecutionDataflowBlockOptions
{

29
backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs

@ -44,7 +44,18 @@ namespace Squidex.Infrastructure.Tasks
workerOption.MaxDegreeOfParallelism = 1;
workerOption.MaxMessagesPerTask = 1;
workers[i] = new ActionBlock<TInput>(action, workerOption);
workers[i] = new ActionBlock<TInput>(async input =>
{
try
{
await action(input);
}
catch (OperationCanceledException ex)
{
// Dataflow swallows operation cancelled exception.
throw new AggregateException(ex);
}
}, workerOption);
}
var distributorOption = new ExecutionDataflowBlockOptions
@ -54,12 +65,20 @@ namespace Squidex.Infrastructure.Tasks
BoundedCapacity = 1
};
distributor = new ActionBlock<TInput>(x =>
distributor = new ActionBlock<TInput>(async input =>
{
var partition = Math.Abs(partitioner(x)) % workers.Length;
try
{
var partition = Math.Abs(partitioner(input)) % workers.Length;
return workers[partition].SendAsync(x);
}, distributorOption);
await workers[partition].SendAsync(input);
}
catch (OperationCanceledException ex)
{
// Dataflow swallows operation cancelled exception.
throw new AggregateException(ex);
}
}, distributorOption);
distributor.Completion.ContinueWith(x =>
{

Loading…
Cancel
Save