Browse Source

Store recent events.

pull/906/head
Sebastian 3 years ago
parent
commit
127491d61d
  1. 6
      backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs
  2. 11
      backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs
  3. 10
      backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStoreSubscription.cs
  4. 6
      backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs
  5. 4
      backend/src/Squidex.Infrastructure/EventSourcing/Consume/BatchSubscription.cs
  6. 11
      backend/src/Squidex.Infrastructure/EventSourcing/Consume/EventConsumerProcessor.cs
  7. 36
      backend/src/Squidex.Infrastructure/EventSourcing/Consume/EventConsumerState.cs
  8. 4
      backend/src/Squidex.Infrastructure/EventSourcing/Consume/ParseSubscription.cs
  9. 4
      backend/src/Squidex.Infrastructure/EventSourcing/Consume/ParsedEvent.cs
  10. 2
      backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs
  11. 80
      backend/src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs
  12. 133
      backend/src/Squidex.Infrastructure/EventSourcing/RecentEvents.cs
  13. 5
      backend/src/Squidex.Infrastructure/EventSourcing/StoredEvent.cs
  14. 18
      backend/src/Squidex.Infrastructure/EventSourcing/SubscriptionQuery.cs
  15. 18
      backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Consume/EventConsumerProcessorTests.cs
  16. 12
      backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs
  17. 130
      backend/tests/Squidex.Infrastructure.Tests/EventSourcing/PollingSubscriptionTests.cs
  18. 91
      backend/tests/Squidex.Infrastructure.Tests/EventSourcing/RecentEventsTests.cs
  19. 8
      backend/tests/Squidex.Infrastructure.Tests/EventSourcing/RetrySubscriptionTests.cs

6
backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs

