|
|
|
@ -1,5 +1,5 @@ |
|
|
|
// ==========================================================================
|
|
|
|
// EventReceiver.cs
|
|
|
|
// EventConsumerActor.cs
|
|
|
|
// Squidex Headless CMS
|
|
|
|
// ==========================================================================
|
|
|
|
// Copyright (c) Squidex Group
|
|
|
|
@ -23,7 +23,8 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors |
|
|
|
private readonly ISemanticLog log; |
|
|
|
private IEventSubscription eventSubscription; |
|
|
|
private IEventConsumer eventConsumer; |
|
|
|
private string position; |
|
|
|
private bool isStarted; |
|
|
|
private bool isSetup; |
|
|
|
|
|
|
|
public EventConsumerActor( |
|
|
|
EventDataFormatter formatter, |
|
|
|
@ -47,9 +48,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors |
|
|
|
{ |
|
|
|
Guard.NotNull(eventConsumer, nameof(eventConsumer)); |
|
|
|
|
|
|
|
this.eventConsumer = eventConsumer; |
|
|
|
|
|
|
|
SendAsync(new StartConsumerMessage()); |
|
|
|
SendAsync(new SetupConsumerMessage { EventConsumer = eventConsumer }); |
|
|
|
} |
|
|
|
|
|
|
|
protected override async Task OnStop() |
|
|
|
@ -69,21 +68,28 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors |
|
|
|
{ |
|
|
|
switch (message) |
|
|
|
{ |
|
|
|
case StopConsumerMessage stopConsumer: |
|
|
|
case SetupConsumerMessage setupConsumer when !isSetup: |
|
|
|
{ |
|
|
|
await StopAsync(stopConsumer.Exception); |
|
|
|
await SetupAsync(setupConsumer.EventConsumer); |
|
|
|
|
|
|
|
break; |
|
|
|
} |
|
|
|
|
|
|
|
case StartConsumerMessage startConsumer: |
|
|
|
case StartConsumerMessage startConsumer when isSetup && !isStarted: |
|
|
|
{ |
|
|
|
await StartAsync(); |
|
|
|
|
|
|
|
break; |
|
|
|
} |
|
|
|
|
|
|
|
case ResetConsumerMessage resetConsumer: |
|
|
|
case StopConsumerMessage stopConsumer when isSetup && isStarted: |
|
|
|
{ |
|
|
|
await StopAsync(stopConsumer.Exception); |
|
|
|
|
|
|
|
break; |
|
|
|
} |
|
|
|
|
|
|
|
case ResetConsumerMessage resetConsumer when isSetup: |
|
|
|
{ |
|
|
|
await StopAsync(); |
|
|
|
await ResetAsync(); |
|
|
|
@ -92,7 +98,7 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors |
|
|
|
break; |
|
|
|
} |
|
|
|
|
|
|
|
case ReceiveEventMessage receiveEvent: |
|
|
|
case ReceiveEventMessage receiveEvent when isSetup: |
|
|
|
{ |
|
|
|
if (receiveEvent.Source == eventSubscription) |
|
|
|
{ |
|
|
|
@ -105,28 +111,48 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
private async Task StartAsync() |
|
|
|
private async Task SetupAsync(IEventConsumer consumer) |
|
|
|
{ |
|
|
|
eventConsumer = consumer; |
|
|
|
|
|
|
|
await eventConsumerInfoRepository.CreateAsync(eventConsumer.Name); |
|
|
|
|
|
|
|
position = (await eventConsumerInfoRepository.FindAsync(eventConsumer.Name)).Position; |
|
|
|
var status = await eventConsumerInfoRepository.FindAsync(eventConsumer.Name); |
|
|
|
|
|
|
|
if (!status.IsStopped) |
|
|
|
{ |
|
|
|
SendAsync(new StartConsumerMessage()).Forget(); |
|
|
|
} |
|
|
|
|
|
|
|
isSetup = true; |
|
|
|
} |
|
|
|
|
|
|
|
private async Task StartAsync() |
|
|
|
{ |
|
|
|
await eventConsumerInfoRepository.StartAsync(eventConsumer.Name); |
|
|
|
|
|
|
|
var status = await eventConsumerInfoRepository.FindAsync(eventConsumer.Name); |
|
|
|
|
|
|
|
var position = status.Position; |
|
|
|
|
|
|
|
eventSubscription = eventStore.CreateSubscription(); |
|
|
|
eventSubscription.SendAsync(new SubscribeMessage { Parent = this, StreamFilter = eventConsumer.EventsFilter, Position = position }).Forget(); |
|
|
|
|
|
|
|
isStarted = true; |
|
|
|
} |
|
|
|
|
|
|
|
private async Task StopAsync(Exception exception = null) |
|
|
|
{ |
|
|
|
if (eventSubscription != null) |
|
|
|
{ |
|
|
|
await eventSubscription.StopAsync(); |
|
|
|
} |
|
|
|
|
|
|
|
await eventConsumerInfoRepository.StopAsync(eventConsumer.Name, exception?.Message); |
|
|
|
await eventSubscription.StopAsync(); |
|
|
|
|
|
|
|
isStarted = false; |
|
|
|
} |
|
|
|
|
|
|
|
private async Task ResetAsync() |
|
|
|
{ |
|
|
|
await eventConsumerInfoRepository.ResetAsync(eventConsumer.Name); |
|
|
|
|
|
|
|
var actionId = Guid.NewGuid().ToString(); |
|
|
|
try |
|
|
|
{ |
|
|
|
|