From 4b0356f1223257a574f2c6a75cf8e5918b5c3af0 Mon Sep 17 00:00:00 2001 From: Sebastian Date: Fri, 3 Dec 2021 20:36:30 +0100 Subject: [PATCH] Fix event consumer. --- .../EventSourcing/Grains/BatchSubscriber.cs | 13 +++++++++-- .../Grains/EventConsumerGrain.cs | 19 +++++++-------- .../Grains/EventConsumerGrainTests.cs | 23 +++++++++++++++++-- .../rules/pages/rule/rule-page.component.html | 16 ++++++------- 4 files changed, 50 insertions(+), 21 deletions(-) diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs index 7c5407c66..a1733f2aa 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs @@ -97,8 +97,6 @@ namespace Squidex.Infrastructure.EventSourcing.Grains { await foreach (var task in taskQueue.Reader.ReadAllAsync(completed.Token)) { - var scheduler = TaskScheduler.Current; - var sender = eventSubscription?.Sender; if (sender == null) @@ -139,6 +137,17 @@ namespace Squidex.Infrastructure.EventSourcing.Grains { return; } + catch (Exception ex) + { + var sender = eventSubscription?.Sender; + + if (sender != null) + { + await grain.OnErrorAsync(sender, ex); + } + + throw; + } } public Task CompleteAsync() diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs index 9b1d4e61a..b98f65d34 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs @@ -62,7 +62,16 @@ namespace Squidex.Infrastructure.EventSourcing.Grains { if (currentSubscription is BatchSubscriber batchSubscriber) { - await batchSubscriber.CompleteAsync(); + try + { + await batchSubscriber.CompleteAsync(); + } + catch (Exception ex) + { + log.LogFatal(ex, w => w + .WriteProperty("action", "CompleteConsumer") + .WriteProperty("status", "Failed")); + } } } @@ -227,14 +236,6 @@ namespace Squidex.Infrastructure.EventSourcing.Grains await state.WriteAsync(); } } - catch (Exception ex) - { - Unsubscribe(); - - State = State.Stopped(ex); - - DeactivateOnIdle(); - } finally { semaphore.Release(); diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs index b155c74f2..a568016b3 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs @@ -7,6 +7,7 @@ using FakeItEasy; using FluentAssertions; +using Orleans.Storage; using Squidex.Infrastructure.Orleans; using Squidex.Infrastructure.TestHelpers; using Squidex.Log; @@ -479,10 +480,10 @@ namespace Squidex.Infrastructure.EventSourcing.Grains [Fact] public async Task Should_start_after_stop_if_handling_failed() { - var exception = new InvalidOperationException(); + var ex = new InvalidOperationException(); A.CallTo(() => eventConsumer.On(envelope)) - .Throws(exception); + .Throws(ex); await sut.ActivateAsync(consumerName); await sut.ActivateAsync(); @@ -510,6 +511,24 @@ namespace Squidex.Infrastructure.EventSourcing.Grains .MustHaveHappened(2, Times.Exactly); } + [Fact] + public async Task Should_fail_if_writing_failed() + { + var ex = new InconsistentStateException(); + + A.CallTo(() => grainState.WriteAsync()) + .Throws(ex); + + await sut.ActivateAsync(consumerName); + await sut.ActivateAsync(); + + await OnEventAsync(eventSubscription, storedEvent); + + await sut.CompleteAsync(); + + AssetGrainState(isStopped: true, position: storedEvent.EventPosition, error: ex.ToString(), 1); + } + private Task OnErrorAsync(IEventSubscription subscription, Exception exception) { return sut.OnErrorAsync(subscription, exception); diff --git a/frontend/app/features/rules/pages/rule/rule-page.component.html b/frontend/app/features/rules/pages/rule/rule-page.component.html index 5f9de384e..76064283a 100644 --- a/frontend/app/features/rules/pages/rule/rule-page.component.html +++ b/frontend/app/features/rules/pages/rule/rule-page.component.html @@ -14,14 +14,6 @@ - -
{{ 'common.enabled' | sqxTranslate }} @@ -34,6 +26,14 @@
+ +