Browse Source

Harden tests.

pull/898/head
Sebastian 3 years ago
parent
commit
cfcebd3c2c
  1. 4
      backend/src/Squidex.Infrastructure/EventSourcing/Consume/EventConsumerProcessor.cs
  2. 29
      backend/src/Squidex.Infrastructure/EventSourcing/Consume/EventConsumerState.cs
  3. 7
      backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Consume/EventConsumerProcessorTests.cs

4
backend/src/Squidex.Infrastructure/EventSourcing/Consume/EventConsumerProcessor.cs

@ -175,6 +175,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume
private async Task UpdateAsync(Func<Task> action, string? position, [CallerMemberName] string? caller = null) private async Task UpdateAsync(Func<Task> action, string? position, [CallerMemberName] string? caller = null)
{ {
// We do not want to deal with concurrency in this class, therefore we just use a lock.
using (await asyncLock.EnterAsync()) using (await asyncLock.EnterAsync())
{ {
var previousState = State; var previousState = State;
@ -200,7 +201,8 @@ namespace Squidex.Infrastructure.EventSourcing.Consume
State = previousState.Stopped(ex); State = previousState.Stopped(ex);
} }
if (State != previousState) // The state is a record, therefore we can check for value equality.
if (!Equals(previousState, State))
{ {
await state.WriteAsync(); await state.WriteAsync();
} }

29
backend/src/Squidex.Infrastructure/EventSourcing/Consume/EventConsumerState.cs

@ -7,19 +7,13 @@
using Squidex.Infrastructure.Reflection; using Squidex.Infrastructure.Reflection;
#pragma warning disable SA1313 // Parameter names should begin with lower-case letter
namespace Squidex.Infrastructure.EventSourcing.Consume namespace Squidex.Infrastructure.EventSourcing.Consume
{ {
public sealed class EventConsumerState public sealed record EventConsumerState(string? Position, int Count, bool IsStopped = false, string? Error = null)
{ {
public static readonly EventConsumerState Initial = new EventConsumerState(); public static readonly EventConsumerState Initial = new EventConsumerState(null, 0);
public bool IsStopped { get; init; }
public string? Error { get; init; }
public string? Position { get; init; }
public int Count { get; init; }
public bool IsPaused public bool IsPaused
{ {
@ -32,16 +26,10 @@ namespace Squidex.Infrastructure.EventSourcing.Consume
} }
public EventConsumerState() public EventConsumerState()
: this(null, 0)
{ {
} }
public EventConsumerState(string? position, int count)
{
Position = position;
Count = count;
}
public EventConsumerState Handled(string position, int offset = 1) public EventConsumerState Handled(string position, int offset = 1)
{ {
return new EventConsumerState(position, Count + offset); return new EventConsumerState(position, Count + offset);
@ -49,7 +37,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume
public EventConsumerState Stopped(Exception? ex = null) public EventConsumerState Stopped(Exception? ex = null)
{ {
return new EventConsumerState(Position, Count) { IsStopped = true, Error = ex?.Message }; return new EventConsumerState(Position, Count, true, ex?.Message);
} }
public EventConsumerState Started() public EventConsumerState Started()
@ -59,7 +47,10 @@ namespace Squidex.Infrastructure.EventSourcing.Consume
public EventConsumerInfo ToInfo(string name) public EventConsumerInfo ToInfo(string name)
{ {
return SimpleMapper.Map(this, new EventConsumerInfo { Name = name }); return SimpleMapper.Map(this, new EventConsumerInfo
{
Name = name
});
} }
} }
} }

7
backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Consume/EventConsumerProcessorTests.cs

@ -250,10 +250,13 @@ namespace Squidex.Infrastructure.EventSourcing.Consume
} }
[Fact] [Fact]
public async Task Should_invoke_and_update_position_if_event_received_batched() public async Task Should_invoke_and_update_position_if_events_received_batched()
{ {
A.CallTo(() => eventConsumer.BatchSize) A.CallTo(() => eventConsumer.BatchSize)
.Returns(100); .Returns(5);
A.CallTo(() => eventConsumer.BatchDelay)
.Returns(int.MaxValue);
await sut.InitializeAsync(default); await sut.InitializeAsync(default);
await sut.ActivateAsync(); await sut.ActivateAsync();

Loading…
Cancel
Save