Browse Source

Fix/ignore old events (#796)

* App service binding.

* Ignore old events (after deletion) to migrate corrupt event streams.

* Delete backup file.
pull/797/head
Sebastian Stehle 4 years ago
committed by GitHub
parent
commit
9010af5565
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      backend/src/Squidex.Domain.Apps.Entities/Apps/DomainObject/AppDomainObject.cs
  2. 11
      backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetDomainObject.cs
  3. 2
      backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetFolderDomainObject.cs
  4. 11
      backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentDomainObject.cs
  5. 5
      backend/src/Squidex.Domain.Apps.Entities/DomainObjectState.cs
  6. 6
      backend/src/Squidex.Domain.Apps.Entities/Rules/DomainObject/RuleDomainObject.cs
  7. 4
      backend/src/Squidex.Domain.Apps.Entities/Schemas/DomainObject/SchemaDomainObject.cs
  8. 6
      backend/src/Squidex.Infrastructure/Commands/DomainObject.Execute.cs
  9. 60
      backend/src/Squidex.Infrastructure/Commands/DomainObject.cs
  10. 2
      backend/src/Squidex.Infrastructure/Commands/IDomainState.cs
  11. 47
      backend/src/Squidex.Infrastructure/Migrations/Migrator.cs
  12. 6
      backend/src/Squidex/Config/Orleans/OrleansServices.cs
  13. 38
      backend/tests/Squidex.Infrastructure.Tests/Commands/DomainObjectTests.cs
  14. 25
      backend/tests/Squidex.Infrastructure.Tests/Migrations/MigratorTests.cs
  15. 12
      backend/tests/Squidex.Infrastructure.Tests/TestHelpers/MyDomainObject.cs
  16. 5
      backend/tests/Squidex.Infrastructure.Tests/TestHelpers/MyDomainState.cs

4
backend/src/Squidex.Domain.Apps.Entities/Apps/DomainObject/AppDomainObject.cs

@ -44,9 +44,9 @@ namespace Squidex.Domain.Apps.Entities.Apps.DomainObject
this.initialSettings = initialPatterns; this.initialSettings = initialPatterns;
} }
protected override bool IsDeleted() protected override bool IsDeleted(State snapshot)
{ {
return Snapshot.IsDeleted; return snapshot.IsDeleted;
} }
protected override bool CanAcceptCreation(ICommand command) protected override bool CanAcceptCreation(ICommand command)

11
backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetDomainObject.cs

