// ========================================================================== // Squidex Headless CMS // ========================================================================== // Copyright (c) Squidex UG (haftungsbeschraenkt) // All rights reserved. Licensed under the MIT license. // ========================================================================== using System; using System.Linq; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; using Squidex.Infrastructure.Reflection; namespace Squidex.Infrastructure.Tasks { public class PartitionedActionBlock : ITargetBlock { private readonly ITargetBlock distributor; private readonly ActionBlock[] workers; public Task Completion { get => Task.WhenAll(workers.Select(x => x.Completion)); } public PartitionedActionBlock(Func action, Func partitioner) : this(action, partitioner, new ExecutionDataflowBlockOptions()) { } public PartitionedActionBlock(Func action, Func partitioner, ExecutionDataflowBlockOptions dataflowBlockOptions) { Guard.NotNull(action, nameof(action)); Guard.NotNull(partitioner, nameof(partitioner)); Guard.NotNull(dataflowBlockOptions, nameof(dataflowBlockOptions)); Guard.GreaterThan(dataflowBlockOptions.MaxDegreeOfParallelism, 1, nameof(dataflowBlockOptions.MaxDegreeOfParallelism)); workers = new ActionBlock[dataflowBlockOptions.MaxDegreeOfParallelism]; for (var i = 0; i < dataflowBlockOptions.MaxDegreeOfParallelism; i++) { var workerOption = SimpleMapper.Map(dataflowBlockOptions, new ExecutionDataflowBlockOptions()); workerOption.MaxDegreeOfParallelism = 1; workerOption.MaxMessagesPerTask = DataflowBlockOptions.Unbounded; workers[i] = new ActionBlock(action, workerOption); } var distributorOption = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, MaxMessagesPerTask = DataflowBlockOptions.Unbounded, BoundedCapacity = 1 }; distributor = new ActionBlock(x => { var partition = Math.Abs(partitioner(x)) % workers.Length; return workers[partition].SendAsync(x); }, distributorOption); distributor.Completion.ContinueWith(x => { foreach (var worker in workers) { worker.Complete(); } }); } public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock? source, bool consumeToAccept) { return distributor.OfferMessage(messageHeader, messageValue, source, consumeToAccept); } public void Complete() { distributor.Complete(); } public void Fault(Exception exception) { distributor.Fault(exception); } } }