diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Consume/EventConsumerProcessorIntegrationTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Consume/EventConsumerProcessorIntegrationTests.cs new file mode 100644 index 000000000..43a84acd5 --- /dev/null +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Consume/EventConsumerProcessorIntegrationTests.cs @@ -0,0 +1,142 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using Microsoft.Extensions.DependencyInjection; +using MongoDB.Driver; +using Squidex.Caching; +using Squidex.Infrastructure.Reflection; +using Squidex.Infrastructure.States; +using Squidex.Infrastructure.TestHelpers; +using Xunit; + +namespace Squidex.Infrastructure.EventSourcing.Consume +{ + public abstract class EventConsumerProcessorIntegrationTests + { + private readonly Lazy store; + + public sealed class EventConsumer : IEventConsumer + { + public List Events { get; } = new List(); + + public string Name => "Consumer"; + + public Func EventReceived { get; set; } + + public async Task On(Envelope @event) + { + Events.Add(@event.Headers.EventId()); + + if (EventReceived != null) + { + await EventReceived(Events.Count); + } + } + } + + protected IEventStore EventStore + { + get => store.Value; + } + + protected EventConsumerProcessorIntegrationTests() + { +#pragma warning disable MA0056 // Do not call overridable members in constructor + store = new Lazy(CreateStore); +#pragma warning restore MA0056 // Do not call overridable members in constructor + } + + public abstract IEventStore CreateStore(); + + [Theory] + [InlineData(0)] + [InlineData(100)] + public async Task Should_subscribe_with_parallel_writes(int startStop) + { + var numTasks = 100; + var numEvents = 50; + + var eventConsumer = new EventConsumer(); + + var mongoClient = new MongoClient(TestConfig.Configuration["mongodb:configuration"]); + var mongoDatabase = mongoClient.GetDatabase(TestConfig.Configuration["mongodb:database"]); + + var typeFactory = new TypeNameRegistry().Map(typeof(MyEvent), "MyEvent"); + + var services = new ServiceCollection() + .AddLogging() + .AddSingleton(TestUtils.DefaultSerializer) + .AddSingleton(EventStore) + .AddSingleton(eventConsumer) + .AddSingleton(mongoClient) + .AddSingleton(mongoDatabase) + .AddSingleton(typeFactory) + .AddSingleton(typeof(IPersistenceFactory<>), typeof(Store<>)) + .AddSingleton() + .AddSingleton(eventConsumer) + .AddSingleton() + .AddSingleton() + .AddSingleton(typeof(ISnapshotStore<>), typeof(MongoSnapshotStore<>)) + .BuildServiceProvider(); + + var processor = services.GetRequiredService(); + + var persistenceFactory = services.GetRequiredService>(); + + // Also start the event consumer, because it might be stopped from previous run. + await processor.InitializeAsync(default); + await processor.ActivateAsync(); + await processor.StartAsync(); + + async Task StartStop() + { + await processor.StopAsync(); + await processor.StartAsync(); + } + + eventConsumer.EventReceived = i => + { + if (startStop > 0 && i % startStop == 0) + { + // Do not await the task here, other wise we could create deadlock. + StartStop().Forget(); + } + + return Task.CompletedTask; + }; + + // Create events in parallel. + await Parallel.ForEachAsync(Enumerable.Range(0, numTasks), async (i, ct) => + { + var persistence = persistenceFactory.WithEventSourcing(typeof(None), DomainId.NewGuid(), null); + + for (var j = 0; j < numEvents; j++) + { + await persistence.WriteEventsAsync(new List> + { + Envelope.Create(new MyEvent()) + }); + } + }); + + var expectedEvents = numEvents * numTasks; + + // Wait for all events to arrive. + using (var cts = new CancellationTokenSource(20000)) + { + while (!cts.IsCancellationRequested && eventConsumer.Events.Count < expectedEvents) + { + await Task.Delay(100); + } + } + + await processor.StopAsync(); + + Assert.Equal(expectedEvents, eventConsumer.Events.Count); + } + } +} diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Consume/MongoEventConsumerProcessorIntegrationTests_Direct.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Consume/MongoEventConsumerProcessorIntegrationTests_Direct.cs new file mode 100644 index 000000000..7aeccdef4 --- /dev/null +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/Consume/MongoEventConsumerProcessorIntegrationTests_Direct.cs @@ -0,0 +1,29 @@ +// ========================================================================== +// Squidex Headless CMS +// ========================================================================== +// Copyright (c) Squidex UG (haftungsbeschraenkt) +// All rights reserved. Licensed under the MIT license. +// ========================================================================== + +using Xunit; + +#pragma warning disable SA1300 // Element should begin with upper-case letter + +namespace Squidex.Infrastructure.EventSourcing.Consume +{ + [Trait("Category", "Dependencies")] + public class MongoEventConsumerProcessorIntegrationTests_Direct : EventConsumerProcessorIntegrationTests, IClassFixture + { + public MongoEventStoreFixture _ { get; } + + public MongoEventConsumerProcessorIntegrationTests_Direct(MongoEventStoreDirectFixture fixture) + { + _ = fixture; + } + + public override IEventStore CreateStore() + { + return _.EventStore; + } + } +} diff --git a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs index 0dfb9b899..1c9f6abb6 100644 --- a/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs +++ b/backend/tests/Squidex.Infrastructure.Tests/EventSourcing/EventStoreTests.cs @@ -232,14 +232,17 @@ namespace Squidex.Infrastructure.EventSourcing { var streamName = $"test-{Guid.NewGuid()}"; + var numTasks = 50; + var numEvents = 100; + // Append and read in parallel. var readEvents = await QueryWithSubscriptionAsync($"^{streamName}", async () => { - await Parallel.ForEachAsync(Enumerable.Range(0, 50), async (i, ct) => + await Parallel.ForEachAsync(Enumerable.Range(0, numTasks), async (i, ct) => { var fullStreamName = $"{streamName}-{Guid.NewGuid()}"; - for (var j = 0; j < 100; j++) + for (var j = 0; j < numEvents; j++) { var commit1 = new[] { @@ -251,7 +254,7 @@ namespace Squidex.Infrastructure.EventSourcing }); }); - Assert.Equal(5000, readEvents?.Count); + Assert.Equal(numEvents * numTasks, readEvents?.Count); } [Fact]