Browse Source

State factory simplified.

pull/200/head
Sebastian Stehle 8 years ago
parent
commit
723e4ca6b9
  1. 16
      src/Squidex.Infrastructure/CQRS/Events/Grains/EventConsumerGrain.cs
  2. 86
      src/Squidex.Infrastructure/States/StateFactory.cs
  3. 6
      src/Squidex.Infrastructure/States/StatefulObject.cs
  4. 142
      src/Squidex.Infrastructure/Tasks/LimitedConcurrencyLevelTaskScheduler.cs
  5. 10
      tests/Benchmarks/Tests/ReadSchemaState.cs
  6. 3
      tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerManagerTests.cs
  7. 59
      tests/Squidex.Infrastructure.Tests/States/StatesTests.cs

16
src/Squidex.Infrastructure/CQRS/Events/Grains/EventConsumerGrain.cs

@ -7,6 +7,7 @@
// ==========================================================================
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Squidex.Infrastructure.Log;
using Squidex.Infrastructure.States;
@ -38,12 +39,9 @@ namespace Squidex.Infrastructure.CQRS.Events.Grains
this.eventStore = eventStore;
}
protected override void DisposeObject(bool disposing)
public void Dispose()
{
if (disposing)
{
dispatcher.StopAndWaitAsync().Wait();
}
dispatcher.StopAndWaitAsync().Wait();
}
protected virtual IEventSubscription CreateSubscription(IEventStore eventStore, string streamFilter, string position)
@ -179,12 +177,12 @@ namespace Squidex.Infrastructure.CQRS.Events.Grains
return dispatcher.DispatchAsync(() => HandleErrorAsync(subscription, exception));
}
private Task DoAndUpdateStateAsync(Action action)
private Task DoAndUpdateStateAsync(Action action, [CallerMemberName] string caller = null)
{
return DoAndUpdateStateAsync(() => { action(); return TaskHelper.Done; });
return DoAndUpdateStateAsync(() => { action(); return TaskHelper.Done; }, caller);
}
private async Task DoAndUpdateStateAsync(Func<Task> action)
private async Task DoAndUpdateStateAsync(Func<Task> action, [CallerMemberName] string caller = null)
{
try
{
@ -202,7 +200,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Grains
}
log.LogFatal(ex, w => w
.WriteProperty("action", "HandleEvent")
.WriteProperty("action", caller)
.WriteProperty("state", "Failed")
.WriteProperty("eventConsumer", eventConsumer.Name));

86
src/Squidex.Infrastructure/States/StateFactory.cs

@ -7,10 +7,10 @@
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Extensions.Caching.Memory;
using Squidex.Infrastructure.Tasks;
#pragma warning disable RECS0096 // Type parameter is never used
namespace Squidex.Infrastructure.States
{
@ -21,10 +21,27 @@ namespace Squidex.Infrastructure.States
private readonly IStateStore store;
private readonly IMemoryCache statesCache;
private readonly IServiceProvider services;
private readonly List<IDisposable> states = new List<IDisposable>();
private readonly TaskFactory taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(1));
private readonly object lockObject = new object();
private IDisposable pubSubscription;
public sealed class ObjectHolder<T, TState> where T : StatefulObject<TState>
{
private readonly Task activationTask;
private readonly T obj;
public ObjectHolder(T obj, IStateHolder<TState> stateHolder)
{
this.obj = obj;
activationTask = obj.ActivateAsync(stateHolder);
}
public Task<T> ActivateAsync()
{
return activationTask.ContinueWith(x => obj);
}
}
public StateFactory(
IPubSub pubSub,
IServiceProvider services,
@ -46,7 +63,10 @@ namespace Squidex.Infrastructure.States
{
pubSubscription = pubSub.Subscribe<InvalidateMessage>(m =>
{
statesCache.Remove(m.Key);
lock (lockObject)
{
statesCache.Remove(m.Key);
}
});
}
@ -66,54 +86,36 @@ namespace Squidex.Infrastructure.States
{
Guard.NotNull(key, nameof(key));
return taskFactory.StartNew(async () =>
lock (lockObject)
{
if (statesCache.TryGetValue<T>(key, out var state))
{
return state;
return Task.FromResult(state);
}
else
state = (T)services.GetService(typeof(T));
var stateHolder = new StateHolder<TState>(key, () =>
{
state = (T)services.GetService(typeof(T));
var stateHolder = new StateHolder<TState>(key, () =>
{
pubSub.Publish(new InvalidateMessage { Key = key }, false);
}, store);
await state.ActivateAsync(stateHolder);
statesCache.CreateEntry(key)
.SetValue(state)
.SetAbsoluteExpiration(CacheDuration)
.RegisterPostEvictionCallback((k, v, r, s) =>
{
taskFactory.StartNew(() =>
{
state.Dispose();
states.Remove(state);
}).Forget();
})
.Dispose();
states.Add(state);
return state;
}
}).Unwrap();
pubSub.Publish(new InvalidateMessage { Key = key }, false);
}, store);
statesCache.CreateEntry(key)
.SetValue(state)
.SetAbsoluteExpiration(CacheDuration)
.Dispose();
var stateObj = new ObjectHolder<T, TState>(state, stateHolder);
return stateObj.ActivateAsync();
}
}
protected override void DisposeObject(bool disposing)
{
if (disposing)
{
taskFactory.StartNew(() =>
{
foreach (var state in states)
{
state.Dispose();
}
}).Wait();
pubSubscription.Dispose();
}
}
}

