Browse Source

Fixes to batch writing.

pull/743/head
Sebastian 4 years ago
parent
commit
1519d58226
  1. 14
      backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetFolderRepository_SnapshotStore.cs
  2. 14
      backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository_SnapshotStore.cs
  3. 9
      backend/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentCollection.cs
  4. 15
      backend/src/Squidex.Infrastructure.MongoDb/States/MongoSnapshotStore.cs
  5. 12
      backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs
  6. 14
      backend/src/Squidex.Infrastructure/States/BatchContext.cs
  7. 63
      backend/tests/Squidex.Infrastructure.Tests/States/PersistenceBatchTests.cs

14
backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetFolderRepository_SnapshotStore.cs

@ -54,14 +54,20 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets
{ {
using (Profiler.TraceMethod<MongoAssetFolderRepository>()) using (Profiler.TraceMethod<MongoAssetFolderRepository>())
{ {
var entities = snapshots.Select(Map).ToList(); var updates = snapshots.Select(Map).Select(x =>
new ReplaceOneModel<MongoAssetFolderEntity>(
if (entities.Count == 0) Filter.Eq(y => y.DocumentId, x.DocumentId),
x)
{
IsUpsert = true
}).ToList();
if (updates.Count == 0)
{ {
return; return;
} }
await Collection.InsertManyAsync(entities, InsertUnordered); await Collection.BulkWriteAsync(updates);
} }
} }

14
backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository_SnapshotStore.cs

@ -54,14 +54,20 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets
{ {
using (Profiler.TraceMethod<MongoAssetFolderRepository>()) using (Profiler.TraceMethod<MongoAssetFolderRepository>())
{ {
var entities = snapshots.Select(Map).ToList(); var updates = snapshots.Select(Map).Select(x =>
new ReplaceOneModel<MongoAssetEntity>(
if (entities.Count == 0) Filter.Eq(y => y.DocumentId, x.DocumentId),
x)
{
IsUpsert = true
}).ToList();
if (updates.Count == 0)
{ {
return; return;
} }
await Collection.InsertManyAsync(entities, InsertUnordered); await Collection.BulkWriteAsync(updates);
} }
} }

9
backend/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentCollection.cs

@ -216,7 +216,14 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents
return Task.CompletedTask; return Task.CompletedTask;
} }
return Collection.InsertManyAsync(entities, InsertUnordered); var writes = entities.Select(x => new ReplaceOneModel<MongoContentEntity>(
Filter.Eq(y => y.DocumentId, x.DocumentId),
x)
{
IsUpsert = true
}).ToList();
return Collection.BulkWriteAsync(writes);
} }
} }
} }

15
backend/src/Squidex.Infrastructure.MongoDb/States/MongoSnapshotStore.cs

@ -72,12 +72,17 @@ namespace Squidex.Infrastructure.States
{ {
using (Profiler.TraceMethod<MongoSnapshotStore<T>>()) using (Profiler.TraceMethod<MongoSnapshotStore<T>>())
{ {
var writes = snapshots.Select(x => new InsertOneModel<MongoState<T>>(new MongoState<T> var writes = snapshots.Select(x => new ReplaceOneModel<MongoState<T>>(
Filter.Eq(y => y.DocumentId, x.Key),
new MongoState<T>
{
Doc = x.Value,
DocumentId = x.Key,
Version = x.Version
})
{ {
Doc = x.Value, IsUpsert = true
DocumentId = x.Key, }).ToList();
Version = x.Version
})).ToList();
if (writes.Count == 0) if (writes.Count == 0)
{ {

12
backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs

@ -45,7 +45,9 @@ namespace Squidex.Infrastructure.Commands
this.localCache = localCache; this.localCache = localCache;
} }
public virtual async Task RebuildAsync<T, TState>(string filter, int batchSize, CancellationToken ct = default) where T : DomainObject<TState> where TState : class, IDomainState<TState>, new() public virtual async Task RebuildAsync<T, TState>(string filter, int batchSize,
CancellationToken ct = default)
where T : DomainObject<TState> where TState : class, IDomainState<TState>, new()
{ {
var store = serviceProvider.GetRequiredService<IStore<TState>>(); var store = serviceProvider.GetRequiredService<IStore<TState>>();
@ -62,7 +64,9 @@ namespace Squidex.Infrastructure.Commands
}, batchSize, ct); }, batchSize, ct);
} }
public virtual async Task InsertManyAsync<T, TState>(IEnumerable<DomainId> source, int batchSize, CancellationToken ct = default) where T : DomainObject<TState> where TState : class, IDomainState<TState>, new() public virtual async Task InsertManyAsync<T, TState>(IEnumerable<DomainId> source, int batchSize,
CancellationToken ct = default)
where T : DomainObject<TState> where TState : class, IDomainState<TState>, new()
{ {
Guard.NotNull(source, nameof(source)); Guard.NotNull(source, nameof(source));
Guard.Between(batchSize, 1, 1000, nameof(batchSize)); Guard.Between(batchSize, 1, 1000, nameof(batchSize));
@ -78,7 +82,9 @@ namespace Squidex.Infrastructure.Commands
}, batchSize, ct); }, batchSize, ct);
} }
private async Task InsertManyAsync<T, TState>(IStore<TState> store, Func<Func<DomainId, Task>, Task> source, int batchSize, CancellationToken ct = default) where T : DomainObject<TState> where TState : class, IDomainState<TState>, new() private async Task InsertManyAsync<T, TState>(IStore<TState> store, Func<Func<DomainId, Task>, Task> source, int batchSize,
CancellationToken ct = default)
where T : DomainObject<TState> where TState : class, IDomainState<TState>, new()
{ {
var parallelism = Environment.ProcessorCount; var parallelism = Environment.ProcessorCount;

14
backend/src/Squidex.Infrastructure/States/BatchContext.cs

@ -25,7 +25,7 @@ namespace Squidex.Infrastructure.States
private readonly IEventDataFormatter eventDataFormatter; private readonly IEventDataFormatter eventDataFormatter;
private readonly IStreamNameResolver streamNameResolver; private readonly IStreamNameResolver streamNameResolver;
private readonly Dictionary<DomainId, (long, List<Envelope<IEvent>>)> @events = new Dictionary<DomainId, (long, List<Envelope<IEvent>>)>(); private readonly Dictionary<DomainId, (long, List<Envelope<IEvent>>)> @events = new Dictionary<DomainId, (long, List<Envelope<IEvent>>)>();
private List<(DomainId Key, T Snapshot, long Version)>? snapshots; private Dictionary<DomainId, (T Snapshot, long Version)>? snapshots;
internal BatchContext( internal BatchContext(
Type owner, Type owner,
@ -43,8 +43,12 @@ namespace Squidex.Infrastructure.States
internal void Add(DomainId key, T snapshot, long version) internal void Add(DomainId key, T snapshot, long version)
{ {
snapshots ??= new List<(DomainId Key, T Snapshot, long Version)>(); snapshots ??= new Dictionary<DomainId, (T Snapshot, long Version)>();
snapshots.Add((key, snapshot, version));
if (!snapshots.TryGetValue(key, out var existing) || existing.Version < version)
{
snapshots[key] = (snapshot, version);
}
} }
public async Task LoadAsync(IEnumerable<DomainId> ids) public async Task LoadAsync(IEnumerable<DomainId> ids)
@ -87,7 +91,9 @@ namespace Squidex.Infrastructure.States
return Task.CompletedTask; return Task.CompletedTask;
} }
return snapshotStore.WriteManyAsync(current); var list = current.Select(x => (x.Key, x.Value.Snapshot, x.Value.Version));
return snapshotStore.WriteManyAsync(list);
} }
public IPersistence<T> WithEventSourcing(Type owner, DomainId key, HandleEvent? applyEvent) public IPersistence<T> WithEventSourcing(Type owner, DomainId key, HandleEvent? applyEvent)

