From 940d11c7132066fbde04d3d398e1481638a3b7c7 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Sat, 21 Oct 2017 10:19:27 +0200 Subject: [PATCH] Retry Subscriber extracted --- .../CQRS/Events/GetEventStoreSubscription.cs | 11 +- src/Squidex.Infrastructure/Actors/Actor.cs | 125 ------- .../Actors/SingleThreadedDispatcher.cs | 69 ++++ .../CQRS/Events/Actors/EventConsumerActor.cs | 328 +++++++----------- .../CQRS/Events/IEventSubscriber.cs | 2 +- .../CQRS/Events/RetrySubscription.cs | 104 ++++++ .../Actors/ActorRemoteTests.cs | 21 +- .../Actors/ActorTests.cs | 158 --------- .../Actors/SingleThreadedDispatcherTests.cs | 91 +++++ .../Events/Actors/EventConsumerActorTests.cs | 76 ++-- .../CQRS/Events/RetrySubscriptionTests.cs | 123 +++++++ 11 files changed, 563 insertions(+), 545 deletions(-) delete mode 100644 src/Squidex.Infrastructure/Actors/Actor.cs create mode 100644 src/Squidex.Infrastructure/Actors/SingleThreadedDispatcher.cs create mode 100644 src/Squidex.Infrastructure/CQRS/Events/RetrySubscription.cs delete mode 100644 tests/Squidex.Infrastructure.Tests/Actors/ActorTests.cs create mode 100644 tests/Squidex.Infrastructure.Tests/Actors/SingleThreadedDispatcherTests.cs create mode 100644 tests/Squidex.Infrastructure.Tests/CQRS/Events/RetrySubscriptionTests.cs diff --git a/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs b/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs index 53799cf95..8f6ae46d3 100644 --- a/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs +++ b/src/Squidex.Infrastructure.GetEventStore/CQRS/Events/GetEventStoreSubscription.cs @@ -20,7 +20,7 @@ using Squidex.Infrastructure.Tasks; namespace Squidex.Infrastructure.CQRS.Events { - internal sealed class GetEventStoreSubscription : IEventSubscription + internal sealed class GetEventStoreSubscription : DisposableObjectBase, IEventSubscription { private const string ProjectionName = "by-{0}-{1}"; private static readonly ConcurrentDictionary SubscriptionsCreated = new ConcurrentDictionary(); @@ -57,11 +57,12 @@ namespace Squidex.Infrastructure.CQRS.Events subscription = SubscribeToStream(streamName); } - public Task StopAsync() + protected override void DisposeObject(bool disposing) { - subscription.Stop(); - - return TaskHelper.Done; + if (disposing) + { + subscription.Stop(); + } } private EventStoreCatchUpSubscription SubscribeToStream(string streamName) diff --git a/src/Squidex.Infrastructure/Actors/Actor.cs b/src/Squidex.Infrastructure/Actors/Actor.cs deleted file mode 100644 index abe5e1658..000000000 --- a/src/Squidex.Infrastructure/Actors/Actor.cs +++ /dev/null @@ -1,125 +0,0 @@ -// ========================================================================== -// Actor.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System; -using System.Threading.Tasks; -using System.Threading.Tasks.Dataflow; -using Squidex.Infrastructure.Tasks; - -namespace Squidex.Infrastructure.Actors -{ - public abstract class Actor : IDisposable - { - private readonly ActionBlock block; - private bool isStopped; - - private sealed class StopMessage - { - } - - private sealed class ErrorMessage - { - public Exception Exception { get; set; } - } - - protected Actor() - { - var options = new ExecutionDataflowBlockOptions - { - MaxMessagesPerTask = -1, - MaxDegreeOfParallelism = 1, - BoundedCapacity = 10 - }; - - block = new ActionBlock(Handle, options); - } - - public void Dispose() - { - StopAndWaitAsync().Wait(); - } - - protected async Task DispatchAsync(object message) - { - Guard.NotNull(message, nameof(message)); - - await block.SendAsync(message); - } - - protected async Task FailAsync(Exception exception) - { - Guard.NotNull(exception, nameof(exception)); - - await block.SendAsync(new ErrorMessage { Exception = exception }); - } - - protected async Task StopAndWaitAsync() - { - await block.SendAsync(new StopMessage()); - await block.Completion; - } - - protected virtual Task OnStop() - { - return TaskHelper.Done; - } - - protected virtual Task OnError(Exception exception) - { - return TaskHelper.Done; - } - - protected virtual Task OnMessage(object message) - { - return TaskHelper.Done; - } - - private async Task Handle(object message) - { - if (isStopped) - { - return; - } - - switch (message) - { - case StopMessage stopMessage: - { - isStopped = true; - - block.Complete(); - - await OnStop(); - - break; - } - - case ErrorMessage errorMessage: - { - await OnError(errorMessage.Exception); - - break; - } - - default: - { - try - { - await OnMessage(message); - } - catch (Exception ex) - { - await OnError(ex); - } - - break; - } - } - } - } -} diff --git a/src/Squidex.Infrastructure/Actors/SingleThreadedDispatcher.cs b/src/Squidex.Infrastructure/Actors/SingleThreadedDispatcher.cs new file mode 100644 index 000000000..993ae84f1 --- /dev/null +++ b/src/Squidex.Infrastructure/Actors/SingleThreadedDispatcher.cs @@ -0,0 +1,69 @@ +// ========================================================================== +// Actor.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; +using Squidex.Infrastructure.Tasks; + +namespace Squidex.Infrastructure.Actors +{ + public sealed class SingleThreadedDispatcher + { + private readonly ActionBlock> block; + private bool isStopped; + + public SingleThreadedDispatcher(int capacity = 10) + { + var options = new ExecutionDataflowBlockOptions + { + MaxMessagesPerTask = -1, + MaxDegreeOfParallelism = 1, + BoundedCapacity = capacity + }; + + block = new ActionBlock>(Handle, options); + } + + public Task DispatchAsync(Func action) + { + Guard.NotNull(action, nameof(action)); + + return block.SendAsync(action); + } + + public Task DispatchAsync(Action action) + { + Guard.NotNull(action, nameof(action)); + + return block.SendAsync(() => { action(); return TaskHelper.Done; }); + } + + public async Task StopAndWaitAsync() + { + await DispatchAsync(() => + { + isStopped = true; + + block.Complete(); + }); + + await block.Completion; + } + + private Task Handle(Func action) + { + if (isStopped) + { + return TaskHelper.Done; + } + + return action(); + } + } +} diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs index 88a769942..90ab63426 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs @@ -8,7 +8,6 @@ using System; using System.Threading.Tasks; -using System.Threading.Tasks.Dataflow; using Squidex.Infrastructure.Actors; using Squidex.Infrastructure.CQRS.Events.Actors.Messages; using Squidex.Infrastructure.Log; @@ -16,53 +15,24 @@ using Squidex.Infrastructure.Tasks; namespace Squidex.Infrastructure.CQRS.Events.Actors { - public sealed class EventConsumerActor : DisposableObjectBase, IEventSubscriber, IActor + public class EventConsumerActor : DisposableObjectBase, IEventSubscriber, IActor { private readonly EventDataFormatter formatter; - private readonly RetryWindow retryWindow = new RetryWindow(TimeSpan.FromMinutes(5), 5); private readonly IEventStore eventStore; private readonly IEventConsumerInfoRepository eventConsumerInfoRepository; private readonly ISemanticLog log; - private readonly ActionBlock dispatcher; - private IEventSubscription eventSubscription; + private readonly SingleThreadedDispatcher dispatcher = new SingleThreadedDispatcher(1); + private IEventSubscription currentSubscription; private IEventConsumer eventConsumer; - private bool isStopped; private bool statusIsRunning = true; private string statusPosition; private string statusError; - private Guid stateId = Guid.NewGuid(); - private sealed class Teardown + private static Func DefaultFactory { + get { return (e, s, t, p) => new RetrySubscription(e, s, t, p); } } - private sealed class Setup - { - public IEventConsumer EventConsumer { get; set; } - } - - private abstract class SubscriptionMessage - { - public IEventSubscription Subscription { get; set; } - } - - private sealed class SubscriptionEventReceived : SubscriptionMessage - { - public StoredEvent Event { get; set; } - } - - private sealed class SubscriptionFailed : SubscriptionMessage - { - public Exception Exception { get; set; } - } - - private sealed class Reconnect - { - public Guid StateId { get; set; } - } - - public int ReconnectWaitMs { get; set; } = 5000; - public EventConsumerActor( EventDataFormatter formatter, IEventStore eventStore, @@ -79,199 +49,172 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors this.formatter = formatter; this.eventStore = eventStore; this.eventConsumerInfoRepository = eventConsumerInfoRepository; - - var options = new ExecutionDataflowBlockOptions - { - MaxMessagesPerTask = -1, - MaxDegreeOfParallelism = 1, - BoundedCapacity = 10 - }; - - dispatcher = new ActionBlock(OnMessage, options); } protected override void DisposeObject(bool disposing) { if (disposing) { - dispatcher.SendAsync(new Teardown()).Wait(); - dispatcher.Complete(); - dispatcher.Completion.Wait(); + dispatcher.StopAndWaitAsync().Wait(); } } - public async Task WaitForCompletionAsync() + protected virtual IEventSubscription CreateSubscription(IEventStore eventStore, string streamFilter, string position) { - while (dispatcher.InputCount > 0) - { - await Task.Delay(20); - } + return new RetrySubscription(eventStore, this, streamFilter, position); } public Task SubscribeAsync(IEventConsumer eventConsumer) { Guard.NotNull(eventConsumer, nameof(eventConsumer)); - return dispatcher.SendAsync(new Setup { EventConsumer = eventConsumer }); + return dispatcher.DispatchAsync(() => HandleSetupAsync(eventConsumer)); } - Task IEventSubscriber.OnEventAsync(IEventSubscription subscription, StoredEvent @event) + private async Task HandleSetupAsync(IEventConsumer consumer) { - return dispatcher.SendAsync(new SubscriptionEventReceived { Subscription = subscription, Event = @event }); - } + eventConsumer = consumer; - Task IEventSubscriber.OnErrorAsync(IEventSubscription subscription, Exception exception) - { - return dispatcher.SendAsync(new SubscriptionFailed { Subscription = subscription, Exception = exception }); - } + var status = await eventConsumerInfoRepository.FindAsync(eventConsumer.Name); - void IActor.Tell(object message) - { - dispatcher.SendAsync(message).Forget(); + if (status != null) + { + statusError = status.Error; + statusPosition = status.Position; + statusIsRunning = !status.IsStopped; + } + + if (statusIsRunning) + { + Subscribe(statusPosition); + } } - private async Task OnMessage(object message) + private Task HandleEventAsync(IEventSubscription subscription, StoredEvent storedEvent) { - if (isStopped) + if (subscription != currentSubscription) { - return; + return TaskHelper.Done; } - try + return DoAndUpdateStateAsync(async () => { - var oldStateId = stateId; - var newStateId = stateId = Guid.NewGuid(); - - switch (message) - { - case Teardown teardown: - { - isStopped = true; - - return; - } - - case Setup setup: - { - eventConsumer = setup.EventConsumer; - - var status = await eventConsumerInfoRepository.FindAsync(eventConsumer.Name); - - if (status != null) - { - statusError = status.Error; - statusPosition = status.Position; - statusIsRunning = !status.IsStopped; - } - - if (statusIsRunning) - { - await SubscribeThisAsync(statusPosition); - } - - break; - } + await DispatchConsumerAsync(formatter.Parse(storedEvent.Data)); - case StartConsumerMessage startConsumer: - { - if (statusIsRunning) - { - return; - } - - await SubscribeThisAsync(statusPosition); + statusError = null; + statusPosition = storedEvent.EventPosition; + }); + } - statusError = null; - statusIsRunning = true; + private Task HandleErrorAsync(IEventSubscription subscription, Exception exception) + { + if (subscription != currentSubscription) + { + return TaskHelper.Done; + } - break; - } + return DoAndUpdateStateAsync(() => + { + Unsubscribe(); - case StopConsumerMessage stopConsumer: - { - if (!statusIsRunning) - { - return; - } + statusError = exception.ToString(); + statusIsRunning = false; + }); + } - await UnsubscribeThisAsync(); + private Task HandleStartAsync() + { + if (statusIsRunning) + { + return TaskHelper.Done; + } - statusIsRunning = false; + return DoAndUpdateStateAsync(() => + { + Subscribe(statusPosition); - break; - } + statusError = null; + statusIsRunning = true; + }); + } - case ResetConsumerMessage resetConsumer: - { - await UnsubscribeThisAsync(); - await ClearAsync(); - await SubscribeThisAsync(null); + private Task HandleStopAsync() + { + if (!statusIsRunning) + { + return TaskHelper.Done; + } - statusError = null; - statusPosition = null; - statusIsRunning = true; + return DoAndUpdateStateAsync(() => + { + Unsubscribe(); - break; - } + statusError = null; + statusIsRunning = false; + }); + } - case Reconnect reconnect: - { - if (!statusIsRunning || reconnect.StateId != oldStateId) - { - return; - } + private Task HandleResetInternalAsync() + { + return DoAndUpdateStateAsync(async () => + { + Unsubscribe(); - await SubscribeThisAsync(statusPosition); + await ClearAsync(); - break; - } + Subscribe(null); - case SubscriptionFailed subscriptionFailed: - { - if (subscriptionFailed.Subscription != eventSubscription) - { - return; - } + statusError = null; + statusPosition = null; + statusIsRunning = true; + }); + } - await UnsubscribeThisAsync(); + Task IEventSubscriber.OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent) + { + return dispatcher.DispatchAsync(() => HandleEventAsync(subscription, storedEvent)); + } - if (retryWindow.CanRetryAfterFailure()) - { - Task.Delay(ReconnectWaitMs).ContinueWith(t => dispatcher.SendAsync(new Reconnect { StateId = newStateId })).Forget(); - } - else - { - throw subscriptionFailed.Exception; - } + Task IEventSubscriber.OnErrorAsync(IEventSubscription subscription, Exception exception) + { + return dispatcher.DispatchAsync(() => HandleErrorAsync(subscription, exception)); + } - break; - } + void IActor.Tell(object message) + { + switch (message) + { + case StopConsumerMessage stop: + dispatcher.DispatchAsync(() => HandleStopAsync()).Forget(); + break; - case SubscriptionEventReceived eventReceived: - { - if (eventReceived.Subscription != eventSubscription) - { - return; - } + case StartConsumerMessage stop: + dispatcher.DispatchAsync(() => HandleStartAsync()).Forget(); + break; - var @event = ParseEvent(eventReceived.Event); - - await DispatchConsumerAsync(@event); - - statusError = null; - statusPosition = @eventReceived.Event.EventPosition; + case ResetConsumerMessage stop: + dispatcher.DispatchAsync(() => HandleResetInternalAsync()).Forget(); + break; + } + } - break; - } - } + private Task DoAndUpdateStateAsync(Action action) + { + return DoAndUpdateStateAsync(() => { action(); return TaskHelper.Done; }); + } + private async Task DoAndUpdateStateAsync(Func action) + { + try + { + await action(); await eventConsumerInfoRepository.SetAsync(eventConsumer.Name, statusPosition, !statusIsRunning, statusError); } catch (Exception ex) { try { - await UnsubscribeThisAsync(); + Unsubscribe(); } catch (Exception unsubscribeException) { @@ -290,27 +233,6 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors } } - private Task UnsubscribeThisAsync() - { - if (eventSubscription != null) - { - eventSubscription.StopAsync().Forget(); - eventSubscription = null; - } - - return TaskHelper.Done; - } - - private Task SubscribeThisAsync(string position) - { - if (eventSubscription == null) - { - eventSubscription = eventStore.CreateSubscription(this, eventConsumer.EventsFilter, position); - } - - return TaskHelper.Done; - } - private async Task ClearAsync() { var actionId = Guid.NewGuid().ToString(); @@ -356,6 +278,24 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors } } + private void Unsubscribe() + { + if (currentSubscription != null) + { + currentSubscription.StopAsync().Forget(); + currentSubscription = null; + } + } + + private void Subscribe(string position) + { + if (currentSubscription == null) + { + currentSubscription?.StopAsync().Forget(); + currentSubscription = CreateSubscription(eventStore, eventConsumer.EventsFilter, position); + } + } + private Envelope ParseEvent(StoredEvent message) { var @event = formatter.Parse(message.Data); diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventSubscriber.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventSubscriber.cs index 39772dd99..6957f83c1 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/IEventSubscriber.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/IEventSubscriber.cs @@ -13,7 +13,7 @@ namespace Squidex.Infrastructure.CQRS.Events { public interface IEventSubscriber { - Task OnEventAsync(IEventSubscription subscription, StoredEvent @event); + Task OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent); Task OnErrorAsync(IEventSubscription subscription, Exception exception); } diff --git a/src/Squidex.Infrastructure/CQRS/Events/RetrySubscription.cs b/src/Squidex.Infrastructure/CQRS/Events/RetrySubscription.cs new file mode 100644 index 000000000..5c232455b --- /dev/null +++ b/src/Squidex.Infrastructure/CQRS/Events/RetrySubscription.cs @@ -0,0 +1,104 @@ +// ========================================================================== +// RetrySubscription.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Threading; +using System.Threading.Tasks; +using Squidex.Infrastructure.Actors; +using Squidex.Infrastructure.Tasks; + +namespace Squidex.Infrastructure.CQRS.Events +{ + public sealed class RetrySubscription : IEventSubscription, IEventSubscriber + { + private readonly SingleThreadedDispatcher dispatcher = new SingleThreadedDispatcher(10); + private readonly CancellationTokenSource disposeCts = new CancellationTokenSource(); + private readonly RetryWindow retryWindow = new RetryWindow(TimeSpan.FromMinutes(5), 5); + private readonly IEventStore eventStore; + private readonly IEventSubscriber eventSubscriber; + private readonly string streamFilter; + private IEventSubscription currentSubscription; + private string position; + + public int ReconnectWaitMs { get; set; } = 5000; + + public RetrySubscription(IEventStore eventStore, IEventSubscriber eventSubscriber, string streamFilter, string position) + { + Guard.NotNull(eventStore, nameof(eventStore)); + Guard.NotNull(eventSubscriber, nameof(eventSubscriber)); + + this.position = position; + + this.eventStore = eventStore; + this.eventSubscriber = eventSubscriber; + + this.streamFilter = streamFilter; + + Subscribe(); + } + + private void Subscribe() + { + currentSubscription = eventStore.CreateSubscription(this, streamFilter, position); + } + + private void Unsubscribe() + { + currentSubscription?.StopAsync().Forget(); + } + + private async Task HandleEventAsync(IEventSubscription subscription, StoredEvent storedEvent) + { + if (subscription == currentSubscription) + { + await eventSubscriber.OnEventAsync(this, storedEvent); + + position = storedEvent.EventPosition; + } + } + + private async Task HandleErrorAsync(IEventSubscription subscription, Exception exception) + { + if (subscription == currentSubscription) + { + subscription.StopAsync().Forget(); + subscription = null; + + if (retryWindow.CanRetryAfterFailure()) + { + Task.Delay(ReconnectWaitMs, disposeCts.Token).ContinueWith(t => + { + dispatcher.DispatchAsync(() => Subscribe()); + }).Forget(); + } + else + { + await eventSubscriber.OnErrorAsync(this, exception); + } + } + } + + Task IEventSubscriber.OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent) + { + return dispatcher.DispatchAsync(() => HandleEventAsync(subscription, storedEvent)); + } + + Task IEventSubscriber.OnErrorAsync(IEventSubscription subscription, Exception exception) + { + return dispatcher.DispatchAsync(() => HandleErrorAsync(subscription, exception)); + } + + public async Task StopAsync() + { + await dispatcher.DispatchAsync(() => Unsubscribe()); + await dispatcher.StopAndWaitAsync(); + + disposeCts.Cancel(); + } + } +} diff --git a/tests/Squidex.Infrastructure.Tests/Actors/ActorRemoteTests.cs b/tests/Squidex.Infrastructure.Tests/Actors/ActorRemoteTests.cs index 005570593..e53df6eac 100644 --- a/tests/Squidex.Infrastructure.Tests/Actors/ActorRemoteTests.cs +++ b/tests/Squidex.Infrastructure.Tests/Actors/ActorRemoteTests.cs @@ -22,20 +22,23 @@ namespace Squidex.Infrastructure.Actors public int Counter { get; set; } } - private sealed class MyActor : Actor, IActor + private sealed class MyActor : IActor { + private readonly SingleThreadedDispatcher dispatcher = new SingleThreadedDispatcher(); + public List Invokes { get; } = new List(); - public void Tell(object message) + public Task StopAndWaitAsync() { - DispatchAsync(message).Forget(); + return dispatcher.StopAndWaitAsync(); } - protected override Task OnMessage(object message) + public void Tell(object message) { - Invokes.Add(message); - - return TaskHelper.Done; + dispatcher.DispatchAsync(() => + { + Invokes.Add(message); + }).Forget(); } } @@ -55,13 +58,13 @@ namespace Squidex.Infrastructure.Actors } [Fact] - public void Should_handle_messages_sequentially() + public async Task Should_handle_messages_sequentially() { remoteActor.Tell(new SuccessMessage { Counter = 1 }); remoteActor.Tell(new SuccessMessage { Counter = 2 }); remoteActor.Tell(new SuccessMessage { Counter = 3 }); - actor.Dispose(); + await actor.StopAndWaitAsync(); actor.Invokes.ShouldBeEquivalentTo(new List { diff --git a/tests/Squidex.Infrastructure.Tests/Actors/ActorTests.cs b/tests/Squidex.Infrastructure.Tests/Actors/ActorTests.cs deleted file mode 100644 index aa6d27a9c..000000000 --- a/tests/Squidex.Infrastructure.Tests/Actors/ActorTests.cs +++ /dev/null @@ -1,158 +0,0 @@ -// ========================================================================== -// ActorTests.cs -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex Group -// All rights reserved. -// ========================================================================== - -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading.Tasks; -using FluentAssertions; -using Squidex.Infrastructure.Tasks; -using Xunit; - -namespace Squidex.Infrastructure.Actors -{ - public class ActorTests - { - public class SuccessMessage - { - public int Counter { get; set; } - } - - public class FailedMessage - { - } - - private sealed class MyActor : Actor, IActor - { - public List Invokes { get; } = new List(); - - public void Tell(Exception exception) - { - FailAsync(exception).Forget(); - } - - public void Tell(object message) - { - DispatchAsync(message).Forget(); - } - - public Task StopAsync() - { - return StopAndWaitAsync(); - } - - protected override Task OnStop() - { - Invokes.Add(true); - - return TaskHelper.Done; - } - - protected override Task OnError(Exception exception) - { - Invokes.Add(exception); - - return TaskHelper.Done; - } - - protected override Task OnMessage(object message) - { - if (message is FailedMessage) - { - throw new InvalidOperationException(); - } - - Invokes.Add(message); - - return TaskHelper.Done; - } - } - - private readonly MyActor sut = new MyActor(); - - [Fact] - public async Task Should_invoke_with_exception() - { - sut.Tell(new InvalidOperationException()); - - await sut.StopAsync(); - - Assert.True(sut.Invokes[0] is InvalidOperationException); - } - - [Fact] - public async Task Should_handle_messages_sequentially() - { - sut.Tell(new SuccessMessage { Counter = 1 }); - sut.Tell(new SuccessMessage { Counter = 2 }); - sut.Tell(new SuccessMessage { Counter = 3 }); - - await sut.StopAsync(); - - sut.Invokes.ShouldBeEquivalentTo(new List - { - new SuccessMessage { Counter = 1 }, - new SuccessMessage { Counter = 2 }, - new SuccessMessage { Counter = 3 }, - true - }); - } - - [Fact] - public async Task Should_raise_error_event_when_event_handling_failed() - { - sut.Tell(new FailedMessage()); - sut.Tell(new SuccessMessage { Counter = 2 }); - sut.Tell(new SuccessMessage { Counter = 3 }); - - await sut.StopAsync(); - - Assert.True(sut.Invokes[0] is InvalidOperationException); - - sut.Invokes.Skip(1).ShouldBeEquivalentTo(new List - { - new SuccessMessage { Counter = 2 }, - new SuccessMessage { Counter = 3 }, - true - }); - } - - [Fact] - public async Task Should_not_handle_messages_after_stop() - { - sut.Tell(new SuccessMessage { Counter = 1 }); - - await sut.StopAsync(); - - sut.Tell(new SuccessMessage { Counter = 2 }); - sut.Tell(new SuccessMessage { Counter = 3 }); - - sut.Tell(new InvalidOperationException()); - - sut.Invokes.ShouldBeEquivalentTo(new List - { - new SuccessMessage { Counter = 1 }, - true - }); - } - - [Fact] - public void Should_call_stop_on_dispose() - { - sut.Tell(new SuccessMessage { Counter = 1 }); - - sut.Dispose(); - - sut.Invokes.ShouldBeEquivalentTo(new List - { - new SuccessMessage { Counter = 1 }, - true - }); - } - } -} diff --git a/tests/Squidex.Infrastructure.Tests/Actors/SingleThreadedDispatcherTests.cs b/tests/Squidex.Infrastructure.Tests/Actors/SingleThreadedDispatcherTests.cs new file mode 100644 index 000000000..4ec54c725 --- /dev/null +++ b/tests/Squidex.Infrastructure.Tests/Actors/SingleThreadedDispatcherTests.cs @@ -0,0 +1,91 @@ +// ========================================================================== +// SingleThreadedDispatcherTests.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Squidex.Infrastructure.Tasks; +using Xunit; + +namespace Squidex.Infrastructure.Actors +{ + public class SingleThreadedDispatcherTests + { + private readonly SingleThreadedDispatcher sut = new SingleThreadedDispatcher(); + + [Fact] + public async Task Should_handle_messages_sequentially() + { + var source = Enumerable.Range(1, 100); + var target = new List(); + + foreach (var item in source) + { + sut.DispatchAsync(() => target.Add(item)).Forget(); + } + + await sut.StopAndWaitAsync(); + + Assert.Equal(source, target); + } + + /* + [Fact] + public async Task Should_raise_error_event_when_event_handling_failed() + { + sut.Tell(new FailedMessage()); + sut.Tell(new SuccessMessage { Counter = 2 }); + sut.Tell(new SuccessMessage { Counter = 3 }); + + await sut.StopAsync(); + + Assert.True(sut.Invokes[0] is InvalidOperationException); + + sut.Invokes.Skip(1).ShouldBeEquivalentTo(new List + { + new SuccessMessage { Counter = 2 }, + new SuccessMessage { Counter = 3 }, + true + }); + } + + [Fact] + public async Task Should_not_handle_messages_after_stop() + { + sut.Tell(new SuccessMessage { Counter = 1 }); + + await sut.StopAsync(); + + sut.Tell(new SuccessMessage { Counter = 2 }); + sut.Tell(new SuccessMessage { Counter = 3 }); + + sut.Tell(new InvalidOperationException()); + + sut.Invokes.ShouldBeEquivalentTo(new List + { + new SuccessMessage { Counter = 1 }, + true + }); + } + + [Fact] + public void Should_call_stop_on_dispose() + { + sut.Tell(new SuccessMessage { Counter = 1 }); + + sut.Dispose(); + + sut.Invokes.ShouldBeEquivalentTo(new List + { + new SuccessMessage { Counter = 1 }, + true + }); + } + */ + } +} diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs index eeaeedc12..43a719e3c 100644 --- a/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs +++ b/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs @@ -33,6 +33,23 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors public string Position { get; set; } } + public sealed class MyEventConsumerActor : EventConsumerActor + { + public MyEventConsumerActor( + EventDataFormatter formatter, + IEventStore eventStore, + IEventConsumerInfoRepository eventConsumerInfoRepository, + ISemanticLog log) + : base(formatter, eventStore, eventConsumerInfoRepository, log) + { + } + + protected override IEventSubscription CreateSubscription(IEventStore eventStore, string streamFilter, string position) + { + return eventStore.CreateSubscription(this, streamFilter, position); + } + } + private readonly IEventConsumerInfoRepository eventConsumerInfoRepository = A.Fake(); private readonly IEventConsumer eventConsumer = A.Fake(); private readonly IEventStore eventStore = A.Fake(); @@ -59,14 +76,18 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors A.CallTo(() => formatter.Parse(eventData, true)).Returns(envelope); - sut = new EventConsumerActor(formatter, eventStore, eventConsumerInfoRepository, log) { ReconnectWaitMs = 0 }; + sut = new MyEventConsumerActor( + formatter, + eventStore, + eventConsumerInfoRepository, + log); sutActor = sut; sutSubscriber = sut; } [Fact] - public async Task Should_not_not_subscribe_to_event_store_when_stopped_in_db() + public async Task Should_not_subscribe_to_event_store_when_stopped_in_db() { consumerInfo.IsStopped = true; @@ -87,9 +108,6 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors sut.Dispose(); - A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, null, false, null)) - .MustHaveHappened(Repeated.Exactly.Once); - A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) .MustHaveHappened(Repeated.Exactly.Once); } @@ -101,9 +119,6 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors sut.Dispose(); - A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null)) - .MustHaveHappened(Repeated.Exactly.Once); - A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) .MustHaveHappened(Repeated.Exactly.Once); } @@ -118,9 +133,6 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors sut.Dispose(); - A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null)) - .MustHaveHappened(Repeated.Exactly.Once); - A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, null)) .MustHaveHappened(Repeated.Exactly.Once); @@ -137,9 +149,6 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors sutActor.Tell(new ResetConsumerMessage()); sut.Dispose(); - A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null)) - .MustHaveHappened(Repeated.Exactly.Once); - A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, null)) .MustHaveHappened(Repeated.Exactly.Once); @@ -169,9 +178,6 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors sut.Dispose(); - A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null)) - .MustHaveHappened(Repeated.Exactly.Once); - A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, @event.EventPosition, false, null)) .MustHaveHappened(Repeated.Exactly.Once); @@ -189,43 +195,10 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors sut.Dispose(); - A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null)) - .MustHaveHappened(Repeated.Exactly.Once); - - A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, @event.EventPosition, false, null)) - .MustNotHaveHappened(); - A.CallTo(() => eventConsumer.On(envelope)) .MustNotHaveHappened(); } - [Fact] - public async Task Should_reopen_subscription_when_exception_is_retrieved() - { - var ex = new InvalidOperationException(); - - await OnSubscribeAsync(); - await OnErrorAsync(eventSubscription, ex); - - await Task.Delay(200); - - await sut.WaitForCompletionAsync(); - - sut.Dispose(); - - A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null)) - .MustHaveHappened(Repeated.Exactly.Times(3)); - - A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, ex.ToString())) - .MustNotHaveHappened(); - - A.CallTo(() => eventSubscription.StopAsync()) - .MustHaveHappened(Repeated.Exactly.Once); - - A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) - .MustHaveHappened(Repeated.Exactly.Twice); - } - [Fact] public async Task Should_not_make_error_handling_when_exception_is_from_another_subscription() { @@ -236,9 +209,6 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors sut.Dispose(); - A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null)) - .MustHaveHappened(Repeated.Exactly.Once); - A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, ex.ToString())) .MustNotHaveHappened(); } diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/RetrySubscriptionTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/RetrySubscriptionTests.cs new file mode 100644 index 000000000..a49a4a723 --- /dev/null +++ b/tests/Squidex.Infrastructure.Tests/CQRS/Events/RetrySubscriptionTests.cs @@ -0,0 +1,123 @@ +// ========================================================================== +// EventConsumerActorTests.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using System; +using System.Threading.Tasks; +using FakeItEasy; +using Xunit; + +namespace Squidex.Infrastructure.CQRS.Events +{ + public class RetrySubscriptionTests + { + private readonly IEventStore eventStore = A.Fake(); + private readonly IEventSubscriber eventSubscriber = A.Fake(); + private readonly IEventSubscription eventSubscription = A.Fake(); + private readonly IEventSubscriber sutSubscriber; + private readonly RetrySubscription sut; + private readonly string streamFilter = Guid.NewGuid().ToString(); + + public RetrySubscriptionTests() + { + A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)).Returns(eventSubscription); + + sut = new RetrySubscription(eventStore, eventSubscriber, streamFilter, null) { ReconnectWaitMs = 0 }; + + sutSubscriber = sut; + } + + [Fact] + public void Should_subscribe_after_constructor() + { + A.CallTo(() => eventStore.CreateSubscription(sut, streamFilter, null)) + .MustHaveHappened(); + } + + [Fact] + public async Task Should_reopen_subscription_when_exception_is_retrieved() + { + await OnErrorAsync(eventSubscription, new InvalidOperationException()); + + await Task.Delay(200); + + await sut.StopAsync(); + + A.CallTo(() => eventSubscription.StopAsync()) + .MustHaveHappened(Repeated.Exactly.Twice); + + A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) + .MustHaveHappened(Repeated.Exactly.Twice); + + A.CallTo(() => eventSubscriber.OnErrorAsync(A.Ignored, A.Ignored)) + .MustNotHaveHappened(); + } + + [Fact] + public async Task Should_forward_error_from_inner_subscription_when_failed_often() + { + var ex = new InvalidOperationException(); + + await OnErrorAsync(eventSubscription, ex); + await OnErrorAsync(eventSubscription, ex); + await OnErrorAsync(eventSubscription, ex); + await OnErrorAsync(eventSubscription, ex); + await OnErrorAsync(eventSubscription, ex); + await OnErrorAsync(eventSubscription, ex); + await sut.StopAsync(); + + A.CallTo(() => eventSubscriber.OnErrorAsync(sut, ex)) + .MustHaveHappened(); + } + + [Fact] + public async Task Should_forward_event_from_inner_subscription() + { + var ev = new StoredEvent("1", 2, new EventData()); + + await OnEventAsync(eventSubscription, ev); + await sut.StopAsync(); + + A.CallTo(() => eventSubscriber.OnEventAsync(sut, ev)) + .MustHaveHappened(); + } + + [Fact] + public async Task Should_not_forward_error_when_exception_is_from_another_subscription() + { + var ex = new InvalidOperationException(); + + await OnErrorAsync(A.Fake(), ex); + await sut.StopAsync(); + + A.CallTo(() => eventSubscriber.OnErrorAsync(A.Ignored, A.Ignored)) + .MustNotHaveHappened(); + } + + [Fact] + public async Task Should_not_forward_event_when_message_is_from_another_subscription() + { + var ev = new StoredEvent("1", 2, new EventData()); + + await OnEventAsync(A.Fake(), ev); + await sut.StopAsync(); + + A.CallTo(() => eventSubscriber.OnEventAsync(A.Ignored, A.Ignored)) + .MustNotHaveHappened(); + } + + private Task OnErrorAsync(IEventSubscription subscriber, Exception ex) + { + return sutSubscriber.OnErrorAsync(subscriber, ex); + } + + private Task OnEventAsync(IEventSubscription subscriber, StoredEvent ev) + { + return sutSubscriber.OnEventAsync(subscriber, ev); + } + } +} \ No newline at end of file