diff --git a/src/Squidex.Infrastructure/CQRS/Events/Grains/EventConsumerGrain.cs b/src/Squidex.Infrastructure/CQRS/Events/Grains/EventConsumerGrain.cs index b45a5a416..535b04989 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/Grains/EventConsumerGrain.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Grains/EventConsumerGrain.cs @@ -7,6 +7,7 @@ // ========================================================================== using System; +using System.Runtime.CompilerServices; using System.Threading.Tasks; using Squidex.Infrastructure.Log; using Squidex.Infrastructure.States; @@ -38,12 +39,9 @@ namespace Squidex.Infrastructure.CQRS.Events.Grains this.eventStore = eventStore; } - protected override void DisposeObject(bool disposing) + public void Dispose() { - if (disposing) - { - dispatcher.StopAndWaitAsync().Wait(); - } + dispatcher.StopAndWaitAsync().Wait(); } protected virtual IEventSubscription CreateSubscription(IEventStore eventStore, string streamFilter, string position) @@ -179,12 +177,12 @@ namespace Squidex.Infrastructure.CQRS.Events.Grains return dispatcher.DispatchAsync(() => HandleErrorAsync(subscription, exception)); } - private Task DoAndUpdateStateAsync(Action action) + private Task DoAndUpdateStateAsync(Action action, [CallerMemberName] string caller = null) { - return DoAndUpdateStateAsync(() => { action(); return TaskHelper.Done; }); + return DoAndUpdateStateAsync(() => { action(); return TaskHelper.Done; }, caller); } - private async Task DoAndUpdateStateAsync(Func action) + private async Task DoAndUpdateStateAsync(Func action, [CallerMemberName] string caller = null) { try { @@ -202,7 +200,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Grains } log.LogFatal(ex, w => w - .WriteProperty("action", "HandleEvent") + .WriteProperty("action", caller) .WriteProperty("state", "Failed") .WriteProperty("eventConsumer", eventConsumer.Name)); diff --git a/src/Squidex.Infrastructure/States/StateFactory.cs b/src/Squidex.Infrastructure/States/StateFactory.cs index 7d17dab27..518ad8f31 100644 --- a/src/Squidex.Infrastructure/States/StateFactory.cs +++ b/src/Squidex.Infrastructure/States/StateFactory.cs @@ -7,10 +7,10 @@ // ========================================================================== using System; -using System.Collections.Generic; using System.Threading.Tasks; using Microsoft.Extensions.Caching.Memory; -using Squidex.Infrastructure.Tasks; + +#pragma warning disable RECS0096 // Type parameter is never used namespace Squidex.Infrastructure.States { @@ -21,10 +21,27 @@ namespace Squidex.Infrastructure.States private readonly IStateStore store; private readonly IMemoryCache statesCache; private readonly IServiceProvider services; - private readonly List states = new List(); - private readonly TaskFactory taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(1)); + private readonly object lockObject = new object(); private IDisposable pubSubscription; + public sealed class ObjectHolder where T : StatefulObject + { + private readonly Task activationTask; + private readonly T obj; + + public ObjectHolder(T obj, IStateHolder stateHolder) + { + this.obj = obj; + + activationTask = obj.ActivateAsync(stateHolder); + } + + public Task ActivateAsync() + { + return activationTask.ContinueWith(x => obj); + } + } + public StateFactory( IPubSub pubSub, IServiceProvider services, @@ -46,7 +63,10 @@ namespace Squidex.Infrastructure.States { pubSubscription = pubSub.Subscribe(m => { - statesCache.Remove(m.Key); + lock (lockObject) + { + statesCache.Remove(m.Key); + } }); } @@ -66,54 +86,36 @@ namespace Squidex.Infrastructure.States { Guard.NotNull(key, nameof(key)); - return taskFactory.StartNew(async () => + lock (lockObject) { if (statesCache.TryGetValue(key, out var state)) { - return state; + return Task.FromResult(state); } - else + + state = (T)services.GetService(typeof(T)); + + var stateHolder = new StateHolder(key, () => { - state = (T)services.GetService(typeof(T)); - - var stateHolder = new StateHolder(key, () => - { - pubSub.Publish(new InvalidateMessage { Key = key }, false); - }, store); - - await state.ActivateAsync(stateHolder); - - statesCache.CreateEntry(key) - .SetValue(state) - .SetAbsoluteExpiration(CacheDuration) - .RegisterPostEvictionCallback((k, v, r, s) => - { - taskFactory.StartNew(() => - { - state.Dispose(); - states.Remove(state); - }).Forget(); - }) - .Dispose(); - - states.Add(state); - - return state; - } - }).Unwrap(); + pubSub.Publish(new InvalidateMessage { Key = key }, false); + }, store); + + statesCache.CreateEntry(key) + .SetValue(state) + .SetAbsoluteExpiration(CacheDuration) + .Dispose(); + + var stateObj = new ObjectHolder(state, stateHolder); + + return stateObj.ActivateAsync(); + } } protected override void DisposeObject(bool disposing) { if (disposing) { - taskFactory.StartNew(() => - { - foreach (var state in states) - { - state.Dispose(); - } - }).Wait(); + pubSubscription.Dispose(); } } } diff --git a/src/Squidex.Infrastructure/States/StatefulObject.cs b/src/Squidex.Infrastructure/States/StatefulObject.cs index f6fa8c677..492494cc5 100644 --- a/src/Squidex.Infrastructure/States/StatefulObject.cs +++ b/src/Squidex.Infrastructure/States/StatefulObject.cs @@ -10,7 +10,7 @@ using System.Threading.Tasks; namespace Squidex.Infrastructure.States { - public abstract class StatefulObject : DisposableObjectBase + public abstract class StatefulObject { private IStateHolder stateHolder; @@ -61,9 +61,5 @@ namespace Squidex.Infrastructure.States await stateHolder.WriteAsync(); } } - - protected override void DisposeObject(bool disposing) - { - } } } diff --git a/src/Squidex.Infrastructure/Tasks/LimitedConcurrencyLevelTaskScheduler.cs b/src/Squidex.Infrastructure/Tasks/LimitedConcurrencyLevelTaskScheduler.cs deleted file mode 100644 index 347a2d2d2..000000000 --- a/src/Squidex.Infrastructure/Tasks/LimitedConcurrencyLevelTaskScheduler.cs +++ /dev/null @@ -1,142 +0,0 @@ -// ========================================================================== -// LimitedConcurrencyLevelTaskScheduler.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System; -using System.Collections.Generic; -using System.Threading; -using System.Threading.Tasks; - -namespace Squidex.Infrastructure.Tasks -{ - public sealed class LimitedConcurrencyLevelTaskScheduler : TaskScheduler - { - [ThreadStatic] - private static bool currentThreadIsProcessingItems; - private readonly LinkedList tasks = new LinkedList(); - private readonly int maxDegreeOfParallelism; - private int delegatesQueuedOrRunning; - - public override int MaximumConcurrencyLevel - { - get { return maxDegreeOfParallelism; } - } - - public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism) - { - Guard.GreaterThan(maxDegreeOfParallelism, 0, nameof(maxDegreeOfParallelism)); - - this.maxDegreeOfParallelism = maxDegreeOfParallelism; - } - - protected override void QueueTask(Task task) - { - lock (tasks) - { - tasks.AddLast(task); - - if (delegatesQueuedOrRunning < maxDegreeOfParallelism) - { - ++delegatesQueuedOrRunning; - - NotifyThreadPoolOfPendingWork(); - } - } - } - - private void NotifyThreadPoolOfPendingWork() - { - ThreadPool.UnsafeQueueUserWorkItem(_ => - { - currentThreadIsProcessingItems = true; - try - { - while (true) - { - Task item; - - lock (tasks) - { - if (tasks.Count == 0) - { - --delegatesQueuedOrRunning; - break; - } - - item = tasks.First.Value; - - tasks.RemoveFirst(); - } - - TryExecuteTask(item); - } - } - finally - { - currentThreadIsProcessingItems = false; - } - }, null); - } - - protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) - { - if (!currentThreadIsProcessingItems) - { - return false; - } - - if (taskWasPreviouslyQueued) - { - if (TryDequeue(task)) - { - return TryExecuteTask(task); - } - else - { - return false; - } - } - else - { - return TryExecuteTask(task); - } - } - - protected override bool TryDequeue(Task task) - { - lock (tasks) - { - return tasks.Remove(task); - } - } - - protected override IEnumerable GetScheduledTasks() - { - var lockTaken = false; - try - { - Monitor.TryEnter(tasks, ref lockTaken); - - if (lockTaken) - { - return tasks; - } - else - { - throw new NotSupportedException(); - } - } - finally - { - if (lockTaken) - { - Monitor.Exit(tasks); - } - } - } - } -} \ No newline at end of file diff --git a/tests/Benchmarks/Tests/ReadSchemaState.cs b/tests/Benchmarks/Tests/ReadSchemaState.cs index 69916636f..ed3ff13f2 100644 --- a/tests/Benchmarks/Tests/ReadSchemaState.cs +++ b/tests/Benchmarks/Tests/ReadSchemaState.cs @@ -7,7 +7,7 @@ // ========================================================================== using System; -using System.Collections.Generic; +using System.Collections.Immutable; using Benchmarks.Tests.TestData; using Microsoft.Extensions.DependencyInjection; using NodaTime; @@ -41,7 +41,7 @@ namespace Benchmarks.Tests } }; - state.Schemas = new Dictionary(); + state.Schemas = ImmutableDictionary.Empty; for (var i = 1; i <= 100; i++) { @@ -60,10 +60,10 @@ namespace Benchmarks.Tests schema.SchemaDef = schema.SchemaDef.AddField(new StringField(j, j.ToString(), Partitioning.Invariant)); } - state.Schemas.Add(schema.Id, schema); + state.Schemas = state.Schemas.Add(schema.Id, schema); } - state.Rules = new Dictionary(); + state.Rules = ImmutableDictionary.Empty; for (var i = 0; i < 100; i++) { @@ -77,7 +77,7 @@ namespace Benchmarks.Tests RuleDef = new Rule(new ContentChangedTrigger(), new WebhookAction()) }; - state.Rules.Add(rule.Id, rule); + state.Rules = state.Rules.Add(rule.Id, rule); } grain.SetState(state); diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerManagerTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerManagerTests.cs index 0bf8de942..bdc90ad9b 100644 --- a/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerManagerTests.cs +++ b/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerManagerTests.cs @@ -117,9 +117,6 @@ namespace Squidex.Infrastructure.CQRS.Events.Grains public void Should_not_dispose_actors() { sut.Dispose(); - - Assert.False(actor1.IsDisposed); - Assert.False(actor2.IsDisposed); } } } diff --git a/tests/Squidex.Infrastructure.Tests/States/StatesTests.cs b/tests/Squidex.Infrastructure.Tests/States/StatesTests.cs index 150af75c3..4531b353c 100644 --- a/tests/Squidex.Infrastructure.Tests/States/StatesTests.cs +++ b/tests/Squidex.Infrastructure.Tests/States/StatesTests.cs @@ -7,6 +7,7 @@ // ========================================================================== using System; +using System.Collections.Generic; using System.Threading.Tasks; using FakeItEasy; using Microsoft.Extensions.Caching.Memory; @@ -132,57 +133,31 @@ namespace Squidex.Infrastructure.States await InvalidateCacheAsync(); - Assert.True(actual.IsDisposed); + Assert.False(cache.TryGetValue(key, out var t)); } [Fact] - public async Task Should_not_dispose_detached_when_message_sent() + public async Task Should_return_same_instance_for_parallel_requests() { - var actual = await sut.GetDetachedAsync(key); - - await InvalidateCacheAsync(); - - Assert.False(actual.IsDisposed); - } - - [Fact] - public async Task Should_dispose_states_if_exired() - { - var actual = await sut.GetAsync(key); - - await RemoveFromCacheAsync(); - - Assert.True(actual.IsDisposed); - } - - [Fact] - public async Task Should_not_dispose_detached_states_if_exired() - { - var actual = await sut.GetDetachedAsync(key); - - await RemoveFromCacheAsync(); - - Assert.False(actual.IsDisposed); - } - - [Fact] - public async Task Should_dispose_states_if_disposed() - { - var actual = await sut.GetAsync(key); + A.CallTo(() => store.ReadAsync(key)) + .ReturnsLazily(() => Task.Delay(1).ContinueWith(x => (1, "1"))); - sut.Dispose(); + var tasks = new List>(); - Assert.True(actual.IsDisposed); - } + for (var i = 0; i < 1000; i++) + { + tasks.Add(Task.Run(() => sut.GetAsync(key))); + } - [Fact] - public async Task Should_not_dispose_detached_states_if_disposed() - { - var actual = await sut.GetDetachedAsync(key); + var retrievedStates = await Task.WhenAll(tasks); - sut.Dispose(); + foreach (var retrievedState in retrievedStates) + { + Assert.Same(retrievedStates[0], retrievedState); + } - Assert.False(actual.IsDisposed); + A.CallTo(() => store.ReadAsync(key)) + .MustHaveHappened(Repeated.Exactly.Once); } private async Task RemoveFromCacheAsync()