6
src/Squidex.Infrastructure/States/StatefulObject.cs

@ -10,7 +10,7 @@ using System.Threading.Tasks;
namespace Squidex.Infrastructure.States
{
public abstract class StatefulObject<T> : DisposableObjectBase
public abstract class StatefulObject<T>
{
private IStateHolder<T> stateHolder;
@ -61,9 +61,5 @@ namespace Squidex.Infrastructure.States
await stateHolder.WriteAsync();
}
}
protected override void DisposeObject(bool disposing)
{
}
}
}

142
src/Squidex.Infrastructure/Tasks/LimitedConcurrencyLevelTaskScheduler.cs

@ -1,142 +0,0 @@
// ==========================================================================
// LimitedConcurrencyLevelTaskScheduler.cs
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex Group
// All rights reserved.
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace Squidex.Infrastructure.Tasks
{
public sealed class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
[ThreadStatic]
private static bool currentThreadIsProcessingItems;
private readonly LinkedList<Task> tasks = new LinkedList<Task>();
private readonly int maxDegreeOfParallelism;
private int delegatesQueuedOrRunning;
public override int MaximumConcurrencyLevel
{
get { return maxDegreeOfParallelism; }
}
public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
{
Guard.GreaterThan(maxDegreeOfParallelism, 0, nameof(maxDegreeOfParallelism));
this.maxDegreeOfParallelism = maxDegreeOfParallelism;
}
protected override void QueueTask(Task task)
{
lock (tasks)
{
tasks.AddLast(task);
if (delegatesQueuedOrRunning < maxDegreeOfParallelism)
{
++delegatesQueuedOrRunning;
NotifyThreadPoolOfPendingWork();
}
}
}
private void NotifyThreadPoolOfPendingWork()
{
ThreadPool.UnsafeQueueUserWorkItem(_ =>
{
currentThreadIsProcessingItems = true;
try
{
while (true)
{
Task item;
lock (tasks)
{
if (tasks.Count == 0)
{
--delegatesQueuedOrRunning;
break;
}
item = tasks.First.Value;
tasks.RemoveFirst();
}
TryExecuteTask(item);
}
}
finally
{
currentThreadIsProcessingItems = false;
}
}, null);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
{
if (!currentThreadIsProcessingItems)
{
return false;
}
if (taskWasPreviouslyQueued)
{
if (TryDequeue(task))
{
return TryExecuteTask(task);
}
else
{
return false;
}
}
else
{
return TryExecuteTask(task);
}
}
protected override bool TryDequeue(Task task)
{
lock (tasks)
{
return tasks.Remove(task);
}
}
protected override IEnumerable<Task> GetScheduledTasks()
{
var lockTaken = false;
try
{
Monitor.TryEnter(tasks, ref lockTaken);
if (lockTaken)
{
return tasks;
}
else
{
throw new NotSupportedException();
}
}
finally
{
if (lockTaken)
{
Monitor.Exit(tasks);
}
}
}
}
}