@ -33,9 +33,9 @@ namespace Squidex.Domain.Apps.Entities.Assets.DomainObject
Capacity = 2; Capacity = 2;
} }
protected override bool IsDeleted() protected override bool IsDeleted(State snapshot)
{ {
return Snapshot.IsDeleted; return snapshot.IsDeleted;
} }
protected override bool CanRecreate() protected override bool CanRecreate()
@ -43,6 +43,11 @@ namespace Squidex.Domain.Apps.Entities.Assets.DomainObject
return true; return true;
} }
protected override bool CanRecreate(IEvent @event)
{
return @event is AssetCreated;
}
protected override bool CanAcceptCreation(ICommand command) protected override bool CanAcceptCreation(ICommand command)
{ {
return command is AssetCommand; return command is AssetCommand;
@ -64,7 +69,7 @@ namespace Squidex.Domain.Apps.Entities.Assets.DomainObject
{ {
var operation = await AssetOperation.CreateAsync(serviceProvider, c, () => Snapshot); var operation = await AssetOperation.CreateAsync(serviceProvider, c, () => Snapshot);
if (Version > EtagVersion.Empty && !IsDeleted()) if (Version > EtagVersion.Empty && !IsDeleted(Snapshot))
{ {
await UpdateCore(c.AsUpdate(), operation); await UpdateCore(c.AsUpdate(), operation);
} }

2
backend/src/Squidex.Domain.Apps.Entities/Assets/DomainObject/AssetFolderDomainObject.cs

@ -30,7 +30,7 @@ namespace Squidex.Domain.Apps.Entities.Assets.DomainObject
this.serviceProvider = serviceProvider; this.serviceProvider = serviceProvider;
} }
protected override bool IsDeleted() protected override bool IsDeleted(State snapshot)
{ {
return Snapshot.IsDeleted; return Snapshot.IsDeleted;
} }

11
backend/src/Squidex.Domain.Apps.Entities/Contents/DomainObject/ContentDomainObject.cs

@ -38,9 +38,9 @@ namespace Squidex.Domain.Apps.Entities.Contents.DomainObject
Capacity = 5; Capacity = 5;
} }
protected override bool IsDeleted() protected override bool IsDeleted(State snapshot)
{ {
return Snapshot.IsDeleted; return snapshot.IsDeleted;
} }
protected override bool CanAcceptCreation(ICommand command) protected override bool CanAcceptCreation(ICommand command)
@ -53,6 +53,11 @@ namespace Squidex.Domain.Apps.Entities.Contents.DomainObject
return true; return true;
} }
protected override bool CanRecreate(IEvent @event)
{
return @event is ContentCreated;
}
protected override bool CanAccept(ICommand command) protected override bool CanAccept(ICommand command)
{ {
return command is ContentCommand contentCommand && return command is ContentCommand contentCommand &&
@ -70,7 +75,7 @@ namespace Squidex.Domain.Apps.Entities.Contents.DomainObject
{ {
var operation = await ContentOperation.CreateAsync(serviceProvider, c, () => Snapshot); var operation = await ContentOperation.CreateAsync(serviceProvider, c, () => Snapshot);
if (Version > EtagVersion.Empty && !IsDeleted()) if (Version > EtagVersion.Empty && !IsDeleted(Snapshot))
{ {
await UpdateCore(c.AsUpdate(), operation); await UpdateCore(c.AsUpdate(), operation);
} }

5
backend/src/Squidex.Domain.Apps.Entities/DomainObjectState.cs

@ -42,6 +42,11 @@ namespace Squidex.Domain.Apps.Entities
return false; return false;
} }
public T Copy()
{
return (T)MemberwiseClone();
}
public T Apply(Envelope<IEvent> @event) public T Apply(Envelope<IEvent> @event)
{ {
var payload = (SquidexEvent)@event.Payload; var payload = (SquidexEvent)@event.Payload;

6
backend/src/Squidex.Domain.Apps.Entities/Rules/DomainObject/RuleDomainObject.cs

@ -1,4 +1,4 @@
// ========================================================================== // ==========================================================================
// Squidex Headless CMS // Squidex Headless CMS
// ========================================================================== // ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt) // Copyright (c) Squidex UG (haftungsbeschraenkt)
@ -32,9 +32,9 @@ namespace Squidex.Domain.Apps.Entities.Rules.DomainObject
this.ruleEnqueuer = ruleEnqueuer; this.ruleEnqueuer = ruleEnqueuer;
} }
protected override bool IsDeleted() protected override bool IsDeleted(State snapshot)
{ {
return Snapshot.IsDeleted; return snapshot.IsDeleted;
} }
protected override bool CanAcceptCreation(ICommand command) protected override bool CanAcceptCreation(ICommand command)

4
backend/src/Squidex.Domain.Apps.Entities/Schemas/DomainObject/SchemaDomainObject.cs

@ -30,9 +30,9 @@ namespace Squidex.Domain.Apps.Entities.Schemas.DomainObject
{ {
} }
protected override bool IsDeleted() protected override bool IsDeleted(State snapshot)
{ {
return Snapshot.IsDeleted; return snapshot.IsDeleted;
} }
protected override bool CanAcceptCreation(ICommand command) protected override bool CanAcceptCreation(ICommand command)

6
backend/src/Squidex.Infrastructure/Commands/DomainObject.Execute.cs

@ -161,7 +161,7 @@ namespace Squidex.Infrastructure.Commands
{ {
Guard.NotNull(command, nameof(command)); Guard.NotNull(command, nameof(command));
if (Version != EtagVersion.Empty && !(IsDeleted() && CanRecreate())) if (Version != EtagVersion.Empty && !(IsDeleted(Snapshot) && CanRecreate()))
{ {
throw new DomainObjectConflictException(uniqueId.ToString()); throw new DomainObjectConflictException(uniqueId.ToString());
} }
@ -189,7 +189,7 @@ namespace Squidex.Infrastructure.Commands
await EnsureLoadedAsync(); await EnsureLoadedAsync();
if (IsDeleted() && !CanRecreate()) if (IsDeleted(Snapshot) && !CanRecreate())
{ {
throw new DomainObjectDeletedException(uniqueId.ToString()); throw new DomainObjectDeletedException(uniqueId.ToString());
} }
@ -220,7 +220,7 @@ namespace Squidex.Infrastructure.Commands
private void NotDeleted() private void NotDeleted()
{ {
if (IsDeleted()) if (IsDeleted(Snapshot))
{ {
throw new DomainObjectDeletedException(uniqueId.ToString()); throw new DomainObjectDeletedException(uniqueId.ToString());
} }

60
backend/src/Squidex.Infrastructure/Commands/DomainObject.cs

@ -72,17 +72,12 @@ namespace Squidex.Infrastructure.Commands
{ {
var newVersion = snapshot.Version + 1; var newVersion = snapshot.Version + 1;
if (!snapshots.Contains(newVersion)) if (snapshots.Contains(newVersion))
{ {
snapshot = Apply(snapshot, @event); return false;
snapshot.Version = newVersion;
snapshots.Add(snapshot, newVersion, false);
return true;
} }
return false; return ApplyEvent(@event, true, snapshot, snapshot.Version, false);
}); });
await allEvents.ReadAsync(); await allEvents.ReadAsync();
@ -112,7 +107,7 @@ namespace Squidex.Infrastructure.Commands
@event = new Envelope<IEvent>(payload, @event.Headers); @event = new Envelope<IEvent>(payload, @event.Headers);
} }
return ApplyEvent(@event, true); return ApplyEvent(@event, true, Snapshot, Version, true);
}); });
} }
@ -154,7 +149,7 @@ namespace Squidex.Infrastructure.Commands
@event.SetAggregateId(uniqueId); @event.SetAggregateId(uniqueId);
if (ApplyEvent(@event, false)) if (ApplyEvent(@event, false, Snapshot, Version, true))
{ {
uncomittedEvents.Add(@event); uncomittedEvents.Add(@event);
} }
@ -217,7 +212,7 @@ namespace Squidex.Infrastructure.Commands
{ {
Guard.NotNull(handler, nameof(handler)); Guard.NotNull(handler, nameof(handler));
var wasDeleted = IsDeleted(); var wasDeleted = IsDeleted(Snapshot);
var previousSnapshot = Snapshot; var previousSnapshot = Snapshot;
var previousVersion = Version; var previousVersion = Version;
@ -243,7 +238,7 @@ namespace Squidex.Infrastructure.Commands
foreach (var @event in uncomittedEvents) foreach (var @event in uncomittedEvents)
{ {
ApplyEvent(@event, false); ApplyEvent(@event, false, Snapshot, Version, true);
} }
await WriteAsync(events); await WriteAsync(events);
@ -287,7 +282,7 @@ namespace Squidex.Infrastructure.Commands
return true; return true;
} }
protected virtual bool IsDeleted() protected virtual bool IsDeleted(T snapshot)
{ {
return false; return false;
} }
@ -297,36 +292,49 @@ namespace Squidex.Infrastructure.Commands
return false; return false;
} }
protected virtual bool CanRecreate(IEvent @event)
{
return false;
}
private void RestorePreviousSnapshot(T previousSnapshot, long previousVersion) private void RestorePreviousSnapshot(T previousSnapshot, long previousVersion)
{ {
snapshots.ResetTo(previousSnapshot, previousVersion); snapshots.ResetTo(previousSnapshot, previousVersion);
} }
private bool ApplyEvent(Envelope<IEvent> @event, bool isLoading) private bool ApplyEvent(Envelope<IEvent> @event, bool isLoading, T snapshot, long version, bool clean)
{ {
var newVersion = Version + 1; if (IsDeleted(snapshot))
var snapshotOld = Snapshot;
if (IsDeleted())
{ {
snapshotOld = new T if (!CanRecreate(@event.Payload))
{
return false;
}
snapshot = new T
{ {
Version = Version Version = Version
}; };
} }
var snapshotNew = Apply(snapshotOld, @event); var newVersion = version + 1;
var newSnapshot = Apply(snapshot, @event);
if (!ReferenceEquals(snapshotOld, snapshotNew) || isLoading) if (ReferenceEquals(snapshot, newSnapshot) && isLoading)
{ {
snapshotNew.Version = newVersion; newSnapshot = snapshot.Copy();
snapshots.Add(snapshotNew, newVersion, true); }
return true; var isChanged = !ReferenceEquals(snapshot, newSnapshot);
if (!ReferenceEquals(newSnapshot, snapshot))
{
newSnapshot.Version = newVersion;
snapshots.Add(newSnapshot, newVersion, clean);
} }
return false; return isChanged;
} }
private async Task ReadAsync() private async Task ReadAsync()

