diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs index ce02d6d6a..054a9cabf 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerGrain.cs @@ -28,6 +28,12 @@ namespace Squidex.Infrastructure.EventSourcing.Grains private IEventSubscription? currentSubscription; private IEventConsumer? eventConsumer; + private EventConsumerState State + { + get => state.Value; + set => state.Value = value; + } + public EventConsumerGrain( EventConsumerFactory eventConsumerFactory, IGrainState state, @@ -65,7 +71,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains private Immutable CreateInfo() { - return state.Value.ToInfo(eventConsumer!.Name).AsImmutable(); + return State.ToInfo(eventConsumer!.Name).AsImmutable(); } public Task OnEventAsync(Immutable subscription, Immutable storedEvent) @@ -87,7 +93,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains } } - state.Value = state.Value.Handled(storedEvent.Value.EventPosition); + State = State.Handled(storedEvent.Value.EventPosition); }); } @@ -102,32 +108,39 @@ namespace Squidex.Infrastructure.EventSourcing.Grains { Unsubscribe(); - state.Value = state.Value.Failed(exception.Value); + State = State.Stopped(exception.Value); }); } - public Task ActivateAsync() + public async Task ActivateAsync() { - if (!state.Value.IsStopped) + if (State.IsFailed) { - Subscribe(state.Value.Position); - } + await DoAndUpdateStateAsync(() => + { + Subscribe(State.Position); - return Task.CompletedTask; + State = State.Started(); + }); + } + else if (!State.IsStopped) + { + Subscribe(State.Position); + } } public async Task> StartAsync() { - if (!state.Value.IsStopped) + if (!State.IsStopped) { return CreateInfo(); } await DoAndUpdateStateAsync(() => { - Subscribe(state.Value.Position); + Subscribe(State.Position); - state.Value = state.Value.Started(); + State = State.Started(); }); return CreateInfo(); @@ -135,7 +148,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains public async Task> StopAsync() { - if (state.Value.IsStopped) + if (State.IsStopped) { return CreateInfo(); } @@ -144,7 +157,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains { Unsubscribe(); - state.Value = state.Value.Stopped(); + State = State.Stopped(); }); return CreateInfo(); @@ -160,7 +173,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains Subscribe(null); - state.Value = state.Value.Reset(); + State = State.Reset(); }); return CreateInfo(); @@ -193,7 +206,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains .WriteProperty("status", "Failed") .WriteProperty("eventConsumer", eventConsumer!.Name)); - state.Value = state.Value.Failed(ex); + State = State.Stopped(ex); } await state.WriteAsync(); @@ -259,7 +272,6 @@ namespace Squidex.Infrastructure.EventSourcing.Grains { if (currentSubscription == null) { - currentSubscription?.StopAsync().Forget(); currentSubscription = CreateSubscription(eventConsumer!.EventsFilter, position); } else diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerManagerGrain.cs b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerManagerGrain.cs index 96b105c23..660dde9d8 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerManagerGrain.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerManagerGrain.cs @@ -41,7 +41,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains { DelayDeactivation(TimeSpan.FromDays(1)); - RegisterOrUpdateReminder("Default", TimeSpan.Zero, TimeSpan.FromMinutes(10)); + RegisterOrUpdateReminder("Default", TimeSpan.Zero, TimeSpan.FromMinutes(5)); RegisterTimer(x => ActivateAsync(null), null, TimeSpan.Zero, TimeSpan.FromSeconds(10)); return Task.FromResult(true); @@ -112,7 +112,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains public Task ReceiveReminder(string reminderName, TickStatus status) { - return StartAllAsync(); + return ActivateAsync(null); } } } diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerState.cs b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerState.cs index 8a68081fc..5fabc4c86 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerState.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/Grains/EventConsumerState.cs @@ -18,29 +18,43 @@ namespace Squidex.Infrastructure.EventSourcing.Grains public string? Position { get; set; } - public EventConsumerState Reset() + public bool IsPaused { - return new EventConsumerState(); + get { return IsStopped && string.IsNullOrWhiteSpace(Error); } } - public EventConsumerState Handled(string position) + public bool IsFailed + { + get { return IsStopped && !string.IsNullOrWhiteSpace(Error); } + } + + public EventConsumerState() { - return new EventConsumerState { Position = position }; } - public EventConsumerState Failed(Exception ex) + public EventConsumerState(string? position) + { + Position = position; + } + + public EventConsumerState Reset() + { + return new EventConsumerState(); + } + + public EventConsumerState Handled(string position) { - return new EventConsumerState { Position = Position, IsStopped = true, Error = ex?.ToString() }; + return new EventConsumerState(position); } - public EventConsumerState Stopped() + public EventConsumerState Stopped(Exception? ex = null) { - return new EventConsumerState { Position = Position, IsStopped = true }; + return new EventConsumerState(Position) { IsStopped = true, Error = ex?.ToString() }; } public EventConsumerState Started() { - return new EventConsumerState { Position = Position, IsStopped = false }; + return new EventConsumerState(Position) { IsStopped = false }; } public EventConsumerInfo ToInfo(string name) diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs index 9343583d2..3b07005d2 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs @@ -107,6 +107,20 @@ namespace Squidex.Infrastructure.EventSourcing.Grains .MustHaveHappened(1, Times.Exactly); } + [Fact] + public async Task Should_subscribe_to_event_store_when_failed() + { + grainState.Value = grainState.Value.Stopped(new InvalidOperationException()); + + await sut.ActivateAsync(consumerName); + await sut.ActivateAsync(); + + grainState.Value.Should().BeEquivalentTo(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null }); + + A.CallTo(() => eventStore.CreateSubscription(A._, A._, A._)) + .MustHaveHappened(1, Times.Exactly); + } + [Fact] public async Task Should_subscribe_to_event_store_when_not_stopped_in_db() { diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerManagerGrainTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerManagerGrainTests.cs index b8fb217f0..2d29eb56a 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerManagerGrainTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerManagerGrainTests.cs @@ -67,14 +67,14 @@ namespace Squidex.Infrastructure.EventSourcing.Grains } [Fact] - public async Task Should_start_all_grains_on_reminder() + public async Task Should_activate_all_grains_on_reminder() { await sut.ReceiveReminder(null!, default); - A.CallTo(() => grainA.StartAsync()) + A.CallTo(() => grainA.ActivateAsync()) .MustHaveHappened(); - A.CallTo(() => grainB.StartAsync()) + A.CallTo(() => grainB.ActivateAsync()) .MustHaveHappened(); }