mirror of https://github.com/Squidex/squidex.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
149 lines
4.8 KiB
149 lines
4.8 KiB
// ==========================================================================
|
|
// Squidex Headless CMS
|
|
// ==========================================================================
|
|
// Copyright (c) Squidex UG (haftungsbeschraenkt)
|
|
// All rights reserved. Licensed under the MIT license.
|
|
// ==========================================================================
|
|
|
|
using Microsoft.Extensions.DependencyInjection;
|
|
using Squidex.Caching;
|
|
using Squidex.Events;
|
|
using Squidex.Infrastructure;
|
|
using Squidex.Infrastructure.EventSourcing;
|
|
using Squidex.Infrastructure.EventSourcing.Consume;
|
|
using Squidex.Infrastructure.Reflection;
|
|
using Squidex.Infrastructure.States;
|
|
using Squidex.Infrastructure.TestHelpers;
|
|
|
|
#pragma warning disable MA0040 // Forward the CancellationToken parameter to methods that take one
|
|
|
|
namespace Squidex.MongoDb.Infrastructure;
|
|
|
|
public abstract class EventConsumerProcessorIntegrationTests
|
|
{
|
|
// Lazily created event store under test; the constructor wires it to CreateStore().
private readonly Lazy<IEventStore> store;
|
|
|
|
/// <summary>
/// Test consumer that records the ID of every event handed to <see cref="On"/>.
/// </summary>
public sealed class EventConsumer : IEventConsumer
{
    /// <summary>IDs of the handled events, in the order they were added.</summary>
    public List<Guid> Events { get; } = [];

    /// <summary>Constant name of this consumer.</summary>
    public string Name
    {
        get { return "Consumer"; }
    }

    /// <summary>
    /// Optional callback that is awaited after each event with the number of events recorded so far.
    /// </summary>
    public Func<int, Task> EventReceived { get; set; }

    /// <summary>
    /// Records the event ID and then notifies <see cref="EventReceived"/>, if one is set.
    /// </summary>
    /// <param name="event">The envelope whose event ID header is recorded.</param>
    public async Task On(Envelope<IEvent> @event)
    {
        var eventId = @event.Headers.EventId();
        Events.Add(eventId);

        if (EventReceived is not { } callback)
        {
            return;
        }

        await callback(Events.Count);
    }
}
|
|
|
|
/// <summary>
/// Empty event type; it carries no data of its own.
/// </summary>
private class MyEvent : IEvent { }
|
|
|
|
/// <summary>
/// The event store under test, taken from the lazy <c>store</c> field.
/// </summary>
protected IEventStore EventStore => store.Value;
|
|
|
|
/// <summary>
/// Sets up the lazy event store. <see cref="CreateStore"/> is only registered as the factory here;
/// it is not invoked until the store value is first read.
/// </summary>
protected EventConsumerProcessorIntegrationTests()
{
#pragma warning disable MA0056 // Do not call overridable members in constructor
    store = new Lazy<IEventStore>(() => CreateStore());
#pragma warning restore MA0056 // Do not call overridable members in constructor
}
|
|
|
|
/// <summary>
/// Creates the event store implementation to run the tests against.
/// Used as the factory of the lazy store, so it runs when <see cref="EventStore"/> is first read.
/// </summary>
/// <returns>The event store under test.</returns>
public abstract IEventStore CreateStore();
|
|
|
|
/// <summary>
/// Writes events from many parallel writers while an <see cref="EventConsumerProcessor"/> is running and
/// asserts that the consumer has recorded exactly one entry per written event.
/// </summary>
/// <param name="startStop">
/// Restart interval: whenever the number of received events is a multiple of this value, the processor is
/// stopped and started again in the background. Zero disables the restarts.
/// </param>
[Theory]
[InlineData(0)]
[InlineData(100)]
public async Task Should_subscribe_with_parallel_writes(int startStop)
{
    var numTasks = 100;
    var numEvents = 50;

    // Upper bound for waiting until the consumer has caught up with all written events.
    var receiveTimeout = TimeSpan.FromSeconds(20);

    var eventConsumer = new EventConsumer();

    var mongoClient = MongoClientFactory.Create(TestConfig.Configuration["mongoDb:configurationDirect"]);
    var mongoDatabase = mongoClient.GetDatabase(TestConfig.Configuration["mongodb:database"]);

    var typeRegistry = new TypeRegistry().Add<IEvent, MyEvent>("MyEvent");

    // Dispose the provider when the test ends, so that disposable services created by the container
    // are released instead of leaking on every test run.
    await using var services = new ServiceCollection()
        .AddLogging()
        .AddSingleton(TestUtils.DefaultSerializer)
        .AddSingleton(EventStore)
        .AddSingleton(eventConsumer)
        .AddSingleton(mongoClient)
        .AddSingleton(mongoDatabase)
        .AddSingleton(typeRegistry)
        .AddSingleton(typeof(IPersistenceFactory<>), typeof(Store<>))
        .AddSingleton<EventConsumerProcessor>()
        .AddSingleton<IEventConsumer>(eventConsumer)
        .AddSingleton<IEventFormatter, DefaultEventFormatter>()
        .AddSingleton<IEventStreamNames, DefaultEventStreamNames>()
        .AddSingleton(typeof(ISnapshotStore<>), typeof(MongoSnapshotStore<>))
        .BuildServiceProvider();

    var processor = services.GetRequiredService<EventConsumerProcessor>();

    var persistenceFactory = services.GetRequiredService<IPersistenceFactory<None>>();

    // Also start the event consumer, because it might be stopped from previous run.
    await processor.InitializeAsync(default);
    await processor.ActivateAsync();
    await processor.StartAsync();

    async Task StartStop()
    {
        await processor.StopAsync();
        await processor.StartAsync();
    }

    eventConsumer.EventReceived = i =>
    {
        if (startStop > 0 && i % startStop == 0)
        {
            // Do not await the task here, otherwise we could create a deadlock.
            StartStop().Forget();
        }

        return Task.CompletedTask;
    };

    // Create events in parallel, each writer appends to its own new stream.
    await Parallel.ForEachAsync(Enumerable.Range(0, numTasks), async (i, ct) =>
    {
        var persistence = persistenceFactory.WithEventSourcing(typeof(None), DomainId.NewGuid(), null);

        for (var j = 0; j < numEvents; j++)
        {
            await persistence.WriteEventsAsync(
            [
                Envelope.Create(new MyEvent()),
            ]);
        }
    });

    var expectedEvents = numEvents * numTasks;

    // Wait for all events to arrive. The delay is not bound to the token, so on timeout the loop
    // simply ends and the assertion below reports the mismatch.
    using (var cts = new CancellationTokenSource(receiveTimeout))
    {
        while (!cts.IsCancellationRequested && eventConsumer.Events.Count < expectedEvents)
        {
            await Task.Delay(100);
        }
    }

    await processor.StopAsync();

    Assert.Equal(expectedEvents, eventConsumer.Events.Count);
}
|
|
}
|
|
|