|
|
|
@ -67,14 +67,18 @@ namespace Squidex.Infrastructure.Commands |
|
|
|
|
|
|
|
var allEvents = factory.WithEventSourcing(GetType(), UniqueId, @event => |
|
|
|
{ |
|
|
|
var newVersion = snapshot.Version + 1; |
|
|
|
@event = @event.Migrate(snapshot); |
|
|
|
|
|
|
|
if (snapshots.Contains(newVersion)) |
|
|
|
var (newSnapshot, isChanged) = ApplyEvent(@event, true, snapshot, snapshot.Version, false); |
|
|
|
|
|
|
|
// Can only be null in case of errors or inconsistent streams.
|
|
|
|
if (newSnapshot != null) |
|
|
|
{ |
|
|
|
return false; |
|
|
|
snapshot = newSnapshot; |
|
|
|
} |
|
|
|
|
|
|
|
return ApplyEvent(@event, true, snapshot, snapshot.Version, false); |
|
|
|
// If all snapshorts from this one here are valid we can stop.
|
|
|
|
return newSnapshot != null && !snapshots.ContainsThisAndNewer(newSnapshot.Version); |
|
|
|
}); |
|
|
|
|
|
|
|
await allEvents.ReadAsync(); |
|
|
|
@ -97,14 +101,9 @@ namespace Squidex.Infrastructure.Commands |
|
|
|
}), |
|
|
|
@event => |
|
|
|
{ |
|
|
|
if (@event.Payload is IMigratedStateEvent<T> migratable) |
|
|
|
{ |
|
|
|
var payload = migratable.Migrate(Snapshot); |
|
|
|
@event = @event.Migrate(Snapshot); |
|
|
|
|
|
|
|
@event = new Envelope<IEvent>(payload, @event.Headers); |
|
|
|
} |
|
|
|
|
|
|
|
return ApplyEvent(@event, true, Snapshot, Version, true); |
|
|
|
return ApplyEvent(@event, true, Snapshot, Version, true).Success; |
|
|
|
}); |
|
|
|
} |
|
|
|
|
|
|
|
@ -146,7 +145,7 @@ namespace Squidex.Infrastructure.Commands |
|
|
|
|
|
|
|
@event.SetAggregateId(uniqueId); |
|
|
|
|
|
|
|
if (ApplyEvent(@event, false, Snapshot, Version, true)) |
|
|
|
if (ApplyEvent(@event, false, Snapshot, Version, true).Success) |
|
|
|
{ |
|
|
|
uncomittedEvents.Add(@event); |
|
|
|
} |
|
|
|
@ -299,13 +298,13 @@ namespace Squidex.Infrastructure.Commands |
|
|
|
snapshots.ResetTo(previousSnapshot, previousVersion); |
|
|
|
} |
|
|
|
|
|
|
|
private bool ApplyEvent(Envelope<IEvent> @event, bool isLoading, T snapshot, long version, bool clean) |
|
|
|
private (T?, bool Success) ApplyEvent(Envelope<IEvent> @event, bool isLoading, T snapshot, long version, bool clean) |
|
|
|
{ |
|
|
|
if (IsDeleted(snapshot)) |
|
|
|
{ |
|
|
|
if (!CanRecreate(@event.Payload)) |
|
|
|
{ |
|
|
|
return false; |
|
|
|
return default; |
|
|
|
} |
|
|
|
|
|
|
|
snapshot = new T |
|
|
|
@ -324,14 +323,14 @@ namespace Squidex.Infrastructure.Commands |
|
|
|
|
|
|
|
var isChanged = !ReferenceEquals(snapshot, newSnapshot); |
|
|
|
|
|
|
|
if (!ReferenceEquals(newSnapshot, snapshot)) |
|
|
|
if (isChanged) |
|
|
|
{ |
|
|
|
newSnapshot.Version = newVersion; |
|
|
|
|
|
|
|
snapshots.Add(newSnapshot, newVersion, clean); |
|
|
|
} |
|
|
|
|
|
|
|
return isChanged; |
|
|
|
return (newSnapshot, isChanged); |
|
|
|
} |
|
|
|
|
|
|
|
private async Task ReadAsync() |
|
|
|
|