From e70063df33803bc13295f935df0591de493ef192 Mon Sep 17 00:00:00 2001 From: Sebastian Date: Wed, 30 Jun 2021 21:16:15 +0200 Subject: [PATCH] Code cleanup. --- .../Grains/EventConsumerGrainTests.cs | 105 ++++++++---------- 1 file changed, 46 insertions(+), 59 deletions(-) diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs index 593944228..485bfdfc9 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Grains/EventConsumerGrainTests.cs @@ -58,10 +58,11 @@ namespace Squidex.Infrastructure.EventSourcing.Grains private readonly IGrainState grainState = A.Fake>(); private readonly IEventConsumer eventConsumer = A.Fake(); + private readonly IEventDataFormatter formatter = A.Fake(); private readonly IEventStore eventStore = A.Fake(); private readonly IEventSubscription eventSubscription = A.Fake(); private readonly ISemanticLog log = A.Fake(); - private readonly IEventDataFormatter formatter = A.Fake(); + private readonly StoredEvent storedEvent; private readonly EventData eventData = new EventData("Type", new EnvelopeHeaders(), "Payload"); private readonly Envelope envelope = new Envelope(new MyEvent()); private readonly MyEventConsumerGrain sut; @@ -95,7 +96,9 @@ namespace Squidex.Infrastructure.EventSourcing.Grains A.CallTo(() => eventSubscription.Sender) .Returns(eventSubscription); - A.CallTo(() => formatter.ParseIfKnown(A.That.Matches(x => x.Data == eventData))) + storedEvent = new StoredEvent("Stream", Guid.NewGuid().ToString(), 123, eventData); + + A.CallTo(() => formatter.ParseIfKnown(storedEvent)) .Returns(envelope); sut = new MyEventConsumerGrain( @@ -116,7 +119,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = null }); + AssetGrainState(isStopped: true, position: initialPosition); A.CallTo(() => eventStore.CreateSubscription(A._, A._, A._)) .MustNotHaveHappened(); @@ -130,7 +133,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null }); + AssetGrainState(isStopped: false, position: initialPosition); A.CallTo(() => eventStore.CreateSubscription(A._, A._, A._)) .MustHaveHappenedOnceExactly(); @@ -146,7 +149,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null }); + AssetGrainState(isStopped: false, position: initialPosition); A.CallTo(() => eventStore.CreateSubscription(A._, A._, A._)) .MustHaveHappenedOnceExactly(); @@ -160,7 +163,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null }); + AssetGrainState(isStopped: false, position: initialPosition); A.CallTo(() => eventStore.CreateSubscription(A._, A._, A._)) .MustHaveHappenedOnceExactly(); @@ -176,7 +179,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = null }); + AssetGrainState(isStopped: true, position: initialPosition); A.CallTo(() => grainState.WriteAsync()) .MustHaveHappenedOnceExactly(); @@ -195,7 +198,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = false, Position = null, Error = null }); + AssetGrainState(isStopped: false, position: null); A.CallTo(() => grainState.WriteAsync()) .MustHaveHappened(2, Times.Exactly); @@ -216,16 +219,14 @@ namespace Squidex.Infrastructure.EventSourcing.Grains [Fact] public async Task Should_invoke_and_update_position_if_event_received() { - var @event = new StoredEvent("Stream", Guid.NewGuid().ToString(), 123, eventData); - await sut.ActivateAsync(consumerName); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, @event); + await OnEventAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null, Count = 1 }); + AssetGrainState(isStopped: false, position: storedEvent.EventPosition, count: 1); A.CallTo(() => grainState.WriteAsync()) .MustHaveHappenedOnceExactly(); @@ -237,23 +238,21 @@ namespace Squidex.Infrastructure.EventSourcing.Grains [Fact] public async Task Should_invoke_and_update_position_if_event_received_one_by_one() { - var @event = new StoredEvent("Stream", Guid.NewGuid().ToString(), 123, eventData); - A.CallTo(() => eventConsumer.BatchSize) .Returns(1); await sut.ActivateAsync(consumerName); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, @event); - await OnEventAsync(eventSubscription, @event); - await OnEventAsync(eventSubscription, @event); - await OnEventAsync(eventSubscription, @event); - await OnEventAsync(eventSubscription, @event); + await OnEventAsync(eventSubscription, storedEvent); + await OnEventAsync(eventSubscription, storedEvent); + await OnEventAsync(eventSubscription, storedEvent); + await OnEventAsync(eventSubscription, storedEvent); + await OnEventAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null, Count = 5 }); + AssetGrainState(isStopped: false, position: storedEvent.EventPosition, count: 5); A.CallTo(() => grainState.WriteAsync()) .MustHaveHappened(5, Times.Exactly); @@ -265,23 +264,21 @@ namespace Squidex.Infrastructure.EventSourcing.Grains [Fact] public async Task Should_invoke_and_update_position_if_event_received_batched() { - var @event = new StoredEvent("Stream", Guid.NewGuid().ToString(), 123, eventData); - A.CallTo(() => eventConsumer.BatchSize) .Returns(100); await sut.ActivateAsync(consumerName); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, @event); - await OnEventAsync(eventSubscription, @event); - await OnEventAsync(eventSubscription, @event); - await OnEventAsync(eventSubscription, @event); - await OnEventAsync(eventSubscription, @event); + await OnEventAsync(eventSubscription, storedEvent); + await OnEventAsync(eventSubscription, storedEvent); + await OnEventAsync(eventSubscription, storedEvent); + await OnEventAsync(eventSubscription, storedEvent); + await OnEventAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null, Count = 5 }); + AssetGrainState(isStopped: false, position: storedEvent.EventPosition, count: 5); A.CallTo(() => grainState.WriteAsync()) .MustHaveHappenedOnceExactly(); @@ -293,19 +290,17 @@ namespace Squidex.Infrastructure.EventSourcing.Grains [Fact] public async Task Should_not_invoke_but_update_position_if_consumer_does_not_want_to_handle() { - var @event = new StoredEvent("Stream", Guid.NewGuid().ToString(), 123, eventData); - - A.CallTo(() => eventConsumer.Handles(@event)) + A.CallTo(() => eventConsumer.Handles(storedEvent)) .Returns(false); await sut.ActivateAsync(consumerName); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, @event); + await OnEventAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null, Count = 0 }); + AssetGrainState(isStopped: false, position: storedEvent.EventPosition); A.CallTo(() => grainState.WriteAsync()) .MustHaveHappenedOnceExactly(); @@ -320,16 +315,14 @@ namespace Squidex.Infrastructure.EventSourcing.Grains A.CallTo(() => formatter.ParseIfKnown(A.That.Matches(x => x.Data == eventData))) .Returns(null); - var @event = new StoredEvent("Stream", Guid.NewGuid().ToString(), 123, eventData); - await sut.ActivateAsync(consumerName); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, @event); + await OnEventAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = false, Position = @event.EventPosition, Error = null, Count = 0 }); + AssetGrainState(isStopped: false, position: storedEvent.EventPosition); A.CallTo(() => grainState.WriteAsync()) .MustHaveHappenedOnceExactly(); @@ -341,16 +334,14 @@ namespace Squidex.Infrastructure.EventSourcing.Grains [Fact] public async Task Should_not_invoke_and_update_position_if_event_is_from_another_subscription() { - var @event = new StoredEvent("Stream", Guid.NewGuid().ToString(), 123, eventData); - await sut.ActivateAsync(consumerName); await sut.ActivateAsync(); - await OnEventAsync(A.Fake(), @event); + await OnEventAsync(A.Fake(), storedEvent); await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null }); + AssetGrainState(isStopped: false, position: initialPosition); A.CallTo(() => eventConsumer.On(envelope)) .MustNotHaveHappened(); @@ -368,7 +359,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() }); + AssetGrainState(isStopped: true, position: initialPosition, error: ex.ToString()); A.CallTo(() => grainState.WriteAsync()) .MustHaveHappenedOnceExactly(); @@ -389,7 +380,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null }); + AssetGrainState(isStopped: false, position: initialPosition); A.CallTo(() => grainState.WriteAsync()) .MustNotHaveHappened(); @@ -422,7 +413,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() }); + AssetGrainState(isStopped: true, position: initialPosition, error: ex.ToString()); A.CallTo(() => grainState.WriteAsync()) .MustHaveHappenedOnceExactly(); @@ -439,16 +430,14 @@ namespace Squidex.Infrastructure.EventSourcing.Grains A.CallTo(() => eventConsumer.On(envelope)) .Throws(ex); - var @event = new StoredEvent("Stream", Guid.NewGuid().ToString(), 123, eventData); - await sut.ActivateAsync(consumerName); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, @event); + await OnEventAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() }); + AssetGrainState(isStopped: true, position: initialPosition, error: ex.ToString()); A.CallTo(() => eventConsumer.On(envelope)) .MustHaveHappened(); @@ -468,16 +457,14 @@ namespace Squidex.Infrastructure.EventSourcing.Grains A.CallTo(() => formatter.ParseIfKnown(A.That.Matches(x => x.Data == eventData))) .Throws(ex); - var @event = new StoredEvent("Stream", Guid.NewGuid().ToString(), 123, eventData); - await sut.ActivateAsync(consumerName); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, @event); + await OnEventAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); - AssetGrainState(new EventConsumerState { IsStopped = true, Position = initialPosition, Error = ex.ToString() }); + AssetGrainState(isStopped: true, position: initialPosition, error: ex.ToString()); A.CallTo(() => eventConsumer.On(envelope)) .MustNotHaveHappened(); @@ -497,12 +484,10 @@ namespace Squidex.Infrastructure.EventSourcing.Grains A.CallTo(() => eventConsumer.On(envelope)) .Throws(exception); - var @event = new StoredEvent("Stream", Guid.NewGuid().ToString(), 123, eventData); - await sut.ActivateAsync(consumerName); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, @event); + await OnEventAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); @@ -510,7 +495,7 @@ namespace Squidex.Infrastructure.EventSourcing.Grains await sut.StartAsync(); await sut.StartAsync(); - AssetGrainState(new EventConsumerState { IsStopped = false, Position = initialPosition, Error = null }); + AssetGrainState(isStopped: false, position: initialPosition); A.CallTo(() => eventConsumer.On(envelope)) .MustHaveHappened(); @@ -535,9 +520,11 @@ namespace Squidex.Infrastructure.EventSourcing.Grains return sut.OnEventAsync(subscription, ev); } - private void AssetGrainState(EventConsumerState state) + private void AssetGrainState(bool isStopped = false, string? position = null, string? error = null, int count = 0) { - grainState.Value.Should().BeEquivalentTo(state); + var expected = new EventConsumerState { IsStopped = isStopped, Position = position, Error = error, Count = count }; + + grainState.Value.Should().BeEquivalentTo(expected); } } -} \ No newline at end of file +}