From 7c037b99873972eedc0dc2842f8bf3fe51f7fe42 Mon Sep 17 00:00:00 2001 From: Sebastian Stehle Date: Mon, 9 Oct 2017 21:25:50 +0200 Subject: [PATCH] Event consumer improved. --- .../CQRS/Events/Actors/EventConsumerActor.cs | 61 +++-- .../Events/Actors/EventConsumerActorTests.cs | 213 +++++++++++++----- 2 files changed, 193 insertions(+), 81 deletions(-) diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs index d52346079..bc9878b0e 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs @@ -18,7 +18,6 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors { public sealed class EventConsumerActor : DisposableObjectBase, IEventSubscriber, IActor { - private const int ReconnectWaitMs = 1000; private readonly EventDataFormatter formatter; private readonly RetryWindow retryWindow = new RetryWindow(TimeSpan.FromMinutes(5), 5); private readonly IEventStore eventStore; @@ -28,10 +27,10 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors private IEventSubscription eventSubscription; private IEventConsumer eventConsumer; private bool isStopped; - private bool statusIsRunning; + private bool statusIsRunning = true; private string statusPosition; private string statusError; - private Guid stateId; + private Guid stateId = Guid.NewGuid(); private sealed class Teardown { @@ -62,6 +61,8 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors public Guid StateId { get; set; } } + public int ReconnectWaitMs { get; set; } = 5000; + public EventConsumerActor( EventDataFormatter formatter, IEventStore eventStore, @@ -130,7 +131,8 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors try { - stateId = Guid.NewGuid(); + var oldStateId = stateId; + var newStateId = stateId = Guid.NewGuid(); switch (message) { @@ -138,7 +140,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors { isStopped = true; - break; + return; } case Setup setup: @@ -147,11 +149,19 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors var status = await eventConsumerInfoRepository.FindAsync(eventConsumer.Name); - statusError = status?.Error; - statusPosition = status?.Position; - statusIsRunning = !(status?.IsStopped ?? false); + if (status != null) + { + statusError = status.Error; + statusPosition = status.Position; + statusIsRunning = !status.IsStopped; + } - return; + if (statusIsRunning) + { + await SubscribeAsync(); + } + + break; } case StartConsumerMessage startConsumer: @@ -178,8 +188,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors await UnsubscribeAsync(); - statusError = null; - statusIsRunning = true; + statusIsRunning = false; break; } @@ -199,7 +208,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors case Reconnect reconnect: { - if (!statusIsRunning || reconnect.StateId != stateId) + if (!statusIsRunning || reconnect.StateId != oldStateId) { return; } @@ -220,9 +229,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors if (retryWindow.CanRetryAfterFailure()) { - var id = stateId; - - Task.Delay(ReconnectWaitMs).ContinueWith(t => dispatcher.SendAsync(new Reconnect { StateId = id })).Forget(); + Task.Delay(ReconnectWaitMs).ContinueWith(t => dispatcher.SendAsync(new Reconnect { StateId = newStateId })).Forget(); } else { @@ -243,14 +250,26 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors await DispatchConsumerAsync(@event); + statusError = null; statusPosition = @eventReceived.Event.EventPosition; break; } } + + await eventConsumerInfoRepository.SetAsync(eventConsumer.Name, statusPosition, !statusIsRunning, statusError); } catch (Exception ex) { + try + { + await UnsubscribeAsync(); + } + catch (Exception unsubscribeException) + { + ex = new AggregateException(ex, unsubscribeException); + } + log.LogFatal(ex, w => w .WriteProperty("action", "HandleEvent") .WriteProperty("state", "Failed") @@ -258,9 +277,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors statusError = ex.ToString(); statusIsRunning = false; - } - finally - { + await eventConsumerInfoRepository.SetAsync(eventConsumer.Name, statusPosition, !statusIsRunning, statusError); } } @@ -275,14 +292,14 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors } } - private async Task SubscribeAsync() + private Task SubscribeAsync() { if (eventSubscription == null) { - var status = await eventConsumerInfoRepository.FindAsync(eventConsumer.Name); - - eventSubscription = eventStore.CreateSubscription(this, eventConsumer.EventsFilter, status.Position); + eventSubscription = eventStore.CreateSubscription(this, eventConsumer.EventsFilter, statusPosition); } + + return TaskHelper.Done; } private async Task ClearAsync() diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs index 0b58c6d0c..8ee29c569 100644 --- a/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs +++ b/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs @@ -59,37 +59,69 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors A.CallTo(() => formatter.Parse(eventData, true)).Returns(envelope); - sut = new EventConsumerActor(formatter, eventStore, eventConsumerInfoRepository, log); + sut = new EventConsumerActor(formatter, eventStore, eventConsumerInfoRepository, log) { ReconnectWaitMs = 0 }; sutActor = sut; sutSubscriber = sut; } - /* [Fact] - public async Task Should_subscribe_to_event_store_when_started() + public async Task Should_not_not_subscribe_to_event_store_when_stopped_in_db() { - await SubscribeAsync(); + consumerInfo.IsStopped = true; + + await OnSubscribeAsync(); + + sut.Dispose(); + + A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) + .MustNotHaveHappened(); + } + + [Fact] + public async Task Should_subscribe_to_event_store_when_not_found_in_db() + { + A.CallTo(() => eventConsumerInfoRepository.FindAsync(consumerName)).Returns(Task.FromResult(null)); + + await OnSubscribeAsync(); sut.Dispose(); A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, null, false, null)) .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) + .MustHaveHappened(Repeated.Exactly.Once); + } + + [Fact] + public async Task Should_subscribe_to_event_store_when_not_stopped_in_db() + { + await OnSubscribeAsync(); + + sut.Dispose(); + + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null)) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) + .MustHaveHappened(Repeated.Exactly.Once); } [Fact] public async Task Should_stop_subscription_when_stopped() { - await SubscribeAsync(); + await OnSubscribeAsync(); + sutActor.Tell(new StopConsumerMessage()); sutActor.Tell(new StopConsumerMessage()); sut.Dispose(); - A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, null, false, null)) + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null)) .MustHaveHappened(Repeated.Exactly.Once); - A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, null, true, null)) + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, null)) .MustHaveHappened(Repeated.Exactly.Once); A.CallTo(() => eventSubscription.StopAsync()) @@ -99,12 +131,16 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors [Fact] public async Task Should_reset_consumer_when_resetting() { - await SubscribeAsync(); + await OnSubscribeAsync(); + sutActor.Tell(new StopConsumerMessage()); sutActor.Tell(new ResetConsumerMessage()); sut.Dispose(); - A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, null, false, null)) + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null)) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, null)) .MustHaveHappened(Repeated.Exactly.Once); A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, null, false, null)) @@ -122,16 +158,15 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors { var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); - await SubscribeAsync(); - - await sutSubscriber.OnEventAsync(eventSubscription, @event); + await OnSubscribeAsync(); + await OnEventAsync(eventSubscription, @event); sut.Dispose(); - A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, null, false, null)) + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null)) .MustHaveHappened(Repeated.Exactly.Once); - A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, null, false, @event.EventPosition)) + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, @event.EventPosition, false, null)) .MustHaveHappened(Repeated.Exactly.Once); A.CallTo(() => eventConsumer.On(envelope)) @@ -143,118 +178,178 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors { var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); - await SubscribeAsync(); - - await sutSubscriber.OnEventAsync(A.Fake(), @event); + await OnSubscribeAsync(); + await OnEventAsync(A.Fake(), @event); sut.Dispose(); + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null)) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, @event.EventPosition, false, null)) + .MustNotHaveHappened(); + A.CallTo(() => eventConsumer.On(envelope)) - .MustNotHaveHappened(Repeated.Exactly.Once); + .MustNotHaveHappened(); + } + + [Fact] + public async Task Should_reopen_subscription_when_exception_is_retrieved() + { + var ex = new InvalidOperationException(); + + await OnSubscribeAsync(); + await OnErrorAsync(eventSubscription, ex); + + sut.Dispose(); + + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null)) + .MustHaveHappened(Repeated.Exactly.Times(3)); + + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, ex.ToString())) + .MustNotHaveHappened(); + + A.CallTo(() => eventSubscription.StopAsync()) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) + .MustHaveHappened(Repeated.Exactly.Twice); + } + + [Fact] + public async Task Should_not_make_error_handling_when_exception_is_from_another_subscription() + { + var ex = new InvalidOperationException(); + + await OnSubscribeAsync(); + await OnErrorAsync(A.Fake(), ex); - A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, @event.EventPosition, false)) - .MustNotHaveHappened(Repeated.Exactly.Once); + sut.Dispose(); + + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, null)) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, ex.ToString())) + .MustNotHaveHappened(); } [Fact] public async Task Should_stop_if_resetting_failed() { - var exception = new InvalidOperationException("Exception"); + var ex = new InvalidOperationException(); A.CallTo(() => eventConsumer.ClearAsync()) - .Throws(exception); + .Throws(ex); var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); - await SubscribeAsync(); + await OnSubscribeAsync(); sutActor.Tell(new ResetConsumerMessage()); sut.Dispose(); - A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, exception.ToString())) - .MustHaveHappened(); + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, ex.ToString())) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventSubscription.StopAsync()) + .MustHaveHappened(Repeated.Exactly.Once); } [Fact] public async Task Should_stop_if_handling_failed() { - var exception = new InvalidOperationException("Exception"); + var ex = new InvalidOperationException(); A.CallTo(() => eventConsumer.On(envelope)) - .Throws(exception); + .Throws(ex); var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); - await SubscribeAsync(); - await sutSubscriber.OnEventAsync(eventSubscription, @event); + await OnSubscribeAsync(); + await OnEventAsync(eventSubscription, @event); sut.Dispose(); A.CallTo(() => eventConsumer.On(envelope)) .MustHaveHappened(); - A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, @event.EventPosition, false)) - .MustNotHaveHappened(); + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, ex.ToString())) + .MustHaveHappened(Repeated.Exactly.Once); - A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, exception.ToString())) - .MustHaveHappened(); + A.CallTo(() => eventSubscription.StopAsync()) + .MustHaveHappened(Repeated.Exactly.Once); } [Fact] - public async Task Should_start_after_stop_when_handling_failed() + public async Task Should_stop_if_deserialization_failed() { - var exception = new InvalidOperationException("Exception"); + var ex = new InvalidOperationException(); - A.CallTo(() => eventConsumer.On(envelope)) - .Throws(exception); + A.CallTo(() => formatter.Parse(eventData, true)) + .Throws(ex); var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); - await SubscribeAsync(); - await sutSubscriber.OnEventAsync(eventSubscription, @event); + await OnSubscribeAsync(); + await OnEventAsync(eventSubscription, @event); - sutActor.Tell(new StartConsumerMessage()); sut.Dispose(); A.CallTo(() => eventConsumer.On(envelope)) - .MustHaveHappened(); - - A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, @event.EventPosition, false)) .MustNotHaveHappened(); - A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, exception.ToString())) - .MustHaveHappened(); + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, ex.ToString())) + .MustHaveHappened(Repeated.Exactly.Once); - A.CallTo(() => eventConsumerInfoRepository.StartAsync(consumerName)) - .MustHaveHappened(Repeated.Exactly.Twice); + A.CallTo(() => eventSubscription.StopAsync()) + .MustHaveHappened(Repeated.Exactly.Once); } [Fact] - public async Task Should_stop_if_deserialization_failed() + public async Task Should_start_after_stop_when_handling_failed() { - var exception = new InvalidOperationException("Exception"); + var exception = new InvalidOperationException(); - A.CallTo(() => formatter.Parse(eventData, true)) + A.CallTo(() => eventConsumer.On(envelope)) .Throws(exception); var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); - await SubscribeAsync(); - await sutSubscriber.OnEventAsync(eventSubscription, @event); + await OnSubscribeAsync(); + await OnEventAsync(eventSubscription, @event); + sutActor.Tell(new StartConsumerMessage()); + sutActor.Tell(new StartConsumerMessage()); sut.Dispose(); A.CallTo(() => eventConsumer.On(envelope)) - .MustNotHaveHappened(); + .MustHaveHappened(); - A.CallTo(() => eventConsumerInfoRepository.SetPositionAsync(consumerName, @event.EventPosition, false)) - .MustNotHaveHappened(); + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, exception.ToString())) + .MustHaveHappened(Repeated.Exactly.Once); - A.CallTo(() => eventConsumerInfoRepository.StopAsync(consumerName, exception.ToString())) - .MustHaveHappened(); - }*/ + A.CallTo(() => eventSubscription.StopAsync()) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) + .MustHaveHappened(Repeated.Exactly.Twice); + } + + private async Task OnErrorAsync(IEventSubscription subscriber, Exception ex) + { + await sutSubscriber.OnErrorAsync(subscriber, ex); + + await Task.Delay(200); + } + + private async Task OnEventAsync(IEventSubscription subscriber, StoredEvent ev) + { + await sutSubscriber.OnEventAsync(subscriber, ev); + + await Task.Delay(200); + } - private async Task SubscribeAsync() + private async Task OnSubscribeAsync() { await sut.SubscribeAsync(eventConsumer);