Browse Source

Expected version improved.

pull/206/head
Sebastian Stehle 8 years ago
parent
commit
a826a0e3d6
  1. 3
      src/Squidex.Domain.Apps.Entities/DomainObjectState.cs
  2. 7
      src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs
  3. 27
      src/Squidex.Infrastructure/Commands/DomainObjectBase.cs
  4. 17
      src/Squidex.Infrastructure/EventSourcing/ExpectedVersion.cs
  5. 6
      src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs
  6. 4
      src/Squidex.Infrastructure/States/IPersistence.cs
  7. 117
      src/Squidex.Infrastructure/States/Persistence.cs
  8. 10
      src/Squidex.Infrastructure/States/PersistenceMode.cs
  9. 10
      src/Squidex.Infrastructure/States/Store.cs
  10. 4
      tests/Squidex.Infrastructure.Tests/Commands/DomainObjectBaseTests.cs
  11. 28
      tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs
  12. 1
      tests/Squidex.Infrastructure.Tests/Reflection/SimpleCopierTests.cs
  13. 24
      tests/Squidex.Infrastructure.Tests/States/StateEventSourcingTests.cs
  14. 45
      tests/Squidex.Infrastructure.Tests/States/StateSnapshotTests.cs

3
src/Squidex.Domain.Apps.Entities/DomainObjectState.cs

