diff --git a/src/Squidex.Infrastructure/States/Persistence.cs b/src/Squidex.Infrastructure/States/Persistence.cs index 7a6738d92..a306e3327 100644 --- a/src/Squidex.Infrastructure/States/Persistence.cs +++ b/src/Squidex.Infrastructure/States/Persistence.cs @@ -18,12 +18,13 @@ namespace Squidex.Infrastructure.States { public Persistence(TKey ownerKey, Action invalidate, + Action failed, IEventStore eventStore, IEventDataFormatter eventDataFormatter, ISnapshotStore snapshotStore, IStreamNameResolver streamNameResolver, Func, Task> applyEvent) - : base(ownerKey, invalidate, eventStore, eventDataFormatter, snapshotStore, streamNameResolver, PersistenceMode.EventSourcing, null, applyEvent) + : base(ownerKey, invalidate, failed, eventStore, eventDataFormatter, snapshotStore, streamNameResolver, PersistenceMode.EventSourcing, null, applyEvent) { } } diff --git a/src/Squidex.Infrastructure/States/Persistence{TOwner,TState,TKey}.cs b/src/Squidex.Infrastructure/States/Persistence{TOwner,TState,TKey}.cs index ccc80d3aa..112b06099 100644 --- a/src/Squidex.Infrastructure/States/Persistence{TOwner,TState,TKey}.cs +++ b/src/Squidex.Infrastructure/States/Persistence{TOwner,TState,TKey}.cs @@ -25,6 +25,7 @@ namespace Squidex.Infrastructure.States private readonly IEventDataFormatter eventDataFormatter; private readonly PersistenceMode persistenceMode; private readonly Action invalidate; + private readonly Action failed; private readonly Func applyState; private readonly Func, Task> applyEvent; private long versionSnapshot = EtagVersion.Empty; @@ -38,6 +39,7 @@ namespace Squidex.Infrastructure.States public Persistence(TKey ownerKey, Action invalidate, + Action failed, IEventStore eventStore, IEventDataFormatter eventDataFormatter, ISnapshotStore snapshotStore, @@ -49,9 +51,10 @@ namespace Squidex.Infrastructure.States this.ownerKey = ownerKey; this.applyState = applyState; this.applyEvent = applyEvent; - this.invalidate = invalidate; this.eventStore = eventStore; this.eventDataFormatter = eventDataFormatter; + this.invalidate = invalidate; + this.failed = failed; this.persistenceMode = persistenceMode; this.snapshotStore = snapshotStore; this.streamNameResolver = streamNameResolver; @@ -128,57 +131,75 @@ namespace Squidex.Infrastructure.States public async Task WriteSnapshotAsync(TState state) { - var newVersion = UseEventSourcing() ? versionEvents : versionSnapshot + 1; - - if (newVersion != versionSnapshot) + try { - try - { - await snapshotStore.WriteAsync(ownerKey, state, versionSnapshot, newVersion); - } - catch (InconsistentStateException ex) + var newVersion = UseEventSourcing() ? versionEvents : versionSnapshot + 1; + + if (newVersion != versionSnapshot) { - throw new DomainObjectVersionException(ownerKey.ToString(), typeof(TOwner), ex.CurrentVersion, ex.ExpectedVersion); + try + { + await snapshotStore.WriteAsync(ownerKey, state, versionSnapshot, newVersion); + } + catch (InconsistentStateException ex) + { + throw new DomainObjectVersionException(ownerKey.ToString(), typeof(TOwner), ex.CurrentVersion, ex.ExpectedVersion); + } + + versionSnapshot = newVersion; } - versionSnapshot = newVersion; - } + UpdateVersion(); - UpdateVersion(); + invalidate?.Invoke(); + } + catch + { + failed?.Invoke(); - invalidate?.Invoke(); + throw; + } } public async Task WriteEventsAsync(IEnumerable> events) { Guard.NotNull(events, nameof(@events)); - var eventArray = events.ToArray(); - - if (eventArray.Length > 0) + try { - var expectedVersion = UseEventSourcing() ? version : EtagVersion.Any; + var eventArray = events.ToArray(); - var commitId = Guid.NewGuid(); + if (eventArray.Length > 0) + { + var expectedVersion = UseEventSourcing() ? version : EtagVersion.Any; - var eventStream = GetStreamName(); - var eventData = GetEventData(eventArray, commitId); + var commitId = Guid.NewGuid(); - try - { - await eventStore.AppendEventsAsync(commitId, GetStreamName(), expectedVersion, eventData); - } - catch (WrongEventVersionException ex) - { - throw new DomainObjectVersionException(ownerKey.ToString(), typeof(TOwner), ex.CurrentVersion, ex.ExpectedVersion); + var eventStream = GetStreamName(); + var eventData = GetEventData(eventArray, commitId); + + try + { + await eventStore.AppendEventsAsync(commitId, GetStreamName(), expectedVersion, eventData); + } + catch (WrongEventVersionException ex) + { + throw new DomainObjectVersionException(ownerKey.ToString(), typeof(TOwner), ex.CurrentVersion, ex.ExpectedVersion); + } + + versionEvents += eventArray.Length; } - versionEvents += eventArray.Length; - } + UpdateVersion(); - UpdateVersion(); + invalidate?.Invoke(); + } + catch + { + failed?.Invoke(); - invalidate?.Invoke(); + throw; + } } private EventData[] GetEventData(Envelope[] events, Guid commitId) diff --git a/src/Squidex.Infrastructure/States/StateFactory.cs b/src/Squidex.Infrastructure/States/StateFactory.cs index 9ac202d72..abeb95d8a 100644 --- a/src/Squidex.Infrastructure/States/StateFactory.cs +++ b/src/Squidex.Infrastructure/States/StateFactory.cs @@ -126,10 +126,14 @@ namespace Squidex.Infrastructure.States var state = (T)services.GetService(typeof(T)); - var stateStore = new Store(eventStore, eventDataFormatter, services, streamNameResolver, () => - { - pubSub.Publish(new InvalidateMessage { Key = key.ToString() }, false); - }); + var stateStore = new Store(eventStore, eventDataFormatter, services, streamNameResolver, + () => + { + pubSub.Publish(new InvalidateMessage { Key = key.ToString() }, false); + }, () => + { + statesCache.Remove(key); + }); stateObj = new ObjectHolder(state, key, stateStore); diff --git a/src/Squidex.Infrastructure/States/Store.cs b/src/Squidex.Infrastructure/States/Store.cs index e200a9257..714fc3472 100644 --- a/src/Squidex.Infrastructure/States/Store.cs +++ b/src/Squidex.Infrastructure/States/Store.cs @@ -15,6 +15,7 @@ namespace Squidex.Infrastructure.States internal sealed class Store : IStore { private readonly Action invalidate; + private readonly Action failed; private readonly IServiceProvider services; private readonly IStreamNameResolver streamNameResolver; private readonly IEventStore eventStore; @@ -25,10 +26,12 @@ namespace Squidex.Infrastructure.States IEventDataFormatter eventDataFormatter, IServiceProvider services, IStreamNameResolver streamNameResolver, - Action invalidate = null) + Action invalidate = null, + Action failed = null) { this.eventStore = eventStore; this.eventDataFormatter = eventDataFormatter; + this.failed = failed; this.invalidate = invalidate; this.services = services; this.streamNameResolver = streamNameResolver; @@ -50,7 +53,7 @@ namespace Squidex.Infrastructure.States var snapshotStore = (ISnapshotStore)services.GetService(typeof(ISnapshotStore)); - return new Persistence(key, invalidate, eventStore, eventDataFormatter, snapshotStore, streamNameResolver, applyEvent); + return new Persistence(key, invalidate, failed, eventStore, eventDataFormatter, snapshotStore, streamNameResolver, applyEvent); } private IPersistence CreatePersistence(TKey key, PersistenceMode mode, Func applySnapshot, Func, Task> applyEvent) @@ -59,7 +62,7 @@ namespace Squidex.Infrastructure.States var snapshotStore = (ISnapshotStore)services.GetService(typeof(ISnapshotStore)); - return new Persistence(key, invalidate, eventStore, eventDataFormatter, snapshotStore, streamNameResolver, mode, applySnapshot, applyEvent); + return new Persistence(key, invalidate, failed, eventStore, eventDataFormatter, snapshotStore, streamNameResolver, mode, applySnapshot, applyEvent); } } } diff --git a/tests/Squidex.Infrastructure.Tests/States/StateEventSourcingTests.cs b/tests/Squidex.Infrastructure.Tests/States/StateEventSourcingTests.cs index eae17945f..ac1581d2e 100644 --- a/tests/Squidex.Infrastructure.Tests/States/StateEventSourcingTests.cs +++ b/tests/Squidex.Infrastructure.Tests/States/StateEventSourcingTests.cs @@ -27,7 +27,7 @@ namespace Squidex.Infrastructure.States private readonly List appliedEvents = new List(); private IPersistence persistence; - public long ExpectedVersion { get; set; } + public long ExpectedVersion { get; set; } = EtagVersion.Any; public List AppliedEvents { @@ -51,7 +51,7 @@ namespace Squidex.Infrastructure.States { private IPersistence persistence; - public long ExpectedVersion { get; set; } + public long ExpectedVersion { get; set; } = EtagVersion.Any; public Task ActivateAsync(string key, IStore store) { @@ -110,10 +110,8 @@ namespace Squidex.Infrastructure.States } [Fact] - public async Task Should_read_events_from_snapshot() + public async Task Should_read_status_from_snapshot() { - statefulObjectWithSnapShot.ExpectedVersion = EtagVersion.Any; - A.CallTo(() => snapshotStore.ReadAsync(key)) .Returns((2, 2L)); @@ -128,8 +126,6 @@ namespace Squidex.Infrastructure.States [Fact] public async Task Should_throw_exception_if_events_are_older_than_snapshot() { - statefulObjectWithSnapShot.ExpectedVersion = EtagVersion.Any; - A.CallTo(() => snapshotStore.ReadAsync(key)) .Returns((2, 2L)); @@ -141,8 +137,6 @@ namespace Squidex.Infrastructure.States [Fact] public async Task Should_throw_exception_if_events_have_gaps_to_snapshot() { - statefulObjectWithSnapShot.ExpectedVersion = EtagVersion.Any; - A.CallTo(() => snapshotStore.ReadAsync(key)) .Returns((2, 2L)); @@ -171,6 +165,19 @@ namespace Squidex.Infrastructure.States await Assert.ThrowsAsync(() => sut.GetSingleAsync(key)); } + [Fact] + public async Task Should_throw_exception_if_other_version_found_from_snapshot() + { + statefulObjectWithSnapShot.ExpectedVersion = 1; + + A.CallTo(() => snapshotStore.ReadAsync(key)) + .Returns((2, 2L)); + + SetupEventStore(0); + + await Assert.ThrowsAsync(() => sut.GetSingleAsync(key)); + } + [Fact] public async Task Should_not_throw_exception_if_noting_expected() { @@ -197,8 +204,6 @@ namespace Squidex.Infrastructure.States [Fact] public async Task Should_serve_next_request_from_cache() { - statefulObject.ExpectedVersion = EtagVersion.Any; - SetupEventStore(0); var actualObject1 = await sut.GetSingleAsync(key); @@ -215,8 +220,6 @@ namespace Squidex.Infrastructure.States [Fact] public async Task Should_write_to_store_with_previous_position() { - statefulObject.ExpectedVersion = EtagVersion.Any; - InvalidateMessage message = null; pubSub.Subscribe(m => @@ -245,8 +248,6 @@ namespace Squidex.Infrastructure.States [Fact] public async Task Should_wrap_exception_when_writing_to_store_with_previous_position() { - statefulObject.ExpectedVersion = EtagVersion.Any; - SetupEventStore(3); var actualObject = await sut.GetSingleAsync(key); @@ -260,8 +261,6 @@ namespace Squidex.Infrastructure.States [Fact] public async Task Should_remove_from_cache_when_invalidation_message_received() { - statefulObject.ExpectedVersion = EtagVersion.Any; - var actualObject = await sut.GetSingleAsync(key); await InvalidateCacheAsync(); @@ -270,10 +269,21 @@ namespace Squidex.Infrastructure.States } [Fact] - public async Task Should_return_same_instance_for_parallel_requests() + public async Task Should_remove_from_cache_when_write_failed() { - statefulObject.ExpectedVersion = EtagVersion.Any; + A.CallTo(() => eventStore.AppendEventsAsync(A.Ignored, A.Ignored, A.Ignored, A>.Ignored)) + .Throws(new InvalidOperationException()); + + var actualObject = await sut.GetSingleAsync(key); + + await Assert.ThrowsAsync(() => statefulObject.WriteEventsAsync(new MyEvent())); + + Assert.False(cache.TryGetValue(key, out var t)); + } + [Fact] + public async Task Should_return_same_instance_for_parallel_requests() + { A.CallTo(() => snapshotStore.ReadAsync(key)) .ReturnsLazily(() => Task.Delay(1).ContinueWith(x => ((object)1, 1L))); diff --git a/tests/Squidex.Infrastructure.Tests/States/StateSnapshotTests.cs b/tests/Squidex.Infrastructure.Tests/States/StateSnapshotTests.cs index f003c6ef6..1a84194a0 100644 --- a/tests/Squidex.Infrastructure.Tests/States/StateSnapshotTests.cs +++ b/tests/Squidex.Infrastructure.Tests/States/StateSnapshotTests.cs @@ -26,7 +26,7 @@ namespace Squidex.Infrastructure.States private IPersistence persistence; private int state; - public long ExpectedVersion { get; set; } + public long ExpectedVersion { get; set; } = EtagVersion.Any; public long Version { @@ -102,8 +102,6 @@ namespace Squidex.Infrastructure.States [Fact] public async Task Should_set_to_empty_when_store_returns_not_found() { - statefulObject.ExpectedVersion = EtagVersion.Any; - A.CallTo(() => snapshotStore.ReadAsync(key)) .Returns((123, EtagVersion.NotFound)); @@ -138,8 +136,6 @@ namespace Squidex.Infrastructure.States [Fact] public async Task Should_not_throw_exception_if_noting_expected() { - statefulObject.ExpectedVersion = EtagVersion.Any; - A.CallTo(() => snapshotStore.ReadAsync(key)) .Returns((0, EtagVersion.Empty)); @@ -149,8 +145,6 @@ namespace Squidex.Infrastructure.States [Fact] public async Task Should_provide_state_from_services_and_add_to_cache() { - statefulObject.ExpectedVersion = EtagVersion.Any; - var actualObject = await sut.GetSingleAsync(key); Assert.Same(statefulObject, actualObject); @@ -160,8 +154,6 @@ namespace Squidex.Infrastructure.States [Fact] public async Task Should_serve_next_request_from_cache() { - statefulObject.ExpectedVersion = EtagVersion.Any; - var actualObject1 = await sut.GetSingleAsync(key); Assert.Same(statefulObject, actualObject1); @@ -176,8 +168,6 @@ namespace Squidex.Infrastructure.States [Fact] public async Task Should_not_serve_next_request_from_cache_when_detached() { - statefulObject.ExpectedVersion = EtagVersion.Any; - var actualObject1 = await sut.CreateAsync(key); Assert.Same(statefulObject, actualObject1); @@ -192,8 +182,6 @@ namespace Squidex.Infrastructure.States [Fact] public async Task Should_write_to_store_with_previous_version() { - statefulObject.ExpectedVersion = EtagVersion.Any; - InvalidateMessage message = null; pubSub.Subscribe(m => @@ -223,8 +211,6 @@ namespace Squidex.Infrastructure.States [Fact] public async Task Should_wrap_exception_when_writing_to_store_with_previous_version() { - statefulObject.ExpectedVersion = EtagVersion.Any; - A.CallTo(() => snapshotStore.ReadAsync(key)) .Returns((123, 13)); @@ -239,8 +225,6 @@ namespace Squidex.Infrastructure.States [Fact] public async Task Should_remove_from_cache_when_invalidation_message_received() { - statefulObject.ExpectedVersion = EtagVersion.Any; - var actualObject = await sut.GetSingleAsync(key); await InvalidateCacheAsync(); @@ -249,10 +233,21 @@ namespace Squidex.Infrastructure.States } [Fact] - public async Task Should_return_same_instance_for_parallel_requests() + public async Task Should_remove_from_cache_when_write_failed() { - statefulObject.ExpectedVersion = EtagVersion.Any; + A.CallTo(() => snapshotStore.WriteAsync(A.Ignored, A.Ignored, A.Ignored, A.Ignored)) + .Throws(new InvalidOperationException()); + var actualObject = await sut.GetSingleAsync(key); + + await Assert.ThrowsAsync(() => statefulObject.WriteStateAsync()); + + Assert.False(cache.TryGetValue(key, out var t)); + } + + [Fact] + public async Task Should_return_same_instance_for_parallel_requests() + { A.CallTo(() => snapshotStore.ReadAsync(key)) .ReturnsLazily(() => Task.Delay(1).ContinueWith(x => (1, 1L)));