|
|
@ -19,8 +19,6 @@ using Squidex.Infrastructure.States; |
|
|
|
|
|
|
|
|
namespace Squidex.Infrastructure.Commands |
|
|
namespace Squidex.Infrastructure.Commands |
|
|
{ |
|
|
{ |
|
|
public delegate Task IdSource(Func<DomainId, Task> add); |
|
|
|
|
|
|
|
|
|
|
|
public class Rebuilder |
|
|
public class Rebuilder |
|
|
{ |
|
|
{ |
|
|
private readonly ILocalCache localCache; |
|
|
private readonly ILocalCache localCache; |
|
|
@ -51,7 +49,7 @@ namespace Squidex.Infrastructure.Commands |
|
|
this.localCache = localCache; |
|
|
this.localCache = localCache; |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public virtual async Task RebuildAsync<T, TState>(string filter, CancellationToken ct) where T : DomainObject<TState> where TState : class, IDomainState<TState>, new() |
|
|
public virtual async Task RebuildAsync<T, TState>(string filter, int batchSize, CancellationToken ct = default) where T : DomainObject<TState> where TState : class, IDomainState<TState>, new() |
|
|
{ |
|
|
{ |
|
|
var store = serviceProvider.GetRequiredService<IStore<TState>>(); |
|
|
var store = serviceProvider.GetRequiredService<IStore<TState>>(); |
|
|
|
|
|
|
|
|
@ -65,12 +63,13 @@ namespace Squidex.Infrastructure.Commands |
|
|
|
|
|
|
|
|
await target(id); |
|
|
await target(id); |
|
|
} |
|
|
} |
|
|
}, ct); |
|
|
}, batchSize, ct); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
public virtual async Task InsertManyAsync<T, TState>(IEnumerable<DomainId> source, CancellationToken ct = default) where T : DomainObject<TState> where TState : class, IDomainState<TState>, new() |
|
|
public virtual async Task InsertManyAsync<T, TState>(IEnumerable<DomainId> source, int batchSize, CancellationToken ct = default) where T : DomainObject<TState> where TState : class, IDomainState<TState>, new() |
|
|
{ |
|
|
{ |
|
|
Guard.NotNull(source, nameof(source)); |
|
|
Guard.NotNull(source, nameof(source)); |
|
|
|
|
|
Guard.Between(batchSize, 1, 1000, nameof(batchSize)); |
|
|
|
|
|
|
|
|
var store = serviceProvider.GetRequiredService<IStore<TState>>(); |
|
|
var store = serviceProvider.GetRequiredService<IStore<TState>>(); |
|
|
|
|
|
|
|
|
@ -80,15 +79,13 @@ namespace Squidex.Infrastructure.Commands |
|
|
{ |
|
|
{ |
|
|
await target(id); |
|
|
await target(id); |
|
|
} |
|
|
} |
|
|
}, ct); |
|
|
}, batchSize, ct); |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
private async Task InsertManyAsync<T, TState>(IStore<TState> store, IdSource source, CancellationToken ct = default) where T : DomainObject<TState> where TState : class, IDomainState<TState>, new() |
|
|
private async Task InsertManyAsync<T, TState>(IStore<TState> store, Func<Func<DomainId, Task>, Task> source, int batchSize, CancellationToken ct = default) where T : DomainObject<TState> where TState : class, IDomainState<TState>, new() |
|
|
{ |
|
|
{ |
|
|
var parallelism = Environment.ProcessorCount; |
|
|
var parallelism = Environment.ProcessorCount; |
|
|
|
|
|
|
|
|
const int BatchSize = 100; |
|
|
|
|
|
|
|
|
|
|
|
var workerBlock = new ActionBlock<DomainId[]>(async ids => |
|
|
var workerBlock = new ActionBlock<DomainId[]>(async ids => |
|
|
{ |
|
|
{ |
|
|
try |
|
|
try |
|
|
@ -127,9 +124,9 @@ namespace Squidex.Infrastructure.Commands |
|
|
BoundedCapacity = parallelism |
|
|
BoundedCapacity = parallelism |
|
|
}); |
|
|
}); |
|
|
|
|
|
|
|
|
var batchBlock = new BatchBlock<DomainId>(BatchSize, new GroupingDataflowBlockOptions |
|
|
var batchBlock = new BatchBlock<DomainId>(batchSize, new GroupingDataflowBlockOptions |
|
|
{ |
|
|
{ |
|
|
BoundedCapacity = BatchSize |
|
|
BoundedCapacity = batchSize |
|
|
}); |
|
|
}); |
|
|
|
|
|
|
|
|
batchBlock.LinkTo(workerBlock, new DataflowLinkOptions |
|
|
batchBlock.LinkTo(workerBlock, new DataflowLinkOptions |
|
|
|