@ -46,11 +46,9 @@ namespace Squidex.Infrastructure.EventSourcing
}
}
public IEventSubscription CreateSubscription(IEventSubscriber<StoredEvent> subscriber, string? streamFilter = null, string? position = null)
public IEventSubscription CreateSubscription(IEventSubscriber<StoredEvent> subscriber, SubscriptionQuery query)
{
Guard.NotNull(streamFilter);
return new GetEventStoreSubscription(subscriber, client, projectionClient, serializer, position, StreamPrefix, streamFilter);
return new GetEventStoreSubscription(subscriber, client, projectionClient, serializer, query, StreamPrefix);
}
public async IAsyncEnumerable<StoredEvent> QueryAllAsync(string? streamFilter = null, string? position = null, int take = int.MaxValue,

11
backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs

@ -21,15 +21,14 @@ namespace Squidex.Infrastructure.EventSourcing
EventStoreClient client,
EventStoreProjectionClient projectionClient,
IJsonSerializer serializer,
string? position,
string? prefix,
string? streamFilter)
SubscriptionQuery query,
string? prefix)
{
Task.Run(async () =>
{
var ct = cts.Token;
var streamName = await projectionClient.CreateProjectionAsync(streamFilter);
var streamName = await projectionClient.CreateProjectionAsync(query.StreamFilter);
async Task OnEvent(StreamSubscription subscription, ResolvedEvent @event,
CancellationToken ct)
@ -50,9 +49,9 @@ namespace Squidex.Infrastructure.EventSourcing
}
}
if (!string.IsNullOrWhiteSpace(position))
if (!string.IsNullOrWhiteSpace(query.Position))
{
var from = FromStream.After(position.ToPosition(true));
var from = FromStream.After(query.Position.ToPosition(true));
subscription = await client.SubscribeToStreamAsync(streamName, from,
OnEvent, true,

10
backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStoreSubscription.cs

@ -18,15 +18,15 @@ namespace Squidex.Infrastructure.EventSourcing
private readonly IEventSubscriber<StoredEvent> eventSubscriber;
private readonly CancellationTokenSource stopToken = new CancellationTokenSource();
public MongoEventStoreSubscription(MongoEventStore eventStore, IEventSubscriber<StoredEvent> eventSubscriber, string? streamFilter, string? position)
public MongoEventStoreSubscription(MongoEventStore eventStore, IEventSubscriber<StoredEvent> eventSubscriber, SubscriptionQuery query)
{
this.eventStore = eventStore;
this.eventSubscriber = eventSubscriber;
QueryAsync(streamFilter, position).Forget();
QueryAsync(query).Forget();
}
private async Task QueryAsync(string? streamFilter, string? position)
private async Task QueryAsync(SubscriptionQuery query)
{
try
{
@ -34,7 +34,7 @@ namespace Squidex.Infrastructure.EventSourcing
try
{
lastRawPosition = await QueryOldAsync(streamFilter, position);
lastRawPosition = await QueryOldAsync(query.StreamFilter, query.Position);
}
catch (OperationCanceledException)
{
@ -42,7 +42,7 @@ namespace Squidex.Infrastructure.EventSourcing
if (!stopToken.IsCancellationRequested)
{
await QueryCurrentAsync(streamFilter, lastRawPosition);
await QueryCurrentAsync(query.StreamFilter, lastRawPosition);
}
}
catch (Exception ex)

6
backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs

@ -21,17 +21,17 @@ namespace Squidex.Infrastructure.EventSourcing
{
private static readonly List<StoredEvent> EmptyEvents = new List<StoredEvent>();
public IEventSubscription CreateSubscription(IEventSubscriber<StoredEvent> subscriber, string? streamFilter = null, string? position = null)
public IEventSubscription CreateSubscription(IEventSubscriber<StoredEvent> subscriber, SubscriptionQuery query)
{
Guard.NotNull(subscriber);
if (CanUseChangeStreams)
{
return new MongoEventStoreSubscription(this, subscriber, streamFilter, position);
return new MongoEventStoreSubscription(this, subscriber, query);
}
else
{
return new PollingSubscription(this, subscriber, streamFilter, position);
return new PollingSubscription(this, subscriber, query);
}
}

4
backend/src/Squidex.Infrastructure/EventSourcing/Consume/BatchSubscription.cs

@ -74,10 +74,10 @@ namespace Squidex.Infrastructure.EventSourcing.Consume
{
// Events can be null if the event consumer is not interested in the stored event.
var eventList = batch.Select(x => x.Event).NotNull().ToList();
var eventPosition = batch[^1].Position;
var eventEnd = batch[^1];
// Use a struct here to save a few allocations.
await eventSink.OnNextAsync(this, new ParsedEvents(eventList, eventPosition));
await eventSink.OnNextAsync(this, new ParsedEvents(eventList, eventEnd.Position, eventEnd.Context));
}
break;

11
backend/src/Squidex.Infrastructure/EventSourcing/Consume/EventConsumerProcessor.cs

@ -76,7 +76,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume
await DispatchAsync(@event.Events);
State = State.Handled(@event.Position, @event.Events.Count);
State = State.Handled(@event.Position, @event.Context, @event.Events.Count);
}, State.Position);
}
@ -269,7 +269,14 @@ namespace Squidex.Infrastructure.EventSourcing.Consume
protected virtual IEventSubscription CreateSubscription(IEventSubscriber<StoredEvent> subscriber)
{
return eventStore.CreateSubscription(subscriber, eventConsumer!.EventsFilter, State.Position);
var query = new SubscriptionQuery
{
Context = State.Context,
Position = State.Position,
StreamFilter = eventConsumer.EventsFilter
};
return eventStore.CreateSubscription(subscriber, query);
}
}
}

36
backend/src/Squidex.Infrastructure/EventSourcing/Consume/EventConsumerState.cs

@ -7,13 +7,21 @@
using Squidex.Infrastructure.Reflection;
#pragma warning disable SA1313 // Parameter names should begin with lower-case letter
namespace Squidex.Infrastructure.EventSourcing.Consume
{
public sealed record EventConsumerState(string? Position, int Count, bool IsStopped = false, string? Error = null)
public sealed record EventConsumerState
{
public static readonly EventConsumerState Initial = new EventConsumerState(null, 0);
public static readonly EventConsumerState Initial = new EventConsumerState();
public string? Position { get; init; }
public string? Error { get; init; }
public bool IsStopped { get; init; }
public long Count { get; init; }
public Dictionary<string, string>? Context { get; init; }
public bool IsPaused
{
@ -25,24 +33,24 @@ namespace Squidex.Infrastructure.EventSourcing.Consume
get => IsStopped && !string.IsNullOrWhiteSpace(Error);
}
public EventConsumerState()
: this(null, 0)
public EventConsumerState Handled(string position, Dictionary<string, string>? context, int offset = 1)
{
return new EventConsumerState
{
Context = context,
Count = Count + offset,
Position = position
};
}
public EventConsumerState Handled(string position, int offset = 1)
public EventConsumerState Started()
{
return new EventConsumerState(position, Count + offset);
return this with { Error = null, IsStopped = false };
}
public EventConsumerState Stopped(Exception? ex = null)
{
return new EventConsumerState(Position, Count, true, ex?.Message);
}
public EventConsumerState Started()
{
return new EventConsumerState(Position, Count);
return this with { Error = ex?.Message, IsStopped = true };
}
public EventConsumerInfo ToInfo(string name)

4
backend/src/Squidex.Infrastructure/EventSourcing/Consume/ParseSubscription.cs

@ -62,7 +62,9 @@ namespace Squidex.Infrastructure.EventSourcing.Consume
if (!completed.IsCancellationRequested)
{
// Also invoke the subscriber if the event is null to update the position.
await eventSubscriber.OnNextAsync(this, new ParsedEvent(@event, storedEvent.EventPosition));
var parsedEvent = new ParsedEvent(@event, storedEvent.EventPosition, storedEvent.Context);
await eventSubscriber.OnNextAsync(this, parsedEvent);
}
break;

4
backend/src/Squidex.Infrastructure/EventSourcing/Consume/ParsedEvent.cs

@ -10,7 +10,7 @@
namespace Squidex.Infrastructure.EventSourcing.Consume
{
public record struct ParsedEvent(Envelope<IEvent>? Event, string Position);
public record struct ParsedEvent(Envelope<IEvent>? Event, string Position, Dictionary<string, string>? Context);
public record struct ParsedEvents(List<Envelope<IEvent>> Events, string Position);
public record struct ParsedEvents(List<Envelope<IEvent>> Events, string Position, Dictionary<string, string>? Context);
}

2
backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs

@ -35,7 +35,7 @@ namespace Squidex.Infrastructure.EventSourcing
Task DeleteStreamAsync(string streamName,
CancellationToken ct = default);
IEventSubscription CreateSubscription(IEventSubscriber<StoredEvent> eventSubscriber, string? streamFilter = null, string? position = null);
IEventSubscription CreateSubscription(IEventSubscriber<StoredEvent> eventSubscriber, SubscriptionQuery query);
async Task AppendUnsafeAsync(IEnumerable<EventCommit> commits,
CancellationToken ct = default)

80
backend/src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs

@ -12,75 +12,63 @@ namespace Squidex.Infrastructure.EventSourcing
{
public sealed class PollingSubscription : IEventSubscription
{
private readonly RecentEvents recentEvents = new RecentEvents();
private const string ContextKey = "RecentEvents";
private readonly CompletionTimer timer;
private sealed class RecentEvents
{
private const int Capacity = 50;
private readonly HashSet<Guid> eventIds = new HashSet<Guid>(Capacity);
private readonly Queue<(Guid, string)> eventQueue = new Queue<(Guid, string)>(Capacity);
public string? FirstPosition()
{
if (eventQueue.Count == 0)
{
return null;
}
return eventQueue.Peek().Item2;
}
public bool Add(StoredEvent @event)
{
var id = @event.Data.Headers.EventId();
if (eventIds.Contains(id))
{
return false;
}
while (eventQueue.Count >= Capacity)
{
var (storedId, _) = eventQueue.Dequeue();
eventIds.Remove(storedId);
}
eventIds.Add(id);
eventQueue.Enqueue((id, @event.EventPosition));
return true;
}
}
public PollingSubscription(
IEventStore eventStore,
IEventSubscriber<StoredEvent> eventSubscriber,
string? streamFilter,
string? position)
SubscriptionQuery query,
bool queryOnce = false)
{
// Depending on the implementation of the event store it is not guaranteed that no event is added
// with an older position than what we are currently queried.
// Therefore we use overlapping query windows of 50 events (this is just a guess).
var recentEvents = RecentEvents.Parse(query.Context?.GetOrDefault(ContextKey));
var position = recentEvents.FirstPosition() ?? query.Position;
timer = new CompletionTimer(5000, async ct =>
{
try
{
// If we have read events it is very likely that there will be more coming and we just query again immediately.
var newEventCount = 0;
do
{
newEventCount = 0;
await foreach (var storedEvent in eventStore.QueryAllAsync(streamFilter, position, ct: ct))
await foreach (var @event in eventStore.QueryAllAsync(query.StreamFilter, position, ct: ct))
{
var storedEvent = @event;
// Check if we have received the event already in the latest query.
if (recentEvents.Add(storedEvent))
{
var recentEventString = recentEvents.ToString();
if (recentEventString != null)
{
// Serialize the recent events to have them available with the next run.
storedEvent = storedEvent with
{
Context = new Dictionary<string, string>
{
[ContextKey] = recentEventString
}
};
}
await eventSubscriber.OnNextAsync(this, storedEvent);
newEventCount++;
}
position = recentEvents.FirstPosition();
}
// Use the first position from the window.
position = recentEvents.FirstPosition();
}
while (newEventCount > 50);
// Use a value greater than one, because otherwise we would always query 51 events (because of the overlapping window) just for another event.
while (newEventCount > 50 && !queryOnce);
}
catch (Exception ex)
{

133
backend/src/Squidex.Infrastructure/EventSourcing/RecentEvents.cs

@ -0,0 +1,133 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using Squidex.Infrastructure.ObjectPool;
namespace Squidex.Infrastructure.EventSourcing
{
public sealed class RecentEvents
{
private readonly HashSet<Guid> eventIds;
private readonly Queue<(Guid, string)> eventQueue;
private readonly int capacity;
public IEnumerable<(Guid, string)> EventQueue => eventQueue;
public RecentEvents(int capacity = 50)
{
this.capacity = capacity;
eventIds = new HashSet<Guid>(capacity);
eventQueue = new Queue<(Guid, string)>(capacity);
}
public string? FirstPosition()
{
if (eventQueue.Count == 0)
{
return null;
}
return eventQueue.Peek().Item2;
}
public bool Add(StoredEvent @event)
{
return Add(@event.Data.Headers.EventId(), @event.EventPosition);
}
public bool Add(Guid id, string position)
{
if (eventIds.Contains(id))
{
return false;
}
while (eventQueue.Count >= capacity)
{
var (storedId, _) = eventQueue.Dequeue();
eventIds.Remove(storedId);
}
eventIds.Add(id);
eventQueue.Enqueue((id, position));
return true;
}
public static RecentEvents Parse(string? input)
{
if (string.IsNullOrWhiteSpace(input))
{
return new RecentEvents();
}
return Parse(input.AsSpan());
}
private static RecentEvents Parse(ReadOnlySpan<char> span)
{
var result = new RecentEvents();
while (span.Length > 0)
{
var endOfLine = span.IndexOf('\n');
if (endOfLine < 0)
{
endOfLine = span.Length - 1;
}
var line = span[0..endOfLine];
var separator = line.IndexOf('|');
if (separator > 0 && separator < line.Length - 1)
{
var guidSpan = line[0..separator];
if (Guid.TryParse(guidSpan, out var id))
{
result.Add(id, line[(separator + 1)..].ToString());
}
}
span = span[endOfLine..];
span = span.TrimStart('\n');
}
return result;
}
public override string? ToString()
{
if (eventQueue.Count == 0)
{
return null;
}
var sb = DefaultPools.StringBuilder.Get();
try
{
foreach (var (id, position) in eventQueue)
{
sb.Append(id);
sb.Append('|');
sb.Append(position);
sb.Append('\n');
}
return sb.ToString();
}
finally
{
DefaultPools.StringBuilder.Return(sb);
}
}
}
}

5
backend/src/Squidex.Infrastructure/EventSourcing/StoredEvent.cs

@ -9,5 +9,8 @@
namespace Squidex.Infrastructure.EventSourcing
{
public sealed record StoredEvent(string StreamName, string EventPosition, long EventStreamNumber, EventData Data);
public sealed record StoredEvent(string StreamName, string EventPosition, long EventStreamNumber, EventData Data)
{
public Dictionary<string, string>? Context { get; init; }
}
}

18
backend/src/Squidex.Infrastructure/EventSourcing/SubscriptionQuery.cs

@ -0,0 +1,18 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
namespace Squidex.Infrastructure.EventSourcing
{
public record struct SubscriptionQuery
{
public string? Position { get; set; }
public string? StreamFilter { get; set; }
public Dictionary<string, string>? Context { get; set; }
}
}

18
backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Consume/EventConsumerProcessorTests.cs

@ -65,7 +65,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume
}
};
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber<StoredEvent>>._, A<string>._, A<string>._))
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber<StoredEvent>>._, A<SubscriptionQuery>._))
.Returns(eventSubscription);
A.CallTo(() => eventConsumer.Name)
@ -111,7 +111,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume
AssertGrainState(isStopped: true, position: initialPosition);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber<StoredEvent>>._, A<string>._, A<string>._))
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber<StoredEvent>>._, A<SubscriptionQuery>._))
.MustNotHaveHappened();
}
@ -125,7 +125,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume
AssertGrainState(isStopped: false, position: initialPosition);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber<StoredEvent>>._, A<string>._, A<string>._))
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber<StoredEvent>>._, A<SubscriptionQuery>._))
.MustHaveHappenedOnceExactly();
}
@ -141,7 +141,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume
AssertGrainState(isStopped: false, position: initialPosition);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber<StoredEvent>>._, A<string>._, A<string>._))
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber<StoredEvent>>._, A<SubscriptionQuery>._))
.MustHaveHappenedOnceExactly();
}
@ -155,7 +155,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume
AssertGrainState(isStopped: false, position: initialPosition);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber<StoredEvent>>._, A<string>._, A<string>._))
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber<StoredEvent>>._, A<SubscriptionQuery>._))
.MustHaveHappenedOnceExactly();
}
@ -201,10 +201,10 @@ namespace Squidex.Infrastructure.EventSourcing.Consume
A.CallTo(() => eventSubscription.Dispose())
.MustHaveHappenedOnceExactly();
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber<StoredEvent>>._, A<string>._, state.Snapshot.Position))
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber<StoredEvent>>._, A<SubscriptionQuery>.That.Matches(x => x.Position == state.Snapshot.Position)))
.MustHaveHappenedOnceExactly();
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber<StoredEvent>>._, A<string>._, null))
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber<StoredEvent>>._, A<SubscriptionQuery>.That.Matches(x => x.Position == null)))
.MustHaveHappenedOnceExactly();
}
@ -349,7 +349,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume
await sut.InitializeAsync(default);
await sut.ActivateAsync();
await sut.OnNextAsync(A.Fake<IEventSubscription>(), new ParsedEvents(new[] { envelope }.ToList(), storedEvent.EventPosition));
await sut.OnNextAsync(A.Fake<IEventSubscription>(), new ParsedEvents(new[] { envelope }.ToList(), storedEvent.EventPosition, null));
await sut.CompleteAsync();
AssertGrainState(isStopped: false, position: initialPosition);
@ -512,7 +512,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume
A.CallTo(() => eventSubscription.Dispose())
.MustHaveHappenedOnceExactly();
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber<StoredEvent>>._, A<string>._, A<string>._))
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber<StoredEvent>>._, A<SubscriptionQuery>._))
.MustHaveHappened(2, Times.Exactly);
}

