Browse Source

Fixes in pubsub.

pull/195/head
Sebastian Stehle 8 years ago
parent
commit
47673f7afc
  1. 6
      src/Squidex.Domain.Apps.Read/State/AppProvider.cs
  2. 7
      src/Squidex.Domain.Apps.Read/State/Grains/AppStateGrain.cs
  3. 12
      src/Squidex.Infrastructure/InMemoryPubSub.cs
  4. 7
      src/Squidex.Infrastructure/States/StateHolder.cs
  5. 12
      tests/Squidex.Infrastructure.Tests/States/StatesTests.cs

6
src/Squidex.Domain.Apps.Read/State/AppProvider.cs

@ -77,13 +77,11 @@ namespace Squidex.Domain.Apps.Read.State
var appUser = await factory.GetAsync<AppUserGrain, AppUserGrainState>(userId); var appUser = await factory.GetAsync<AppUserGrain, AppUserGrainState>(userId);
var appNames = await appUser.GetAppNamesAsync(); var appNames = await appUser.GetAppNamesAsync();
var tasks = var tasks = appNames.Select(x => GetAppAsync(x));
appNames
.Select(x => GetAppAsync(x));
var apps = await Task.WhenAll(tasks); var apps = await Task.WhenAll(tasks);
return apps.Where(a => a != null).ToList(); return apps.Where(a => a != null && a.Contributors.ContainsKey(userId)).ToList();
} }
} }
} }

7
src/Squidex.Domain.Apps.Read/State/Grains/AppStateGrain.cs

@ -10,6 +10,7 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
using Squidex.Domain.Apps.Core.Schemas; using Squidex.Domain.Apps.Core.Schemas;
using Squidex.Domain.Apps.Events;
using Squidex.Domain.Apps.Events.Apps; using Squidex.Domain.Apps.Events.Apps;
using Squidex.Domain.Apps.Read.Apps; using Squidex.Domain.Apps.Read.Apps;
using Squidex.Domain.Apps.Read.Rules; using Squidex.Domain.Apps.Read.Rules;
@ -126,7 +127,13 @@ namespace Squidex.Domain.Apps.Read.State.Grains
} }
} }
if (message.Payload is AppEvent appEvent)
{
if (State.App == null || State.App.Id == appEvent.AppId.Id)
{
State.Apply(message); State.Apply(message);
}
}
return WriteStateAsync(); return WriteStateAsync();
}).Unwrap(); }).Unwrap();

12
src/Squidex.Infrastructure/InMemoryPubSub.cs

@ -15,10 +15,20 @@ namespace Squidex.Infrastructure
public sealed class InMemoryPubSub : IPubSub public sealed class InMemoryPubSub : IPubSub
{ {
private readonly Subject<object> subject = new Subject<object>(); private readonly Subject<object> subject = new Subject<object>();
private readonly bool publishAlways;
public InMemoryPubSub()
{
}
public InMemoryPubSub(bool publishAlways)
{
this.publishAlways = publishAlways;
}
public void Publish<T>(T value, bool notifySelf) public void Publish<T>(T value, bool notifySelf)
{ {
if (notifySelf) if (notifySelf || publishAlways)
{ {
subject.OnNext(value); subject.OnNext(value);
} }

7
src/Squidex.Infrastructure/States/StateHolder.cs

@ -38,6 +38,8 @@ namespace Squidex.Infrastructure.States
} }
public async Task WriteAsync() public async Task WriteAsync()
{
try
{ {
var newEtag = Guid.NewGuid().ToString(); var newEtag = Guid.NewGuid().ToString();
@ -45,5 +47,10 @@ namespace Squidex.Infrastructure.States
etag = newEtag; etag = newEtag;
} }
finally
{
written();
}
}
} }
} }

12
tests/Squidex.Infrastructure.Tests/States/StatesTests.cs

@ -28,7 +28,7 @@ namespace Squidex.Infrastructure.States
private readonly string key = Guid.NewGuid().ToString(); private readonly string key = Guid.NewGuid().ToString();
private readonly MyStatefulObject state = new MyStatefulObject(); private readonly MyStatefulObject state = new MyStatefulObject();
private readonly IMemoryCache cache = new MemoryCache(Options.Create(new MemoryCacheOptions())); private readonly IMemoryCache cache = new MemoryCache(Options.Create(new MemoryCacheOptions()));
private readonly IPubSub pubSub = new InMemoryPubSub(); private readonly IPubSub pubSub = new InMemoryPubSub(true);
private readonly IServiceProvider services = A.Fake<IServiceProvider>(); private readonly IServiceProvider services = A.Fake<IServiceProvider>();
private readonly IStateStore store = A.Fake<IStateStore>(); private readonly IStateStore store = A.Fake<IStateStore>();
private readonly StateFactory sut; private readonly StateFactory sut;
@ -97,6 +97,13 @@ namespace Squidex.Infrastructure.States
{ {
var etag = Guid.NewGuid().ToString(); var etag = Guid.NewGuid().ToString();
InvalidateMessage message = null;
pubSub.Subscribe<InvalidateMessage>(m =>
{
message = m;
});
A.CallTo(() => store.ReadAsync<int>(key)) A.CallTo(() => store.ReadAsync<int>(key))
.Returns((123, etag)); .Returns((123, etag));
@ -113,6 +120,9 @@ namespace Squidex.Infrastructure.States
A.CallTo(() => store.WriteAsync(key, 456, etag, A<string>.That.Matches(x => x != null))) A.CallTo(() => store.WriteAsync(key, 456, etag, A<string>.That.Matches(x => x != null)))
.MustHaveHappened(); .MustHaveHappened();
Assert.NotNull(message);
Assert.Equal(key, message.Key);
} }
[Fact] [Fact]

Loading…
Cancel
Save