From 4f7f8fea05228121b6b3d59fe30a64e6bc438017 Mon Sep 17 00:00:00 2001 From: Sebastian Date: Fri, 3 Dec 2021 19:06:49 +0100 Subject: [PATCH] Deactivate consumer on write exception. --- .../EventSourcing/Grains/BatchSubscriber.cs | 2 ++ .../EventSourcing/Grains/EventConsumerGrain.cs | 8 ++++++++ 2 files changed, 10 insertions(+) diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs index a86927467..7c5407c66 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/BatchSubscriber.cs @@ -97,6 +97,8 @@ namespace Squidex.Infrastructure.EventSourcing.Grains { await foreach (var task in taskQueue.Reader.ReadAllAsync(completed.Token)) { + var scheduler = TaskScheduler.Current; + var sender = eventSubscription?.Sender; if (sender == null) diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs index 9313163a1..9b1d4e61a 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs @@ -227,6 +227,14 @@ namespace Squidex.Infrastructure.EventSourcing.Grains await state.WriteAsync(); } } + catch (Exception ex) + { + Unsubscribe(); + + State = State.Stopped(ex); + + DeactivateOnIdle(); + } finally { semaphore.Release();