12
backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs

@ -461,7 +461,17 @@ namespace Squidex.Infrastructure.EventSourcing
IEventSubscription? subscription = null;
try
{
subscription = Sut.CreateSubscription(subscriber, streamFilter, fromBeginning ? null : subscriptionPosition);
var query = new SubscriptionQuery
{
StreamFilter = streamFilter
};
if (!fromBeginning)
{
query.Position = subscriptionPosition;
}
subscription = Sut.CreateSubscription(subscriber, query);
if (subscriptionRunning != null)
{

130
backend/tests/Squidex.Infrastructure.Tests/EventSourcing/PollingSubscriptionTests.cs

@ -16,16 +16,20 @@ namespace Squidex.Infrastructure.EventSourcing
{
private readonly IEventStore eventStore = A.Fake<IEventStore>();
private readonly IEventSubscriber<StoredEvent> eventSubscriber = A.Fake<IEventSubscriber<StoredEvent>>();
private readonly string position = Guid.NewGuid().ToString();
private SubscriptionQuery query;
public PollingSubscriptionTests()
{
query.Position = Guid.NewGuid().ToString();
query.StreamFilter = "^my-stream";
}
[Fact]
public async Task Should_subscribe_on_start()
{
var sut = new PollingSubscription(eventStore, eventSubscriber, "^my-stream", position);
await SubscribeAsync();
await WaitAndStopAsync(sut);
A.CallTo(() => eventStore.QueryAllAsync("^my-stream", position, A<int>._, A<CancellationToken>._))
A.CallTo(() => eventStore.QueryAllAsync(query.StreamFilter, query.Position, A<int>._, A<CancellationToken>._))
.MustHaveHappenedOnceExactly();
}
@ -34,12 +38,10 @@ namespace Squidex.Infrastructure.EventSourcing
{
var ex = new InvalidOperationException();
A.CallTo(() => eventStore.QueryAllAsync("^my-stream", position, A<int>._, A<CancellationToken>._))
A.CallTo(() => eventStore.QueryAllAsync(query.StreamFilter, query.Position, A<int>._, A<CancellationToken>._))
.Throws(ex);
var sut = new PollingSubscription(eventStore, eventSubscriber, "^my-stream", position);
await WaitAndStopAsync(sut);
var sut = await SubscribeAsync(false);
A.CallTo(() => eventSubscriber.OnErrorAsync(sut, ex))
.MustHaveHappened();
@ -50,12 +52,10 @@ namespace Squidex.Infrastructure.EventSourcing
{
var ex = new OperationCanceledException();
A.CallTo(() => eventStore.QueryAllAsync("^my-stream", position, A<int>._, A<CancellationToken>._))
A.CallTo(() => eventStore.QueryAllAsync(query.StreamFilter, query.Position, A<int>._, A<CancellationToken>._))
.Throws(ex);
var sut = new PollingSubscription(eventStore, eventSubscriber, "^my-stream", position);
await WaitAndStopAsync(sut);
var sut = await SubscribeAsync(false);
A.CallTo(() => eventSubscriber.OnErrorAsync(sut, ex))
.MustHaveHappened();
@ -66,12 +66,10 @@ namespace Squidex.Infrastructure.EventSourcing
{
var ex = new AggregateException(new OperationCanceledException());
A.CallTo(() => eventStore.QueryAllAsync("^my-stream", position, A<int>._, A<CancellationToken>._))
A.CallTo(() => eventStore.QueryAllAsync(query.StreamFilter, query.Position, A<int>._, A<CancellationToken>._))
.Throws(ex);
var sut = new PollingSubscription(eventStore, eventSubscriber, "^my-stream", position);
await WaitAndStopAsync(sut);
var sut = await SubscribeAsync(false);
A.CallTo(() => eventSubscriber.OnErrorAsync(sut, ex))
.MustHaveHappened();
@ -80,13 +78,9 @@ namespace Squidex.Infrastructure.EventSourcing
[Fact]
public async Task Should_wake_up()
{
var sut = new PollingSubscription(eventStore, eventSubscriber, "^my-stream", position);
sut.WakeUp();
await WaitAndStopAsync(sut);
var sut = await SubscribeAsync(true);
A.CallTo(() => eventStore.QueryAllAsync("^my-stream", position, A<int>._, A<CancellationToken>._))
A.CallTo(() => eventStore.QueryAllAsync(query.StreamFilter, A<string>._, A<int>._, A<CancellationToken>._))
.MustHaveHappened(2, Times.Exactly);
}
@ -97,45 +91,89 @@ namespace Squidex.Infrastructure.EventSourcing
var receivedEvents = new List<StoredEvent>();
A.CallTo(() => eventStore.QueryAllAsync("^my-stream", position, A<int>._, A<CancellationToken>._))
A.CallTo(() => eventStore.QueryAllAsync(query.StreamFilter, query.Position, A<int>._, A<CancellationToken>._))
.Returns(events.ToAsyncEnumerable());
A.CallTo(() => eventSubscriber.OnNextAsync(A<IEventSubscription>._, A<StoredEvent>._))
.Invokes(x => receivedEvents.Add(x.GetArgument<StoredEvent>(1)!));
var sut = new PollingSubscription(eventStore, eventSubscriber, "^my-stream", position);
await SubscribeAsync(true);
receivedEvents.Should().BeEquivalentTo(events, options => options.Excluding(x => x.Context));
}
[Fact]
public async Task Should_receive_missing_events_with_second_pull()
{
var events1 = Enumerable.Range(0, 200).Where(x => x % 2 == 0).Select(CreateEvent).ToArray();
var events2 = Enumerable.Range(0, 200).Where(x => x % 2 == 1).Select(CreateEvent).ToArray();
var receivedEvents = new List<StoredEvent>();
A.CallTo(() => eventStore.QueryAllAsync(query.StreamFilter, query.Position, A<int>._, A<CancellationToken>._))
.Returns(events1.ToAsyncEnumerable());
A.CallTo(() => eventStore.QueryAllAsync(query.StreamFilter, "100", A<int>._, A<CancellationToken>._))
.Returns(events2.ToAsyncEnumerable());
A.CallTo(() => eventSubscriber.OnNextAsync(A<IEventSubscription>._, A<StoredEvent>._))
.Invokes(x => receivedEvents.Add(x.GetArgument<StoredEvent>(1)!));
await SubscribeAsync(true);
receivedEvents.Should().BeEquivalentTo(events1.Union(events2), options => options.Excluding(x => x.Context));
}
sut.WakeUp();
[Fact]
public async Task Should_receive_missing_events_with_next_subscription()
{
var events1 = Enumerable.Range(0, 200).Where(x => x % 2 == 0).Select(CreateEvent).ToArray();
var events2 = Enumerable.Range(0, 200).Where(x => x % 2 == 1).Select(CreateEvent).ToArray();
await WaitAndStopAsync(sut);
var receivedEvents = new List<StoredEvent>();
receivedEvents.Should().BeEquivalentTo(events);
A.CallTo(() => eventStore.QueryAllAsync(query.StreamFilter, query.Position, A<int>._, A<CancellationToken>._))
.Returns(events1.ToAsyncEnumerable());
A.CallTo(() => eventStore.QueryAllAsync(query.StreamFilter, "100", A<int>._, A<CancellationToken>._))
.Returns(events2.ToAsyncEnumerable());
A.CallTo(() => eventSubscriber.OnNextAsync(A<IEventSubscription>._, A<StoredEvent>._))
.Invokes(x => receivedEvents.Add(x.GetArgument<StoredEvent>(1)!));
await SubscribeAsync(false, true);
query.Context = receivedEvents[^1].Context;
await SubscribeAsync(false);
receivedEvents.Should().BeEquivalentTo(events1.Union(events2), options => options.Excluding(x => x.Context));
}
[Fact]
public async Task Should_receive_missing_events_with_second_pull()
public async Task Should_not_receive_same_events_again_with_second_subscription()
{
var events1 = Enumerable.Range(0, 200).Where(x => x % 2 == 0).Select(CreateEvent).ToArray();
var events2 = Enumerable.Range(0, 200).Where(x => x % 2 == 1).Select(CreateEvent).ToArray();
var receivedEvents = new List<StoredEvent>();
A.CallTo(() => eventStore.QueryAllAsync("^my-stream", position, A<int>._, A<CancellationToken>._))
A.CallTo(() => eventStore.QueryAllAsync(query.StreamFilter, query.Position, A<int>._, A<CancellationToken>._))
.Returns(events1.ToAsyncEnumerable());
A.CallTo(() => eventStore.QueryAllAsync("^my-stream", "100", A<int>._, A<CancellationToken>._))
A.CallTo(() => eventStore.QueryAllAsync(query.StreamFilter, "100", A<int>._, A<CancellationToken>._))
.Returns(events2.ToAsyncEnumerable());
A.CallTo(() => eventSubscriber.OnNextAsync(A<IEventSubscription>._, A<StoredEvent>._))
.Invokes(x => receivedEvents.Add(x.GetArgument<StoredEvent>(1)!));
var sut = new PollingSubscription(eventStore, eventSubscriber, "^my-stream", position);
await SubscribeAsync(false);
sut.WakeUp();
query.Context = receivedEvents[^1].Context;
await WaitAndStopAsync(sut);
await SubscribeAsync(false);
receivedEvents.Should().BeEquivalentTo(events1.Union(events2));
receivedEvents.Should().BeEquivalentTo(events1.Union(events2), options => options.Excluding(x => x.Context));
}
private StoredEvent CreateEvent(int position)
@ -153,11 +191,25 @@ namespace Squidex.Infrastructure.EventSourcing
"payload"));
}
private static async Task WaitAndStopAsync(IEventSubscription sut)
private async Task<IEventSubscription> SubscribeAsync(bool wakeup = true, bool queryOnce = false)
{
await Task.Delay(200);
sut.Dispose();
var sut = new PollingSubscription(eventStore, eventSubscriber, query, queryOnce);
try
{
if (wakeup)
{
sut.WakeUp();
}
await Task.Delay(200);
}
finally
{
sut.Dispose();
}
return sut;
}
}
}

