diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs index 185638518..29fbec27a 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs @@ -58,10 +58,11 @@ namespace Squidex.Infrastructure.EventSourcing.Grains private readonly IGrainState grainState = A.Fake>(); private readonly IEventConsumer eventConsumer = A.Fake(); + private readonly IEventDataFormatter formatter = A.Fake(); private readonly IEventStore eventStore = A.Fake(); private readonly IEventSubscription eventSubscription = A.Fake(); private readonly ISemanticLog log = A.Fake(); - private readonly IEventDataFormatter formatter = A.Fake(); + private readonly StoredEvent storedEvent; private readonly EventData eventData = new EventData("Type", new EnvelopeHeaders(), "Payload"); private readonly Envelope envelope = new Envelope(new MyEvent()); private readonly MyEventConsumerGrain sut; @@ -98,7 +99,9 @@ namespace Squidex.Infrastructure.EventSourcing.Grains A.CallTo(() => eventSubscription.Sender) .Returns(eventSubscription); - A.CallTo(() => formatter.ParseIfKnown(A.That.Matches(x => x.Data == eventData))) + storedEvent = new StoredEvent("Stream", Guid.NewGuid().ToString(), 123, eventData); + + A.CallTo(() => formatter.ParseIfKnown(storedEvent)) .Returns(envelope); sut = new MyEventConsumerGrain( @@ -119,7 +122,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = null }); + AssetGrainState(isStopped: true, position: initialPosition); A.CallTo(() => eventStore.CreateSubscription(A._, A._, A._)) .MustNotHaveHappened(); @@ -133,7 +136,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null }); + AssetGrainState(isStopped: false, position: initialPosition); A.CallTo(() => eventStore.CreateSubscription(A._, A._, A._)) .MustHaveHappenedOnceExactly(); @@ -149,7 +152,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null }); + AssetGrainState(isStopped: false, position: initialPosition); A.CallTo(() => eventStore.CreateSubscription(A._, A._, A._)) .MustHaveHappenedOnceExactly(); @@ -163,7 +166,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null }); + AssetGrainState(isStopped: false, position: initialPosition); A.CallTo(() => eventStore.CreateSubscription(A._, A._, A._)) .MustHaveHappenedOnceExactly(); @@ -179,7 +182,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = null }); + AssetGrainState(isStopped: true, position: initialPosition); A.CallTo(() => grainState.WriteAsync()) .MustHaveHappenedOnceExactly(); @@ -198,7 +201,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = false, Position = null, Error = null }); + AssetGrainState(isStopped: false, position: null); A.CallTo(() => grainState.WriteAsync()) .MustHaveHappened(2, Times.Exactly); @@ -219,16 +222,14 @@ namespace Squidex.Infrastructure.EventSourcing.Grains [Fact] public async Task Should_invoke_and_update_position_if_event_received() { - var @event = new StoredEvent("Stream", Guid.NewGuid().ToString(), 123, eventData); - await sut.ActivateAsync(consumerName); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, @event); + await OnEventAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null, Count = 1 }); + AssetGrainState(isStopped: false, position: storedEvent.EventPosition, count: 1); A.CallTo(() => grainState.WriteAsync()) .MustHaveHappenedOnceExactly(); @@ -240,23 +241,21 @@ namespace Squidex.Infrastructure.EventSourcing.Grains [Fact] public async Task Should_invoke_and_update_position_if_event_received_one_by_one() { - var @event = new StoredEvent("Stream", Guid.NewGuid().ToString(), 123, eventData); - A.CallTo(() => eventConsumer.BatchSize) .Returns(1); await sut.ActivateAsync(consumerName); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, @event); - await OnEventAsync(eventSubscription, @event); - await OnEventAsync(eventSubscription, @event); - await OnEventAsync(eventSubscription, @event); - await OnEventAsync(eventSubscription, @event); + await OnEventAsync(eventSubscription, storedEvent); + await OnEventAsync(eventSubscription, storedEvent); + await OnEventAsync(eventSubscription, storedEvent); + await OnEventAsync(eventSubscription, storedEvent); + await OnEventAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null, Count = 5 }); + AssetGrainState(isStopped: false, position: storedEvent.EventPosition, count: 5); A.CallTo(() => grainState.WriteAsync()) .MustHaveHappened(5, Times.Exactly); @@ -268,23 +267,21 @@ namespace Squidex.Infrastructure.EventSourcing.Grains [Fact] public async Task Should_invoke_and_update_position_if_event_received_batched() { - var @event = new StoredEvent("Stream", Guid.NewGuid().ToString(), 123, eventData); - A.CallTo(() => eventConsumer.BatchSize) .Returns(100); await sut.ActivateAsync(consumerName); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, @event); - await OnEventAsync(eventSubscription, @event); - await OnEventAsync(eventSubscription, @event); - await OnEventAsync(eventSubscription, @event); - await OnEventAsync(eventSubscription, @event); + await OnEventAsync(eventSubscription, storedEvent); + await OnEventAsync(eventSubscription, storedEvent); + await OnEventAsync(eventSubscription, storedEvent); + await OnEventAsync(eventSubscription, storedEvent); + await OnEventAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null, Count = 5 }); + AssetGrainState(isStopped: false, position: storedEvent.EventPosition, count: 5); A.CallTo(() => grainState.WriteAsync()) .MustHaveHappenedOnceExactly(); @@ -296,19 +293,17 @@ namespace Squidex.Infrastructure.EventSourcing.Grains [Fact] public async Task Should_not_invoke_but_update_position_if_consumer_does_not_want_to_handle() { - var @event = new StoredEvent("Stream", Guid.NewGuid().ToString(), 123, eventData); - - A.CallTo(() => eventConsumer.Handles(@event)) + A.CallTo(() => eventConsumer.Handles(storedEvent)) .Returns(false); await sut.ActivateAsync(consumerName); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, @event); + await OnEventAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null, Count = 0 }); + AssetGrainState(isStopped: false, position: storedEvent.EventPosition); A.CallTo(() => grainState.WriteAsync()) .MustHaveHappenedOnceExactly(); @@ -323,16 +318,14 @@ namespace Squidex.Infrastructure.EventSourcing.Grains A.CallTo(() => formatter.ParseIfKnown(A.That.Matches(x => x.Data == eventData))) .Returns(null); - var @event = new StoredEvent("Stream", Guid.NewGuid().ToString(), 123, eventData); - await sut.ActivateAsync(consumerName); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, @event); + await OnEventAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null, Count = 0 }); + AssetGrainState(isStopped: false, position: storedEvent.EventPosition); A.CallTo(() => grainState.WriteAsync()) .MustHaveHappenedOnceExactly(); @@ -344,16 +337,14 @@ namespace Squidex.Infrastructure.EventSourcing.Grains [Fact] public async Task Should_not_invoke_and_update_position_if_event_is_from_another_subscription() { - var @event = new StoredEvent("Stream", Guid.NewGuid().ToString(), 123, eventData); - await sut.ActivateAsync(consumerName); await sut.ActivateAsync(); - await OnEventAsync(A.Fake(), @event); + await OnEventAsync(A.Fake(), storedEvent); await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null }); + AssetGrainState(isStopped: false, position: initialPosition); A.CallTo(() => eventConsumer.On(envelope)) .MustNotHaveHappened(); @@ -371,7 +362,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() }); + AssetGrainState(isStopped: true, position: initialPosition, error: ex.ToString()); A.CallTo(() => grainState.WriteAsync()) .MustHaveHappenedOnceExactly(); @@ -392,7 +383,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null }); + AssetGrainState(isStopped: false, position: initialPosition); A.CallTo(() => grainState.WriteAsync()) .MustNotHaveHappened(); @@ -425,7 +416,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() }); + AssetGrainState(isStopped: true, position: initialPosition, error: ex.ToString()); A.CallTo(() => grainState.WriteAsync()) .MustHaveHappenedOnceExactly(); @@ -442,16 +433,14 @@ namespace Squidex.Infrastructure.EventSourcing.Grains A.CallTo(() => eventConsumer.On(envelope)) .Throws(ex); - var @event = new StoredEvent("Stream", Guid.NewGuid().ToString(), 123, eventData); - await sut.ActivateAsync(consumerName); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, @event); + await OnEventAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() }); + AssetGrainState(isStopped: true, position: initialPosition, error: ex.ToString()); A.CallTo(() => eventConsumer.On(envelope)) .MustHaveHappened(); @@ -471,16 +460,14 @@ namespace Squidex.Infrastructure.EventSourcing.Grains A.CallTo(() => formatter.ParseIfKnown(A.That.Matches(x => x.Data == eventData))) .Throws(ex); - var @event = new StoredEvent("Stream", Guid.NewGuid().ToString(), 123, eventData); - await sut.ActivateAsync(consumerName); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, @event); + await OnEventAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() }); + AssetGrainState(isStopped: true, position: initialPosition, error: ex.ToString()); A.CallTo(() => eventConsumer.On(envelope)) .MustNotHaveHappened(); @@ -500,12 +487,10 @@ namespace Squidex.Infrastructure.EventSourcing.Grains A.CallTo(() => eventConsumer.On(envelope)) .Throws(exception); - var @event = new StoredEvent("Stream", Guid.NewGuid().ToString(), 123, eventData); - await sut.ActivateAsync(consumerName); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, @event); + await OnEventAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); @@ -513,7 +498,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains await sut.StartAsync(); await sut.StartAsync(); - AssetGrainState(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null }); + AssetGrainState(isStopped: false, position: initialPosition); A.CallTo(() => eventConsumer.On(envelope)) .MustHaveHappened(); @@ -538,9 +523,11 @@ namespace Squidex.Infrastructure.EventSourcing.Grains return sut.OnEventAsync(subscription, ev); } - private void AssetGrainState(EventConsumerState state) + private void AssetGrainState(bool isStopped = false, string? position = null, string? error = null, int count = 0) { - grainState.Value.Should().BeEquivalentTo(state); + var expected = new EventConsumerState { IsStopped = isStopped, Position = position, Error = error, Count = count }; + + grainState.Value.Should().BeEquivalentTo(expected); } } }