2
backend/src/Squidex.Infrastructure/Commands/IDomainState.cs

@ -13,6 +13,8 @@ namespace Squidex.Infrastructure.Commands
{ {
long Version { get; set; } long Version { get; set; }
T Copy();
T Apply(Envelope<IEvent> @event); T Apply(Envelope<IEvent> @event);
} }
} }

47
backend/src/Squidex.Infrastructure/Migrations/Migrator.cs

@ -32,17 +32,13 @@ namespace Squidex.Infrastructure.Migrations
public async Task MigrateAsync( public async Task MigrateAsync(
CancellationToken ct = default) CancellationToken ct = default)
{ {
try if (!await TryLockAsync(ct))
{ {
while (!await migrationStatus.TryLockAsync(ct)) return;
{ }
log.LogInformation(w => w
.WriteProperty("action", "Migrate")
.WriteProperty("mesage", $"Waiting {LockWaitMs}ms to acquire lock."));
await Task.Delay(LockWaitMs, ct);
}
try
{
var version = await migrationStatus.GetVersionAsync(ct); var version = await migrationStatus.GetVersionAsync(ct);
while (!ct.IsCancellationRequested) while (!ct.IsCancellationRequested)
@ -91,12 +87,35 @@ namespace Squidex.Infrastructure.Migrations
} }
finally finally
{ {
#pragma warning disable CA2016 // Forward the 'CancellationToken' parameter to methods that take one await UnlockAsync();
#pragma warning disable MA0040 // Flow the cancellation token }
await migrationStatus.UnlockAsync(); }
#pragma warning restore MA0040 // Flow the cancellation token
#pragma warning restore CA2016 // Forward the 'CancellationToken' parameter to methods that take one private async Task<bool> TryLockAsync(
CancellationToken ct)
{
try
{
while (!await migrationStatus.TryLockAsync(ct))
{
log.LogInformation(w => w
.WriteProperty("action", "Migrate")
.WriteProperty("mesage", $"Waiting {LockWaitMs}ms to acquire lock."));
await Task.Delay(LockWaitMs, ct);
}
}
catch (OperationCanceledException)
{
return false;
} }
return true;
}
private Task UnlockAsync()
{
return migrationStatus.UnlockAsync();
} }
} }
} }

