|
|
|
@ -13,7 +13,7 @@ using Squidex.Infrastructure.Tasks; |
|
|
|
|
|
|
|
namespace Squidex.Infrastructure.EventSourcing.Consume; |
|
|
|
|
|
|
|
public class EventConsumerProcessor : IEventSubscriber<ParsedEvents> |
|
|
|
public partial class EventConsumerProcessor : IEventSubscriber<ParsedEvents> |
|
|
|
{ |
|
|
|
private readonly SimpleState<EventConsumerState> state; |
|
|
|
private readonly IEventFormatter eventFormatter; |
|
|
|
@ -69,7 +69,7 @@ public class EventConsumerProcessor : IEventSubscriber<ParsedEvents> |
|
|
|
} |
|
|
|
catch (Exception ex) |
|
|
|
{ |
|
|
|
log.LogCritical(ex, "Failed to complete consumer."); |
|
|
|
LogFailedToCompleteConsumer(log, ex); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -107,7 +107,7 @@ public class EventConsumerProcessor : IEventSubscriber<ParsedEvents> |
|
|
|
|
|
|
|
if (logWindow.CanRetryAfterFailure()) |
|
|
|
{ |
|
|
|
log.LogError(exception, "Failed to handle event."); |
|
|
|
LogFailedToHandleEvent(log, exception); |
|
|
|
} |
|
|
|
}, State.Position); |
|
|
|
} |
|
|
|
@ -215,8 +215,7 @@ public class EventConsumerProcessor : IEventSubscriber<ParsedEvents> |
|
|
|
ex = new AggregateException(ex, unsubscribeException); |
|
|
|
} |
|
|
|
|
|
|
|
log.LogCritical(ex, "Failed to update consumer {consumer} at position {position} from {caller}.", |
|
|
|
eventConsumer.Name, position, caller); |
|
|
|
LogFailedToUpdateConsumer(log, eventConsumer.Name, position, caller, ex); |
|
|
|
|
|
|
|
State = previousState.Stopped(ex); |
|
|
|
} |
|
|
|
@ -233,7 +232,7 @@ public class EventConsumerProcessor : IEventSubscriber<ParsedEvents> |
|
|
|
{ |
|
|
|
if (log.IsEnabled(LogLevel.Debug)) |
|
|
|
{ |
|
|
|
log.LogDebug("Event consumer {consumer} reset started", eventConsumer.Name); |
|
|
|
LogEventConsumerResetStarted(log, eventConsumer.Name); |
|
|
|
} |
|
|
|
|
|
|
|
var watch = ValueStopwatch.StartNew(); |
|
|
|
@ -243,7 +242,7 @@ public class EventConsumerProcessor : IEventSubscriber<ParsedEvents> |
|
|
|
} |
|
|
|
finally |
|
|
|
{ |
|
|
|
log.LogDebug("Event consumer {consumer} reset completed after {time}ms.", eventConsumer.Name, watch.Stop()); |
|
|
|
LogEventConsumerResetCompleted(log, eventConsumer.Name, watch.Stop()); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
@ -282,4 +281,19 @@ public class EventConsumerProcessor : IEventSubscriber<ParsedEvents> |
|
|
|
{ |
|
|
|
return eventStore.CreateSubscription(subscriber, eventConsumer.EventsFilter, State.Position); |
|
|
|
} |
|
|
|
|
|
|
|
[LoggerMessage(EventId = 1, Level = LogLevel.Critical, Message = "Failed to complete consumer.")] |
|
|
|
private static partial void LogFailedToCompleteConsumer(ILogger logger, Exception exception); |
|
|
|
|
|
|
|
[LoggerMessage(EventId = 2, Level = LogLevel.Error, Message = "Failed to handle event.")] |
|
|
|
private static partial void LogFailedToHandleEvent(ILogger logger, Exception exception); |
|
|
|
|
|
|
|
[LoggerMessage(EventId = 3, Level = LogLevel.Critical, Message = "Failed to update consumer {consumer} at position {position} from {caller}.")] |
|
|
|
private static partial void LogFailedToUpdateConsumer(ILogger logger, string consumer, string? position, string? caller, Exception exception); |
|
|
|
|
|
|
|
[LoggerMessage(EventId = 4, Level = LogLevel.Debug, Message = "Event consumer {consumer} reset started")] |
|
|
|
private static partial void LogEventConsumerResetStarted(ILogger logger, string consumer); |
|
|
|
|
|
|
|
[LoggerMessage(EventId = 5, Level = LogLevel.Debug, Message = "Event consumer {consumer} reset completed after {time}ms.")] |
|
|
|
private static partial void LogEventConsumerResetCompleted(ILogger logger, string consumer, long time); |
|
|
|
} |
|
|
|
|