mirror of https://github.com/Squidex/squidex.git
23 changed files with 195 additions and 19 deletions
@ -0,0 +1,110 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Linq; |
|||
using System.Threading.Tasks; |
|||
using System.Threading.Tasks.Dataflow; |
|||
using Squidex.Infrastructure.Reflection; |
|||
|
|||
namespace Squidex.Infrastructure.Tasks |
|||
{ |
|||
public class PartitionedActionBlock<TInput> : ITargetBlock<TInput> |
|||
{ |
|||
private readonly ITargetBlock<TInput> distributor; |
|||
private readonly ActionBlock<TInput>[] workers; |
|||
|
|||
public Task Completion |
|||
{ |
|||
get { return Task.WhenAll(workers.Select(x => x.Completion)); } |
|||
} |
|||
|
|||
public PartitionedActionBlock(Action<TInput> action, Func<TInput, int> partitioner) |
|||
: this (ToAsync(action), partitioner, new ExecutionDataflowBlockOptions()) |
|||
{ |
|||
} |
|||
|
|||
public PartitionedActionBlock(Func<TInput, Task> action, Func<TInput, int> partitioner) |
|||
: this(action, partitioner, new ExecutionDataflowBlockOptions()) |
|||
{ |
|||
} |
|||
|
|||
public PartitionedActionBlock(Action<TInput> action, Func<TInput, int> partitioner, ExecutionDataflowBlockOptions dataflowBlockOptions) |
|||
: this(ToAsync(action), partitioner, dataflowBlockOptions) |
|||
{ |
|||
} |
|||
|
|||
public PartitionedActionBlock(Func<TInput, Task> action, Func<TInput, int> partitioner, ExecutionDataflowBlockOptions dataflowBlockOptions) |
|||
{ |
|||
Guard.NotNull(action, nameof(action)); |
|||
Guard.NotNull(partitioner, nameof(partitioner)); |
|||
Guard.NotNull(dataflowBlockOptions, nameof(dataflowBlockOptions)); |
|||
Guard.GreaterThan(dataflowBlockOptions.MaxDegreeOfParallelism, 1, nameof(dataflowBlockOptions.MaxDegreeOfParallelism)); |
|||
|
|||
workers = new ActionBlock<TInput>[dataflowBlockOptions.MaxDegreeOfParallelism]; |
|||
|
|||
for (var i = 0; i < dataflowBlockOptions.MaxDegreeOfParallelism; i++) |
|||
{ |
|||
var workerOption = SimpleMapper.Map(dataflowBlockOptions, new ExecutionDataflowBlockOptions()); |
|||
|
|||
workerOption.MaxDegreeOfParallelism = 1; |
|||
workerOption.MaxMessagesPerTask = 1; |
|||
|
|||
workers[i] = new ActionBlock<TInput>(action, workerOption); |
|||
} |
|||
|
|||
var distributorOption = new ExecutionDataflowBlockOptions |
|||
{ |
|||
MaxDegreeOfParallelism = 1, |
|||
MaxMessagesPerTask = 1, |
|||
BoundedCapacity = 1 |
|||
}; |
|||
|
|||
distributor = new ActionBlock<TInput>(x => |
|||
{ |
|||
var partition = partitioner(x) % workers.Length; |
|||
|
|||
return workers[partition].SendAsync(x); |
|||
}, distributorOption); |
|||
|
|||
distributor.Completion.ContinueWith(x => |
|||
{ |
|||
foreach (var worker in workers) |
|||
{ |
|||
worker.Complete(); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader, TInput messageValue, ISourceBlock<TInput> source, bool consumeToAccept) |
|||
{ |
|||
return distributor.OfferMessage(messageHeader, messageValue, source, consumeToAccept); |
|||
} |
|||
|
|||
public void Complete() |
|||
{ |
|||
distributor.Complete(); |
|||
} |
|||
|
|||
public void Fault(Exception exception) |
|||
{ |
|||
distributor.Fault(exception); |
|||
} |
|||
|
|||
private static Func<TInput, Task> ToAsync(Action<TInput> action) |
|||
{ |
|||
Guard.NotNull(action, nameof(action)); |
|||
|
|||
return x => |
|||
{ |
|||
action(x); |
|||
|
|||
return TaskHelper.Done; |
|||
}; |
|||
} |
|||
} |
|||
} |
|||
@ -0,0 +1,62 @@ |
|||
// ==========================================================================
|
|||
// Squidex Headless CMS
|
|||
// ==========================================================================
|
|||
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|||
// All rights reserved. Licensed under the MIT license.
|
|||
// ==========================================================================
|
|||
|
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Linq; |
|||
using System.Threading.Tasks; |
|||
using System.Threading.Tasks.Dataflow; |
|||
using Xunit; |
|||
|
|||
namespace Squidex.Infrastructure.Tasks |
|||
{ |
|||
public class PartitionedActionBlockTests |
|||
{ |
|||
[Fact] |
|||
public async Task Should_propagate_in_order() |
|||
{ |
|||
var random = new Random(); |
|||
var partitions = 10; |
|||
|
|||
var lists = new List<int>[partitions]; |
|||
|
|||
for (var i = 0; i < partitions; i++) |
|||
{ |
|||
lists[i] = new List<int>(); |
|||
} |
|||
|
|||
var block = new PartitionedActionBlock<(int P, int V)>(x => |
|||
{ |
|||
random.Next(10); |
|||
|
|||
lists[x.P].Add(x.V); |
|||
}, x => x.P, new ExecutionDataflowBlockOptions |
|||
{ |
|||
MaxDegreeOfParallelism = 100, |
|||
MaxMessagesPerTask = 1, |
|||
BoundedCapacity = 100 |
|||
}); |
|||
|
|||
for (var i = 0; i < partitions; i++) |
|||
{ |
|||
for (var j = 0; j < 10; j++) |
|||
{ |
|||
await block.SendAsync((i, j)); |
|||
} |
|||
} |
|||
|
|||
block.Complete(); |
|||
|
|||
await block.Completion; |
|||
|
|||
foreach (var list in lists) |
|||
{ |
|||
Assert.Equal(Enumerable.Range(0, 10).ToList(), list); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
Loading…
Reference in new issue