Browse Source

Invalidate cache when writing failed.

pull/206/head
Sebastian Stehle 8 years ago
parent
commit
bab2d1d6c6
  1. 3
      src/Squidex.Infrastructure/States/Persistence.cs
  2. 85
      src/Squidex.Infrastructure/States/Persistence{TOwner,TState,TKey}.cs
  3. 12
      src/Squidex.Infrastructure/States/StateFactory.cs
  4. 9
      src/Squidex.Infrastructure/States/Store.cs
  5. 48
      tests/Squidex.Infrastructure.Tests/States/StateEventSourcingTests.cs
  6. 33
      tests/Squidex.Infrastructure.Tests/States/StateSnapshotTests.cs

3
src/Squidex.Infrastructure/States/Persistence.cs

@ -18,12 +18,13 @@ namespace Squidex.Infrastructure.States
{ {
public Persistence(TKey ownerKey, public Persistence(TKey ownerKey,
Action invalidate, Action invalidate,
Action failed,
IEventStore eventStore, IEventStore eventStore,
IEventDataFormatter eventDataFormatter, IEventDataFormatter eventDataFormatter,
ISnapshotStore<object, TKey> snapshotStore, ISnapshotStore<object, TKey> snapshotStore,
IStreamNameResolver streamNameResolver, IStreamNameResolver streamNameResolver,
Func<Envelope<IEvent>, Task> applyEvent) Func<Envelope<IEvent>, Task> applyEvent)
: base(ownerKey, invalidate, eventStore, eventDataFormatter, snapshotStore, streamNameResolver, PersistenceMode.EventSourcing, null, applyEvent) : base(ownerKey, invalidate, failed, eventStore, eventDataFormatter, snapshotStore, streamNameResolver, PersistenceMode.EventSourcing, null, applyEvent)
{ {
} }
} }

85
src/Squidex.Infrastructure/States/Persistence{TOwner,TState,TKey}.cs

