Browse Source

Additional guard for reentrants

pull/596/head
Sebastian 5 years ago
parent
commit
7cf3328834
  1. 45
      backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs
  2. 6
      backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerManagerGrainTests.cs

45
backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs

@ -22,6 +22,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
private readonly IEventDataFormatter eventDataFormatter; private readonly IEventDataFormatter eventDataFormatter;
private readonly IEventStore eventStore; private readonly IEventStore eventStore;
private readonly ISemanticLog log; private readonly ISemanticLog log;
private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1);
private TaskScheduler? scheduler; private TaskScheduler? scheduler;
private BatchSubscriber? currentSubscriber; private BatchSubscriber? currentSubscriber;
private IEventConsumer? eventConsumer; private IEventConsumer? eventConsumer;
@ -197,34 +198,42 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
private async Task DoAndUpdateStateAsync(Func<Task> action, [CallerMemberName] string? caller = null) private async Task DoAndUpdateStateAsync(Func<Task> action, [CallerMemberName] string? caller = null)
{ {
var previousState = State; await semaphore.WaitAsync();
try try
{ {
await action(); var previousState = State;
}
catch (Exception ex)
{
try try
{ {
Unsubscribe(); await action();
} }
catch (Exception unsubscribeException) catch (Exception ex)
{ {
ex = new AggregateException(ex, unsubscribeException); try
{
Unsubscribe();
}
catch (Exception unsubscribeException)
{
ex = new AggregateException(ex, unsubscribeException);
}
log.LogFatal(ex, w => w
.WriteProperty("action", caller)
.WriteProperty("status", "Failed")
.WriteProperty("eventConsumer", eventConsumer!.Name));
State = previousState.Stopped(ex);
} }
log.LogFatal(ex, w => w if (State != previousState)
.WriteProperty("action", caller) {
.WriteProperty("status", "Failed") await state.WriteAsync();
.WriteProperty("eventConsumer", eventConsumer!.Name)); }
State = previousState.Stopped(ex);
} }
finally
if (State != previousState)
{ {
await state.WriteAsync(); semaphore.Release();
} }
} }

6
backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerManagerGrainTests.cs

@ -166,12 +166,10 @@ namespace Squidex.Infrastructure.EventSourcing.Grains
public async Task Should_fetch_infos_from_all_grains() public async Task Should_fetch_infos_from_all_grains()
{ {
A.CallTo(() => grainA.GetStateAsync()) A.CallTo(() => grainA.GetStateAsync())
.Returns(new Immutable<EventConsumerInfo>( .Returns(new EventConsumerInfo { Name = "A", Error = "A-Error", IsStopped = false, Position = "123" });
new EventConsumerInfo { Name = "A", Error = "A-Error", IsStopped = false, Position = "123" }));
A.CallTo(() => grainB.GetStateAsync()) A.CallTo(() => grainB.GetStateAsync())
.Returns(new Immutable<EventConsumerInfo>( .Returns( new EventConsumerInfo { Name = "B", Error = "B-Error", IsStopped = false, Position = "456" });
new EventConsumerInfo { Name = "B", Error = "B-Error", IsStopped = false, Position = "456" }));
var infos = await sut.GetConsumersAsync(); var infos = await sut.GetConsumersAsync();

Loading…
Cancel
Save