From ff394acc1e5f10fcbc3c94c69979395eb3f4767e Mon Sep 17 00:00:00 2001 From: Sebastian Date: Tue, 14 Feb 2017 22:15:03 +0100 Subject: [PATCH] Improved error handling for event consumers --- .../MongoEventConsumerInfo.cs | 4 + .../MongoEventConsumerInfoRepository.cs | 12 +- .../CQRS/Events/EventReceiver.cs | 25 +-- .../CQRS/Events/IEventConsumerInfo.cs | 2 + .../Events/IEventConsumerInfoRepository.cs | 2 +- .../EventConsumers/Models/EventConsumerDto.cs | 2 + .../event-consumers-page.component.html | 37 +++- .../event-consumers-page.component.scss | 22 +++ .../event-consumers-page.component.ts | 22 ++- .../apps/pages/apps-page.component.html | 4 +- .../pages/schema/schema-page.component.html | 4 +- .../pages/schemas/schemas-page.component.html | 4 +- .../pages/clients/client.component.html | 6 +- .../pages/clients/client.component.scss | 2 + .../contributors-page.component.html | 2 +- .../languages/languages-page.component.html | 2 +- .../services/event-consumers.service.ts | 6 +- .../pages/internal/apps-menu.component.html | 4 +- .../app/theme/icomoon/fonts/icomoon.eot | Bin 12404 -> 12772 bytes .../app/theme/icomoon/fonts/icomoon.svg | 2 + .../app/theme/icomoon/fonts/icomoon.ttf | Bin 12240 -> 12608 bytes .../app/theme/icomoon/fonts/icomoon.woff | Bin 12316 -> 12684 bytes src/Squidex/app/theme/icomoon/selection.json | 161 ++++++++++++------ src/Squidex/app/theme/icomoon/style.css | 16 +- .../CQRS/Events/EventReceiverTests.cs | 6 +- 25 files changed, 239 insertions(+), 108 deletions(-) diff --git a/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfo.cs b/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfo.cs index 561882217..b1b179913 100644 --- a/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfo.cs +++ b/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfo.cs @@ -18,6 +18,10 @@ namespace Squidex.Infrastructure.MongoDb [BsonRepresentation(BsonType.String)] public string Name { get; set; } + [BsonElement] + [BsonIgnoreIfNull] + public string Error { get; set; } + [BsonElement] [BsonIgnoreIfDefault] public bool IsStopped { get; set; } diff --git a/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfoRepository.cs b/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfoRepository.cs index 2726e3f68..16d73cf54 100644 --- a/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfoRepository.cs +++ b/src/Squidex.Infrastructure.MongoDb/MongoEventConsumerInfoRepository.cs @@ -64,19 +64,19 @@ namespace Squidex.Infrastructure.MongoDb return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Unset(x => x.IsStopped)); } - public Task StopAsync(string consumerName) + public Task StopAsync(string consumerName, string error = null) { - return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Set(x => x.IsStopped, true)); + return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Set(x => x.IsStopped, true).Set(x => x.Error, error)); } - public Task SetLastHandledEventNumberAsync(string consumerName, long eventNumber) + public Task ResetAsync(string consumerName) { - return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Set(x => x.LastHandledEventNumber, eventNumber).Unset(x => x.IsResetting).Unset(x => x.IsStopped)); + return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Set(x => x.IsResetting, true)); } - public Task ResetAsync(string consumerName) + public Task SetLastHandledEventNumberAsync(string consumerName, long eventNumber) { - return Collection.UpdateOneAsync(x => x.Name == consumerName, Update.Set(x => x.IsResetting, true)); + return Collection.ReplaceOneAsync(x => x.Name == consumerName, new MongoEventConsumerInfo { Name = consumerName, LastHandledEventNumber = eventNumber }); } } } diff --git a/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs b/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs index 3b1733682..f2c8cfc3b 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/EventReceiver.cs @@ -7,6 +7,7 @@ // ========================================================================== using System; +using System.Reactive.Linq; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Squidex.Infrastructure.Timers; @@ -103,29 +104,20 @@ namespace Squidex.Infrastructure.CQRS.Events { return; } - - var tcs = new TaskCompletionSource(); - eventStore.GetEventsAsync(lastHandledEventNumber).Subscribe(storedEvent => - { - HandleEventAsync(eventConsumer, storedEvent, consumerName).Wait(); - }, ex => - { - tcs.SetException(ex); - }, () => - { - tcs.SetResult(true); - }, ct); + await eventStore.GetEventsAsync(lastHandledEventNumber) + .SelectMany(async storedEvent => + { + await HandleEventAsync(eventConsumer, storedEvent, consumerName); - await tcs.Task; + return storedEvent; + }).DefaultIfEmpty(); } catch (Exception ex) { logger.LogError(InfrastructureErrors.EventHandlingFailed, ex, "Failed to handle events"); - await eventConsumerInfoRepository.StopAsync(consumerName); - - throw; + await eventConsumerInfoRepository.StopAsync(consumerName, ex.ToString()); } }); @@ -137,7 +129,6 @@ namespace Squidex.Infrastructure.CQRS.Events var @event = ParseEvent(storedEvent); await DispatchConsumer(@event, eventConsumer); - await eventConsumerInfoRepository.SetLastHandledEventNumberAsync(consumerName, storedEvent.EventNumber); } diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfo.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfo.cs index 7fe026d5a..492c73a8b 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfo.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfo.cs @@ -17,5 +17,7 @@ namespace Squidex.Infrastructure.CQRS.Events bool IsResetting { get; } string Name { get; } + + string Error { get; } } } \ No newline at end of file diff --git a/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfoRepository.cs b/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfoRepository.cs index a9a9cf788..5e77c01ae 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfoRepository.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/IEventConsumerInfoRepository.cs @@ -21,7 +21,7 @@ namespace Squidex.Infrastructure.CQRS.Events Task StartAsync(string consumerName); - Task StopAsync(string consumerName); + Task StopAsync(string consumerName, string error = null); Task ResetAsync(string consumerName); diff --git a/src/Squidex/Controllers/Api/EventConsumers/Models/EventConsumerDto.cs b/src/Squidex/Controllers/Api/EventConsumers/Models/EventConsumerDto.cs index 6a88a000d..fa3b3b3e4 100644 --- a/src/Squidex/Controllers/Api/EventConsumers/Models/EventConsumerDto.cs +++ b/src/Squidex/Controllers/Api/EventConsumers/Models/EventConsumerDto.cs @@ -17,5 +17,7 @@ namespace Squidex.Controllers.Api.EventConsumers.Models public bool IsResetting { get; set; } public string Name { get; set; } + + public string Error { get; set; } } } diff --git a/src/Squidex/app/features/administration/pages/event-consumers/event-consumers-page.component.html b/src/Squidex/app/features/administration/pages/event-consumers/event-consumers-page.component.html index 7cb2fe175..c321d2c11 100644 --- a/src/Squidex/app/features/administration/pages/event-consumers/event-consumers-page.component.html +++ b/src/Squidex/app/features/administration/pages/event-consumers/event-consumers-page.component.html @@ -29,28 +29,32 @@ Event Number - Options + Actions