diff --git a/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetFolderRepository_SnapshotStore.cs b/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetFolderRepository_SnapshotStore.cs index c89f00daf..71f3c53c2 100644 --- a/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetFolderRepository_SnapshotStore.cs +++ b/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetFolderRepository_SnapshotStore.cs @@ -54,14 +54,20 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets { using (Profiler.TraceMethod()) { - var entities = snapshots.Select(Map).ToList(); - - if (entities.Count == 0) + var updates = snapshots.Select(Map).Select(x => + new ReplaceOneModel( + Filter.Eq(y => y.DocumentId, x.DocumentId), + x) + { + IsUpsert = true + }).ToList(); + + if (updates.Count == 0) { return; } - await Collection.InsertManyAsync(entities, InsertUnordered); + await Collection.BulkWriteAsync(updates); } } diff --git a/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository_SnapshotStore.cs b/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository_SnapshotStore.cs index 3722f2ae0..a4af16806 100644 --- a/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository_SnapshotStore.cs +++ b/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Assets/MongoAssetRepository_SnapshotStore.cs @@ -54,14 +54,20 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Assets { using (Profiler.TraceMethod()) { - var entities = snapshots.Select(Map).ToList(); - - if (entities.Count == 0) + var updates = snapshots.Select(Map).Select(x => + new ReplaceOneModel( + Filter.Eq(y => y.DocumentId, x.DocumentId), + x) + { + IsUpsert = true + }).ToList(); + + if (updates.Count == 0) { return; } - await Collection.InsertManyAsync(entities, InsertUnordered); + await Collection.BulkWriteAsync(updates); } } diff --git a/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentCollection.cs b/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentCollection.cs index c67f74346..d41b074c5 100644 --- a/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentCollection.cs +++ b/backend/src/Squidex.Domain.Apps.Entities.MongoDb/Contents/MongoContentCollection.cs @@ -216,7 +216,14 @@ namespace Squidex.Domain.Apps.Entities.MongoDb.Contents return Task.CompletedTask; } - return Collection.InsertManyAsync(entities, InsertUnordered); + var writes = entities.Select(x => new ReplaceOneModel( + Filter.Eq(y => y.DocumentId, x.DocumentId), + x) + { + IsUpsert = true + }).ToList(); + + return Collection.BulkWriteAsync(writes); } } } diff --git a/backend/src/Squidex.Infrastructure.MongoDb/States/MongoSnapshotStore.cs b/backend/src/Squidex.Infrastructure.MongoDb/States/MongoSnapshotStore.cs index 809af949c..43c8515ae 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/States/MongoSnapshotStore.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/States/MongoSnapshotStore.cs @@ -72,12 +72,17 @@ namespace Squidex.Infrastructure.States { using (Profiler.TraceMethod>()) { - var writes = snapshots.Select(x => new InsertOneModel>(new MongoState + var writes = snapshots.Select(x => new ReplaceOneModel>( + Filter.Eq(y => y.DocumentId, x.Key), + new MongoState + { + Doc = x.Value, + DocumentId = x.Key, + Version = x.Version + }) { - Doc = x.Value, - DocumentId = x.Key, - Version = x.Version - })).ToList(); + IsUpsert = true + }).ToList(); if (writes.Count == 0) { diff --git a/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs b/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs index b2fec2225..5ce8770b9 100644 --- a/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs +++ b/backend/src/Squidex.Infrastructure/Commands/Rebuilder.cs @@ -45,7 +45,9 @@ namespace Squidex.Infrastructure.Commands this.localCache = localCache; } - public virtual async Task RebuildAsync(string filter, int batchSize, CancellationToken ct = default) where T : DomainObject where TState : class, IDomainState, new() + public virtual async Task RebuildAsync(string filter, int batchSize, + CancellationToken ct = default) + where T : DomainObject where TState : class, IDomainState, new() { var store = serviceProvider.GetRequiredService>(); @@ -62,7 +64,9 @@ namespace Squidex.Infrastructure.Commands }, batchSize, ct); } - public virtual async Task InsertManyAsync(IEnumerable source, int batchSize, CancellationToken ct = default) where T : DomainObject where TState : class, IDomainState, new() + public virtual async Task InsertManyAsync(IEnumerable source, int batchSize, + CancellationToken ct = default) + where T : DomainObject where TState : class, IDomainState, new() { Guard.NotNull(source, nameof(source)); Guard.Between(batchSize, 1, 1000, nameof(batchSize)); @@ -78,7 +82,9 @@ namespace Squidex.Infrastructure.Commands }, batchSize, ct); } - private async Task InsertManyAsync(IStore store, Func, Task> source, int batchSize, CancellationToken ct = default) where T : DomainObject where TState : class, IDomainState, new() + private async Task InsertManyAsync(IStore store, Func, Task> source, int batchSize, + CancellationToken ct = default) + where T : DomainObject where TState : class, IDomainState, new() { var parallelism = Environment.ProcessorCount; diff --git a/backend/src/Squidex.Infrastructure/States/BatchContext.cs b/backend/src/Squidex.Infrastructure/States/BatchContext.cs index 3dd9acd84..85fdf7220 100644 --- a/backend/src/Squidex.Infrastructure/States/BatchContext.cs +++ b/backend/src/Squidex.Infrastructure/States/BatchContext.cs @@ -25,7 +25,7 @@ namespace Squidex.Infrastructure.States private readonly IEventDataFormatter eventDataFormatter; private readonly IStreamNameResolver streamNameResolver; private readonly Dictionary>)> @events = new Dictionary>)>(); - private List<(DomainId Key, T Snapshot, long Version)>? snapshots; + private Dictionary? snapshots; internal BatchContext( Type owner, @@ -43,8 +43,12 @@ namespace Squidex.Infrastructure.States internal void Add(DomainId key, T snapshot, long version) { - snapshots ??= new List<(DomainId Key, T Snapshot, long Version)>(); - snapshots.Add((key, snapshot, version)); + snapshots ??= new Dictionary(); + + if (!snapshots.TryGetValue(key, out var existing) || existing.Version < version) + { + snapshots[key] = (snapshot, version); + } } public async Task LoadAsync(IEnumerable ids) @@ -87,7 +91,9 @@ namespace Squidex.Infrastructure.States return Task.CompletedTask; } - return snapshotStore.WriteManyAsync(current); + var list = current.Select(x => (x.Key, x.Value.Snapshot, x.Value.Version)); + + return snapshotStore.WriteManyAsync(list); } public IPersistence WithEventSourcing(Type owner, DomainId key, HandleEvent? applyEvent) diff --git a/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceBatchTests.cs b/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceBatchTests.cs index 6d445a3cd..a7a53af4b 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceBatchTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/States/PersistenceBatchTests.cs @@ -126,6 +126,67 @@ namespace Squidex.Infrastructure.States .MustHaveHappenedOnceExactly(); } + [Fact] + public async Task Should_write_each_id_only_once_if_same_id_requested_twice() + { + var key1 = DomainId.NewGuid(); + var key2 = DomainId.NewGuid(); + + var bulk = sut.WithBatchContext(None.Type); + + await bulk.LoadAsync(new[] { key1, key2 }); + + var persistedEvents1_1 = Save.Events(); + var persistence1_1 = bulk.WithEventSourcing(None.Type, key1, persistedEvents1_1.Write); + + var persistedEvents1_2 = Save.Events(); + var persistence1_2 = bulk.WithEventSourcing(None.Type, key1, persistedEvents1_2.Write); + + await persistence1_1.WriteSnapshotAsync(12); + await persistence1_2.WriteSnapshotAsync(12); + + A.CallTo(() => snapshotStore.WriteAsync(A._, A._, A._, A._)) + .MustNotHaveHappened(); + + A.CallTo(() => snapshotStore.WriteManyAsync(A>._)) + .MustNotHaveHappened(); + + await bulk.CommitAsync(); + await bulk.DisposeAsync(); + + A.CallTo(() => snapshotStore.WriteManyAsync(A>.That.Matches(x => x.Count() == 1))) + .MustHaveHappenedOnceExactly(); + } + + [Fact] + public async Task Should_write_each_id_only_once_if_same_persistence_written_twice() + { + var key1 = DomainId.NewGuid(); + var key2 = DomainId.NewGuid(); + + var bulk = sut.WithBatchContext(None.Type); + + await bulk.LoadAsync(new[] { key1, key2 }); + + var persistedEvents1 = Save.Events(); + var persistence1 = bulk.WithEventSourcing(None.Type, key1, persistedEvents1.Write); + + await persistence1.WriteSnapshotAsync(12); + await persistence1.WriteSnapshotAsync(13); + + A.CallTo(() => snapshotStore.WriteAsync(A._, A._, A._, A._)) + .MustNotHaveHappened(); + + A.CallTo(() => snapshotStore.WriteManyAsync(A>._)) + .MustNotHaveHappened(); + + await bulk.CommitAsync(); + await bulk.DisposeAsync(); + + A.CallTo(() => snapshotStore.WriteManyAsync(A>.That.Matches(x => x.Count() == 1))) + .MustHaveHappenedOnceExactly(); + } + private void SetupEventStore(Dictionary> streams) { var storedStreams = new Dictionary>(); @@ -161,4 +222,4 @@ namespace Squidex.Infrastructure.States .Returns(storedStreams); } } -} \ No newline at end of file +}