@ -20,8 +20,7 @@ namespace Squidex.Domain.Apps.Entities
IEntityWithVersion, IEntityWithVersion,
IUpdateableEntity, IUpdateableEntity,
IUpdateableEntityWithCreatedBy, IUpdateableEntityWithCreatedBy,
IUpdateableEntityWithLastModifiedBy, IUpdateableEntityWithLastModifiedBy
IUpdateableEntityWithVersion
where T : Cloneable where T : Cloneable
{ {
[JsonProperty] [JsonProperty]

7
src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore.cs

@ -19,7 +19,6 @@ namespace Squidex.Infrastructure.EventSourcing
{ {
public class MongoEventStore : MongoRepositoryBase<MongoEventCommit>, IEventStore public class MongoEventStore : MongoRepositoryBase<MongoEventCommit>, IEventStore
{ {
private const long AnyVersion = long.MinValue;
private const int MaxAttempts = 20; private const int MaxAttempts = 20;
private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0); private static readonly BsonTimestamp EmptyTimestamp = new BsonTimestamp(0);
private static readonly FieldDefinition<MongoEventCommit, BsonTimestamp> TimestampField = Fields.Build(x => x.Timestamp); private static readonly FieldDefinition<MongoEventCommit, BsonTimestamp> TimestampField = Fields.Build(x => x.Timestamp);
@ -130,7 +129,7 @@ namespace Squidex.Infrastructure.EventSourcing
public Task AppendEventsAsync(Guid commitId, string streamName, ICollection<EventData> events) public Task AppendEventsAsync(Guid commitId, string streamName, ICollection<EventData> events)
{ {
return AppendEventsInternalAsync(commitId, streamName, AnyVersion, events); return AppendEventsInternalAsync(commitId, streamName, ExpectedVersion.Any, events);
} }
public Task AppendEventsAsync(Guid commitId, string streamName, long expectedVersion, ICollection<EventData> events) public Task AppendEventsAsync(Guid commitId, string streamName, long expectedVersion, ICollection<EventData> events)
@ -152,7 +151,7 @@ namespace Squidex.Infrastructure.EventSourcing
var currentVersion = await GetEventStreamOffset(streamName); var currentVersion = await GetEventStreamOffset(streamName);
if (expectedVersion != AnyVersion && expectedVersion != currentVersion) if (expectedVersion != ExpectedVersion.Any && expectedVersion != currentVersion)
{ {
throw new WrongEventVersionException(currentVersion, expectedVersion); throw new WrongEventVersionException(currentVersion, expectedVersion);
} }
@ -175,7 +174,7 @@ namespace Squidex.Infrastructure.EventSourcing
{ {
currentVersion = await GetEventStreamOffset(streamName); currentVersion = await GetEventStreamOffset(streamName);
if (expectedVersion != AnyVersion) if (expectedVersion != ExpectedVersion.Any)
{ {
throw new WrongEventVersionException(currentVersion, expectedVersion); throw new WrongEventVersionException(currentVersion, expectedVersion);
} }

27
src/Squidex.Infrastructure/Commands/DomainObjectBase.cs

@ -78,24 +78,19 @@ namespace Squidex.Infrastructure.Commands
public async Task WriteAsync(ISemanticLog log) public async Task WriteAsync(ISemanticLog log)
{ {
var newVersion = Version + uncomittedEvents.Count; await persistence.WriteSnapshotAsync(state);
if (newVersion != Version) try
{ {
await persistence.WriteSnapshotAsync(state, newVersion); await persistence.WriteEventsAsync(uncomittedEvents.ToArray());
}
try catch (Exception ex)
{ {
await persistence.WriteEventsAsync(uncomittedEvents.ToArray()); log.LogFatal(ex, w => w.WriteProperty("action", "writeEvents"));
} }
catch (Exception ex) finally
{ {
log.LogFatal(ex, w => w.WriteProperty("action", "writeEvents")); uncomittedEvents.Clear();
}
finally
{
uncomittedEvents.Clear();
}
} }
} }
} }

17
src/Squidex.Infrastructure/EventSourcing/ExpectedVersion.cs

@ -0,0 +1,17 @@
// ==========================================================================
// ExpectedVersion.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
namespace Squidex.Infrastructure.EventSourcing
{
public static class ExpectedVersion
{
public const int Any = -2;
public const int Empty = -1;
}
}

6
src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs

@ -68,17 +68,17 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
public virtual void Stop() public virtual void Stop()
{ {
dispatcher.DispatchAsync(() => HandleStopAsync()).Forget(); dispatcher.DispatchAsync(HandleStopAsync).Forget();
} }
public virtual void Start() public virtual void Start()
{ {
dispatcher.DispatchAsync(() => HandleStartAsync()).Forget(); dispatcher.DispatchAsync(HandleStartAsync).Forget();
} }
public virtual void Reset() public virtual void Reset()
{ {
dispatcher.DispatchAsync(() => HandleResetAsync()).Forget(); dispatcher.DispatchAsync(HandleResetAsync).Forget();
} }
public virtual void Activate(IEventConsumer eventConsumer) public virtual void Activate(IEventConsumer eventConsumer)

4
src/Squidex.Infrastructure/States/IPersistence.cs

@ -18,8 +18,8 @@ namespace Squidex.Infrastructure.States
Task WriteEventsAsync(IEnumerable<Envelope<IEvent>> @events); Task WriteEventsAsync(IEnumerable<Envelope<IEvent>> @events);
Task WriteSnapshotAsync(TState state, long newVersion = -1); Task WriteSnapshotAsync(TState state);
Task ReadAsync(long? expectedVersion = null); Task ReadAsync(long expectedVersion = ExpectedVersion.Any);
} }
} }

117
src/Squidex.Infrastructure/States/Persistence.cs

@ -21,15 +21,17 @@ namespace Squidex.Infrastructure.States
private readonly IStreamNameResolver streamNameResolver; private readonly IStreamNameResolver streamNameResolver;
private readonly IEventStore eventStore; private readonly IEventStore eventStore;
private readonly IEventDataFormatter eventDataFormatter; private readonly IEventDataFormatter eventDataFormatter;
private readonly PersistenceMode persistenceMode;
private readonly Action invalidate; private readonly Action invalidate;
private readonly Func<TState, Task> applyState; private readonly Func<TState, Task> applyState;
private readonly Func<Envelope<IEvent>, Task> applyEvent; private readonly Func<Envelope<IEvent>, Task> applyEvent;
private long positionSnapshot = -1; private long versionSnapshot = -1;
private long positionEvent = -1; private long versionEvents = -1;
private long version;
public long Version public long Version
{ {
get { return Math.Max(positionEvent, positionSnapshot); } get { return version; }
} }
public Persistence(string ownerKey, public Persistence(string ownerKey,
@ -38,6 +40,7 @@ namespace Squidex.Infrastructure.States
IEventDataFormatter eventDataFormatter, IEventDataFormatter eventDataFormatter,
ISnapshotStore<TState> snapshotStore, ISnapshotStore<TState> snapshotStore,
IStreamNameResolver streamNameResolver, IStreamNameResolver streamNameResolver,
PersistenceMode persistenceMode,
Func<TState, Task> applyState, Func<TState, Task> applyState,
Func<Envelope<IEvent>, Task> applyEvent) Func<Envelope<IEvent>, Task> applyEvent)
{ {
@ -47,37 +50,61 @@ namespace Squidex.Infrastructure.States
this.invalidate = invalidate; this.invalidate = invalidate;
this.eventStore = eventStore; this.eventStore = eventStore;
this.eventDataFormatter = eventDataFormatter; this.eventDataFormatter = eventDataFormatter;
this.persistenceMode = persistenceMode;
this.snapshotStore = snapshotStore; this.snapshotStore = snapshotStore;
this.streamNameResolver = streamNameResolver; this.streamNameResolver = streamNameResolver;
} }
public async Task ReadAsync(long? expectedVersion) public async Task ReadAsync(long expectedVersion = ExpectedVersion.Any)
{ {
positionSnapshot = -1; versionSnapshot = -1;
positionEvent = -1; versionEvents = -1;
if (applyState != null) await ReadSnapshotAsync();
await ReadEventsAsync();
UpdateVersion();
if (expectedVersion != ExpectedVersion.Any && expectedVersion != version)
{
if (version == ExpectedVersion.Empty)
{
throw new DomainObjectNotFoundException(ownerKey, typeof(TOwner));
}
else
{
throw new DomainObjectVersionException(ownerKey, typeof(TOwner), version, expectedVersion);
}
}
}
private async Task ReadSnapshotAsync()
{
if (UseSnapshots())
{ {
var (state, position) = await snapshotStore.ReadAsync(ownerKey); var (state, position) = await snapshotStore.ReadAsync(ownerKey);
positionSnapshot = position; versionSnapshot = position;
positionEvent = position; versionEvents = position;
if (applyState != null && position >= 0) if (applyState != null && position >= 0)
{ {
await applyState(state); await applyState(state);
} }
} }
}
if (applyEvent != null && streamNameResolver != null) private async Task ReadEventsAsync()
{
if (UseEventSourcing())
{ {
var events = await eventStore.GetEventsAsync(GetStreamName(), positionEvent + 1); var events = await eventStore.GetEventsAsync(GetStreamName(), versionEvents + 1);
foreach (var @event in events) foreach (var @event in events)
{ {
positionEvent++; versionEvents++;
if (@event.EventStreamNumber != positionEvent) if (@event.EventStreamNumber != versionEvents)
{ {
throw new InvalidOperationException("Events must follow the snapshot version in consecutive order with no gaps."); throw new InvalidOperationException("Events must follow the snapshot version in consecutive order with no gaps.");
} }
@ -90,46 +117,28 @@ namespace Squidex.Infrastructure.States
} }
} }
} }
var newVersion = Version;
if (expectedVersion.HasValue && expectedVersion.Value != newVersion)
{
if (newVersion == -1)
{
throw new DomainObjectNotFoundException(ownerKey, typeof(TOwner));
}
else
{
throw new DomainObjectVersionException(ownerKey, typeof(TOwner), newVersion, expectedVersion.Value);
}
}
} }
public async Task WriteSnapshotAsync(TState state, long newVersion = -1) public async Task WriteSnapshotAsync(TState state)
{ {
if (newVersion < 0) var newVersion = UseEventSourcing() ? versionEvents : versionSnapshot + 1;
{
newVersion =
applyEvent != null ?
positionEvent :
positionSnapshot + 1;
}
if (newVersion != positionSnapshot) if (newVersion != versionSnapshot)
{ {
try try
{ {
await snapshotStore.WriteAsync(ownerKey, state, positionSnapshot, newVersion); await snapshotStore.WriteAsync(ownerKey, state, versionSnapshot, newVersion);
} }
catch (InconsistentStateException ex) catch (InconsistentStateException ex)
{ {
throw new DomainObjectVersionException(ownerKey, typeof(TOwner), ex.CurrentVersion, ex.ExpectedVersion); throw new DomainObjectVersionException(ownerKey, typeof(TOwner), ex.CurrentVersion, ex.ExpectedVersion);
} }
positionSnapshot = newVersion; versionSnapshot = newVersion;
} }
UpdateVersion();
invalidate?.Invoke(); invalidate?.Invoke();
} }
@ -141,6 +150,8 @@ namespace Squidex.Infrastructure.States
if (eventArray.Length > 0) if (eventArray.Length > 0)
{ {
var expectedVersion = UseEventSourcing() ? version : ExpectedVersion.Any;
var commitId = Guid.NewGuid(); var commitId = Guid.NewGuid();
var eventStream = GetStreamName(); var eventStream = GetStreamName();
@ -155,9 +166,11 @@ namespace Squidex.Infrastructure.States
throw new DomainObjectVersionException(ownerKey, typeof(TOwner), ex.CurrentVersion, ex.ExpectedVersion); throw new DomainObjectVersionException(ownerKey, typeof(TOwner), ex.CurrentVersion, ex.ExpectedVersion);
} }
positionEvent += eventArray.Length; versionEvents += eventArray.Length;
} }
UpdateVersion();
invalidate?.Invoke(); invalidate?.Invoke();
} }
@ -171,6 +184,16 @@ namespace Squidex.Infrastructure.States
return streamNameResolver.GetStreamName(typeof(TOwner), ownerKey); return streamNameResolver.GetStreamName(typeof(TOwner), ownerKey);
} }
private bool UseSnapshots()
{
return persistenceMode == PersistenceMode.Snapshots || persistenceMode == PersistenceMode.SnapshotsAndEventSourcing;
}
private bool UseEventSourcing()
{
return persistenceMode == PersistenceMode.EventSourcing || persistenceMode == PersistenceMode.SnapshotsAndEventSourcing;
}
private Envelope<IEvent> ParseKnownEvent(StoredEvent storedEvent) private Envelope<IEvent> ParseKnownEvent(StoredEvent storedEvent)
{ {
try try
@ -182,5 +205,21 @@ namespace Squidex.Infrastructure.States
return null; return null;
} }
} }
private void UpdateVersion()
{
if (persistenceMode == PersistenceMode.Snapshots)
{
version = versionSnapshot;
}
else if (persistenceMode == PersistenceMode.EventSourcing)
{
version = versionEvents;
}
else if (persistenceMode == PersistenceMode.SnapshotsAndEventSourcing)
{
version = Math.Max(versionEvents, versionSnapshot);
}
}
} }
} }