@ -25,6 +25,7 @@ namespace Squidex.Infrastructure.States
private readonly IEventDataFormatter eventDataFormatter; private readonly IEventDataFormatter eventDataFormatter;
private readonly PersistenceMode persistenceMode; private readonly PersistenceMode persistenceMode;
private readonly Action invalidate; private readonly Action invalidate;
private readonly Action failed;
private readonly Func<TState, Task> applyState; private readonly Func<TState, Task> applyState;
private readonly Func<Envelope<IEvent>, Task> applyEvent; private readonly Func<Envelope<IEvent>, Task> applyEvent;
private long versionSnapshot = EtagVersion.Empty; private long versionSnapshot = EtagVersion.Empty;
@ -38,6 +39,7 @@ namespace Squidex.Infrastructure.States
public Persistence(TKey ownerKey, public Persistence(TKey ownerKey,
Action invalidate, Action invalidate,
Action failed,
IEventStore eventStore, IEventStore eventStore,
IEventDataFormatter eventDataFormatter, IEventDataFormatter eventDataFormatter,
ISnapshotStore<TState, TKey> snapshotStore, ISnapshotStore<TState, TKey> snapshotStore,
@ -49,9 +51,10 @@ namespace Squidex.Infrastructure.States
this.ownerKey = ownerKey; this.ownerKey = ownerKey;
this.applyState = applyState; this.applyState = applyState;
this.applyEvent = applyEvent; this.applyEvent = applyEvent;
this.invalidate = invalidate;
this.eventStore = eventStore; this.eventStore = eventStore;
this.eventDataFormatter = eventDataFormatter; this.eventDataFormatter = eventDataFormatter;
this.invalidate = invalidate;
this.failed = failed;
this.persistenceMode = persistenceMode; this.persistenceMode = persistenceMode;
this.snapshotStore = snapshotStore; this.snapshotStore = snapshotStore;
this.streamNameResolver = streamNameResolver; this.streamNameResolver = streamNameResolver;
@ -128,57 +131,75 @@ namespace Squidex.Infrastructure.States
public async Task WriteSnapshotAsync(TState state) public async Task WriteSnapshotAsync(TState state)
{ {
var newVersion = UseEventSourcing() ? versionEvents : versionSnapshot + 1; try
if (newVersion != versionSnapshot)
{ {
try var newVersion = UseEventSourcing() ? versionEvents : versionSnapshot + 1;
{
await snapshotStore.WriteAsync(ownerKey, state, versionSnapshot, newVersion); if (newVersion != versionSnapshot)
}
catch (InconsistentStateException ex)
{ {
throw new DomainObjectVersionException(ownerKey.ToString(), typeof(TOwner), ex.CurrentVersion, ex.ExpectedVersion); try
{
await snapshotStore.WriteAsync(ownerKey, state, versionSnapshot, newVersion);
}
catch (InconsistentStateException ex)
{
throw new DomainObjectVersionException(ownerKey.ToString(), typeof(TOwner), ex.CurrentVersion, ex.ExpectedVersion);
}
versionSnapshot = newVersion;
} }
versionSnapshot = newVersion; UpdateVersion();
}
UpdateVersion(); invalidate?.Invoke();
}
catch
{
failed?.Invoke();
invalidate?.Invoke(); throw;
}
} }
public async Task WriteEventsAsync(IEnumerable<Envelope<IEvent>> events) public async Task WriteEventsAsync(IEnumerable<Envelope<IEvent>> events)
{ {
Guard.NotNull(events, nameof(@events)); Guard.NotNull(events, nameof(@events));
var eventArray = events.ToArray(); try
if (eventArray.Length > 0)
{ {
var expectedVersion = UseEventSourcing() ? version : EtagVersion.Any; var eventArray = events.ToArray();
var commitId = Guid.NewGuid(); if (eventArray.Length > 0)
{
var expectedVersion = UseEventSourcing() ? version : EtagVersion.Any;
var eventStream = GetStreamName(); var commitId = Guid.NewGuid();
var eventData = GetEventData(eventArray, commitId);
try var eventStream = GetStreamName();
{ var eventData = GetEventData(eventArray, commitId);
await eventStore.AppendEventsAsync(commitId, GetStreamName(), expectedVersion, eventData);
} try
catch (WrongEventVersionException ex) {
{ await eventStore.AppendEventsAsync(commitId, GetStreamName(), expectedVersion, eventData);
throw new DomainObjectVersionException(ownerKey.ToString(), typeof(TOwner), ex.CurrentVersion, ex.ExpectedVersion); }
catch (WrongEventVersionException ex)
{
throw new DomainObjectVersionException(ownerKey.ToString(), typeof(TOwner), ex.CurrentVersion, ex.ExpectedVersion);
}
versionEvents += eventArray.Length;
} }
versionEvents += eventArray.Length; UpdateVersion();
}
UpdateVersion(); invalidate?.Invoke();
}
catch
{
failed?.Invoke();
invalidate?.Invoke(); throw;
}
} }
private EventData[] GetEventData(Envelope<IEvent>[] events, Guid commitId) private EventData[] GetEventData(Envelope<IEvent>[] events, Guid commitId)

12
src/Squidex.Infrastructure/States/StateFactory.cs

@ -126,10 +126,14 @@ namespace Squidex.Infrastructure.States
var state = (T)services.GetService(typeof(T)); var state = (T)services.GetService(typeof(T));
var stateStore = new Store<T, TKey>(eventStore, eventDataFormatter, services, streamNameResolver, () => var stateStore = new Store<T, TKey>(eventStore, eventDataFormatter, services, streamNameResolver,
{ () =>
pubSub.Publish(new InvalidateMessage { Key = key.ToString() }, false); {
}); pubSub.Publish(new InvalidateMessage { Key = key.ToString() }, false);
}, () =>
{
statesCache.Remove(key);
});
stateObj = new ObjectHolder<T, TKey>(state, key, stateStore); stateObj = new ObjectHolder<T, TKey>(state, key, stateStore);

9
src/Squidex.Infrastructure/States/Store.cs

@ -15,6 +15,7 @@ namespace Squidex.Infrastructure.States
internal sealed class Store<TOwner, TKey> : IStore<TKey> internal sealed class Store<TOwner, TKey> : IStore<TKey>
{ {
private readonly Action invalidate; private readonly Action invalidate;
private readonly Action failed;
private readonly IServiceProvider services; private readonly IServiceProvider services;
private readonly IStreamNameResolver streamNameResolver; private readonly IStreamNameResolver streamNameResolver;
private readonly IEventStore eventStore; private readonly IEventStore eventStore;
@ -25,10 +26,12 @@ namespace Squidex.Infrastructure.States
IEventDataFormatter eventDataFormatter, IEventDataFormatter eventDataFormatter,
IServiceProvider services, IServiceProvider services,
IStreamNameResolver streamNameResolver, IStreamNameResolver streamNameResolver,
Action invalidate = null) Action invalidate = null,
Action failed = null)
{ {
this.eventStore = eventStore; this.eventStore = eventStore;
this.eventDataFormatter = eventDataFormatter; this.eventDataFormatter = eventDataFormatter;
this.failed = failed;
this.invalidate = invalidate; this.invalidate = invalidate;
this.services = services; this.services = services;
this.streamNameResolver = streamNameResolver; this.streamNameResolver = streamNameResolver;
@ -50,7 +53,7 @@ namespace Squidex.Infrastructure.States
var snapshotStore = (ISnapshotStore<object, TKey>)services.GetService(typeof(ISnapshotStore<object, TKey>)); var snapshotStore = (ISnapshotStore<object, TKey>)services.GetService(typeof(ISnapshotStore<object, TKey>));
return new Persistence<TOwner, TKey>(key, invalidate, eventStore, eventDataFormatter, snapshotStore, streamNameResolver, applyEvent); return new Persistence<TOwner, TKey>(key, invalidate, failed, eventStore, eventDataFormatter, snapshotStore, streamNameResolver, applyEvent);
} }
private IPersistence<TState> CreatePersistence<TState>(TKey key, PersistenceMode mode, Func<TState, Task> applySnapshot, Func<Envelope<IEvent>, Task> applyEvent) private IPersistence<TState> CreatePersistence<TState>(TKey key, PersistenceMode mode, Func<TState, Task> applySnapshot, Func<Envelope<IEvent>, Task> applyEvent)
@ -59,7 +62,7 @@ namespace Squidex.Infrastructure.States
var snapshotStore = (ISnapshotStore<TState, TKey>)services.GetService(typeof(ISnapshotStore<TState, TKey>)); var snapshotStore = (ISnapshotStore<TState, TKey>)services.GetService(typeof(ISnapshotStore<TState, TKey>));
return new Persistence<TOwner, TState, TKey>(key, invalidate, eventStore, eventDataFormatter, snapshotStore, streamNameResolver, mode, applySnapshot, applyEvent); return new Persistence<TOwner, TState, TKey>(key, invalidate, failed, eventStore, eventDataFormatter, snapshotStore, streamNameResolver, mode, applySnapshot, applyEvent);
} }
} }
} }

