Browse Source

Share replay fix.

pull/414/head
Sebastian Stehle 6 years ago
parent
commit
133f7267be
  1. 29
      src/Squidex.Infrastructure/Orleans/StateFilter.cs
  2. 4
      src/Squidex.Infrastructure/States/InconsistentStateException.cs
  3. 11
      src/Squidex.Infrastructure/States/Persistence{TSnapshot,TKey}.cs
  4. 1
      src/Squidex/Config/Orleans/OrleansServices.cs
  5. 8
      src/Squidex/app/framework/state.ts
  6. 6
      tests/Squidex.Infrastructure.Tests/States/PersistenceEventSourcingTests.cs
  7. 6
      tests/Squidex.Infrastructure.Tests/States/PersistenceSnapshotTests.cs

29
src/Squidex.Infrastructure/Orleans/StateFilter.cs

@ -0,0 +1,29 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Threading.Tasks;
using Orleans;
using Orleans.Storage;
using StateInconsistentStateException = Squidex.Infrastructure.States.InconsistentStateException;
namespace Squidex.Infrastructure.Orleans
{
public sealed class StateFilter : IIncomingGrainCallFilter
{
public async Task Invoke(IIncomingGrainCallContext context)
{
try
{
await context.Invoke();
}
catch (StateInconsistentStateException ex)
{
throw new InconsistentStateException(ex.Message, ex);
}
}
}
}

4
src/Squidex.Infrastructure/States/InconsistentStateException.cs

@ -26,8 +26,8 @@ namespace Squidex.Infrastructure.States
get { return expectedVersion; } get { return expectedVersion; }
} }
public InconsistentStateException(long currentVersion, long expectedVersion, Exception ex) public InconsistentStateException(long currentVersion, long expectedVersion, Exception inner = null)
: base(FormatMessage(currentVersion, expectedVersion), ex) : base(FormatMessage(currentVersion, expectedVersion), inner)
{ {
this.currentVersion = currentVersion; this.currentVersion = currentVersion;

11
src/Squidex.Infrastructure/States/Persistence{TSnapshot,TKey}.cs

@ -77,7 +77,7 @@ namespace Squidex.Infrastructure.States
} }
else else
{ {
throw new DomainObjectVersionException(ownerKey.ToString(), ownerType, version, expectedVersion); throw new InconsistentStateException(version, expectedVersion);
} }
} }
} }
@ -133,15 +133,8 @@ namespace Squidex.Infrastructure.States
var newVersion = UseEventSourcing() ? versionEvents : versionSnapshot + 1; var newVersion = UseEventSourcing() ? versionEvents : versionSnapshot + 1;
if (newVersion != versionSnapshot) if (newVersion != versionSnapshot)
{
try
{ {
await snapshotStore.WriteAsync(ownerKey, state, versionSnapshot, newVersion); await snapshotStore.WriteAsync(ownerKey, state, versionSnapshot, newVersion);
}
catch (InconsistentStateException ex)
{
throw new DomainObjectVersionException(ownerKey.ToString(), ownerType, ex.CurrentVersion, ex.ExpectedVersion);
}
versionSnapshot = newVersion; versionSnapshot = newVersion;
} }
@ -175,7 +168,7 @@ namespace Squidex.Infrastructure.States
} }
catch (WrongEventVersionException ex) catch (WrongEventVersionException ex)
{ {
throw new DomainObjectVersionException(ownerKey.ToString(), ownerType, ex.CurrentVersion, ex.ExpectedVersion); throw new InconsistentStateException(ex.CurrentVersion, ex.ExpectedVersion, ex);
} }
versionEvents += eventArray.Length; versionEvents += eventArray.Length;

1
src/Squidex/Config/Orleans/OrleansServices.cs

@ -65,6 +65,7 @@ namespace Squidex.Config.Orleans
builder.AddIncomingGrainCallFilter<ActivationLimiterFilter>(); builder.AddIncomingGrainCallFilter<ActivationLimiterFilter>();
builder.AddIncomingGrainCallFilter<LocalCacheFilter>(); builder.AddIncomingGrainCallFilter<LocalCacheFilter>();
builder.AddIncomingGrainCallFilter<LoggingFilter>(); builder.AddIncomingGrainCallFilter<LoggingFilter>();
builder.AddIncomingGrainCallFilter<StateFilter>();
var orleansPortSilo = config.GetOptionalValue("orleans:siloPort", 11111); var orleansPortSilo = config.GetOptionalValue("orleans:siloPort", 11111);
var orleansPortGateway = config.GetOptionalValue("orleans:gatewayPort", 40000); var orleansPortGateway = config.GetOptionalValue("orleans:gatewayPort", 40000);

8
src/Squidex/app/framework/state.ts