91
backend/tests/Squidex.Infrastructure.Tests/EventSourcing/RecentEventsTests.cs

@ -0,0 +1,91 @@
// ==========================================================================
// Squidex Headless CMS
// ==========================================================================
// Copyright (c) Squidex UG (haftungsbeschraenkt)
// All rights reserved. Licensed under the MIT license.
// ==========================================================================
using System.Globalization;
using Xunit;
namespace Squidex.Infrastructure.EventSourcing
{
public sealed class RecentEventsTests
{
[Fact]
public void Should_add_events()
{
var event1 = CreateEvent(0);
var event2 = CreateEvent(0);
var event3 = CreateEvent(0);
var sut = new RecentEvents(3);
sut.Add(event1.Id, event1.Position);
sut.Add(event2.Id, event2.Position);
sut.Add(event3.Id, event3.Position);
Assert.Equal(sut.EventQueue.ToArray(), new[] { event1, event2, event3 });
}
[Fact]
public void Should_remove_old_events_when_capacity_reached()
{
var event1 = CreateEvent(0);
var event2 = CreateEvent(0);
var event3 = CreateEvent(0);
var sut = new RecentEvents(2);
sut.Add(event1.Id, event1.Position);
sut.Add(event2.Id, event2.Position);
sut.Add(event3.Id, event3.Position);
Assert.Equal(sut.EventQueue.ToArray(), new[] { event2, event3 });
}
[Fact]
public void Should_not_add_events_twice()
{
var event1 = CreateEvent(0);
var event2 = CreateEvent(0);
var sut = new RecentEvents(2);
var added1 = sut.Add(event1.Id, event1.Position);
var added2 = sut.Add(event2.Id, event2.Position);
var added3 = sut.Add(event2.Id, event2.Position);
Assert.Equal(sut.EventQueue.ToArray(), new[] { event1, event2 });
Assert.True(added1);
Assert.True(added2);
Assert.False(added3);
}
[Theory]
[InlineData(0)]
[InlineData(10)]
[InlineData(50)]
[InlineData(100)]
public void Should_serialize_and_deserialize(int count)
{
var source = new RecentEvents();
for (var i = 0; i < count; i++)
{
var @event = CreateEvent(i);
source.Add(@event.Id, @event.Position);
}
var serialized = RecentEvents.Parse(source.ToString());
Assert.Equal(source.EventQueue.ToArray(), serialized.EventQueue.ToArray());
}
private static (Guid Id, string Position) CreateEvent(int position)
{
return (Guid.NewGuid(), position.ToString(CultureInfo.InvariantCulture));
}
}
}

