// ========================================================================== // EventConsumerActorTests.cs // Squidex Headless CMS // ========================================================================== // Copyright (c) Squidex Group // All rights reserved. // ========================================================================== using System; using System.Threading.Tasks; using FakeItEasy; using Squidex.Infrastructure.Actors; using Squidex.Infrastructure.CQRS.Events.Actors.Messages; using Squidex.Infrastructure.Log; using Xunit; namespace Squidex.Infrastructure.CQRS.Events.Actors { public class EventConsumerActorTests { public sealed class MyEvent : IEvent { } private sealed class MyEventConsumerInfo : IEventConsumerInfo { public bool IsStopped { get; set; } public string Name { get; set; } public string Error { get; set; } public string Position { get; set; } } public sealed class MyEventConsumerActor : EventConsumerActor { public MyEventConsumerActor( EventDataFormatter formatter, IEventStore eventStore, IEventConsumerInfoRepository eventConsumerInfoRepository, ISemanticLog log) : base(formatter, eventStore, eventConsumerInfoRepository, log) { } protected override IEventSubscription CreateSubscription(IEventStore eventStore, string streamFilter, string position) { return eventStore.CreateSubscription(this, streamFilter, position); } } private readonly IEventConsumerInfoRepository eventConsumerInfoRepository = A.Fake(); private readonly IEventConsumer eventConsumer = A.Fake(); private readonly IEventStore eventStore = A.Fake(); private readonly IEventSubscription eventSubscription = A.Fake(); private readonly ISemanticLog log = A.Fake(); private readonly IActor sutActor; private readonly IEventSubscriber sutSubscriber; private readonly EventDataFormatter formatter = A.Fake(); private readonly EventData eventData = new EventData(); private readonly Envelope envelope = new Envelope(new MyEvent()); private readonly EventConsumerActor sut; private readonly MyEventConsumerInfo consumerInfo = new MyEventConsumerInfo(); private readonly string consumerName; public EventConsumerActorTests() { consumerInfo.Position = Guid.NewGuid().ToString(); consumerName = eventConsumer.GetType().Name; A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)).Returns(eventSubscription); A.CallTo(() => eventConsumer.Name).Returns(consumerName); A.CallTo(() => eventConsumerInfoRepository.FindAsync(consumerName)).Returns(consumerInfo); A.CallTo(() => formatter.Parse(eventData, true)).Returns(envelope); sut = new MyEventConsumerActor( formatter, eventStore, eventConsumerInfoRepository, log); sutActor = sut; sutSubscriber = sut; } [Fact] public async Task Should_not_subscribe_to_event_store_when_stopped_in_db() { consumerInfo.IsStopped = true; await OnSubscribeAsync(); sut.Dispose(); A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) .MustNotHaveHappened(); } [Fact] public async Task Should_subscribe_to_event_store_when_not_found_in_db() { A.CallTo(() => eventConsumerInfoRepository.FindAsync(consumerName)).Returns(Task.FromResult(null)); await OnSubscribeAsync(); sut.Dispose(); A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) .MustHaveHappened(Repeated.Exactly.Once); } [Fact] public async Task Should_subscribe_to_event_store_when_not_stopped_in_db() { await OnSubscribeAsync(); sut.Dispose(); A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) .MustHaveHappened(Repeated.Exactly.Once); } [Fact] public async Task Should_stop_subscription_when_stopped() { await OnSubscribeAsync(); sutActor.Tell(new StopConsumerMessage()); sutActor.Tell(new StopConsumerMessage()); sut.Dispose(); A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, null)) .MustHaveHappened(Repeated.Exactly.Once); A.CallTo(() => eventSubscription.StopAsync()) .MustHaveHappened(Repeated.Exactly.Once); } [Fact] public async Task Should_reset_consumer_when_resetting() { await OnSubscribeAsync(); sutActor.Tell(new StopConsumerMessage()); sutActor.Tell(new ResetConsumerMessage()); sut.Dispose(); A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, null)) .MustHaveHappened(Repeated.Exactly.Once); A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, null, false, null)) .MustHaveHappened(Repeated.Exactly.Once); A.CallTo(() => eventConsumer.ClearAsync()) .MustHaveHappened(Repeated.Exactly.Once); A.CallTo(() => eventSubscription.StopAsync()) .MustHaveHappened(Repeated.Exactly.Once); A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, consumerInfo.Position)) .MustHaveHappened(Repeated.Exactly.Once); A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, null)) .MustHaveHappened(Repeated.Exactly.Once); } [Fact] public async Task Should_invoke_and_update_position_when_event_received() { var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); await OnSubscribeAsync(); await OnEventAsync(eventSubscription, @event); sut.Dispose(); A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, @event.EventPosition, false, null)) .MustHaveHappened(Repeated.Exactly.Once); A.CallTo(() => eventConsumer.On(envelope)) .MustHaveHappened(Repeated.Exactly.Once); } [Fact] public async Task Should_ignore_old_events() { A.CallTo(() => formatter.Parse(eventData, true)) .Throws(new TypeNameNotFoundException()); var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); await OnSubscribeAsync(); await OnEventAsync(eventSubscription, @event); sut.Dispose(); A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, @event.EventPosition, false, null)) .MustHaveHappened(Repeated.Exactly.Once); A.CallTo(() => eventConsumer.On(envelope)) .MustNotHaveHappened(); } [Fact] public async Task Should_not_invoke_and_update_position_when_event_is_from_another_subscription() { var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); await OnSubscribeAsync(); await OnEventAsync(A.Fake(), @event); sut.Dispose(); A.CallTo(() => eventConsumer.On(envelope)) .MustNotHaveHappened(); } [Fact] public async Task Should_not_make_error_handling_when_exception_is_from_another_subscription() { var ex = new InvalidOperationException(); await OnSubscribeAsync(); await OnErrorAsync(A.Fake(), ex); sut.Dispose(); A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, false, ex.ToString())) .MustNotHaveHappened(); } [Fact] public async Task Should_stop_if_resetting_failed() { var ex = new InvalidOperationException(); A.CallTo(() => eventConsumer.ClearAsync()) .Throws(ex); await OnSubscribeAsync(); sutActor.Tell(new ResetConsumerMessage()); sut.Dispose(); A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, ex.ToString())) .MustHaveHappened(Repeated.Exactly.Once); A.CallTo(() => eventSubscription.StopAsync()) .MustHaveHappened(Repeated.Exactly.Once); } [Fact] public async Task Should_stop_if_handling_failed() { var ex = new InvalidOperationException(); A.CallTo(() => eventConsumer.On(envelope)) .Throws(ex); var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); await OnSubscribeAsync(); await OnEventAsync(eventSubscription, @event); sut.Dispose(); A.CallTo(() => eventConsumer.On(envelope)) .MustHaveHappened(); A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, ex.ToString())) .MustHaveHappened(Repeated.Exactly.Once); A.CallTo(() => eventSubscription.StopAsync()) .MustHaveHappened(Repeated.Exactly.Once); } [Fact] public async Task Should_stop_if_deserialization_failed() { var ex = new InvalidOperationException(); A.CallTo(() => formatter.Parse(eventData, true)) .Throws(ex); var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); await OnSubscribeAsync(); await OnEventAsync(eventSubscription, @event); sut.Dispose(); A.CallTo(() => eventConsumer.On(envelope)) .MustNotHaveHappened(); A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, ex.ToString())) .MustHaveHappened(Repeated.Exactly.Once); A.CallTo(() => eventSubscription.StopAsync()) .MustHaveHappened(Repeated.Exactly.Once); } [Fact] public async Task Should_start_after_stop_when_handling_failed() { var exception = new InvalidOperationException(); A.CallTo(() => eventConsumer.On(envelope)) .Throws(exception); var @event = new StoredEvent(Guid.NewGuid().ToString(), 123, eventData); await OnSubscribeAsync(); await OnEventAsync(eventSubscription, @event); sutActor.Tell(new StartConsumerMessage()); sutActor.Tell(new StartConsumerMessage()); sut.Dispose(); A.CallTo(() => eventConsumer.On(envelope)) .MustHaveHappened(); A.CallTo(() => eventConsumerInfoRepository.SetAsync(consumerName, consumerInfo.Position, true, exception.ToString())) .MustHaveHappened(Repeated.Exactly.Once); A.CallTo(() => eventSubscription.StopAsync()) .MustHaveHappened(Repeated.Exactly.Once); A.CallTo(() => eventStore.CreateSubscription(A.Ignored, A.Ignored, A.Ignored)) .MustHaveHappened(Repeated.Exactly.Twice); } private Task OnErrorAsync(IEventSubscription subscriber, Exception ex) { return sutSubscriber.OnErrorAsync(subscriber, ex); } private Task OnEventAsync(IEventSubscription subscriber, StoredEvent ev) { return sutSubscriber.OnEventAsync(subscriber, ev); } private Task OnSubscribeAsync() { return sut.SubscribeAsync(eventConsumer); } } }