@ -167,19 +167,19 @@ export class State<T extends {}> {
return this.state.value; return this.state.value;
} }
public project<M>(project1: (value: T) => M, compare?: (x: M, y: M) => boolean) { public project<M>(project: (value: T) => M, compare?: (x: M, y: M) => boolean) {
return this.changes.pipe( return this.changes.pipe(
map(x => project1(x)), distinctUntilChanged(compare), shareReplay()); map(x => project(x)), distinctUntilChanged(compare), shareReplay(1));
} }
public projectFrom<M, N>(source: Observable<M>, project: (value: M) => N, compare?: (x: N, y: N) => boolean) { public projectFrom<M, N>(source: Observable<M>, project: (value: M) => N, compare?: (x: N, y: N) => boolean) {
return source.pipe( return source.pipe(
map(x => project(x)), distinctUntilChanged(compare), shareReplay()); map(x => project(x)), distinctUntilChanged(compare), shareReplay(1));
} }
public projectFrom2<M, N, O>(lhs: Observable<M>, rhs: Observable<N>, project: (l: M, r: N) => O, compare?: (x: O, y: O) => boolean) { public projectFrom2<M, N, O>(lhs: Observable<M>, rhs: Observable<N>, project: (l: M, r: N) => O, compare?: (x: O, y: O) => boolean) {
return combineLatest(lhs, rhs, (x, y) => project(x, y)).pipe( return combineLatest(lhs, rhs, (x, y) => project(x, y)).pipe(
distinctUntilChanged(compare), shareReplay()); distinctUntilChanged(compare), shareReplay(1));
} }
constructor(state: Readonly<T>) { constructor(state: Readonly<T>) {

6
tests/Squidex.Infrastructure.Tests/States/PersistenceEventSourcingTests.cs

@ -145,7 +145,7 @@ namespace Squidex.Infrastructure.States
var persistedEvents = new List<IEvent>(); var persistedEvents = new List<IEvent>();
var persistence = sut.WithEventSourcing(None.Type, key, x => persistedEvents.Add(x.Payload)); var persistence = sut.WithEventSourcing(None.Type, key, x => persistedEvents.Add(x.Payload));
await Assert.ThrowsAsync<DomainObjectVersionException>(() => persistence.ReadAsync(1)); await Assert.ThrowsAsync<InconsistentStateException>(() => persistence.ReadAsync(1));
} }
[Fact] [Fact]
@ -160,7 +160,7 @@ namespace Squidex.Infrastructure.States
var persistedEvents = new List<IEvent>(); var persistedEvents = new List<IEvent>();
var persistence = sut.WithSnapshotsAndEventSourcing(None.Type, key, (int x) => persistedState = x, x => persistedEvents.Add(x.Payload)); var persistence = sut.WithSnapshotsAndEventSourcing(None.Type, key, (int x) => persistedState = x, x => persistedEvents.Add(x.Payload));
await Assert.ThrowsAsync<DomainObjectVersionException>(() => persistence.ReadAsync(1)); await Assert.ThrowsAsync<InconsistentStateException>(() => persistence.ReadAsync(1));
} }
[Fact] [Fact]
@ -209,7 +209,7 @@ namespace Squidex.Infrastructure.States
A.CallTo(() => eventStore.AppendAsync(A<Guid>.Ignored, key, 2, A<ICollection<EventData>>.That.Matches(x => x.Count == 1))) A.CallTo(() => eventStore.AppendAsync(A<Guid>.Ignored, key, 2, A<ICollection<EventData>>.That.Matches(x => x.Count == 1)))
.Throws(new WrongEventVersionException(1, 1)); .Throws(new WrongEventVersionException(1, 1));
await Assert.ThrowsAsync<DomainObjectVersionException>(() => persistence.WriteEventAsync(Envelope.Create(new MyEvent()))); await Assert.ThrowsAsync<InconsistentStateException>(() => persistence.WriteEventAsync(Envelope.Create(new MyEvent())));
} }
[Fact] [Fact]

6
tests/Squidex.Infrastructure.Tests/States/PersistenceSnapshotTests.cs

@ -98,7 +98,7 @@ namespace Squidex.Infrastructure.States
var persistedState = 0; var persistedState = 0;
var persistence = sut.WithSnapshots(None.Type, key, (int x) => persistedState = x); var persistence = sut.WithSnapshots(None.Type, key, (int x) => persistedState = x);
await Assert.ThrowsAsync<DomainObjectVersionException>(() => persistence.ReadAsync(1)); await Assert.ThrowsAsync<InconsistentStateException>(() => persistence.ReadAsync(1));
} }
[Fact] [Fact]
@ -122,7 +122,7 @@ namespace Squidex.Infrastructure.States
} }
[Fact] [Fact]
public async Task Should_wrap_exception_when_writing_to_store_with_previous_version() public async Task Should_not_wrap_exception_when_writing_to_store_with_previous_version()
{ {
A.CallTo(() => snapshotStore.ReadAsync(key)) A.CallTo(() => snapshotStore.ReadAsync(key))
.Returns((20, 10)); .Returns((20, 10));
@ -135,7 +135,7 @@ namespace Squidex.Infrastructure.States
await persistence.ReadAsync(); await persistence.ReadAsync();
await Assert.ThrowsAsync<DomainObjectVersionException>(() => persistence.WriteSnapshotAsync(100)); await Assert.ThrowsAsync<InconsistentStateException>(() => persistence.WriteSnapshotAsync(100));
} }
[Fact] [Fact]

Loading…
Cancel
Save