diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs index 445c84dd0..f650103fb 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs @@ -1,5 +1,5 @@ // ========================================================================== -// EventReceiver.cs +// EventConsumerActor.cs // Squidex Headless CMS // ========================================================================== // Copyright (c) Squidex Group @@ -23,7 +23,8 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors private readonly ISemanticLog log; private IEventSubscription eventSubscription; private IEventConsumer eventConsumer; - private string position; + private bool isStarted; + private bool isSetup; public EventConsumerActor( EventDataFormatter formatter, @@ -47,9 +48,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors { Guard.NotNull(eventConsumer, nameof(eventConsumer)); - this.eventConsumer = eventConsumer; - - SendAsync(new StartConsumerMessage()); + SendAsync(new SetupConsumerMessage { EventConsumer = eventConsumer }); } protected override async Task OnStop() @@ -69,21 +68,28 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors { switch (message) { - case StopConsumerMessage stopConsumer: + case SetupConsumerMessage setupConsumer when !isSetup: { - await StopAsync(stopConsumer.Exception); + await SetupAsync(setupConsumer.EventConsumer); break; } - case StartConsumerMessage startConsumer: + case StartConsumerMessage startConsumer when isSetup && !isStarted: { await StartAsync(); break; } - case ResetConsumerMessage resetConsumer: + case StopConsumerMessage stopConsumer when isSetup && isStarted: + { + await StopAsync(stopConsumer.Exception); + + break; + } + + case ResetConsumerMessage resetConsumer when isSetup: { await StopAsync(); await ResetAsync(); @@ -92,7 +98,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors break; } - case ReceiveEventMessage receiveEvent: + case ReceiveEventMessage receiveEvent when isSetup: { if (receiveEvent.Source == eventSubscription) { @@ -105,28 +111,48 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors } } - private async Task StartAsync() + private async Task SetupAsync(IEventConsumer consumer) { + eventConsumer = consumer; + await eventConsumerInfoRepository.CreateAsync(eventConsumer.Name); - position = (await eventConsumerInfoRepository.FindAsync(eventConsumer.Name)).Position; + var status = await eventConsumerInfoRepository.FindAsync(eventConsumer.Name); + + if (!status.IsStopped) + { + SendAsync(new StartConsumerMessage()).Forget(); + } + + isSetup = true; + } + + private async Task StartAsync() + { + await eventConsumerInfoRepository.StartAsync(eventConsumer.Name); + + var status = await eventConsumerInfoRepository.FindAsync(eventConsumer.Name); + + var position = status.Position; eventSubscription = eventStore.CreateSubscription(); eventSubscription.SendAsync(new SubscribeMessage { Parent = this, StreamFilter = eventConsumer.EventsFilter, Position = position }).Forget(); + + isStarted = true; } private async Task StopAsync(Exception exception = null) { - if (eventSubscription != null) - { - await eventSubscription.StopAsync(); - } - await eventConsumerInfoRepository.StopAsync(eventConsumer.Name, exception?.Message); + await eventSubscription.StopAsync(); + + isStarted = false; } private async Task ResetAsync() { + await eventConsumerInfoRepository.ResetAsync(eventConsumer.Name); + var actionId = Guid.NewGuid().ToString(); try { diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/SetupConsumerMessage.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/SetupConsumerMessage.cs new file mode 100644 index 000000000..7ae7c93e0 --- /dev/null +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/Messages/SetupConsumerMessage.cs @@ -0,0 +1,18 @@ +// ========================================================================== +// SetupConsumerMessage.cs +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex Group +// All rights reserved. +// ========================================================================== + +using Squidex.Infrastructure.Actors; + +namespace Squidex.Infrastructure.CQRS.Events.Actors.Messages +{ + [TypeName(nameof(SetupConsumerMessage))] + public sealed class SetupConsumerMessage : IMessage + { + public IEventConsumer EventConsumer { get; set; } + } +} diff --git a/src/Squidex/Config/Domain/InfrastructureModule.cs b/src/Squidex/Config/Domain/InfrastructureModule.cs index 651af7079..86afbdd22 100644 --- a/src/Squidex/Config/Domain/InfrastructureModule.cs +++ b/src/Squidex/Config/Domain/InfrastructureModule.cs @@ -136,6 +136,10 @@ namespace Squidex.Config.Domain .As() .SingleInstance(); + builder.RegisterType() + .As() + .SingleInstance(); + builder.RegisterType() .As() .AsSelf()