6
backend/src/Squidex/Config/Orleans/OrleansServices.cs

@ -5,6 +5,7 @@
// All rights reserved. Licensed under the MIT license. // All rights reserved. Licensed under the MIT license.
// ========================================================================== // ==========================================================================
using System;
using System.Globalization; using System.Globalization;
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
@ -50,6 +51,11 @@ namespace Squidex.Config.Orleans
parts.AddApplicationPart(SquidexInfrastructure.Assembly); parts.AddApplicationPart(SquidexInfrastructure.Assembly);
}); });
builder.Configure<SchedulingOptions>(options =>
{
options.TurnWarningLengthThreshold = TimeSpan.FromSeconds(5);
});
builder.Configure<ClusterOptions>(options => builder.Configure<ClusterOptions>(options =>
{ {
options.Configure(); options.Configure();

38
backend/tests/Squidex.Infrastructure.Tests/Commands/DomainObjectTests.cs

@ -97,10 +97,47 @@ namespace Squidex.Infrastructure.Commands
Assert.Equal(20, sut.Snapshot.Value); Assert.Equal(20, sut.Snapshot.Value);
} }
[Fact]
public async Task Should_recreate_when_loading()
{
sut.RecreateEvent = true;
SetupCreated(
new ValueChanged { Value = 2 },
new ValueChanged { Value = 3 },
new Deleted(),
new ValueChanged { Value = 4 });
await sut.EnsureLoadedAsync();
Assert.Equal(3, sut.Version);
Assert.Equal(3, sut.Snapshot.Version);
AssertSnapshot(sut.Snapshot, 4, 3);
}
[Fact]
public async Task Should_ignore_events_after_deleting_when_loading()
{
SetupCreated(
new ValueChanged { Value = 2 },
new ValueChanged { Value = 3 },
new Deleted(),
new ValueChanged { Value = 4 });
await sut.EnsureLoadedAsync();
Assert.Equal(2, sut.Version);
Assert.Equal(2, sut.Snapshot.Version);
AssertSnapshot(sut.Snapshot, 3, 2, true);
}
[Fact] [Fact]
public async Task Should_recreate_with_create_command_if_deleted_before() public async Task Should_recreate_with_create_command_if_deleted_before()
{ {
sut.Recreate = true; sut.Recreate = true;
sut.RecreateEvent = true;
SetupCreated(2); SetupCreated(2);
SetupDeleted(); SetupDeleted();
@ -141,6 +178,7 @@ namespace Squidex.Infrastructure.Commands
public async Task Should_recreate_with_upsert_command_if_deleted_before() public async Task Should_recreate_with_upsert_command_if_deleted_before()
{ {
sut.Recreate = true; sut.Recreate = true;
sut.RecreateEvent = true;
SetupCreated(2); SetupCreated(2);
SetupDeleted(); SetupDeleted();

25
backend/tests/Squidex.Infrastructure.Tests/Migrations/MigratorTests.cs

@ -245,6 +245,31 @@ namespace Squidex.Infrastructure.Migrations
.MustHaveHappenedOnceExactly(); .MustHaveHappenedOnceExactly();
} }
[Fact]
public async Task Should_not_release_lock_if_not_acquired()
{
using (var tcs = new CancellationTokenSource())
{
var sut = new Migrator(status, path, log) { LockWaitMs = 2 };
A.CallTo(() => status.TryLockAsync(tcs.Token))
.Returns(false);
var task = sut.MigrateAsync(tcs.Token);
#pragma warning disable MA0040 // Flow the cancellation token
await Task.Delay(100);
#pragma warning restore MA0040 // Flow the cancellation token
tcs.Cancel();
await task;
A.CallTo(() => status.UnlockAsync(A<CancellationToken>._))
.MustNotHaveHappened();
}
}
private IMigration BuildMigration(int fromVersion, int toVersion) private IMigration BuildMigration(int fromVersion, int toVersion)
{ {
var migration = A.Fake<IMigration>(); var migration = A.Fake<IMigration>();

12
backend/tests/Squidex.Infrastructure.Tests/TestHelpers/MyDomainObject.cs

@ -9,6 +9,7 @@ using System;
using System.Threading.Tasks; using System.Threading.Tasks;
using FakeItEasy; using FakeItEasy;
using Squidex.Infrastructure.Commands; using Squidex.Infrastructure.Commands;
using Squidex.Infrastructure.EventSourcing;
using Squidex.Infrastructure.States; using Squidex.Infrastructure.States;
using Squidex.Log; using Squidex.Log;
@ -18,6 +19,8 @@ namespace Squidex.Infrastructure.TestHelpers
{ {
public bool Recreate { get; set; } public bool Recreate { get; set; }
public bool RecreateEvent { get; set; }
public int VersionsToKeep public int VersionsToKeep
{ {
get => Capacity; get => Capacity;
@ -29,6 +32,11 @@ namespace Squidex.Infrastructure.TestHelpers
{ {
} }
protected override bool CanRecreate(IEvent @event)
{
return RecreateEvent ? @event is ValueChanged : false;
}
protected override bool CanRecreate() protected override bool CanRecreate()
{ {
return Recreate; return Recreate;
@ -54,9 +62,9 @@ namespace Squidex.Infrastructure.TestHelpers
return true; return true;
} }
protected override bool IsDeleted() protected override bool IsDeleted(MyDomainState snapshot)
{ {
return Snapshot.IsDeleted; return snapshot.IsDeleted;
} }
public override Task<CommandResult> ExecuteAsync(IAggregateCommand command) public override Task<CommandResult> ExecuteAsync(IAggregateCommand command)

5
backend/tests/Squidex.Infrastructure.Tests/TestHelpers/MyDomainState.cs

@ -20,6 +20,11 @@ namespace Squidex.Infrastructure.TestHelpers
public long Value { get; set; } public long Value { get; set; }
public MyDomainState Copy()
{
return (MyDomainState)MemberwiseClone();
}
public MyDomainState Apply(Envelope<IEvent> @event) public MyDomainState Apply(Envelope<IEvent> @event)
{ {
switch (@event.Payload) switch (@event.Payload)

Loading…
Cancel
Save