10
src/Squidex.Domain.Apps.Entities/IUpdateableEntityWithVersion.cs → src/Squidex.Infrastructure/States/PersistenceMode.cs

@ -1,15 +1,17 @@
// ========================================================================== // ==========================================================================
// IUpdateableEntityWithVersion.cs // PersistenceMode.cs
// Squidex Headless CMS // Squidex Headless CMS
// ========================================================================== // ==========================================================================
// Copyright (c) Squidex Group // Copyright (c) Squidex Group
// All rights reserved. // All rights reserved.
// ========================================================================== // ==========================================================================
namespace Squidex.Domain.Apps.Entities namespace Squidex.Infrastructure.States
{ {
public interface IUpdateableEntityWithVersion public enum PersistenceMode
{ {
long Version { get; set; } EventSourcing,
Snapshots,
SnapshotsAndEventSourcing
} }
} }

10
src/Squidex.Infrastructure/States/Store.cs

@ -36,26 +36,26 @@ namespace Squidex.Infrastructure.States
public IPersistence<object> WithEventSourcing<TOwner>(string key, Func<Envelope<IEvent>, Task> applyEvent) public IPersistence<object> WithEventSourcing<TOwner>(string key, Func<Envelope<IEvent>, Task> applyEvent)
{ {
return CreatePersistence<TOwner, object>(key, null, applyEvent); return CreatePersistence<TOwner, object>(key, PersistenceMode.EventSourcing, null, applyEvent);
} }
public IPersistence<TState> WithSnapshots<TOwner, TState>(string key, Func<TState, Task> applySnapshot) public IPersistence<TState> WithSnapshots<TOwner, TState>(string key, Func<TState, Task> applySnapshot)
{ {
return CreatePersistence<TOwner, TState>(key, applySnapshot, null); return CreatePersistence<TOwner, TState>(key, PersistenceMode.Snapshots, applySnapshot, null);
} }
public IPersistence<TState> WithSnapshotsAndEventSourcing<TOwner, TState>(string key, Func<TState, Task> applySnapshot, Func<Envelope<IEvent>, Task> applyEvent) public IPersistence<TState> WithSnapshotsAndEventSourcing<TOwner, TState>(string key, Func<TState, Task> applySnapshot, Func<Envelope<IEvent>, Task> applyEvent)
{ {
return CreatePersistence<TOwner, TState>(key, applySnapshot, applyEvent); return CreatePersistence<TOwner, TState>(key, PersistenceMode.SnapshotsAndEventSourcing, applySnapshot, applyEvent);
} }
private IPersistence<TState> CreatePersistence<TOwner, TState>(string key, Func<TState, Task> applySnapshot, Func<Envelope<IEvent>, Task> applyEvent) private IPersistence<TState> CreatePersistence<TOwner, TState>(string key, PersistenceMode mode, Func<TState, Task> applySnapshot, Func<Envelope<IEvent>, Task> applyEvent)
{ {
Guard.NotNullOrEmpty(key, nameof(key)); Guard.NotNullOrEmpty(key, nameof(key));
var snapshotStore = (ISnapshotStore<TState>)services.GetService(typeof(ISnapshotStore<TState>)); var snapshotStore = (ISnapshotStore<TState>)services.GetService(typeof(ISnapshotStore<TState>));
return new Persistence<TOwner, TState>(key, invalidate, eventStore, eventDataFormatter, snapshotStore, streamNameResolver, applySnapshot, applyEvent); return new Persistence<TOwner, TState>(key, invalidate, eventStore, eventDataFormatter, snapshotStore, streamNameResolver, mode, applySnapshot, applyEvent);
} }
} }
} }

