diff --git a/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfo.cs b/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfo.cs index 561882217..b1b179913 100644 --- a/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfo.cs +++ b/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfo.cs @@ -18,6 +18,10 @@ namespace Squidex.Infrastructure.MongoDb [BsonRepresentation(BsonType.String)] public string Name { get; set; } + [BsonElement] + [BsonIgnoreIfNull] + public string Error { get; set; } + [BsonElement] [BsonIgnoreIfDefault] public bool IsStopped { get; set; } diff --git a/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfoRepository.cs b/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfoRepository.cs index 2726e3f68..16d73cf54 100644 --- a/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfoRepository.cs +++ b/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfoRepository.cs @@ -64,19 +64,19 @@ namespace Squidex.Infrastructure.MongoDb return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Unset(x => x.IsStopped)); } - public Task StopAsync(string consumerName) + public Task StopAsync(string consumerName, string error = null) { - return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Set(x => x.IsStopped, true)); + return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Set(x => x.IsStopped, true).Set(x => x.Error, error)); } - public Task SetLastHandledEventNumberAsync(string consumerName, long eventNumber) + public Task ResetAsync(string consumerName) { - return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Set(x => x.LastHandledEventNumber, eventNumber).Unset(x => x.IsResetting).Unset(x => x.IsStopped)); + return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Set(x => x.IsResetting, true)); } - public Task ResetAsync(string consumerName) + public Task SetLastHandledEventNumberAsync(string consumerName, long eventNumber) { - return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Set(x => x.IsResetting, true)); + return Collection.ReplaceOneAsync(x => x.Name == consumerName, new MongoEventConsumerInfo { Name = consumerName, LastHandledEventNumber = eventNumber }); } } } diff --git a/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs b/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs index 3b1733682..f2c8cfc3b 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs @@ -7,6 +7,7 @@ // ========================================================================== using System; +using System.Reactive.Linq; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Squidex.Infrastructure.Timers; @@ -103,29 +104,20 @@ namespace Squidex.Infrastructure.CQRS.Events { return; } - - var tcs = new TaskCompletionSource(); - eventStore.GetEventsAsync(lastHandledEventNumber).Subscribe(storedEvent => - { - HandleEventAsync(eventConsumer, storedEvent, consumerName).Wait(); - }, ex => - { - tcs.SetException(ex); - }, () => - { - tcs.SetResult(true); - }, ct); + await eventStore.GetEventsAsync(lastHandledEventNumber) + .SelectMany(async storedEvent => + { + await HandleEventAsync(eventConsumer, storedEvent, consumerName); - await tcs.Task; + return storedEvent; + }).DefaultIfEmpty(); } catch (Exception ex) { logger.LogError(InfrastructureErrors.EventHandlingFailed, ex, "Failed to handle events"); - await eventConsumerInfoRepository.StopAsync(consumerName); - - throw; + await eventConsumerInfoRepository.StopAsync(consumerName, ex.ToString()); } }); @@ -137,7 +129,6 @@ namespace Squidex.Infrastructure.CQRS.Events var @event = ParseEvent(storedEvent); await DispatchConsumer(@event, eventConsumer); - await eventConsumerInfoRepository.SetLastHandledEventNumberAsync(consumerName, storedEvent.EventNumber); } diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfo.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfo.cs index 7fe026d5a..492c73a8b 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfo.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfo.cs @@ -17,5 +17,7 @@ namespace Squidex.Infrastructure.CQRS.Events bool IsResetting { get; } string Name { get; } + + string Error { get; } } } \ No newline at end of file diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfoRepository.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfoRepository.cs index a9a9cf788..5e77c01ae 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfoRepository.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfoRepository.cs @@ -21,7 +21,7 @@ namespace Squidex.Infrastructure.CQRS.Events Task StartAsync(string consumerName); - Task StopAsync(string consumerName); + Task StopAsync(string consumerName, string error = null); Task ResetAsync(string consumerName); diff --git a/src/Squidex/Controllers/Api/EventConsumers/Models/EventConsumerDto.cs b/src/Squidex/Controllers/Api/EventConsumers/Models/EventConsumerDto.cs index 6a88a000d..fa3b3b3e4 100644 --- a/src/Squidex/Controllers/Api/EventConsumers/Models/EventConsumerDto.cs +++ b/src/Squidex/Controllers/Api/EventConsumers/Models/EventConsumerDto.cs @@ -17,5 +17,7 @@ namespace Squidex.Controllers.Api.EventConsumers.Models public bool IsResetting { get; set; } public string Name { get; set; } + + public string Error { get; set; } } } diff --git a/src/Squidex/app/features/administration/pages/event-consumers/event-consumers-page.component.html b/src/Squidex/app/features/administration/pages/event-consumers/event-consumers-page.component.html index 7cb2fe175..c321d2c11 100644 --- a/src/Squidex/app/features/administration/pages/event-consumers/event-consumers-page.component.html +++ b/src/Squidex/app/features/administration/pages/event-consumers/event-consumers-page.component.html @@ -29,28 +29,32 @@ Event Number - Options + Actions