Browse Source

Improve TPL flow.

pull/768/head
Sebastian 4 years ago
parent
commit
adc5501c65
  1. 6
      backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs
  2. 6
      backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs
  3. 8
      backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetsBulkUpdateCommandMiddleware.cs
  4. 5
      backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs
  5. 8
      backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentsBulkUpdateCommandMiddleware.cs
  6. 6
      backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs
  7. 75
      backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs
  8. 27
      backend/src/Squidex.Infrastructure/Tasks/TaskExtensions.cs

6
backend/src/Migrations/Migrations/MongoDb/AddAppIdToEventStream.cs

@ -15,6 +15,7 @@ using MongoDB.Bson;
using MongoDB.Driver;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Migrations;
using Squidex.Infrastructure.Tasks;
namespace Migrations.Migrations.MongoDb
{
@ -118,10 +119,7 @@ namespace Migrations.Migrations.MongoDb
BoundedCapacity = SizeOfQueue
});
batchBlock.LinkTo(actionBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
batchBlock.BidirectionalLinkTo(actionBlock);
await collectionOld.Find(new BsonDocument()).ForEachAsync(batchBlock.SendAsync, ct);

6
backend/src/Migrations/Migrations/MongoDb/ConvertDocumentIds.cs

@ -15,6 +15,7 @@ using MongoDB.Driver;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Migrations;
using Squidex.Infrastructure.MongoDb;
using Squidex.Infrastructure.Tasks;
namespace Migrations.Migrations.MongoDb
{
@ -152,10 +153,7 @@ namespace Migrations.Migrations.MongoDb
BoundedCapacity = SizeOfQueue
});
batchBlock.LinkTo(actionBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
batchBlock.BidirectionalLinkTo(actionBlock);
await collectionOld.Find(new BsonDocument()).ForEachAsync(batchBlock.SendAsync, ct);

8
backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetsBulkUpdateCommandMiddleware.cs

@ -1,4 +1,4 @@
// ==========================================================================
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
@ -14,6 +14,7 @@ using Squidex.Domain.Apps.Entities.Contents;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Commands;
using Squidex.Infrastructure.Reflection;
using Squidex.Infrastructure.Tasks;
using Squidex.Shared;
#pragma warning disable SA1313 // Parameter names should begin with lower-case letter
@ -84,10 +85,7 @@ namespace Squidex.Domain.Apps.Entities.Assets.DomainObject
}
}, executionOptions);
createCommandsBlock.LinkTo(executeCommandBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
createCommandsBlock.BidirectionalLinkTo(executeCommandBlock);
contextProvider.Context.Change(b => b
.WithoutAssetEnrichment()

5
backend/src/Squidex.Domain.Apps.Entities/Backup/RestoreGrain.cs

@ -344,10 +344,7 @@ namespace Squidex.Domain.Apps.Entities.Backup
BoundedCapacity = BatchSize * 2
});
batchBlock.LinkTo(writeBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
batchBlock.BidirectionalLinkTo(writeBlock);
await reader.ReadEventsAsync(streamNameResolver, eventDataFormatter, async job =>
{

8
backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentsBulkUpdateCommandMiddleware.cs

@ -1,4 +1,4 @@
// ==========================================================================
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
@ -17,6 +17,7 @@ using Squidex.Domain.Apps.Entities.Schemas;
using Squidex.Infrastructure;
using Squidex.Infrastructure.Commands;
using Squidex.Infrastructure.Reflection;
using Squidex.Infrastructure.Tasks;
using Squidex.Infrastructure.Translations;
using Squidex.Shared;
@ -88,10 +89,7 @@ namespace Squidex.Domain.Apps.Entities.Contents.DomainObject
}
}, executionOptions);
createCommandsBlock.LinkTo(executeCommandBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
createCommandsBlock.BidirectionalLinkTo(executeCommandBlock);
contextProvider.Context.Change(b => b
.WithoutContentEnrichment()

6
backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs

@ -14,6 +14,7 @@ using Microsoft.Extensions.DependencyInjection;
using Squidex.Caching;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.States;
using Squidex.Infrastructure.Tasks;
#pragma warning disable RECS0108 // Warns about static fields in generic types
@ -131,10 +132,7 @@ namespace Squidex.Infrastructure.Commands
BoundedCapacity = batchSize
});
batchBlock.LinkTo(workerBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
batchBlock.BidirectionalLinkTo(workerBlock);
var handledIds = new HashSet<DomainId>();

75
backend/src/Squidex.Infrastructure/Tasks/PartitionedActionBlock.cs

@ -9,11 +9,10 @@ using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using Squidex.Infrastructure.Reflection;
namespace Squidex.Infrastructure.Tasks
{
public class PartitionedActionBlock<TInput> : ITargetBlock<TInput>
public sealed class PartitionedActionBlock<TInput> : ITargetBlock<TInput>
{
private readonly ITargetBlock<TInput> distributor;
private readonly ActionBlock<TInput>[] workers;
@ -39,31 +38,15 @@ namespace Squidex.Infrastructure.Tasks
for (var i = 0; i < dataflowBlockOptions.MaxDegreeOfParallelism; i++)
{
var workerOption = SimpleMapper.Map(dataflowBlockOptions, new ExecutionDataflowBlockOptions());
workerOption.MaxDegreeOfParallelism = 1;
workerOption.MaxMessagesPerTask = 1;
workers[i] = new ActionBlock<TInput>(async input =>
{
try
{
await action(input);
}
catch (OperationCanceledException ex)
{
// Dataflow swallows operation cancelled exception.
throw new AggregateException(ex);
}
}, workerOption);
}
var distributorOption = new ExecutionDataflowBlockOptions
workers[i] = new ActionBlock<TInput>(action, new ExecutionDataflowBlockOptions()
{
BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
CancellationToken = dataflowBlockOptions.CancellationToken,
MaxDegreeOfParallelism = 1,
MaxMessagesPerTask = 1,
BoundedCapacity = 1
};
TaskScheduler = dataflowBlockOptions.TaskScheduler,
});
}
distributor = new ActionBlock<TInput>(async input =>
{
@ -78,15 +61,14 @@ namespace Squidex.Infrastructure.Tasks
// Dataflow swallows operation cancelled exception.
throw new AggregateException(ex);
}
}, distributorOption);
distributor.Completion.ContinueWith(x =>
}, new ExecutionDataflowBlockOptions
{
foreach (var worker in workers)
{
worker.Complete();
}
BoundedCapacity = 1,
MaxDegreeOfParallelism = 1,
MaxMessagesPerTask = 1
});
LinkCompletion();
}
public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput>? source, bool consumeToAccept)
@ -103,5 +85,36 @@ namespace Squidex.Infrastructure.Tasks
{
distributor.Fault(exception);
}
#pragma warning disable RECS0165 // Asynchronous methods should return a Task instead of void
private async void LinkCompletion()
#pragma warning restore RECS0165 // Asynchronous methods should return a Task instead of void
{
try
{
await distributor.Completion.ConfigureAwait(false);
}
#pragma warning disable RECS0022 // A catch clause that catches System.Exception and has an empty body
catch
#pragma warning restore RECS0022 // A catch clause that catches System.Exception and has an empty body
{
// we do not want to change the stacktrace of the exception.
}
if (distributor.Completion.IsFaulted && distributor.Completion.Exception != null)
{
foreach (var worker in workers)
{
((IDataflowBlock)worker).Fault(distributor.Completion.Exception);
}
}
else
{
foreach (var worker in workers)
{
worker.Complete();
}
}
}
}
}

27
backend/src/Squidex.Infrastructure/Tasks/TaskExtensions.cs

@ -8,6 +8,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace Squidex.Infrastructure.Tasks
{
@ -53,5 +54,31 @@ namespace Squidex.Infrastructure.Tasks
return await task;
}
}
#pragma warning disable RECS0165 // Asynchronous methods should return a Task instead of void
public static async void BidirectionalLinkTo<T>(this ISourceBlock<T> source, ITargetBlock<T> target)
#pragma warning restore RECS0165 // Asynchronous methods should return a Task instead of void
{
source.LinkTo(target, new DataflowLinkOptions
{
PropagateCompletion = true
});
try
{
await target.Completion.ConfigureAwait(false);
}
#pragma warning disable RECS0022 // A catch clause that catches System.Exception and has an empty body
catch
#pragma warning restore RECS0022 // A catch clause that catches System.Exception and has an empty body
{
// we do not want to change the stacktrace of the exception.
}
if (target.Completion.IsFaulted && target.Completion.Exception != null)
{
source.Fault(target.Completion.Exception.Flatten());
}
}
}
}

Loading…
Cancel
Save