diff --git a/src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/EventConsumerGrain.cs b/src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/EventConsumerGrain.cs index 329e7f6c8..ed0631e04 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/EventConsumerGrain.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Orleans/Grains/Implementation/EventConsumerGrain.cs @@ -26,7 +26,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation private readonly ISemanticLog log; private IEventSubscription currentSubscription; private IEventConsumer eventConsumer; - private TaskFactory dispatcher; + private SingleThreadedDispatcher dispatcher; public EventConsumerGrain( EventDataFormatter eventFormatter, @@ -62,21 +62,29 @@ namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation public override Task OnActivateAsync() { - eventConsumer = eventConsumerFactory(this.GetPrimaryKeyString()); + dispatcher = new SingleThreadedDispatcher(1, TaskScheduler.Current); - dispatcher = new TaskFactory(TaskScheduler.Current); + eventConsumer = eventConsumerFactory(this.GetPrimaryKeyString()); return TaskHelper.Done; } + public override Task OnDeactivateAsync() + { + return dispatcher.StopAndWaitAsync(); + } + public Task ActivateAsync() { - if (!State.IsStopped) + return dispatcher.DispatchAndUnwrapAsync(() => { - Subscribe(State.Position); - } + if (!State.IsStopped) + { + Subscribe(State.Position); + } - return TaskHelper.Done; + return TaskHelper.Done; + }); } private Task HandleEventAsync(IEventSubscription subscription, StoredEvent storedEvent) @@ -129,61 +137,70 @@ namespace Squidex.Infrastructure.CQRS.Events.Orleans.Grains.Implementation public Task StartAsync() { - if (!State.IsStopped) + return dispatcher.DispatchAndUnwrapAsync(() => { - return TaskHelper.Done; - } + if (!State.IsStopped) + { + return TaskHelper.Done; + } - return DoAndUpdateStateAsync(() => - { - Subscribe(State.Position); + return DoAndUpdateStateAsync(() => + { + Subscribe(State.Position); - State = State.Started(); + State = State.Started(); + }); }); } public Task StopAsync() { - if (State.IsStopped) + return dispatcher.DispatchAndUnwrapAsync(() => { - return TaskHelper.Done; - } + if (State.IsStopped) + { + return TaskHelper.Done; + } - return DoAndUpdateStateAsync(() => - { - Unsubscribe(); + return DoAndUpdateStateAsync(() => + { + Unsubscribe(); - State = State.Stopped(); + State = State.Stopped(); + }); }); } public Task ResetAsync() { - return DoAndUpdateStateAsync(async () => + return dispatcher.DispatchAndUnwrapAsync(() => { - Unsubscribe(); + return DoAndUpdateStateAsync(async () => + { + Unsubscribe(); - await ClearAsync(); + await ClearAsync(); - Subscribe(null); + Subscribe(null); - State = EventConsumerGrainState.Initial(); + State = EventConsumerGrainState.Initial(); + }); }); } Task IEventSubscriber.OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent) { - return dispatcher.StartNew(() => HandleEventAsync(subscription, storedEvent)).Unwrap(); + return dispatcher.DispatchAndUnwrapAsync(() => HandleEventAsync(subscription, storedEvent)); } Task IEventSubscriber.OnErrorAsync(IEventSubscription subscription, Exception exception) { - return dispatcher.StartNew(() => HandleErrorAsync(subscription, exception)).Unwrap(); + return dispatcher.DispatchAndUnwrapAsync(() => HandleErrorAsync(subscription, exception)); } Task IEventSubscriber.OnClosedAsync(IEventSubscription subscription) { - return dispatcher.StartNew(() => HandleClosedAsync(subscription)).Unwrap(); + return dispatcher.DispatchAndUnwrapAsync(() => HandleClosedAsync(subscription)); } public Task> GetStateAsync() diff --git a/src/Squidex.Infrastructure/CQRS/Events/RetrySubscription.cs b/src/Squidex.Infrastructure/CQRS/Events/RetrySubscription.cs index 9121b027a..f3aebd20e 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/RetrySubscription.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/RetrySubscription.cs @@ -15,8 +15,8 @@ namespace Squidex.Infrastructure.CQRS.Events { public sealed class RetrySubscription : IEventSubscription, IEventSubscriber { - private readonly SingleThreadedDispatcher dispatcher = new SingleThreadedDispatcher(); - private readonly CancellationTokenSource disposeCts = new CancellationTokenSource(); + private readonly SingleThreadedDispatcher dispatcher = new SingleThreadedDispatcher(10); + private readonly CancellationTokenSource timerCts = new CancellationTokenSource(); private readonly RetryWindow retryWindow = new RetryWindow(TimeSpan.FromMinutes(5), 5); private readonly IEventStore eventStore; private readonly IEventSubscriber eventSubscriber; @@ -43,7 +43,10 @@ namespace Squidex.Infrastructure.CQRS.Events private void Subscribe() { - currentSubscription = eventStore.CreateSubscription(this, streamFilter, position); + if (currentSubscription == null) + { + currentSubscription = eventStore.CreateSubscription(this, streamFilter, position); + } } private void Unsubscribe() @@ -80,7 +83,7 @@ namespace Squidex.Infrastructure.CQRS.Events if (retryWindow.CanRetryAfterFailure()) { - Task.Delay(ReconnectWaitMs, disposeCts.Token).ContinueWith(t => + Task.Delay(ReconnectWaitMs, timerCts.Token).ContinueWith(t => { dispatcher.DispatchAsync(() => Subscribe()); }).Forget(); @@ -112,7 +115,7 @@ namespace Squidex.Infrastructure.CQRS.Events await dispatcher.DispatchAsync(() => Unsubscribe()); await dispatcher.StopAndWaitAsync(); - disposeCts.Cancel(); + timerCts.Cancel(); } } } diff --git a/src/Squidex.Infrastructure/Tasks/SingleThreadedDispatcher.cs b/src/Squidex.Infrastructure/Tasks/SingleThreadedDispatcher.cs index 6a58699f6..d38c381e7 100644 --- a/src/Squidex.Infrastructure/Tasks/SingleThreadedDispatcher.cs +++ b/src/Squidex.Infrastructure/Tasks/SingleThreadedDispatcher.cs @@ -17,18 +17,54 @@ namespace Squidex.Infrastructure.Tasks private readonly ActionBlock> block; private bool isStopped; - public SingleThreadedDispatcher(int capacity = 10) + public SingleThreadedDispatcher(int capacity = 1, TaskScheduler scheduler = null) { var options = new ExecutionDataflowBlockOptions { + BoundedCapacity = capacity, MaxMessagesPerTask = -1, MaxDegreeOfParallelism = 1, - BoundedCapacity = capacity + TaskScheduler = scheduler ?? TaskScheduler.Default }; block = new ActionBlock>(Handle, options); } + public Task DispatchAndUnwrapAsync(Func action) + { + Guard.NotNull(action, nameof(action)); + + var cts = new TaskCompletionSource(); + + block.SendAsync(async () => + { + try + { + await action(); + + cts.SetResult(true); + } + catch (Exception ex) + { + cts.TrySetException(ex); + } + }); + + return cts.Task; + } + + public Task DispatchAndUnwrapAsync(Action action) + { + Guard.NotNull(action, nameof(action)); + + return DispatchAndUnwrapAsync(() => + { + action(); + + return TaskHelper.Done; + }); + } + public Task DispatchAsync(Func action) { Guard.NotNull(action, nameof(action)); diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/RetrySubscriptionTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/RetrySubscriptionTests.cs index 069cf4d55..84eb646d8 100644 --- a/tests/Squidex.Infrastructure.Tests/CQRS/Events/RetrySubscriptionTests.cs +++ b/tests/Squidex.Infrastructure.Tests/CQRS/Events/RetrySubscriptionTests.cs @@ -26,20 +26,22 @@ namespace Squidex.Infrastructure.CQRS.Events { A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)).Returns(eventSubscription); - sut = new RetrySubscription(eventStore, eventSubscriber, streamFilter, null) { ReconnectWaitMs = 0 }; + sut = new RetrySubscription(eventStore, eventSubscriber, streamFilter, null) { ReconnectWaitMs = 100 }; sutSubscriber = sut; } [Fact] - public void Should_subscribe_after_constructor() + public async Task Should_subscribe_after_constructor() { + await sut.StopAsync(); + A.CallTo(() => eventStore.CreateSubscription(sut, streamFilter, null)) .MustHaveHappened(); } [Fact] - public async Task Should_reopen_subscription_when_exception_is_retrieved() + public async Task Should_reopen_subscription_once_when_exception_is_retrieved() { await OnErrorAsync(eventSubscription, new InvalidOperationException()); diff --git a/tests/Squidex.Infrastructure.Tests/Tasks/SingleThreadedDispatcherTests.cs b/tests/Squidex.Infrastructure.Tests/Tasks/SingleThreadedDispatcherTests.cs index 7077a72c5..5e110745c 100644 --- a/tests/Squidex.Infrastructure.Tests/Tasks/SingleThreadedDispatcherTests.cs +++ b/tests/Squidex.Infrastructure.Tests/Tasks/SingleThreadedDispatcherTests.cs @@ -18,14 +18,19 @@ namespace Squidex.Infrastructure.Tasks private readonly SingleThreadedDispatcher sut = new SingleThreadedDispatcher(); [Fact] - public async Task Should_handle_messages_sequentially() + public async Task Should_handle_async_messages_sequentially() { var source = Enumerable.Range(1, 100); var target = new List(); foreach (var item in source) { - sut.DispatchAsync(() => target.Add(item)).Forget(); + sut.DispatchAsync(() => + { + target.Add(item); + + return TaskHelper.Done; + }).Forget(); } await sut.StopAndWaitAsync(); @@ -33,58 +38,20 @@ namespace Squidex.Infrastructure.Tasks Assert.Equal(source, target); } - /* [Fact] - public async Task Should_raise_error_event_when_event_handling_failed() + public async Task Should_handle_sync_messages_sequentially() { - sut.Tell(new FailedMessage()); - sut.Tell(new SuccessMessage { Counter = 2 }); - sut.Tell(new SuccessMessage { Counter = 3 }); - - await sut.StopAsync(); - - Assert.True(sut.Invokes[0] is InvalidOperationException); - - sut.Invokes.Skip(1).ShouldBeEquivalentTo(new List - { - new SuccessMessage { Counter = 2 }, - new SuccessMessage { Counter = 3 }, - true - }); - } - - [Fact] - public async Task Should_not_handle_messages_after_stop() - { - sut.Tell(new SuccessMessage { Counter = 1 }); - - await sut.StopAsync(); - - sut.Tell(new SuccessMessage { Counter = 2 }); - sut.Tell(new SuccessMessage { Counter = 3 }); - - sut.Tell(new InvalidOperationException()); + var source = Enumerable.Range(1, 100); + var target = new List(); - sut.Invokes.ShouldBeEquivalentTo(new List + foreach (var item in source) { - new SuccessMessage { Counter = 1 }, - true - }); - } - - [Fact] - public void Should_call_stop_on_dispose() - { - sut.Tell(new SuccessMessage { Counter = 1 }); + sut.DispatchAsync(() => target.Add(item)).Forget(); + } - sut.Dispose(); + await sut.StopAndWaitAsync(); - sut.Invokes.ShouldBeEquivalentTo(new List - { - new SuccessMessage { Counter = 1 }, - true - }); + Assert.Equal(source, target); } - */ } }