4
tests/Squidex.Infrastructure.Tests/Commands/DomainObjectBaseTests.cs

@ -77,7 +77,7 @@ namespace Squidex.Infrastructure.Commands
await sut.WriteAsync(A.Fake<ISemanticLog>()); await sut.WriteAsync(A.Fake<ISemanticLog>());
A.CallTo(() => persistence.WriteSnapshotAsync(newState, 102)) A.CallTo(() => persistence.WriteSnapshotAsync(newState))
.MustHaveHappened(); .MustHaveHappened();
A.CallTo(() => persistence.WriteEventsAsync(A<IEnumerable<Envelope<IEvent>>>.That.Matches(x => x.Count() == 2))) A.CallTo(() => persistence.WriteEventsAsync(A<IEnumerable<Envelope<IEvent>>>.That.Matches(x => x.Count() == 2)))
.MustHaveHappened(); .MustHaveHappened();
@ -110,7 +110,7 @@ namespace Squidex.Infrastructure.Commands
await sut.WriteAsync(A.Fake<ISemanticLog>()); await sut.WriteAsync(A.Fake<ISemanticLog>());
A.CallTo(() => persistence.WriteSnapshotAsync(newState, 102)) A.CallTo(() => persistence.WriteSnapshotAsync(newState))
.MustHaveHappened(); .MustHaveHappened();
A.CallTo(() => persistence.WriteEventsAsync(A<IEnumerable<Envelope<IEvent>>>.That.Matches(x => x.Count() == 2))) A.CallTo(() => persistence.WriteEventsAsync(A<IEnumerable<Envelope<IEvent>>>.That.Matches(x => x.Count() == 2)))
.MustHaveHappened(); .MustHaveHappened();