10
tests/Benchmarks/Tests/ReadSchemaState.cs

@ -7,7 +7,7 @@
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using Benchmarks.Tests.TestData;
using Microsoft.Extensions.DependencyInjection;
using NodaTime;
@ -41,7 +41,7 @@ namespace Benchmarks.Tests
}
};
state.Schemas = new Dictionary<Guid, JsonSchemaEntity>();
state.Schemas = ImmutableDictionary<Guid, JsonSchemaEntity>.Empty;
for (var i = 1; i <= 100; i++)
{
@ -60,10 +60,10 @@ namespace Benchmarks.Tests
schema.SchemaDef = schema.SchemaDef.AddField(new StringField(j, j.ToString(), Partitioning.Invariant));
}
state.Schemas.Add(schema.Id, schema);
state.Schemas = state.Schemas.Add(schema.Id, schema);
}
state.Rules = new Dictionary<Guid, JsonRuleEntity>();
state.Rules = ImmutableDictionary<Guid, JsonRuleEntity>.Empty;
for (var i = 0; i < 100; i++)
{
@ -77,7 +77,7 @@ namespace Benchmarks.Tests
RuleDef = new Rule(new ContentChangedTrigger(), new WebhookAction())
};
state.Rules.Add(rule.Id, rule);
state.Rules = state.Rules.Add(rule.Id, rule);
}
grain.SetState(state);

3
tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerManagerTests.cs

@ -117,9 +117,6 @@ namespace Squidex.Infrastructure.CQRS.Events.Grains
public void Should_not_dispose_actors()
{
sut.Dispose();
Assert.False(actor1.IsDisposed);
Assert.False(actor2.IsDisposed);
}
}
}

59
tests/Squidex.Infrastructure.Tests/States/StatesTests.cs

@ -7,6 +7,7 @@
// ==========================================================================
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using FakeItEasy;
using Microsoft.Extensions.Caching.Memory;
@ -132,57 +133,31 @@ namespace Squidex.Infrastructure.States
await InvalidateCacheAsync();
Assert.True(actual.IsDisposed);
Assert.False(cache.TryGetValue(key, out var t));
}
[Fact]
public async Task Should_not_dispose_detached_when_message_sent()
public async Task Should_return_same_instance_for_parallel_requests()
{
var actual = await sut.GetDetachedAsync<MyStatefulObject, int>(key);
await InvalidateCacheAsync();
Assert.False(actual.IsDisposed);
}
[Fact]
public async Task Should_dispose_states_if_exired()
{
var actual = await sut.GetAsync<MyStatefulObject, int>(key);
await RemoveFromCacheAsync();
Assert.True(actual.IsDisposed);
}
[Fact]
public async Task Should_not_dispose_detached_states_if_exired()
{
var actual = await sut.GetDetachedAsync<MyStatefulObject, int>(key);
await RemoveFromCacheAsync();
Assert.False(actual.IsDisposed);
}
[Fact]
public async Task Should_dispose_states_if_disposed()
{
var actual = await sut.GetAsync<MyStatefulObject, int>(key);
A.CallTo(() => store.ReadAsync<int>(key))
.ReturnsLazily(() => Task.Delay(1).ContinueWith(x => (1, "1")));
sut.Dispose();
var tasks = new List<Task<MyStatefulObject>>();
Assert.True(actual.IsDisposed);
}
for (var i = 0; i < 1000; i++)
{
tasks.Add(Task.Run(() => sut.GetAsync<MyStatefulObject, int>(key)));
}
[Fact]
public async Task Should_not_dispose_detached_states_if_disposed()
{
var actual = await sut.GetDetachedAsync<MyStatefulObject, int>(key);
var retrievedStates = await Task.WhenAll(tasks);
sut.Dispose();
foreach (var retrievedState in retrievedStates)
{
Assert.Same(retrievedStates[0], retrievedState);
}
Assert.False(actual.IsDisposed);
A.CallTo(() => store.ReadAsync<int>(key))
.MustHaveHappened(Repeated.Exactly.Once);
}
private async Task RemoveFromCacheAsync()

Loading…
Cancel
Save