63
backend/tests/Squidex.Infrastructure.Tests/States/PersistenceBatchTests.cs

@ -126,6 +126,67 @@ namespace Squidex.Infrastructure.States
.MustHaveHappenedOnceExactly(); .MustHaveHappenedOnceExactly();
} }
[Fact]
public async Task Should_write_each_id_only_once_if_same_id_requested_twice()
{
var key1 = DomainId.NewGuid();
var key2 = DomainId.NewGuid();
var bulk = sut.WithBatchContext(None.Type);
await bulk.LoadAsync(new[] { key1, key2 });
var persistedEvents1_1 = Save.Events();
var persistence1_1 = bulk.WithEventSourcing(None.Type, key1, persistedEvents1_1.Write);
var persistedEvents1_2 = Save.Events();
var persistence1_2 = bulk.WithEventSourcing(None.Type, key1, persistedEvents1_2.Write);
await persistence1_1.WriteSnapshotAsync(12);
await persistence1_2.WriteSnapshotAsync(12);
A.CallTo(() => snapshotStore.WriteAsync(A<DomainId>._, A<int>._, A<long>._, A<long>._))
.MustNotHaveHappened();
A.CallTo(() => snapshotStore.WriteManyAsync(A<IEnumerable<(DomainId, int, long)>>._))
.MustNotHaveHappened();
await bulk.CommitAsync();
await bulk.DisposeAsync();
A.CallTo(() => snapshotStore.WriteManyAsync(A<IEnumerable<(DomainId, int, long)>>.That.Matches(x => x.Count() == 1)))
.MustHaveHappenedOnceExactly();
}
[Fact]
public async Task Should_write_each_id_only_once_if_same_persistence_written_twice()
{
var key1 = DomainId.NewGuid();
var key2 = DomainId.NewGuid();
var bulk = sut.WithBatchContext(None.Type);
await bulk.LoadAsync(new[] { key1, key2 });
var persistedEvents1 = Save.Events();
var persistence1 = bulk.WithEventSourcing(None.Type, key1, persistedEvents1.Write);
await persistence1.WriteSnapshotAsync(12);
await persistence1.WriteSnapshotAsync(13);
A.CallTo(() => snapshotStore.WriteAsync(A<DomainId>._, A<int>._, A<long>._, A<long>._))
.MustNotHaveHappened();
A.CallTo(() => snapshotStore.WriteManyAsync(A<IEnumerable<(DomainId, int, long)>>._))
.MustNotHaveHappened();
await bulk.CommitAsync();
await bulk.DisposeAsync();
A.CallTo(() => snapshotStore.WriteManyAsync(A<IEnumerable<(DomainId, int, long)>>.That.Matches(x => x.Count() == 1)))
.MustHaveHappenedOnceExactly();
}
private void SetupEventStore(Dictionary<DomainId, List<MyEvent>> streams) private void SetupEventStore(Dictionary<DomainId, List<MyEvent>> streams)
{ {
var storedStreams = new Dictionary<string, IReadOnlyList<StoredEvent>>(); var storedStreams = new Dictionary<string, IReadOnlyList<StoredEvent>>();
@ -161,4 +222,4 @@ namespace Squidex.Infrastructure.States
.Returns(storedStreams); .Returns(storedStreams);
} }
} }
} }

Loading…
Cancel
Save