48
tests/Squidex.Infrastructure.Tests/States/StateEventSourcingTests.cs

@ -27,7 +27,7 @@ namespace Squidex.Infrastructure.States
private readonly List<IEvent> appliedEvents = new List<IEvent>(); private readonly List<IEvent> appliedEvents = new List<IEvent>();
private IPersistence persistence; private IPersistence persistence;
public long ExpectedVersion { get; set; } public long ExpectedVersion { get; set; } = EtagVersion.Any;
public List<IEvent> AppliedEvents public List<IEvent> AppliedEvents
{ {
@ -51,7 +51,7 @@ namespace Squidex.Infrastructure.States
{ {
private IPersistence<object> persistence; private IPersistence<object> persistence;
public long ExpectedVersion { get; set; } public long ExpectedVersion { get; set; } = EtagVersion.Any;
public Task ActivateAsync(string key, IStore<string> store) public Task ActivateAsync(string key, IStore<string> store)
{ {
@ -110,10 +110,8 @@ namespace Squidex.Infrastructure.States
} }
[Fact] [Fact]
public async Task Should_read_events_from_snapshot() public async Task Should_read_status_from_snapshot()
{ {
statefulObjectWithSnapShot.ExpectedVersion = EtagVersion.Any;
A.CallTo(() => snapshotStore.ReadAsync(key)) A.CallTo(() => snapshotStore.ReadAsync(key))
.Returns((2, 2L)); .Returns((2, 2L));
@ -128,8 +126,6 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_throw_exception_if_events_are_older_than_snapshot() public async Task Should_throw_exception_if_events_are_older_than_snapshot()
{ {
statefulObjectWithSnapShot.ExpectedVersion = EtagVersion.Any;
A.CallTo(() => snapshotStore.ReadAsync(key)) A.CallTo(() => snapshotStore.ReadAsync(key))
.Returns((2, 2L)); .Returns((2, 2L));
@ -141,8 +137,6 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_throw_exception_if_events_have_gaps_to_snapshot() public async Task Should_throw_exception_if_events_have_gaps_to_snapshot()
{ {
statefulObjectWithSnapShot.ExpectedVersion = EtagVersion.Any;
A.CallTo(() => snapshotStore.ReadAsync(key)) A.CallTo(() => snapshotStore.ReadAsync(key))
.Returns((2, 2L)); .Returns((2, 2L));
@ -171,6 +165,19 @@ namespace Squidex.Infrastructure.States
await Assert.ThrowsAsync<DomainObjectVersionException>(() => sut.GetSingleAsync<MyStatefulObject>(key)); await Assert.ThrowsAsync<DomainObjectVersionException>(() => sut.GetSingleAsync<MyStatefulObject>(key));
} }
[Fact]
public async Task Should_throw_exception_if_other_version_found_from_snapshot()
{
statefulObjectWithSnapShot.ExpectedVersion = 1;
A.CallTo(() => snapshotStore.ReadAsync(key))
.Returns((2, 2L));
SetupEventStore(0);
await Assert.ThrowsAsync<DomainObjectVersionException>(() => sut.GetSingleAsync<MyStatefulObjectWithSnapshot>(key));
}
[Fact] [Fact]
public async Task Should_not_throw_exception_if_noting_expected() public async Task Should_not_throw_exception_if_noting_expected()
{ {
@ -197,8 +204,6 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_serve_next_request_from_cache() public async Task Should_serve_next_request_from_cache()
{ {
statefulObject.ExpectedVersion = EtagVersion.Any;
SetupEventStore(0); SetupEventStore(0);
var actualObject1 = await sut.GetSingleAsync<MyStatefulObject>(key); var actualObject1 = await sut.GetSingleAsync<MyStatefulObject>(key);
@ -215,8 +220,6 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_write_to_store_with_previous_position() public async Task Should_write_to_store_with_previous_position()
{ {
statefulObject.ExpectedVersion = EtagVersion.Any;
InvalidateMessage message = null; InvalidateMessage message = null;
pubSub.Subscribe<InvalidateMessage>(m => pubSub.Subscribe<InvalidateMessage>(m =>
@ -245,8 +248,6 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_wrap_exception_when_writing_to_store_with_previous_position() public async Task Should_wrap_exception_when_writing_to_store_with_previous_position()
{ {
statefulObject.ExpectedVersion = EtagVersion.Any;
SetupEventStore(3); SetupEventStore(3);
var actualObject = await sut.GetSingleAsync<MyStatefulObject>(key); var actualObject = await sut.GetSingleAsync<MyStatefulObject>(key);
@ -260,8 +261,6 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_remove_from_cache_when_invalidation_message_received() public async Task Should_remove_from_cache_when_invalidation_message_received()
{ {
statefulObject.ExpectedVersion = EtagVersion.Any;
var actualObject = await sut.GetSingleAsync<MyStatefulObject>(key); var actualObject = await sut.GetSingleAsync<MyStatefulObject>(key);
await InvalidateCacheAsync(); await InvalidateCacheAsync();
@ -270,10 +269,21 @@ namespace Squidex.Infrastructure.States
} }
[Fact] [Fact]
public async Task Should_return_same_instance_for_parallel_requests() public async Task Should_remove_from_cache_when_write_failed()
{ {
statefulObject.ExpectedVersion = EtagVersion.Any; A.CallTo(() => eventStore.AppendEventsAsync(A<Guid>.Ignored, A<string>.Ignored, A<long>.Ignored, A<ICollection<EventData>>.Ignored))
.Throws(new InvalidOperationException());
var actualObject = await sut.GetSingleAsync<MyStatefulObject>(key);
await Assert.ThrowsAsync<InvalidOperationException>(() => statefulObject.WriteEventsAsync(new MyEvent()));
Assert.False(cache.TryGetValue(key, out var t));
}
[Fact]
public async Task Should_return_same_instance_for_parallel_requests()
{
A.CallTo(() => snapshotStore.ReadAsync(key)) A.CallTo(() => snapshotStore.ReadAsync(key))
.ReturnsLazily(() => Task.Delay(1).ContinueWith(x => ((object)1, 1L))); .ReturnsLazily(() => Task.Delay(1).ContinueWith(x => ((object)1, 1L)));

33
tests/Squidex.Infrastructure.Tests/States/StateSnapshotTests.cs

@ -26,7 +26,7 @@ namespace Squidex.Infrastructure.States
private IPersistence<int> persistence; private IPersistence<int> persistence;
private int state; private int state;
public long ExpectedVersion { get; set; } public long ExpectedVersion { get; set; } = EtagVersion.Any;
public long Version public long Version
{ {
@ -102,8 +102,6 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_set_to_empty_when_store_returns_not_found() public async Task Should_set_to_empty_when_store_returns_not_found()
{ {
statefulObject.ExpectedVersion = EtagVersion.Any;
A.CallTo(() => snapshotStore.ReadAsync(key)) A.CallTo(() => snapshotStore.ReadAsync(key))
.Returns((123, EtagVersion.NotFound)); .Returns((123, EtagVersion.NotFound));
@ -138,8 +136,6 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_not_throw_exception_if_noting_expected() public async Task Should_not_throw_exception_if_noting_expected()
{ {
statefulObject.ExpectedVersion = EtagVersion.Any;
A.CallTo(() => snapshotStore.ReadAsync(key)) A.CallTo(() => snapshotStore.ReadAsync(key))
.Returns((0, EtagVersion.Empty)); .Returns((0, EtagVersion.Empty));
@ -149,8 +145,6 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_provide_state_from_services_and_add_to_cache() public async Task Should_provide_state_from_services_and_add_to_cache()
{ {
statefulObject.ExpectedVersion = EtagVersion.Any;
var actualObject = await sut.GetSingleAsync<MyStatefulObject, string>(key); var actualObject = await sut.GetSingleAsync<MyStatefulObject, string>(key);
Assert.Same(statefulObject, actualObject); Assert.Same(statefulObject, actualObject);
@ -160,8 +154,6 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_serve_next_request_from_cache() public async Task Should_serve_next_request_from_cache()
{ {
statefulObject.ExpectedVersion = EtagVersion.Any;
var actualObject1 = await sut.GetSingleAsync<MyStatefulObject, string>(key); var actualObject1 = await sut.GetSingleAsync<MyStatefulObject, string>(key);
Assert.Same(statefulObject, actualObject1); Assert.Same(statefulObject, actualObject1);
@ -176,8 +168,6 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_not_serve_next_request_from_cache_when_detached() public async Task Should_not_serve_next_request_from_cache_when_detached()
{ {
statefulObject.ExpectedVersion = EtagVersion.Any;
var actualObject1 = await sut.CreateAsync<MyStatefulObject, string>(key); var actualObject1 = await sut.CreateAsync<MyStatefulObject, string>(key);
Assert.Same(statefulObject, actualObject1); Assert.Same(statefulObject, actualObject1);
@ -192,8 +182,6 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_write_to_store_with_previous_version() public async Task Should_write_to_store_with_previous_version()
{ {
statefulObject.ExpectedVersion = EtagVersion.Any;
InvalidateMessage message = null; InvalidateMessage message = null;
pubSub.Subscribe<InvalidateMessage>(m => pubSub.Subscribe<InvalidateMessage>(m =>
@ -223,8 +211,6 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_wrap_exception_when_writing_to_store_with_previous_version() public async Task Should_wrap_exception_when_writing_to_store_with_previous_version()
{ {
statefulObject.ExpectedVersion = EtagVersion.Any;
A.CallTo(() => snapshotStore.ReadAsync(key)) A.CallTo(() => snapshotStore.ReadAsync(key))
.Returns((123, 13)); .Returns((123, 13));
@ -239,8 +225,6 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_remove_from_cache_when_invalidation_message_received() public async Task Should_remove_from_cache_when_invalidation_message_received()
{ {
statefulObject.ExpectedVersion = EtagVersion.Any;
var actualObject = await sut.GetSingleAsync<MyStatefulObject, string>(key); var actualObject = await sut.GetSingleAsync<MyStatefulObject, string>(key);
await InvalidateCacheAsync(); await InvalidateCacheAsync();
@ -249,10 +233,21 @@ namespace Squidex.Infrastructure.States
} }
[Fact] [Fact]
public async Task Should_return_same_instance_for_parallel_requests() public async Task Should_remove_from_cache_when_write_failed()
{ {
statefulObject.ExpectedVersion = EtagVersion.Any; A.CallTo(() => snapshotStore.WriteAsync(A<string>.Ignored, A<int>.Ignored, A<long>.Ignored, A<long>.Ignored))
.Throws(new InvalidOperationException());
var actualObject = await sut.GetSingleAsync<MyStatefulObject>(key);
await Assert.ThrowsAsync<InvalidOperationException>(() => statefulObject.WriteStateAsync());
Assert.False(cache.TryGetValue(key, out var t));
}
[Fact]
public async Task Should_return_same_instance_for_parallel_requests()
{
A.CallTo(() => snapshotStore.ReadAsync(key)) A.CallTo(() => snapshotStore.ReadAsync(key))
.ReturnsLazily(() => Task.Delay(1).ContinueWith(x => (1, 1L))); .ReturnsLazily(() => Task.Delay(1).ContinueWith(x => (1, 1L)));

Loading…
Cancel
Save