28
tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs

@ -67,11 +67,11 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
A.CallTo(() => eventConsumer.Name) A.CallTo(() => eventConsumer.Name)
.Returns(consumerName); .Returns(consumerName);
A.CallTo(() => persistence.ReadAsync(null)) A.CallTo(() => persistence.ReadAsync(ExpectedVersion.Any))
.Invokes(new Action<long?>(s => apply(state))); .Invokes(new Action<long>(s => apply(state)));
A.CallTo(() => persistence.WriteSnapshotAsync(A<EventConsumerState>.Ignored, -1)) A.CallTo(() => persistence.WriteSnapshotAsync(A<EventConsumerState>.Ignored))
.Invokes(new Action<EventConsumerState, long>((s, v) => state = s)); .Invokes(new Action<EventConsumerState>(s => state = s));
A.CallTo(() => formatter.Parse(eventData, true)).Returns(envelope); A.CallTo(() => formatter.Parse(eventData, true)).Returns(envelope);
@ -132,7 +132,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = null }); state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = null });
A.CallTo(() => persistence.WriteSnapshotAsync(A<EventConsumerState>.Ignored, -1)) A.CallTo(() => persistence.WriteSnapshotAsync(A<EventConsumerState>.Ignored))
.MustHaveHappened(Repeated.Exactly.Once); .MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventSubscription.StopAsync()) A.CallTo(() => eventSubscription.StopAsync())
@ -150,7 +150,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = null, Error = null }); state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = null, Error = null });
A.CallTo(() => persistence.WriteSnapshotAsync(A<EventConsumerState>.Ignored, -1)) A.CallTo(() => persistence.WriteSnapshotAsync(A<EventConsumerState>.Ignored))
.MustHaveHappened(Repeated.Exactly.Twice); .MustHaveHappened(Repeated.Exactly.Twice);
A.CallTo(() => eventConsumer.ClearAsync()) A.CallTo(() => eventConsumer.ClearAsync())
@ -180,7 +180,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null }); state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null });
A.CallTo(() => persistence.WriteSnapshotAsync(A<EventConsumerState>.Ignored, -1)) A.CallTo(() => persistence.WriteSnapshotAsync(A<EventConsumerState>.Ignored))
.MustHaveHappened(Repeated.Exactly.Once); .MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumer.On(envelope)) A.CallTo(() => eventConsumer.On(envelope))
@ -204,7 +204,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null }); state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null });
A.CallTo(() => persistence.WriteSnapshotAsync(A<EventConsumerState>.Ignored, -1)) A.CallTo(() => persistence.WriteSnapshotAsync(A<EventConsumerState>.Ignored))
.MustHaveHappened(Repeated.Exactly.Once); .MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventConsumer.On(envelope)) A.CallTo(() => eventConsumer.On(envelope))
@ -243,7 +243,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() }); state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() });
A.CallTo(() => persistence.WriteSnapshotAsync(A<EventConsumerState>.Ignored, -1)) A.CallTo(() => persistence.WriteSnapshotAsync(A<EventConsumerState>.Ignored))
.MustHaveHappened(Repeated.Exactly.Once); .MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventSubscription.StopAsync()) A.CallTo(() => eventSubscription.StopAsync())
@ -264,7 +264,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null }); state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null });
A.CallTo(() => persistence.WriteSnapshotAsync(A<EventConsumerState>.Ignored, -1)) A.CallTo(() => persistence.WriteSnapshotAsync(A<EventConsumerState>.Ignored))
.MustNotHaveHappened(); .MustNotHaveHappened();
} }
@ -284,7 +284,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() }); state.ShouldBeEquivalentTo(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() });
A.CallTo(() => persistence.WriteSnapshotAsync(A<EventConsumerState>.Ignored, -1)) A.CallTo(() => persistence.WriteSnapshotAsync(A<EventConsumerState>.Ignored))
.MustHaveHappened(Repeated.Exactly.Once); .MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventSubscription.StopAsync()) A.CallTo(() => eventSubscription.StopAsync())
@ -313,7 +313,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
A.CallTo(() => eventConsumer.On(envelope)) A.CallTo(() => eventConsumer.On(envelope))
.MustHaveHappened(); .MustHaveHappened();
A.CallTo(() => persistence.WriteSnapshotAsync(A<EventConsumerState>.Ignored, -1)) A.CallTo(() => persistence.WriteSnapshotAsync(A<EventConsumerState>.Ignored))
.MustHaveHappened(Repeated.Exactly.Once); .MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventSubscription.StopAsync()) A.CallTo(() => eventSubscription.StopAsync())
@ -344,7 +344,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
A.CallTo(() => eventConsumer.On(envelope)) A.CallTo(() => eventConsumer.On(envelope))
.MustNotHaveHappened(); .MustNotHaveHappened();
A.CallTo(() => persistence.WriteSnapshotAsync(A<EventConsumerState>.Ignored, -1)) A.CallTo(() => persistence.WriteSnapshotAsync(A<EventConsumerState>.Ignored))
.MustHaveHappened(Repeated.Exactly.Once); .MustHaveHappened(Repeated.Exactly.Once);
A.CallTo(() => eventSubscription.StopAsync()) A.CallTo(() => eventSubscription.StopAsync())
@ -375,7 +375,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
A.CallTo(() => eventConsumer.On(envelope)) A.CallTo(() => eventConsumer.On(envelope))
.MustHaveHappened(); .MustHaveHappened();
A.CallTo(() => persistence.WriteSnapshotAsync(A<EventConsumerState>.Ignored, -1)) A.CallTo(() => persistence.WriteSnapshotAsync(A<EventConsumerState>.Ignored))
.MustHaveHappened(Repeated.Exactly.Twice); .MustHaveHappened(Repeated.Exactly.Twice);
A.CallTo(() => eventSubscription.StopAsync()) A.CallTo(() => eventSubscription.StopAsync())

