diff --git a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs index 5902c29ad..86fcdc31d 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Writer.cs @@ -44,7 +44,6 @@ namespace Squidex.Infrastructure.EventSourcing Guard.NotEmpty(commitId, nameof(commitId)); Guard.NotNullOrEmpty(streamName, nameof(streamName)); Guard.NotNull(events, nameof(events)); - Guard.LessThan(events.Count, MaxCommitSize, "events.Count"); Guard.GreaterEquals(expectedVersion, EtagVersion.Any, nameof(expectedVersion)); using (Telemetry.Activities.StartActivity("ContentQueryService/AppendAsync")) diff --git a/backend/src/Squidex.Infrastructure/Commands/CommandExtensions.cs b/backend/src/Squidex.Infrastructure/Commands/CommandExtensions.cs index 6cf669749..5d650fa79 100644 --- a/backend/src/Squidex.Infrastructure/Commands/CommandExtensions.cs +++ b/backend/src/Squidex.Infrastructure/Commands/CommandExtensions.cs @@ -5,6 +5,8 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== +using Squidex.Infrastructure.EventSourcing; + namespace Squidex.Infrastructure.Commands { public static class CommandExtensions @@ -13,5 +15,17 @@ namespace Squidex.Infrastructure.Commands { return commandMiddleware.HandleAsync(context, x => Task.CompletedTask); } + + public static Envelope Migrate(this Envelope @event, T snapshot) + { + if (@event.Payload is IMigratedStateEvent migratable) + { + var payload = migratable.Migrate(snapshot); + + @event = new Envelope(payload, @event.Headers); + } + + return @event; + } } } diff --git a/backend/src/Squidex.Infrastructure/Commands/DomainObject.cs b/backend/src/Squidex.Infrastructure/Commands/DomainObject.cs index ea4b29680..56d455ae5 100644 --- a/backend/src/Squidex.Infrastructure/Commands/DomainObject.cs +++ b/backend/src/Squidex.Infrastructure/Commands/DomainObject.cs @@ -67,14 +67,18 @@ namespace Squidex.Infrastructure.Commands var allEvents = factory.WithEventSourcing(GetType(), UniqueId, @event => { - var newVersion = snapshot.Version + 1; + @event = @event.Migrate(snapshot); - if (snapshots.Contains(newVersion)) + var (newSnapshot, isChanged) = ApplyEvent(@event, true, snapshot, snapshot.Version, false); + + // Can only be null in case of errors or inconsistent streams. + if (newSnapshot != null) { - return false; + snapshot = newSnapshot; } - return ApplyEvent(@event, true, snapshot, snapshot.Version, false); + // If all snapshorts from this one here are valid we can stop. + return newSnapshot != null && !snapshots.ContainsThisAndNewer(newSnapshot.Version); }); await allEvents.ReadAsync(); @@ -97,14 +101,9 @@ namespace Squidex.Infrastructure.Commands }), @event => { - if (@event.Payload is IMigratedStateEvent migratable) - { - var payload = migratable.Migrate(Snapshot); - - @event = new Envelope(payload, @event.Headers); - } + @event = @event.Migrate(Snapshot); - return ApplyEvent(@event, true, Snapshot, Version, true); + return ApplyEvent(@event, true, Snapshot, Version, true).Success; }); } @@ -146,7 +145,7 @@ namespace Squidex.Infrastructure.Commands @event.SetAggregateId(uniqueId); - if (ApplyEvent(@event, false, Snapshot, Version, true)) + if (ApplyEvent(@event, false, Snapshot, Version, true).Success) { uncomittedEvents.Add(@event); } @@ -299,13 +298,13 @@ namespace Squidex.Infrastructure.Commands snapshots.ResetTo(previousSnapshot, previousVersion); } - private bool ApplyEvent(Envelope @event, bool isLoading, T snapshot, long version, bool clean) + private (T?, bool Success) ApplyEvent(Envelope @event, bool isLoading, T snapshot, long version, bool clean) { if (IsDeleted(snapshot)) { if (!CanRecreate(@event.Payload)) { - return false; + return default; } snapshot = new T @@ -324,14 +323,14 @@ namespace Squidex.Infrastructure.Commands var isChanged = !ReferenceEquals(snapshot, newSnapshot); - if (!ReferenceEquals(newSnapshot, snapshot)) + if (isChanged) { newSnapshot.Version = newVersion; snapshots.Add(newSnapshot, newVersion, clean); } - return isChanged; + return (newSnapshot, isChanged); } private async Task ReadAsync() diff --git a/backend/src/Squidex.Infrastructure/Commands/SnapshotList.cs b/backend/src/Squidex.Infrastructure/Commands/SnapshotList.cs index 7925baa70..090e7549e 100644 --- a/backend/src/Squidex.Infrastructure/Commands/SnapshotList.cs +++ b/backend/src/Squidex.Infrastructure/Commands/SnapshotList.cs @@ -69,11 +69,11 @@ namespace Squidex.Infrastructure.Commands return (null, false); } - public bool Contains(long version) + public bool ContainsThisAndNewer(long version) { var index = GetIndex(version); - return items.ElementAtOrDefault(index) != null; + return items.Skip(index).All(x => x != null); } public void Add(T snapshot, long version, bool clean = false) diff --git a/backend/src/Squidex/Areas/Api/Controllers/Contents/Models/BulkUpdateContentsJobDto.cs b/backend/src/Squidex/Areas/Api/Controllers/Contents/Models/BulkUpdateContentsJobDto.cs index cad4c462b..93ff2716b 100644 --- a/backend/src/Squidex/Areas/Api/Controllers/Contents/Models/BulkUpdateContentsJobDto.cs +++ b/backend/src/Squidex/Areas/Api/Controllers/Contents/Models/BulkUpdateContentsJobDto.cs @@ -67,7 +67,7 @@ namespace Squidex.Areas.Api.Controllers.Contents.Models public BulkUpdateJob ToJob() { - return SimpleMapper.Map(this, new BulkUpdateJob()); + return SimpleMapper.Map(this, new BulkUpdateJob { Query = Query }); } } } diff --git a/backend/tests/Squidex.Infrastructure.Tests/Commands/DomainObjectTests.cs b/backend/tests/Squidex.Infrastructure.Tests/Commands/DomainObjectTests.cs index d186016ff..e22dbef43 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/Commands/DomainObjectTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/Commands/DomainObjectTests.cs @@ -453,15 +453,18 @@ namespace Squidex.Infrastructure.Commands await sut.ExecuteAsync(new CreateAuto { Value = 3 }); await sut.ExecuteAsync(new UpdateAuto { Value = 4 }); + await sut.ExecuteAsync(new UpdateAuto { Value = 5 }); var version_Empty = await sut.GetSnapshotAsync(EtagVersion.Empty); var version_0 = await sut.GetSnapshotAsync(0); var version_1 = await sut.GetSnapshotAsync(1); + var version_2 = await sut.GetSnapshotAsync(2); Assert.Empty(sut.GetUncomittedEvents()); AssertSnapshot(version_Empty, 0, EtagVersion.Empty); AssertSnapshot(version_0, 3, 0); AssertSnapshot(version_1, 4, 1); + AssertSnapshot(version_2, 5, 2); A.CallTo(() => persistenceFactory.WithEventSourcing(typeof(MyDomainObject), id, A._)) .MustNotHaveHappened(); @@ -477,18 +480,21 @@ namespace Squidex.Infrastructure.Commands await sut.ExecuteAsync(new CreateAuto { Value = 3 }); await sut.ExecuteAsync(new UpdateAuto { Value = 4 }); + await sut.ExecuteAsync(new UpdateAuto { Value = 5 }); var version_Empty = await sut.GetSnapshotAsync(EtagVersion.Empty); var version_0 = await sut.GetSnapshotAsync(0); var version_1 = await sut.GetSnapshotAsync(1); + var version_2 = await sut.GetSnapshotAsync(2); Assert.Empty(sut.GetUncomittedEvents()); AssertSnapshot(version_Empty, 0, EtagVersion.Empty); AssertSnapshot(version_0, 3, 0); AssertSnapshot(version_1, 4, 1); + AssertSnapshot(version_2, 5, 2); A.CallTo(() => persistenceFactory.WithEventSourcing(typeof(MyDomainObject), id, A._)) - .MustHaveHappened(); + .MustHaveHappenedOnceExactly(); } private static void AssertSnapshot(MyDomainState state, int value, long version, bool isDeleted = false)