8
backend/tests/Squidex.Infrastructure.Tests/EventSourcing/RetrySubscriptionTests.cs

@ -20,10 +20,10 @@ namespace Squidex.Infrastructure.EventSourcing
public RetrySubscriptionTests()
{
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber<StoredEvent>>._, A<string>._, A<string>._))
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber<StoredEvent>>._, A<SubscriptionQuery>._))
.Returns(eventSubscription);
sut = new RetrySubscription<StoredEvent>(eventSubscriber, s => eventStore.CreateSubscription(s)) { ReconnectWaitMs = 50 };
sut = new RetrySubscription<StoredEvent>(eventSubscriber, s => eventStore.CreateSubscription(s, default)) { ReconnectWaitMs = 50 };
sutSubscriber = sut;
}
@ -32,7 +32,7 @@ namespace Squidex.Infrastructure.EventSourcing
{
sut.Dispose();
A.CallTo(() => eventStore.CreateSubscription(sut, A<string>._, A<string>._))
A.CallTo(() => eventStore.CreateSubscription(sut, A<SubscriptionQuery>._))
.MustHaveHappened();
}
@ -50,7 +50,7 @@ namespace Squidex.Infrastructure.EventSourcing
A.CallTo(() => eventSubscription.Dispose())
.MustHaveHappened(2, Times.Exactly);
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber<StoredEvent>>._, A<string>._, A<string>._))
A.CallTo(() => eventStore.CreateSubscription(A<IEventSubscriber<StoredEvent>>._, A<SubscriptionQuery>._))
.MustHaveHappened(2, Times.Exactly);
A.CallTo(() => eventSubscriber.OnErrorAsync(eventSubscription, A<Exception>._))

Loading…
Cancel
Save