diff --git a/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs index 1fbb026ae..d294069c0 100644 --- a/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs +++ b/src/Squidex.Infrastructure/CQRS/Events/Actors/EventConsumerActor.cs @@ -99,7 +99,12 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors return DoAndUpdateStateAsync(async () => { - await DispatchConsumerAsync(ParseEvent(storedEvent)); + var @event = ParseKnownEvent(storedEvent); + + if (@event != null) + { + await DispatchConsumerAsync(@event); + } statusError = null; statusPosition = storedEvent.EventPosition; @@ -296,14 +301,21 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors } } - private Envelope ParseEvent(StoredEvent message) + private Envelope ParseKnownEvent(StoredEvent message) { - var @event = formatter.Parse(message.Data); + try + { + var @event = formatter.Parse(message.Data); - @event.SetEventPosition(message.EventPosition); - @event.SetEventStreamNumber(message.EventStreamNumber); + @event.SetEventPosition(message.EventPosition); + @event.SetEventStreamNumber(message.EventStreamNumber); - return @event; + return @event; + } + catch (TypeNameNotFoundException) + { + return null; + } } } } \ No newline at end of file diff --git a/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs b/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs index 43a719e3c..1dde168d1 100644 --- a/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs +++ b/tests/Squidex.Infrastructure.Tests/CQRS/Events/Actors/EventConsumerActorTests.cs @@ -185,6 +185,26 @@ namespace Squidex.Infrastructure.CQRS.Events.Actors .MustHaveHappened(Repeated.Exactly.Once); } + [Fact] + public async Task Should_ignore_old_events() + { + A.CallTo(() => formatter.Parse(eventData, true)) + .Throws(new TypeNameNotFoundException()); + + var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); + + await OnSubscribeAsync(); + await OnEventAsync(eventSubscription, @event); + + sut.Dispose(); + + A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, @event.EventPosition, false, null)) + .MustHaveHappened(Repeated.Exactly.Once); + + A.CallTo(() => eventConsumer.On(envelope)) + .MustNotHaveHappened(); + } + [Fact] public async Task Should_not_invoke_and_update_position_when_event_is_from_another_subscription() {