From 9010af55652b3afa77db142e04209faf99456507 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Sun, 21 Nov 2021 16:12:15 +0100 Subject: [PATCH] Fix/ignore old events (#796) * App service binding. * Ignore old events (after deletion) to migrate corrupt event streams. * Delete backup file. --- .../Apps/DomainObject/AppDomainObject.cs | 4 +- .../Assets/DomainObject/AssetDomainObject.cs | 11 +++- .../DomainObject/AssetFolderDomainObject.cs | 2 +- .../DomainObject/ContentDomainObject.cs | 11 +++- .../DomainObjectState.cs | 5 ++ .../Rules/DomainObject/RuleDomainObject.cs | 6 +- .../DomainObject/SchemaDomainObject.cs | 4 +- .../Commands/DomainObject.Execute.cs | 6 +- .../Commands/DomainObject.cs | 60 +++++++++++-------- .../Commands/IDomainState.cs | 2 + .../Migrations/Migrator.cs | 47 ++++++++++----- .../Squidex/Config/Orleans/OrleansServices.cs | 6 ++ .../Commands/DomainObjectTests.cs | 38 ++++++++++++ .../Migrations/MigratorTests.cs | 25 ++++++++ .../TestHelpers/MyDomainObject.cs | 12 +++- .../TestHelpers/MyDomainState.cs | 5 ++ 16 files changed, 185 insertions(+), 59 deletions(-) diff --git a/backend/src/Squidex.Domain.Apps.Entities/Apps/DomainObject/AppDomainObject.cs b/backend/src/Squidex.Domain.Apps.Entities/Apps/DomainObject/AppDomainObject.cs index 39b9a5da6..a8336fef1 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Apps/DomainObject/AppDomainObject.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Apps/DomainObject/AppDomainObject.cs @@ -44,9 +44,9 @@ namespace Squidex.Domain.Apps.Entities.Apps.DomainObject this.initialSettings = initialPatterns; } - protected override bool IsDeleted() + protected override bool IsDeleted(State snapshot) { - return Snapshot.IsDeleted; + return snapshot.IsDeleted; } protected override bool CanAcceptCreation(ICommand command) diff --git a/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetDomainObject.cs b/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetDomainObject.cs index 5462541af..cee7ed964 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetDomainObject.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetDomainObject.cs @@ -33,9 +33,9 @@ namespace Squidex.Domain.Apps.Entities.Assets.DomainObject Capacity = 2; } - protected override bool IsDeleted() + protected override bool IsDeleted(State snapshot) { - return Snapshot.IsDeleted; + return snapshot.IsDeleted; } protected override bool CanRecreate() @@ -43,6 +43,11 @@ namespace Squidex.Domain.Apps.Entities.Assets.DomainObject return true; } + protected override bool CanRecreate(IEvent @event) + { + return @event is AssetCreated; + } + protected override bool CanAcceptCreation(ICommand command) { return command is AssetCommand; @@ -64,7 +69,7 @@ namespace Squidex.Domain.Apps.Entities.Assets.DomainObject { var operation = await AssetOperation.CreateAsync(serviceProvider, c, () => Snapshot); - if (Version > EtagVersion.Empty && !IsDeleted()) + if (Version > EtagVersion.Empty && !IsDeleted(Snapshot)) { await UpdateCore(c.AsUpdate(), operation); } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetFolderDomainObject.cs b/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetFolderDomainObject.cs index 61b237cb7..1e45dcae2 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetFolderDomainObject.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetFolderDomainObject.cs @@ -30,7 +30,7 @@ namespace Squidex.Domain.Apps.Entities.Assets.DomainObject this.serviceProvider = serviceProvider; } - protected override bool IsDeleted() + protected override bool IsDeleted(State snapshot) { return Snapshot.IsDeleted; } diff --git a/backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentDomainObject.cs b/backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentDomainObject.cs index 064078c12..47d5d35b2 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentDomainObject.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentDomainObject.cs @@ -38,9 +38,9 @@ namespace Squidex.Domain.Apps.Entities.Contents.DomainObject Capacity = 5; } - protected override bool IsDeleted() + protected override bool IsDeleted(State snapshot) { - return Snapshot.IsDeleted; + return snapshot.IsDeleted; } protected override bool CanAcceptCreation(ICommand command) @@ -53,6 +53,11 @@ namespace Squidex.Domain.Apps.Entities.Contents.DomainObject return true; } + protected override bool CanRecreate(IEvent @event) + { + return @event is ContentCreated; + } + protected override bool CanAccept(ICommand command) { return command is ContentCommand contentCommand && @@ -70,7 +75,7 @@ namespace Squidex.Domain.Apps.Entities.Contents.DomainObject { var operation = await ContentOperation.CreateAsync(serviceProvider, c, () => Snapshot); - if (Version > EtagVersion.Empty && !IsDeleted()) + if (Version > EtagVersion.Empty && !IsDeleted(Snapshot)) { await UpdateCore(c.AsUpdate(), operation); } diff --git a/backend/src/Squidex.Domain.Apps.Entities/DomainObjectState.cs b/backend/src/Squidex.Domain.Apps.Entities/DomainObjectState.cs index 21bd41d84..5dcae143f 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/DomainObjectState.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/DomainObjectState.cs @@ -42,6 +42,11 @@ namespace Squidex.Domain.Apps.Entities return false; } + public T Copy() + { + return (T)MemberwiseClone(); + } + public T Apply(Envelope @event) { var payload = (SquidexEvent)@event.Payload; diff --git a/backend/src/Squidex.Domain.Apps.Entities/Rules/DomainObject/RuleDomainObject.cs b/backend/src/Squidex.Domain.Apps.Entities/Rules/DomainObject/RuleDomainObject.cs index bf5555e87..c669036e1 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Rules/DomainObject/RuleDomainObject.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Rules/DomainObject/RuleDomainObject.cs @@ -1,4 +1,4 @@ -// ========================================================================== +// ========================================================================== // Squidex Headless CMS // ========================================================================== // Copyright (c) Squidex UG (haftungsbeschraenkt) @@ -32,9 +32,9 @@ namespace Squidex.Domain.Apps.Entities.Rules.DomainObject this.ruleEnqueuer = ruleEnqueuer; } - protected override bool IsDeleted() + protected override bool IsDeleted(State snapshot) { - return Snapshot.IsDeleted; + return snapshot.IsDeleted; } protected override bool CanAcceptCreation(ICommand command) diff --git a/backend/src/Squidex.Domain.Apps.Entities/Schemas/DomainObject/SchemaDomainObject.cs b/backend/src/Squidex.Domain.Apps.Entities/Schemas/DomainObject/SchemaDomainObject.cs index 32c2a4f8d..8deed0b20 100644 --- a/backend/src/Squidex.Domain.Apps.Entities/Schemas/DomainObject/SchemaDomainObject.cs +++ b/backend/src/Squidex.Domain.Apps.Entities/Schemas/DomainObject/SchemaDomainObject.cs @@ -30,9 +30,9 @@ namespace Squidex.Domain.Apps.Entities.Schemas.DomainObject { } - protected override bool IsDeleted() + protected override bool IsDeleted(State snapshot) { - return Snapshot.IsDeleted; + return snapshot.IsDeleted; } protected override bool CanAcceptCreation(ICommand command) diff --git a/backend/src/Squidex.Infrastructure/Commands/DomainObject.Execute.cs b/backend/src/Squidex.Infrastructure/Commands/DomainObject.Execute.cs index c8cb2e1f7..ccc320381 100644 --- a/backend/src/Squidex.Infrastructure/Commands/DomainObject.Execute.cs +++ b/backend/src/Squidex.Infrastructure/Commands/DomainObject.Execute.cs @@ -161,7 +161,7 @@ namespace Squidex.Infrastructure.Commands { Guard.NotNull(command, nameof(command)); - if (Version != EtagVersion.Empty && !(IsDeleted() && CanRecreate())) + if (Version != EtagVersion.Empty && !(IsDeleted(Snapshot) && CanRecreate())) { throw new DomainObjectConflictException(uniqueId.ToString()); } @@ -189,7 +189,7 @@ namespace Squidex.Infrastructure.Commands await EnsureLoadedAsync(); - if (IsDeleted() && !CanRecreate()) + if (IsDeleted(Snapshot) && !CanRecreate()) { throw new DomainObjectDeletedException(uniqueId.ToString()); } @@ -220,7 +220,7 @@ namespace Squidex.Infrastructure.Commands private void NotDeleted() { - if (IsDeleted()) + if (IsDeleted(Snapshot)) { throw new DomainObjectDeletedException(uniqueId.ToString()); } diff --git a/backend/src/Squidex.Infrastructure/Commands/DomainObject.cs b/backend/src/Squidex.Infrastructure/Commands/DomainObject.cs index 13161c4ad..727e32ec5 100644 --- a/backend/src/Squidex.Infrastructure/Commands/DomainObject.cs +++ b/backend/src/Squidex.Infrastructure/Commands/DomainObject.cs @@ -72,17 +72,12 @@ namespace Squidex.Infrastructure.Commands { var newVersion = snapshot.Version + 1; - if (!snapshots.Contains(newVersion)) + if (snapshots.Contains(newVersion)) { - snapshot = Apply(snapshot, @event); - snapshot.Version = newVersion; - - snapshots.Add(snapshot, newVersion, false); - - return true; + return false; } - return false; + return ApplyEvent(@event, true, snapshot, snapshot.Version, false); }); await allEvents.ReadAsync(); @@ -112,7 +107,7 @@ namespace Squidex.Infrastructure.Commands @event = new Envelope(payload, @event.Headers); } - return ApplyEvent(@event, true); + return ApplyEvent(@event, true, Snapshot, Version, true); }); } @@ -154,7 +149,7 @@ namespace Squidex.Infrastructure.Commands @event.SetAggregateId(uniqueId); - if (ApplyEvent(@event, false)) + if (ApplyEvent(@event, false, Snapshot, Version, true)) { uncomittedEvents.Add(@event); } @@ -217,7 +212,7 @@ namespace Squidex.Infrastructure.Commands { Guard.NotNull(handler, nameof(handler)); - var wasDeleted = IsDeleted(); + var wasDeleted = IsDeleted(Snapshot); var previousSnapshot = Snapshot; var previousVersion = Version; @@ -243,7 +238,7 @@ namespace Squidex.Infrastructure.Commands foreach (var @event in uncomittedEvents) { - ApplyEvent(@event, false); + ApplyEvent(@event, false, Snapshot, Version, true); } await WriteAsync(events); @@ -287,7 +282,7 @@ namespace Squidex.Infrastructure.Commands return true; } - protected virtual bool IsDeleted() + protected virtual bool IsDeleted(T snapshot) { return false; } @@ -297,36 +292,49 @@ namespace Squidex.Infrastructure.Commands return false; } + protected virtual bool CanRecreate(IEvent @event) + { + return false; + } + private void RestorePreviousSnapshot(T previousSnapshot, long previousVersion) { snapshots.ResetTo(previousSnapshot, previousVersion); } - private bool ApplyEvent(Envelope @event, bool isLoading) + private bool ApplyEvent(Envelope @event, bool isLoading, T snapshot, long version, bool clean) { - var newVersion = Version + 1; - - var snapshotOld = Snapshot; - - if (IsDeleted()) + if (IsDeleted(snapshot)) { - snapshotOld = new T + if (!CanRecreate(@event.Payload)) + { + return false; + } + + snapshot = new T { Version = Version }; } - var snapshotNew = Apply(snapshotOld, @event); + var newVersion = version + 1; + var newSnapshot = Apply(snapshot, @event); - if (!ReferenceEquals(snapshotOld, snapshotNew) || isLoading) + if (ReferenceEquals(snapshot, newSnapshot) && isLoading) { - snapshotNew.Version = newVersion; - snapshots.Add(snapshotNew, newVersion, true); + newSnapshot = snapshot.Copy(); + } - return true; + var isChanged = !ReferenceEquals(snapshot, newSnapshot); + + if (!ReferenceEquals(newSnapshot, snapshot)) + { + newSnapshot.Version = newVersion; + + snapshots.Add(newSnapshot, newVersion, clean); } - return false; + return isChanged; } private async Task ReadAsync() diff --git a/backend/src/Squidex.Infrastructure/Commands/IDomainState.cs b/backend/src/Squidex.Infrastructure/Commands/IDomainState.cs index 50d43920e..a99bc11ef 100644 --- a/backend/src/Squidex.Infrastructure/Commands/IDomainState.cs +++ b/backend/src/Squidex.Infrastructure/Commands/IDomainState.cs @@ -13,6 +13,8 @@ namespace Squidex.Infrastructure.Commands { long Version { get; set; } + T Copy(); + T Apply(Envelope @event); } } diff --git a/backend/src/Squidex.Infrastructure/Migrations/Migrator.cs b/backend/src/Squidex.Infrastructure/Migrations/Migrator.cs index 3c2e80f67..77ee147c0 100644 --- a/backend/src/Squidex.Infrastructure/Migrations/Migrator.cs +++ b/backend/src/Squidex.Infrastructure/Migrations/Migrator.cs @@ -32,17 +32,13 @@ namespace Squidex.Infrastructure.Migrations public async Task MigrateAsync( CancellationToken ct = default) { - try + if (!await TryLockAsync(ct)) { - while (!await migrationStatus.TryLockAsync(ct)) - { - log.LogInformation(w => w - .WriteProperty("action", "Migrate") - .WriteProperty("mesage", $"Waiting {LockWaitMs}ms to acquire lock.")); - - await Task.Delay(LockWaitMs, ct); - } + return; + } + try + { var version = await migrationStatus.GetVersionAsync(ct); while (!ct.IsCancellationRequested) @@ -91,12 +87,35 @@ namespace Squidex.Infrastructure.Migrations } finally { -#pragma warning disable CA2016 // Forward the 'CancellationToken' parameter to methods that take one -#pragma warning disable MA0040 // Flow the cancellation token - await migrationStatus.UnlockAsync(); -#pragma warning restore MA0040 // Flow the cancellation token -#pragma warning restore CA2016 // Forward the 'CancellationToken' parameter to methods that take one + await UnlockAsync(); + } + } + + private async Task TryLockAsync( + CancellationToken ct) + { + try + { + while (!await migrationStatus.TryLockAsync(ct)) + { + log.LogInformation(w => w + .WriteProperty("action", "Migrate") + .WriteProperty("mesage", $"Waiting {LockWaitMs}ms to acquire lock.")); + + await Task.Delay(LockWaitMs, ct); + } + } + catch (OperationCanceledException) + { + return false; } + + return true; + } + + private Task UnlockAsync() + { + return migrationStatus.UnlockAsync(); } } } diff --git a/backend/src/Squidex/Config/Orleans/OrleansServices.cs b/backend/src/Squidex/Config/Orleans/OrleansServices.cs index 246ad853c..52485fe97 100644 --- a/backend/src/Squidex/Config/Orleans/OrleansServices.cs +++ b/backend/src/Squidex/Config/Orleans/OrleansServices.cs @@ -5,6 +5,7 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== +using System; using System.Globalization; using System.Net; using System.Net.Sockets; @@ -50,6 +51,11 @@ namespace Squidex.Config.Orleans parts.AddApplicationPart(SquidexInfrastructure.Assembly); }); + builder.Configure(options => + { + options.TurnWarningLengthThreshold = TimeSpan.FromSeconds(5); + }); + builder.Configure(options => { options.Configure(); diff --git a/backend/tests/Squidex.Infrastructure.Tests/Commands/DomainObjectTests.cs b/backend/tests/Squidex.Infrastructure.Tests/Commands/DomainObjectTests.cs index 6ffe8be2c..5c2ee3911 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/Commands/DomainObjectTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/Commands/DomainObjectTests.cs @@ -97,10 +97,47 @@ namespace Squidex.Infrastructure.Commands Assert.Equal(20, sut.Snapshot.Value); } + [Fact] + public async Task Should_recreate_when_loading() + { + sut.RecreateEvent = true; + + SetupCreated( + new ValueChanged { Value = 2 }, + new ValueChanged { Value = 3 }, + new Deleted(), + new ValueChanged { Value = 4 }); + + await sut.EnsureLoadedAsync(); + + Assert.Equal(3, sut.Version); + Assert.Equal(3, sut.Snapshot.Version); + + AssertSnapshot(sut.Snapshot, 4, 3); + } + + [Fact] + public async Task Should_ignore_events_after_deleting_when_loading() + { + SetupCreated( + new ValueChanged { Value = 2 }, + new ValueChanged { Value = 3 }, + new Deleted(), + new ValueChanged { Value = 4 }); + + await sut.EnsureLoadedAsync(); + + Assert.Equal(2, sut.Version); + Assert.Equal(2, sut.Snapshot.Version); + + AssertSnapshot(sut.Snapshot, 3, 2, true); + } + [Fact] public async Task Should_recreate_with_create_command_if_deleted_before() { sut.Recreate = true; + sut.RecreateEvent = true; SetupCreated(2); SetupDeleted(); @@ -141,6 +178,7 @@ namespace Squidex.Infrastructure.Commands public async Task Should_recreate_with_upsert_command_if_deleted_before() { sut.Recreate = true; + sut.RecreateEvent = true; SetupCreated(2); SetupDeleted(); diff --git a/backend/tests/Squidex.Infrastructure.Tests/Migrations/MigratorTests.cs b/backend/tests/Squidex.Infrastructure.Tests/Migrations/MigratorTests.cs index ac29e873c..7d0982cd5 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/Migrations/MigratorTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/Migrations/MigratorTests.cs @@ -245,6 +245,31 @@ namespace Squidex.Infrastructure.Migrations .MustHaveHappenedOnceExactly(); } + [Fact] + public async Task Should_not_release_lock_if_not_acquired() + { + using (var tcs = new CancellationTokenSource()) + { + var sut = new Migrator(status, path, log) { LockWaitMs = 2 }; + + A.CallTo(() => status.TryLockAsync(tcs.Token)) + .Returns(false); + + var task = sut.MigrateAsync(tcs.Token); + +#pragma warning disable MA0040 // Flow the cancellation token + await Task.Delay(100); +#pragma warning restore MA0040 // Flow the cancellation token + + tcs.Cancel(); + + await task; + + A.CallTo(() => status.UnlockAsync(A._)) + .MustNotHaveHappened(); + } + } + private IMigration BuildMigration(int fromVersion, int toVersion) { var migration = A.Fake(); diff --git a/backend/tests/Squidex.Infrastructure.Tests/TestHelpers/MyDomainObject.cs b/backend/tests/Squidex.Infrastructure.Tests/TestHelpers/MyDomainObject.cs index e30bf64b8..61051501b 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/TestHelpers/MyDomainObject.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/TestHelpers/MyDomainObject.cs @@ -9,6 +9,7 @@ using System; using System.Threading.Tasks; using FakeItEasy; using Squidex.Infrastructure.Commands; +using Squidex.Infrastructure.EventSourcing; using Squidex.Infrastructure.States; using Squidex.Log; @@ -18,6 +19,8 @@ namespace Squidex.Infrastructure.TestHelpers { public bool Recreate { get; set; } + public bool RecreateEvent { get; set; } + public int VersionsToKeep { get => Capacity; @@ -29,6 +32,11 @@ namespace Squidex.Infrastructure.TestHelpers { } + protected override bool CanRecreate(IEvent @event) + { + return RecreateEvent ? @event is ValueChanged : false; + } + protected override bool CanRecreate() { return Recreate; @@ -54,9 +62,9 @@ namespace Squidex.Infrastructure.TestHelpers return true; } - protected override bool IsDeleted() + protected override bool IsDeleted(MyDomainState snapshot) { - return Snapshot.IsDeleted; + return snapshot.IsDeleted; } public override Task ExecuteAsync(IAggregateCommand command) diff --git a/backend/tests/Squidex.Infrastructure.Tests/TestHelpers/MyDomainState.cs b/backend/tests/Squidex.Infrastructure.Tests/TestHelpers/MyDomainState.cs index b47bc9d21..4f870ef72 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/TestHelpers/MyDomainState.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/TestHelpers/MyDomainState.cs @@ -20,6 +20,11 @@ namespace Squidex.Infrastructure.TestHelpers public long Value { get; set; } + public MyDomainState Copy() + { + return (MyDomainState)MemberwiseClone(); + } + public MyDomainState Apply(Envelope @event) { switch (@event.Payload)