diff --git a/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs b/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs index ddee50b26..52190a8f7 100644 --- a/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs +++ b/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStore.cs @@ -46,7 +46,7 @@ namespace Squidex.Infrastructure.EventSourcing } } - public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string? streamFilter = null, string? position = null) + public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string? streamFilter = null, string? position = null) { Guard.NotNull(streamFilter); diff --git a/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs b/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs index 6f5be9cee..2192521a6 100644 --- a/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs +++ b/backend/src/Squidex.Infrastructure.GetEventStore/EventSourcing/GetEventStoreSubscription.cs @@ -7,6 +7,7 @@ using EventStore.Client; using Squidex.Infrastructure.Json; +using Squidex.Infrastructure.Tasks; namespace Squidex.Infrastructure.EventSourcing { @@ -16,7 +17,7 @@ namespace Squidex.Infrastructure.EventSourcing private StreamSubscription subscription; public GetEventStoreSubscription( - IEventSubscriber subscriber, + IEventSubscriber eventSubscriber, EventStoreClient client, EventStoreProjectionClient projectionClient, IJsonSerializer serializer, @@ -35,7 +36,7 @@ namespace Squidex.Infrastructure.EventSourcing { var storedEvent = Formatter.Read(@event, prefix, serializer); - await subscriber.OnEventAsync(this, storedEvent); + await eventSubscriber.OnNextAsync(this, storedEvent); } void OnError(StreamSubscription subscription, SubscriptionDroppedReason reason, Exception? ex) @@ -45,7 +46,7 @@ namespace Squidex.Infrastructure.EventSourcing { ex ??= new InvalidOperationException($"Subscription closed with reason {reason}."); - subscriber.OnErrorAsync(this, ex); + eventSubscriber.OnErrorAsync(this, ex).AsTask().Forget(); } } @@ -76,5 +77,14 @@ namespace Squidex.Infrastructure.EventSourcing cts.Cancel(); } + + public ValueTask CompleteAsync() + { + return default; + } + + public void WakeUp() + { + } } } diff --git a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStoreSubscription.cs b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStoreSubscription.cs index 1ce345beb..a2d23e6cc 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStoreSubscription.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStoreSubscription.cs @@ -15,10 +15,10 @@ namespace Squidex.Infrastructure.EventSourcing public sealed class MongoEventStoreSubscription : IEventSubscription { private readonly MongoEventStore eventStore; - private readonly IEventSubscriber eventSubscriber; + private readonly IEventSubscriber eventSubscriber; private readonly CancellationTokenSource stopToken = new CancellationTokenSource(); - public MongoEventStoreSubscription(MongoEventStore eventStore, IEventSubscriber eventSubscriber, string? streamFilter, string? position) + public MongoEventStoreSubscription(MongoEventStore eventStore, IEventSubscriber eventSubscriber, string? streamFilter, string? position) { this.eventStore = eventStore; this.eventSubscriber = eventSubscriber; @@ -86,7 +86,7 @@ namespace Squidex.Infrastructure.EventSourcing { foreach (var storedEvent in change.FullDocument.Filtered(lastPosition)) { - await eventSubscriber.OnEventAsync(this, storedEvent); + await eventSubscriber.OnNextAsync(this, storedEvent); } } @@ -123,7 +123,7 @@ namespace Squidex.Infrastructure.EventSourcing } else { - await eventSubscriber.OnEventAsync(this, storedEvent); + await eventSubscriber.OnNextAsync(this, storedEvent); lastRawPosition = storedEvent.EventPosition; } @@ -156,5 +156,14 @@ namespace Squidex.Infrastructure.EventSourcing { stopToken.Cancel(); } + + public ValueTask CompleteAsync() + { + return default; + } + + public void WakeUp() + { + } } } diff --git a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs index 50d98576a..f703bf452 100644 --- a/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs +++ b/backend/src/Squidex.Infrastructure.MongoDb/EventSourcing/MongoEventStore_Reader.cs @@ -21,7 +21,7 @@ namespace Squidex.Infrastructure.EventSourcing { private static readonly List EmptyEvents = new List(); - public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string? streamFilter = null, string? position = null) + public IEventSubscription CreateSubscription(IEventSubscriber subscriber, string? streamFilter = null, string? position = null) { Guard.NotNull(subscriber); diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/Consume/BatchSubscriber.cs b/backend/src/Squidex.Infrastructure/EventSourcing/Consume/BatchSubscriber.cs deleted file mode 100644 index 6760bc602..000000000 --- a/backend/src/Squidex.Infrastructure/EventSourcing/Consume/BatchSubscriber.cs +++ /dev/null @@ -1,154 +0,0 @@ -// ========================================================================== -// Squidex Headless CMS -// ========================================================================== -// Copyright (c) Squidex UG (haftungsbeschraenkt) -// All rights reserved. Licensed under the MIT license. -// ========================================================================== - -using System.Threading.Channels; -using Squidex.Infrastructure.Tasks; - -#pragma warning disable RECS0082 // Parameter has the same name as a member and hides it -#pragma warning disable SA1313 // Parameter names should begin with lower-case letter - -namespace Squidex.Infrastructure.EventSourcing.Consume -{ - internal sealed class BatchSubscriber : IEventSubscriber, IEventSubscription - { - private readonly IEventSubscription eventSubscription; - private readonly Channel taskQueue; - private readonly Channel parseQueue; - private readonly Task handleTask; - private readonly CancellationTokenSource completed = new CancellationTokenSource(); - - private sealed record EventSource(StoredEvent StoredEvent); - private sealed record BatchItem(Envelope? Event, string Position); - private sealed record BatchJob(BatchItem[] Items); - private sealed record ErrorJob(Exception Exception); - - public BatchSubscriber( - EventConsumerProcessor processor, - IEventFormatter eventFormatter, - IEventConsumer eventConsumer, - Func factory) - { - eventSubscription = factory(this); - - var batchSize = Math.Max(1, eventConsumer.BatchSize); - var batchDelay = Math.Max(100, eventConsumer.BatchDelay); - - parseQueue = Channel.CreateBounded(new BoundedChannelOptions(batchSize) - { - AllowSynchronousContinuations = true, - SingleReader = true, - SingleWriter = true - }); - - taskQueue = Channel.CreateBounded(new BoundedChannelOptions(2) - { - SingleReader = true, - SingleWriter = true - }); - - var batchQueue = Channel.CreateBounded(new BoundedChannelOptions(batchSize) - { - AllowSynchronousContinuations = true, - SingleReader = true, - SingleWriter = true - }); - -#pragma warning disable MA0040 // Flow the cancellation token - batchQueue.Batch(taskQueue, x => new BatchJob(x.ToArray()), batchSize, batchDelay); - - Task.Run(async () => - { - await foreach (var source in parseQueue.Reader.ReadAllAsync(completed.Token)) - { - var storedEvent = source.StoredEvent; - try - { - Envelope? @event = null; - - if (eventConsumer.Handles(storedEvent)) - { - @event = eventFormatter.ParseIfKnown(storedEvent); - } - - await batchQueue.Writer.WriteAsync(new BatchItem(@event, storedEvent.EventPosition), completed.Token); - } - catch (Exception ex) - { - await taskQueue.Writer.WriteAsync(new ErrorJob(ex), completed.Token); - } - } - }).ContinueWith(x => batchQueue.Writer.TryComplete(x.Exception)); -#pragma warning restore MA0040 // Flow the cancellation token - - handleTask = Run(processor); - } - - private async Task Run(EventConsumerProcessor processor) - { - try - { - await foreach (var task in taskQueue.Reader.ReadAllAsync(completed.Token)) - { - switch (task) - { - case ErrorJob { Exception: not OperationCanceledException } error: - { - await processor.OnErrorAsync(this, error.Exception); - break; - } - - case BatchJob batch: - { - var eventsPosition = batch.Items[^1].Position; - var eventsCollection = batch.Items.Select(x => x.Event).NotNull().ToList(); - - await processor.OnEventsAsync(this, eventsCollection, eventsPosition); - break; - } - } - } - } - catch (OperationCanceledException) - { - return; - } - catch (Exception ex) - { - await processor.OnErrorAsync(this, ex); - } - } - - public Task CompleteAsync() - { - parseQueue.Writer.TryComplete(); - - return handleTask; - } - - public void Dispose() - { - completed.Cancel(); - - eventSubscription.Dispose(); - } - - public void WakeUp() - { - eventSubscription.WakeUp(); - } - - ValueTask IEventSubscriber.OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent) - { - return parseQueue.Writer.WriteAsync(new EventSource(storedEvent), completed.Token); - } - - ValueTask IEventSubscriber.OnErrorAsync(IEventSubscription subscription, Exception exception) - { - return taskQueue.Writer.WriteAsync(new ErrorJob(exception), completed.Token); - } - } -} diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/Consume/BatchSubscription.cs b/backend/src/Squidex.Infrastructure/EventSourcing/Consume/BatchSubscription.cs new file mode 100644 index 000000000..dcb5a1a5a --- /dev/null +++ b/backend/src/Squidex.Infrastructure/EventSourcing/Consume/BatchSubscription.cs @@ -0,0 +1,160 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System.Threading.Channels; +using Squidex.Infrastructure.Tasks; + +namespace Squidex.Infrastructure.EventSourcing.Consume +{ + internal sealed class BatchSubscription : IEventSubscriber, IEventSubscription + { + private readonly IEventSubscription eventSubscription; + private readonly Channel taskQueue; + private readonly Channel batchQueue; + private readonly Task handleTask; + private readonly CancellationTokenSource completed = new CancellationTokenSource(); + + public BatchSubscription( + IEventConsumer eventConsumer, + IEventSubscriber eventSubscriber, + EventSubscriptionSource eventSource) + { + eventSubscription = eventSource(this); + + var batchSize = Math.Max(1, eventConsumer.BatchSize); + var batchDelay = Math.Max(100, eventConsumer.BatchDelay); + + taskQueue = Channel.CreateBounded(new BoundedChannelOptions(2) + { + SingleReader = true, + SingleWriter = true + }); + + batchQueue = Channel.CreateBounded(new BoundedChannelOptions(batchSize) + { + AllowSynchronousContinuations = true, + SingleReader = true, + SingleWriter = true + }); + + batchQueue.Batch(taskQueue, batchSize, batchDelay, completed.Token); + + handleTask = Run(eventSubscriber); + } + + private async Task Run(IEventSubscriber eventSink) + { + try + { + var isStopped = false; + + await foreach (var task in taskQueue.Reader.ReadAllAsync(completed.Token)) + { + switch (task) + { + case Exception exception when exception is not OperationCanceledException: + { + if (!completed.IsCancellationRequested) + { + await eventSink.OnErrorAsync(this, exception); + } + + isStopped = true; + break; + } + + case List batch: + { + if (!completed.IsCancellationRequested) + { + // Events can be null if the event consumer is not interested in the stored event. + var eventList = batch.Select(x => x.Event).NotNull().ToList(); + var eventPosition = batch[^1].Position; + + // Use a struct here to save a few allocations. + await eventSink.OnNextAsync(this, new ParsedEvents(eventList, eventPosition)); + } + + break; + } + } + + if (isStopped) + { + break; + } + } + } + catch (OperationCanceledException) + { + return; + } + catch (Exception ex) + { + if (!completed.IsCancellationRequested) + { + await eventSink.OnErrorAsync(this, ex); + } + } + } + + public void Dispose() + { + if (completed.IsCancellationRequested) + { + return; + } + + // It is not necessary to dispose the cancellation token source. + completed.Cancel(); + + // We do not lock here, it is the responsibility of the source subscription to be thread safe. + eventSubscription.Dispose(); + } + + public async ValueTask CompleteAsync() + { + await eventSubscription.CompleteAsync(); + + batchQueue.Writer.TryComplete(); + + await handleTask; + } + + public void WakeUp() + { + eventSubscription.WakeUp(); + } + + async ValueTask IEventSubscriber.OnErrorAsync(IEventSubscription subscription, Exception exception) + { + try + { + // Forward the exception from one task only, but bypass the batch. + await taskQueue.Writer.WriteAsync(exception, completed.Token); + } + catch (ChannelClosedException) + { + // This exception is acceptable and happens when an exception has been thrown before. + return; + } + } + + async ValueTask IEventSubscriber.OnNextAsync(IEventSubscription subscription, ParsedEvent @event) + { + try + { + await batchQueue.Writer.WriteAsync(@event, completed.Token); + } + catch (ChannelClosedException) + { + // This exception is acceptable and happens when an exception has been thrown before. + return; + } + } + } +} diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/Consume/EventConsumerProcessor.cs b/backend/src/Squidex.Infrastructure/EventSourcing/Consume/EventConsumerProcessor.cs index 876b49a38..9e165bff8 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/Consume/EventConsumerProcessor.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/Consume/EventConsumerProcessor.cs @@ -12,11 +12,11 @@ using Squidex.Infrastructure.Tasks; namespace Squidex.Infrastructure.EventSourcing.Consume { - public class EventConsumerProcessor + public class EventConsumerProcessor : IEventSubscriber { private readonly SimpleState state; private readonly IEventFormatter eventFormatter; - private readonly IEventConsumer? eventConsumer; + private readonly IEventConsumer eventConsumer; private readonly IEventStore eventStore; private readonly ILogger log; private readonly AsyncLock asyncLock = new AsyncLock(); @@ -51,7 +51,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume public virtual async Task CompleteAsync() { - if (currentSubscription is BatchSubscriber batchSubscriber) + if (currentSubscription is BatchSubscription batchSubscriber) { try { @@ -64,24 +64,24 @@ namespace Squidex.Infrastructure.EventSourcing.Consume } } - public virtual Task OnEventsAsync(IEventSubscription subscription, IReadOnlyList> events, string position) + public virtual async ValueTask OnNextAsync(IEventSubscription subscription, ParsedEvents @event) { - return UpdateAsync(async () => + await UpdateAsync(async () => { if (!ReferenceEquals(subscription, currentSubscription)) { return; } - await DispatchAsync(events); + await DispatchAsync(@event.Events); - State = State.Handled(position, events.Count); + State = State.Handled(@event.Position, @event.Events.Count); }, State.Position); } - public virtual Task OnErrorAsync(IEventSubscription subscription, Exception exception) + public virtual async ValueTask OnErrorAsync(IEventSubscription subscription, Exception exception) { - return UpdateAsync(() => + await UpdateAsync(() => { if (!ReferenceEquals(subscription, currentSubscription)) { @@ -238,7 +238,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume { if (currentSubscription == null) { - currentSubscription = CreateSubscription(); + currentSubscription = CreateRetrySubscription(this); } else { @@ -246,17 +246,18 @@ namespace Squidex.Infrastructure.EventSourcing.Consume } } - private IEventSubscription CreateSubscription() + protected IEventSubscription CreatePipeline(IEventSubscriber subscriber) { - return new BatchSubscriber(this, eventFormatter, eventConsumer!, CreateRetrySubscription); + return new BatchSubscription(eventConsumer, subscriber, + x => new ParseSubscription(eventConsumer, eventFormatter, x, CreateSubscription)); } - protected virtual IEventSubscription CreateRetrySubscription(IEventSubscriber subscriber) + protected virtual IEventSubscription CreateRetrySubscription(IEventSubscriber subscriber) { - return new RetrySubscription(subscriber, CreateSubscription); + return new RetrySubscription(subscriber, CreatePipeline); } - protected virtual IEventSubscription CreateSubscription(IEventSubscriber subscriber) + protected virtual IEventSubscription CreateSubscription(IEventSubscriber subscriber) { return eventStore.CreateSubscription(subscriber, eventConsumer!.EventsFilter, State.Position); } diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/Consume/ParseSubscription.cs b/backend/src/Squidex.Infrastructure/EventSourcing/Consume/ParseSubscription.cs new file mode 100644 index 000000000..b060eed6b --- /dev/null +++ b/backend/src/Squidex.Infrastructure/EventSourcing/Consume/ParseSubscription.cs @@ -0,0 +1,149 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using System.Threading.Channels; + +namespace Squidex.Infrastructure.EventSourcing.Consume +{ + internal sealed class ParseSubscription : IEventSubscriber, IEventSubscription + { + private readonly Channel deserializeQueue; + private readonly CancellationTokenSource completed = new CancellationTokenSource(); + private readonly Task deserializeTask; + private readonly IEventSubscription eventSubscription; + + public ParseSubscription( + IEventConsumer eventConsumer, + IEventFormatter eventFormatter, + IEventSubscriber eventSubscriber, + EventSubscriptionSource eventSource) + { + eventSubscription = eventSource(this); + + deserializeQueue = Channel.CreateBounded(new BoundedChannelOptions(2) + { + AllowSynchronousContinuations = true, + SingleReader = true, + SingleWriter = true + }); + +#pragma warning disable MA0040 // Flow the cancellation token + deserializeTask = Task.Run(async () => + { + try + { + var isFailed = false; + + await foreach (var input in deserializeQueue.Reader.ReadAllAsync(completed.Token)) + { + switch (input) + { + case Exception exception: + { + // Not very likely that the task is cancelled. + await eventSubscriber.OnErrorAsync(this, exception); + + isFailed = true; + break; + } + + case StoredEvent storedEvent: + { + Envelope? @event = null; + + if (eventConsumer.Handles(storedEvent)) + { + @event = eventFormatter.ParseIfKnown(storedEvent); + } + + // Parsing takes a little bit of time, so the task might have been cancelled. + if (!completed.IsCancellationRequested) + { + // Also invoke the subscriber if the event is null to update the position. + await eventSubscriber.OnNextAsync(this, new ParsedEvent(@event, storedEvent.EventPosition)); + } + + break; + } + } + + if (isFailed) + { + break; + } + } + } + catch (OperationCanceledException) + { + return; + } + catch (Exception ex) + { + if (!completed.IsCancellationRequested) + { + await eventSubscriber.OnErrorAsync(this, ex); + } + } + }).ContinueWith(x => deserializeQueue.Writer.TryComplete(x.Exception)); + } + + public void Dispose() + { + if (completed.IsCancellationRequested) + { + return; + } + + // It is not necessary to dispose the cancellation token source. + completed.Cancel(); + + // We do not lock here, it is the responsibility of the source subscription to be thread safe. + eventSubscription.Dispose(); + } + + public async ValueTask CompleteAsync() + { + await eventSubscription.CompleteAsync(); + + deserializeQueue.Writer.TryComplete(); + + await deserializeTask; + } + + public void WakeUp() + { + eventSubscription.WakeUp(); + } + + async ValueTask IEventSubscriber.OnErrorAsync(IEventSubscription subscription, Exception exception) + { + try + { + // Forward the exception from one task only. + await deserializeQueue.Writer.WriteAsync(exception, completed.Token); + } + catch (ChannelClosedException) + { + // This exception is acceptable and happens when an exception has been thrown before. + return; + } + } + + async ValueTask IEventSubscriber.OnNextAsync(IEventSubscription subscription, StoredEvent @event) + { + try + { + await deserializeQueue.Writer.WriteAsync(@event, completed.Token); + } + catch (ChannelClosedException) + { + // This exception is acceptable and happens when an exception has been thrown before. + return; + } + } + } +} diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/Consume/ParsedEvent.cs b/backend/src/Squidex.Infrastructure/EventSourcing/Consume/ParsedEvent.cs new file mode 100644 index 000000000..f7f62bc29 --- /dev/null +++ b/backend/src/Squidex.Infrastructure/EventSourcing/Consume/ParsedEvent.cs @@ -0,0 +1,16 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +#pragma warning disable SA1313 // Parameter names should begin with lower-case letter +#pragma warning disable MA0048 // File name must match type name + +namespace Squidex.Infrastructure.EventSourcing.Consume +{ + public record struct ParsedEvent(Envelope? Event, string Position); + + public record struct ParsedEvents(List> Events, string Position); +} diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs b/backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs index 90f9ce77f..0b81586ed 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/IEventStore.cs @@ -35,7 +35,7 @@ namespace Squidex.Infrastructure.EventSourcing Task DeleteStreamAsync(string streamName, CancellationToken ct = default); - IEventSubscription CreateSubscription(IEventSubscriber subscriber, string? streamFilter = null, string? position = null); + IEventSubscription CreateSubscription(IEventSubscriber eventSubscriber, string? streamFilter = null, string? position = null); async Task AppendUnsafeAsync(IEnumerable commits, CancellationToken ct = default) diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/IEventSubscriber.cs b/backend/src/Squidex.Infrastructure/EventSourcing/IEventSubscriber.cs index c4c25abdf..6e9412111 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/IEventSubscriber.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/IEventSubscriber.cs @@ -5,11 +5,15 @@ // All rights reserved. Licensed under the MIT license. // ========================================================================== +#pragma warning disable MA0048 // File name must match type name + namespace Squidex.Infrastructure.EventSourcing { - public interface IEventSubscriber + public delegate IEventSubscription EventSubscriptionSource(IEventSubscriber target); + + public interface IEventSubscriber { - ValueTask OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent); + ValueTask OnNextAsync(IEventSubscription subscription, T @event); ValueTask OnErrorAsync(IEventSubscription subscription, Exception exception); } diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/IEventSubscription.cs b/backend/src/Squidex.Infrastructure/EventSourcing/IEventSubscription.cs index 1402f8fae..d6f8d8cd2 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/IEventSubscription.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/IEventSubscription.cs @@ -1,4 +1,4 @@ -// ========================================================================== +// ========================================================================== // Squidex Headless CMS // ========================================================================== // Copyright (c) Squidex UG (haftungsbeschraenkt) @@ -9,8 +9,8 @@ namespace Squidex.Infrastructure.EventSourcing { public interface IEventSubscription : IDisposable { - void WakeUp() - { - } + void WakeUp(); + + ValueTask CompleteAsync(); } } diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs b/backend/src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs index c8e14eb62..927240c74 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/PollingSubscription.cs @@ -16,7 +16,7 @@ namespace Squidex.Infrastructure.EventSourcing public PollingSubscription( IEventStore eventStore, - IEventSubscriber eventSubscriber, + IEventSubscriber eventSubscriber, string? streamFilter, string? position) { @@ -26,7 +26,7 @@ namespace Squidex.Infrastructure.EventSourcing { await foreach (var storedEvent in eventStore.QueryAllAsync(streamFilter, position, ct: ct)) { - await eventSubscriber.OnEventAsync(this, storedEvent); + await eventSubscriber.OnNextAsync(this, storedEvent); position = storedEvent.EventPosition; } @@ -38,14 +38,19 @@ namespace Squidex.Infrastructure.EventSourcing }); } - public void WakeUp() + public ValueTask CompleteAsync() { - timer.SkipCurrentDelay(); + return new ValueTask(timer.StopAsync()); } public void Dispose() { timer.StopAsync().Forget(); } + + public void WakeUp() + { + timer.SkipCurrentDelay(); + } } } diff --git a/backend/src/Squidex.Infrastructure/EventSourcing/RetrySubscription.cs b/backend/src/Squidex.Infrastructure/EventSourcing/RetrySubscription.cs index 75653bbe4..1a6d07de9 100644 --- a/backend/src/Squidex.Infrastructure/EventSourcing/RetrySubscription.cs +++ b/backend/src/Squidex.Infrastructure/EventSourcing/RetrySubscription.cs @@ -9,24 +9,25 @@ using Squidex.Infrastructure.Tasks; namespace Squidex.Infrastructure.EventSourcing { - public sealed class RetrySubscription : IEventSubscription, IEventSubscriber + public sealed class RetrySubscription : IEventSubscription, IEventSubscriber { private readonly RetryWindow retryWindow = new RetryWindow(TimeSpan.FromMinutes(5), 5); private readonly AsyncLock lockObject = new AsyncLock(); - private readonly IEventSubscriber eventSubscriber; - private readonly Func eventSubscriptionFactory; + private readonly IEventSubscriber eventSubscriber; + private readonly EventSubscriptionSource eventSource; private CancellationTokenSource timerCancellation = new CancellationTokenSource(); private IEventSubscription? currentSubscription; public int ReconnectWaitMs { get; set; } = 5000; - public RetrySubscription(IEventSubscriber eventSubscriber, Func eventSubscriptionFactory) + public RetrySubscription(IEventSubscriber eventSubscriber, + EventSubscriptionSource eventSource) { Guard.NotNull(eventSubscriber); - Guard.NotNull(eventSubscriptionFactory); + Guard.NotNull(eventSource); this.eventSubscriber = eventSubscriber; - this.eventSubscriptionFactory = eventSubscriptionFactory; + this.eventSource = eventSource; Subscribe(); } @@ -48,7 +49,7 @@ namespace Squidex.Infrastructure.EventSourcing return; } - currentSubscription = eventSubscriptionFactory(this); + currentSubscription = eventSource(this); } private void Unsubscribe() @@ -72,7 +73,12 @@ namespace Squidex.Infrastructure.EventSourcing currentSubscription?.WakeUp(); } - async ValueTask IEventSubscriber.OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent) + public ValueTask CompleteAsync() + { + return currentSubscription?.CompleteAsync() ?? default; + } + + async ValueTask IEventSubscriber.OnNextAsync(IEventSubscription subscription, T @event) { using (await lockObject.EnterAsync(default)) { @@ -81,11 +87,11 @@ namespace Squidex.Infrastructure.EventSourcing return; } - await eventSubscriber.OnEventAsync(this, storedEvent); + await eventSubscriber.OnNextAsync(this, @event); } } - async ValueTask IEventSubscriber.OnErrorAsync(IEventSubscription subscription, Exception exception) + async ValueTask IEventSubscriber.OnErrorAsync(IEventSubscription subscription, Exception exception) { if (exception is OperationCanceledException) { diff --git a/backend/src/Squidex.Infrastructure/Tasks/AsyncHelper.cs b/backend/src/Squidex.Infrastructure/Tasks/AsyncHelper.cs index f514adc83..5e964a30c 100644 --- a/backend/src/Squidex.Infrastructure/Tasks/AsyncHelper.cs +++ b/backend/src/Squidex.Infrastructure/Tasks/AsyncHelper.cs @@ -51,37 +51,14 @@ namespace Squidex.Infrastructure.Tasks .GetResult(); } - public static async ValueTask WhenAllThrottledAsync(IEnumerable source, Func action, int maxDegreeOfParallelism = 0, - CancellationToken ct = default) - { - if (maxDegreeOfParallelism <= 0) - { - maxDegreeOfParallelism = Environment.ProcessorCount * 2; - } - - var semaphore = new SemaphoreSlim(maxDegreeOfParallelism); - - foreach (var item in source) - { - await semaphore.WaitAsync(ct); - try - { - await action(item, ct); - } - finally - { - semaphore.Release(); - } - } - } - - public static void Batch(this Channel source, Channel target, Func, TOut> converter, int batchSize, int timeout, + public static void Batch(this Channel source, Channel target, int batchSize, int timeout, CancellationToken ct = default) { Task.Run(async () => { var batch = new List(batchSize); + // Just a marker object to force sending out new batches. var force = new object(); await using var timer = new Timer(_ => source.Writer.TryWrite(force)); @@ -90,19 +67,24 @@ namespace Squidex.Infrastructure.Tasks { if (batch.Count > 0) { - await target.Writer.WriteAsync(converter(batch), ct); - batch.Clear(); + await target.Writer.WriteAsync(batch, ct); + + // Create a new batch, because the value is shared and might be processes by another concurrent task. + batch = new List(); } } + // Exceptions usually that the process was stopped and the channel closed, therefore we do not catch them. await foreach (var item in source.Reader.ReadAllAsync(ct)) { if (ReferenceEquals(item, force)) { + // Our item is the marker object from the timer. await TrySendAsync(); } else if (item is TIn typed) { + // The timeout just with the last event and should push events out if no further events are received. timer.Change(timeout, Timeout.Infinite); batch.Add(typed); diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Consume/EventConsumerProcessorTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Consume/EventConsumerProcessorTests.cs index d0372a502..e0eaf6368 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Consume/EventConsumerProcessorTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Consume/EventConsumerProcessorTests.cs @@ -18,7 +18,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume { public sealed class MyEventConsumerProcessor : EventConsumerProcessor { - public IEventSubscriber? Subscriber { get; set; } + public IEventSubscriber? Subscriber { get; set; } public MyEventConsumerProcessor( IPersistenceFactory persistenceFactory, @@ -30,12 +30,12 @@ namespace Squidex.Infrastructure.EventSourcing.Consume { } - protected override IEventSubscription CreateRetrySubscription(IEventSubscriber subscriber) + protected override IEventSubscription CreateRetrySubscription(IEventSubscriber subscriber) { - return CreateSubscription(subscriber); + return CreatePipeline(subscriber); } - protected override IEventSubscription CreateSubscription(IEventSubscriber subscriber) + protected override IEventSubscription CreateSubscription(IEventSubscriber subscriber) { Subscriber = subscriber; @@ -65,7 +65,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume } }; - A.CallTo(() => eventStore.CreateSubscription(A._, A._, A._)) + A.CallTo(() => eventStore.CreateSubscription(A>._, A._, A._)) .Returns(eventSubscription); A.CallTo(() => eventConsumer.Name) @@ -108,7 +108,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume AssertGrainState(isStopped: true, position: initialPosition); - A.CallTo(() => eventStore.CreateSubscription(A._, A._, A._)) + A.CallTo(() => eventStore.CreateSubscription(A>._, A._, A._)) .MustNotHaveHappened(); } @@ -122,7 +122,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume AssertGrainState(isStopped: false, position: initialPosition); - A.CallTo(() => eventStore.CreateSubscription(A._, A._, A._)) + A.CallTo(() => eventStore.CreateSubscription(A>._, A._, A._)) .MustHaveHappenedOnceExactly(); } @@ -138,7 +138,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume AssertGrainState(isStopped: false, position: initialPosition); - A.CallTo(() => eventStore.CreateSubscription(A._, A._, A._)) + A.CallTo(() => eventStore.CreateSubscription(A>._, A._, A._)) .MustHaveHappenedOnceExactly(); } @@ -152,7 +152,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume AssertGrainState(isStopped: false, position: initialPosition); - A.CallTo(() => eventStore.CreateSubscription(A._, A._, A._)) + A.CallTo(() => eventStore.CreateSubscription(A>._, A._, A._)) .MustHaveHappenedOnceExactly(); } @@ -198,10 +198,10 @@ namespace Squidex.Infrastructure.EventSourcing.Consume A.CallTo(() => eventSubscription.Dispose()) .MustHaveHappenedOnceExactly(); - A.CallTo(() => eventStore.CreateSubscription(A._, A._, state.Snapshot.Position)) + A.CallTo(() => eventStore.CreateSubscription(A>._, A._, state.Snapshot.Position)) .MustHaveHappenedOnceExactly(); - A.CallTo(() => eventStore.CreateSubscription(A._, A._, null)) + A.CallTo(() => eventStore.CreateSubscription(A>._, A._, null)) .MustHaveHappenedOnceExactly(); } @@ -211,7 +211,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume await sut.InitializeAsync(default); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, storedEvent); + await OnNextAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); AssertGrainState(isStopped: false, position: storedEvent.EventPosition, count: 1); @@ -232,11 +232,11 @@ namespace Squidex.Infrastructure.EventSourcing.Consume await sut.InitializeAsync(default); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, storedEvent); - await OnEventAsync(eventSubscription, storedEvent); - await OnEventAsync(eventSubscription, storedEvent); - await OnEventAsync(eventSubscription, storedEvent); - await OnEventAsync(eventSubscription, storedEvent); + await OnNextAsync(eventSubscription, storedEvent); + await OnNextAsync(eventSubscription, storedEvent); + await OnNextAsync(eventSubscription, storedEvent); + await OnNextAsync(eventSubscription, storedEvent); + await OnNextAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); @@ -258,11 +258,11 @@ namespace Squidex.Infrastructure.EventSourcing.Consume await sut.InitializeAsync(default); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, storedEvent); - await OnEventAsync(eventSubscription, storedEvent); - await OnEventAsync(eventSubscription, storedEvent); - await OnEventAsync(eventSubscription, storedEvent); - await OnEventAsync(eventSubscription, storedEvent); + await OnNextAsync(eventSubscription, storedEvent); + await OnNextAsync(eventSubscription, storedEvent); + await OnNextAsync(eventSubscription, storedEvent); + await OnNextAsync(eventSubscription, storedEvent); + await OnNextAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); @@ -284,7 +284,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume await sut.InitializeAsync(default); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, storedEvent); + await OnNextAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); AssertGrainState(isStopped: false, position: storedEvent.EventPosition); @@ -305,7 +305,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume await sut.InitializeAsync(default); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, storedEvent); + await OnNextAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); AssertGrainState(isStopped: false, position: storedEvent.EventPosition); @@ -323,7 +323,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume await sut.InitializeAsync(default); await sut.ActivateAsync(); - await sut.OnEventsAsync(A.Fake(), new[] { envelope }.ToList(), storedEvent.EventPosition); + await sut.OnNextAsync(A.Fake(), new ParsedEvents(new[] { envelope }.ToList(), storedEvent.EventPosition)); await sut.CompleteAsync(); AssertGrainState(isStopped: false, position: initialPosition); @@ -416,7 +416,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume await sut.InitializeAsync(default); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, storedEvent); + await OnNextAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); AssertGrainState(isStopped: true, position: initialPosition, error: ex.Message); @@ -442,7 +442,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume await sut.InitializeAsync(default); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, storedEvent); + await OnNextAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); AssertGrainState(isStopped: true, position: initialPosition, error: ex.Message); @@ -468,7 +468,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume await sut.InitializeAsync(default); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, storedEvent); + await OnNextAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); await sut.StopAsync(); @@ -486,7 +486,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume A.CallTo(() => eventSubscription.Dispose()) .MustHaveHappenedOnceExactly(); - A.CallTo(() => eventStore.CreateSubscription(A._, A._, A._)) + A.CallTo(() => eventStore.CreateSubscription(A>._, A._, A._)) .MustHaveHappened(2, Times.Exactly); } @@ -501,7 +501,7 @@ namespace Squidex.Infrastructure.EventSourcing.Consume await sut.InitializeAsync(default); await sut.ActivateAsync(); - await OnEventAsync(eventSubscription, storedEvent); + await OnNextAsync(eventSubscription, storedEvent); await sut.CompleteAsync(); AssertGrainState(isStopped: true, position: storedEvent.EventPosition, error: ex.Message, 1); @@ -512,9 +512,9 @@ namespace Squidex.Infrastructure.EventSourcing.Consume return sut.Subscriber?.OnErrorAsync(subscription, exception) ?? default; } - private ValueTask OnEventAsync(IEventSubscription subscription, StoredEvent @event) + private ValueTask OnNextAsync(IEventSubscription subscription, StoredEvent @event) { - return sut.Subscriber?.OnEventAsync(subscription, @event) ?? default; + return sut.Subscriber?.OnNextAsync(subscription, @event) ?? default; } private void AssertGrainState(bool isStopped = false, string? position = null, string? error = null, int count = 0) diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs index 0611da061..75468c10d 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs @@ -16,7 +16,7 @@ namespace Squidex.Infrastructure.EventSourcing private readonly Lazy sut; private string subscriptionPosition; - public sealed class EventSubscriber : IEventSubscriber + public sealed class EventSubscriber : IEventSubscriber { public List Events { get; } = new List(); @@ -26,16 +26,20 @@ namespace Squidex.Infrastructure.EventSourcing { } + public void WakeUp() + { + } + public ValueTask OnErrorAsync(IEventSubscription subscription, Exception exception) { throw exception; } - public ValueTask OnEventAsync(IEventSubscription subscription, StoredEvent storedEvent) + public ValueTask OnNextAsync(IEventSubscription subscription, StoredEvent @event) { - LastPosition = storedEvent.EventPosition; + LastPosition = @event.EventPosition; - Events.Add(storedEvent); + Events.Add(@event); return default; } diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/PollingSubscriptionTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/PollingSubscriptionTests.cs index df0682b60..e68fbb0ce 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/PollingSubscriptionTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/PollingSubscriptionTests.cs @@ -13,7 +13,7 @@ namespace Squidex.Infrastructure.EventSourcing public class PollingSubscriptionTests { private readonly IEventStore eventStore = A.Fake(); - private readonly IEventSubscriber eventSubscriber = A.Fake(); + private readonly IEventSubscriber eventSubscriber = A.Fake>(); private readonly string position = Guid.NewGuid().ToString(); [Fact] diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/RetrySubscriptionTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/RetrySubscriptionTests.cs index 3f9f0b065..e3a342417 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/RetrySubscriptionTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/RetrySubscriptionTests.cs @@ -13,17 +13,17 @@ namespace Squidex.Infrastructure.EventSourcing public class RetrySubscriptionTests { private readonly IEventStore eventStore = A.Fake(); - private readonly IEventSubscriber eventSubscriber = A.Fake(); + private readonly IEventSubscriber eventSubscriber = A.Fake>(); private readonly IEventSubscription eventSubscription = A.Fake(); - private readonly IEventSubscriber sutSubscriber; - private readonly RetrySubscription sut; + private readonly IEventSubscriber sutSubscriber; + private readonly RetrySubscription sut; public RetrySubscriptionTests() { - A.CallTo(() => eventStore.CreateSubscription(A._, A._, A._)) + A.CallTo(() => eventStore.CreateSubscription(A>._, A._, A._)) .Returns(eventSubscription); - sut = new RetrySubscription(eventSubscriber, s => eventStore.CreateSubscription(s)) { ReconnectWaitMs = 50 }; + sut = new RetrySubscription(eventSubscriber, s => eventStore.CreateSubscription(s)) { ReconnectWaitMs = 50 }; sutSubscriber = sut; } @@ -49,7 +49,7 @@ namespace Squidex.Infrastructure.EventSourcing A.CallTo(() => eventSubscription.Dispose()) .MustHaveHappened(2, Times.Exactly); - A.CallTo(() => eventStore.CreateSubscription(A._, A._, A._)) + A.CallTo(() => eventStore.CreateSubscription(A>._, A._, A._)) .MustHaveHappened(2, Times.Exactly); A.CallTo(() => eventSubscriber.OnErrorAsync(eventSubscription, A._)) @@ -92,11 +92,11 @@ namespace Squidex.Infrastructure.EventSourcing { var @event = new StoredEvent("Stream", "1", 2, new EventData("Type", new EnvelopeHeaders(), "Payload")); - await OnEventAsync(eventSubscription, @event); + await OnNextAsync(eventSubscription, @event); sut.Dispose(); - A.CallTo(() => eventSubscriber.OnEventAsync(sut, @event)) + A.CallTo(() => eventSubscriber.OnNextAsync(sut, @event)) .MustHaveHappened(); } @@ -105,11 +105,11 @@ namespace Squidex.Infrastructure.EventSourcing { var @event = new StoredEvent("Stream", "1", 2, new EventData("Type", new EnvelopeHeaders(), "Payload")); - await OnEventAsync(A.Fake(), @event); + await OnNextAsync(A.Fake(), @event); sut.Dispose(); - A.CallTo(() => eventSubscriber.OnEventAsync(A._, A._)) + A.CallTo(() => eventSubscriber.OnNextAsync(A._, A._)) .MustNotHaveHappened(); } @@ -118,9 +118,9 @@ namespace Squidex.Infrastructure.EventSourcing return sutSubscriber.OnErrorAsync(subscriber, ex); } - private ValueTask OnEventAsync(IEventSubscription subscriber, StoredEvent ev) + private ValueTask OnNextAsync(IEventSubscription subscriber, StoredEvent ev) { - return sutSubscriber.OnEventAsync(subscriber, ev); + return sutSubscriber.OnNextAsync(subscriber, ev); } } }