1
tests/Squidex.Infrastructure.Tests/Reflection/SimpleCopierTests.cs

@ -65,6 +65,7 @@ namespace Squidex.Infrastructure.Reflection
Assert.Equal(value.Value1, copy.Value1); Assert.Equal(value.Value1, copy.Value1);
Assert.Equal(value.Value2, copy.Value2); Assert.Equal(value.Value2, copy.Value2);
Assert.Equal(0, copy.ValueReadOnly); Assert.Equal(0, copy.ValueReadOnly);
Assert.Equal(value.Cloneable.Value, copy.Cloneable.Value); Assert.Equal(value.Cloneable.Value, copy.Cloneable.Value);

24
tests/Squidex.Infrastructure.Tests/States/StateEventSourcingTests.cs

@ -30,7 +30,7 @@ namespace Squidex.Infrastructure.States
private readonly List<IEvent> appliedEvents = new List<IEvent>(); private readonly List<IEvent> appliedEvents = new List<IEvent>();
private IPersistence<object> persistence; private IPersistence<object> persistence;
public long? ExpectedVersion { get; set; } public long ExpectedVersion { get; set; }
public List<IEvent> AppliedEvents public List<IEvent> AppliedEvents
{ {
@ -54,7 +54,7 @@ namespace Squidex.Infrastructure.States
{ {
private IPersistence<object> persistence; private IPersistence<object> persistence;
public long? ExpectedVersion { get; set; } public long ExpectedVersion { get; set; }
public Task ActivateAsync(string key, IStore store) public Task ActivateAsync(string key, IStore store)
{ {
@ -115,7 +115,7 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_read_events_from_snapshot() public async Task Should_read_events_from_snapshot()
{ {
statefulObjectWithSnapShot.ExpectedVersion = null; statefulObjectWithSnapShot.ExpectedVersion = ExpectedVersion.Any;
A.CallTo(() => snapshotStore.ReadAsync(key)) A.CallTo(() => snapshotStore.ReadAsync(key))
.Returns((2, 2L)); .Returns((2, 2L));
@ -131,7 +131,7 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_throw_exception_if_events_are_older_than_snapshot() public async Task Should_throw_exception_if_events_are_older_than_snapshot()
{ {
statefulObjectWithSnapShot.ExpectedVersion = null; statefulObjectWithSnapShot.ExpectedVersion = ExpectedVersion.Any;
A.CallTo(() => snapshotStore.ReadAsync(key)) A.CallTo(() => snapshotStore.ReadAsync(key))
.Returns((2, 2L)); .Returns((2, 2L));
@ -144,7 +144,7 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_throw_exception_if_events_have_gaps_to_snapshot() public async Task Should_throw_exception_if_events_have_gaps_to_snapshot()
{ {
statefulObjectWithSnapShot.ExpectedVersion = null; statefulObjectWithSnapShot.ExpectedVersion = ExpectedVersion.Any;
A.CallTo(() => snapshotStore.ReadAsync(key)) A.CallTo(() => snapshotStore.ReadAsync(key))
.Returns((2, 2L)); .Returns((2, 2L));
@ -177,7 +177,7 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_not_throw_exception_if_noting_expected() public async Task Should_not_throw_exception_if_noting_expected()
{ {
statefulObject.ExpectedVersion = null; statefulObject.ExpectedVersion = ExpectedVersion.Any;
SetupEventStore(0); SetupEventStore(0);
@ -187,7 +187,7 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_provide_state_from_services_and_add_to_cache() public async Task Should_provide_state_from_services_and_add_to_cache()
{ {
statefulObject.ExpectedVersion = null; statefulObject.ExpectedVersion = ExpectedVersion.Any;
SetupEventStore(0); SetupEventStore(0);
@ -200,7 +200,7 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_serve_next_request_from_cache() public async Task Should_serve_next_request_from_cache()
{ {
statefulObject.ExpectedVersion = null; statefulObject.ExpectedVersion = ExpectedVersion.Any;
SetupEventStore(0); SetupEventStore(0);
@ -218,7 +218,7 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_write_to_store_with_previous_position() public async Task Should_write_to_store_with_previous_position()
{ {
statefulObject.ExpectedVersion = null; statefulObject.ExpectedVersion = ExpectedVersion.Any;
InvalidateMessage message = null; InvalidateMessage message = null;
@ -248,7 +248,7 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_wrap_exception_when_writing_to_store_with_previous_position() public async Task Should_wrap_exception_when_writing_to_store_with_previous_position()
{ {
statefulObject.ExpectedVersion = null; statefulObject.ExpectedVersion = ExpectedVersion.Any;
SetupEventStore(3); SetupEventStore(3);
@ -263,7 +263,7 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_remove_from_cache_when_invalidation_message_received() public async Task Should_remove_from_cache_when_invalidation_message_received()
{ {
statefulObject.ExpectedVersion = null; statefulObject.ExpectedVersion = ExpectedVersion.Any;
var actualObject = await sut.GetSingleAsync<MyStatefulObject>(key); var actualObject = await sut.GetSingleAsync<MyStatefulObject>(key);
@ -275,7 +275,7 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_return_same_instance_for_parallel_requests() public async Task Should_return_same_instance_for_parallel_requests()
{ {
statefulObject.ExpectedVersion = null; statefulObject.ExpectedVersion = ExpectedVersion.Any;
A.CallTo(() => snapshotStore.ReadAsync(key)) A.CallTo(() => snapshotStore.ReadAsync(key))
.ReturnsLazily(() => Task.Delay(1).ContinueWith(x => ((object)1, 1L))); .ReturnsLazily(() => Task.Delay(1).ContinueWith(x => ((object)1, 1L)));

45
tests/Squidex.Infrastructure.Tests/States/StateSnapshotTests.cs

@ -15,6 +15,8 @@ using Microsoft.Extensions.Options;
using Squidex.Infrastructure.EventSourcing; using Squidex.Infrastructure.EventSourcing;
using Xunit; using Xunit;
#pragma warning disable RECS0002 // Convert anonymous method to method group
namespace Squidex.Infrastructure.States namespace Squidex.Infrastructure.States
{ {
public class StateSnapshotTests : IDisposable public class StateSnapshotTests : IDisposable
@ -24,7 +26,7 @@ namespace Squidex.Infrastructure.States
private IPersistence<int> persistence; private IPersistence<int> persistence;
private int state; private int state;
public long? ExpectedVersion { get; set; } public long ExpectedVersion { get; set; }
public int State public int State
{ {
@ -43,9 +45,9 @@ namespace Squidex.Infrastructure.States
state = value; state = value;
} }
public Task WriteStateAsync(long newVersion = -1) public Task WriteStateAsync()
{ {
return persistence.WriteSnapshotAsync(state, newVersion); return persistence.WriteSnapshotAsync(state);
} }
} }
@ -117,7 +119,7 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_not_throw_exception_if_noting_expected() public async Task Should_not_throw_exception_if_noting_expected()
{ {
statefulObject.ExpectedVersion = null; statefulObject.ExpectedVersion = ExpectedVersion.Any;
A.CallTo(() => snapshotStore.ReadAsync(key)) A.CallTo(() => snapshotStore.ReadAsync(key))
.Returns((0, -1)); .Returns((0, -1));
@ -128,7 +130,7 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_provide_state_from_services_and_add_to_cache() public async Task Should_provide_state_from_services_and_add_to_cache()
{ {
statefulObject.ExpectedVersion = null; statefulObject.ExpectedVersion = ExpectedVersion.Any;
var actualObject = await sut.GetSingleAsync<MyStatefulObject>(key); var actualObject = await sut.GetSingleAsync<MyStatefulObject>(key);
@ -139,7 +141,7 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_serve_next_request_from_cache() public async Task Should_serve_next_request_from_cache()
{ {
statefulObject.ExpectedVersion = null; statefulObject.ExpectedVersion = ExpectedVersion.Any;
var actualObject1 = await sut.GetSingleAsync<MyStatefulObject>(key); var actualObject1 = await sut.GetSingleAsync<MyStatefulObject>(key);
@ -155,7 +157,7 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_not_serve_next_request_from_cache_when_detached() public async Task Should_not_serve_next_request_from_cache_when_detached()
{ {
statefulObject.ExpectedVersion = null; statefulObject.ExpectedVersion = ExpectedVersion.Any;
var actualObject1 = await sut.CreateAsync<MyStatefulObject>(key); var actualObject1 = await sut.CreateAsync<MyStatefulObject>(key);
@ -171,7 +173,7 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_write_to_store_with_previous_version() public async Task Should_write_to_store_with_previous_version()
{ {
statefulObject.ExpectedVersion = null; statefulObject.ExpectedVersion = ExpectedVersion.Any;
InvalidateMessage message = null; InvalidateMessage message = null;
@ -199,31 +201,10 @@ namespace Squidex.Infrastructure.States
Assert.Equal(key, message.Key); Assert.Equal(key, message.Key);
} }
[Fact]
public async Task Should_write_to_store_with_explicit_version()
{
statefulObject.ExpectedVersion = null;
A.CallTo(() => snapshotStore.ReadAsync(key))
.Returns((123, 1));
var actualObject = await sut.GetSingleAsync<MyStatefulObject>(key);
Assert.Same(statefulObject, actualObject);
Assert.Equal(123, statefulObject.State);
statefulObject.SetState(456);
await statefulObject.WriteStateAsync(100);
A.CallTo(() => snapshotStore.WriteAsync(key, 456, 1, 100))
.MustHaveHappened();
}
[Fact] [Fact]
public async Task Should_wrap_exception_when_writing_to_store_with_previous_version() public async Task Should_wrap_exception_when_writing_to_store_with_previous_version()
{ {
statefulObject.ExpectedVersion = null; statefulObject.ExpectedVersion = ExpectedVersion.Any;
A.CallTo(() => snapshotStore.ReadAsync(key)) A.CallTo(() => snapshotStore.ReadAsync(key))
.Returns((123, 13)); .Returns((123, 13));
@ -239,7 +220,7 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_remove_from_cache_when_invalidation_message_received() public async Task Should_remove_from_cache_when_invalidation_message_received()
{ {
statefulObject.ExpectedVersion = null; statefulObject.ExpectedVersion = ExpectedVersion.Any;
var actualObject = await sut.GetSingleAsync<MyStatefulObject>(key); var actualObject = await sut.GetSingleAsync<MyStatefulObject>(key);
@ -251,7 +232,7 @@ namespace Squidex.Infrastructure.States
[Fact] [Fact]
public async Task Should_return_same_instance_for_parallel_requests() public async Task Should_return_same_instance_for_parallel_requests()
{ {
statefulObject.ExpectedVersion = null; statefulObject.ExpectedVersion = ExpectedVersion.Any;
A.CallTo(() => snapshotStore.ReadAsync(key)) A.CallTo(() => snapshotStore.ReadAsync(key))
.ReturnsLazily(() => Task.Delay(1).ContinueWith(x => (1, 1L))); .ReturnsLazily(() => Task.Delay(1).ContinueWith(x => (1, 1L)));

Loading…
Cancel
Save