diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs index 0006b9fb7..1850b64fc 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs @@ -22,6 +22,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains private readonly IEventDataFormatter eventDataFormatter; private readonly IEventStore eventStore; private readonly ISemanticLog log; + private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1); private TaskScheduler? scheduler; private BatchSubscriber? currentSubscriber; private IEventConsumer? eventConsumer; @@ -197,34 +198,42 @@ namespace Squidex.Infrastructure.EventSourcing.Grains private async Task DoAndUpdateStateAsync(Func action, [CallerMemberName] string? caller = null) { - var previousState = State; - + await semaphore.WaitAsync(); try { - await action(); - } - catch (Exception ex) - { + var previousState = State; + try { - Unsubscribe(); + await action(); } - catch (Exception unsubscribeException) + catch (Exception ex) { - ex = new AggregateException(ex, unsubscribeException); + try + { + Unsubscribe(); + } + catch (Exception unsubscribeException) + { + ex = new AggregateException(ex, unsubscribeException); + } + + log.LogFatal(ex, w => w + .WriteProperty("action", caller) + .WriteProperty("status", "Failed") + .WriteProperty("eventConsumer", eventConsumer!.Name)); + + State = previousState.Stopped(ex); } - log.LogFatal(ex, w => w - .WriteProperty("action", caller) - .WriteProperty("status", "Failed") - .WriteProperty("eventConsumer", eventConsumer!.Name)); - - State = previousState.Stopped(ex); + if (State != previousState) + { + await state.WriteAsync(); + } } - - if (State != previousState) + finally { - await state.WriteAsync(); + semaphore.Release(); } } diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerManagerGrainTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerManagerGrainTests.cs index 2d29eb56a..5d405708f 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerManagerGrainTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerManagerGrainTests.cs @@ -166,12 +166,10 @@ namespace Squidex.Infrastructure.EventSourcing.Grains public async Task Should_fetch_infos_from_all_grains() { A.CallTo(() => grainA.GetStateAsync()) - .Returns(new Immutable( - new EventConsumerInfo { Name = "A", Error = "A-Error", IsStopped = false, Position = "123" })); + .Returns(new EventConsumerInfo { Name = "A", Error = "A-Error", IsStopped = false, Position = "123" }); A.CallTo(() => grainB.GetStateAsync()) - .Returns(new Immutable( - new EventConsumerInfo { Name = "B", Error = "B-Error", IsStopped = false, Position = "456" })); + .Returns( new EventConsumerInfo { Name = "B", Error = "B-Error", IsStopped = false, Position = "456" }); var infos = await sut.GetConsumersAsync();